本指南将向您展示如何在 Google Drive 文件上构建“实时” RAG 管线。
该管线将对 Google Drive 文件建立索引,并将其导入到 Redis 向量存储中。之后,每次重新运行摄取管线时,管线都会传播增量更新,以便只有更改的文档在向量存储中得到更新。这意味着我们无需重新索引所有文档!
我们使用以下数据源 - 您需要复制这些文件并上传到您自己的 Google Drive 目录中!
注意:您还需要设置一个服务帐号和 credentials.json 文件。有关 Google Drive 加载器的更多详细信息,请参阅我们的 LlamaHub 页面:https://llamahub.ai/l/readers/llama-index-readers-google?from=readers
设置¶
我们安装所需的软件包并启动 Redis Docker 镜像。
In [ ]
%pip install llama-index-storage-docstore-redis
%pip install llama-index-vector-stores-redis
%pip install llama-index-embeddings-huggingface
%pip install llama-index-readers-google
# if creating a new container
!docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest
# # if starting an existing container
# !docker start -a redis-stack
d32273cc1267d3221afa780db0edcd6ce5eee08ae88886f645410b9a220d4916
import os
os.environ["OPENAI_API_KEY"] = "sk-..."
这里我们定义摄取管线。给定一组文档,我们将运行句子分割/嵌入转换,然后将它们加载到 Redis 文档存储/向量存储中。
向量存储用于索引数据和存储嵌入,文档存储用于追踪重复项。
from llama_index.embeddings.huggingface import HuggingFaceEmbedding from llama_index.core.ingestion import ( DocstoreStrategy, IngestionPipeline, IngestionCache, ) from llama_index.storage.kvstore.redis import RedisKVStore as RedisCache from llama_index.storage.docstore.redis import RedisDocumentStore from llama_index.core.node_parser import SentenceSplitter from llama_index.vector_stores.redis import RedisVectorStore from redisvl.schema import IndexSchema
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.ingestion import (
DocstoreStrategy,
IngestionPipeline,
IngestionCache,
)
from llama_index.storage.kvstore.redis import RedisKVStore as RedisCache
from llama_index.storage.docstore.redis import RedisDocumentStore
from llama_index.core.node_parser import SentenceSplitter
from llama_index.vector_stores.redis import RedisVectorStore
from redisvl.schema import IndexSchema
embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")
model.safetensors: 0%| | 0.00/133M [00:00<?, ?B/s]
tokenizer_config.json: 0%| | 0.00/366 [00:00<?, ?B/s]
vocab.txt: 0%| | 0.00/232k [00:00<?, ?B/s]
tokenizer.json: 0%| | 0.00/711k [00:00<?, ?B/s]
special_tokens_map.json: 0%| | 0.00/125 [00:00<?, ?B/s]
1_Pooling/config.json: 0%| | 0.00/190 [00:00<?, ?B/s]
custom_schema = IndexSchema.from_dict(
{
"index": {"name": "gdrive", "prefix": "doc"},
# customize fields that are indexed
"fields": [
# required fields for llamaindex
{"type": "tag", "name": "id"},
{"type": "tag", "name": "doc_id"},
{"type": "text", "name": "text"},
# custom vector field for bge-small-en-v1.5 embeddings
{
"type": "vector",
"name": "vector",
"attrs": {
"dims": 384,
"algorithm": "hnsw",
"distance_metric": "cosine",
},
},
],
}
)
vector_store = RedisVectorStore(
schema=custom_schema,
redis_url="redis://localhost:6379",
)
# Optional: clear vector store if exists
if vector_store.index_exists():
vector_store.delete_index()
# Set up the ingestion cache layer
cache = IngestionCache(
cache=RedisCache.from_host_and_port("localhost", 6379),
collection="redis_cache",
)
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(),
embed_model,
],
docstore=RedisDocumentStore.from_host_and_port(
"localhost", 6379, namespace="document_store"
),
vector_store=vector_store,
cache=cache,
docstore_strategy=DocstoreStrategy.UPSERTS,
)
我们定义索引来封装底层的向量存储。
from llama_index.core import VectorStoreIndex index = VectorStoreIndex.from_vector_store( pipeline.vector_store, embed_model=embed_model )
from llama_index.core import VectorStoreIndex
index = VectorStoreIndex.from_vector_store(
pipeline.vector_store, embed_model=embed_model
)
这里我们从 LlamaHub 上的 Google Drive 加载器加载数据。
加载的文档是来自我们文档中使用案例的标题部分。
from llama_index.readers.google import GoogleDriveReader
from llama_index.readers.google import GoogleDriveReader
loader = GoogleDriveReader()
def load_data(folder_id: str):
docs = loader.load_data(folder_id=folder_id)
for doc in docs:
doc.id_ = doc.metadata["file_name"]
return docs
docs = load_data(folder_id="1RFhr3-KmOZCR5rtp4dlOMNl3LKe1kOA5")
# print(docs)
nodes = pipeline.run(documents=docs)
print(f"Ingested {len(nodes)} Nodes")
对初始数据进行提问¶
query_engine = index.as_query_engine()
query_engine = index.as_query_engine()
response = query_engine.query("What are the sub-types of question answering?")
print(str(response))
The sub-types of question answering mentioned in the context are semantic search and summarization.
让我们尝试修改已摄取的数据!
我们修改了“问答”文档,增加了一段“结构化分析”文本。请参考我们的更新后的文档。
现在让我们重新运行摄取管线。
docs = load_data(folder_id="1RFhr3-KmOZCR5rtp4dlOMNl3LKe1kOA5") nodes = pipeline.run(documents=docs) print(f"已摄取 {len(nodes)} 个节点")
docs = load_data(folder_id="1RFhr3-KmOZCR5rtp4dlOMNl3LKe1kOA5")
nodes = pipeline.run(documents=docs)
print(f"Ingested {len(nodes)} Nodes")
对新数据进行提问¶
返回顶部
query_engine = index.as_query_engine()
response = query_engine.query("What are the sub-types of question answering?")
print(str(response))
The sub-types of question answering mentioned in the context are semantic search, summarization, and structured analytics.