Agent Classes#

AgentWorkflow #

Bases: Workflow, PromptMixin

A workflow for managing multiple agents with handoffs.

Source code in llama-index-core/llama_index/core/agent/workflow/multi_agent_workflow.py
class AgentWorkflow(Workflow, PromptMixin, metaclass=AgentWorkflowMeta):
    """A workflow for managing multiple agents with handoffs."""

    def __init__(
        self,
        agents: List[BaseWorkflowAgent],
        initial_state: Optional[Dict] = None,
        root_agent: Optional[str] = None,
        handoff_prompt: Optional[Union[str, BasePromptTemplate]] = None,
        handoff_output_prompt: Optional[Union[str, BasePromptTemplate]] = None,
        state_prompt: Optional[Union[str, BasePromptTemplate]] = None,
        timeout: Optional[float] = None,
        **workflow_kwargs: Any,
    ):
        super().__init__(timeout=timeout, **workflow_kwargs)
        if not agents:
            raise ValueError("At least one agent must be provided")

        # Raise an error if any agent has no name or no description
        if len(agents) > 1 and any(
            agent.name == DEFAULT_AGENT_NAME for agent in agents
        ):
            raise ValueError("All agents must have a name in a multi-agent workflow")

        if len(agents) > 1 and any(
            agent.description == DEFAULT_AGENT_DESCRIPTION for agent in agents
        ):
            raise ValueError(
                "All agents must have a description in a multi-agent workflow"
            )

        self.agents = {cfg.name: cfg for cfg in agents}
        if len(agents) == 1:
            root_agent = agents[0].name
        elif root_agent is None:
            raise ValueError("Exactly one root agent must be provided")
        else:
            root_agent = root_agent

        if root_agent not in self.agents:
            raise ValueError(f"Root agent {root_agent} not found in provided agents")

        self.root_agent = root_agent
        self.initial_state = initial_state or {}

        handoff_prompt = handoff_prompt or DEFAULT_HANDOFF_PROMPT
        if isinstance(handoff_prompt, str):
            handoff_prompt = PromptTemplate(handoff_prompt)
            if "{agent_info}" not in handoff_prompt.get_template():
                raise ValueError("Handoff prompt must contain {agent_info}")
        self.handoff_prompt = handoff_prompt

        handoff_output_prompt = handoff_output_prompt or DEFAULT_HANDOFF_OUTPUT_PROMPT
        if isinstance(handoff_output_prompt, str):
            handoff_output_prompt = PromptTemplate(handoff_output_prompt)
            if (
                "{to_agent}" not in handoff_output_prompt.get_template()
                or "{reason}" not in handoff_output_prompt.get_template()
            ):
                raise ValueError(
                    "Handoff output prompt must contain {to_agent} and {reason}"
                )
        self.handoff_output_prompt = handoff_output_prompt

        state_prompt = state_prompt or DEFAULT_STATE_PROMPT
        if isinstance(state_prompt, str):
            state_prompt = PromptTemplate(state_prompt)
            if (
                "{state}" not in state_prompt.get_template()
                or "{msg}" not in state_prompt.get_template()
            ):
                raise ValueError("State prompt must contain {state} and {msg}")
        self.state_prompt = state_prompt

    def _get_prompts(self) -> PromptDictType:
        """Get prompts."""
        return {
            "handoff_prompt": self.handoff_prompt,
            "handoff_output_prompt": self.handoff_output_prompt,
            "state_prompt": self.state_prompt,
        }

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt sub-modules."""
        return {agent.name: agent for agent in self.agents.values()}

    def _update_prompts(self, prompts_dict: PromptDictType) -> None:
        """Update prompts."""
        if "handoff_prompt" in prompts_dict:
            self.handoff_prompt = prompts_dict["handoff_prompt"]
        if "handoff_output_prompt" in prompts_dict:
            self.handoff_output_prompt = prompts_dict["handoff_output_prompt"]
        if "state_prompt" in prompts_dict:
            self.state_prompt = prompts_dict["state_prompt"]

    def _ensure_tools_are_async(
        self, tools: Sequence[BaseTool]
    ) -> Sequence[AsyncBaseTool]:
        """Ensure all tools are async."""
        return [adapt_to_async_tool(tool) for tool in tools]

    def _get_handoff_tool(
        self, current_agent: BaseWorkflowAgent
    ) -> Optional[AsyncBaseTool]:
        """Creates a handoff tool for the given agent."""
        # Do not create a handoff tool if there is only one agent
        if len(self.agents) == 1:
            return None

        agent_info = {cfg.name: cfg.description for cfg in self.agents.values()}

        # Filter out agents that the current agent cannot handoff to
        configs_to_remove = []
        for name in agent_info:
            if name == current_agent.name:
                configs_to_remove.append(name)
            elif (
                current_agent.can_handoff_to is not None
                and name not in current_agent.can_handoff_to
            ):
                configs_to_remove.append(name)

        for name in configs_to_remove:
            agent_info.pop(name)

        if not agent_info:
            return None

        fn_tool_prompt = self.handoff_prompt.format(agent_info=str(agent_info))
        return FunctionTool.from_defaults(
            async_fn=handoff, description=fn_tool_prompt, return_direct=True
        )

    async def get_tools(
        self, agent_name: str, input_str: Optional[str] = None
    ) -> Sequence[AsyncBaseTool]:
        """Get tools for the given agent."""
        agent_tools = self.agents[agent_name].tools or []
        tools = [*agent_tools]
        retriever = self.agents[agent_name].tool_retriever
        if retriever is not None:
            retrieved_tools = await retriever.aretrieve(input_str or "")
            tools.extend(retrieved_tools)

        if (
            self.agents[agent_name].can_handoff_to
            or self.agents[agent_name].can_handoff_to is None
        ):
            handoff_tool = self._get_handoff_tool(self.agents[agent_name])
            if handoff_tool:
                tools.append(handoff_tool)

        return self._ensure_tools_are_async(cast(List[BaseTool], tools))

    async def _init_context(self, ctx: Context, ev: StartEvent) -> None:
        """Initialize the context once, if needed."""
        if not await ctx.get("memory", default=None):
            default_memory = ev.get("memory", default=None)
            default_memory = default_memory or ChatMemoryBuffer.from_defaults(
                llm=self.agents[self.root_agent].llm or Settings.llm
            )
            await ctx.set("memory", default_memory)
        if not await ctx.get("agents", default=None):
            await ctx.set("agents", list(self.agents.keys()))
        if not await ctx.get("can_handoff_to", default=None):
            await ctx.set(
                "can_handoff_to",
                {
                    agent: agent_cfg.can_handoff_to
                    for agent, agent_cfg in self.agents.items()
                },
            )
        if not await ctx.get("state", default=None):
            await ctx.set("state", self.initial_state)
        if not await ctx.get("current_agent_name", default=None):
            await ctx.set("current_agent_name", self.root_agent)
        if not await ctx.get("handoff_output_prompt", default=None):
            await ctx.set(
                "handoff_output_prompt", self.handoff_output_prompt.get_template()
            )

        # always set to false initially
        await ctx.set("formatted_input_with_state", False)

    async def _call_tool(
        self,
        ctx: Context,
        tool: AsyncBaseTool,
        tool_input: dict,
    ) -> ToolOutput:
        """Call the given tool with the given input."""
        try:
            if isinstance(tool, FunctionTool) and tool.requires_context:
                tool_output = await tool.acall(ctx=ctx, **tool_input)
            else:
                tool_output = await tool.acall(**tool_input)
        except Exception as e:
            tool_output = ToolOutput(
                content=str(e),
                tool_name=tool.metadata.name,
                raw_input=tool_input,
                raw_output=str(e),
                is_error=True,
            )

        return tool_output

    @step
    async def init_run(self, ctx: Context, ev: AgentWorkflowStartEvent) -> AgentInput:
        """Sets up the workflow and validates inputs."""
        await self._init_context(ctx, ev)

        user_msg: Optional[Union[str, ChatMessage]] = ev.get("user_msg")
        chat_history: Optional[List[ChatMessage]] = ev.get("chat_history", [])

        # Convert string user_msg to ChatMessage
        if isinstance(user_msg, str):
            user_msg = ChatMessage(role="user", content=user_msg)

        # Add messages to memory
        memory: BaseMemory = await ctx.get("memory")

        # First set chat history if it exists
        if chat_history:
            await memory.aset(chat_history)

        # Then add user message if it exists
        if user_msg:
            await memory.aput(user_msg)
            content_str = "\n".join(
                [
                    block.text
                    for block in user_msg.blocks
                    if isinstance(block, TextBlock)
                ]
            )
            await ctx.set("user_msg_str", content_str)
        elif chat_history:
            # If no user message, use the last message from chat history as user_msg_str
            content_str = "\n".join(
                [
                    block.text
                    for block in chat_history[-1].blocks
                    if isinstance(block, TextBlock)
                ]
            )
            await ctx.set("user_msg_str", content_str)
        else:
            raise ValueError("Must provide either user_msg or chat_history")

        # Get all messages from memory
        input_messages = await memory.aget()

        # send to the current agent
        current_agent_name: str = await ctx.get("current_agent_name")
        return AgentInput(input=input_messages, current_agent_name=current_agent_name)

    @step
    async def setup_agent(self, ctx: Context, ev: AgentInput) -> AgentSetup:
        """Main agent handling logic."""
        current_agent_name = ev.current_agent_name
        agent = self.agents[current_agent_name]
        llm_input = [*ev.input]

        if agent.system_prompt:
            llm_input = [
                ChatMessage(role="system", content=agent.system_prompt),
                *llm_input,
            ]

        state = await ctx.get("state", default=None)
        formatted_input_with_state = await ctx.get(
            "formatted_input_with_state", default=False
        )
        if state and not formatted_input_with_state:
            # update last message with current state
            for block in llm_input[-1].blocks[::-1]:
                if isinstance(block, TextBlock):
                    block.text = self.state_prompt.format(state=state, msg=block.text)
                    break
            await ctx.set("formatted_input_with_state", True)

        return AgentSetup(
            input=llm_input,
            current_agent_name=ev.current_agent_name,
        )

    @step
    async def run_agent_step(self, ctx: Context, ev: AgentSetup) -> AgentOutput:
        """Run the agent."""
        memory: BaseMemory = await ctx.get("memory")
        agent = self.agents[ev.current_agent_name]
        user_msg_str = await ctx.get("user_msg_str")
        tools = await self.get_tools(ev.current_agent_name, user_msg_str or "")

        agent_output = await agent.take_step(
            ctx,
            ev.input,
            tools,
            memory,
        )

        ctx.write_event_to_stream(agent_output)
        return agent_output

    @step
    async def parse_agent_output(
        self, ctx: Context, ev: AgentOutput
    ) -> Union[StopEvent, ToolCall, None]:
        if not ev.tool_calls:
            agent = self.agents[ev.current_agent_name]
            memory: BaseMemory = await ctx.get("memory")
            output = await agent.finalize(ctx, ev, memory)

            cur_tool_calls: List[ToolCallResult] = await ctx.get(
                "current_tool_calls", default=[]
            )
            output.tool_calls.extend(cur_tool_calls)  # type: ignore
            await ctx.set("current_tool_calls", [])

            return StopEvent(result=output)

        await ctx.set("num_tool_calls", len(ev.tool_calls))

        for tool_call in ev.tool_calls:
            ctx.send_event(
                ToolCall(
                    tool_name=tool_call.tool_name,
                    tool_kwargs=tool_call.tool_kwargs,
                    tool_id=tool_call.tool_id,
                )
            )

        return None

    @step
    async def call_tool(self, ctx: Context, ev: ToolCall) -> ToolCallResult:
        """Calls the tool and handles the result."""
        ctx.write_event_to_stream(
            ToolCall(
                tool_name=ev.tool_name,
                tool_kwargs=ev.tool_kwargs,
                tool_id=ev.tool_id,
            )
        )

        current_agent_name = await ctx.get("current_agent_name")
        tools = await self.get_tools(current_agent_name, ev.tool_name)
        tools_by_name = {tool.metadata.name: tool for tool in tools}
        if ev.tool_name not in tools_by_name:
            tool = None
            result = ToolOutput(
                content=f"Tool {ev.tool_name} not found. Please select a tool that is available.",
                tool_name=ev.tool_name,
                raw_input=ev.tool_kwargs,
                raw_output=None,
                is_error=True,
            )
        else:
            tool = tools_by_name[ev.tool_name]
            result = await self._call_tool(ctx, tool, ev.tool_kwargs)

        result_ev = ToolCallResult(
            tool_name=ev.tool_name,
            tool_kwargs=ev.tool_kwargs,
            tool_id=ev.tool_id,
            tool_output=result,
            return_direct=tool.metadata.return_direct if tool else False,
        )

        ctx.write_event_to_stream(result_ev)
        return result_ev

    @step
    async def aggregate_tool_results(
        self, ctx: Context, ev: ToolCallResult
    ) -> Union[AgentInput, StopEvent, None]:
        """Aggregate tool results and return the next agent input."""
        num_tool_calls = await ctx.get("num_tool_calls", default=0)
        if num_tool_calls == 0:
            raise ValueError("No tool calls found, cannot aggregate results.")

        tool_call_results: list[ToolCallResult] = ctx.collect_events(  # type: ignore
            ev, expected=[ToolCallResult] * num_tool_calls
        )
        if not tool_call_results:
            return None

        memory: BaseMemory = await ctx.get("memory")
        agent_name: str = await ctx.get("current_agent_name")
        agent: BaseWorkflowAgent = self.agents[agent_name]

        # track tool calls made during a .run() call
        cur_tool_calls: List[ToolCallResult] = await ctx.get(
            "current_tool_calls", default=[]
        )
        cur_tool_calls.extend(tool_call_results)
        await ctx.set("current_tool_calls", cur_tool_calls)

        await agent.handle_tool_call_results(ctx, tool_call_results, memory)

        # set the next agent, if needed
        # the handoff tool sets this
        next_agent_name = await ctx.get("next_agent", default=None)
        if next_agent_name:
            await ctx.set("current_agent_name", next_agent_name)

        if any(
            tool_call_result.return_direct for tool_call_result in tool_call_results
        ):
            # if any tool calls return directly, take the first one
            return_direct_tool = next(
                tool_call_result
                for tool_call_result in tool_call_results
                if tool_call_result.return_direct
            )

            # always finalize the agent, even if we're just handing off
            result = AgentOutput(
                response=ChatMessage(
                    role="assistant",
                    content=return_direct_tool.tool_output.content or "",
                ),
                tool_calls=[
                    ToolSelection(
                        tool_id=t.tool_id,
                        tool_name=t.tool_name,
                        tool_kwargs=t.tool_kwargs,
                    )
                    for t in cur_tool_calls
                ],
                raw=return_direct_tool.tool_output.raw_output,
                current_agent_name=agent.name,
            )
            result = await agent.finalize(ctx, result, memory)

            # we don't want to stop the system if we're just handing off
            if return_direct_tool.tool_name != "handoff":
                await ctx.set("current_tool_calls", [])
                return StopEvent(result=result)

        user_msg_str = await ctx.get("user_msg_str")
        input_messages = await memory.aget(input=user_msg_str)

        # get this again, in case it changed
        agent_name = await ctx.get("current_agent_name")
        agent = self.agents[agent_name]

        return AgentInput(input=input_messages, current_agent_name=agent.name)

    def run(
        self,
        user_msg: Optional[Union[str, ChatMessage]] = None,
        chat_history: Optional[List[ChatMessage]] = None,
        memory: Optional[BaseMemory] = None,
        ctx: Optional[Context] = None,
        stepwise: bool = False,
        checkpoint_callback: Optional[CheckpointCallback] = None,
        **kwargs: Any,
    ) -> WorkflowHandler:
        return super().run(
            start_event=AgentWorkflowStartEvent(
                user_msg=user_msg,
                chat_history=chat_history,
                memory=memory,
            ),
            ctx=ctx,
            stepwise=stepwise,
            checkpoint_callback=checkpoint_callback,
            **kwargs,
        )

    @classmethod
    def from_tools_or_functions(
        cls,
        tools_or_functions: List[Union[BaseTool, Callable]],
        llm: Optional[LLM] = None,
        system_prompt: Optional[str] = None,
        state_prompt: Optional[Union[str, BasePromptTemplate]] = None,
        initial_state: Optional[dict] = None,
        timeout: Optional[float] = None,
        verbose: bool = False,
    ) -> "AgentWorkflow":
        """
        Initializes an AgentWorkflow from a list of tools or functions.

        The workflow will be initialized with a single agent that uses the provided tools or functions.

        If the LLM is a function calling model, the workflow will use the FunctionAgent.
        Otherwise, it will use the ReActAgent.
        """
        llm = llm or Settings.llm
        agent_cls = (
            FunctionAgent if llm.metadata.is_function_calling_model else ReActAgent
        )

        tools = [
            FunctionTool.from_defaults(fn=tool)
            if not isinstance(tool, BaseTool)
            else tool
            for tool in tools_or_functions
        ]
        return cls(
            agents=[
                agent_cls(
                    name="Agent",
                    description="A single agent that uses the provided tools or functions.",
                    tools=tools,
                    llm=llm,
                    system_prompt=system_prompt,
                )
            ],
            state_prompt=state_prompt,
            initial_state=initial_state,
            timeout=timeout,
            verbose=verbose,
        )
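
A minimal usage sketch (not taken from the library docs): two FunctionAgent instances are wired into an AgentWorkflow, with the researcher allowed to hand off to the writer. The llm and search_tool objects are assumed to exist already; any LLM and tool would do.

from llama_index.core.agent.workflow import AgentWorkflow, FunctionAgent

# Assumed to exist elsewhere: `llm` (a function-calling LLM) and `search_tool`.
research_agent = FunctionAgent(
    name="researcher",
    description="Gathers background information using the search tool.",
    tools=[search_tool],
    llm=llm,
    can_handoff_to=["writer"],
)
writer_agent = FunctionAgent(
    name="writer",
    description="Writes the final answer from the gathered notes.",
    llm=llm,
)

workflow = AgentWorkflow(
    agents=[research_agent, writer_agent],
    root_agent="researcher",        # required when more than one agent is given
    initial_state={"notes": []},    # seeded into the Context under the "state" key
)
# inside an async context (e.g. a notebook cell)
response = await workflow.run(user_msg="Summarize recent work on vector databases.")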

get_tools async #

get_tools(agent_name: str, input_str: Optional[str] = None) -> Sequence[AsyncBaseTool]

Get tools for the given agent.

Source code in llama-index-core/llama_index/core/agent/workflow/multi_agent_workflow.py
async def get_tools(
    self, agent_name: str, input_str: Optional[str] = None
) -> Sequence[AsyncBaseTool]:
    """Get tools for the given agent."""
    agent_tools = self.agents[agent_name].tools or []
    tools = [*agent_tools]
    retriever = self.agents[agent_name].tool_retriever
    if retriever is not None:
        retrieved_tools = await retriever.aretrieve(input_str or "")
        tools.extend(retrieved_tools)

    if (
        self.agents[agent_name].can_handoff_to
        or self.agents[agent_name].can_handoff_to is None
    ):
        handoff_tool = self._get_handoff_tool(self.agents[agent_name])
        if handoff_tool:
            tools.append(handoff_tool)

    return self._ensure_tools_are_async(cast(List[BaseTool], tools))

init_run async #

init_run(ctx: Context, ev: AgentWorkflowStartEvent) -> AgentInput

Sets up the workflow and validates inputs.

Source code in llama-index-core/llama_index/core/agent/workflow/multi_agent_workflow.py
@step
async def init_run(self, ctx: Context, ev: AgentWorkflowStartEvent) -> AgentInput:
    """Sets up the workflow and validates inputs."""
    await self._init_context(ctx, ev)

    user_msg: Optional[Union[str, ChatMessage]] = ev.get("user_msg")
    chat_history: Optional[List[ChatMessage]] = ev.get("chat_history", [])

    # Convert string user_msg to ChatMessage
    if isinstance(user_msg, str):
        user_msg = ChatMessage(role="user", content=user_msg)

    # Add messages to memory
    memory: BaseMemory = await ctx.get("memory")

    # First set chat history if it exists
    if chat_history:
        await memory.aset(chat_history)

    # Then add user message if it exists
    if user_msg:
        await memory.aput(user_msg)
        content_str = "\n".join(
            [
                block.text
                for block in user_msg.blocks
                if isinstance(block, TextBlock)
            ]
        )
        await ctx.set("user_msg_str", content_str)
    elif chat_history:
        # If no user message, use the last message from chat history as user_msg_str
        content_str = "\n".join(
            [
                block.text
                for block in chat_history[-1].blocks
                if isinstance(block, TextBlock)
            ]
        )
        await ctx.set("user_msg_str", content_str)
    else:
        raise ValueError("Must provide either user_msg or chat_history")

    # Get all messages from memory
    input_messages = await memory.aget()

    # send to the current agent
    current_agent_name: str = await ctx.get("current_agent_name")
    return AgentInput(input=input_messages, current_agent_name=current_agent_name)
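
As the step above shows, a run can start from a fresh user_msg or from an existing chat_history (in which case the last message doubles as the user input); providing neither raises a ValueError. A small sketch, assuming a workflow instance already exists:

from llama_index.core.llms import ChatMessage

# Option 1: a plain user message (converted to a ChatMessage internally).
handler = workflow.run(user_msg="Hello!")

# Option 2: seed the run with prior history; the last message is used
# as the effective user input for this turn.
handler = workflow.run(
    chat_history=[
        ChatMessage(role="user", content="Hi"),
        ChatMessage(role="assistant", content="Hello! How can I help?"),
        ChatMessage(role="user", content="Tell me a joke."),
    ]
)
# inside an async context
result = await handler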

setup_agent async #

setup_agent(ctx: Context, ev: AgentInput) -> AgentSetup

Main agent handling logic.

Source code in llama-index-core/llama_index/core/agent/workflow/multi_agent_workflow.py
@step
async def setup_agent(self, ctx: Context, ev: AgentInput) -> AgentSetup:
    """Main agent handling logic."""
    current_agent_name = ev.current_agent_name
    agent = self.agents[current_agent_name]
    llm_input = [*ev.input]

    if agent.system_prompt:
        llm_input = [
            ChatMessage(role="system", content=agent.system_prompt),
            *llm_input,
        ]

    state = await ctx.get("state", default=None)
    formatted_input_with_state = await ctx.get(
        "formatted_input_with_state", default=False
    )
    if state and not formatted_input_with_state:
        # update last message with current state
        for block in llm_input[-1].blocks[::-1]:
            if isinstance(block, TextBlock):
                block.text = self.state_prompt.format(state=state, msg=block.text)
                break
        await ctx.set("formatted_input_with_state", True)

    return AgentSetup(
        input=llm_input,
        current_agent_name=ev.current_agent_name,
    )
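
setup_agent injects the shared state into the last user message (via state_prompt) only once per run; tools can read and mutate that same state through the workflow Context, which _call_tool passes to tools that declare they need it. A hedged sketch, assuming FunctionTool's context-aware tool detection (a callable whose first parameter is a Context receives the workflow context); the tool and agent names are illustrative only:

from llama_index.core.agent.workflow import FunctionAgent
from llama_index.core.workflow import Context

async def add_note(ctx: Context, note: str) -> str:
    """Illustrative tool that appends a note to the shared state dict."""
    state = await ctx.get("state")
    state["notes"].append(note)
    await ctx.set("state", state)
    return f"Stored note: {note}"

# Passed like any other tool; this assumes the workflow was created with
# initial_state={"notes": []} so the key already exists.
agent = FunctionAgent(
    name="writer",
    description="Collects notes before writing the answer.",
    tools=[add_note],
    llm=llm,
)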

run_agent_step async #

run_agent_step(ctx: Context, ev: AgentSetup) -> AgentOutput

Run the agent.

Source code in llama-index-core/llama_index/core/agent/workflow/multi_agent_workflow.py
@step
async def run_agent_step(self, ctx: Context, ev: AgentSetup) -> AgentOutput:
    """Run the agent."""
    memory: BaseMemory = await ctx.get("memory")
    agent = self.agents[ev.current_agent_name]
    user_msg_str = await ctx.get("user_msg_str")
    tools = await self.get_tools(ev.current_agent_name, user_msg_str or "")

    agent_output = await agent.take_step(
        ctx,
        ev.input,
        tools,
        memory,
    )

    ctx.write_event_to_stream(agent_output)
    return agent_output

call_tool async #

call_tool(ctx: Context, ev: ToolCall) -> ToolCallResult

Calls the tool and handles the result.

Source code in llama-index-core/llama_index/core/agent/workflow/multi_agent_workflow.py
@step
async def call_tool(self, ctx: Context, ev: ToolCall) -> ToolCallResult:
    """Calls the tool and handles the result."""
    ctx.write_event_to_stream(
        ToolCall(
            tool_name=ev.tool_name,
            tool_kwargs=ev.tool_kwargs,
            tool_id=ev.tool_id,
        )
    )

    current_agent_name = await ctx.get("current_agent_name")
    tools = await self.get_tools(current_agent_name, ev.tool_name)
    tools_by_name = {tool.metadata.name: tool for tool in tools}
    if ev.tool_name not in tools_by_name:
        tool = None
        result = ToolOutput(
            content=f"Tool {ev.tool_name} not found. Please select a tool that is available.",
            tool_name=ev.tool_name,
            raw_input=ev.tool_kwargs,
            raw_output=None,
            is_error=True,
        )
    else:
        tool = tools_by_name[ev.tool_name]
        result = await self._call_tool(ctx, tool, ev.tool_kwargs)

    result_ev = ToolCallResult(
        tool_name=ev.tool_name,
        tool_kwargs=ev.tool_kwargs,
        tool_id=ev.tool_id,
        tool_output=result,
        return_direct=tool.metadata.return_direct if tool else False,
    )

    ctx.write_event_to_stream(result_ev)
    return result_ev

aggregate_tool_results async #

aggregate_tool_results(ctx: Context, ev: ToolCallResult) -> Union[AgentInput, StopEvent, None]

Aggregate tool results and return the next agent input.

Source code in llama-index-core/llama_index/core/agent/workflow/multi_agent_workflow.py
@step
async def aggregate_tool_results(
    self, ctx: Context, ev: ToolCallResult
) -> Union[AgentInput, StopEvent, None]:
    """Aggregate tool results and return the next agent input."""
    num_tool_calls = await ctx.get("num_tool_calls", default=0)
    if num_tool_calls == 0:
        raise ValueError("No tool calls found, cannot aggregate results.")

    tool_call_results: list[ToolCallResult] = ctx.collect_events(  # type: ignore
        ev, expected=[ToolCallResult] * num_tool_calls
    )
    if not tool_call_results:
        return None

    memory: BaseMemory = await ctx.get("memory")
    agent_name: str = await ctx.get("current_agent_name")
    agent: BaseWorkflowAgent = self.agents[agent_name]

    # track tool calls made during a .run() call
    cur_tool_calls: List[ToolCallResult] = await ctx.get(
        "current_tool_calls", default=[]
    )
    cur_tool_calls.extend(tool_call_results)
    await ctx.set("current_tool_calls", cur_tool_calls)

    await agent.handle_tool_call_results(ctx, tool_call_results, memory)

    # set the next agent, if needed
    # the handoff tool sets this
    next_agent_name = await ctx.get("next_agent", default=None)
    if next_agent_name:
        await ctx.set("current_agent_name", next_agent_name)

    if any(
        tool_call_result.return_direct for tool_call_result in tool_call_results
    ):
        # if any tool calls return directly, take the first one
        return_direct_tool = next(
            tool_call_result
            for tool_call_result in tool_call_results
            if tool_call_result.return_direct
        )

        # always finalize the agent, even if we're just handing off
        result = AgentOutput(
            response=ChatMessage(
                role="assistant",
                content=return_direct_tool.tool_output.content or "",
            ),
            tool_calls=[
                ToolSelection(
                    tool_id=t.tool_id,
                    tool_name=t.tool_name,
                    tool_kwargs=t.tool_kwargs,
                )
                for t in cur_tool_calls
            ],
            raw=return_direct_tool.tool_output.raw_output,
            current_agent_name=agent.name,
        )
        result = await agent.finalize(ctx, result, memory)

        # we don't want to stop the system if we're just handing off
        if return_direct_tool.tool_name != "handoff":
            await ctx.set("current_tool_calls", [])
            return StopEvent(result=result)

    user_msg_str = await ctx.get("user_msg_str")
    input_messages = await memory.aget(input=user_msg_str)

    # get this again, in case it changed
    agent_name = await ctx.get("current_agent_name")
    agent = self.agents[agent_name]

    return AgentInput(input=input_messages, current_agent_name=agent.name)
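
Note the return_direct handling above: if any tool in the batch was declared return_direct, its output becomes the final response and the workflow stops, unless that tool is the reserved handoff tool. A short sketch of declaring such a tool (the function itself is illustrative):

from llama_index.core.tools import FunctionTool

def lookup_order(order_id: str) -> str:
    """Illustrative lookup; its result is returned to the user verbatim."""
    return f"Order {order_id} ships tomorrow."

# return_direct=True makes aggregate_tool_results emit a StopEvent carrying
# this tool's output instead of routing another AgentInput to the agent.
direct_tool = FunctionTool.from_defaults(fn=lookup_order, return_direct=True)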

from_tools_or_functions classmethod #

from_tools_or_functions(tools_or_functions: List[Union[BaseTool, Callable]], llm: Optional[LLM] = None, system_prompt: Optional[str] = None, state_prompt: Optional[Union[str, BasePromptTemplate]] = None, initial_state: Optional[dict] = None, timeout: Optional[float] = None, verbose: bool = False) -> AgentWorkflow

Initializes an AgentWorkflow from a list of tools or functions.

The workflow will be initialized with a single agent that uses the provided tools or functions.

If the LLM is a function-calling model, the workflow will use the FunctionAgent. Otherwise, it will use the ReActAgent.

Source code in llama-index-core/llama_index/core/agent/workflow/multi_agent_workflow.py
@classmethod
def from_tools_or_functions(
    cls,
    tools_or_functions: List[Union[BaseTool, Callable]],
    llm: Optional[LLM] = None,
    system_prompt: Optional[str] = None,
    state_prompt: Optional[Union[str, BasePromptTemplate]] = None,
    initial_state: Optional[dict] = None,
    timeout: Optional[float] = None,
    verbose: bool = False,
) -> "AgentWorkflow":
    """
    Initializes an AgentWorkflow from a list of tools or functions.

    The workflow will be initialized with a single agent that uses the provided tools or functions.

    If the LLM is a function calling model, the workflow will use the FunctionAgent.
    Otherwise, it will use the ReActAgent.
    """
    llm = llm or Settings.llm
    agent_cls = (
        FunctionAgent if llm.metadata.is_function_calling_model else ReActAgent
    )

    tools = [
        FunctionTool.from_defaults(fn=tool)
        if not isinstance(tool, BaseTool)
        else tool
        for tool in tools_or_functions
    ]
    return cls(
        agents=[
            agent_cls(
                name="Agent",
                description="A single agent that uses the provided tools or functions.",
                tools=tools,
                llm=llm,
                system_prompt=system_prompt,
            )
        ],
        state_prompt=state_prompt,
        initial_state=initial_state,
        timeout=timeout,
        verbose=verbose,
    )
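
A quick sketch of the classmethod; the OpenAI LLM is only an example, and any LLM configured in Settings would also work:

from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.llms.openai import OpenAI

def multiply(a: float, b: float) -> float:
    """Multiply two numbers."""
    return a * b

workflow = AgentWorkflow.from_tools_or_functions(
    [multiply],
    llm=OpenAI(model="gpt-4o-mini"),
    system_prompt="You are a calculator assistant.",
)
# inside an async context
response = await workflow.run(user_msg="What is 3 * 7?")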

BaseWorkflowAgent #

Bases: BaseModel, PromptMixin, ABC

Base class for all agents, combining config and logic.

Parameters:

name (str): The name of the agent. Default: 'Agent'
description (str): The description of what the agent does and is responsible for. Default: 'An agent that can perform a task'
system_prompt (str | None): The system prompt for the agent. Default: None
tools (List[Union[BaseTool, Callable]] | None): The tools that the agent can use. Default: None
tool_retriever (ObjectRetriever | None): The tool retriever for the agent, can be provided instead of tools. Default: None
can_handoff_to (List[str] | None): The agent names that this agent can hand off to. Default: None
llm (LLM): The LLM that the agent uses. Default: <dynamic>

Source code in llama-index-core/llama_index/core/agent/workflow/base_agent.py
class BaseWorkflowAgent(BaseModel, PromptMixin, ABC):
    """Base class for all agents, combining config and logic."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    name: str = Field(default=DEFAULT_AGENT_NAME, description="The name of the agent")
    description: str = Field(
        default=DEFAULT_AGENT_DESCRIPTION,
        description="The description of what the agent does and is responsible for",
    )
    system_prompt: Optional[str] = Field(
        default=None, description="The system prompt for the agent"
    )
    tools: Optional[List[Union[BaseTool, Callable]]] = Field(
        default=None, description="The tools that the agent can use"
    )
    tool_retriever: Optional[ObjectRetriever] = Field(
        default=None,
        description="The tool retriever for the agent, can be provided instead of tools",
    )
    can_handoff_to: Optional[List[str]] = Field(
        default=None, description="The agent names that this agent can hand off to"
    )
    llm: LLM = Field(
        default_factory=get_default_llm, description="The LLM that the agent uses"
    )

    @field_validator("tools", mode="before")
    def validate_tools(
        cls, v: Optional[Sequence[Union[BaseTool, Callable]]]
    ) -> Optional[Sequence[BaseTool]]:
        """
        Validate tools.

        If tools are not of type BaseTool, they will be converted to FunctionTools.
        This assumes the inputs are tools or callable functions.
        """
        if v is None:
            return None

        validated_tools: List[BaseTool] = []
        for tool in v:
            if not isinstance(tool, BaseTool):
                validated_tools.append(FunctionTool.from_defaults(tool))
            else:
                validated_tools.append(tool)

        for tool in validated_tools:
            if tool.metadata.name == "handoff":
                raise ValueError(
                    "'handoff' is a reserved tool name. Please use a different name."
                )

        return validated_tools  # type: ignore[return-value]

    def _get_prompts(self) -> PromptDictType:
        """Get prompts."""
        return {}

    def _get_prompt_modules(self) -> PromptMixinType:
        """Get prompt sub-modules."""
        return {}

    def _update_prompts(self, prompts_dict: PromptDictType) -> None:
        """Update prompts."""

    @abstractmethod
    async def take_step(
        self,
        ctx: Context,
        llm_input: List[ChatMessage],
        tools: Sequence[AsyncBaseTool],
        memory: BaseMemory,
    ) -> AgentOutput:
        """Take a single step with the agent."""

    @abstractmethod
    async def handle_tool_call_results(
        self, ctx: Context, results: List[ToolCallResult], memory: BaseMemory
    ) -> None:
        """Handle tool call results."""

    @abstractmethod
    async def finalize(
        self, ctx: Context, output: AgentOutput, memory: BaseMemory
    ) -> AgentOutput:
        """Finalize the agent's execution."""

    @abstractmethod
    def run(
        self,
        user_msg: Optional[Union[str, ChatMessage]] = None,
        chat_history: Optional[List[ChatMessage]] = None,
        memory: Optional[BaseMemory] = None,
        ctx: Optional[Context] = None,
        stepwise: bool = False,
        checkpoint_callback: Optional[CheckpointCallback] = None,
        **workflow_kwargs: Any,
    ) -> WorkflowHandler:
        """Run the agent."""

validate_tools #

validate_tools(v: Optional[Sequence[Union[BaseTool, Callable]]]) -> Optional[Sequence[BaseTool]]

Validate tools.

If tools are not of type BaseTool, they will be converted to FunctionTools. This assumes the inputs are tools or callable functions.

Source code in llama-index-core/llama_index/core/agent/workflow/base_agent.py
@field_validator("tools", mode="before")
def validate_tools(
    cls, v: Optional[Sequence[Union[BaseTool, Callable]]]
) -> Optional[Sequence[BaseTool]]:
    """
    Validate tools.

    If tools are not of type BaseTool, they will be converted to FunctionTools.
    This assumes the inputs are tools or callable functions.
    """
    if v is None:
        return None

    validated_tools: List[BaseTool] = []
    for tool in v:
        if not isinstance(tool, BaseTool):
            validated_tools.append(FunctionTool.from_defaults(tool))
        else:
            validated_tools.append(tool)

    for tool in validated_tools:
        if tool.metadata.name == "handoff":
            raise ValueError(
                "'handoff' is a reserved tool name. Please use a different name."
            )

    return validated_tools  # type: ignore[return-value]

take_step abstractmethod async #

take_step(ctx: Context, llm_input: List[ChatMessage], tools: Sequence[AsyncBaseTool], memory: BaseMemory) -> AgentOutput

Take a single step with the agent.

Source code in llama-index-core/llama_index/core/agent/workflow/base_agent.py
@abstractmethod
async def take_step(
    self,
    ctx: Context,
    llm_input: List[ChatMessage],
    tools: Sequence[AsyncBaseTool],
    memory: BaseMemory,
) -> AgentOutput:
    """Take a single step with the agent."""

handle_tool_call_results abstractmethod async #

handle_tool_call_results(ctx: Context, results: List[ToolCallResult], memory: BaseMemory) -> None

Handle tool call results.

Source code in llama-index-core/llama_index/core/agent/workflow/base_agent.py
@abstractmethod
async def handle_tool_call_results(
    self, ctx: Context, results: List[ToolCallResult], memory: BaseMemory
) -> None:
    """Handle tool call results."""

finalize abstractmethod async #

finalize(ctx: Context, output: AgentOutput, memory: BaseMemory) -> AgentOutput

Finalize the agent's execution.

Source code in llama-index-core/llama_index/core/agent/workflow/base_agent.py
@abstractmethod
async def finalize(
    self, ctx: Context, output: AgentOutput, memory: BaseMemory
) -> AgentOutput:
    """Finalize the agent's execution."""

run abstractmethod #

run(user_msg: Optional[Union[str, ChatMessage]] = None, chat_history: Optional[List[ChatMessage]] = None, memory: Optional[BaseMemory] = None, ctx: Optional[Context] = None, stepwise: bool = False, checkpoint_callback: Optional[CheckpointCallback] = None, **workflow_kwargs: Any) -> WorkflowHandler

Run the agent.

Source code in llama-index-core/llama_index/core/agent/workflow/base_agent.py
@abstractmethod
def run(
    self,
    user_msg: Optional[Union[str, ChatMessage]] = None,
    chat_history: Optional[List[ChatMessage]] = None,
    memory: Optional[BaseMemory] = None,
    ctx: Optional[Context] = None,
    stepwise: bool = False,
    checkpoint_callback: Optional[CheckpointCallback] = None,
    **workflow_kwargs: Any,
) -> WorkflowHandler:
    """Run the agent."""

FunctionAgent #

Bases: SingleAgentRunnerMixin, BaseWorkflowAgent

Function calling agent implementation.

Parameters:

scratchpad_key (str): Default: 'scratchpad'

Source code in llama-index-core/llama_index/core/agent/workflow/function_agent.py
class FunctionAgent(SingleAgentRunnerMixin, BaseWorkflowAgent):
    """Function calling agent implementation."""

    scratchpad_key: str = "scratchpad"

    async def take_step(
        self,
        ctx: Context,
        llm_input: List[ChatMessage],
        tools: Sequence[AsyncBaseTool],
        memory: BaseMemory,
    ) -> AgentOutput:
        """Take a single step with the function calling agent."""
        if not self.llm.metadata.is_function_calling_model:
            raise ValueError("LLM must be a FunctionCallingLLM")

        scratchpad: List[ChatMessage] = await ctx.get(self.scratchpad_key, default=[])
        current_llm_input = [*llm_input, *scratchpad]

        ctx.write_event_to_stream(
            AgentInput(input=current_llm_input, current_agent_name=self.name)
        )

        response = await self.llm.astream_chat_with_tools(  # type: ignore
            tools, chat_history=current_llm_input, allow_parallel_tool_calls=True
        )
        # last_chat_response will be used later, after the loop.
        # We initialize it so it's valid even when 'response' is empty
        last_chat_response = ChatResponse(message=ChatMessage())
        async for last_chat_response in response:
            tool_calls = self.llm.get_tool_calls_from_response(  # type: ignore
                last_chat_response, error_on_no_tool_call=False
            )
            raw = (
                last_chat_response.raw.model_dump()
                if isinstance(last_chat_response.raw, BaseModel)
                else last_chat_response.raw
            )
            ctx.write_event_to_stream(
                AgentStream(
                    delta=last_chat_response.delta or "",
                    response=last_chat_response.message.content or "",
                    tool_calls=tool_calls or [],
                    raw=raw,
                    current_agent_name=self.name,
                )
            )

        tool_calls = self.llm.get_tool_calls_from_response(  # type: ignore
            last_chat_response, error_on_no_tool_call=False
        )

        # only add to scratchpad if we didn't select the handoff tool
        scratchpad.append(last_chat_response.message)
        await ctx.set(self.scratchpad_key, scratchpad)

        raw = (
            last_chat_response.raw.model_dump()
            if isinstance(last_chat_response.raw, BaseModel)
            else last_chat_response.raw
        )
        return AgentOutput(
            response=last_chat_response.message,
            tool_calls=tool_calls or [],
            raw=raw,
            current_agent_name=self.name,
        )

    async def handle_tool_call_results(
        self, ctx: Context, results: List[ToolCallResult], memory: BaseMemory
    ) -> None:
        """Handle tool call results for function calling agent."""
        scratchpad: List[ChatMessage] = await ctx.get(self.scratchpad_key, default=[])

        for tool_call_result in results:
            scratchpad.append(
                ChatMessage(
                    role="tool",
                    content=str(tool_call_result.tool_output.content),
                    additional_kwargs={"tool_call_id": tool_call_result.tool_id},
                )
            )

            if (
                tool_call_result.return_direct
                and tool_call_result.tool_name != "handoff"
            ):
                scratchpad.append(
                    ChatMessage(
                        role="assistant",
                        content=str(tool_call_result.tool_output.content),
                        additional_kwargs={"tool_call_id": tool_call_result.tool_id},
                    )
                )
                break

        await ctx.set(self.scratchpad_key, scratchpad)

    async def finalize(
        self, ctx: Context, output: AgentOutput, memory: BaseMemory
    ) -> AgentOutput:
        """
        Finalize the function calling agent.

        Adds all in-progress messages to memory.
        """
        scratchpad: List[ChatMessage] = await ctx.get(self.scratchpad_key, default=[])
        await memory.aput_messages(scratchpad)

        # reset scratchpad
        await ctx.set(self.scratchpad_key, [])

        return output
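
Because take_step writes AgentStream events to the event stream, a caller can render tokens as they arrive. A hedged sketch, assuming a function-calling llm and a list of tools already exist:

from llama_index.core.agent.workflow import AgentStream, FunctionAgent

agent = FunctionAgent(
    name="assistant",
    description="Answers questions using the provided tools.",
    tools=tools,
    llm=llm,
)

handler = agent.run(user_msg="What's the weather in Paris?")
# inside an async context
async for event in handler.stream_events():
    if isinstance(event, AgentStream):
        print(event.delta, end="", flush=True)
response = await handler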

take_step async #

take_step(ctx: Context, llm_input: List[ChatMessage], tools: Sequence[AsyncBaseTool], memory: BaseMemory) -> AgentOutput

Take a single step with the function calling agent.

Source code in llama-index-core/llama_index/core/agent/workflow/function_agent.py
async def take_step(
    self,
    ctx: Context,
    llm_input: List[ChatMessage],
    tools: Sequence[AsyncBaseTool],
    memory: BaseMemory,
) -> AgentOutput:
    """Take a single step with the function calling agent."""
    if not self.llm.metadata.is_function_calling_model:
        raise ValueError("LLM must be a FunctionCallingLLM")

    scratchpad: List[ChatMessage] = await ctx.get(self.scratchpad_key, default=[])
    current_llm_input = [*llm_input, *scratchpad]

    ctx.write_event_to_stream(
        AgentInput(input=current_llm_input, current_agent_name=self.name)
    )

    response = await self.llm.astream_chat_with_tools(  # type: ignore
        tools, chat_history=current_llm_input, allow_parallel_tool_calls=True
    )
    # last_chat_response will be used later, after the loop.
    # We initialize it so it's valid even when 'response' is empty
    last_chat_response = ChatResponse(message=ChatMessage())
    async for last_chat_response in response:
        tool_calls = self.llm.get_tool_calls_from_response(  # type: ignore
            last_chat_response, error_on_no_tool_call=False
        )
        raw = (
            last_chat_response.raw.model_dump()
            if isinstance(last_chat_response.raw, BaseModel)
            else last_chat_response.raw
        )
        ctx.write_event_to_stream(
            AgentStream(
                delta=last_chat_response.delta or "",
                response=last_chat_response.message.content or "",
                tool_calls=tool_calls or [],
                raw=raw,
                current_agent_name=self.name,
            )
        )

    tool_calls = self.llm.get_tool_calls_from_response(  # type: ignore
        last_chat_response, error_on_no_tool_call=False
    )

    # only add to scratchpad if we didn't select the handoff tool
    scratchpad.append(last_chat_response.message)
    await ctx.set(self.scratchpad_key, scratchpad)

    raw = (
        last_chat_response.raw.model_dump()
        if isinstance(last_chat_response.raw, BaseModel)
        else last_chat_response.raw
    )
    return AgentOutput(
        response=last_chat_response.message,
        tool_calls=tool_calls or [],
        raw=raw,
        current_agent_name=self.name,
    )

handle_tool_call_results async #

handle_tool_call_results(ctx: Context, results: List[ToolCallResult], memory: BaseMemory) -> None

Handle tool call results for the function calling agent.

Source code in llama-index-core/llama_index/core/agent/workflow/function_agent.py
async def handle_tool_call_results(
    self, ctx: Context, results: List[ToolCallResult], memory: BaseMemory
) -> None:
    """Handle tool call results for function calling agent."""
    scratchpad: List[ChatMessage] = await ctx.get(self.scratchpad_key, default=[])

    for tool_call_result in results:
        scratchpad.append(
            ChatMessage(
                role="tool",
                content=str(tool_call_result.tool_output.content),
                additional_kwargs={"tool_call_id": tool_call_result.tool_id},
            )
        )

        if (
            tool_call_result.return_direct
            and tool_call_result.tool_name != "handoff"
        ):
            scratchpad.append(
                ChatMessage(
                    role="assistant",
                    content=str(tool_call_result.tool_output.content),
                    additional_kwargs={"tool_call_id": tool_call_result.tool_id},
                )
            )
            break

    await ctx.set(self.scratchpad_key, scratchpad)

finalize async #

finalize(ctx: Context, output: AgentOutput, memory: BaseMemory) -> AgentOutput

Finalize the function calling agent.

Adds all in-progress messages to memory.

Source code in llama-index-core/llama_index/core/agent/workflow/function_agent.py
async def finalize(
    self, ctx: Context, output: AgentOutput, memory: BaseMemory
) -> AgentOutput:
    """
    Finalize the function calling agent.

    Adds all in-progress messages to memory.
    """
    scratchpad: List[ChatMessage] = await ctx.get(self.scratchpad_key, default=[])
    await memory.aput_messages(scratchpad)

    # reset scratchpad
    await ctx.set(self.scratchpad_key, [])

    return output

ReActAgent #

Bases: SingleAgentRunnerMixin, BaseWorkflowAgent

React agent implementation.

Parameters:

reasoning_key (str): Default: 'current_reasoning'
output_parser (ReActOutputParser): The react output parser. Default: <llama_index.core.agent.react.output_parser.ReActOutputParser object>
formatter (ReActChatFormatter): The react chat formatter to format the reasoning steps and chat history into an LLM input. Default: <dynamic>

Source code in llama-index-core/llama_index/core/agent/workflow/react_agent.py
class ReActAgent(SingleAgentRunnerMixin, BaseWorkflowAgent):
    """React agent implementation."""

    reasoning_key: str = "current_reasoning"
    output_parser: ReActOutputParser = Field(
        default_factory=ReActOutputParser, description="The react output parser"
    )
    formatter: ReActChatFormatter = Field(
        default_factory=default_formatter,
        description="The react chat formatter to format the reasoning steps and chat history into an llm input.",
    )

    def _get_prompts(self) -> PromptDictType:
        """Get prompts."""
        # TODO: the ReAct formatter does not explicitly specify PromptTemplate
        # objects, but wrap it in this to obey the interface
        react_header = self.formatter.system_header
        return {"react_header": PromptTemplate(react_header)}

    def _update_prompts(self, prompts: PromptDictType) -> None:
        """Update prompts."""
        if "react_header" in prompts:
            react_header = prompts["react_header"]
            if isinstance(react_header, str):
                react_header = PromptTemplate(react_header)
            self.formatter.system_header = react_header.get_template()

    async def take_step(
        self,
        ctx: Context,
        llm_input: List[ChatMessage],
        tools: Sequence[AsyncBaseTool],
        memory: BaseMemory,
    ) -> AgentOutput:
        """Take a single step with the React agent."""
        # remove system prompt, since the react prompt will be combined with it
        if llm_input[0].role == "system":
            system_prompt = llm_input[0].content or ""
            llm_input = llm_input[1:]
        else:
            system_prompt = ""

        output_parser = self.output_parser
        react_chat_formatter = self.formatter
        react_chat_formatter.context = system_prompt

        # Format initial chat input
        current_reasoning: list[BaseReasoningStep] = await ctx.get(
            self.reasoning_key, default=[]
        )
        input_chat = react_chat_formatter.format(
            tools,
            chat_history=llm_input,
            current_reasoning=current_reasoning,
        )
        ctx.write_event_to_stream(
            AgentInput(input=input_chat, current_agent_name=self.name)
        )

        # Initial LLM call
        response = await self.llm.astream_chat(input_chat)
        # last_chat_response will be used later, after the loop.
        # We initialize it so it's valid even when 'response' is empty
        last_chat_response = ChatResponse(message=ChatMessage())
        async for last_chat_response in response:
            raw = (
                last_chat_response.raw.model_dump()
                if isinstance(last_chat_response.raw, BaseModel)
                else last_chat_response.raw
            )
            ctx.write_event_to_stream(
                AgentStream(
                    delta=last_chat_response.delta or "",
                    response=last_chat_response.message.content or "",
                    tool_calls=[],
                    raw=raw,
                    current_agent_name=self.name,
                )
            )

        # Parse reasoning step and check if done
        message_content = last_chat_response.message.content
        if not message_content:
            raise ValueError("Got empty message")

        try:
            reasoning_step = output_parser.parse(message_content, is_streaming=False)
        except ValueError as e:
            error_msg = f"Error: Could not parse output. Please follow the thought-action-input format. Try again. Details: {e!s}"
            await memory.aput(last_chat_response.message)
            await memory.aput(ChatMessage(role="user", content=error_msg))

            raw = (
                last_chat_response.raw.model_dump()
                if isinstance(last_chat_response.raw, BaseModel)
                else last_chat_response.raw
            )
            return AgentOutput(
                response=last_chat_response.message,
                tool_calls=[],
                raw=raw,
                current_agent_name=self.name,
            )

        # add to reasoning if not a handoff
        current_reasoning.append(reasoning_step)
        await ctx.set(self.reasoning_key, current_reasoning)

        # If response step, we're done
        raw = (
            last_chat_response.raw.model_dump()
            if isinstance(last_chat_response.raw, BaseModel)
            else last_chat_response.raw
        )
        if reasoning_step.is_done:
            return AgentOutput(
                response=last_chat_response.message,
                tool_calls=[],
                raw=raw,
                current_agent_name=self.name,
            )

        reasoning_step = cast(ActionReasoningStep, reasoning_step)
        if not isinstance(reasoning_step, ActionReasoningStep):
            raise ValueError(f"Expected ActionReasoningStep, got {reasoning_step}")

        # Create tool call
        tool_calls = [
            ToolSelection(
                tool_id=str(uuid.uuid4()),
                tool_name=reasoning_step.action,
                tool_kwargs=reasoning_step.action_input,
            )
        ]

        return AgentOutput(
            response=last_chat_response.message,
            tool_calls=tool_calls,
            raw=raw,
            current_agent_name=self.name,
        )

    async def handle_tool_call_results(
        self, ctx: Context, results: List[ToolCallResult], memory: BaseMemory
    ) -> None:
        """Handle tool call results for React agent."""
        current_reasoning: list[BaseReasoningStep] = await ctx.get(
            self.reasoning_key, default=[]
        )
        for tool_call_result in results:
            obs_step = ObservationReasoningStep(
                observation=str(tool_call_result.tool_output.content),
                return_direct=tool_call_result.return_direct,
            )
            current_reasoning.append(obs_step)

            if (
                tool_call_result.return_direct
                and tool_call_result.tool_name != "handoff"
            ):
                current_reasoning.append(
                    ResponseReasoningStep(
                        thought=obs_step.observation,
                        response=obs_step.observation,
                        is_streaming=False,
                    )
                )
                break

        await ctx.set(self.reasoning_key, current_reasoning)

    async def finalize(
        self, ctx: Context, output: AgentOutput, memory: BaseMemory
    ) -> AgentOutput:
        """Finalize the React agent."""
        current_reasoning: list[BaseReasoningStep] = await ctx.get(
            self.reasoning_key, default=[]
        )

        if len(current_reasoning) > 0 and isinstance(
            current_reasoning[-1], ResponseReasoningStep
        ):
            reasoning_str = "\n".join([x.get_content() for x in current_reasoning])

            if reasoning_str:
                reasoning_msg = ChatMessage(role="assistant", content=reasoning_str)
                await memory.aput(reasoning_msg)
                await ctx.set(self.reasoning_key, [])

            # remove "Answer:" from the response
            if output.response.content and "Answer:" in output.response.content:
                start_idx = output.response.content.find("Answer:")
                if start_idx != -1:
                    output.response.content = output.response.content[
                        start_idx + len("Answer:") :
                    ].strip()

            # clear scratchpad
            await ctx.set(self.reasoning_key, [])

        return output

take_step async #

take_step(ctx: Context, llm_input: List[ChatMessage], tools: Sequence[AsyncBaseTool], memory: BaseMemory) -> AgentOutput

Take a single step with the React agent.

Source code in llama-index-core/llama_index/core/agent/workflow/react_agent.py (lines 64-177):
async def take_step(
    self,
    ctx: Context,
    llm_input: List[ChatMessage],
    tools: Sequence[AsyncBaseTool],
    memory: BaseMemory,
) -> AgentOutput:
    """Take a single step with the React agent."""
    # remove system prompt, since the react prompt will be combined with it
    if llm_input[0].role == "system":
        system_prompt = llm_input[0].content or ""
        llm_input = llm_input[1:]
    else:
        system_prompt = ""

    output_parser = self.output_parser
    react_chat_formatter = self.formatter
    react_chat_formatter.context = system_prompt

    # Format initial chat input
    current_reasoning: list[BaseReasoningStep] = await ctx.get(
        self.reasoning_key, default=[]
    )
    input_chat = react_chat_formatter.format(
        tools,
        chat_history=llm_input,
        current_reasoning=current_reasoning,
    )
    ctx.write_event_to_stream(
        AgentInput(input=input_chat, current_agent_name=self.name)
    )

    # Initial LLM call
    response = await self.llm.astream_chat(input_chat)
    # last_chat_response will be used later, after the loop.
    # We initialize it so it's valid even when 'response' is empty
    last_chat_response = ChatResponse(message=ChatMessage())
    async for last_chat_response in response:
        raw = (
            last_chat_response.raw.model_dump()
            if isinstance(last_chat_response.raw, BaseModel)
            else last_chat_response.raw
        )
        ctx.write_event_to_stream(
            AgentStream(
                delta=last_chat_response.delta or "",
                response=last_chat_response.message.content or "",
                tool_calls=[],
                raw=raw,
                current_agent_name=self.name,
            )
        )

    # Parse reasoning step and check if done
    message_content = last_chat_response.message.content
    if not message_content:
        raise ValueError("Got empty message")

    try:
        reasoning_step = output_parser.parse(message_content, is_streaming=False)
    except ValueError as e:
        error_msg = f"Error: Could not parse output. Please follow the thought-action-input format. Try again. Details: {e!s}"
        await memory.aput(last_chat_response.message)
        await memory.aput(ChatMessage(role="user", content=error_msg))

        raw = (
            last_chat_response.raw.model_dump()
            if isinstance(last_chat_response.raw, BaseModel)
            else last_chat_response.raw
        )
        return AgentOutput(
            response=last_chat_response.message,
            tool_calls=[],
            raw=raw,
            current_agent_name=self.name,
        )

    # add to reasoning if not a handoff
    current_reasoning.append(reasoning_step)
    await ctx.set(self.reasoning_key, current_reasoning)

    # If response step, we're done
    raw = (
        last_chat_response.raw.model_dump()
        if isinstance(last_chat_response.raw, BaseModel)
        else last_chat_response.raw
    )
    if reasoning_step.is_done:
        return AgentOutput(
            response=last_chat_response.message,
            tool_calls=[],
            raw=raw,
            current_agent_name=self.name,
        )

    reasoning_step = cast(ActionReasoningStep, reasoning_step)
    if not isinstance(reasoning_step, ActionReasoningStep):
        raise ValueError(f"Expected ActionReasoningStep, got {reasoning_step}")

    # Create tool call
    tool_calls = [
        ToolSelection(
            tool_id=str(uuid.uuid4()),
            tool_name=reasoning_step.action,
            tool_kwargs=reasoning_step.action_input,
        )
    ]

    return AgentOutput(
        response=last_chat_response.message,
        tool_calls=tool_calls,
        raw=raw,
        current_agent_name=self.name,
    )
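
For orientation, here is a minimal sketch of driving this step loop end to end. The tool, model name, and question are illustrative assumptions rather than part of the documented API surface.

import asyncio

from llama_index.core.agent.workflow import ReActAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI  # assumes the OpenAI integration is installed


def multiply(a: float, b: float) -> float:
    """Multiply two numbers."""
    return a * b


agent = ReActAgent(
    tools=[FunctionTool.from_defaults(multiply)],
    llm=OpenAI(model="gpt-4o-mini"),  # hypothetical model choice
)


async def main() -> None:
    # run() invokes take_step / handle_tool_call_results / finalize internally
    response = await agent.run(user_msg="What is 1234 * 4567?")
    print(str(response))


asyncio.run(main())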

handle_tool_call_results async #

handle_tool_call_results(ctx: Context, results: List[ToolCallResult], memory: BaseMemory) -> None

Handle tool call results for the React agent.

Source code in llama-index-core/llama_index/core/agent/workflow/react_agent.py (lines 179-206):
async def handle_tool_call_results(
    self, ctx: Context, results: List[ToolCallResult], memory: BaseMemory
) -> None:
    """Handle tool call results for React agent."""
    current_reasoning: list[BaseReasoningStep] = await ctx.get(
        self.reasoning_key, default=[]
    )
    for tool_call_result in results:
        obs_step = ObservationReasoningStep(
            observation=str(tool_call_result.tool_output.content),
            return_direct=tool_call_result.return_direct,
        )
        current_reasoning.append(obs_step)

        if (
            tool_call_result.return_direct
            and tool_call_result.tool_name != "handoff"
        ):
            current_reasoning.append(
                ResponseReasoningStep(
                    thought=obs_step.observation,
                    response=obs_step.observation,
                    is_streaming=False,
                )
            )
            break

    await ctx.set(self.reasoning_key, current_reasoning)
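
Note the return_direct branch above: when a tool is marked return_direct (and is not the handoff tool), its observation is wrapped in a ResponseReasoningStep and the step ends there. A minimal sketch of declaring such a tool; the function itself is an illustrative assumption:

from llama_index.core.tools import FunctionTool


def lookup_order_status(order_id: str) -> str:
    """Return the shipping status for an order."""
    return f"Order {order_id} has shipped."


# With return_direct=True, the React agent echoes the tool output back
# as the final response instead of reasoning over it further.
status_tool = FunctionTool.from_defaults(lookup_order_status, return_direct=True)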

finalize async #

finalize(ctx: Context, output: AgentOutput, memory: BaseMemory) -> AgentOutput

Finalize the React agent.

Source code in llama-index-core/llama_index/core/agent/workflow/react_agent.py (lines 208-237):
async def finalize(
    self, ctx: Context, output: AgentOutput, memory: BaseMemory
) -> AgentOutput:
    """Finalize the React agent."""
    current_reasoning: list[BaseReasoningStep] = await ctx.get(
        self.reasoning_key, default=[]
    )

    if len(current_reasoning) > 0 and isinstance(
        current_reasoning[-1], ResponseReasoningStep
    ):
        reasoning_str = "\n".join([x.get_content() for x in current_reasoning])

        if reasoning_str:
            reasoning_msg = ChatMessage(role="assistant", content=reasoning_str)
            await memory.aput(reasoning_msg)
            await ctx.set(self.reasoning_key, [])

        # remove "Answer:" from the response
        if output.response.content and "Answer:" in output.response.content:
            start_idx = output.response.content.find("Answer:")
            if start_idx != -1:
                output.response.content = output.response.content[
                    start_idx + len("Answer:") :
                ].strip()

        # clear scratchpad
        await ctx.set(self.reasoning_key, [])

    return output

CodeActAgent #

Bases: SingleAgentRunnerMixin, BaseWorkflowAgent

A workflow agent that can execute code.

Parameters

scratchpad_key (str): defaults to 'scratchpad'

code_execute_fn (Callable | Awaitable), required: The function used to execute code. It must be provided in order to run the code generated by the agent. The function protocol is: async def code_execute_fn(code: str) -> Dict[str, Any]

code_act_system_prompt (str | BasePromptTemplate): The system prompt for the CodeAct agent. Defaults to:
'You are a helpful AI assistant that can write and execute Python code to solve problems.\n\nYou will be given a task to perform. You should output:\n- Python code wrapped in <execute>...</execute> tags that provides the solution to the task, or a step towards the solution. Any output you want to extract from the code should be printed to the console.\n- Text to be shown directly to the user, if you want to ask for more information or provide the final answer.\n- Respond directly if the previous code execution can be used to answer the user (generally you should avoid mentioning anything related to code execution in your response).\n\n## Response Format:\nExample of a proper code block:\n<execute>\nimport math\n\ndef calculate_area(radius):\n    return math.pi * radius**2\n\n# Calculate the area for radius = 5\narea = calculate_area(5)\nprint(f"The area of the circle is {area:.2f} square units")\n</execute>\n\nIn addition to the Python Standard Library and any functions you have already written, you may use the following functions:\n{tool_descriptions}\n\nVariables defined at the top level of previous code snippets can also be referenced in your code.\n\n## Final Answer Guidelines:\n- When providing a final answer, focus on directly answering the user\'s question\n- Avoid referencing the code you generated unless explicitly asked to\n- Present the results clearly and concisely, as if you had computed them directly\n- If relevant, you may briefly mention the general approach used, but do not include code snippets in the final answer\n- Structure your response as a direct answer to the user\'s query, not as an explanation of how you solved it\n\nReminder: Always place Python code between <execute>...</execute> tags when you want to run it. You may include explanations and other content outside these tags.\n'
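
Given the code_execute_fn protocol above, the sketch below shows one way to satisfy it and construct the agent. The in-process exec-based executor and the model name are illustrative assumptions; in practice the execution function should sandbox the generated code.

import contextlib
import io
from typing import Any, Dict

from llama_index.core.agent.workflow import CodeActAgent
from llama_index.llms.openai import OpenAI  # assumes the OpenAI integration is installed

# Shared namespace so top-level variables persist across executions,
# as the default system prompt promises.
_globals: Dict[str, Any] = {}


async def code_execute_fn(code: str) -> Dict[str, Any]:
    """Execute generated code and capture anything printed to stdout (unsandboxed sketch)."""
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        exec(code, _globals)  # assumption: trusted input only
    return {"output": buf.getvalue()}


agent = CodeActAgent(
    code_execute_fn=code_execute_fn,
    llm=OpenAI(model="gpt-4o-mini"),  # hypothetical model choice
)
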
Source code in llama-index-core/llama_index/core/agent/workflow/codeact_agent.py (lines 63-355):
class CodeActAgent(SingleAgentRunnerMixin, BaseWorkflowAgent):
    """
    A workflow agent that can execute code.
    """

    scratchpad_key: str = "scratchpad"

    code_execute_fn: Union[Callable, Awaitable] = Field(
        description=(
            "The function to execute code. Required in order to execute code generated by the agent.\n"
            "The function protocol is as follows: async def code_execute_fn(code: str) -> Dict[str, Any]"
        ),
    )

    code_act_system_prompt: Union[str, BasePromptTemplate] = Field(
        default=DEFAULT_CODE_ACT_PROMPT,
        description="The system prompt for the code act agent.",
        validate_default=True,
    )

    def __init__(
        self,
        code_execute_fn: Union[Callable, Awaitable],
        name: str = "code_act_agent",
        description: str = "A workflow agent that can execute code.",
        system_prompt: Optional[str] = None,
        tools: Optional[Sequence[AsyncBaseTool]] = None,
        tool_retriever: Optional[ObjectRetriever] = None,
        can_handoff_to: Optional[List[str]] = None,
        llm: Optional[LLM] = None,
        code_act_system_prompt: Union[
            str, BasePromptTemplate
        ] = DEFAULT_CODE_ACT_PROMPT,
    ):
        tools = tools or []
        tools.append(  # type: ignore
            FunctionTool.from_defaults(code_execute_fn, name=EXECUTE_TOOL_NAME)  # type: ignore
        )
        if isinstance(code_act_system_prompt, str):
            code_act_system_prompt = PromptTemplate(code_act_system_prompt)

        super().__init__(
            name=name,
            description=description,
            system_prompt=system_prompt,
            tools=tools,
            tool_retriever=tool_retriever,
            can_handoff_to=can_handoff_to,
            llm=llm,
            code_act_system_prompt=code_act_system_prompt,
            code_execute_fn=code_execute_fn,
        )

    def _get_tool_fns(self, tools: Sequence[AsyncBaseTool]) -> List[Callable]:
        """Get the tool functions while validating that they are valid tools for the CodeActAgent."""
        callables = []
        for tool in tools:
            if (
                tool.metadata.name == "handoff"
                or tool.metadata.name == EXECUTE_TOOL_NAME
            ):
                continue

            if isinstance(tool, FunctionTool):
                if tool.requires_context:
                    raise ValueError(
                        f"Tool {tool.metadata.name} requires context. "
                        "CodeActAgent only supports tools that do not require context."
                    )

                callables.append(tool.real_fn)
            else:
                raise ValueError(
                    f"Tool {tool.metadata.name} is not a FunctionTool. "
                    "CodeActAgent only supports Functions and FunctionTools."
                )

        return callables

    def _extract_code_from_response(self, response_text: str) -> Optional[str]:
        """
        Extract code from the LLM response using XML-style <execute> tags.

        Args:
            response_text: The LLM response text

        Returns:
            Extracted code or None if no code found

        """
        # Match content between <execute> and </execute> tags
        execute_pattern = r"<execute>(.*?)</execute>"
        execute_matches = re.findall(execute_pattern, response_text, re.DOTALL)

        if execute_matches:
            return "\n\n".join([x.strip() for x in execute_matches])

        return None

    def _get_tool_descriptions(self, tools: Sequence[AsyncBaseTool]) -> str:
        """
        Generate tool descriptions for the system prompt using tool metadata.

        Args:
            tools: List of available tools

        Returns:
            Tool descriptions as a string

        """
        tool_descriptions = []

        tool_fns = self._get_tool_fns(tools)
        for fn in tool_fns:
            signature = inspect.signature(fn)
            fn_name: str = fn.__name__
            docstring: Optional[str] = inspect.getdoc(fn)

            tool_description = f"def {fn_name}{signature!s}:"
            if docstring:
                tool_description += f'\n  """\n{docstring}\n  """\n'

            tool_description += "\n  ...\n"
            tool_descriptions.append(tool_description)

        return "\n\n".join(tool_descriptions)

    async def take_step(
        self,
        ctx: Context,
        llm_input: List[ChatMessage],
        tools: Sequence[AsyncBaseTool],
        memory: BaseMemory,
    ) -> AgentOutput:
        """Take a single step with the code act agent."""
        if not self.code_execute_fn:
            raise ValueError("code_execute_fn must be provided for CodeActAgent")

        # Get current scratchpad
        scratchpad: List[ChatMessage] = await ctx.get(self.scratchpad_key, default=[])
        current_llm_input = [*llm_input, *scratchpad]

        # Create a system message with tool descriptions
        tool_descriptions = self._get_tool_descriptions(tools)
        system_prompt = self.code_act_system_prompt.format(
            tool_descriptions=tool_descriptions
        )

        # Add or overwrite system message
        has_system = False
        for i, msg in enumerate(current_llm_input):
            if msg.role.value == "system":
                current_llm_input[i] = ChatMessage(role="system", content=system_prompt)
                has_system = True
                break

        if not has_system:
            current_llm_input.insert(
                0, ChatMessage(role="system", content=system_prompt)
            )

        # Write the input to the event stream
        ctx.write_event_to_stream(
            AgentInput(input=current_llm_input, current_agent_name=self.name)
        )

        # For now, only support the handoff tool
        # All other tools should be part of the code execution
        if any(tool.metadata.name == "handoff" for tool in tools):
            if not isinstance(self.llm, FunctionCallingLLM):
                raise ValueError("llm must be a function calling LLM to use handoff")

            tools = [tool for tool in tools if tool.metadata.name == "handoff"]
            response = await self.llm.astream_chat_with_tools(
                tools, chat_history=current_llm_input
            )
        else:
            response = await self.llm.astream_chat(current_llm_input)

        # Initialize for streaming
        last_chat_response = ChatResponse(message=ChatMessage())
        full_response_text = ""

        # Process streaming response
        async for last_chat_response in response:
            delta = last_chat_response.delta or ""
            full_response_text += delta

            # Create a raw object for the event stream
            raw = (
                last_chat_response.raw.model_dump()
                if isinstance(last_chat_response.raw, BaseModel)
                else last_chat_response.raw
            )

            # Write delta to the event stream
            ctx.write_event_to_stream(
                AgentStream(
                    delta=delta,
                    response=full_response_text,
                    # We'll add the tool call after processing the full response
                    tool_calls=[],
                    raw=raw,
                    current_agent_name=self.name,
                )
            )

        # Extract code from the response
        code = self._extract_code_from_response(full_response_text)

        # Create a tool call for executing the code if code was found
        tool_calls = []
        if code:
            tool_id = str(uuid.uuid4())

            tool_calls = [
                ToolSelection(
                    tool_id=tool_id,
                    tool_name=EXECUTE_TOOL_NAME,
                    tool_kwargs={"code": code},
                )
            ]

        if isinstance(self.llm, FunctionCallingLLM):
            extra_tool_calls = self.llm.get_tool_calls_from_response(
                last_chat_response, error_on_no_tool_call=False
            )
            tool_calls.extend(extra_tool_calls)

        # Add the response to the scratchpad
        message = ChatMessage(role="assistant", content=full_response_text)
        scratchpad.append(message)
        await ctx.set(self.scratchpad_key, scratchpad)

        # Create the raw object for the output
        raw = (
            last_chat_response.raw.model_dump()
            if isinstance(last_chat_response.raw, BaseModel)
            else last_chat_response.raw
        )

        return AgentOutput(
            response=message,
            tool_calls=tool_calls,
            raw=raw,
            current_agent_name=self.name,
        )

    async def handle_tool_call_results(
        self, ctx: Context, results: List[ToolCallResult], memory: BaseMemory
    ) -> None:
        """Handle tool call results for code act agent."""
        scratchpad: List[ChatMessage] = await ctx.get(self.scratchpad_key, default=[])

        # handle code execution and handoff
        for tool_call_result in results:
            # Format the output as a tool response message
            if tool_call_result.tool_name == EXECUTE_TOOL_NAME:
                code_result = f"Result of executing the code given:\n\n{tool_call_result.tool_output.content}"
                scratchpad.append(
                    ChatMessage(
                        role="user",
                        content=code_result,
                    )
                )
            elif tool_call_result.tool_name == "handoff":
                scratchpad.append(
                    ChatMessage(
                        role="tool",
                        content=str(tool_call_result.tool_output.content),
                        additional_kwargs={"tool_call_id": tool_call_result.tool_id},
                    )
                )
            else:
                raise ValueError(f"Unknown tool name: {tool_call_result.tool_name}")

        await ctx.set(self.scratchpad_key, scratchpad)

    async def finalize(
        self, ctx: Context, output: AgentOutput, memory: BaseMemory
    ) -> AgentOutput:
        """
        Finalize the code act agent.

        Adds all in-progress messages to memory.
        """
        scratchpad: List[ChatMessage] = await ctx.get(self.scratchpad_key, default=[])
        await memory.aput_messages(scratchpad)

        # reset scratchpad
        await ctx.set(self.scratchpad_key, [])

        return output

take_step async #

take_step(ctx: Context, llm_input: List[ChatMessage], tools: Sequence[AsyncBaseTool], memory: BaseMemory) -> AgentOutput

Take a single step with the CodeAct agent.

Source code in llama-index-core/llama_index/core/agent/workflow/codeact_agent.py (lines 190-309):
async def take_step(
    self,
    ctx: Context,
    llm_input: List[ChatMessage],
    tools: Sequence[AsyncBaseTool],
    memory: BaseMemory,
) -> AgentOutput:
    """Take a single step with the code act agent."""
    if not self.code_execute_fn:
        raise ValueError("code_execute_fn must be provided for CodeActAgent")

    # Get current scratchpad
    scratchpad: List[ChatMessage] = await ctx.get(self.scratchpad_key, default=[])
    current_llm_input = [*llm_input, *scratchpad]

    # Create a system message with tool descriptions
    tool_descriptions = self._get_tool_descriptions(tools)
    system_prompt = self.code_act_system_prompt.format(
        tool_descriptions=tool_descriptions
    )

    # Add or overwrite system message
    has_system = False
    for i, msg in enumerate(current_llm_input):
        if msg.role.value == "system":
            current_llm_input[i] = ChatMessage(role="system", content=system_prompt)
            has_system = True
            break

    if not has_system:
        current_llm_input.insert(
            0, ChatMessage(role="system", content=system_prompt)
        )

    # Write the input to the event stream
    ctx.write_event_to_stream(
        AgentInput(input=current_llm_input, current_agent_name=self.name)
    )

    # For now, only support the handoff tool
    # All other tools should be part of the code execution
    if any(tool.metadata.name == "handoff" for tool in tools):
        if not isinstance(self.llm, FunctionCallingLLM):
            raise ValueError("llm must be a function calling LLM to use handoff")

        tools = [tool for tool in tools if tool.metadata.name == "handoff"]
        response = await self.llm.astream_chat_with_tools(
            tools, chat_history=current_llm_input
        )
    else:
        response = await self.llm.astream_chat(current_llm_input)

    # Initialize for streaming
    last_chat_response = ChatResponse(message=ChatMessage())
    full_response_text = ""

    # Process streaming response
    async for last_chat_response in response:
        delta = last_chat_response.delta or ""
        full_response_text += delta

        # Create a raw object for the event stream
        raw = (
            last_chat_response.raw.model_dump()
            if isinstance(last_chat_response.raw, BaseModel)
            else last_chat_response.raw
        )

        # Write delta to the event stream
        ctx.write_event_to_stream(
            AgentStream(
                delta=delta,
                response=full_response_text,
                # We'll add the tool call after processing the full response
                tool_calls=[],
                raw=raw,
                current_agent_name=self.name,
            )
        )

    # Extract code from the response
    code = self._extract_code_from_response(full_response_text)

    # Create a tool call for executing the code if code was found
    tool_calls = []
    if code:
        tool_id = str(uuid.uuid4())

        tool_calls = [
            ToolSelection(
                tool_id=tool_id,
                tool_name=EXECUTE_TOOL_NAME,
                tool_kwargs={"code": code},
            )
        ]

    if isinstance(self.llm, FunctionCallingLLM):
        extra_tool_calls = self.llm.get_tool_calls_from_response(
            last_chat_response, error_on_no_tool_call=False
        )
        tool_calls.extend(extra_tool_calls)

    # Add the response to the scratchpad
    message = ChatMessage(role="assistant", content=full_response_text)
    scratchpad.append(message)
    await ctx.set(self.scratchpad_key, scratchpad)

    # Create the raw object for the output
    raw = (
        last_chat_response.raw.model_dump()
        if isinstance(last_chat_response.raw, BaseModel)
        else last_chat_response.raw
    )

    return AgentOutput(
        response=message,
        tool_calls=tool_calls,
        raw=raw,
        current_agent_name=self.name,
    )
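
The <execute> extraction used above can be reproduced in isolation; the sample response text is an illustrative assumption:

import re

response_text = (
    "I'll compute this now.\n"
    "<execute>\nresult = 2 ** 10\nprint(result)\n</execute>"
)

# Same pattern as _extract_code_from_response: collect every <execute> block
# and join the stripped bodies with blank lines.
matches = re.findall(r"<execute>(.*?)</execute>", response_text, re.DOTALL)
code = "\n\n".join(m.strip() for m in matches) if matches else None
print(code)  # result = 2 ** 10
             # print(result)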

handle_tool_call_results async #

handle_tool_call_results(ctx: Context, results: List[ToolCallResult], memory: BaseMemory) -> None

Handle tool call results for the CodeAct agent.

Source code in llama-index-core/llama_index/core/agent/workflow/codeact_agent.py (lines 311-339):
async def handle_tool_call_results(
    self, ctx: Context, results: List[ToolCallResult], memory: BaseMemory
) -> None:
    """Handle tool call results for code act agent."""
    scratchpad: List[ChatMessage] = await ctx.get(self.scratchpad_key, default=[])

    # handle code execution and handoff
    for tool_call_result in results:
        # Format the output as a tool response message
        if tool_call_result.tool_name == EXECUTE_TOOL_NAME:
            code_result = f"Result of executing the code given:\n\n{tool_call_result.tool_output.content}"
            scratchpad.append(
                ChatMessage(
                    role="user",
                    content=code_result,
                )
            )
        elif tool_call_result.tool_name == "handoff":
            scratchpad.append(
                ChatMessage(
                    role="tool",
                    content=str(tool_call_result.tool_output.content),
                    additional_kwargs={"tool_call_id": tool_call_result.tool_id},
                )
            )
        else:
            raise ValueError(f"Unknown tool name: {tool_call_result.tool_name}")

    await ctx.set(self.scratchpad_key, scratchpad)

finalize async #

finalize(ctx: Context, output: AgentOutput, memory: BaseMemory) -> AgentOutput

Finalize the CodeAct agent.

Adds all in-progress messages to memory.

Source code in llama-index-core/llama_index/core/agent/workflow/codeact_agent.py (lines 341-355):
async def finalize(
    self, ctx: Context, output: AgentOutput, memory: BaseMemory
) -> AgentOutput:
    """
    Finalize the code act agent.

    Adds all in-progress messages to memory.
    """
    scratchpad: List[ChatMessage] = await ctx.get(self.scratchpad_key, default=[])
    await memory.aput_messages(scratchpad)

    # reset scratchpad
    await ctx.set(self.scratchpad_key, [])

    return output

AgentInput #

Bases: Event

LLM input.

Parameters

input (list[ChatMessage]): required
current_agent_name (str): required
Source code in llama-index-core/llama_index/core/agent/workflow/workflow_events.py (lines 8-12):
class AgentInput(Event):
    """LLM input."""

    input: list[ChatMessage]
    current_agent_name: str

AgentStream #

Bases: Event

Agent stream.

Parameters

delta (str): required
response (str): required
current_agent_name (str): required
tool_calls (list[ToolSelection]): required
raw (Any): required
Source code in llama-index-core/llama_index/core/agent/workflow/workflow_events.py (lines 22-29):
class AgentStream(Event):
    """Agent stream."""

    delta: str
    response: str
    current_agent_name: str
    tool_calls: list[ToolSelection]
    raw: Any
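
AgentInput and AgentStream are emitted on the handler's event stream while an agent runs. A minimal consumption sketch; the agent instance and user message passed in are assumptions:

from llama_index.core.agent.workflow import AgentInput, AgentStream


async def stream_demo(agent) -> None:
    handler = agent.run(user_msg="Summarize the plan.")
    async for event in handler.stream_events():
        if isinstance(event, AgentInput):
            print(f"\n[{event.current_agent_name}] sending {len(event.input)} messages to the LLM")
        elif isinstance(event, AgentStream):
            print(event.delta, end="", flush=True)
    await handler  # resolves to the final output once streaming finishes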

AgentOutput #

Bases: Event

LLM output.

Parameters

response (ChatMessage): required
tool_calls (list[ToolSelection]): required
raw (Any): required
current_agent_name (str): required
Source code in llama-index-core/llama_index/core/agent/workflow/workflow_events.py (lines 32-41):
class AgentOutput(Event):
    """LLM output."""

    response: ChatMessage
    tool_calls: list[ToolSelection]
    raw: Any
    current_agent_name: str

    def __str__(self) -> str:
        return self.response.content or ""
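
Awaiting a finished run yields this event as the final result; str() falls back to the response content. A short sketch under the same assumptions as above (the agent instance and query are illustrative):

from llama_index.core.agent.workflow import AgentOutput


async def run_and_inspect(agent) -> None:
    output: AgentOutput = await agent.run(user_msg="What did we decide?")
    print(str(output))                # __str__ returns response.content (or "")
    print(output.current_agent_name)  # which agent produced the final answer
    print(output.tool_calls)          # typically empty on the final output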

ToolCall #

Bases: Event

All tool calls are surfaced.

Parameters

tool_name (str): required
tool_kwargs (dict): required
tool_id (str): required
Source code in llama-index-core/llama_index/core/agent/workflow/workflow_events.py (lines 44-49):
class ToolCall(Event):
    """All tool calls are surfaced."""

    tool_name: str
    tool_kwargs: dict
    tool_id: str

ToolCallResult #

Bases: ToolCall

Tool call result.

Parameters

tool_output (ToolOutput): required
return_direct (bool): required
Source code in llama-index-core/llama_index/core/agent/workflow/workflow_events.py (lines 52-56):
class ToolCallResult(ToolCall):
    """Tool call result."""

    tool_output: ToolOutput
    return_direct: bool
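
ToolCall and ToolCallResult events can be observed on the same stream to trace tool usage; since ToolCallResult subclasses ToolCall, check for it first. The agent instance and query below are assumptions:

from llama_index.core.agent.workflow import ToolCall, ToolCallResult


async def trace_tools(agent) -> None:
    handler = agent.run(user_msg="Look up the weather in Paris.")
    async for event in handler.stream_events():
        if isinstance(event, ToolCallResult):
            print(f"{event.tool_name}({event.tool_kwargs}) -> {event.tool_output.content}")
        elif isinstance(event, ToolCall):
            print(f"calling {event.tool_name} with {event.tool_kwargs}")
    await handler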