跳至内容

工作流#

LlamaIndex 中的 Workflow 是一个事件驱动的抽象,用于将多个事件串联起来。工作流由 steps 组成,每个步骤负责处理特定事件类型并触发新的事件。

LlamaIndex 中的 Workflow 通过使用 @step 装饰器装饰函数来实现。这用于推断每个工作流的输入和输出类型进行验证,并确保每个步骤仅在接收到接受的事件时运行。

您可以使用 Workflow 实现任何功能!构建一个代理、一个 RAG 流程、一个提取流程,或者任何您想要的东西。

工作流也会自动进行插桩(instrumentation),因此您可以使用 Arize Pheonix 等工具对每个步骤进行可观测性分析。(注意:可观测性适用于利用较新插桩系统的集成。使用方式可能有所不同。)

提示

工作流将异步作为一级公民对待,本页面假设您在异步环境中运行。这对您来说意味着需要正确设置您的异步代码。如果您已经在像 FastAPI 这样的服务器或在笔记本中运行,则可以自由地使用 await!

如果您正在运行自己的 Python 脚本,最佳实践是拥有一个单一的异步入口点。

async def main():
    w = MyWorkflow(...)
    result = await w.run(...)
    print(result)


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

入门#

作为一个说明性示例,让我们考虑一个简单的流程,其中生成一个笑话,然后进行评论。

from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)

# `pip install llama-index-llms-openai` if you don't already have it
from llama_index.llms.openai import OpenAI


class JokeEvent(Event):
    joke: str


class JokeFlow(Workflow):
    llm = OpenAI()

    @step
    async def generate_joke(self, ev: StartEvent) -> JokeEvent:
        topic = ev.topic

        prompt = f"Write your best joke about {topic}."
        response = await self.llm.acomplete(prompt)
        return JokeEvent(joke=str(response))

    @step
    async def critique_joke(self, ev: JokeEvent) -> StopEvent:
        joke = ev.joke

        prompt = f"Give a thorough analysis and critique of the following joke: {joke}"
        response = await self.llm.acomplete(prompt)
        return StopEvent(result=str(response))


w = JokeFlow(timeout=60, verbose=False)
result = await w.run(topic="pirates")
print(str(result))

这里有一些活动部分,所以让我们逐步介绍。

定义工作流事件#

class JokeEvent(Event):
    joke: str

事件是用户定义的 pydantic 对象。您可以控制属性和任何其他辅助方法。在此示例中,我们的工作流依赖于一个单一的用户定义事件,即 JokeEvent

设置工作流类#

class JokeFlow(Workflow):
    llm = OpenAI(model="gpt-4o-mini")
    ...

我们的工作流通过继承 Workflow 类实现。为简单起见,我们附加了一个静态的 OpenAI llm 实例。

工作流入口点#

class JokeFlow(Workflow):
    ...

    @step
    async def generate_joke(self, ev: StartEvent) -> JokeEvent:
        topic = ev.topic

        prompt = f"Write your best joke about {topic}."
        response = await self.llm.acomplete(prompt)
        return JokeEvent(joke=str(response))

    ...

这里是我们的工作流入口点。虽然大多数事件是用户定义的,但框架提供了两个特殊的事件,即 StartEventStopEvent。这里的 StartEvent 指示了初始工作流输入的发送位置。

StartEvent 有点特殊,因为它包含任意属性。在这里,我们使用 ev.topic 访问了主题,如果主题不存在则会引发错误。您也可以使用 ev.get("topic") 来处理属性可能不存在的情况,而不会引发错误。

此时,您可能已经注意到我们没有明确告诉工作流哪些事件由哪些步骤处理。相反,使用 @step 装饰器来推断每个步骤的输入和输出类型。此外,这些推断出的输入和输出类型也用于验证您的工作流在运行之前是否有效!

工作流出口点#

class JokeFlow(Workflow):
    ...

    @step
    async def critique_joke(self, ev: JokeEvent) -> StopEvent:
        joke = ev.joke

        prompt = f"Give a thorough analysis and critique of the following joke: {joke}"
        response = await self.llm.acomplete(prompt)
        return StopEvent(result=str(response))

    ...

