Parallel Execution of Same-Event Example¶
In this example, we'll demonstrate how to use the workflow functionality to achieve similar capabilities while allowing multiple events of the same type to be executed in parallel. By setting the num_workers parameter in the @step decorator, we can control how many instances of a step run at the same time, enabling efficient parallel processing.
# %pip install llama-index-core llama-index-utils-workflow -q
Importing Required Libraries¶
After installing the dependencies, we can import the required libraries.
import asyncio
from llama_index.core.workflow import (
step,
Context,
Workflow,
Event,
StartEvent,
StopEvent,
)
We will create two workflows: one that processes multiple data items in parallel using the @step(num_workers=N) decorator, and another without this parallelism for comparison.
By using the num_workers parameter in the @step decorator, we can limit the number of simultaneously executing steps and thus control the level of parallelism. This approach is particularly suitable for scenarios where you need to handle similar tasks while managing resource usage. For example, you can execute multiple sub-queries at the same time, but note that num_workers cannot be set arbitrarily high; the right value depends on your workload and token limits.
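As a quick preview, the only change needed to let a step handle multiple queued events concurrently is the num_workers argument on its decorator. Below is a minimal sketch with placeholder names (PreviewWorkflow, ItemEvent, fan_out, and handle_item are illustrative only; the real workflows are defined in the sections that follow):
from llama_index.core.workflow import (
    Context,
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)


class ItemEvent(Event):
    name: str


class PreviewWorkflow(Workflow):
    @step
    async def fan_out(self, ctx: Context, ev: StartEvent) -> ItemEvent:
        # Emit a few ItemEvent events so the next step can pick them up.
        for name in ["x", "y"]:
            ctx.send_event(ItemEvent(name=name))
        return None

    # num_workers=3 allows up to three handle_item invocations to run at
    # the same time when several ItemEvent events are waiting.
    @step(num_workers=3)
    async def handle_item(self, ev: ItemEvent) -> StopEvent:
        # Returning StopEvent ends the run as soon as one item is handled;
        # the full example below collects every result instead.
        return StopEvent(result=f"handled {ev.name}")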
Defining Event Types¶
We'll define two event types: one for the input events to be processed, and another for the processing results.
class ProcessEvent(Event):
data: str
class ResultEvent(Event):
result: str
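Workflow events behave like typed data models, so it can help to sanity-check them interactively; a tiny example using the classes just defined:
# Fields declared on an Event are validated and exposed as attributes.
ev = ProcessEvent(data="A")
print(ev.data)  # -> A

res = ResultEvent(result="Processed: A")
print(res.result)  # -> Processed: A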
Creating Sequential and Parallel Workflows¶
Now, we'll create a SequentialWorkflow and a ParallelWorkflow class, each with three main steps:
- start: initializes and sends multiple parallel events
- process_data: performs the data processing
- combine_results: collects and merges all processing results
import random
class SequentialWorkflow(Workflow):
@step
async def start(self, ctx: Context, ev: StartEvent) -> ProcessEvent:
data_list = ["A", "B", "C"]
await ctx.set("num_to_collect", len(data_list))
for item in data_list:
ctx.send_event(ProcessEvent(data=item))
return None
@step(num_workers=1)
async def process_data(self, ev: ProcessEvent) -> ResultEvent:
# Simulate some time-consuming processing
processing_time = 2 + random.random()
await asyncio.sleep(processing_time)
result = f"Processed: {ev.data}"
print(f"Completed processing: {ev.data}")
return ResultEvent(result=result)
@step
async def combine_results(
self, ctx: Context, ev: ResultEvent
) -> StopEvent | None:
num_to_collect = await ctx.get("num_to_collect")
results = ctx.collect_events(ev, [ResultEvent] * num_to_collect)
if results is None:
return None
combined_result = ", ".join([event.result for event in results])
return StopEvent(result=combined_result)
class ParallelWorkflow(Workflow):
@step
async def start(self, ctx: Context, ev: StartEvent) -> ProcessEvent:
data_list = ["A", "B", "C"]
await ctx.set("num_to_collect", len(data_list))
for item in data_list:
ctx.send_event(ProcessEvent(data=item))
return None
@step(num_workers=3)
async def process_data(self, ev: ProcessEvent) -> ResultEvent:
# Simulate some time-consuming processing
processing_time = 2 + random.random()
await asyncio.sleep(processing_time)
result = f"Processed: {ev.data}"
print(f"Completed processing: {ev.data}")
return ResultEvent(result=result)
@step
async def combine_results(
self, ctx: Context, ev: ResultEvent
) -> StopEvent | None:
num_to_collect = await ctx.get("num_to_collect")
results = ctx.collect_events(ev, [ResultEvent] * num_to_collect)
if results is None:
return None
combined_result = ", ".join([event.result for event in results])
return StopEvent(result=combined_result)
In both workflows:
- The start method initializes and sends multiple ProcessEvent events.
- The process_data method uses:
  - the @step(num_workers=1) decorator in SequentialWorkflow, so events are processed one at a time;
  - the @step(num_workers=3) decorator in ParallelWorkflow, which limits the number of simultaneous executions to 3.
- The combine_results method collects and merges all processing results.
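Optionally, since llama-index-utils-workflow was installed at the top of this example, you can also visualize the step/event graph of either workflow. A short sketch (the output filename is arbitrary):
from llama_index.utils.workflow import draw_all_possible_flows

# Writes an interactive HTML rendering of the workflow's event graph.
draw_all_possible_flows(ParallelWorkflow, filename="parallel_workflow.html")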
Running the Workflows¶
Finally, we can run both workflows and compare how long they take.
import time
sequential_workflow = SequentialWorkflow()
print(
"Start a sequential workflow without setting num_workers in the step of process_data"
)
start_time = time.time()
result = await sequential_workflow.run()
end_time = time.time()
print(f"Workflow result: {result}")
print(f"Time taken: {end_time - start_time} seconds")
print("-" * 30)
parallel_workflow = ParallelWorkflow()
print(
"Start a parallel workflow with setting num_workers in the step of process_data"
)
start_time = time.time()
result = await parallel_workflow.run()
end_time = time.time()
print(f"Workflow result: {result}")
print(f"Time taken: {end_time - start_time} seconds")
Start a sequential workflow without setting num_workers in the step of process_data
Completed processing: A
Completed processing: B
Completed processing: C
Workflow result: Processed: A, Processed: B, Processed: C
Time taken: 7.439495086669922 seconds
------------------------------
Start a parallel workflow with setting num_workers in the step of process_data
Completed processing: C
Completed processing: A
Completed processing: B
Workflow result: Processed: C, Processed: A, Processed: B
Time taken: 2.5881590843200684 seconds
Note¶
- With num_workers=1 (processing items one at a time), the total run can take 6-9 seconds. By setting num_workers=3, processing happens in parallel, handling 3 items at a time, and only takes 2-3 seconds in total.
- In ParallelWorkflow, the order of the completed results may differ from the input order, depending on how long each task takes.
This example demonstrates the difference in execution speed with and without num_workers, and how to implement parallel processing within a workflow. By setting num_workers, we can control the degree of parallelism, which is very useful for scenarios where performance needs to be balanced against resource usage.
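Conceptually, num_workers bounds how many invocations of a step run at once, much like guarding an async task with a semaphore. Here is a minimal, library-independent sketch of that idea (handle_item and run_all below are hypothetical helpers, not part of the workflow API):
import asyncio
import random

# At most 3 items are processed concurrently, mirroring num_workers=3.
semaphore = asyncio.Semaphore(3)


async def handle_item(item: str) -> str:
    async with semaphore:
        await asyncio.sleep(2 + random.random())  # simulate slow work
        return f"Processed: {item}"


async def run_all() -> None:
    results = await asyncio.gather(*(handle_item(i) for i in ["A", "B", "C"]))
    print(", ".join(results))


# In a notebook you can simply run: await run_all()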
Checkpointing¶
Checkpointing a parallel-execution workflow like the one defined above is also possible. To do so, we wrap the Workflow with a WorkflowCheckpointer object and perform runs through these instances. During workflow execution, checkpoints are stored in this wrapper object and can be inspected or used as the starting point for subsequent runs.
from llama_index.core.workflow.checkpointer import WorkflowCheckpointer
wflow_ckptr = WorkflowCheckpointer(workflow=parallel_workflow)
handler = wflow_ckptr.run()
await handler
Completed processing: C
Completed processing: A
Completed processing: B
'Processed: C, Processed: A, Processed: B'
The checkpoints for the above run are stored in the WorkflowCheckpointer.checkpoints dict attribute.
for run_id, ckpts in wflow_ckptr.checkpoints.items():
print(f"Run: {run_id} has {[c.last_completed_step for c in ckpts]}")
Run: 90812bec-b571-4513-8ad5-aa957ad7d4fb has ['process_data', 'process_data', 'process_data', 'combine_results']
We can run from any of the stored checkpoints using the WorkflowCheckpointer.run_from(checkpoint=...) method. Let's take the first checkpoint, stored after the first "process_data" step completed, and run from it.
ckpt = wflow_ckptr.checkpoints[run_id][0]
handler = wflow_ckptr.run_from(ckpt)
await handler
Completed processing: B
Completed processing: A
'Processed: C, Processed: B, Processed: A'
Invoking run_from or run creates a new run entry in the checkpoints attribute. In the latest run, which started from the specified checkpoint, we can see that only two "process_data" steps and the final "combine_results" step remained to be completed.
for run_id, ckpts in wflow_ckptr.checkpoints.items():
print(f"Run: {run_id} has {[c.last_completed_step for c in ckpts]}")
Run: 90812bec-b571-4513-8ad5-aa957ad7d4fb has ['process_data', 'process_data', 'process_data', 'combine_results']
Run: 4e1d24cd-c672-4ed1-bb5b-b9f1a252abed has ['process_data', 'process_data', 'combine_results']
Now, if we use the checkpoint associated with the completion of the second "process_data" step of the same initial run as the starting point, we should see a new entry containing only two steps: "process_data" and "combine_results".
# get the run_id of the first initial run
first_run_id = next(iter(wflow_ckptr.checkpoints.keys()))
first_run_id
'90812bec-b571-4513-8ad5-aa957ad7d4fb'
ckpt = wflow_ckptr.checkpoints[first_run_id][
1
] # checkpoint after the second "process_data" step
handler = wflow_ckptr.run_from(ckpt)
await handler
Completed processing: B
'Processed: C, Processed: A, Processed: B'
for run_id, ckpts in wflow_ckptr.checkpoints.items():
print(f"Run: {run_id} has {[c.last_completed_step for c in ckpts]}")
Run: 90812bec-b571-4513-8ad5-aa957ad7d4fb has ['process_data', 'process_data', 'process_data', 'combine_results']
Run: 4e1d24cd-c672-4ed1-bb5b-b9f1a252abed has ['process_data', 'process_data', 'combine_results']
Run: e4f94fcd-9b78-4e28-8981-e0232d068f6e has ['process_data', 'combine_results']
Similarly, if we start from the checkpoint taken after the third "process_data" step of the initial run completed, we should see only the final "combine_results" step.
ckpt = wflow_ckptr.checkpoints[first_run_id][
2
] # checkpoint after the third "process_data" step
handler = wflow_ckptr.run_from(ckpt)
await handler
'Processed: C, Processed: A, Processed: B'
for run_id, ckpts in wflow_ckptr.checkpoints.items():
print(f"Run: {run_id} has {[c.last_completed_step for c in ckpts]}")
Run: 90812bec-b571-4513-8ad5-aa957ad7d4fb has ['process_data', 'process_data', 'process_data', 'combine_results']
Run: 4e1d24cd-c672-4ed1-bb5b-b9f1a252abed has ['process_data', 'process_data', 'combine_results']
Run: e4f94fcd-9b78-4e28-8981-e0232d068f6e has ['process_data', 'combine_results']
Run: c498a1a0-cf4c-4d80-a1e2-a175bb90b66d has ['combine_results']
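Beyond indexing checkpoints by position, you can also select them by the step they follow, using only the checkpoints dictionary and the last_completed_step attribute shown above:
# Collect the checkpoints of the initial run taken after a "process_data"
# step completed, then resume from the first of them.
process_data_ckpts = [
    ckpt
    for ckpt in wflow_ckptr.checkpoints[first_run_id]
    if ckpt.last_completed_step == "process_data"
]

handler = wflow_ckptr.run_from(process_data_ckpts[0])
result = await handler
print(result)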