Redis 摄取管道¶
本指南展示了如何在摄取管道中将 Redis 用作向量存储、缓存和文档存储。
依赖项¶
安装并启动 Redis，并设置 OpenAI API 密钥。
输入 [ ]
已复制!
%pip install llama-index-storage-docstore-redis
%pip install llama-index-vector-stores-redis
%pip install llama-index-embeddings-huggingface
%pip install llama-index-storage-docstore-redis %pip install llama-index-vector-stores-redis %pip install llama-index-embeddings-huggingface
输入 [ ]
已复制!
!docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest
!docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest
338c889086e8649aa80dfb79ebff4fffc98d72fc6d988ac158c6662e9e0cf04b
输入 [ ]
已复制!
# Configure the OpenAI API key and silence HuggingFace tokenizers'
# fork-parallelism warning before any model code is imported.
import os

os.environ.update(
    {
        "OPENAI_API_KEY": "sk-...",
        "TOKENIZERS_PARALLELISM": "false",
    }
)
import os os.environ["OPENAI_API_KEY"] = "sk-..." os.environ["TOKENIZERS_PARALLELISM"] = "false"
创建种子数据¶
输入 [ ]
已复制!
# Make some test data
!rm -rf test_redis_data
!mkdir -p test_redis_data
!echo "This is a test file: one!" > test_redis_data/test1.txt
!echo "This is a test file: two!" > test_redis_data/test2.txt
# 创建一些测试数据 !rm -rf test_redis_data !mkdir -p test_redis_data !echo "This is a test file: one!" > test_redis_data/test1.txt !echo "This is a test file: two!" > test_redis_data/test2.txt
输入 [ ]
已复制!
from llama_index.core import SimpleDirectoryReader
# Load documents with deterministic, filename-based IDs so that repeat
# ingestion runs recognize the same file and upsert instead of duplicating.
documents = SimpleDirectoryReader(
"./test_redis_data", filename_as_id=True
).load_data()
from llama_index.core import SimpleDirectoryReader # 使用确定性 ID 加载文档 documents = SimpleDirectoryReader( "./test_redis_data", filename_as_id=True ).load_data()
输入 [ ]
已复制!
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.ingestion import (
DocstoreStrategy,
IngestionPipeline,
IngestionCache,
)
from llama_index.storage.kvstore.redis import RedisKVStore as RedisCache
from llama_index.storage.docstore.redis import RedisDocumentStore
from llama_index.core.node_parser import SentenceSplitter
from llama_index.vector_stores.redis import RedisVectorStore
from redisvl.schema import IndexSchema
# Local HuggingFace embedding model; bge-small-en-v1.5 produces 384-dim
# vectors, which must match the vector field "dims" in the schema below.
embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")
# Redis search index schema consumed by RedisVectorStore: index name,
# key prefix, and the fields to index on each stored node.
custom_schema = IndexSchema.from_dict(
{
"index": {"name": "redis_vector_store", "prefix": "doc"},
# customize fields that are indexed
"fields": [
# required fields for llamaindex
{"type": "tag", "name": "id"},
{"type": "tag", "name": "doc_id"},
{"type": "text", "name": "text"},
# custom vector field for bge-small-en-v1.5 embeddings
{
"type": "vector",
"name": "vector",
"attrs": {
"dims": 384,
"algorithm": "hnsw",
"distance_metric": "cosine",
},
},
],
}
)
from llama_index.embeddings.huggingface import HuggingFaceEmbedding from llama_index.core.ingestion import ( DocstoreStrategy, IngestionPipeline, IngestionCache, ) from llama_index.storage.kvstore.redis import RedisKVStore as RedisCache from llama_index.storage.docstore.redis import RedisDocumentStore from llama_index.core.node_parser import SentenceSplitter from llama_index.vector_stores.redis import RedisVectorStore from redisvl.schema import IndexSchema embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5") custom_schema = IndexSchema.from_dict( { "index": {"name": "redis_vector_store", "prefix": "doc"}, # 自定义索引字段 "fields": [ # llamaindex 的必需字段 {"type": "tag", "name": "id"}, {"type": "tag", "name": "doc_id"}, {"type": "text", "name": "text"}, # bge-small-en-v1.5 嵌入的自定义向量字段 { "type": "vector", "name": "vector", "attrs": { "dims": 384, "algorithm": "hnsw", "distance_metric": "cosine", }, }, ], } )
输入 [ ]
已复制!
# Ingestion pipeline wiring Redis into three roles:
# - docstore: records document IDs/hashes so re-runs can detect changes
# - vector_store: persists node embeddings under the custom schema
# - cache: memoizes transformation results between runs
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(),
embed_model,
],
docstore=RedisDocumentStore.from_host_and_port(
"localhost", 6379, namespace="document_store"
),
vector_store=RedisVectorStore(
schema=custom_schema,
redis_url="redis://localhost:6379",
),
cache=IngestionCache(
cache=RedisCache.from_host_and_port("localhost", 6379),
collection="redis_cache",
),
# UPSERTS: a re-ingested document with a known ID but changed content
# replaces its old nodes instead of adding duplicates.
docstore_strategy=DocstoreStrategy.UPSERTS,
)
pipeline = IngestionPipeline( transformations=[ SentenceSplitter(), embed_model, ], docstore=RedisDocumentStore.from_host_and_port( "localhost", 6379, namespace="document_store" ), vector_store=RedisVectorStore( schema=custom_schema, redis_url="redis://localhost:6379", ), cache=IngestionCache( cache=RedisCache.from_host_and_port("localhost", 6379), collection="redis_cache", ), docstore_strategy=DocstoreStrategy.UPSERTS, )
输入 [ ]
已复制!
# Run the pipeline; only new or changed documents produce nodes.
nodes = pipeline.run(documents=documents)
print(f"Ingested {len(nodes)} Nodes")
nodes = pipeline.run(documents=documents) print(f"已摄取 {len(nodes)} 个节点")
Ingested 2 Nodes
确认文档已摄取¶
我们可以使用向量存储创建向量索引,并快速询问看到了哪些文档。
输入 [ ]
已复制!
from llama_index.core import VectorStoreIndex
# Build an index view over the pipeline's vector store; pass the same
# embedding model so queries are embedded consistently with ingestion.
index = VectorStoreIndex.from_vector_store(
pipeline.vector_store, embed_model=embed_model
)
from llama_index.core import VectorStoreIndex index = VectorStoreIndex.from_vector_store( pipeline.vector_store, embed_model=embed_model )
输入 [ ]
已复制!
# Sanity check: ask the index which documents it can currently see.
print(
index.as_query_engine(similarity_top_k=10).query(
"What documents do you see?"
)
)
print( index.as_query_engine(similarity_top_k=10).query( "你看到了哪些文档?" ) )
I see two documents.
添加数据并摄取¶
在这里,我们可以更新现有文件,并添加新文件!
输入 [ ]
已复制!
!echo "This is a test file: three!" > test_redis_data/test3.txt
!echo "This is a NEW test file: one!" > test_redis_data/test1.txt
!echo "This is a test file: three!" > test_redis_data/test3.txt !echo "This is a NEW test file: one!" > test_redis_data/test1.txt
输入 [ ]
已复制!
# Reload the directory (test1.txt changed, test3.txt is new) and re-run;
# the unchanged file is skipped thanks to the docstore upsert strategy.
documents = SimpleDirectoryReader(
"./test_redis_data", filename_as_id=True
).load_data()
nodes = pipeline.run(documents=documents)
print(f"Ingested {len(nodes)} Nodes")
documents = SimpleDirectoryReader( "./test_redis_data", filename_as_id=True ).load_data() nodes = pipeline.run(documents=documents) print(f"已摄取 {len(nodes)} 个节点")
13:32:07 redisvl.index.index INFO Index already exists, not overwriting. Ingested 2 Nodes
输入 [ ]
已复制!
# Rebuild the index view and verify the upserted content is returned.
index = VectorStoreIndex.from_vector_store(
pipeline.vector_store, embed_model=embed_model
)
response = index.as_query_engine(similarity_top_k=10).query(
"What documents do you see?"
)
print(response)
# Show the raw node text backing the answer (export lost the loop indent).
for node in response.source_nodes:
    print(node.get_text())
index = VectorStoreIndex.from_vector_store( pipeline.vector_store, embed_model=embed_model ) response = index.as_query_engine(similarity_top_k=10).query( "你看到了哪些文档?" ) print(response) for node in response.source_nodes: print(node.get_text())
You see three documents: test3.txt, test1.txt, and test2.txt. This is a test file: three! This is a NEW test file: one! This is a test file: two!
正如我们所见,数据被正确去重和 upsert 了!即使我们完整运行了两次管道,索引中也只有三个节点。