跳到内容

使用模式#

使用模式指南更深入地介绍了 QueryPipeline 的设置和使用。

设置流水线#

这里我们将介绍几种不同的设置查询流水线的方法。

定义顺序链#

一些简单的流水线本质上是纯线性的 - 前一个模块的输出直接作为下一个模块的输入。

一些示例

  • 提示 -> LLM -> 输出解析
  • 提示 -> LLM -> 提示 -> LLM
  • Retriever -> 响应合成器

这些工作流可以通过简化的 chain 语法在 QueryPipeline 中轻松表达。

from llama_index.core.query_pipeline import QueryPipeline

# try chaining basic prompts
prompt_str = "Please generate related movies to {movie_name}"
prompt_tmpl = PromptTemplate(prompt_str)
llm = OpenAI(model="gpt-3.5-turbo")

p = QueryPipeline(chain=[prompt_tmpl, llm], verbose=True)

定义 DAG#

许多流水线会要求您设置一个 DAG (例如,如果您想实现标准 RAG 流水线中的所有步骤)。

这里我们提供一个低层 API 来添加模块及其键,并定义前一个模块输出到下一个模块输入之间的链接。

from llama_index.postprocessor.cohere_rerank import CohereRerank
from llama_index.core.response_synthesizers import TreeSummarize

# define modules
prompt_str = "Please generate a question about Paul Graham's life regarding the following topic {topic}"
prompt_tmpl = PromptTemplate(prompt_str)
llm = OpenAI(model="gpt-3.5-turbo")
retriever = index.as_retriever(similarity_top_k=3)
reranker = CohereRerank()
summarizer = TreeSummarize(llm=llm)

# define query pipeline
p = QueryPipeline(verbose=True)
p.add_modules(
    {
        "llm": llm,
        "prompt_tmpl": prompt_tmpl,
        "retriever": retriever,
        "summarizer": summarizer,
        "reranker": reranker,
    }
)
p.add_link("prompt_tmpl", "llm")
p.add_link("llm", "retriever")
p.add_link("retriever", "reranker", dest_key="nodes")
p.add_link("llm", "reranker", dest_key="query_str")
p.add_link("reranker", "summarizer", dest_key="nodes")
p.add_link("llm", "summarizer", dest_key="query_str")

运行流水线#

单输入/单输出#

输入是第一个组件的 kwargs。

如果最后一个组件的输出是单个对象 (而不是对象字典),则我们直接返回该对象。

以上一个示例中的流水线为例,由于最后一步是 TreeSummarize 响应合成模块,输出将是一个 Response 对象。

output = p.run(topic="YC")
# output type is Response
type(output)

多输入/多输出#

如果您的 DAG 有多个根节点和/或输出节点,您可以尝试使用 run_multi。传入一个包含模块键 -> 输入字典的输入字典。输出是一个模块键 -> 输出字典的字典。

如果我们运行上一个示例,

output_dict = p.run_multi({"llm": {"topic": "YC"}})
print(output_dict)

# output dict is {"summarizer": {"output": response}}

定义偏函数#

如果您希望预填充模块的某些输入,您可以使用 partial 来实现!然后 DAG 将只连接到未填充的输入。

您可能需要通过 as_query_component 转换模块。

这里有一个示例

summarizer = TreeSummarize(llm=llm)
summarizer_c = summarizer.as_query_component(partial={"nodes": nodes})
# can define a chain because llm output goes into query_str, nodes is pre-filled
p = QueryPipeline(chain=[prompt_tmpl, llm, summarizer_c])
# run pipeline
p.run(topic="YC")

批量输入#

如果您希望运行流水线进行多轮单输入/多输入,可以在函数调用中设置 batch=True - runarunrun_multiarun_multi 都支持。传入一个您想要运行的单独单输入/多输入列表。batch 模式将返回一个与输入顺序相同的响应列表。

单输入/单输出示例:p.run(field=[in1: Any, in2: Any], batch=True) --> [out1: Any, out2: Any]

output = p.run(topic=["YC", "RAG", "LlamaIndex"], batch=True)
# output is [ResponseYC, ResponseRAG, ResponseLlamaIndex]
print(output)

多输入/多输出示例:p.run_multi("root_node": {"field": [in1: Any, in2, Any]}, batch=True) --> {"output_node": {"field": [out1: Any, out2: Any]}}

output_dict = p.run_multi({"llm": {"topic": ["YC", "RAG", "LlamaIndex"]}})
print(output_dict)

# output dict is {"summarizer": {"output": [ResponseYC, ResponseRAG, ResponseLlamaIndex]}}

中间输出#

如果您希望获取 QueryPipeline 中模块的中间输出,对于单输入和多输入,可以分别使用 run_with_intermediatesrun_multi_with_intermediates

输出将是一个包含正常输出和一个字典的元组,该字典包含模块键 -> ComponentIntermediates。ComponentIntermediates 有两个字段:inputs 字典和 outputs 字典。

output, intermediates = p.run_with_intermediates(topic="YC")
print(output)
print(intermediates)

# output is (Response, {"module_key": ComponentIntermediates("inputs": {}, "outputs": {})})

定义自定义查询组件#

您可以轻松地定义自定义组件:可以是将函数传递给 FnComponent,也可以是继承 CustomQueryComponent

将函数传递给 FnComponent#

定义任意函数并将其传递给 FnComponent。位置参数名称 (args) 将被转换为必需的输入键,而关键字参数名称 (kwargs) 将被转换为可选的输入键。

**注意**:我们假定只有一个输出。

from llama_index.core.query_pipeline import FnComponent


def add(a: int, b: int) -> int:
    """Adds two numbers."""
    return a + b


add_component = FnComponent(fn=add, output_key="output")

# input keys to add_component are "a" and "b", output key is 'output'

继承 CustomQueryComponent#

只需继承 CustomQueryComponent,实现验证/运行函数 + 一些辅助函数,然后将其插入即可。

from llama_index.core.query_pipeline import CustomQueryComponent
from typing import Dict, Any


class MyComponent(CustomQueryComponent):
    """My component."""

    # Pydantic class, put any attributes here
    ...

    def _validate_component_inputs(
        self, input: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Validate component inputs during run_component."""
        # NOTE: this is OPTIONAL but we show you here how to do validation as an example
        return input

    @property
    def _input_keys(self) -> set:
        """Input keys dict."""
        return {"input_key1", ...}

    @property
    def _output_keys(self) -> set:
        # can do multi-outputs too
        return {"output_key"}

    def _run_component(self, **kwargs) -> Dict[str, Any]:
        """Run the component."""
        # run logic
        ...
        return {"output_key": result}

有关更多详细信息,请查阅我们深入的查询转换指南

确保输出兼容#

通过链接 QueryPipeline 中的模块,一个模块的输出会作为下一个模块的输入。

通常,您必须确保为了使链接工作,期望的输出和输入类型 *大致* 一致。

我们说大致一致,是因为我们对现有模块做了一些特殊处理,以确保“可字符串化”的输出可以传递给可以作为“字符串”查询的输入。某些输出类型被视为可字符串化 - CompletionResponseChatResponseResponseQueryBundle 等。Retrievers/查询引擎会自动将 string 输入转换为 QueryBundle 对象。

这使您可以执行某些工作流,否则如果您自己编写代码,将需要大量的字符串转换样板代码,例如,

  • LLM -> 提示, LLM -> Retriever, LLM -> 查询引擎
  • 查询引擎 -> 提示, 查询引擎 -> Retriever

如果您正在定义自定义组件,您应该使用 _validate_component_inputs 来确保输入类型正确,并在类型不正确时抛出错误。