路由器查询引擎¶
RouterQueryEngine
从多个选项中选择最合适的查询引擎来处理给定的查询。
本笔记本演示了使用工作流程实现路由器查询引擎的过程。
具体来说,我们将实现 RouterQueryEngine。
!pip install -U llama-index
import os
# NOTE(review): placeholder key — set a real OpenAI API key before running.
os.environ["OPENAI_API_KEY"] = "sk-.."
由于工作流程是异步优先的,因此在笔记本中运行良好。如果你在自己的代码中运行,且尚未启动异步事件循环,则需要使用 asyncio.run() 来启动它。
# Skeleton for running the notebook's async code as a standalone script:
# wrap it in an async main() and start the event loop with asyncio.run().
async def main():
<async code>
if __name__ == "__main__":
import asyncio
asyncio.run(main())
定义事件¶
from llama_index.core.workflow import Event
from llama_index.core.base.base_selector import SelectorResult
from typing import Dict, List, Any
from llama_index.core.base.response.schema import RESPONSE_TYPE
class QueryEngineSelectionEvent(Event):
    """Result of selecting the query engine tools."""
    # Selector output: the chosen tool index(es) and the reason for each choice.
    selected_query_engines: SelectorResult
class SynthesizeEvent(Event):
    """Event for synthesizing the response from different query engines."""
    # One response per selected query engine.
    result: List[RESPONSE_TYPE]
    # Carried along so the final response can record which engines were used.
    selected_query_engines: SelectorResult