这里是工作流的第二个也是最后一个步骤。我们知道它是最后一个步骤,因为返回了特殊的 StopEvent。当工作流遇到返回的 StopEvent 时,它会立即停止工作流并返回我们在 result 参数中传递的任何内容。

在此示例中,结果是一个字符串,但它可以是一个字典、列表或任何其他对象。

运行工作流#

w = JokeFlow(timeout=60, verbose=False)
result = await w.run(topic="pirates")
print(str(result))

最后,我们创建并运行工作流。有一些设置,如超时(以秒为单位)和详细程度(verbosity),有助于调试。

.run() 方法是异步的,所以我们在这里使用 await 来等待结果。传递给 run() 的关键字参数将成为特殊 StartEvent 的字段,该事件将自动发出并启动工作流。正如我们所见,在此示例中,可以使用 ev.topic 从步骤中访问 topic

定制入口点和出口点#

大多数情况下,依赖于我们在[入门]部分看到的默认入口点和出口点就足够了。但是,工作流支持自定义事件,这些自定义事件通常可以代替 StartEventStopEvent,让我们看看如何实现。

使用自定义 StartEvent#

当我们调用工作流实例的 run() 方法时,传递的关键字参数将成为底层自动创建的 StartEvent 实例的字段。如果我们想传递复杂数据来启动工作流,这种方法可能会变得繁琐,这时我们可以引入自定义的启动事件。

要使用自定义启动事件,第一步是创建一个继承自 StartEvent 的自定义类

from pathlib import Path

from llama_index.core.workflow import StartEvent
from llama_index.indices.managed.llama_cloud import LlamaCloudIndex
from llama_index.llms.openai import OpenAI


class MyCustomStartEvent(StartEvent):
    a_string_field: str
    a_path_to_somewhere: Path
    an_index: LlamaCloudIndex
    an_llm: OpenAI

现在我们所需要做的就是在使用作入口点的步骤中将 MyCustomStartEvent 作为事件类型。例如,考虑这个人为复杂的步骤

class JokeFlow(Workflow):
    ...

    @step
    async def generate_joke_from_index(
        self, ev: MyCustomStartEvent
    ) -> JokeEvent:
        # Build a query engine using the index and the llm from the start event
        query_engine = ev.an_index.as_query_engine(llm=ev.an_llm)
        topic = query_engine.query(
            f"What is the closest topic to {a_string_field}"
        )
        # Use the llm attached to the start event to instruct the model
        prompt = f"Write your best joke about {topic}."
        response = await ev.an_llm.acomplete(prompt)
        # Dump the response on disk using the Path object from the event
        ev.a_path_to_somewhere.write_text(str(response))
        # Finally, pass the JokeEvent along
        return JokeEvent(joke=str(response))

我们仍然可以将 MyCustomStartEvent 的字段作为关键字参数传递给工作流的 run 方法,但这同样会很繁琐。更好的方法是通过 start_event 关键字参数传递事件实例,像这样

custom_start_event = MyCustomStartEvent(...)
w = JokeFlow(timeout=60, verbose=False)
result = await w.run(start_event=custom_start_event)
print(str(result))

这种方法使代码更清晰、更具说明性,并允许 IDE 中的自动完成功能正常工作。

使用自定义 StopEvent#

StartEvent 类似,依赖内置的 StopEvent 大部分时间都有效,但并非总是如此。实际上,当我们使用 StopEvent 时,工作流的结果必须设置为事件实例的 result 字段。由于结果可以是任何 Python 对象,StopEventresult 字段类型被指定为 Any,从而丧失了类型系统的优势。此外,返回多个对象也很繁琐:我们通常将一堆不相关的对象塞进一个字典中,然后将其赋值给 StopEvent.result

支持自定义停止事件的第一步是创建一个 StopEvent 的子类

from llama_index.core.workflow import StopEvent


class MyStopEvent(StopEvent):
    critique: CompletionResponse

现在我们可以将工作流中的 StopEvent 替换为 MyStopEvent

