使用模式#
使用模式指南更深入地介绍了 QueryPipeline
的设置和使用。
设置流水线#
这里我们将介绍几种不同的设置查询流水线的方法。
定义顺序链#
一些简单的流水线本质上是纯线性的 - 前一个模块的输出直接作为下一个模块的输入。
一些示例
- 提示 -> LLM -> 输出解析
- 提示 -> LLM -> 提示 -> LLM
- Retriever -> 响应合成器
这些工作流可以通过简化的 chain
语法在 QueryPipeline
中轻松表达。
from llama_index.core.query_pipeline import QueryPipeline
# try chaining basic prompts
prompt_str = "Please generate related movies to {movie_name}"
prompt_tmpl = PromptTemplate(prompt_str)
llm = OpenAI(model="gpt-3.5-turbo")
p = QueryPipeline(chain=[prompt_tmpl, llm], verbose=True)
定义 DAG#
许多流水线会要求您设置一个 DAG (例如,如果您想实现标准 RAG 流水线中的所有步骤)。
这里我们提供一个低层 API 来添加模块及其键,并定义前一个模块输出到下一个模块输入之间的链接。
from llama_index.postprocessor.cohere_rerank import CohereRerank
from llama_index.core.response_synthesizers import TreeSummarize
# define modules
prompt_str = "Please generate a question about Paul Graham's life regarding the following topic {topic}"
prompt_tmpl = PromptTemplate(prompt_str)
llm = OpenAI(model="gpt-3.5-turbo")
retriever = index.as_retriever(similarity_top_k=3)
reranker = CohereRerank()
summarizer = TreeSummarize(llm=llm)
# define query pipeline
p = QueryPipeline(verbose=True)
p.add_modules(
{
"llm": llm,
"prompt_tmpl": prompt_tmpl,
"retriever": retriever,
"summarizer": summarizer,
"reranker": reranker,
}
)
p.add_link("prompt_tmpl", "llm")
p.add_link("llm", "retriever")
p.add_link("retriever", "reranker", dest_key="nodes")
p.add_link("llm", "reranker", dest_key="query_str")
p.add_link("reranker", "summarizer", dest_key="nodes")
p.add_link("llm", "summarizer", dest_key="query_str")
运行流水线#
单输入/单输出#
输入是第一个组件的 kwargs。
如果最后一个组件的输出是单个对象 (而不是对象字典),则我们直接返回该对象。
以上一个示例中的流水线为例,由于最后一步是 TreeSummarize
响应合成模块,输出将是一个 Response
对象。
output = p.run(topic="YC")
# output type is Response
type(output)
多输入/多输出#
如果您的 DAG 有多个根节点和/或输出节点,您可以尝试使用 run_multi
。传入一个包含模块键 -> 输入字典的输入字典。输出是一个模块键 -> 输出字典的字典。
如果我们运行上一个示例,
output_dict = p.run_multi({"llm": {"topic": "YC"}})
print(output_dict)
# output dict is {"summarizer": {"output": response}}
定义偏函数#
如果您希望预填充模块的某些输入,您可以使用 partial
来实现!然后 DAG 将只连接到未填充的输入。
您可能需要通过 as_query_component
转换模块。
这里有一个示例
summarizer = TreeSummarize(llm=llm)
summarizer_c = summarizer.as_query_component(partial={"nodes": nodes})
# can define a chain because llm output goes into query_str, nodes is pre-filled
p = QueryPipeline(chain=[prompt_tmpl, llm, summarizer_c])
# run pipeline
p.run(topic="YC")
批量输入#
如果您希望运行流水线进行多轮单输入/多输入,可以在函数调用中设置 batch=True
- run
、arun
、run_multi
和 arun_multi
都支持。传入一个您想要运行的单独单输入/多输入列表。batch
模式将返回一个与输入顺序相同的响应列表。
单输入/单输出示例:p.run(field=[in1: Any, in2: Any], batch=True)
--> [out1: Any, out2: Any]
output = p.run(topic=["YC", "RAG", "LlamaIndex"], batch=True)
# output is [ResponseYC, ResponseRAG, ResponseLlamaIndex]
print(output)
多输入/多输出示例:p.run_multi("root_node": {"field": [in1: Any, in2, Any]}, batch=True)
--> {"output_node": {"field": [out1: Any, out2: Any]}}
output_dict = p.run_multi({"llm": {"topic": ["YC", "RAG", "LlamaIndex"]}})
print(output_dict)
# output dict is {"summarizer": {"output": [ResponseYC, ResponseRAG, ResponseLlamaIndex]}}
中间输出#
如果您希望获取 QueryPipeline 中模块的中间输出,对于单输入和多输入,可以分别使用 run_with_intermediates
或 run_multi_with_intermediates
。
输出将是一个包含正常输出和一个字典的元组,该字典包含模块键 -> ComponentIntermediates
。ComponentIntermediates 有两个字段:inputs
字典和 outputs
字典。
output, intermediates = p.run_with_intermediates(topic="YC")
print(output)
print(intermediates)
# output is (Response, {"module_key": ComponentIntermediates("inputs": {}, "outputs": {})})
定义自定义查询组件#
您可以轻松地定义自定义组件:可以是将函数传递给 FnComponent
,也可以是继承 CustomQueryComponent
。
将函数传递给 FnComponent
#
定义任意函数并将其传递给 FnComponent
。位置参数名称 (args
) 将被转换为必需的输入键,而关键字参数名称 (kwargs
) 将被转换为可选的输入键。
**注意**:我们假定只有一个输出。
from llama_index.core.query_pipeline import FnComponent
def add(a: int, b: int) -> int:
"""Adds two numbers."""
return a + b
add_component = FnComponent(fn=add, output_key="output")
# input keys to add_component are "a" and "b", output key is 'output'
继承 CustomQueryComponent
#
只需继承 CustomQueryComponent
,实现验证/运行函数 + 一些辅助函数,然后将其插入即可。
from llama_index.core.query_pipeline import CustomQueryComponent
from typing import Dict, Any
class MyComponent(CustomQueryComponent):
"""My component."""
# Pydantic class, put any attributes here
...
def _validate_component_inputs(
self, input: Dict[str, Any]
) -> Dict[str, Any]:
"""Validate component inputs during run_component."""
# NOTE: this is OPTIONAL but we show you here how to do validation as an example
return input
@property
def _input_keys(self) -> set:
"""Input keys dict."""
return {"input_key1", ...}
@property
def _output_keys(self) -> set:
# can do multi-outputs too
return {"output_key"}
def _run_component(self, **kwargs) -> Dict[str, Any]:
"""Run the component."""
# run logic
...
return {"output_key": result}
有关更多详细信息,请查阅我们深入的查询转换指南。
确保输出兼容#
通过链接 QueryPipeline
中的模块,一个模块的输出会作为下一个模块的输入。
通常,您必须确保为了使链接工作,期望的输出和输入类型 *大致* 一致。
我们说大致一致,是因为我们对现有模块做了一些特殊处理,以确保“可字符串化”的输出可以传递给可以作为“字符串”查询的输入。某些输出类型被视为可字符串化 - CompletionResponse
、ChatResponse
、Response
、QueryBundle
等。Retrievers/查询引擎会自动将 string
输入转换为 QueryBundle
对象。
这使您可以执行某些工作流,否则如果您自己编写代码,将需要大量的字符串转换样板代码,例如,
- LLM -> 提示, LLM -> Retriever, LLM -> 查询引擎
- 查询引擎 -> 提示, 查询引擎 -> Retriever
如果您正在定义自定义组件,您应该使用 _validate_component_inputs
来确保输入类型正确,并在类型不正确时抛出错误。