Workflows Cookbook: Walking Through All the Features of Workflows¶
First, we install our dependencies. The Core package contains most of what we need; OpenAI handles LLM access, and utils-workflow provides the visualization capabilities we'll use later on.

```
!pip install --upgrade llama-index-core llama-index-llms-openai llama-index-utils-workflow
```
Then we import the dependencies we just installed:

```python
from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Context,
)
import random
from llama_index.utils.workflow import (
    draw_all_possible_flows,
    draw_most_recent_execution,
)
from llama_index.llms.openai import OpenAI
```
Set up our OpenAI key, so we can perform actual LLM operations.

```python
import os

os.environ["OPENAI_API_KEY"] = "sk-proj-..."
```
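Hard-coding the key into the notebook risks committing a secret. As an alternative sketch (the helper name `ensure_api_key` is our own, not part of LlamaIndex), you can prompt for the key only when the environment doesn't already provide it:

```python
import os
from getpass import getpass


def ensure_api_key(env_var: str = "OPENAI_API_KEY") -> str:
    """Return the API key, prompting interactively only if it isn't set."""
    if env_var not in os.environ:
        # getpass hides the typed key from the notebook output
        os.environ[env_var] = getpass(f"{env_var}: ")
    return os.environ[env_var]
```

Calling `ensure_api_key()` once at the top of the notebook then has the same effect as the cell above, without a secret stored in the file.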
Workflow basics¶
Let's start with the simplest possible workflow: it just starts, does one thing, and stops. There's no reason to use a workflow for a task this simple, but this demonstrates how they work.

```python
from llama_index.llms.openai import OpenAI
```
```python
class OpenAIGenerator(Workflow):
    @step
    async def generate(self, ev: StartEvent) -> StopEvent:
        llm = OpenAI(model="gpt-4o")
        response = await llm.acomplete(ev.query)
        return StopEvent(result=str(response))


w = OpenAIGenerator(timeout=10, verbose=False)
result = await w.run(query="What's LlamaIndex?")
print(result)
```
```
LlamaIndex, formerly known as GPT Index, is a data framework designed to facilitate the connection between large language models (LLMs) and external data sources. It provides tools to index various data types, such as documents, databases, and APIs, enabling LLMs to interact with and retrieve information from these sources more effectively. The framework supports the creation of indices that can be queried by LLMs, enhancing their ability to access and utilize external data in a structured manner. This capability is particularly useful for applications requiring the integration of LLMs with specific datasets or knowledge bases.
```
One of the neat things about workflows is that we can visualize them using pyvis. Let's see what this very simple flow looks like.

```python
draw_all_possible_flows(OpenAIGenerator, filename="trivial_workflow.html")
```
Loops and branches¶
Now let's move on to a more complex workflow, one that can branch and loop back on itself.

```python
class FailedEvent(Event):
    error: str


class QueryEvent(Event):
    query: str


class LoopExampleFlow(Workflow):
    @step
    async def answer_query(
        self, ev: StartEvent | QueryEvent
    ) -> FailedEvent | StopEvent:
        query = ev.query
        # try to answer the query
        random_number = random.randint(0, 1)
        if random_number == 0:
            return FailedEvent(error="Failed to answer the query.")
        else:
            return StopEvent(result="The answer to your query")

    @step
    async def improve_query(self, ev: FailedEvent) -> QueryEvent | StopEvent:
        # improve the query or decide it can't be fixed
        random_number = random.randint(0, 1)
        if random_number == 0:
            return QueryEvent(query="Here's a better query.")
        else:
            return StopEvent(result="Your query can't be fixed.")
```
We're using random numbers here to simulate LLM behavior, so that we get reliably interesting results.

answer_query() accepts a start event. It can then do 2 things:
- it can answer the query and emit a StopEvent, which returns the result
- it can decide the query was bad and emit a FailedEvent

improve_query() accepts a FailedEvent. It can also do 2 things:
- it can decide the query can't be improved and emit a StopEvent, which returns failure
- it can present a better query and emit a QueryEvent, which creates a loop back to answer_query()
We can visualize this more complex workflow too:

```python
draw_all_possible_flows(LoopExampleFlow, filename="loop_workflow.html")
```

```
loop_workflow.html
```
We've set verbose to True here so that we can see exactly which events are being triggered. You can see it usefully demonstrating looping and then answering.

```python
l = LoopExampleFlow(timeout=10, verbose=True)
result = await l.run(query="What's LlamaIndex?")
print(result)
```

```
Running step answer_query
Step answer_query produced event FailedEvent
Running step improve_query
Step improve_query produced event StopEvent
Your query can't be fixed.
```
Maintaining state between events¶
There is a global state which allows you to keep arbitrary data or functions around for use by all event handlers.
```python
class GlobalExampleFlow(Workflow):
    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> QueryEvent:
        # load our data here
        await ctx.set("some_database", ["value1", "value2", "value3"])
        return QueryEvent(query=ev.query)

    @step
    async def query(self, ctx: Context, ev: QueryEvent) -> StopEvent:
        # use our data with our query
        data = await ctx.get("some_database")
        result = f"The answer to your query is {data[1]}"
        return StopEvent(result=result)


g = GlobalExampleFlow(timeout=10, verbose=True)
result = await g.run(query="What's LlamaIndex?")
print(result)
```

```
Running step setup
Step setup produced event QueryEvent
Running step query
Step query produced event StopEvent
The answer to your query is value2
```
Of course, this flow is still essentially linear. A more realistic example would be if your start event could be either a query or a data population event, and you needed to wait. Let's set that up and see what it looks like:
```python
class WaitExampleFlow(Workflow):
    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> StopEvent:
        if hasattr(ev, "data"):
            await ctx.set("data", ev.data)
        return StopEvent(result=None)

    @step
    async def query(self, ctx: Context, ev: StartEvent) -> StopEvent:
        if hasattr(ev, "query"):
            # do we have any data?
            if await ctx.get("data", default=None) is not None:
                data = await ctx.get("data")
                return StopEvent(result=f"Got the data {data}")
            else:
                # there's no data yet
                return None
        else:
            # this isn't a query
            return None
```
```python
w = WaitExampleFlow(verbose=True)
result = await w.run(query="Can I kick it?")
if result is None:
    print("No you can't")
print("---")
result = await w.run(data="Yes you can")
print("---")
result = await w.run(query="Can I kick it?")
print(result)
```

```
Running step query
Step query produced no event
Running step setup
Step setup produced event StopEvent
No you can't
---
Running step query
Step query produced no event
Running step setup
Step setup produced event StopEvent
---
Running step query
Step query produced event StopEvent
Running step setup
Step setup produced event StopEvent
Got the data Yes you can
```
Let's visualize how this flow works:

```python
draw_all_possible_flows(WaitExampleFlow, filename="wait_workflow.html")
```

```
wait_workflow.html
```
Waiting for one or more events¶
Because waiting for events is such a common pattern, the context object has a convenience function, collect_events(). It will capture events and store them, returning None until all the events it requires have been collected. Those events will be attached to the output of collect_events in the order that they were specified. Let's see this in action:
```python
class InputEvent(Event):
    input: str


class SetupEvent(Event):
    error: bool


class QueryEvent(Event):
    query: str


class CollectExampleFlow(Workflow):
    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> SetupEvent:
        # generically start everything up, exactly once per context
        if not await ctx.get("setup", default=False):
            await ctx.set("setup", True)
            print("I got set up")
        return SetupEvent(error=False)

    @step
    async def collect_input(self, ev: StartEvent) -> InputEvent:
        if hasattr(ev, "input"):
            # perhaps validate the input
            print("I got some input")
            return InputEvent(input=ev.input)

    @step
    async def parse_query(self, ev: StartEvent) -> QueryEvent:
        if hasattr(ev, "query"):
            # parse the query in some way
            print("I got a query")
            return QueryEvent(query=ev.query)

    @step
    async def run_query(
        self, ctx: Context, ev: InputEvent | SetupEvent | QueryEvent
    ) -> StopEvent | None:
        ready = ctx.collect_events(ev, [QueryEvent, InputEvent, SetupEvent])
        if ready is None:
            print("Not enough events yet")
            return None

        # run the query
        print("Now I have all the events")
        print(ready)

        result = f"Ran query '{ready[0].query}' on input '{ready[1].input}'"
        return StopEvent(result=result)


c = CollectExampleFlow()
result = await c.run(input="Here's some input", query="Here's my question")
print(result)
```

```
I got set up
I got some input
I got a query
Not enough events yet
Not enough events yet
Now I have all the events
[QueryEvent(query="Here's my question"), InputEvent(input="Here's some input"), SetupEvent(error=False)]
Ran query 'Here's my question' on input 'Here's some input'
```

You can see each event getting triggered, and collect_events returning None repeatedly until enough events have arrived. Let's see what this looks like in a flow diagram:
```python
draw_all_possible_flows(CollectExampleFlow, "collect_workflow.html")
```

```
collect_workflow.html
```