class JokeFlow(Workflow):
    ...

    @step
    async def critique_joke(self, ev: JokeEvent) -> MyStopEvent:
        joke = ev.joke

        prompt = f"Give a thorough analysis and critique of the following joke: {joke}"
        response = await self.llm.acomplete(prompt)
        return MyStopEvent(response)

    ...

使用自定义停止事件时需要记住的重要一点是,工作流运行的结果将是该事件的实例

w = JokeFlow(timeout=60, verbose=False)
# Warning! `result` now contains an instance of MyStopEvent!
result = await w.run(topic="pirates")
# We can now access the event fields as any normal Event
print(result.critique.text)

这种方法利用了 Python 类型系统,对 IDE 中的自动完成友好,并允许外部应用程序进行自省,这些应用程序现在可以准确地知道工作流运行将返回什么。

绘制工作流#

利用步骤定义中的类型注解,工作流可以被可视化。您可以绘制工作流的所有可能路径,或者最近一次执行的路径,以帮助调试。

首先安装

pip install llama-index-utils-workflow

然后导入并使用

from llama_index.utils.workflow import (
    draw_all_possible_flows,
    draw_most_recent_execution,
)

# Draw all
draw_all_possible_flows(JokeFlow, filename="joke_flow_all.html")

# Draw an execution
w = JokeFlow()
await w.run(topic="Pirates")
draw_most_recent_execution(w, filename="joke_flow_recent.html")

使用全局上下文/状态#

或者,您可以选择在步骤之间使用全局上下文。例如,可能有多个步骤访问用户提供的原始 query 输入。您可以将其存储在全局上下文中,以便每个步骤都可以访问。

from llama_index.core.workflow import Context


@step
async def query(self, ctx: Context, ev: MyEvent) -> StopEvent:
    # retrieve from context
    query = await ctx.get("query")

    # do something with context and event
    val = ...
    result = ...

    # store in context
    await ctx.set("key", val)

    return StopEvent(result=result)

等待多个事件#

上下文不仅仅用于存储数据,它还提供了缓冲和等待多个事件的实用工具。

例如,您可能有一个步骤,它在合成响应之前等待一个查询和检索到的节点

from llama_index.core import get_response_synthesizer


@step
async def synthesize(
    self, ctx: Context, ev: QueryEvent | RetrieveEvent
) -> StopEvent | None:
    data = ctx.collect_events(ev, [QueryEvent, RetrieveEvent])
    # check if we can run
    if data is None:
        return None

    # unpack -- data is returned in order
    query_event, retrieve_event = data

    # run response synthesis
    synthesizer = get_response_synthesizer()
    response = synthesizer.synthesize(
        query_event.query, nodes=retrieve_event.nodes
    )

    return StopEvent(result=response)

使用 ctx.collect_events() 我们可以缓冲并等待所有预期事件到达。此函数只有在所有事件到达后才会返回数据(按请求的顺序)。

手动触发事件#

通常,事件是通过在步骤中返回另一个事件来触发的。但是,也可以在工作流中使用 ctx.send_event(event) 方法手动调度事件。

这里有一个简单的玩具示例,展示了如何使用它

from llama_index.core.workflow import step, Context, Event, Workflow


class MyEvent(Event):
    pass


class MyEventResult(Event):
    result: str


class GatherEvent(Event):
    pass


class MyWorkflow(Workflow):
    @step
    async def dispatch_step(
        self, ctx: Context, ev: StartEvent
    ) -> MyEvent | GatherEvent:
        ctx.send_event(MyEvent())
        ctx.send_event(MyEvent())

        return GatherEvent()

    @step
    async def handle_my_event(self, ev: MyEvent) -> MyEventResult:
        return MyEventResult(result="result")

    @step
    async def gather(
        self, ctx: Context, ev: GatherEvent | MyEventResult
    ) -> StopEvent | None:
        # wait for events to finish
        events = ctx.collect_events(ev, [MyEventResult, MyEventResult])
        if not events:
            return None

        return StopEvent(result=events)

事件流#

您还可以迭代接收到的事件。这对于流式处理、显示进度或调试非常有用。handler 对象将发出使用 ctx.write_event_to_stream() 显式写入流中的事件。

