This notebook demonstrates how to implement LongRAG using LlamaIndex workflows.
%pip install -U llama-index

import nest_asyncio

nest_asyncio.apply()

import os

os.environ["OPENAI_API_KEY"] = "sk-proj-..."

!wget https://github.com/user-attachments/files/16474262/data.zip -O data.zip
!unzip -o data.zip
!rm data.zip
Since workflows are async-first, this all runs fine in a notebook. If you were running this code in your own script, you would want to use asyncio.run() to start an async event loop if one isn't already running:

async def main():
    <async code>

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

Helper Functions
These helper functions split documents into smaller chunks and group nodes based on their relationships.
from typing import List, Dict, Optional, Set, FrozenSet
from llama_index.core.schema import BaseNode, TextNode
from llama_index.core.node_parser import SentenceSplitter
# constants
DEFAULT_CHUNK_SIZE = 4096 # optionally splits documents into CHUNK_SIZE, then regroups them to demonstrate grouping algorithm
DEFAULT_MAX_GROUP_SIZE = 20 # maximum number of documents in a group
DEFAULT_SMALL_CHUNK_SIZE = 512 # small chunk size for generating embeddings
DEFAULT_TOP_K = 8 # top k for retrieving
def split_doc(chunk_size: int, documents: List[BaseNode]) -> List[TextNode]:
"""Splits documents into smaller pieces.
Args:
chunk_size (int): Chunk size
documents (List[BaseNode]): Documents
Returns:
List[TextNode]: Smaller chunks
"""
    # split docs into sentence-aware chunks of at most chunk_size tokens
text_parser = SentenceSplitter(chunk_size=chunk_size)
return text_parser.get_nodes_from_documents(documents)
def group_docs(
nodes: List[str],
adj: Dict[str, List[str]],
max_group_size: Optional[int] = DEFAULT_MAX_GROUP_SIZE,
) -> Set[FrozenSet[str]]:
"""Groups documents.
Args:
nodes (List[str]): documents IDs
adj (Dict[str, List[str]]): related documents for each document; id -> list of doc strings
max_group_size (Optional[int], optional): max group size, None if no max group size. Defaults to DEFAULT_MAX_GROUP_SIZE.
"""
docs = sorted(nodes, key=lambda node: len(adj[node]))
groups = set() # set of set of IDs
for d in docs:
related_groups = set()
for r in adj[d]:
for g in groups:
if r in g:
related_groups = related_groups.union(frozenset([g]))
gnew = {d}
related_groupsl = sorted(related_groups, key=lambda el: len(el))
for g in related_groupsl:
if max_group_size is None or len(gnew) + len(g) <= max_group_size:
gnew = gnew.union(g)
if g in groups:
groups.remove(g)
groups.add(frozenset(gnew))
return groups
def get_grouped_docs(
nodes: List[TextNode],
max_group_size: Optional[int] = DEFAULT_MAX_GROUP_SIZE,
) -> List[TextNode]:
"""Gets list of documents that are grouped.
Args:
nodes (t.List[TextNode]): Input list
max_group_size (Optional[int], optional): max group size, None if no max group size. Defaults to DEFAULT_MAX_GROUP_SIZE.
Returns:
t.List[TextNode]: Output list
"""
# node IDs
nodes_str = [node.id_ for node in nodes]
# maps node ID -> related node IDs based on that node's relationships
adj: Dict[str, List[str]] = {
node.id_: [val.node_id for val in node.relationships.values()]
for node in nodes
}
# node ID -> node
nodes_dict = {node.id_: node for node in nodes}
res = group_docs(nodes_str, adj, max_group_size)
ret_nodes = []
for g in res:
cur_node = TextNode()
for node_id in g:
cur_node.text += nodes_dict[node_id].text + "\n\n"
cur_node.metadata.update(nodes_dict[node_id].metadata)
ret_nodes.append(cur_node)
return ret_nodes
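To see what the grouping algorithm does, here is a minimal sketch on made-up document IDs and an invented adjacency map (the IDs and relationships below are hypothetical, for illustration only). Documents a, b, and c are chained together by relationships, while d and e form a separate pair:

# toy IDs and adjacency map (hypothetical data, not from the dataset above)
toy_nodes = ["a", "b", "c", "d", "e"]
toy_adj = {
    "a": ["b"],
    "b": ["a", "c"],
    "c": ["b"],
    "d": ["e"],
    "e": ["d"],
}

print(group_docs(toy_nodes, toy_adj, max_group_size=3))
# expected: {frozenset({'a', 'b', 'c'}), frozenset({'d', 'e'})}

Related documents end up merged into a single frozenset, and max_group_size caps how large any merged unit can grow.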
LongRAG requires a custom retriever, shown below:
from llama_index.core.retrievers import BaseRetriever
from llama_index.core.schema import QueryBundle, NodeWithScore
from llama_index.core.vector_stores.types import (
    BasePydanticVectorStore,
    VectorStoreQuery,
)
from llama_index.core.settings import Settings
class LongRAGRetriever(BaseRetriever):
"""Long RAG Retriever."""
def __init__(
self,
grouped_nodes: List[TextNode],
small_toks: List[TextNode],
vector_store: BasePydanticVectorStore,
similarity_top_k: int = DEFAULT_TOP_K,
) -> None:
"""Constructor.
Args:
grouped_nodes (List[TextNode]): Long retrieval units, nodes with docs grouped together based on relationships
small_toks (List[TextNode]): Smaller tokens
embed_model (BaseEmbedding, optional): Embed model. Defaults to None.
similarity_top_k (int, optional): Similarity top k. Defaults to 8.
"""
self._grouped_nodes = grouped_nodes
self._grouped_nodes_dict = {node.id_: node for node in grouped_nodes}
self._small_toks = small_toks
self._small_toks_dict = {node.id_: node for node in self._small_toks}
self._similarity_top_k = similarity_top_k
self._vec_store = vector_store
self._embed_model = Settings.embed_model
def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
"""Retrieves.
Args:
query_bundle (QueryBundle): query bundle
Returns:
List[NodeWithScore]: nodes with scores
"""
        # embed the query
        query_embedding = self._embed_model.get_query_embedding(
            query_bundle.query_str
        )
        # over-retrieve small chunks so enough distinct parent units are found
        vector_store_query = VectorStoreQuery(
            query_embedding=query_embedding, similarity_top_k=500
        )

        # query the vector store over the small chunks
        query_res = self._vec_store.query(vector_store_query)
# determine top parents of most similar children (these are long retrieval units)
top_parents_set: Set[str] = set()
top_parents: List[NodeWithScore] = []
for id_, similarity in zip(query_res.ids, query_res.similarities):
cur_node = self._small_toks_dict[id_]
parent_id = cur_node.ref_doc_id
if parent_id not in top_parents_set:
top_parents_set.add(parent_id)
parent_node = self._grouped_nodes_dict[parent_id]
node_with_score = NodeWithScore(
node=parent_node, score=similarity
)
top_parents.append(node_with_score)
if len(top_parents_set) >= self._similarity_top_k:
break
assert len(top_parents) == min(
self._similarity_top_k, len(self._grouped_nodes)
)
return top_parents
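The retriever can be exercised on its own before wiring it into a workflow. Below is a minimal sketch that mirrors what the workflow's ingestion step does later, assuming the data directory downloaded above and the default Settings.embed_model (OpenAI, given the API key set earlier):

from llama_index.core import SimpleDirectoryReader, VectorStoreIndex

# build long retrieval units, then split them into small chunks for embedding
docs = SimpleDirectoryReader("data").load_data()
grouped_nodes = get_grouped_docs(split_doc(DEFAULT_CHUNK_SIZE, docs))
small_nodes = split_doc(DEFAULT_SMALL_CHUNK_SIZE, grouped_nodes)

# index the small chunks; the retriever maps hits back to their long parents
index = VectorStoreIndex(small_nodes)
retriever = LongRAGRetriever(
    grouped_nodes=grouped_nodes,
    small_toks=small_nodes,
    vector_store=index.vector_store,
    similarity_top_k=DEFAULT_TOP_K,
)
nodes = retriever.retrieve("How can Pittsburgh become a startup hub?")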
LongRAG consists of the following steps:

- Ingesting the data: documents are grouped into long retrieval units based on their relationships, the long retrieval units are split into smaller chunks for generating embeddings, and the small nodes are indexed.
- Constructing the retriever and query engine.
- Querying the data given a query string.

Below, we define an event that passes the long and small retrieval units to the retriever and query engine:
from typing import Any, Iterable
from llama_index.core import VectorStoreIndex
from llama_index.core.llms import LLM
from llama_index.core.workflow import Event
class LoadNodeEvent(Event):
"""Event for loading nodes."""
small_nodes: Iterable[TextNode]
grouped_nodes: list[TextNode]
index: VectorStoreIndex
similarity_top_k: int
llm: LLM
from llama_index.core.workflow import (
Workflow,
step,
StartEvent,
StopEvent,
Context,
)
from llama_index.core import SimpleDirectoryReader
from llama_index.core.query_engine import RetrieverQueryEngine
class LongRAGWorkflow(Workflow):
"""Long RAG Workflow."""
@step
async def ingest(self, ev: StartEvent) -> LoadNodeEvent | None:
"""Ingestion step.
Args:
ctx (Context): Context
ev (StartEvent): start event
Returns:
StopEvent | None: stop event with result
"""
data_dir: str = ev.get("data_dir")
llm: LLM = ev.get("llm")
chunk_size: int | None = ev.get("chunk_size")
similarity_top_k: int = ev.get("similarity_top_k")
small_chunk_size: int = ev.get("small_chunk_size")
index: VectorStoreIndex | None = ev.get("index")
        index_kwargs: dict[str, Any] | None = ev.get("index_kwargs")
if any(
i is None
for i in [data_dir, llm, similarity_top_k, small_chunk_size]
):
return None
if not index:
docs = SimpleDirectoryReader(data_dir).load_data()
if chunk_size is not None:
nodes = split_doc(
chunk_size, docs
) # split documents into chunks of chunk_size
grouped_nodes = get_grouped_docs(
nodes
) # get list of nodes after grouping (groups are combined into one node), these are long retrieval units
else:
grouped_nodes = docs
# split large retrieval units into smaller nodes
small_nodes = split_doc(small_chunk_size, grouped_nodes)
index_kwargs = index_kwargs or {}
index = VectorStoreIndex(small_nodes, **index_kwargs)
        else:
            # get smaller nodes from the index and form large retrieval units from these nodes
            small_nodes = list(index.docstore.docs.values())
            grouped_nodes = get_grouped_docs(small_nodes, None)
return LoadNodeEvent(
small_nodes=small_nodes,
grouped_nodes=grouped_nodes,
index=index,
similarity_top_k=similarity_top_k,
llm=llm,
)
@step
async def make_query_engine(
self, ctx: Context, ev: LoadNodeEvent
) -> StopEvent:
"""Query engine construction step.
Args:
ctx (Context): context
ev (LoadNodeEvent): event
Returns:
StopEvent: stop event
"""
# make retriever and query engine
retriever = LongRAGRetriever(
grouped_nodes=ev.grouped_nodes,
small_toks=ev.small_nodes,
similarity_top_k=ev.similarity_top_k,
vector_store=ev.index.vector_store,
)
        query_eng = RetrieverQueryEngine.from_args(retriever, llm=ev.llm)
return StopEvent(
result={
"retriever": retriever,
"query_engine": query_eng,
"index": ev.index,
}
)
@step
async def query(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
"""Query step.
Args:
ctx (Context): context
ev (StartEvent): start event
Returns:
StopEvent | None: stop event with result
"""
query_str: str | None = ev.get("query_str")
query_eng = ev.get("query_eng")
if query_str is None:
return None
result = query_eng.query(query_str)
return StopEvent(result=result)
There are two entry points: one for ingestion and indexing, and another for querying. Both steps accept the StartEvent; each returns None when its expected arguments are missing, so only the matching step runs.

- During ingestion, the workflow first reads the documents, splits them into smaller nodes, and indexes them. It then emits a LoadNodeEvent, which triggers make_query_engine to build the retriever and query engine from the nodes; the result contains the retriever, the query engine, and the index.
- During querying, the workflow takes the query from the StartEvent, feeds it into the query engine it was given, and returns the query result.
- The context is used to store the query engine.

Running the Workflow
from llama_index.llms.openai import OpenAI
wf = LongRAGWorkflow(timeout=60)
llm = OpenAI("gpt-4o")
data_dir = "data"
# run ingestion: read, group, split, and index the documents
result = await wf.run(
data_dir=data_dir,
llm=llm,
chunk_size=DEFAULT_CHUNK_SIZE,
similarity_top_k=DEFAULT_TOP_K,
small_chunk_size=DEFAULT_SMALL_CHUNK_SIZE,
)
from IPython.display import display, Markdown
# run a query
res = await wf.run(
query_str="How can Pittsburgh become a startup hub, and what are the two types of moderates?",
query_eng=result["query_engine"],
)
display(Markdown(str(res)))
There are two types of moderates: intentional moderates and accidental moderates. Intentional moderates deliberately choose positions between the extremes of the left and right, while accidental moderates form their opinions on each issue independently, resulting in a wide range of views that average out to a moderate position.