RAG Workflow with Reranking
This notebook walks through how to set up a Workflow to perform basic RAG with reranking.
In [ ]
!pip install -U llama-index
In [ ]
import os
os.environ["OPENAI_API_KEY"] = "sk-proj-..."
[Optional] Set up observability with Llamatrace
Set up tracing to visualize each step in the workflow.
In [ ]
%pip install "openinference-instrumentation-llama-index>=3.0.0" "opentelemetry-proto>=1.12.0" opentelemetry-exporter-otlp opentelemetry-sdk
%pip install "openinference-instrumentation-llama-index>=3.0.0" "opentelemetry-proto>=1.12.0" opentelemetry-exporter-otlp opentelemetry-sdk
In [ ]
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
    OTLPSpanExporter as HTTPSpanExporter,
)
from openinference.instrumentation.llama_index import LlamaIndexInstrumentor
# Add Phoenix API Key for tracing
PHOENIX_API_KEY = "<YOUR-PHOENIX-API-KEY>"
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"api_key={PHOENIX_API_KEY}"
# Add Phoenix
span_phoenix_processor = SimpleSpanProcessor(
    HTTPSpanExporter(endpoint="https://app.phoenix.arize.com/v1/traces")
)
# Add them to the tracer
tracer_provider = trace_sdk.TracerProvider()
tracer_provider.add_span_processor(span_processor=span_phoenix_processor)
# Instrument the application
LlamaIndexInstrumentor().instrument(tracer_provider=tracer_provider)
In [ ]
!mkdir -p data
!wget --user-agent "Mozilla" "https://arxiv.org/pdf/2307.09288.pdf" -O "data/llama2.pdf"
Since workflows are async-first, this all runs fine in a notebook. If you are running in your own code and there is no async event loop already running, you will need to start one with asyncio.run():
async def main():
    ...  # <async code>

if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
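For example, once the RAGWorkflow below is defined, a standalone script could wrap the same calls used later in this notebook like so (a minimal sketch, nothing beyond what this notebook already does):

import asyncio

async def main():
    w = RAGWorkflow()
    # Ingest the documents, then query the resulting index
    index = await w.run(dirname="data")
    result = await w.run(query="How was Llama2 trained?", index=index)
    async for chunk in result.async_response_gen():
        print(chunk, end="", flush=True)

if __name__ == "__main__":
    asyncio.run(main())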
In [ ]
from llama_index.core.workflow import Event
from llama_index.core.schema import NodeWithScore
class RetrieverEvent(Event):
    """Result of running retrieval"""

    nodes: list[NodeWithScore]


class RerankEvent(Event):
    """Result of running reranking on retrieved nodes"""

    nodes: list[NodeWithScore]
In [ ]
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex
from llama_index.core.response_synthesizers import CompactAndRefine
from llama_index.core.postprocessor.llm_rerank import LLMRerank
from llama_index.core.workflow import (
    Context,
    Workflow,
    StartEvent,
    StopEvent,
    step,
)
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding


class RAGWorkflow(Workflow):
    @step
    async def ingest(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
        """Entry point to ingest a document, triggered by a StartEvent with `dirname`."""
        dirname = ev.get("dirname")
        if not dirname:
            return None

        documents = SimpleDirectoryReader(dirname).load_data()
        index = VectorStoreIndex.from_documents(
            documents=documents,
            embed_model=OpenAIEmbedding(model_name="text-embedding-3-small"),
        )
        return StopEvent(result=index)

    @step
    async def retrieve(
        self, ctx: Context, ev: StartEvent
    ) -> RetrieverEvent | None:
        "Entry point for RAG, triggered by a StartEvent with `query`."
        query = ev.get("query")
        index = ev.get("index")

        if not query:
            return None

        print(f"Query the database with: {query}")

        # store the query in the global context
        await ctx.set("query", query)

        # get the index from the global context
        if index is None:
            print("Index is empty, load some documents before querying!")
            return None

        retriever = index.as_retriever(similarity_top_k=2)
        nodes = await retriever.aretrieve(query)
        print(f"Retrieved {len(nodes)} nodes.")
        return RetrieverEvent(nodes=nodes)

    @step
    async def rerank(self, ctx: Context, ev: RetrieverEvent) -> RerankEvent:
        # Rerank the nodes
        ranker = LLMRerank(
            choice_batch_size=5, top_n=3, llm=OpenAI(model="gpt-4o-mini")
        )
        print(await ctx.get("query", default=None), flush=True)
        new_nodes = ranker.postprocess_nodes(
            ev.nodes, query_str=await ctx.get("query", default=None)
        )
        print(f"Reranked nodes to {len(new_nodes)}")
        return RerankEvent(nodes=new_nodes)

    @step
    async def synthesize(self, ctx: Context, ev: RerankEvent) -> StopEvent:
        """Return a streaming response using reranked nodes."""
        llm = OpenAI(model="gpt-4o-mini")
        summarizer = CompactAndRefine(llm=llm, streaming=True, verbose=True)
        query = await ctx.get("query", default=None)

        response = await summarizer.asynthesize(query, nodes=ev.nodes)
        return StopEvent(result=response)
And that's it! Let's explore the workflow we just wrote a bit.
- We have two entry points (the steps that accept a StartEvent); see the sketch after this list
- The steps themselves decide when they can run
- The workflow context is used to store the user query
- The nodes are passed along, and finally a streaming response is returned
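To make the dispatching concrete, here is a minimal sketch that reuses only the workflow imports shown above; the class and step names (TwoEntryPoints, ingest_entry, query_entry) are illustrative, not part of the library. Each step accepts a StartEvent, inspects its payload, and returns None to pass when the event is not meant for it, exactly as RAGWorkflow does:

class TwoEntryPoints(Workflow):
    @step
    async def ingest_entry(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
        if ev.get("dirname") is None:
            return None  # not our payload; the other entry point handles it
        return StopEvent(result=f"would ingest {ev.get('dirname')}")

    @step
    async def query_entry(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
        if ev.get("query") is None:
            return None
        # shared state via the workflow context, as in RAGWorkflow above
        await ctx.set("query", ev.get("query"))
        return StopEvent(result=f"would answer {await ctx.get('query', default=None)}")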
Run the workflow!
In [ ]
w = RAGWorkflow()
# Ingest the documents
index = await w.run(dirname="data")
In [ ]
# Run a query
result = await w.run(query="How was Llama2 trained?", index=index)
async for chunk in result.async_response_gen():
    print(chunk, end="", flush=True)
Query the database with: How was Llama2 trained?
Retrieved 2 nodes.
How was Llama2 trained?
Reranked nodes to 2
Llama 2 was trained through a multi-step process that began with pretraining using publicly available online sources. This was followed by the creation of an initial version of Llama 2-Chat through supervised fine-tuning. The model was then iteratively refined using Reinforcement Learning with Human Feedback (RLHF) methodologies, which included rejection sampling and Proximal Policy Optimization (PPO). During pretraining, the model utilized an optimized auto-regressive transformer architecture, incorporating robust data cleaning, updated data mixes, and training on a significantly larger dataset of 2 trillion tokens. The training process also involved increased context length and the use of grouped-query attention (GQA) to enhance inference scalability. The training employed the AdamW optimizer with specific hyperparameters, including a cosine learning rate schedule and gradient clipping. The models were pretrained on Meta’s Research SuperCluster and internal production clusters, utilizing NVIDIA A100 GPUs.