Workflow for a ReAct Agent¶
This notebook walks through setting up a Workflow to construct a ReAct agent from (mostly) scratch.
ReAct calling agents work by prompting an LLM to either invoke tools/functions or return a final response.
Our workflow will be stateful with memory, and will be able to call the LLM to select tools and process incoming user messages.
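To make the loop concrete, a single ReAct turn typically looks like the exchange below. This is an illustrative, hypothetical example of the prompt format (not output from this notebook): the LLM emits Thought/Action/Action Input lines, receives an Observation, and repeats until it can produce a final Answer.

Thought: The user wants the sum of two numbers, so I should call a tool.
Action: add
Action Input: {"x": 2123, "y": 2321}
Observation: 4444
Thought: I can answer without using any more tools.
Answer: 4444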
!pip install -U llama-index
import os
os.environ["OPENAI_API_KEY"] = "sk-proj-..."
[Optional] Set up observability with Llamatrace¶
Set up tracing to visualize each step in the workflow.
!pip install "llama-index-core>=0.10.43" "openinference-instrumentation-llama-index>=2" "opentelemetry-proto>=1.12.0" opentelemetry-exporter-otlp opentelemetry-sdk
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter as HTTPSpanExporter,
)
from openinference.instrumentation.llama_index import LlamaIndexInstrumentor
# Add Phoenix API Key for tracing
PHOENIX_API_KEY = "<YOUR-PHOENIX-API-KEY>"
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"api_key={PHOENIX_API_KEY}"
# Add Phoenix
span_phoenix_processor = SimpleSpanProcessor(
HTTPSpanExporter(endpoint="https://app.phoenix.arize.com/v1/traces")
)
# Add them to the tracer
tracer_provider = trace_sdk.TracerProvider()
tracer_provider.add_span_processor(span_processor=span_phoenix_processor)
# Instrument the application
LlamaIndexInstrumentor().instrument(tracer_provider=tracer_provider)
Since workflows are async-first, this all runs fine in a notebook. If you are running in your own code, you will want to use asyncio.run() to start an async event loop if one isn't already running.
async def main():
<async code>
if __name__ == "__main__":
import asyncio
asyncio.run(main())
Designing the Workflow¶
An agent consists of several steps (the overall event flow is sketched after this list):
- Handling the latest incoming user message, including adding it to memory and preparing the chat history.
- Using the chat history and tools to construct a ReAct prompt.
- Prompting the LLM with the ReAct prompt, and parsing out function/tool calls.
- If there are no tool calls, we can return.
- If there are tool calls, we need to execute them, and then loop back for a fresh ReAct prompt that includes the latest tool calls.
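Putting the steps together, the control flow we will implement looks roughly like this (a sketch using the step and event names defined below):

StartEvent -> new_user_msg -> PrepEvent
PrepEvent -> prepare_chat_history -> InputEvent
InputEvent -> handle_llm_input -> StopEvent (final answer), ToolCallEvent (tool calls), or PrepEvent (parse error, try again)
ToolCallEvent -> handle_tool_calls -> PrepEvent (loop back with the tool observations)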
Workflow Events¶
To handle these steps, we need to define a few events:
- An event to handle new messages and prepare the chat history.
- An event to stream the LLM response.
- An event to prompt the LLM with the ReAct prompt.
- An event to trigger tool calls, if any.
- An event to handle tool call results, if any.
The other steps will use the built-in StartEvent and StopEvent events.
In addition to the events, we will also use the global context to store the current ReAct reasoning!
from llama_index.core.llms import ChatMessage
from llama_index.core.tools import ToolSelection, ToolOutput
from llama_index.core.workflow import Event
class PrepEvent(Event):
pass
class InputEvent(Event):
input: list[ChatMessage]
class StreamEvent(Event):
delta: str
class ToolCallEvent(Event):
tool_calls: list[ToolSelection]
class FunctionOutputEvent(Event):
output: ToolOutput
from typing import Any, List
from llama_index.core.agent.react import ReActChatFormatter, ReActOutputParser
from llama_index.core.agent.react.types import (
ActionReasoningStep,
ObservationReasoningStep,
)
from llama_index.core.llms.llm import LLM
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.tools.types import BaseTool
from llama_index.core.workflow import (
Context,
Workflow,
StartEvent,
StopEvent,
step,
)
from llama_index.llms.openai import OpenAI
class ReActAgent(Workflow):
def __init__(
self,
*args: Any,
llm: LLM | None = None,
tools: list[BaseTool] | None = None,
extra_context: str | None = None,
**kwargs: Any,
) -> None:
super().__init__(*args, **kwargs)
self.tools = tools or []
self.llm = llm or OpenAI()
self.formatter = ReActChatFormatter.from_defaults(
context=extra_context or ""
)
self.output_parser = ReActOutputParser()
@step
async def new_user_msg(self, ctx: Context, ev: StartEvent) -> PrepEvent:
# clear sources
await ctx.set("sources", [])
# init memory if needed
memory = await ctx.get("memory", default=None)
if not memory:
memory = ChatMemoryBuffer.from_defaults(llm=self.llm)
# get user input
user_input = ev.input
user_msg = ChatMessage(role="user", content=user_input)
memory.put(user_msg)
# clear current reasoning
await ctx.set("current_reasoning", [])
# set memory
await ctx.set("memory", memory)
return PrepEvent()
@step
async def prepare_chat_history(
self, ctx: Context, ev: PrepEvent
) -> InputEvent:
# get chat history
memory = await ctx.get("memory")
chat_history = memory.get()
current_reasoning = await ctx.get("current_reasoning", default=[])
# format the prompt with react instructions
llm_input = self.formatter.format(
self.tools, chat_history, current_reasoning=current_reasoning
)
return InputEvent(input=llm_input)
@step
async def handle_llm_input(
self, ctx: Context, ev: InputEvent
) -> ToolCallEvent | StopEvent:
chat_history = ev.input
current_reasoning = await ctx.get("current_reasoning", default=[])
memory = await ctx.get("memory")
response_gen = await self.llm.astream_chat(chat_history)
async for response in response_gen:
ctx.write_event_to_stream(StreamEvent(delta=response.delta or ""))
try:
reasoning_step = self.output_parser.parse(response.message.content)
current_reasoning.append(reasoning_step)
if reasoning_step.is_done:
memory.put(
ChatMessage(
role="assistant", content=reasoning_step.response
)
)
await ctx.set("memory", memory)
await ctx.set("current_reasoning", current_reasoning)
sources = await ctx.get("sources", default=[])
return StopEvent(
result={
"response": reasoning_step.response,
"sources": [sources],
"reasoning": current_reasoning,
}
)
elif isinstance(reasoning_step, ActionReasoningStep):
tool_name = reasoning_step.action
tool_args = reasoning_step.action_input
return ToolCallEvent(
tool_calls=[
ToolSelection(
tool_id="fake",
tool_name=tool_name,
tool_kwargs=tool_args,
)
]
)
except Exception as e:
current_reasoning.append(
ObservationReasoningStep(
observation=f"There was an error in parsing my reasoning: {e}"
)
)
await ctx.set("current_reasoning", current_reasoning)
# if no tool calls or final response, iterate again
return PrepEvent()
@step
async def handle_tool_calls(
self, ctx: Context, ev: ToolCallEvent
) -> PrepEvent:
tool_calls = ev.tool_calls
tools_by_name = {tool.metadata.get_name(): tool for tool in self.tools}
current_reasoning = await ctx.get("current_reasoning", default=[])
sources = await ctx.get("sources", default=[])
# call tools -- safely!
for tool_call in tool_calls:
tool = tools_by_name.get(tool_call.tool_name)
if not tool:
current_reasoning.append(
ObservationReasoningStep(
observation=f"Tool {tool_call.tool_name} does not exist"
)
)
continue
try:
tool_output = tool(**tool_call.tool_kwargs)
sources.append(tool_output)
current_reasoning.append(
ObservationReasoningStep(observation=tool_output.content)
)
except Exception as e:
current_reasoning.append(
ObservationReasoningStep(
observation=f"Error calling tool {tool.metadata.get_name()}: {e}"
)
)
# save new state in context
await ctx.set("sources", sources)
await ctx.set("current_reasoning", current_reasoning)
# prep the next iteration
return PrepEvent()
And that's it! Let's take a closer look at the workflow we just wrote.
- new_user_msg(): Adds the user message to memory, and clears the global context so we can track a fresh chain of reasoning.
- prepare_chat_history(): Prepares the ReAct prompt, using the chat history, tools, and current reasoning (if any).
- handle_llm_input(): Prompts the LLM with our ReAct prompt, and uses some utility functions to parse the output (a small standalone sketch of the parser follows this list). If the parsed step is a final response, we can stop and emit a StopEvent. Otherwise, we emit a ToolCallEvent to handle tool calls. Lastly, if there are no tool calls and no final response, we simply loop again.
- handle_tool_calls(): Safely calls tools with error handling, adding the tool outputs to the current reasoning. Then, by emitting a PrepEvent, we loop around for another round of ReAct prompting and parsing.
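Here is that parser sketch. The inputs are made-up ReAct-formatted completions, and the exact parsing behavior may vary slightly between llama-index versions, but it shows how raw LLM text becomes the reasoning-step objects the workflow appends to current_reasoning.

from llama_index.core.agent.react import ReActOutputParser

parser = ReActOutputParser()

# An "action" style completion becomes an ActionReasoningStep carrying the tool name and kwargs
action_step = parser.parse(
    "Thought: I need to add the numbers.\n"
    "Action: add\n"
    'Action Input: {"x": 2123, "y": 2321}'
)
print(type(action_step).__name__, action_step.action, action_step.action_input)

# A final answer becomes a reasoning step with is_done=True, which handle_llm_input turns into a StopEvent
final_step = parser.parse(
    "Thought: I can answer without any more tools.\nAnswer: 1386528"
)
print(final_step.is_done, final_step.response)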
Run the Workflow!¶
NOTE: With loops, we need to be mindful of runtime. Here, we set a timeout of 120 seconds.
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI
def add(x: int, y: int) -> int:
"""Useful function to add two numbers."""
return x + y
def multiply(x: int, y: int) -> int:
"""Useful function to multiply two numbers."""
return x * y
tools = [
FunctionTool.from_defaults(add),
FunctionTool.from_defaults(multiply),
]
agent = ReActAgent(
llm=OpenAI(model="gpt-4o"), tools=tools, timeout=120, verbose=True
)
ret = await agent.run(input="Hello!")
Running step new_user_msg
Step new_user_msg produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent
print(ret["response"])
Hello! How can I assist you today?
ret = await agent.run(input="What is (2123 + 2321) * 312?")
Running step new_user_msg
Step new_user_msg produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event ToolCallEvent
Running step handle_tool_calls
Step handle_tool_calls produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event ToolCallEvent
Running step handle_tool_calls
Step handle_tool_calls produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent
print(ret["response"])
The result of (2123 + 2321) * 312 is 1,386,528.
Chat History¶
By default, the workflow creates a fresh Context for each run. This means the chat history is not preserved between runs. However, we can pass our own Context to the workflow to preserve the chat history.
from llama_index.core.workflow import Context
ctx = Context(agent)
ret = await agent.run(input="Hello! My name is Logan", ctx=ctx)
print(ret["response"])
ret = await agent.run(input="What is my name?", ctx=ctx)
print(ret["response"])
Running step new_user_msg
Step new_user_msg produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent
Hello, Logan! How can I assist you today?
Running step new_user_msg
Step new_user_msg produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent
Your name is Logan.
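If you need the conversation to survive beyond the current process, the Context itself can be serialized and restored. A minimal sketch, assuming the to_dict/from_dict helpers and JsonSerializer exported by llama_index.core.workflow in recent llama-index-core versions:

from llama_index.core.workflow import Context, JsonSerializer

# serialize the context (e.g. to a database or file) ...
ctx_dict = ctx.to_dict(serializer=JsonSerializer())

# ... and later restore it against the same workflow to continue the conversation
restored_ctx = Context.from_dict(agent, ctx_dict, serializer=JsonSerializer())
ret = await agent.run(input="What is my name?", ctx=restored_ctx)
print(ret["response"])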
Streaming¶
We can also access the streaming response from the LLM, using the handler object returned from the .run() method.
agent = ReActAgent(
llm=OpenAI(model="gpt-4o"), tools=tools, timeout=120, verbose=False
)
handler = agent.run(input="Hello! Tell me a joke.")
async for event in handler.stream_events():
if isinstance(event, StreamEvent):
print(event.delta, end="", flush=True)
response = await handler
# print(response)
Thought: The current language of the user is: English. I cannot use a tool to help me answer the question.
Answer: Why don't scientists trust atoms? Because they make up everything!
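Beyond the streamed deltas, the awaited result is the same dict built in handle_llm_input above, so the reasoning chain and any tool outputs are available as well. A small sketch, assuming the reasoning step objects expose get_content() as defined in llama_index.core.agent.react.types:

# walk the reasoning chain recorded during the run
for reasoning_step in response["reasoning"]:
    print(reasoning_step.get_content())

# inspect any ToolOutput objects collected while handling tool calls (empty for this joke request)
for tool_output in response["sources"]:
    print(tool_output.content)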