工作流的并发执行#
除了循环和分支之外,工作流还可以并发运行步骤。当您有多个步骤可以独立运行且它们包含耗时的 await
操作时,并发执行非常有用,这允许其他步骤并行运行。
发射多个事件#
到目前为止,在我们的示例中,每个步骤只发射了一个事件。但在许多情况下,您会希望并行运行步骤。要做到这一点,您需要发射多个事件。您可以使用 send_event
来完成。
class ParallelFlow(Workflow):
@step
async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
ctx.send_event(StepTwoEvent(query="Query 1"))
ctx.send_event(StepTwoEvent(query="Query 2"))
ctx.send_event(StepTwoEvent(query="Query 3"))
@step(num_workers=4)
async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StopEvent:
print("Running slow query ", ev.query)
await asyncio.sleep(random.randint(1, 5))
return StopEvent(result=ev.query)
在此示例中,我们的 start
步骤发射 3 个 StepTwoEvent
。step_two
步骤使用 num_workers=4
进行装饰,这会告诉工作流最多并发运行此步骤的 4 个实例(这是默认值)。
收集事件#
如果您执行上一个示例,您会注意到工作流会在第一个完成的查询之后停止。有时这很有用,但在其他时候,您会希望等待所有耗时操作完成后再继续下一步。您可以使用 collect_events
来实现这一点。
class ConcurrentFlow(Workflow):
@step
async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
ctx.send_event(StepTwoEvent(query="Query 1"))
ctx.send_event(StepTwoEvent(query="Query 2"))
ctx.send_event(StepTwoEvent(query="Query 3"))
@step(num_workers=4)
async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StepThreeEvent:
print("Running query ", ev.query)
await asyncio.sleep(random.randint(1, 5))
return StepThreeEvent(result=ev.query)
@step
async def step_three(self, ctx: Context, ev: StepThreeEvent) -> StopEvent:
# wait until we receive 3 events
result = ctx.collect_events(ev, [StepThreeEvent] * 3)
if result is None:
return None
# do something with all 3 results together
print(result)
return StopEvent(result="Done")
collect_events
方法位于 Context
上,它接受触发该步骤的事件以及要等待的事件类型数组。在此示例中,我们正在等待 3 个相同 StepThreeEvent
类型的事件。
step_three
步骤在每次收到 StepThreeEvent
时触发,但 collect_events
将返回 None
,直到收到所有 3 个事件。此时,该步骤将继续执行,您可以一起处理所有 3 个结果。
从 collect_events
返回的 result
是一个包含收集到的事件的数组,其顺序与收到事件的顺序一致。
多种事件类型#
当然,您不必等待相同类型的事件。您可以等待任何您喜欢的事件组合,例如在此示例中:
class ConcurrentFlow(Workflow):
@step
async def start(
self, ctx: Context, ev: StartEvent
) -> StepAEvent | StepBEvent | StepCEvent:
ctx.send_event(StepAEvent(query="Query 1"))
ctx.send_event(StepBEvent(query="Query 2"))
ctx.send_event(StepCEvent(query="Query 3"))
@step
async def step_a(self, ctx: Context, ev: StepAEvent) -> StepACompleteEvent:
print("Doing something A-ish")
return StepACompleteEvent(result=ev.query)
@step
async def step_b(self, ctx: Context, ev: StepBEvent) -> StepBCompleteEvent:
print("Doing something B-ish")
return StepBCompleteEvent(result=ev.query)
@step
async def step_c(self, ctx: Context, ev: StepCEvent) -> StepCCompleteEvent:
print("Doing something C-ish")
return StepCCompleteEvent(result=ev.query)
@step
async def step_three(
self,
ctx: Context,
ev: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent,
) -> StopEvent:
print("Received event ", ev.result)
# wait until we receive 3 events
if (
ctx.collect_events(
ev,
[StepCCompleteEvent, StepACompleteEvent, StepBCompleteEvent],
)
is None
):
return None
# do something with all 3 results together
return StopEvent(result="Done")
为了处理多种事件类型,我们进行了一些更改:
start
现在被声明为发射 3 种不同的事件类型step_three
现在被声明为接受 3 种不同的事件类型collect_events
现在接受一个要等待的事件类型数组
请注意,传递给 collect_events
的数组中事件类型的顺序很重要。事件将按照传递给 collect_events
的顺序返回,无论何时收到它们。
此工作流的可视化效果非常令人愉悦
现在让我们看看如何通过 子类化 和其他技术来扩展工作流。