Pathway 读取器¶
Pathway 是一个开放的数据处理框架。它允许您轻松开发数据转换管道和机器学习应用程序,这些管道和应用程序可以处理实时数据源和不断变化的数据。
本笔记本演示了如何设置一个实时数据索引管道。您可以像使用普通读取器一样,从您的 LLM 应用程序中查询此管道的结果。然而,在底层,Pathway 会在每次数据更改时更新索引,从而始终为您提供最新答案。
在本笔记本中,我们将首先连接 llama_index.readers.pathway.PathwayReader
读取器到一个公共演示文档处理管道,该管道:
- 监控多个云数据源的数据变化。
- 为数据构建一个向量索引。
要拥有自己的文档处理管道,请查看托管服务或按照本笔记本的步骤自行构建。
本文档中描述的基本管道可以轻松地构建存储在云位置文件的一个简单索引。然而,Pathway 提供了构建实时数据管道和应用程序所需的一切,包括类 SQL 的表操作,例如 groupby-reductions 和不同数据源之间的连接,基于时间的数据分组和窗口处理,以及广泛的连接器。
有关 Pathway 数据摄取管道和向量存储的更多详细信息,请访问向量存储管道。
前提条件¶
安装 llama-index-readers-pathway
集成
%pip install llama-index-readers-pathway
配置日志记录
import logging
import sys
logging.basicConfig(stream=sys.stdout, level=logging.ERROR)
logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))
设置您的 OpenAI API 密钥。
import getpass
import os
# omit if embedder of choice is not OpenAI
if "OPENAI_API_KEY" not in os.environ:
os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API Key:")
创建读取器并连接到公共管道¶
要实例化和配置 PathwayReader
,您需要提供文档索引管道的 url
或 host
和 port
。在下面的代码中,我们使用一个公共可用的演示管道,其 REST API 可通过 https://demo-document-indexing.pathway.stream
访问。此演示管道从Google Drive和Sharepoint摄取文档,并维护一个用于检索文档的索引。
from llama_index.readers.pathway import PathwayReader
reader = PathwayReader(url="https://demo-document-indexing.pathway.stream")
# let us search with some text
reader.load_data(query_text="What is Pathway")
使用 llama-index 创建摘要索引¶
docs = reader.load_data(query_text="What is Pathway", k=2)
from llama_index.core import SummaryIndex
index = SummaryIndex.from_documents(docs)
query_engine = index.as_query_engine()
response = query_engine.query("What does Pathway do?")
print(response)
构建您自己的数据处理管道¶
前提条件¶
安装 pathway
包。然后下载示例数据。
%pip install pathway
%pip install llama-index-embeddings-openai
!mkdir -p 'data/'
!wget 'https://gist.githubusercontent.com/janchorowski/dd22a293f3d99d1b726eedc7d46d2fc0/raw/pathway_readme.md' -O 'data/pathway_readme.md'
定义 Pathway 跟踪的数据源¶
Pathway 可以同时监听许多来源,例如本地文件、S3 文件夹、云存储以及任何用于数据变化的数据流。
更多信息请参阅 pathway-io。
import pathway as pw
data_sources = []
data_sources.append(
pw.io.fs.read(
"./data",
format="binary",
mode="streaming",
with_metadata=True,
) # This creates a `pathway` connector that tracks
# all the files in the ./data directory
)
# This creates a connector that tracks files in Google drive.
# please follow the instructions at https://pathway.com/developers/tutorials/connectors/gdrive-connector/ to get credentials
# data_sources.append(
# pw.io.gdrive.read(object_id="17H4YpBOAKQzEJ93xmC2z170l0bP2npMy", service_user_credentials_file="credentials.json", with_metadata=True))
创建文档索引管道¶
让我们创建文档索引管道。transformations
应该是一个以 Embedding
转换为结尾的 TransformComponent
列表。
在这个示例中,我们首先使用 TokenTextSplitter
分割文本,然后使用 OpenAIEmbedding
进行嵌入。
from pathway.xpacks.llm.vector_store import VectorStoreServer
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import TokenTextSplitter
embed_model = OpenAIEmbedding(embed_batch_size=10)
transformations_example = [
TokenTextSplitter(
chunk_size=150,
chunk_overlap=10,
separator=" ",
),
embed_model,
]
processing_pipeline = VectorStoreServer.from_llamaindex_components(
*data_sources,
transformations=transformations_example,
)
# Define the Host and port that Pathway will be on
PATHWAY_HOST = "127.0.0.1"
PATHWAY_PORT = 8754
# `threaded` runs pathway in detached mode, we have to set it to False when running from terminal or container
# for more information on `with_cache` check out https://pathway.com/developers/api-docs/persistence-api
processing_pipeline.run_server(
host=PATHWAY_HOST, port=PATHWAY_PORT, with_cache=False, threaded=True
)
连接读取器到自定义管道¶
from llama_index.readers.pathway import PathwayReader
reader = PathwayReader(host=PATHWAY_HOST, port=PATHWAY_PORT)
# let us search with some text
reader.load_data(query_text="What is Pathway")