从头开始创建 CodeAct Agent
虽然 LlamaIndex 提供了一个预构建的 CodeActAgent,我们也可以从头开始创建自己的。
这样,我们可以完全理解和自定义 Agent 的行为,超越预构建 Agent 所提供的功能。
在此 Notebook 中,我们将:
- 创建一个用于生成和解析代码的工作流
- 实现基本的代码执行
- 为 Agent 添加记忆(memory)和状态
In [ ]
已复制!
# Define a few helper functions
# Helper exposed to the agent. The one-line docstring is kept verbatim
# because CodeActAgent embeds each function's docstring in its system prompt.
def add(a: int, b: int) -> int:
    """Add two numbers together"""
    return a + b
# Helper exposed to the agent. The one-line docstring is kept verbatim
# because CodeActAgent embeds each function's docstring in its system prompt.
def subtract(a: int, b: int) -> int:
    """Subtract two numbers"""
    # Order matters: the second argument is taken away from the first.
    return a - b
# Helper exposed to the agent. The one-line docstring is kept verbatim
# because CodeActAgent embeds each function's docstring in its system prompt.
def multiply(a: int, b: int) -> int:
    """Multiply two numbers"""
    return a * b
# Helper exposed to the agent. The one-line docstring is kept verbatim
# because CodeActAgent embeds each function's docstring in its system prompt.
def divide(a: int, b: int) -> float:
    """Divide two numbers"""
    # True division: the result is a float even for exact quotients, and
    # b == 0 raises ZeroDivisionError, which is left to the caller.
    return a / b
# 定义一些辅助函数 def add(a: int, b: int) -> int: """将两个数字相加""" return a + b def subtract(a: int, b: int) -> int: """减去两个数字""" return a - b def multiply(a: int, b: int) -> int: """乘以两个数字""" return a * b def divide(a: int, b: int) -> float: """除以两个数字""" return a / b
创建代码执行器
为了执行代码,我们需要创建一个代码执行器。
这里,我们将使用一个简单的进程内代码执行器,它维护自己的状态。
注意:这是一个简单的示例,不包含适当的沙箱。在生产环境中,您应该使用 Docker 或适当的代码沙箱环境等工具。
In [ ]
已复制!
from typing import Any, Dict, Tuple
import io
import contextlib
import ast
import traceback
class SimpleCodeExecutor:
    """
    A simple in-process Python code executor with state persistence.

    The same ``globals`` and ``locals`` dictionaries are handed to every
    execution, so names bound by one snippet stay visible to later snippets.

    Because two separate mappings are used, Python runs each snippet as if
    it were the body of a class definition: top-level assignments and
    ``def`` statements are stored in ``locals``.

    NOTE: not safe for production use! Code runs unsandboxed through
    ``exec``/``eval`` in the current process. Use with caution.
    """

    def __init__(self, locals: Dict[str, Any], globals: Dict[str, Any]):
        """
        Initialize the code executor.

        Args:
            locals: Local variables to use in the execution context
            globals: Global variables to use in the execution context
        """
        # State that persists between executions
        self.globals = globals
        self.locals = locals

    def execute(self, code: str) -> str:
        """
        Execute Python code and report what it produced.

        Args:
            code: Python code to execute

        Returns:
            One string made of, in order: captured stdout; captured stderr
            (preceded by a newline) if any; ``Error: <Type>: <message>``
            plus the traceback if the code raised; and, when the snippet
            ends with an expression whose value is not ``None``, a blank
            line followed by ``str()`` of that value.
        """
        stdout = io.StringIO()
        stderr = io.StringIO()
        return_value = None
        error_text = ""

        try:
            with contextlib.redirect_stdout(
                stdout
            ), contextlib.redirect_stderr(stderr):
                return_value = self._run(code)
        except Exception as e:
            # Covers syntax errors from parsing as well as runtime errors.
            # The snippet is never re-run, so side effects happen only once.
            error_text = f"Error: {type(e).__name__}: {str(e)}\n"
            error_text += traceback.format_exc()

        # Keep whatever was printed, even when the snippet later failed.
        output = stdout.getvalue()
        if stderr.getvalue():
            output += "\n" + stderr.getvalue()

        if error_text:
            if output and not output.endswith("\n"):
                output += "\n"
            output += error_text

        if return_value is not None:
            output += "\n\n" + str(return_value)

        return output

    def _run(self, code: str) -> Any:
        """Run ``code`` once and return the value of its trailing expression.

        The source is parsed a single time. If the final statement is a bare
        expression it is detached from the module, the remaining statements
        are executed, and the expression is then evaluated separately. This
        works on the syntax tree, so multi-line expressions and trailing
        whitespace are handled correctly.

        Returns ``None`` when the snippet does not end with an expression.
        """
        tree = ast.parse(code)

        trailing = None
        if tree.body and isinstance(tree.body[-1], ast.Expr):
            trailing = tree.body.pop()

        exec(compile(tree, "<string>", "exec"), self.globals, self.locals)
        if trailing is None:
            return None

        value = eval(
            compile(ast.Expression(body=trailing.value), "<string>", "eval"),
            self.globals,
            self.locals,
        )
        # Earlier versions exposed the value under this name; keep doing so.
        self.locals["__result__"] = value
        return value
