Multi-Strategy Workflow with Reflection¶
In this notebook we demonstrate a workflow that tries 3 different query strategies in parallel and selects the best one.
As shown in the diagram below:
- First the quality of the query is judged. If the query is bad, a BadQueryEvent is emitted, and the improve_query step attempts to improve the quality of the query before trying again. This is the reflection.
- Once an acceptable query is found, three events are emitted simultaneously: a NaiveRAGEvent, a HighTopKEvent, and a RerankEvent.
- Each of these events is picked up by a dedicated step that tries a different RAG strategy against the same index. All 3 steps emit a ResponseEvent.
- The judge step waits until it has collected all three ResponseEvents, then compares them. The best response is finally emitted as a StopEvent.
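The buffer-until-complete behavior that the judge step relies on can be sketched in plain Python. This is a hypothetical stand-in for illustration only, not LlamaIndex's actual collect_events implementation:

```python
# Minimal sketch of the collect-until-N pattern: each call adds one
# event to a buffer; the call returns None until the expected count
# is reached, then returns the full batch.
class EventCollector:
    def __init__(self, expected: int):
        self.expected = expected
        self.buffer = []

    def collect(self, event):
        self.buffer.append(event)
        if len(self.buffer) < self.expected:
            return None  # not all events have arrived yet
        batch, self.buffer = self.buffer, []
        return batch


collector = EventCollector(expected=3)
print(collector.collect("naive"))       # None
print(collector.collect("high_top_k"))  # None
print(collector.collect("rerank"))      # ['naive', 'high_top_k', 'rerank']
```

This is why a step that collects events can legitimately "produce no event" on its first invocations, as we'll see in the run log at the end.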
Install dependencies¶
We're installing LlamaIndex, the file readers (to read the PDFs), the workflow visualization utilities (to draw the diagram above), and OpenAI to embed the data and query the LLM.
In [ ]
!pip install llama-index-core llama-index-llms-openai llama-index-utils-workflow llama-index-readers-file llama-index-embeddings-openai
Get the data¶
We're using 3 PDF files containing San Francisco's annual budgets from 2016 to 2018.
In [ ]
!mkdir data
!wget "https://www.dropbox.com/scl/fi/xt3squt47djba0j7emmjb/2016-CSF_Budget_Book_2016_FINAL_WEB_with-cover-page.pdf?rlkey=xs064cjs8cb4wma6t5pw2u2bl&dl=0" -O "data/2016-CSF_Budget_Book_2016_FINAL_WEB_with-cover-page.pdf"
!wget "https://www.dropbox.com/scl/fi/jvw59g5nscu1m7f96tjre/2017-Proposed-Budget-FY2017-18-FY2018-19_1.pdf?rlkey=v988oigs2whtcy87ti9wti6od&dl=0" -O "data/2017-Proposed-Budget-FY2017-18-FY2018-19_1.pdf"
!wget "https://www.dropbox.com/scl/fi/izknlwmbs7ia0lbn7zzyx/2018-o0181-18.pdf?rlkey=p5nv2ehtp7272ege3m9diqhei&dl=0" -O "data/2018-o0181-18.pdf"
Bring in dependencies¶
Now we import all our dependencies:
In [ ]
import os
from llama_index.core import (
SimpleDirectoryReader,
VectorStoreIndex,
StorageContext,
load_index_from_storage,
)
from llama_index.core.workflow import (
step,
Context,
Workflow,
Event,
StartEvent,
StopEvent,
)
from llama_index.llms.openai import OpenAI
from llama_index.core.postprocessor.rankGPT_rerank import RankGPTRerank
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.chat_engine import SimpleChatEngine
from llama_index.utils.workflow import draw_all_possible_flows
We also need to set up our OpenAI key.
In [ ]
from google.colab import userdata
os.environ["OPENAI_API_KEY"] = userdata.get("openai-key")
Define event classes¶
Our flow generates quite a few different event types.
In [ ]
class JudgeEvent(Event):
query: str
class BadQueryEvent(Event):
query: str
class NaiveRAGEvent(Event):
query: str
class HighTopKEvent(Event):
query: str
class RerankEvent(Event):
query: str
class ResponseEvent(Event):
query: str
response: str
class SummarizeEvent(Event):
query: str
response: str
Define the workflow¶
This is the substance of our workflow, so let's break it down:
- load_or_create_index is a regular RAG function that reads the PDFs from disk and indexes them if they haven't been indexed already. If they have, it simply restores the existing index from disk.
- judge_query does a few things:
  - It initializes the LLM and calls load_or_create_index to get things set up. It stores these in the context so they're available later.
  - It judges the quality of the query.
  - If the query is bad, it emits a BadQueryEvent.
  - If the query is good, it emits a NaiveRAGEvent, a HighTopKEvent, and a RerankEvent.
- improve_query takes the BadQueryEvent and uses an LLM to try to expand and disambiguate the query if possible, then loops back to judge_query.
- naive_rag, high_top_k, and rerank each take their respective events and try 3 different RAG strategies against the same index. Each emits a ResponseEvent with its results plus a source parameter indicating which strategy was used.
- judge fires every time a ResponseEvent is emitted, but it uses collect_events to buffer the events until it has received all 3. It then sends the responses to an LLM and asks it to select the "best" one. It emits the best response as a StopEvent.
In [ ]
class ComplicatedWorkflow(Workflow):
    def load_or_create_index(self, directory_path, persist_dir):
        # Check if the index already exists
        if os.path.exists(persist_dir):
            print("Loading existing index...")
            # Load the index from disk
            storage_context = StorageContext.from_defaults(
                persist_dir=persist_dir
            )
            index = load_index_from_storage(storage_context)
        else:
            print("Creating new index...")
            # Load documents from the specified directory
            documents = SimpleDirectoryReader(directory_path).load_data()
            # Create a new index from the documents
            index = VectorStoreIndex.from_documents(documents)
            # Persist the index to disk
            index.storage_context.persist(persist_dir=persist_dir)
        return index

    @step
    async def judge_query(
        self, ctx: Context, ev: StartEvent | JudgeEvent
    ) -> BadQueryEvent | NaiveRAGEvent | HighTopKEvent | RerankEvent:
        # initialize
        llm = await ctx.get("llm", default=None)
        if llm is None:
            await ctx.set("llm", OpenAI(model="gpt-4o", temperature=0.1))
            await ctx.set(
                "index", self.load_or_create_index("data", "storage")
            )

            # we use a chat engine so it remembers previous interactions
            await ctx.set("judge", SimpleChatEngine.from_defaults())

        judge = await ctx.get("judge")
        response = judge.chat(
            f"""
            Given a user query, determine if this is likely to yield good results from a RAG system as-is. If it's good, return 'good', if it's bad, return 'bad'.
            Good queries use a lot of relevant keywords and are detailed. Bad queries are vague or ambiguous.

            Here is the query: {ev.query}
            """
        )
        if str(response) == "bad":
            # try again
            return BadQueryEvent(query=ev.query)
        else:
            # send query to all 3 strategies
            self.send_event(NaiveRAGEvent(query=ev.query))
            self.send_event(HighTopKEvent(query=ev.query))
            self.send_event(RerankEvent(query=ev.query))

    @step
    async def improve_query(
        self, ctx: Context, ev: BadQueryEvent
    ) -> JudgeEvent:
        llm = await ctx.get("llm")
        response = llm.complete(
            f"""
            This is a query to a RAG system: {ev.query}

            The query is bad because it is too vague. Please provide a more detailed query that includes specific keywords and removes any ambiguity.
            """
        )
        return JudgeEvent(query=str(response))

    @step
    async def naive_rag(
        self, ctx: Context, ev: NaiveRAGEvent
    ) -> ResponseEvent:
        index = await ctx.get("index")
        engine = index.as_query_engine(similarity_top_k=5)
        response = engine.query(ev.query)
        print("Naive response:", response)
        return ResponseEvent(
            query=ev.query, source="Naive", response=str(response)
        )

    @step
    async def high_top_k(
        self, ctx: Context, ev: HighTopKEvent
    ) -> ResponseEvent:
        index = await ctx.get("index")
        engine = index.as_query_engine(similarity_top_k=20)
        response = engine.query(ev.query)
        print("High top k response:", response)
        return ResponseEvent(
            query=ev.query, source="High top k", response=str(response)
        )

    @step
    async def rerank(self, ctx: Context, ev: RerankEvent) -> ResponseEvent:
        index = await ctx.get("index")
        reranker = RankGPTRerank(top_n=5, llm=await ctx.get("llm"))
        retriever = index.as_retriever(similarity_top_k=20)
        engine = RetrieverQueryEngine.from_args(
            retriever=retriever,
            node_postprocessors=[reranker],
        )
        response = engine.query(ev.query)
        print("Reranker response:", response)
        return ResponseEvent(
            query=ev.query, source="Reranker", response=str(response)
        )

    @step
    async def judge(self, ctx: Context, ev: ResponseEvent) -> StopEvent:
        ready = ctx.collect_events(ev, [ResponseEvent] * 3)
        if ready is None:
            return None

        judge = await ctx.get("judge")
        response = judge.chat(
            f"""
            A user has provided a query and 3 different strategies have been used
            to try to answer the query. Your job is to decide which strategy best
            answered the query. The query was: {ev.query}

            Response 1 ({ready[0].source}): {ready[0].response}
            Response 2 ({ready[1].source}): {ready[1].response}
            Response 3 ({ready[2].source}): {ready[2].response}

            Please provide the number of the best response (1, 2, or 3).
            Just provide the number, with no other text or preamble.
            """
        )
        best_response = int(str(response))
        print(
            f"Best response was number {best_response}, which was from {ready[best_response - 1].source}"
        )
        return StopEvent(result=str(ready[best_response - 1].response))
Draw the flow diagram¶
This is how we drew the flow diagram shown at the start.
In [ ]
draw_all_possible_flows(
ComplicatedWorkflow, filename="complicated_workflow.html"
)
Run the workflow¶
Let's look at what happens when we run the workflow:
- The judge_query step returns nothing. That's because it uses send_event instead. The query here was judged to be "good".
- All 3 RAG steps run, generating different answers to the query.
- The judge step runs 3 times. The first two times it produces no event, because it hasn't yet collected the 3 ResponseEvents it needs.
- The third time, it selects the best response and returns a StopEvent.
In [ ]
c = ComplicatedWorkflow(timeout=120, verbose=True)
result = await c.run(
# query="How has spending on police changed in San Francisco's budgets from 2016 to 2018?"
# query="How has spending on healthcare changed in San Francisco?"
query="How has spending changed?"
)
print(result)
Running step judge_query
Creating new index...
Step judge_query produced no event
Running step naive_rag
Naive response: Spending has increased over the years due to various factors such as new voter-approved minimum spending requirements, the creation of new voter-approved baselines, and growth in baseline funded requirements. Additionally, there have been notable changes in spending across different service areas and departments, with increases in funding for areas like public protection, transportation, and public works.
Step naive_rag produced event ResponseEvent
Running step rerank
Reranker response: Spending has increased over the years, with notable changes in the allocation of funds to various service areas and departments. The budget reflects adjustments in spending to address evolving needs and priorities, resulting in a rise in overall expenditures across different categories.
Step rerank produced event ResponseEvent
Running step high_top_k
High top k response: Spending has increased over the years, with the total budget showing growth in various areas such as aid assistance/grants, materials & supplies, equipment, debt service, services of other departments, and professional & contractual services. Additionally, there have been new investments in programs like workforce development, economic development, film services, and finance and administration. The budget allocations have been adjusted to accommodate changing needs and priorities, reflecting an overall increase in spending across different departments and programs.
Step high_top_k produced event ResponseEvent
Running step judge
Step judge produced no event
Running step judge
Step judge produced no event
Running step judge
Best response was number 3, which was from High top k
Step judge produced event StopEvent
Spending has increased over the years, with the total budget showing growth in various areas such as aid assistance/grants, materials & supplies, equipment, debt service, services of other departments, and professional & contractual services. Additionally, there have been new investments in programs like workforce development, economic development, film services, and finance and administration. The budget allocations have been adjusted to accommodate changing needs and priorities, reflecting an overall increase in spending across different departments and programs.