Nile 向量存储(多租户 PostgreSQL)¶
本Notebook展示了如何使用基于Postgres的向量存储NileVectorStore
来存储和查询向量嵌入,用于多租户RAG应用。
什么是Nile?¶
Nile是一个Postgres数据库,它为每个租户提供了所有的数据库操作能力,包括自动伸缩、分支和备份,同时实现完全的客户隔离。
多租户RAG应用越来越受欢迎,因为它们在使用大语言模型的同时提供了安全性和隐私保护。
然而,管理底层的Postgres数据库并不简单。每个租户一个数据库(DB-per-tenant)成本高且管理复杂,而共享数据库(shared-DB)存在安全和隐私问题,并且限制了RAG应用的可扩展性和性能。Nile重新设计了Postgres,以提供两全其美的优势——既有DB-per-tenant的隔离性,又有shared-DB的成本效益、效率和开发者体验。
在共享数据库中存储数百万向量可能会很慢,并且需要大量资源进行索引和查询。但是,如果您在Nile的虚拟租户数据库中存储1000个租户,每个租户拥有1000个向量,这将变得非常易于管理。特别是您可以将较大的租户放在自己的计算资源上,而较小的租户可以有效地共享计算资源并按需自动伸缩。
Nile入门指南¶
首先注册 Nile。注册完成后,您将被提示创建您的第一个数据库。请继续创建。您将被重定向到新数据库的“查询编辑器”页面。
在 वहां,点击“主页”(左侧菜单顶部的图标),点击“生成凭据”,并复制生成的连接字符串。稍后您会需要它。
更多资源¶
%pip install llama-index-vector-stores-nile
%pip install /Users/gwen/workspaces/llama_index/llama-index-integrations/vector_stores/llama-index-vector-stores-nile/dist/llama_index_vector_stores_nile-0.1.1.tar.gz
!pip install llama-index
import logging
from llama_index.core import SimpleDirectoryReader, StorageContext
from llama_index.core import VectorStoreIndex
from llama_index.core.vector_stores import (
MetadataFilter,
MetadataFilters,
FilterOperator,
)
from llama_index.vector_stores.nile import NileVectorStore, IndexType
设置与Nile数据库的连接¶
假设您已遵循上一节“Nile入门指南”中的说明,您现在应该拥有与Nile数据库的连接字符串。
您可以将其设置为名为NILEDB_SERVICE_URL
的环境变量,或直接在Python中设置。
%env NILEDB_SERVICE_URL=postgresql://username:password@us-west-2.db.thenile.dev:5432/niledb
现在,我们将创建一个NileVectorStore
。请注意,除了URL和维度等常规参数外,我们还设置了tenant_aware=True
。
:fire: NileVectorStore
同时支持租户感知型向量存储(隔离每个租户的文档)和常规存储(通常用于所有租户都可以访问的共享数据)。下面,我们将演示租户感知型向量存储。
# Get the service url by reading local .env file with NILE_SERVICE_URL variable
import os
NILEDB_SERVICE_URL = os.environ["NILEDB_SERVICE_URL"]
# OR set it explicitly
# NILE_SERVICE_URL = "postgresql://nile:[email protected]:5432/nile"
vector_store = NileVectorStore(
service_url=NILEDB_SERVICE_URL,
table_name="documents",
tenant_aware=True,
num_dimensions=1536,
)
设置OpenAI¶
您可以将其设置在.env文件中,或直接在Python中设置。
%env OPENAI_API_KEY=sk-...
# Uncomment and set it explicitly if you prefer not to use .env
# os.environ["OPENAI_API_KEY"] = "sk-..."
多租户相似度搜索¶
为了演示使用LlamaIndex和Nile进行多租户相似度搜索,我们将下载两份文档——每份文档包含不同公司的销售电话录音稿。Nexiv提供IT服务,而ModaMart从事零售业。我们将为每份文档添加租户标识符,并将其加载到租户感知型向量存储中。然后,我们将针对每个租户查询存储。您将看到同一个问题如何产生两种不同的响应,因为针对每个租户检索到的是不同的文档。
下载数据¶
!mkdir -p data
!wget "https://raw.githubusercontent.com/niledatabase/niledatabase/main/examples/ai/sales_insight/data/transcripts/nexiv-solutions__0_transcript.txt" -O "data/nexiv-solutions__0_transcript.txt"
!wget "https://raw.githubusercontent.com/niledatabase/niledatabase/main/examples/ai/sales_insight/data/transcripts/modamart__0_transcript.txt" -O "data/modamart__0_transcript.txt"
加载文档¶
我们将使用LlamaIndex的SimpleDirectoryReader
来加载文档。因为我们想在加载后使用租户元数据更新文档,所以我们将为每个租户使用单独的阅读器。
reader = SimpleDirectoryReader(
input_files=["data/nexiv-solutions__0_transcript.txt"]
)
documents_nexiv = reader.load_data()
reader = SimpleDirectoryReader(input_files=["data/modamart__0_transcript.txt"])
documents_modamart = reader.load_data()
使用租户元数据丰富文档¶
我们将创建两个Nile租户,并将每个租户的租户ID添加到文档元数据中。我们还添加了一些额外的元数据,例如自定义文档ID和类别。这些元数据可以在检索过程中用于过滤文档。当然,在您自己的应用程序中,您也可以加载现有租户的文档并添加您认为有用的任何元数据信息。
tenant_id_nexiv = str(vector_store.create_tenant("nexiv-solutions"))
tenant_id_modamart = str(vector_store.create_tenant("modamart"))
# Add the tenant id to the metadata
for i, doc in enumerate(documents_nexiv, start=1):
doc.metadata["tenant_id"] = tenant_id_nexiv
doc.metadata[
"category"
] = "IT" # We will use this to apply additional filters in a later example
doc.id_ = f"nexiv_doc_id_{i}" # We are also setting a custom id, this is optional but can be useful
for i, doc in enumerate(documents_modamart, start=1):
doc.metadata["tenant_id"] = tenant_id_modamart
doc.metadata["category"] = "Retail"
doc.id_ = f"modamart_doc_id_{i}"
使用NileVectorStore
创建VectorStore索引¶
我们将所有文档加载到同一个VectorStoreIndex
中。由于我们在设置时创建了租户感知型NileVectorStore
,Nile将正确使用元数据中的tenant_id
字段来隔离它们。
将不带tenant_id
的文档加载到租户感知型存储将抛出ValueException
。
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
documents_nexiv + documents_modamart,
storage_context=storage_context,
show_progress=True,
)
/Users/gwen/.pyenv/versions/3.10.15/lib/python3.10/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html from .autonotebook import tqdm as notebook_tqdm Parsing nodes: 100%|██████████| 2/2 [00:00<00:00, 1129.32it/s] Generating embeddings: 100%|██████████| 2/2 [00:00<00:00, 4.58it/s]
查询每个租户的索引¶
您可以在下面看到我们如何为每个查询指定租户,从而获得与该租户相关且仅针对该租户的答案。
nexiv_query_engine = index.as_query_engine(
similarity_top_k=3,
vector_store_kwargs={
"tenant_id": str(tenant_id_nexiv),
},
)
print(nexiv_query_engine.query("What were the customer pain points?"))
The customer pain points were related to managing customer data using multiple platforms, leading to data discrepancies, time-consuming reconciliation efforts, and decreased productivity.
modamart_query_engine = index.as_query_engine(
similarity_top_k=3,
vector_store_kwargs={
"tenant_id": str(tenant_id_modamart),
},
)
print(modamart_query_engine.query("What were the customer pain points?"))
The customer's pain points were concerns about the quality and value of the winter jackets, skepticism towards reviews, worries about sizing and fit when ordering clothes online, and the desire for a warm but lightweight jacket.
查询现有嵌入¶
在上面的示例中,我们通过加载和嵌入新文档创建了索引。但是如果我们已经生成了嵌入并将其存储在Nile中怎么办?在这种情况下,您仍然像上面一样初始化NileVectorStore
,但不再使用VectorStoreIndex.from_documents(...)
,而是使用以下代码:
index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
query_engine = index.as_query_engine(
vector_store_kwargs={
"tenant_id": str(tenant_id_modamart),
},
)
response = query_engine.query("What action items do we need to follow up on?")
print(response)
The action items to follow up on include sending the customer detailed testimonials about the lightweight and warm qualities of the jackets, providing the customer with a sizing guide, and emailing the customer a 10% discount on their first purchase.
使用ANN索引进行近似最近邻搜索¶
Nile支持pgvector支持的所有索引——IVFFlat和HNSW。IVFFlat更快,使用更少资源且易于调优。HNSW创建和使用时需要更多资源,调优更具挑战性,但具有很好的准确性/速度权衡。让我们看看如何使用索引,即使一个只有两份文档的示例实际上并不需要它们。
IVFFlat索引¶
IVFFlat索引的工作原理是将向量空间分成称为“列表”的区域,首先找到最近的列表,然后在这些列表中搜索最近邻。您在索引创建期间指定列表的数量(nlists
),然后在查询时,您可以指定在搜索中使用多少个最近的列表(ivfflat_probes
)。
try:
vector_store.create_index(index_type=IndexType.PGVECTOR_IVFFLAT, nlists=10)
except Exception as e:
# This will throw an error if the index already exists, which may be expected
print(e)
nexiv_query_engine = index.as_query_engine(
similarity_top_k=3,
vector_store_kwargs={
"tenant_id": str(tenant_id_nexiv),
"ivfflat_probes": 10,
},
)
print(
nexiv_query_engine.query("What action items do we need to follow up on?")
)
vector_store.drop_index()
Index documents_embedding_idx already exists
HNSW索引¶
HNSW索引的工作原理是将向量空间分成一个多层图,其中每一层包含不同粒度级别点之间的连接。在搜索过程中,它从粗粒度层导航到更细粒度层,识别数据中的最近邻。在索引创建期间,您指定一层中的最大连接数(m
)和构建图时考虑的候选向量数(ef_construction
)。在查询时,您可以指定将搜索的候选列表的大小(hnsw_ef
)。
try:
vector_store.create_index(
index_type=IndexType.PGVECTOR_HNSW, m=16, ef_construction=64
)
except Exception as e:
# This will throw an error if the index already exists, which may be expected
print(e)
nexiv_query_engine = index.as_query_engine(
similarity_top_k=3,
vector_store_kwargs={
"tenant_id": str(tenant_id_nexiv),
"hnsw_ef": 10,
},
)
print(nexiv_query_engine.query("Did we mention any pricing?"))
vector_store.drop_index()
filters = MetadataFilters(
filters=[
MetadataFilter(
key="category", operator=FilterOperator.EQ, value="Retail"
),
]
)
nexiv_query_engine_filtered = index.as_query_engine(
similarity_top_k=3,
filters=filters,
vector_store_kwargs={"tenant_id": str(tenant_id_nexiv)},
)
print(
"test query on nexiv with filter on category = Retail (should return empty): ",
nexiv_query_engine_filtered.query("What were the customer pain points?"),
)
test query on nexiv with filter on category = Retail (should return empty): Empty Response
删除文档¶
删除文档非常重要。特别是如果您的某些租户位于需要遵循GDPR法规的地区。
ref_doc_id = "nexiv_doc_id_1"
vector_store.delete(ref_doc_id, tenant_id=tenant_id_nexiv)
# Query the data again
print(
"test query on nexiv after deletion (should return empty): ",
nexiv_query_engine.query("What were the customer pain points?"),
)
test query on nexiv after deletion (should return empty): Empty Response