跳到内容

流式事件#

工作流可能很复杂——它们旨在处理复杂、分支、并行的逻辑——这意味着它们可能需要一些时间才能完全执行。为了给用户提供良好的体验,您可能希望通过流式传输事件来提供进度指示。工作流在 Context 对象上内置了对此的支持。

为了实现这一点,让我们引入所有需要的依赖项

from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)
import asyncio
from llama_index.llms.openai import OpenAI
from llama_index.utils.workflow import draw_all_possible_flows

让我们为一个简单的三步工作流设置一些事件,再加上一个事件来处理我们进行时的进度流式传输

class FirstEvent(Event):
    first_output: str


class SecondEvent(Event):
    second_output: str
    response: str


class ProgressEvent(Event):
    msg: str

并定义一个发送事件的工作流类

class MyWorkflow(Workflow):
    @step
    async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
        llm = OpenAI(model="gpt-4o-mini")
        generator = await llm.astream_complete(
            "Please give me the first 3 paragraphs of Moby Dick, a book in the public domain."
        )
        async for response in generator:
            # Allow the workflow to stream this piece of response
            ctx.write_event_to_stream(ProgressEvent(msg=response.delta))
        return SecondEvent(
            second_output="Second step complete, full response attached",
            response=str(response),
        )

    @step
    async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step three is happening"))
        return StopEvent(result="Workflow complete.")

提示

这里的 OpenAI() 假定您的环境中设置了 OPENAI_API_KEY。您也可以使用 api_key 参数传入一个密钥。

step_onestep_three 中,我们将单个事件写入事件流。在 step_two 中,我们使用 astream_complete 生成 LLM 响应的可迭代生成器,然后为 LLM 返回给我们的每个数据块(大约每个词一个)生成一个事件,最后再将最终响应返回给 step_three

要实际获取此输出,我们需要异步运行工作流并监听事件,如下所示

async def main():
    w = MyWorkflow(timeout=30, verbose=True)
    handler = w.run(first_input="Start the workflow.")

    async for ev in handler.stream_events():
        if isinstance(ev, ProgressEvent):
            print(ev.msg)

    final_result = await handler
    print("Final result", final_result)

    draw_all_possible_flows(MyWorkflow, filename="streaming_workflow.html")


if __name__ == "__main__":
    asyncio.run(main())

run 在后台运行工作流,而 stream_events 将提供写入流中的任何事件。当流传递一个 StopEvent 时,它将停止,之后您就可以像往常一样获取工作流的最终结果了。

接下来我们看看并行执行