工作流程¶
选择器
- 它将 StartEvent 作为输入,并返回 QueryEngineSelectionEvent。
- LLMSingleSelector / PydanticSingleSelector / PydanticMultiSelector 将选择一个或多个查询引擎工具。
生成响应
此函数使用选定的查询引擎生成响应,并返回 SynthesizeEvent。
合成响应
如果选择了多个查询引擎,此函数会合并生成的响应并合成最终响应;否则,返回单个生成的响应。
这些步骤将使用内置的 StartEvent
和 StopEvent
事件。
定义好事件后,我们可以构建工作流程和步骤。
from llama_index.core.workflow import (
Context,
Workflow,
StartEvent,
StopEvent,
step,
)
from llama_index.llms.openai import OpenAI
from llama_index.core.selectors.utils import get_selector_from_llm
from llama_index.core.base.response.schema import (
PydanticResponse,
Response,
AsyncStreamingResponse,
)
from llama_index.core.bridge.pydantic import BaseModel
from llama_index.core.response_synthesizers import TreeSummarize
from llama_index.core.schema import QueryBundle
from llama_index.core import Settings
from IPython.display import Markdown, display
import asyncio
class RouterQueryEngineWorkflow(Workflow):
    """Workflow implementation of a router query engine.

    Steps:
        1. ``selector`` — pick one or more query engine tools for the query.
        2. ``generate_responses`` — run the selected engine(s).
        3. ``synthesize_responses`` — combine multiple responses into one,
           or pass a single response through.
    """

    @step
    async def selector(
        self, ctx: Context, ev: StartEvent
    ) -> QueryEngineSelectionEvent:
        """
        Selects a single/ multiple query engines based on the query.
        """
        # Stash run parameters in the workflow context for later steps.
        await ctx.set("query", ev.get("query"))
        await ctx.set("llm", ev.get("llm"))
        await ctx.set("query_engine_tools", ev.get("query_engine_tools"))
        await ctx.set("summarizer", ev.get("summarizer"))

        # NOTE(review): the selector uses the global Settings.llm rather than
        # the llm passed in the StartEvent — confirm this is intentional.
        llm = Settings.llm
        select_multiple_query_engines = ev.get("select_multi")
        query = ev.get("query")
        query_engine_tools = ev.get("query_engine_tools")

        # Build a single- or multi-selector depending on `select_multi`.
        selector = get_selector_from_llm(
            llm, is_multi=select_multiple_query_engines
        )

        # The selector chooses among tools based on their metadata descriptions.
        query_engines_metadata = [
            query_engine.metadata for query_engine in query_engine_tools
        ]

        selected_query_engines = await selector.aselect(
            query_engines_metadata, query
        )

        return QueryEngineSelectionEvent(
            selected_query_engines=selected_query_engines
        )

    @step
    async def generate_responses(
        self, ctx: Context, ev: QueryEngineSelectionEvent
    ) -> SynthesizeEvent:
        """Generate the responses from the selected query engines."""
        query = await ctx.get("query", default=None)
        selected_query_engines = ev.selected_query_engines
        query_engine_tools = await ctx.get("query_engine_tools")

        query_engines = [engine.query_engine for engine in query_engine_tools]

        print(
            f"number of selected query engines: {len(selected_query_engines.selections)}"
        )

        if len(selected_query_engines.selections) > 1:
            # Multiple engines selected: query them all concurrently.
            tasks = []
            for selected_query_engine in selected_query_engines.selections:
                print(
                    f"Selected query engine: {selected_query_engine.index}: {selected_query_engine.reason}"
                )
                query_engine = query_engines[selected_query_engine.index]
                tasks.append(query_engine.aquery(query))

            response_generated = await asyncio.gather(*tasks)
        else:
            # BUG FIX: the original accessed `.ind` / `.reason` on the
            # SelectorResult itself, which raised AttributeError; the index
            # and reason live on the individual selection (as in the
            # multi-engine branch above).
            selection = selected_query_engines.selections[0]
            query_engine = query_engines[selection.index]
            print(
                f"Selected query engine: {selection.index}: {selection.reason}"
            )
            response_generated = [await query_engine.aquery(query)]

        return SynthesizeEvent(
            result=response_generated,
            selected_query_engines=selected_query_engines,
        )

    async def acombine_responses(
        self,
        summarizer: TreeSummarize,
        responses: List[RESPONSE_TYPE],
        query_bundle: QueryBundle,
    ) -> RESPONSE_TYPE:
        """Async combine multiple response from sub-engines.

        Collects the text and source nodes of each response, then asks the
        summarizer to synthesize a single answer. The return type mirrors
        whatever the summarizer produced (str / pydantic model / stream).
        """
        print("Combining responses from multiple query engines.")
        response_strs = []
        source_nodes = []
        for response in responses:
            # Streaming/pydantic responses must be resolved before their
            # source nodes can be read.
            if isinstance(
                response, (AsyncStreamingResponse, PydanticResponse)
            ):
                response_obj = await response.aget_response()
            else:
                response_obj = response
            source_nodes.extend(response_obj.source_nodes)
            response_strs.append(str(response))

        summary = await summarizer.aget_response(
            query_bundle.query_str, response_strs
        )

        # Wrap the summary in the matching response type, preserving sources.
        if isinstance(summary, str):
            return Response(response=summary, source_nodes=source_nodes)
        elif isinstance(summary, BaseModel):
            return PydanticResponse(
                response=summary, source_nodes=source_nodes
            )
        else:
            return AsyncStreamingResponse(
                response_gen=summary, source_nodes=source_nodes
            )

    @step
    async def synthesize_responses(
        self, ctx: Context, ev: SynthesizeEvent
    ) -> StopEvent:
        """Synthesizes the responses from the generated responses."""
        response_generated = ev.result
        query = await ctx.get("query", default=None)
        summarizer = await ctx.get("summarizer")
        selected_query_engines = ev.selected_query_engines

        if len(response_generated) > 1:
            # Multiple engines answered: merge their responses.
            response = await self.acombine_responses(
                summarizer, response_generated, QueryBundle(query_str=query)
            )
        else:
            # Single engine: pass its response through unchanged.
            response = response_generated[0]

        # Attach the selector's decision for downstream inspection.
        response.metadata = response.metadata or {}
        response.metadata["selector_result"] = selected_query_engines

        return StopEvent(result=response)