from typing import Any, Dict, Tuple import io import contextlib import ast import traceback class SimpleCodeExecutor: """ 一个简单的代码执行器,以状态持久化方式运行 Python 代码。 这个执行器在多次执行之间维护全局和局部状态, 允许变量在多次代码运行中持续存在。 注意:不适用于生产环境!请谨慎使用。 """ def __init__(self, locals: Dict[str, Any], globals: Dict[str, Any]): """ 初始化代码执行器。 Args: locals: 在执行上下文中使用局部变量 globals: 在执行上下文中使用全局变量 """ # 在执行之间持续存在的状态 self.globals = globals self.locals = locals def execute(self, code: str) -> Tuple[bool, str, Any]: """ 执行 Python 代码并捕获输出和返回值。 Args: code: 要执行的 Python 代码 Returns: 包含键 `success`、`output` 和 `return_value` 的 Dict """ # 捕获标准输出和标准错误 stdout = io.StringIO() stderr = io.StringIO() output = "" return_value = None try: # 在捕获的输出中执行 with contextlib.redirect_stdout( stdout ), contextlib.redirect_stderr(stderr): # 尝试检测是否存在返回值(最后一个表达式) try: tree = ast.parse(code) last_node = tree.body[-1] if tree.body else None # 如果最后一个语句是表达式,捕获其值 if isinstance(last_node, ast.Expr): # 分割代码以添加返回值赋值 last_line = code.rstrip().split("\n")[-1] exec_code = ( code[: -len(last_line)] + "\n__result__ = " + last_line ) # 执行修改后的代码 exec(exec_code, self.globals, self.locals) return_value = self.locals.get("__result__") else: # 正常执行 exec(code, self.globals, self.locals) except: # 如果解析失败,只按原样执行代码 exec(code, self.globals, self.locals) # 获取输出 output = stdout.getvalue() if stderr.getvalue(): output += "\n" + stderr.getvalue() except Exception as e: # 捕获异常信息 output = f"错误: {type(e).__name__}: {str(e)}\n" output += traceback.format_exc() if return_value is not None: output += "\n\n" + str(return_value) return output
In [ ]
已复制!
# Shared executor instance: the helper functions are exposed as locals,
# while all builtins and numpy (under the name `np`) are exposed as globals.
code_executor = SimpleCodeExecutor(
    locals={"add": add, "subtract": subtract, "multiply": multiply, "divide": divide},
    globals={"__builtins__": __builtins__, "np": __import__("numpy")},
)
code_executor = SimpleCodeExecutor( # 赋予对我们上面定义的函数的访问权 locals={ "add": add, "subtract": subtract, "multiply": multiply, "divide": divide, }, globals={ # 赋予对所有内置函数的访问权 "__builtins__": __builtins__, # 赋予对 numpy 的访问权 "np": __import__("numpy"), }, )
定义 CodeAct Agent
现在,我们可以使用 LlamaIndex Workflows 来定义 Agent 的工作流。
基本流程如下:
- 接收我们的 Prompt + 聊天历史
- 解析出要执行的代码(如果存在)
- 执行代码
- 将代码执行的输出提供回 Agent
- 重复直到 Agent 对答案满意
首先,我们可以创建工作流中的事件。
In [ ]
已复制!
from llama_index.core.llms import ChatMessage
from llama_index.core.workflow import Event
class InputEvent(Event):
    """Carries the chat messages that the next LLM call should receive."""

    input: list[ChatMessage]
class StreamEvent(Event):
    """Carries one streamed text fragment of the LLM response."""

    delta: str
class CodeExecutionEvent(Event):
    """Carries the code extracted from an LLM response, ready to be run."""

    code: str
from llama_index.core.llms import ChatMessage from llama_index.core.workflow import Event class InputEvent(Event): input: list[ChatMessage] class StreamEvent(Event): delta: str class CodeExecutionEvent(Event): code: str
接下来,我们可以定义编排这些事件的工作流。
In [ ]
已复制!
import inspect
import re
from typing import Any, Callable, List
from llama_index.core.llms import ChatMessage, LLM
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.tools.types import BaseTool
from llama_index.core.workflow import (
Context,
Workflow,
StartEvent,
StopEvent,
step,
)
from llama_index.llms.openai import OpenAI
# System prompt template for CodeActAgent. `{fn_str}` is filled in with
# `str.format` in CodeActAgent.__init__ using stub definitions of the
# user-provided functions. The <execute>...</execute> tag convention stated
# here must stay in sync with the regex in CodeActAgent._parse_code.
CODEACT_SYSTEM_PROMPT = """
You are a helpful assistant that can execute code.
Given the chat history, you can write code within <execute>...</execute> tags to help the user with their question.
In your code, you can reference any previously used variables or functions.
The user has also provided you with some predefined functions:
{fn_str}
To execute code, write the code between <execute>...</execute> tags.
"""
class CodeActAgent(Workflow):
    """
    Workflow that lets an LLM answer by writing and running code.

    Each run loops through three steps:

    1. ``prepare_chat_history`` stores the user message in memory.
    2. ``handle_llm_input`` streams an LLM reply and looks for code inside
       ``<execute>...</execute>`` tags.
    3. ``handle_code_execution`` runs that code and feeds the output back
       to step 2.

    The loop ends when a reply contains no code to execute.
    """

    def __init__(
        self,
        fns: List[Callable],
        code_execute_fn: Callable,
        llm: LLM | None = None,
        **workflow_kwargs: Any,
    ) -> None:
        """
        Args:
            fns: Functions described to the LLM in the system prompt.
            code_execute_fn: Callable that takes a code string and returns
                the execution output; it may also return an awaitable.
            llm: LLM to use; defaults to OpenAI ``gpt-4o-mini``.
            **workflow_kwargs: Forwarded to ``Workflow.__init__``.
        """
        super().__init__(**workflow_kwargs)
        self.fns = fns or []
        self.code_execute_fn = code_execute_fn
        self.llm = llm or OpenAI(model="gpt-4o-mini")

        # parse the functions into truncated function strings
        self.fn_str = "\n\n".join(
            self._render_fn_stub(fn) for fn in self.fns
        )
        self.system_message = ChatMessage(
            role="system",
            content=CODEACT_SYSTEM_PROMPT.format(fn_str=self.fn_str),
        )

    @staticmethod
    def _render_fn_stub(fn: Callable) -> str:
        """Render ``fn`` as a signature-plus-docstring stub for the prompt."""
        # A missing docstring becomes empty text instead of the word "None".
        doc = inspect.getdoc(fn) or ""
        return f'def {fn.__name__}{str(inspect.signature(fn))}:\n """ {doc} """\n ...'

    def _parse_code(self, response: str) -> str | None:
        """Return all ``<execute>`` blocks joined together, or ``None``."""
        # find the code between <execute>...</execute> tags
        matches = re.findall(r"<execute>(.*?)</execute>", response, re.DOTALL)
        if matches:
            return "\n\n".join(matches)
        return None

    @step
    async def prepare_chat_history(
        self, ctx: Context, ev: StartEvent
    ) -> InputEvent:
        """Record the user message and emit the chat history for the LLM.

        Raises:
            ValueError: if the run was started without ``user_input``.
        """
        # Create the memory on first use. An explicit None check avoids
        # replacing an existing memory object that happens to be falsy.
        memory = await ctx.get("memory", default=None)
        if memory is None:
            memory = ChatMemoryBuffer.from_defaults(llm=self.llm)

        # get user input
        user_input = ev.get("user_input")
        if user_input is None:
            raise ValueError("user_input kwarg is required")
        user_msg = ChatMessage(role="user", content=user_input)
        memory.put(user_msg)

        # get chat history
        chat_history = memory.get()

        # update context
        await ctx.set("memory", memory)

        # add the system message to the chat history and return
        return InputEvent(input=[self.system_message, *chat_history])

    @step
    async def handle_llm_input(
        self, ctx: Context, ev: InputEvent
    ) -> CodeExecutionEvent | StopEvent:
        """Stream one LLM reply, then either run its code or stop.

        Raises:
            ValueError: if the LLM stream yields no responses at all.
        """
        chat_history = ev.input

        # Stream the response. `response` ends up holding the last chunk.
        response = None
        response_stream = await self.llm.astream_chat(chat_history)
        async for response in response_stream:
            ctx.write_event_to_stream(StreamEvent(delta=response.delta or ""))

        if response is None:
            # Without this check the code below would fail with an
            # unrelated UnboundLocalError.
            raise ValueError("LLM returned an empty response stream")

        # save the final response, which should have all content
        memory = await ctx.get("memory")
        memory.put(response.message)
        await ctx.set("memory", memory)

        # Get the code to execute. Content may be None, so fall back to "".
        code = self._parse_code(response.message.content or "")

        if not code:
            return StopEvent(result=response)
        return CodeExecutionEvent(code=code)

    @step
    async def handle_code_execution(
        self, ctx: Context, ev: CodeExecutionEvent
    ) -> InputEvent:
        """Run the extracted code and loop its output back to the LLM."""
        # Surface the event on the stream so callers can show the code.
        ctx.write_event_to_stream(ev)
        output = self.code_execute_fn(ev.code)
        if inspect.isawaitable(output):
            # Support async executors as well as plain functions.
            output = await output

        # update the memory
        memory = await ctx.get("memory")
        memory.put(ChatMessage(role="assistant", content=output))
        await ctx.set("memory", memory)

        # get the latest chat history and loop back to the start
        chat_history = memory.get()
        return InputEvent(input=[self.system_message, *chat_history])
import inspect import re from typing import Any, Callable, List from llama_index.core.llms import ChatMessage, LLM from llama_index.core.memory import ChatMemoryBuffer from llama_index.core.tools.types import BaseTool from llama_index.core.workflow import ( Context, Workflow, StartEvent, StopEvent, step, ) from llama_index.llms.openai import OpenAI CODEACT_SYSTEM_PROMPT = """ 你是一个有用的助手,可以执行代码。 考虑到聊天历史,你可以写下在... 标签之间的代码来帮助用户解决问题。 在你的代码中,你可以引用任何以前使用过的变量或函数。 用户还为你提供了一些预定义函数: {fn_str} 要执行代码,请将代码写在... 标签之间。 """ class CodeActAgent(Workflow): def __init__( self, fns: List[Callable], code_execute_fn: Callable, llm: LLM | None = None, **workflow_kwargs: Any, ) -> None: super().__init__(**workflow_kwargs) self.fns = fns or [] self.code_execute_fn = code_execute_fn self.llm = llm or OpenAI(model="gpt-4o-mini") # 将函数解析为截断的函数字符串 self.fn_str = "\n\n".join( f'def {fn.__name__}{str(inspect.signature(fn))}:\n """ {fn.__doc__} """\n ...' for fn in self.fns ) self.system_message = ChatMessage( role="system", content=CODEACT_SYSTEM_PROMPT.format(fn_str=self.fn_str), ) def _parse_code(self, response: str) -> str | None: # 在... 标签之间查找代码 matches = re.findall(r"(.*?) 
", response, re.DOTALL) if matches: return "\n\n".join(matches) return None @step async def prepare_chat_history( self, ctx: Context, ev: StartEvent ) -> InputEvent: # 检查是否设置了内存 memory = await ctx.get("memory", default=None) if not memory: memory = ChatMemoryBuffer.from_defaults(llm=self.llm) # 获取用户输入 user_input = ev.get("user_input") if user_input is None: raise ValueError("user_input kwarg is required") user_msg = ChatMessage(role="user", content=user_input) memory.put(user_msg) # 获取聊天历史 chat_history = memory.get() # 更新上下文 await ctx.set("memory", memory) # 将系统消息添加到聊天历史并返回 return InputEvent(input=[self.system_message, *chat_history]) @step async def handle_llm_input( self, ctx: Context, ev: InputEvent ) -> CodeExecutionEvent | StopEvent: chat_history = ev.input # 流式传输响应 response_stream = await self.llm.astream_chat(chat_history) async for response in response_stream: ctx.write_event_to_stream(StreamEvent(delta=response.delta or "")) # 保存最终响应,其中应包含所有内容 memory = await ctx.get("memory") memory.put(response.message) await ctx.set("memory", memory) # 获取要执行的代码 code = self._parse_code(response.message.content) if not code: return StopEvent(result=response) else: return CodeExecutionEvent(code=code) @step async def handle_code_execution( self, ctx: Context, ev: CodeExecutionEvent ) -> InputEvent: # 执行代码 ctx.write_event_to_stream(ev) output = self.code_execute_fn(ev.code) # 更新内存 memory = await ctx.get("memory") memory.put(ChatMessage(role="assistant", content=output)) await ctx.set("memory", memory) # 获取最新的聊天历史并循环回开始 return InputEvent(input=[self.system_message, *chat_history])
In [ ]
已复制!
from llama_index.core.workflow import Context
# Build the agent around the helper functions and the shared executor.
agent = CodeActAgent(
    llm=OpenAI(model="gpt-4o-mini", api_key="sk-..."),  # placeholder key
    code_execute_fn=code_executor.execute,
    fns=[add, subtract, multiply, divide],
)

# context to hold the agent's state / memory across runs
ctx = Context(agent)
from llama_index.core.workflow import Context agent = CodeActAgent( fns=[add, subtract, multiply, divide], code_execute_fn=code_executor.execute, llm=OpenAI(model="gpt-4o-mini", api_key="sk-..."), ) # 上下文用于保存 Agent 的状态 / 内存 ctx = Context(agent)
In [ ]
已复制!
async def run_agent_verbose(agent: CodeActAgent, ctx: Context, query: str):
    """Run one query through the agent, echoing progress to stdout.

    Streamed LLM text is printed as it arrives, and each piece of parsed
    code is printed under a separator. Returns the awaited run result.
    """
    run_handle = agent.run(user_input=query, ctx=ctx)
    print(f"User: {query}")

    async for event in run_handle.stream_events():
        # The two event types are unrelated classes, so check order is free.
        if isinstance(event, CodeExecutionEvent):
            print(f"\n-----------\nParsed code:\n{event.code}\n")
            continue
        if isinstance(event, StreamEvent):
            print(f"{event.delta}", end="", flush=True)

    result = await run_handle
    return result
async def run_agent_verbose(agent: CodeActAgent, ctx: Context, query: str): handler = agent.run(user_input=query, ctx=ctx) print(f"用户: {query}") async for event in handler.stream_events(): if isinstance(event, StreamEvent): print(f"{event.delta}", end="", flush=True) elif isinstance(event, CodeExecutionEvent): print(f"\n-----------\n解析的代码:\n{event.code}\n") return await handler
In [ ]
已复制!
response = await run_agent_verbose(
agent, ctx, "Calculate the sum of all numbers from 1 to 10"
)
response = await run_agent_verbose( agent, ctx, "计算从 1 到 10 的所有数字之和" )
User: Calculate the sum of all numbers from 1 to 10 To calculate the sum of all numbers from 1 to 10, we can use the `add` function in a loop. Here's how we can do it: <execute> total_sum = 0 for number in range(1, 11): total_sum = add(total_sum, number) total_sum </execute> ----------- Parsed code: total_sum = 0 for number in range(1, 11): total_sum = add(total_sum, number) total_sum The sum of all numbers from 1 to 10 is 55.
In [ ]
已复制!
response = await run_agent_verbose(
agent, ctx, "Add 5 and 3, then multiply the result by 2"
)
response = await run_agent_verbose( agent, ctx, "将 5 和 3 相加,然后将结果乘以 2" )
User: Add 5 and 3, then multiply the result by 2 To perform the calculation, we will first add 5 and 3 using the `add` function, and then multiply the result by 2 using the `multiply` function. Here's how we can do it: <execute> result_addition = add(5, 3) final_result = multiply(result_addition, 2) final_result </execute> ----------- Parsed code: result_addition = add(5, 3) final_result = multiply(result_addition, 2) final_result The final result of adding 5 and 3, then multiplying by 2, is 16.
In [ ]
已复制!
response = await run_agent_verbose(
agent, ctx, "Calculate the sum of the first 10 fibonacci numbers0"
)
response = await run_agent_verbose( agent, ctx, "计算前 10 个斐波那契数的和" )
User: Calculate the sum of the first 10 fibonacci numbers0 To calculate the sum of the first 10 Fibonacci numbers, we first need to generate the Fibonacci sequence up to the 10th number and then sum those numbers. The Fibonacci sequence starts with 0 and 1, and each subsequent number is the sum of the two preceding ones. Here's how we can do it: <execute> def fibonacci(n: int) -> int: """ Return the nth Fibonacci number """ if n == 0: return 0 elif n == 1: return 1 else: a, b = 0, 1 for _ in range(2, n + 1): a, b = b, a + b return b # Calculate the sum of the first 10 Fibonacci numbers fibonacci_sum = 0 for i in range(10): fibonacci_sum = add(fibonacci_sum, fibonacci(i)) fibonacci_sum </execute> ----------- Parsed code: def fibonacci(n: int) -> int: """ Return the nth Fibonacci number """ if n == 0: return 0 elif n == 1: return 1 else: a, b = 0, 1 for _ in range(2, n + 1): a, b = b, a + b return b # Calculate the sum of the first 10 Fibonacci numbers fibonacci_sum = 0 for i in range(10): fibonacci_sum = add(fibonacci_sum, fibonacci(i)) fibonacci_sum The sum of the first 10 Fibonacci numbers is 55.
In [ ]
已复制!
response = await run_agent_verbose(
agent, ctx, "Calculate the sum of the first 20 fibonacci numbers"
)
response = await run_agent_verbose( agent, ctx, "计算前 20 个斐波那契数的和" )
User: Calculate the sum of the first 20 fibonacci numbers To calculate the sum of the first 20 Fibonacci numbers, we can use the same approach as before, but this time we will iterate up to 20. Here's how we can do it: <execute> # Calculate the sum of the first 20 Fibonacci numbers fibonacci_sum_20 = 0 for i in range(20): fibonacci_sum_20 = add(fibonacci_sum_20, fibonacci(i)) fibonacci_sum_20 </execute> ----------- Parsed code: # Calculate the sum of the first 20 Fibonacci numbers fibonacci_sum_20 = 0 for i in range(20): fibonacci_sum_20 = add(fibonacci_sum_20, fibonacci(i)) fibonacci_sum_20 The sum of the first 20 Fibonacci numbers is 6765.