FunctionAgent / AgentWorkflow Basic Introduction
The AgentWorkflow is an orchestrator for running a system of one or more agents. In this example, we will create a simple workflow with a single FunctionAgent and use that to cover the basic functionality.
%pip install llama-index
from llama_index.llms.openai import OpenAI
llm = OpenAI(model="gpt-4o-mini", api_key="sk-...")
To make our agent more useful, we can give it tools/actions to use. In this example, we'll use Tavily to implement a tool that can search the web for information. You can get a free API key from Tavily.
%pip install tavily-python
When creating a tool, it is very important to:
- Give the tool a proper name and docstring/description. The LLM uses these to understand what the tool does.
- Annotate the types. This helps the LLM understand the expected input and output types.
- Use async when possible, since this will make the workflow more efficient.
from tavily import AsyncTavilyClient

async def search_web(query: str) -> str:
    """Useful for using the web to answer questions."""
    client = AsyncTavilyClient(api_key="tvly-...")
    return str(await client.search(query))
With the tool and an LLM defined, we can create an AgentWorkflow that uses the tool.
from llama_index.core.agent.workflow import FunctionAgent

agent = FunctionAgent(
    tools=[search_web],
    llm=llm,
    system_prompt="You are a helpful assistant that can search the web for information.",
)
Running the Agent
Now that our agent is created, we can run it!
response = await agent.run(user_msg="What is the weather in San Francisco?")
print(str(response))
The current weather in San Francisco is as follows: - **Temperature**: 13.3°C (55.9°F) - **Condition**: Sunny - **Wind**: 4.0 mph (6.5 kph) from the NNE - **Humidity**: 57% - **Pressure**: 1021 mb (30.16 in) - **Visibility**: 16 km (9 miles) For more details, you can check the weather [here](https://www.weatherapi.com/).
The above is equivalent to using an AgentWorkflow with a single FunctionAgent:
from llama_index.core.agent.workflow import AgentWorkflow
workflow = AgentWorkflow(agents=[agent])
response = await workflow.run(user_msg="What is the weather in San Francisco?")
If you are creating a workflow with multiple agents, you can pass a list of agents to the AgentWorkflow constructor. Learn more in our multi-agent workflow example.
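For illustration, here is a minimal sketch of a two-agent setup. The agent names, descriptions, and the root_agent / can_handoff_to wiring are assumptions based on the multi-agent API; see the linked example for the full picture.

research_agent = FunctionAgent(
    name="ResearchAgent",
    description="Searches the web for information on a topic.",
    tools=[search_web],
    llm=llm,
    system_prompt="You are a research assistant.",
    can_handoff_to=["SummaryAgent"],  # assumed handoff target by agent name
)

summary_agent = FunctionAgent(
    name="SummaryAgent",
    description="Summarizes research findings.",
    llm=llm,
    system_prompt="You summarize what the research agent found.",
)

workflow = AgentWorkflow(
    agents=[research_agent, summary_agent],
    root_agent="ResearchAgent",  # the agent that first receives the user message
)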
Maintaining State
By default, the FunctionAgent is stateless between runs. This means the agent does not retain any memory of previous runs.
To maintain state, we need to keep track of the previous state ourselves. Since a FunctionAgent runs inside a Workflow, the state is stored in the Context. This context can be passed between runs to maintain state and history.
from llama_index.core.workflow import Context

ctx = Context(agent)

response = await agent.run(
    user_msg="My name is Logan, nice to meet you!", ctx=ctx
)
print(str(response))
Nice to meet you, Logan! How can I assist you today?
response = await agent.run(user_msg="What is my name?", ctx=ctx)
print(str(response))
Your name is Logan.
The Context is serializable, so it can be saved to a database, file, etc. and loaded back in later.
The JsonSerializer is a simple serializer that uses json.dumps and json.loads to serialize and deserialize the context.
The JsonPickleSerializer is a serializer that uses pickle to serialize and deserialize the context. If you have objects in your context that are not serializable, you can use this serializer.
from llama_index.core.workflow import JsonPickleSerializer, JsonSerializer
ctx_dict = ctx.to_dict(serializer=JsonSerializer())
restored_ctx = Context.from_dict(agent, ctx_dict, serializer=JsonSerializer())
response = await agent.run(
    user_msg="Do you still remember my name?", ctx=restored_ctx
)
print(str(response))
Yes, I remember your name is Logan.
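If your context holds objects that plain JSON cannot represent, the same round trip works with JsonPickleSerializer; a sketch of the swap (the calls mirror the JsonSerializer usage above):

from llama_index.core.workflow import JsonPickleSerializer

# pickle-based (de)serialization for contexts with non-JSON-serializable objects
ctx_dict = ctx.to_dict(serializer=JsonPickleSerializer())
restored_ctx = Context.from_dict(agent, ctx_dict, serializer=JsonPickleSerializer())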
Streaming
The AgentWorkflow/FunctionAgent also supports streaming. Since the AgentWorkflow is a Workflow, it can be streamed like any other Workflow. This works by using the handler that is returned from the workflow. There are a few key events that are streamed; feel free to explore them below.
If you only want to stream the LLM output, you can use the AgentStream events.
from llama_index.core.agent.workflow import (
    AgentInput,
    AgentOutput,
    ToolCall,
    ToolCallResult,
    AgentStream,
)
handler = agent.run(user_msg="What is the weather in Saskatoon?")

async for event in handler.stream_events():
    if isinstance(event, AgentStream):
        print(event.delta, end="", flush=True)
        # print(event.response)  # the current full response
        # print(event.raw)  # the raw llm api response
        # print(event.current_agent_name)  # the current agent name
    # elif isinstance(event, AgentInput):
    #     print(event.input)  # the current input messages
    #     print(event.current_agent_name)  # the current agent name
    # elif isinstance(event, AgentOutput):
    #     print(event.response)  # the current full response
    #     print(event.tool_calls)  # the selected tool calls, if any
    #     print(event.raw)  # the raw llm api response
    # elif isinstance(event, ToolCallResult):
    #     print(event.tool_name)  # the tool name
    #     print(event.tool_kwargs)  # the tool kwargs
    #     print(event.tool_output)  # the tool output
    # elif isinstance(event, ToolCall):
    #     print(event.tool_name)  # the tool name
    #     print(event.tool_kwargs)  # the tool kwargs
The current weather in Saskatoon is as follows: - **Temperature**: 0.1°C (32.2°F) - **Condition**: Partly cloudy - **Wind**: 13.2 mph (21.2 kph) from the SSE - **Humidity**: 80% - **Feels Like**: -5.3°C (22.5°F) - **Visibility**: 24 km (14 miles) For more details, you can check the full weather report [here](https://www.weatherapi.com/).
Tools and State

Tools can also be defined that have access to the workflow context. This means you can set and retrieve variables from the context and use them in the tool or between tools.

from llama_index.core.workflow import Context

async def set_name(ctx: Context, name: str) -> str:
    state = await ctx.get("state")
    state["name"] = name
    await ctx.set("state", state)
    return f"Name set to {name}"
agent = AgentWorkflow(
    agents=[
        FunctionAgent(
            tools=[set_name],
            llm=llm,
            system_prompt="You are a helpful assistant that can set a name.",
        )
    ],
    initial_state={"name": "unset"},
)
ctx = Context(agent)
response = await agent.run(user_msg="My name is Logan", ctx=ctx)
print(str(response))
state = await ctx.get("state")
print(state["name"])
Your name has been set to Logan.
Logan
Human in the Loop
Tools can also be defined that involve a human in the loop. This is useful for tasks that require human input, such as confirming a tool call or providing feedback.
Using workflow events, we can emit events that require a response from the user. Here, we use the prebuilt InputRequiredEvent and HumanResponseEvent to handle the human in the loop, but you can also define your own events.
wait_for_event will emit the waiter_event and wait until it sees a HumanResponseEvent with the specified requirements. The waiter_id is used to ensure that only one waiter_event is sent for each waiter_id.
from llama_index.core.workflow import (
    Context,
    InputRequiredEvent,
    HumanResponseEvent,
)

async def dangerous_task(ctx: Context) -> str:
    """A dangerous task that requires human confirmation."""
    question = "Are you sure you want to proceed?"
    response = await ctx.wait_for_event(
        HumanResponseEvent,
        waiter_id=question,
        waiter_event=InputRequiredEvent(
            prefix=question,
            user_name="Logan",
        ),
        requirements={"user_name": "Logan"},
    )
    if response.response == "yes":
        return "Dangerous task completed successfully."
    else:
        return "Dangerous task aborted."
agent = FunctionAgent(
    tools=[dangerous_task],
    llm=llm,
    system_prompt="You are a helpful assistant that can perform dangerous tasks.",
)
handler = agent.run(user_msg="I want to proceed with the dangerous task.")

async for event in handler.stream_events():
    if isinstance(event, InputRequiredEvent):
        response = input(event.prefix).strip().lower()
        handler.ctx.send_event(
            HumanResponseEvent(
                response=response,
                user_name=event.user_name,
            )
        )
response = await handler
print(str(response))
The dangerous task has been completed successfully. If you need anything else, feel free to ask!
In production scenarios, you might handle the human in the loop over a websocket or through multiple API requests; a sketch of the websocket approach follows below.
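As a rough sketch of the websocket approach, assuming FastAPI and a single-client session (the /agent endpoint and the message protocol are illustrative, not part of llama-index):

from fastapi import FastAPI, WebSocket

from llama_index.core.workflow import HumanResponseEvent, InputRequiredEvent

app = FastAPI()

@app.websocket("/agent")
async def agent_endpoint(websocket: WebSocket):
    await websocket.accept()
    user_msg = await websocket.receive_text()
    handler = agent.run(user_msg=user_msg)  # the agent defined above
    async for event in handler.stream_events():
        if isinstance(event, InputRequiredEvent):
            # forward the question to the client and wait for their answer
            await websocket.send_text(event.prefix)
            answer = await websocket.receive_text()
            handler.ctx.send_event(
                HumanResponseEvent(response=answer, user_name=event.user_name)
            )
    # send the final response once the workflow finishes
    await websocket.send_text(str(await handler))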
As mentioned before, the Context object is serializable, which means we can also save the workflow mid-run and restore it later.
NOTE: Any in-progress functions/steps will start from the beginning when the workflow is restored.
from llama_index.core.workflow import JsonSerializer

handler = agent.run(user_msg="I want to proceed with the dangerous task.")

input_ev = None
async for event in handler.stream_events():
    if isinstance(event, InputRequiredEvent):
        input_ev = event
        break

# save the context somewhere for later
ctx_dict = handler.ctx.to_dict(serializer=JsonSerializer())

# get the response from the user
response_str = input(input_ev.prefix).strip().lower()

# restore the workflow
restored_ctx = Context.from_dict(agent, ctx_dict, serializer=JsonSerializer())

handler = agent.run(ctx=restored_ctx)
handler.ctx.send_event(
    HumanResponseEvent(
        response=response_str,
        user_name=input_ev.user_name,
    )
)
response = await handler
print(str(response))
The dangerous task has been initiated. Please confirm if you would like to proceed with it.