Query Pipeline Chat Engine
By combining a query pipeline with a memory buffer, we can design our own custom chat engine loop.
%pip install llama-index-core
%pip install llama-index-llms-openai
%pip install llama-index-embeddings-openai
%pip install llama-index-postprocessor-colbert-rerank
%pip install llama-index-readers-web
import os
os.environ["OPENAI_API_KEY"] = "sk-..."
Index Construction
As a test, we will index Anthropic's latest documentation on tool/function calling.
from llama_index.readers.web import BeautifulSoupWebReader
reader = BeautifulSoupWebReader()
documents = reader.load_data(
    ["https://docs.anthropic.com/claude/docs/tool-use"]
)
If you inspect the document text, you'll notice there are far too many blank lines. Let's clean that up a bit.
lines = documents[0].text.split("\n")

# collapse runs of consecutive blank lines down to a single blank line
fixed_lines = [lines[0]]
for idx in range(1, len(lines)):
    if lines[idx].strip() == "" and lines[idx - 1].strip() == "":
        continue
    fixed_lines.append(lines[idx])

documents[0].text = "\n".join(fixed_lines)
Now we can create our index using OpenAI embeddings.
from llama_index.core import VectorStoreIndex
from llama_index.embeddings.openai import OpenAIEmbedding
index = VectorStoreIndex.from_documents(
    documents,
    embed_model=OpenAIEmbedding(
        model="text-embedding-3-large", embed_batch_size=256
    ),
)
Query Pipeline Construction
As a demonstration, let's build a robust query pipeline that uses HyDE for retrieval and Colbert for reranking.
from llama_index.core.query_pipeline import (
    QueryPipeline,
    InputComponent,
    ArgPackComponent,
)
from llama_index.core.prompts import PromptTemplate
from llama_index.llms.openai import OpenAI
from llama_index.postprocessor.colbert_rerank import ColbertRerank

# First, we create an input component to capture the user query
input_component = InputComponent()

# Next, we use the LLM to rewrite a user query
rewrite = (
    "Please write a query to a semantic search engine using the current conversation.\n"
    "\n"
    "\n"
    "{chat_history_str}"
    "\n"
    "\n"
    "Latest message: {query_str}\n"
    'Query:"""\n'
)
rewrite_template = PromptTemplate(rewrite)
llm = OpenAI(
    model="gpt-4-turbo-preview",
    temperature=0.2,
)

# we will retrieve two times, so we need to pack the retrieved nodes into a single list
argpack_component = ArgPackComponent()

# using that, we will retrieve...
retriever = index.as_retriever(similarity_top_k=6)

# then postprocess/rerank with Colbert
reranker = ColbertRerank(top_n=3)
To generate a response using the chat history plus the retrieved nodes, let's create a custom component.
# then lastly, we need to create a response using the nodes AND chat history
from typing import Any, Dict, List, Optional

from llama_index.core.bridge.pydantic import Field
from llama_index.core.llms import ChatMessage
from llama_index.core.query_pipeline import CustomQueryComponent
from llama_index.core.schema import NodeWithScore

DEFAULT_CONTEXT_PROMPT = (
    "Here is some context that may be relevant:\n"
    "-----\n"
    "{node_context}\n"
    "-----\n"
    "Please write a response to the following question, using the above context:\n"
    "{query_str}\n"
)


class ResponseWithChatHistory(CustomQueryComponent):
    llm: OpenAI = Field(..., description="OpenAI LLM")
    system_prompt: Optional[str] = Field(
        default=None, description="System prompt to use for the LLM"
    )
    context_prompt: str = Field(
        default=DEFAULT_CONTEXT_PROMPT,
        description="Context prompt to use for the LLM",
    )

    def _validate_component_inputs(
        self, input: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Validate component inputs during run_component."""
        # NOTE: this is OPTIONAL but we show you where to do validation as an example
        return input

    @property
    def _input_keys(self) -> set:
        """Input keys dict."""
        # NOTE: These are required inputs. If you have optional inputs please override
        # `optional_input_keys_dict`
        return {"chat_history", "nodes", "query_str"}

    @property
    def _output_keys(self) -> set:
        return {"response"}

    def _prepare_context(
        self,
        chat_history: List[ChatMessage],
        nodes: List[NodeWithScore],
        query_str: str,
    ) -> List[ChatMessage]:
        node_context = ""
        for idx, node in enumerate(nodes):
            node_text = node.get_content(metadata_mode="llm")
            node_context += f"Context Chunk {idx}:\n{node_text}\n\n"

        formatted_context = self.context_prompt.format(
            node_context=node_context, query_str=query_str
        )
        user_message = ChatMessage(role="user", content=formatted_context)

        chat_history.append(user_message)

        if self.system_prompt is not None:
            chat_history = [
                ChatMessage(role="system", content=self.system_prompt)
            ] + chat_history

        return chat_history

    def _run_component(self, **kwargs) -> Dict[str, Any]:
        """Run the component."""
        chat_history = kwargs["chat_history"]
        nodes = kwargs["nodes"]
        query_str = kwargs["query_str"]

        prepared_context = self._prepare_context(
            chat_history, nodes, query_str
        )

        response = self.llm.chat(prepared_context)

        return {"response": response}

    async def _arun_component(self, **kwargs: Any) -> Dict[str, Any]:
        """Run the component asynchronously."""
        # NOTE: Optional, but async LLM calls are easy to implement
        chat_history = kwargs["chat_history"]
        nodes = kwargs["nodes"]
        query_str = kwargs["query_str"]

        prepared_context = self._prepare_context(
            chat_history, nodes, query_str
        )

        response = await self.llm.achat(prepared_context)

        return {"response": response}


response_component = ResponseWithChatHistory(
    llm=llm,
    system_prompt=(
        "You are a Q&A system. You will be provided with the previous chat history, "
        "as well as possibly relevant context, to assist in answering a user message."
    ),
)
With our modules created, we can link them together into a query pipeline.
pipeline = QueryPipeline(
    modules={
        "input": input_component,
        "rewrite_template": rewrite_template,
        "llm": llm,
        "rewrite_retriever": retriever,
        "query_retriever": retriever,
        "join": argpack_component,
        "reranker": reranker,
        "response_component": response_component,
    },
    verbose=False,
)

# run both retrievers -- once with the hallucinated query, once with the real query
pipeline.add_link(
    "input", "rewrite_template", src_key="query_str", dest_key="query_str"
)
pipeline.add_link(
    "input",
    "rewrite_template",
    src_key="chat_history_str",
    dest_key="chat_history_str",
)
pipeline.add_link("rewrite_template", "llm")
pipeline.add_link("llm", "rewrite_retriever")
pipeline.add_link("input", "query_retriever", src_key="query_str")

# each input to the argpack component needs a dest key -- it can be anything
# then, the argpack component will pack all the inputs into a single list
pipeline.add_link("rewrite_retriever", "join", dest_key="rewrite_nodes")
pipeline.add_link("query_retriever", "join", dest_key="query_nodes")

# reranker needs the packed nodes and the query string
pipeline.add_link("join", "reranker", dest_key="nodes")
pipeline.add_link(
    "input", "reranker", src_key="query_str", dest_key="query_str"
)

# synthesizer needs the reranked nodes and query str
pipeline.add_link("reranker", "response_component", dest_key="nodes")
pipeline.add_link(
    "input", "response_component", src_key="query_str", dest_key="query_str"
)
pipeline.add_link(
    "input",
    "response_component",
    src_key="chat_history",
    dest_key="chat_history",
)
Let's give the pipeline a quick test to make sure it works!
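Before wiring in a memory module, a single standalone run can serve as a sanity check. The sketch below is illustrative only: the question is hypothetical, and we simply pass an empty chat history.

# a minimal single-turn sanity check, assuming an empty chat history
# (the question below is just a hypothetical example)
response = pipeline.run(
    query_str="How does tool use work with Claude-3?",
    chat_history=[],
    chat_history_str="",
)
print(str(response.message))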
Running the Pipeline with Memory
The pipeline above takes two inputs -- a query string and a chat_history list (plus a stringified chat_history_str that feeds the query-rewrite prompt).
The query string is simply the raw string input/query.
The chat history list is a list of ChatMessage objects. We can use a memory module from llama-index to directly create and manage the memory!
from llama_index.core.memory import ChatMemoryBuffer
pipeline_memory = ChatMemoryBuffer.from_defaults(token_limit=8000)
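As a quick illustration of the memory API relied on below, put appends a message and get returns the buffered history as a list of ChatMessage objects (trimmed to the token limit). This is a minimal sketch using a separate throwaway buffer and a hypothetical message, so it does not touch pipeline_memory.

# a minimal sketch of the memory API (throwaway buffer, hypothetical content)
from llama_index.core.llms import ChatMessage
from llama_index.core.memory import ChatMemoryBuffer

demo_memory = ChatMemoryBuffer.from_defaults(token_limit=8000)
demo_memory.put(ChatMessage(role="user", content="Hello!"))

# get() returns a list of ChatMessage objects within the token limit
chat_history = demo_memory.get()
chat_history_str = "\n".join([str(x) for x in chat_history])
print(chat_history_str)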
Let's pre-create a "chat session" and watch it play out.
user_inputs = [
    "Hello!",
    "How does tool-use work with Claude-3 work?",
    "What models support it?",
    "Thanks, that what I needed to know!",
]

for msg in user_inputs:
    # get memory
    chat_history = pipeline_memory.get()

    # prepare inputs
    chat_history_str = "\n".join([str(x) for x in chat_history])

    # run pipeline
    response = pipeline.run(
        query_str=msg,
        chat_history=chat_history,
        chat_history_str=chat_history_str,
    )

    # update memory
    user_msg = ChatMessage(role="user", content=msg)
    pipeline_memory.put(user_msg)
    print(str(user_msg))

    pipeline_memory.put(response.message)
    print(str(response.message))
    print()
user: Hello!
assistant: Hello! How can I assist you today?

user: How does tool-use work with Claude-3 work?
assistant: Tool use with Claude-3 operates under a framework designed to extend the model's capabilities by integrating it with external data sources and functionalities through user-provided tools. This process involves several key steps and considerations to ensure effective tool integration and utilization. Here's a breakdown of how tool use works with Claude-3:
1. **Tool Specification**: Users define tools in the API request, specifying the tool's name, a detailed description of its purpose and behavior, and an input schema that outlines the expected parameters. This schema is crucial for Claude to understand when and how to use the tool correctly.
2. **Decision to Use a Tool**: When Claude-3 receives a user prompt that may benefit from tool use, it assesses whether any available tools can assist with the query or task. This decision is based on the context provided by the user and the detailed descriptions of the tools.
3. **Tool Use Request Formation**: If Claude decides to use a tool, it constructs a properly formatted tool use request. This includes selecting the appropriate tool(s) and determining the necessary inputs based on the user's prompt and the tool's input schema.
4. **Execution of Tool Code**: The actual execution of the tool code occurs on the client side. The system extracts the tool name and input from Claude's tool use request, runs the tool code, and then returns the results to Claude.
5. **Formulating a Response**: After receiving the tool results, Claude uses this information to formulate its final response to the user's original prompt. This step may involve interpreting the tool's output and integrating it into a coherent and informative answer.
6. **Sequential Tool Use**: Claude generally prefers using one tool at a time, using the output of one tool to inform its next action. This sequential approach helps manage dependencies between tools and simplifies the tool use process.
7. **Error Handling and Retries**: If a tool use request is invalid or missing required parameters, Claude can retry the request with the missing information filled in, based on error responses from the client side. However, after a few failed attempts, Claude may stop trying and apologize to the user.
8. **Debugging and Improvement**: Developers are encouraged to debug unexpected tool use behavior by examining Claude's chain of thought output and refining tool descriptions and schemas for clarity and comprehensiveness.
By adhering to these steps and best practices, developers can effectively integrate and utilize tools with Claude-3, significantly expanding its capabilities beyond its base knowledge. This framework allows for the creation of complex, agentic orchestrations where Claude can perform a wide variety of tasks, from simple data retrieval to more complex problem-solving scenarios.

user: What models support it?
assistant: The tool use feature, as described in the provided context, is supported by Claude-3 models, including specific versions like Claude-3 Opus and Haiku. These models are designed to interact with external client-side tools and functions, allowing for a wide variety of tasks to be performed by equipping Claude with custom tools. The context specifically mentions Claude-3 Opus as being capable of handling more complex tool use scenarios, including managing multiple tools simultaneously and better catching missing arguments. Haiku is mentioned for dealing with more straightforward tools, inferring missing parameters when they are not explicitly given.

user: Thanks, that what I needed to know!
assistant: You're welcome! If you have any more questions or need further assistance, feel free to ask. Happy to help!