跳到内容

维护状态#

在目前为止的示例中,我们使用自定义事件的属性在步骤之间传递数据。这是一种强大的数据传递方式,但它也有局限性。例如,如果您想在不直接相连的步骤之间传递数据,则需要通过中间的所有步骤来传递数据。这会使您的代码更难阅读和维护。

为了避免这个陷阱,我们在工作流的每个步骤中都提供了 Context 对象。要使用它,只需在步骤中声明一个类型为 Context 的参数。以下是如何实现:

我们��要一个新的导入,即 Context 类型

from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)

现在我们定义一个 start 事件,它检查数据是否已加载到上下文中。如果还没有,它会返回一个 SetupEvent,该事件会触发 setup 函数加载数据并循环回 start

class SetupEvent(Event):
    query: str


class StepTwoEvent(Event):
    query: str


class StatefulFlow(Workflow):
    @step
    async def start(
        self, ctx: Context, ev: StartEvent
    ) -> SetupEvent | StepTwoEvent:
        db = await ctx.get("some_database", default=None)
        if db is None:
            print("Need to load data")
            return SetupEvent(query=ev.query)

        # do something with the query
        return StepTwoEvent(query=ev.query)

    @step
    async def setup(self, ctx: Context, ev: SetupEvent) -> StartEvent:
        # load data
        await ctx.set("some_database", [1, 2, 3])
        return StartEvent(query=ev.query)

然后在 step_two 中,我们可以直接从上下文中访问数据,而无需显式传递。在生成式 AI 应用中,这对于加载索引和其他大型数据操作非常有用。

@step
async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StopEvent:
    # do something with the data
    print("Data is ", await ctx.get("some_database"))

    return StopEvent(result=await ctx.get("some_database"))


w = StatefulFlow(timeout=10, verbose=False)
result = await w.run(query="Some query")
print(result)

接下来我们将学习如何从进行中的工作流中流式传输事件