定义 LLM¶
# Use gpt-4o-mini as the LLM, and register it globally for LlamaIndex components.
llm = OpenAI(model="gpt-4o-mini")
Settings.llm = llm
定义摘要器¶
from llama_index.core.prompts.default_prompt_selectors import (
DEFAULT_TREE_SUMMARIZE_PROMPT_SEL,
)
# Tree summarizer used to merge answers when multiple engines are selected.
summarizer = TreeSummarize(
    llm=llm,
    summary_template=DEFAULT_TREE_SUMMARIZE_PROMPT_SEL,
)
下载数据¶
!mkdir -p 'data/paul_graham/'
!wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham/paul_graham_essay.txt'
--2024-08-26 22:46:42-- https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 2606:50c0:8000::154, 2606:50c0:8003::154, 2606:50c0:8002::154, ... Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|2606:50c0:8000::154|:443... connected. HTTP request sent, awaiting response... 200 OK Length: 75042 (73K) [text/plain] Saving to: ‘data/paul_graham/paul_graham_essay.txt’ data/paul_graham/pa 100%[===================>] 73.28K --.-KB/s in 0.02s 2024-08-26 22:46:42 (3.82 MB/s) - ‘data/paul_graham/paul_graham_essay.txt’ saved [75042/75042]
加载数据¶
from llama_index.core import SimpleDirectoryReader
# Load the downloaded essay into Document objects.
documents = SimpleDirectoryReader("./data/paul_graham").load_data()
创建节点¶
# Chunk the documents into nodes using the globally configured node parser.
nodes = Settings.node_parser.get_nodes_from_documents(documents)
创建索引¶
我们将创建三个索引:SummaryIndex、VectorStoreIndex 和 SimpleKeywordTableIndex。
from llama_index.core import (
VectorStoreIndex,
SummaryIndex,
SimpleKeywordTableIndex,
)
# Three indexes over the same nodes: summarization, semantic retrieval,
# and keyword lookup — one per query engine tool below.
summary_index = SummaryIndex(nodes)
vector_index = VectorStoreIndex(nodes)
keyword_index = SimpleKeywordTableIndex(nodes)
创建查询引擎工具¶
from llama_index.core.tools import QueryEngineTool
# Build one query engine per index, then wrap each in a QueryEngineTool whose
# description guides the LLM selector's routing decision.
list_query_engine = summary_index.as_query_engine(
    response_mode="tree_summarize",
    use_async=True,
)
vector_query_engine = vector_index.as_query_engine()
keyword_query_engine = keyword_index.as_query_engine()

list_tool = QueryEngineTool.from_defaults(
    query_engine=list_query_engine,
    # BUG FIX: corrected typo "eassy" -> "essay"; the description is read by
    # the selector LLM, so it should be clean and consistent with the others.
    description=(
        "Useful for summarization questions related to Paul Graham essay on"
        " What I Worked On."
    ),
)

vector_tool = QueryEngineTool.from_defaults(
    query_engine=vector_query_engine,
    description=(
        "Useful for retrieving specific context from Paul Graham essay on What"
        " I Worked On."
    ),
)

keyword_tool = QueryEngineTool.from_defaults(
    query_engine=keyword_query_engine,
    description=(
        "Useful for retrieving specific context using keywords from Paul"
        " Graham essay on What I Worked On."
    ),
)