class ProgressEvent(Event):
    msg: str


class MyWorkflow(Workflow):
    @step
    async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
        return FirstEvent(first_output="First step complete.")

然后您可以像这样接收事件

w = MyWorkflow(...)

handler = w.run(topic="Pirates")

async for event in handler.stream_events():
    print(event)

result = await handler

失败时重试步骤执行#

执行失败的步骤可能导致整个工作流失败,但错误通常是预期内的,并且可以安全地重试执行。例如,由于网络短暂拥塞导致的 HTTP 请求超时,或者外部 API 调用达到了速率限制。

对于所有您希望步骤重试的情况,您可以使用“重试策略 (Retry Policy)”。重试策略是一个对象,它指示工作流执行一个步骤多次,并规定在新尝试之前需要等待多长时间。策略会考虑自第一次失败以来经过的时间、连续失败的次数以及最后发生的错误。

要为特定步骤设置策略,只需将策略对象传递给 @step 装饰器

from llama_index.core.workflow.retry_policy import ConstantDelayRetryPolicy


class MyWorkflow(Workflow):
    # ...more workflow definition...

    # This policy will retry this step on failure every 5 seconds for at most 10 times
    @step(retry_policy=ConstantDelayRetryPolicy(delay=5, maximum_attempts=10))
    async def flaky_step(self, ctx: Context, ev: StartEvent) -> StopEvent:
        result = flaky_call()  # this might raise
        return StopEvent(result=result)

有关框架中可用策略的详细说明,请参阅API 文档。如果您找不到适合您用例的策略,您可以轻松编写一个自定义策略。自定义策略的唯一要求是编写一个符合 RetryPolicy 协议的 Python 类。换句话说,您的自定义策略类必须有一个具有以下签名的函数

def next(
    self, elapsed_time: float, attempts: int, error: Exception
) -> Optional[float]:
    ...

例如,这是一个对周末感到兴奋的重试策略,它只在周五重试一个步骤

from datetime import datetime


class RetryOnFridayPolicy:
    def next(
        self, elapsed_time: float, attempts: int, error: Exception
    ) -> Optional[float]:
        if datetime.today().strftime("%A") == "Friday":
            # retry in 5 seconds
            return 5
        # tell the workflow we don't want to retry
        return None

人工介入 (Human-in-the-loop)#

由于工作流非常灵活,因此有多种可能的方式来实现人工介入(human-in-the-loop)模式。

实现人工介入最简单的方法是在事件流处理过程中使用 InputRequiredEventHumanResponseEvent 事件。

from llama_index.core.workflow import InputRequiredEvent, HumanResponseEvent


class HumanInTheLoopWorkflow(Workflow):
    @step
    async def step1(self, ev: StartEvent) -> InputRequiredEvent:
        return InputRequiredEvent(prefix="Enter a number: ")

    @step
    async def step2(self, ev: HumanResponseEvent) -> StopEvent:
        return StopEvent(result=ev.response)


# workflow should work with streaming
workflow = HumanInTheLoopWorkflow()

handler = workflow.run()
async for event in handler.stream_events():
    if isinstance(event, InputRequiredEvent):
        # here, we can handle human input however you want
        # this means using input(), websockets, accessing async state, etc.
        # here, we just use input()
        response = input(event.prefix)
        handler.ctx.send_event(HumanResponseEvent(response=response))

final_result = await handler

在此,工作流将等待直到 HumanResponseEvent 被发出。

另请注意,您可以跳出循环,稍后再恢复。如果您想暂停工作流以等待人工响应,但稍后继续工作流,这将非常有用。

handler = workflow.run()
async for event in handler.stream_events():
    if isinstance(event, InputRequiredEvent):
        break

# now we handle the human response
response = input(event.prefix)
handler.ctx.send_event(HumanResponseEvent(response=response))

# now we resume the workflow streaming
async for event in handler.stream_events():
    continue

final_result = await handler

分步执行#

工作流内置了分步执行的实用工具,允许您在执行过程中控制执行并调试状态。

