Multi-Step Query Engine¶
The MultiStepQueryEngine breaks a complex query down into sequential sub-questions.
To answer the query "In which city did the author found his first company, Viaweb?", we need to answer the following sub-questions in sequence:
- Who is the author who founded his first company, Viaweb?
- In which city did Paul Graham found his first company, Viaweb?
As you can see, the answer to each step (sub-query 1) is used to generate the question for the next step (sub-query 2), and the steps are created sequentially rather than all at once.
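Conceptually, the loop looks like the sketch below. This is a minimal illustration rather than library code: next_question and answer are hypothetical callables standing in for the query transform and query engine used later in this notebook.

from typing import Callable, Optional

def multi_step_answer(
    query: str,
    next_question: Callable[[str, str], Optional[str]],  # (query, prev reasoning) -> sub-question or None
    answer: Callable[[str], str],  # sub-question -> answer
    max_steps: int = 3,
) -> str:
    """Decompose `query` sequentially, feeding each answer into the next sub-question."""
    prev_reasoning = ""
    for _ in range(max_steps):
        sub_question = next_question(query, prev_reasoning)
        if sub_question is None:  # the model signals it has enough information
            break
        prev_reasoning += f"- {sub_question}\n- {answer(sub_question)}\n"
    return prev_reasoning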
In this notebook, we will implement the MultiStepQueryEngine using Workflows.
In [ ]
!pip install -U llama-index
In [ ]
import os

os.environ["OPENAI_API_KEY"] = "sk-..."
Since workflows are async-first, this all runs fine in a notebook. If you were running this in your own code, you would want to use asyncio.run() to start an async event loop if one isn't already running:
async def main():
<async code>
if __name__ == "__main__":
import asyncio
asyncio.run(main())
Define Events¶
In [ ]
from llama_index.core.workflow import Event
from typing import Dict, List, Any
from llama_index.core.schema import NodeWithScore
class QueryMultiStepEvent(Event):
"""
Event containing results of a multi-step query process.
Attributes:
nodes (List[NodeWithScore]): List of nodes with their associated scores.
source_nodes (List[NodeWithScore]): List of source nodes with their scores.
final_response_metadata (Dict[str, Any]): Metadata associated with the final response.
"""
nodes: List[NodeWithScore]
source_nodes: List[NodeWithScore]
final_response_metadata: Dict[str, Any]
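As a quick sanity check, the event can be constructed directly. This is a minimal sketch with a placeholder node; the workflow below builds these events for real.

from llama_index.core.schema import NodeWithScore, TextNode

event = QueryMultiStepEvent(
    nodes=[NodeWithScore(node=TextNode(text="Question: ...\nAnswer: ..."))],
    source_nodes=[],
    final_response_metadata={"sub_qa": []},
)
print(event.nodes[0].node.text)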
Define the Workflow¶
In [ ]
from llama_index.core.indices.query.query_transform.base import (
StepDecomposeQueryTransform,
)
from llama_index.core.response_synthesizers import (
get_response_synthesizer,
)
from llama_index.core.schema import QueryBundle, TextNode
from llama_index.core.workflow import (
Context,
Workflow,
StartEvent,
StopEvent,
step,
)
from llama_index.core import Settings
from llama_index.core.llms import LLM
from typing import cast
from IPython.display import Markdown, display
class MultiStepQueryEngineWorkflow(Workflow):
def combine_queries(
self,
query_bundle: QueryBundle,
prev_reasoning: str,
index_summary: str,
llm: LLM,
) -> QueryBundle:
"""Combine queries using StepDecomposeQueryTransform."""
transform_metadata = {
"prev_reasoning": prev_reasoning,
"index_summary": index_summary,
}
return StepDecomposeQueryTransform(llm=llm)(
query_bundle, metadata=transform_metadata
)
def default_stop_fn(self, stop_dict: Dict) -> bool:
"""Stop function for multi-step query combiner."""
query_bundle = cast(QueryBundle, stop_dict.get("query_bundle"))
if query_bundle is None:
raise ValueError("Response must be provided to stop function.")
return "none" in query_bundle.query_str.lower()
@step
async def query_multistep(
self, ctx: Context, ev: StartEvent
) -> QueryMultiStepEvent:
"""Execute multi-step query process."""
prev_reasoning = ""
cur_response = None
should_stop = False
cur_steps = 0
        # collect sub-question/answer pairs and metadata for the final response
final_response_metadata: Dict[str, Any] = {"sub_qa": []}
text_chunks = []
source_nodes = []
query = ev.get("query")
        await ctx.set("query", query)
llm = Settings.llm
stop_fn = self.default_stop_fn
num_steps = ev.get("num_steps")
query_engine = ev.get("query_engine")
index_summary = ev.get("index_summary")
        while not should_stop:
            if num_steps is not None and cur_steps >= num_steps:
                should_stop = True
                break
updated_query_bundle = self.combine_queries(
QueryBundle(query_str=query),
prev_reasoning,
index_summary,
llm,
)
print(
f"Created query for the step - {cur_steps} is: {updated_query_bundle}"
)
stop_dict = {"query_bundle": updated_query_bundle}
if stop_fn(stop_dict):
should_stop = True
break
cur_response = query_engine.query(updated_query_bundle)
# append to response builder
cur_qa_text = (
f"\nQuestion: {updated_query_bundle.query_str}\n"
f"Answer: {cur_response!s}"
)
text_chunks.append(cur_qa_text)
for source_node in cur_response.source_nodes:
source_nodes.append(source_node)
# update metadata
final_response_metadata["sub_qa"].append(
(updated_query_bundle.query_str, cur_response)
)
prev_reasoning += (
f"- {updated_query_bundle.query_str}\n" f"- {cur_response!s}\n"
)
cur_steps += 1
nodes = [
NodeWithScore(node=TextNode(text=text_chunk))
for text_chunk in text_chunks
]
return QueryMultiStepEvent(
nodes=nodes,
source_nodes=source_nodes,
final_response_metadata=final_response_metadata,
)
@step
async def synthesize(
self, ctx: Context, ev: QueryMultiStepEvent
) -> StopEvent:
"""Synthesize the response."""
response_synthesizer = get_response_synthesizer()
query = await ctx.get("query", default=None)
final_response = await response_synthesizer.asynthesize(
query=query,
nodes=ev.nodes,
additional_source_nodes=ev.source_nodes,
)
final_response.metadata = ev.final_response_metadata
return StopEvent(result=final_response)
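The workflow stops early when the transformed query contains the string "none", which is how StepDecomposeQueryTransform signals that no further sub-questions are needed. If you want a different stopping rule, you could swap your own logic into default_stop_fn (the workflow above hardcodes stop_fn = self.default_stop_fn). The variant below is a hypothetical sketch that also caps the sub-question length:

from typing import Dict, cast

from llama_index.core.schema import QueryBundle

def length_capped_stop_fn(stop_dict: Dict) -> bool:
    """Hypothetical stop function: stop on 'none' or on an overly long sub-question."""
    query_bundle = cast(QueryBundle, stop_dict.get("query_bundle"))
    if query_bundle is None:
        raise ValueError("Query bundle must be provided to stop function.")
    query_str = query_bundle.query_str.lower()
    return "none" in query_str or len(query_str) > 500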
Download Data¶
In [ ]
!mkdir -p 'data/paul_graham/'
!wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham/paul_graham_essay.txt'
--2024-08-26 14:16:04--  https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 2606:50c0:8000::154, 2606:50c0:8002::154, 2606:50c0:8001::154, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|2606:50c0:8000::154|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 75042 (73K) [text/plain]
Saving to: ‘data/paul_graham/paul_graham_essay.txt’

data/paul_graham/pa 100%[===================>]  73.28K  --.-KB/s    in 0.01s

2024-08-26 14:16:04 (6.91 MB/s) - ‘data/paul_graham/paul_graham_essay.txt’ saved [75042/75042]
Load Data¶
In [ ]
from llama_index.core import SimpleDirectoryReader
documents = SimpleDirectoryReader("data/paul_graham").load_data()
Set Up the LLM¶
In [ ]
from llama_index.llms.openai import OpenAI
llm = OpenAI(model="gpt-4")
Settings.llm = llm
Create the Index and Query Engine¶
In [ ]
from llama_index.core import VectorStoreIndex
index = VectorStoreIndex.from_documents(
documents=documents,
)
query_engine = index.as_query_engine()
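Before running the multi-step workflow, it can be instructive to query the plain single-step engine as a baseline; compositional questions like the one below are exactly where single-step retrieval tends to struggle. This uses the standard query engine API, and the actual output will vary with the model and index.

# Single-step baseline: query the vector index directly, without decomposition.
baseline_response = query_engine.query(
    "In which city did the author found his first company, Viaweb?"
)
print(baseline_response)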
Run the Workflow!¶
In [ ]
w = MultiStepQueryEngineWorkflow(timeout=200)
Set Parameters¶
In [ ]
# Set the maximum number of steps taken to answer the query.
num_steps = 3
# Set a summary of the index, used to create the modified query at each step.
index_summary = "Used to answer questions about the author"
Test With a Query¶
In [ ]
query = "In which city did the author found his first company, Viaweb?"
query = "In which city did the author found his first company, Viaweb?"
Result¶
In [ ]
result = await w.run(
query=query,
query_engine=query_engine,
index_summary=index_summary,
num_steps=num_steps,
)
# If the query created in a step is None, the process stops.
display(
Markdown("> Question: {}".format(query)),
Markdown("Answer: {}".format(result)),
)
Created query for the step - 0 is: Who is the author who founded Viaweb?
Created query for the step - 1 is: In which city did Paul Graham found his first company, Viaweb?
Created query for the step - 2 is: None
> Question: In which city did the author found his first company, Viaweb?

Answer: The author founded his first company, Viaweb, in Cambridge.
Display the Step Queries Created¶
In [ ]
sub_qa = result.metadata["sub_qa"]
tuples = [(t[0], t[1].response) for t in sub_qa]
display(Markdown(f"{tuples}"))
sub_qa = result.metadata["sub_qa"] tuples = [(t[0], t[1].response) for t in sub_qa] display(Markdown(f"{tuples}"))
[('Who is the author who founded Viaweb?', 'The author who founded Viaweb is Paul Graham.'), ('In which city did Paul Graham found his first company, Viaweb?', 'Paul Graham founded his first company, Viaweb, in Cambridge.')]
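The source nodes retrieved at each step are also attached to the final response (the standard Response API), so you can inspect what the engine actually retrieved:

# Inspect the retrieved source nodes behind the final answer.
for node_with_score in result.source_nodes:
    print(node_with_score.score, node_with_score.node.get_content()[:100])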