维护状态#
在目前为止的示例中,我们使用自定义事件的属性在步骤之间传递数据。这是一种强大的数据传递方式,但它也有局限性。例如,如果您想在不直接相连的步骤之间传递数据,则需要通过中间的所有步骤来传递数据。这会使您的代码更难阅读和维护。
为了避免这个陷阱,我们在工作流的每个步骤中都提供了 Context
对象。要使用它,只需在步骤中声明一个类型为 Context
的参数。以下是如何实现:
我们��要一个新的导入,即 Context
类型
from llama_index.core.workflow import (
StartEvent,
StopEvent,
Workflow,
step,
Event,
Context,
)
现在我们定义一个 start
事件,它检查数据是否已加载到上下文中。如果还没有,它会返回一个 SetupEvent
,该事件会触发 setup
函数加载数据并循环回 start
。
class SetupEvent(Event):
query: str
class StepTwoEvent(Event):
query: str
class StatefulFlow(Workflow):
@step
async def start(
self, ctx: Context, ev: StartEvent
) -> SetupEvent | StepTwoEvent:
db = await ctx.get("some_database", default=None)
if db is None:
print("Need to load data")
return SetupEvent(query=ev.query)
# do something with the query
return StepTwoEvent(query=ev.query)
@step
async def setup(self, ctx: Context, ev: SetupEvent) -> StartEvent:
# load data
await ctx.set("some_database", [1, 2, 3])
return StartEvent(query=ev.query)
然后在 step_two
中,我们可以直接从上下文中访问数据,而无需显式传递。在生成式 AI 应用中,这对于加载索引和其他大型数据操作非常有用。
@step
async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StopEvent:
# do something with the data
print("Data is ", await ctx.get("some_database"))
return StopEvent(result=await ctx.get("some_database"))
w = StatefulFlow(timeout=10, verbose=False)
result = await w.run(query="Some query")
print(result)
接下来我们将学习如何从进行中的工作流中流式传输事件。