query_engine_tools = [list_tool, vector_tool, keyword_tool]
运行工作流程!¶
import nest_asyncio
# Allow re-entrant event loops so top-level `await` works inside the notebook.
nest_asyncio.apply()
# Generous timeout: multi-engine queries plus synthesis can be slow.
w = RouterQueryEngineWorkflow(timeout=200)
查询¶
摘要查询¶
# This should route to the summary query engine/tool.
query = "Provide the summary of the document?"
result = await w.run(
    query=query,
    llm=llm,
    query_engine_tools=query_engine_tools,
    summarizer=summarizer,
    select_multi=True,  # Set to False to always select a single query engine.
)
display(
    Markdown("> Question: {}".format(query)),
    Markdown("Answer: {}".format(result)),
)
number of selected query engines: 1 Selected query engine: 0: This choice directly addresses the need for a summary of the document.
问题:提供文档的摘要?
答案:这份文档讲述了一个人从年轻时写作和编程,到探索人工智能,最终成为成功的企业家和散文家的历程。他最初在大学学习哲学,但感到不满足,于是受到文学和纪录片的启发,将注意力转向人工智能。他的学术追求促使他逆向工程一个自然语言程序,但他很快意识到当时人工智能的局限性。
在获得博士学位后,他涉足艺术领域,参加课程和绘画,同时也写了一本关于 Lisp 编程的书。他在科技行业的经历,特别是在一家软件公司的经历,塑造了他对商业动态的理解以及成为市场入门选项的重要性。
在1990年代中期,他共同创立了 Viaweb,一个用于构建在线商店的早期网络应用,后来被雅虎收购。之后,他开始天使投资,并共同创立了 Y Combinator,这是一个通过同时支持多家初创企业来革新种子期融资的创业加速器。
叙述强调了作者对工作本质、追求非声望项目的意义以及他从编程到写作散文的兴趣演变的思考。他强调了独立思考的价值以及互联网对出版和创业的影响。最终,这份文档展示了一种以探索、创造力和致力于帮助他人成功为特征的生活。
指定上下文查询¶
# This should route to the vector query engine/tool.
query = "What did the author do growing up?"
result = await w.run(
    query=query,
    llm=llm,
    query_engine_tools=query_engine_tools,
    summarizer=summarizer,
    select_multi=False,  # Set to True to allow selecting multiple query engines.
)
display(
    Markdown("> Question: {}".format(query)),
    Markdown("Answer: {}".format(result)),
)
number of selected query engines: 1 Selected query engine: 1: The question asks for specific context about the author's experiences growing up, which aligns with retrieving specific context from the essay.
问题:作者小时候做了什么?
答案:作者在成长过程中,课余时间主要专注于写作和编程。最初,他写短篇小说,后来他形容这些小说情节不足,但人物情感丰富。他很小的时候就在 IBM 1401 上开始编程,在那里他尝试了早期的 Fortran 和打孔卡。最终,他说服父亲购买了一台 TRS-80 微型计算机,这让他可以编写简单的游戏和文字处理器。尽管喜欢编程,但他最初计划在大学学习哲学,认为这是对终极真理的追求。然而,在发现哲学课程枯燥乏味后,他后来将重点转向了人工智能。
# This query could be served by either the keyword or the vector engine,
# so with select_multi=True the workflow queries both and combines the answers.
query = "What were noteable events and people from the authors time at Interleaf and YC?"
result = await w.run(
    query=query,
    llm=llm,
    query_engine_tools=query_engine_tools,
    summarizer=summarizer,
    select_multi=True,  # Since query should use two query engine tools, we enabled it.
)
display(
    Markdown("> Question: {}".format(query)),
    Markdown("Answer: {}".format(result)),
)
number of selected query engines: 2 Selected query engine: 1: This choice is useful for retrieving specific context related to notable events and people from the author's time at Interleaf and YC. Selected query engine: 2: This choice allows for retrieving specific context using keywords, which can help in identifying notable events and people. Combining responses from multiple query engines.
问题:作者在 Interleaf 和 YC 时期有哪些值得注意的事件和人物?
答案:作者在 Interleaf 期间值得注意的事件包括成立了一个大型的发布工程团队,这强调了软件更新和版本管理的复杂性。公司还做出了一项重要决定,引入了一种受 Emacs 启发的脚本语言,旨在吸引 Lisp 黑客来增强其软件功能。作者回顾这段时期是他们最接近一份正常工作的时候,尽管承认自己作为一名员工存在不足。
在 Y Combinator (YC),关键事件包括启动了首个夏季创始人计划,该计划收到了 225 份申请并资助了八家初创企业,其中不乏知名人物,如 Reddit 的创始人、Justin Kan 和 Emmett Shear(后者后来创立了 Twitch),以及 Aaron Swartz。该计划在创始人之间建立了一个支持性社区,标志着 YC 从一个小型倡议发展成为一个更大的组织。这一时期的重要人物包括 Jessica Livingston,作者与她有着密切的专业和个人关系,以及 Robert Morris 和 Trevor Blackwell,他们分别对购物车软件的开发做出了贡献,并因其编程技能而受到认可。Sam Altman,后来成为 YC 的第二任总裁,也在这一时期被提及为重要人物。