# Create a workflow, same as usual
workflow = JokeFlow()
# Get the handler. Passing `stepwise=True` will block execution, waiting for manual intervention
handler = workflow.run(stepwise=True)
# Each time we call `run_step`, the workflow will advance and return all the events
# that were produced in the last step. This events need to be manually propagated
# for the workflow to keep going (we assign them to `produced_events` with the := operator).
while produced_events := await handler.run_step():
    # If we're here, it means there's at least an event we need to propagate,
    # let's do it with `send_event`
    for ev in produced_events:
        handler.ctx.send_event(ev)

# If we're here, it means the workflow execution completed, and
# we can now access the final result.
result = await handler

装饰非类函数#

您还可以在不继承的情况下装饰函数并将步骤附加到工作流中。

下面是前面提到的 JokeFlow,但没有使用继承来定义。

from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)
from llama_index.llms.openai import OpenAI


class JokeEvent(Event):
    joke: str


joke_flow = Workflow(timeout=60, verbose=True)


@step(workflow=joke_flow)
async def generate_joke(ev: StartEvent) -> JokeEvent:
    topic = ev.topic

    prompt = f"Write your best joke about {topic}."

    llm = OpenAI()
    response = await llm.acomplete(prompt)
    return JokeEvent(joke=str(response))


@step(workflow=joke_flow)
async def critique_joke(ev: JokeEvent) -> StopEvent:
    joke = ev.joke

    prompt = (
        f"Give a thorough analysis and critique of the following joke: {joke}"
    )
    response = await llm.acomplete(prompt)
    return StopEvent(result=str(response))

跨运行维护上下文#

正如您所见,工作流有一个 Context 对象,可用于在步骤之间维护状态。

如果您想在工作流的多次运行中维护状态,可以将先前的上下文传递给 .run() 方法。

handler = w.run()
result = await handler

# continue with next run
handler = w.run(ctx=handler.ctx)
result = await handler

工作流检查点#

工作流运行还可以通过 WorfklowCheckpointer 对象在每个步骤完成时创建和存储检查点。这些检查点可以用作未来运行的起始点,这在工作流开发(和调试)过程中是一个非常有用的功能。

from llama_index.core.workflow import WorkflowCheckpointer

w = JokeFlow(...)
w_cptr = WorkflowCheckpointer(workflow=w)

# to checkpoint a run, use the `run` method from w_cptr
handler = w_cptr.run(topic="Pirates")
await handler

# to view the stored checkpoints of this run
w_cptr.checkpoints[handler.run_id]

# to run from one of the checkpoints, use `run_from` method
ckpt = w_cptr.checkpoints[handler.run_id][0]
handler = w_cptr.run_from(topic="Ships", checkpoint=ckpt)
await handler

部署工作流#

您可以使用 llama_deploy仓库)将工作流部署为多代理服务。每个代理服务都通过控制平面进行编排,并通过消息队列进行通信。可以在本地或 Kubernetes 上部署。

示例#

为了帮助您更熟悉工作流概念及其功能,LlamaIndex 文档提供了示例笔记本,您可以运行这些笔记本进行动手学习

  • 常见工作流模式通过简单的工​​作流向您介绍了循环和状态管理等常见使用模式。这通常是一个很好的起点。
  • RAG + 重排序展示了如何使用一个相当简单的工作流实现真实世界的用例,该工作流执行摄取和查询。
  • 引用查询引擎类似于 RAG + 重排序,该笔记本重点介绍了如何在检索和生成之间实现中间步骤。这是一个很好地说明如何在工作流中使用 Context 对象的示例。
  • 纠正性 RAG 在 RAG 工作流的基础上增加了一些复杂性,展示了如何在评估步骤后查询网络搜索引擎。
  • 利用并发解释了如何在工作流中管理步骤的并行执行,这对于随着工作流复杂性增加而掌握的知识非常重要。

RAG 应用易于理解,是学习工作流基础知识的绝佳机会。然而,更复杂的代理场景,涉及工具调用、记忆和路由,才是工作流真正擅长的领域。

下面的示例重点介绍了一些这些用例。

最后但同样重要的是,一些更高级的用例,展示了如果您需要快速实现原型(例如来自文献),工作流将极其方便