可观测性#
调试对于任何应用程序开发都至关重要,工作流为您提供了多种调试方法。
可视化#
最简单的调试形式是可视化,我们已经在本次教程中大量使用了它。您可以随时运行以下代码来可视化您的工作流
from llama_index.utils.workflow import draw_all_possible_flows
draw_all_possible_flows(MyWorkflow, filename="some_filename.html")
这将把您的流程的交互式可视化输出到 some_filename.html
文件中,您可以在任何浏览器中查看。
详细模式#
运行任何工作流时,您都可以始终传递 verbose=True
。这将输出每个步骤执行时的名称,以及它是否返回了何种类型的事件。使用本教程前一阶段的 ConcurrentWorkflow
class ConcurrentFlow(Workflow):
@step
async def start(
self, ctx: Context, ev: StartEvent
) -> StepAEvent | StepBEvent | StepCEvent:
ctx.send_event(StepAEvent(query="Query 1"))
ctx.send_event(StepBEvent(query="Query 2"))
ctx.send_event(StepCEvent(query="Query 3"))
@step
async def step_a(self, ctx: Context, ev: StepAEvent) -> StepACompleteEvent:
print("Doing something A-ish")
return StepACompleteEvent(result=ev.query)
@step
async def step_b(self, ctx: Context, ev: StepBEvent) -> StepBCompleteEvent:
print("Doing something B-ish")
return StepBCompleteEvent(result=ev.query)
@step
async def step_c(self, ctx: Context, ev: StepCEvent) -> StepCCompleteEvent:
print("Doing something C-ish")
return StepCCompleteEvent(result=ev.query)
@step
async def step_three(
self,
ctx: Context,
ev: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent,
) -> StopEvent:
print("Received event ", ev.result)
# wait until we receive 3 events
if (
ctx.collect_events(
ev,
[StepCCompleteEvent, StepACompleteEvent, StepBCompleteEvent],
)
is None
):
return None
# do something with all 3 results together
return StopEvent(result="Done")
您可以像这样以详细模式运行工作流
w = ConcurrentFlow(timeout=10, verbose=True)
result = await w.run()
您将看到如下输出
Running step start
Step start produced no event
Running step step_a
Doing something A-ish
Step step_a produced event StepACompleteEvent
Running step step_b
Doing something B-ish
Step step_b produced event StepBCompleteEvent
Running step step_c
Doing something C-ish
Step step_c produced event StepCCompleteEvent
Running step step_three
Received event Query 1
Step step_three produced no event
Running step step_three
Received event Query 2
Step step_three produced no event
Running step step_three
Received event Query 3
Step step_three produced event StopEvent
步进执行#
在笔记本环境中,逐步运行工作流可能会很有帮助。您可以通过在处理程序对象上调用 run_step
来实现这一点
w = ConcurrentFlow(timeout=10, verbose=True)
handler = w.run(stepwise=True)
# Each time we call `run_step`, the workflow will advance and return all the events
# that were produced in the last step. This events need to be manually propagated
# for the workflow to keep going (we assign them to `produced_events` with the := operator).
while produced_events := await handler.run_step():
# If we're here, it means there's at least an event we need to propagate,
# let's do it with `send_event`
for ev in produced_events:
handler.ctx.send_event(ev)
# If we're here, it means the workflow execution completed, and
# we can now access the final result.
result = await handler
您可以多次调用 run_step
来一步步执行工作流。
可视化最近的执行#
如果您正在逐步运行工作流,或者刚刚执行了一个带有分支的工作流,您可以使用 draw_most_recent_execution
让可视化器只绘制刚刚执行的精确步骤
from llama_index.utils.workflow import draw_most_recent_execution
draw_most_recent_execution(w, filename="last_execution.html")
注意,这里传递的是工作流的实例 w
,而不是类名。
检查点#
完整的工作流执行可能需要花费大量时间,而且通常只需同时调试和观察几个步骤。为了加快工作流开发周期,WorkflowCheckpointer
对象会封装一个 Workflow
,并在每次运行的每个步骤完成后创建并存储 Checkpoint
。这些检查点可以被查看、检查,并被选作未来运行的起始点。
from llama_index.core.workflow.checkpointer import WorkflowCheckpointer
w = ConcurrentFlow()
w_ckptr = WorkflowCheckpointer(workflow=w)
# run the workflow via w_ckptr to get checkpoints
handler = w_cptr.run()
await handler
# view checkpoints of the last run
w_ckptr.checkpoints[handler.run_id]
# run from a previous ckpt
ckpt = w_ckptr.checkpoints[handler.run_id][0]
handler = w_ckptr.run_from(checkpoint=ckpt)
await handler
第三方工具#
您还可以使用我们支持的任何第三方可视化和调试工具,例如 Arize。
还有一件事#
本教程的最后一步是使用无绑定函数而不是类来定义工作流的替代语法。