摄取管道#
IngestionPipeline
使用了应用于输入数据的转换
概念。这些转换
应用于您的输入数据,生成的节点要么返回,要么插入到向量数据库(如果提供)。每个节点+转换对都会被缓存,因此后续使用相同节点+转换组合的运行(如果缓存已持久化)可以使用缓存结果,从而节省时间。
要查看 IngestionPipeline
的交互式使用示例,请查看 RAG CLI。
使用模式#
最简单的用法是实例化一个 IngestionPipeline
,如下所示
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline, IngestionCache
# create the pipeline with transformations
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=25, chunk_overlap=0),
TitleExtractor(),
OpenAIEmbedding(),
]
)
# run the pipeline
nodes = pipeline.run(documents=[Document.example()])
请注意,在实际场景中,您将从 SimpleDirectoryReader
或 Llama Hub 的其他阅读器获取文档。
连接向量数据库#
运行摄取管道时,您还可以选择自动将生成的节点插入远程向量存储。
然后,您可以稍后从该向量存储构建索引。
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline
from llama_index.vector_stores.qdrant import QdrantVectorStore
import qdrant_client
client = qdrant_client.QdrantClient(location=":memory:")
vector_store = QdrantVectorStore(client=client, collection_name="test_store")
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=25, chunk_overlap=0),
TitleExtractor(),
OpenAIEmbedding(),
],
vector_store=vector_store,
)
# Ingest directly into a vector db
pipeline.run(documents=[Document.example()])
# Create your index
from llama_index.core import VectorStoreIndex
index = VectorStoreIndex.from_vector_store(vector_store)
在管道中计算嵌入#
请注意,在上面的示例中,嵌入是作为管道的一部分计算的。如果您将管道连接到向量存储,嵌入必须是管道的一个阶段,否则您后续实例化索引将失败。
如果您未连接到向量存储,则可以从管道中省略嵌入,例如,仅生成节点列表。
缓存#
在 IngestionPipeline
中,每个节点 + 转换组合都会被哈希并缓存。这可以节省后续使用相同数据的运行时间。
以下部分介绍了一些关于缓存的基本用法。
本地缓存管理#
构建管道后,您可能希望存储和加载缓存。
# save
pipeline.persist("./pipeline_storage")
# load and restore state
new_pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=25, chunk_overlap=0),
TitleExtractor(),
],
)
new_pipeline.load("./pipeline_storage")
# will run instantly due to the cache
nodes = pipeline.run(documents=[Document.example()])
如果缓存过大,您可以清除它
# delete all context of the cache
cache.clear()
远程缓存管理#
我们支持多种远程存储后端作为缓存
RedisCache
MongoDBCache
FirestoreCache
这里是使用 RedisCache
的示例
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline, IngestionCache
from llama_index.storage.kvstore.redis import RedisKVStore as RedisCache
ingest_cache = IngestionCache(
cache=RedisCache.from_host_and_port(host="127.0.0.1", port=6379),
collection="my_test_cache",
)
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=25, chunk_overlap=0),
TitleExtractor(),
OpenAIEmbedding(),
],
cache=ingest_cache,
)
# Ingest directly into a vector db
nodes = pipeline.run(documents=[Document.example()])
这里不需要持久化步骤,因为所有内容都会在指定的远程集合中即时缓存。
异步支持#
IngestionPipeline
也支持异步操作
nodes = await pipeline.arun(documents=documents)
文档管理#
将 docstore
附加到摄取管道将启用文档管理功能。
使用 document.doc_id
或 node.ref_doc_id
作为基础点,摄取管道将主动查找重复文档。
工作原理如下
- 存储
doc_id
->document_hash
的映射 - 如果附加了向量存储
- 如果检测到重复的
doc_id
且哈希已更改,文档将重新处理并 Upsert - 如果检测到重复的
doc_id
且哈希未更改,则跳过该节点 - 如果未附加向量存储
- 检查每个节点的所有现有哈希
- 如果发现重复,则跳过该节点
- 否则,处理该节点
注意:如果我们不附加向量存储,则只能检查并移除重复输入。
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.storage.docstore import SimpleDocumentStore
pipeline = IngestionPipeline(
transformations=[...], docstore=SimpleDocumentStore()
)
完整的演练可以在我们的 演示 notebook 中找到。
还可以查看另一篇使用 Redis 作为整个摄取堆栈的指南。
并行处理#
IngestionPipeline
的 run
方法可以使用并行进程执行。它通过使用 multiprocessing.Pool
将节点批次分发到处理器来实现。
要执行并行处理,请将 num_workers
设置为您希望使用的进程数
from llama_index.core.ingestion import IngestionPipeline
pipeline = IngestionPipeline(
transformations=[...],
)
pipeline.run(documents=[...], num_workers=4)