Pathway 检索器¶
Pathway 是一个开源数据处理框架。它允许您轻松开发处理实时数据源和变化数据的 数据转换管道和机器学习应用。
本笔记本演示了如何使用使用 LlamaIndex
的实时数据索引管道。您可以使用提供的 PathwayRetriever
从您的 LLM 应用查询此管道的结果。然而,在底层,Pathway 会在每次数据变化时更新索引,为您提供始终最新的答案。
在本笔记本中,我们将使用一个公共演示文档处理管道,它
- 监控多个云数据源以检测数据变化。
- 为数据构建向量索引。
要拥有自己的文档处理管道,请查看托管服务或按照本笔记本构建您自己的。
我们将使用 llama_index.retrievers.pathway.PathwayRetriever
检索器连接到索引,它实现了 retrieve
接口。
本文档描述的基本管道可以轻松构建存储在云位置的文件简单索引。然而,Pathway 提供了构建实时数据管道和应用所需的一切,包括类似 SQL 的操作,例如不同数据源之间的分组-归约和连接,基于时间的数据分组和窗口化,以及广泛的连接器。
有关 Pathway 数据摄取管道和向量存储的更多详细信息,请访问向量存储管道。
前提条件¶
要使用 PathwayRetrievier
,您必须安装 llama-index-retrievers-pathway
包。
!pip install llama-index-retrievers-pathway
为 llama-index 创建检索器¶
要实例化和配置 PathwayRetriever
,您需要提供文档索引管道的 url
或 host
和 port
。在下面的代码中,我们使用一个公开可用的演示管道,您可以在 https://demo-document-indexing.pathway.stream
访问其 REST API。此演示从Google Drive 和Sharepoint 摄取文档,并维护一个用于检索文档的索引。
from llama_index.retrievers.pathway import PathwayRetriever
retriever = PathwayRetriever(
url="https://demo-document-indexing.pathway.stream"
)
retriever.retrieve(str_or_query_bundle="what is pathway")
在查询引擎中使用¶
from llama_index.core.query_engine import RetrieverQueryEngine
query_engine = RetrieverQueryEngine.from_args(
retriever,
)
response = query_engine.query("Tell me about Pathway")
print(str(response))
构建您自己的数据处理管道¶
前提条件¶
安装 pathway
包。然后下载示例数据。
%pip install pathway
%pip install llama-index-embeddings-openai
!mkdir -p 'data/'
!wget 'https://gist.githubusercontent.com/janchorowski/dd22a293f3d99d1b726eedc7d46d2fc0/raw/pathway_readme.md' -O 'data/pathway_readme.md'
定义 Pathway 跟踪的数据源¶
Pathway 可以同时监听许多来源,例如本地文件、S3 文件夹、云存储以及任何用于数据变化的数据流。
有关更多信息,请参阅pathway-io。
import pathway as pw
data_sources = []
data_sources.append(
pw.io.fs.read(
"./data",
format="binary",
mode="streaming",
with_metadata=True,
) # This creates a `pathway` connector that tracks
# all the files in the ./data directory
)
# This creates a connector that tracks files in Google drive.
# please follow the instructions at https://pathway.com/developers/tutorials/connectors/gdrive-connector/ to get credentials
# data_sources.append(
# pw.io.gdrive.read(object_id="17H4YpBOAKQzEJ93xmC2z170l0bP2npMy", service_user_credentials_file="credentials.json", with_metadata=True))
创建文档索引管道¶
让我们创建文档索引管道。transformations
应该是一个 TransformComponent
列表,以一个 Embedding
转换结束。
在此示例中,让我们首先使用 TokenTextSplitter
分割文本,然后使用 OpenAIEmbedding
进行嵌入。
from pathway.xpacks.llm.vector_store import VectorStoreServer
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import TokenTextSplitter
embed_model = OpenAIEmbedding(embed_batch_size=10)
transformations_example = [
TokenTextSplitter(
chunk_size=150,
chunk_overlap=10,
separator=" ",
),
embed_model,
]
processing_pipeline = VectorStoreServer.from_llamaindex_components(
*data_sources,
transformations=transformations_example,
)
# Define the Host and port that Pathway will be on
PATHWAY_HOST = "127.0.0.1"
PATHWAY_PORT = 8754
# `threaded` runs pathway in detached mode, we have to set it to False when running from terminal or container
# for more information on `with_cache` check out https://pathway.com/developers/api-docs/persistence-api
processing_pipeline.run_server(
host=PATHWAY_HOST, port=PATHWAY_PORT, with_cache=False, threaded=True
)
将检索器连接到自定义管道¶
from llama_index.retrievers.pathway import PathwayRetriever
retriever = PathwayRetriever(host=PATHWAY_HOST, port=PATHWAY_PORT)
retriever.retrieve(str_or_query_bundle="what is pathway")