跳到内容区

工作流的并发执行#

除了循环和分支之外,工作流还可以并发运行步骤。当您有多个步骤可以独立运行且它们包含耗时的 await 操作时,并发执行非常有用,这允许其他步骤并行运行。

发射多个事件#

到目前为止,在我们的示例中,每个步骤只发射了一个事件。但在许多情况下,您会希望并行运行步骤。要做到这一点,您需要发射多个事件。您可以使用 send_event 来完成。

class ParallelFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StopEvent:
        print("Running slow query ", ev.query)
        await asyncio.sleep(random.randint(1, 5))

        return StopEvent(result=ev.query)

在此示例中,我们的 start 步骤发射 3 个 StepTwoEventstep_two 步骤使用 num_workers=4 进行装饰,这会告诉工作流最多并发运行此步骤的 4 个实例(这是默认值)。

收集事件#

如果您执行上一个示例,您会注意到工作流会在第一个完成的查询之后停止。有时这很有用,但在其他时候,您会希望等待所有耗时操作完成后再继续下一步。您可以使用 collect_events 来实现这一点。

class ConcurrentFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StepThreeEvent:
        print("Running query ", ev.query)
        await asyncio.sleep(random.randint(1, 5))
        return StepThreeEvent(result=ev.query)

    @step
    async def step_three(self, ctx: Context, ev: StepThreeEvent) -> StopEvent:
        # wait until we receive 3 events
        result = ctx.collect_events(ev, [StepThreeEvent] * 3)
        if result is None:
            return None

        # do something with all 3 results together
        print(result)
        return StopEvent(result="Done")

collect_events 方法位于 Context 上,它接受触发该步骤的事件以及要等待的事件类型数组。在此示例中,我们正在等待 3 个相同 StepThreeEvent 类型的事件。

step_three 步骤在每次收到 StepThreeEvent 时触发,但 collect_events 将返回 None,直到收到所有 3 个事件。此时,该步骤将继续执行,您可以一起处理所有 3 个结果。

collect_events 返回的 result 是一个包含收集到的事件的数组,其顺序与收到事件的顺序一致。

多种事件类型#

当然,您不必等待相同类型的事件。您可以等待任何您喜欢的事件组合,例如在此示例中:

class ConcurrentFlow(Workflow):
    @step
    async def start(
        self, ctx: Context, ev: StartEvent
    ) -> StepAEvent | StepBEvent | StepCEvent:
        ctx.send_event(StepAEvent(query="Query 1"))
        ctx.send_event(StepBEvent(query="Query 2"))
        ctx.send_event(StepCEvent(query="Query 3"))

    @step
    async def step_a(self, ctx: Context, ev: StepAEvent) -> StepACompleteEvent:
        print("Doing something A-ish")
        return StepACompleteEvent(result=ev.query)

    @step
    async def step_b(self, ctx: Context, ev: StepBEvent) -> StepBCompleteEvent:
        print("Doing something B-ish")
        return StepBCompleteEvent(result=ev.query)

    @step
    async def step_c(self, ctx: Context, ev: StepCEvent) -> StepCCompleteEvent:
        print("Doing something C-ish")
        return StepCCompleteEvent(result=ev.query)

    @step
    async def step_three(
        self,
        ctx: Context,
        ev: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent,
    ) -> StopEvent:
        print("Received event ", ev.result)

        # wait until we receive 3 events
        if (
            ctx.collect_events(
                ev,
                [StepCCompleteEvent, StepACompleteEvent, StepBCompleteEvent],
            )
            is None
        ):
            return None

        # do something with all 3 results together
        return StopEvent(result="Done")

为了处理多种事件类型,我们进行了一些更改:

  • start 现在被声明为发射 3 种不同的事件类型
  • step_three 现在被声明为接受 3 种不同的事件类型
  • collect_events 现在接受一个要等待的事件类型数组

请注意,传递给 collect_events 的数组中事件类型的顺序很重要。事件将按照传递给 collect_events 的顺序返回,无论何时收到它们。

此工作流的可视化效果非常令人愉悦

A concurrent workflow

现在让我们看看如何通过 子类化 和其他技术来扩展工作流。