安装与设置¶
# Install some libraries we'll use for our examples. These
# are not required to use Airtrain with LlamaIndex, and are just
# there to help us illustrate use.
%pip install llama-index-embeddings-openai==0.2.4
%pip install llama-index-readers-web==0.2.2
%pip install llama-index-readers-github==0.2.0
# Install Airtrain SDK with LlamaIndex integration
%pip install airtrain-py[llama-index]
# Running async code in a notebook requires using nest_asyncio, and we will
# use some async examples. So we will set up nest_asyncio here. Outside
# an async context or outside a notebook, this step is not required.
import nest_asyncio
nest_asyncio.apply()
API 密钥设置¶
设置运行后续示例所需的 API 密钥。GitHub API 令牌和 OpenAI API 密钥仅在示例“与 Readers/Embeddings/Splitters 的用法”中需要。获取 GitHub 访问令牌的说明在此处,而 OpenAI API 密钥可在此处获取。
获取您的 Airtrain API 密钥
- 通过访问此处创建 Airtrain 账户
- 在左下角查看“Settings”,然后转到“Billing”注册专业账户或开始试用
- 从“Billing”中的“Airtrain API Key”选项卡复制您的 API 密钥
请注意,Airtrain 试用版每次只允许创建一个数据集。由于本笔记本会创建多个数据集,您可能需要在 Airtrain UI 中随时删除数据集,以便为新的数据集腾出空间。
import os
os.environ["GITHUB_TOKEN"] = "<your GitHub token>"
os.environ["OPENAI_API_KEY"] = "<your OpenAi API key>"
os.environ["AIRTRAIN_API_KEY"] = "<your Airtrain API key>"
示例 1:与 Readers/Embeddings/Splitters 的用法¶
LlamaIndex 的核心抽象包括 Documents 和 Nodes。Airtrain 与 LlamaIndex 的集成允许您使用这些抽象的任何可迭代集合创建 Airtrain 数据集,通过 upload_from_llama_nodes
函数实现。
为了说明其灵活性,我们将做以下两件事
- 直接创建文档数据集。在本例中,是来自 Sematic 文档的完整页面。
- 使用 OpenAI 嵌入和
SemanticSplitterNodeParser
将这些文档分割成节点,并从这些节点创建数据集。
import os
import airtrain as at
from llama_index.core.node_parser import SemanticSplitterNodeParser
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.readers.github import GithubRepositoryReader, GithubClient
下一步是设置我们的读取器。在本例中,我们使用 GitHub 读取器,但这仅用于说明目的。无论文档最初来自哪个读取器,Airtrain 都可以摄取它们。
github_token = os.environ.get("GITHUB_TOKEN")
github_client = GithubClient(github_token=github_token, verbose=True)
reader = GithubRepositoryReader(
github_client=github_client,
owner="sematic-ai",
repo="sematic",
use_parser=False,
verbose=False,
filter_directories=(
["docs"],
GithubRepositoryReader.FilterType.INCLUDE,
),
filter_file_extensions=(
[
".md",
],
GithubRepositoryReader.FilterType.INCLUDE,
),
)
read_kwargs = dict(branch="main")
使用读取器读取文档
documents = reader.load_data(**read_kwargs)
直接从文档创建数据集¶
您可以直接从这些文档创建 Airtrain 数据集,无需进行任何进一步处理。在这种情况下,Airtrain 将在生成更多见解之前自动为您嵌入文档。数据集中的每一行都将代表一个完整的 markdown 文档。Airtrain 将自动提供诸如文档语义聚类之类的见解,让您可以通过查看涵盖相似主题的文档来浏览文档,或发现您可能想要移除的文档子集。
虽然不需要进行基础文档检索之外的额外处理,但这是允许的。您可以在上传到 Airtrain 之前,随意使用元数据丰富文档、过滤文档或操作文档。
result = at.upload_from_llama_nodes(
documents,
name="Sematic Docs Dataset: Whole Documents",
)
print(f"Uploaded {result.size} rows to '{result.name}'. View at: {result.url}")
Uploaded 42 rows to 'Sematic Docs Dataset: Whole Documents'. View at: https://app.airtrain.ai/dataset/7fd09dca-81b9-42b8-acc9-01ce08302b16
分割和嵌入后创建数据集¶
如果您希望查看一个侧重于文档内节点而非整个文档的数据集,您也可以这样做。Airtrain 将自动创建诸如嵌入向量的 2D PCA 投影之类的见解,以便您可以在视觉上探索您的 RAG 节点将被检索到的嵌入空间。您还可以点击单独的行,查看在完整的 N 维嵌入空间中与它最近的行,以便进一步深入研究。自动聚类和其他见解也将被生成,以丰富和辅助您的探索。
在这里,我们将使用 OpenAI 嵌入和 SemanticSplitterNodeParser
分割器,但您可以使用任何其他 LlamaIndex 工具来处理您的节点,然后再上传到 Airtrain。您甚至可以完全跳过自己嵌入节点,在这种情况下,Airtrain 将为您嵌入节点。
embed_model = OpenAIEmbedding()
splitter = SemanticSplitterNodeParser(
buffer_size=1, breakpoint_percentile_threshold=95, embed_model=embed_model
)
nodes = splitter.get_nodes_from_documents(documents)
⚠️ 注意 ⚠️:如果您正在使用 Airtrain 试用版,并且已经创建了完整文档数据集,则需要在上传新数据集之前将其删除。
result = at.upload_from_llama_nodes(
nodes,
name="Sematic Docs, split + embedded",
)
print(f"Uploaded {result.size} rows to {result.name}. View at: {result.url}")
Uploaded 137 rows to Sematic Docs, split + embedded. View at: https://app.airtrain.ai/dataset/ebec9bcc-6ed8-4165-a0de-29bef740c70b
示例 2:使用 Workflow API¶
由于文档和节点是 Airtrain 集成所依赖的核心抽象,并且这些抽象在 LlamaIndex 的 workflows API 中是共享的,因此您也可以将 Airtrain 用作更广泛工作流的一部分。在这里,我们将通过抓取一些 Hacker News 评论线程来演示用法,但同样,您不限于 Web 抓取工作流;任何能生成文档或节点的工作流都可以。
import asyncio
from llama_index.core.schema import Node
from llama_index.core.workflow import (
Context,
Event,
StartEvent,
StopEvent,
Workflow,
step,
)
from llama_index.readers.web import AsyncWebPageReader
from airtrain import DatasetMetadata, upload_from_llama_nodes
指定我们将要抓取的评论线程。本例中选取的线程是 2024 年 9 月 30 日在首页或首页附近的。如果您希望从 Hacker News 以外的页面摄取数据,请注意某些网站的内容是在客户端渲染的,在这种情况下,您可能需要使用像 WholeSiteReader
这样的读取器,它会使用无头 Chrome 驱动程序渲染页面后再返回文档。为了简单起见,这里我们使用一个带有服务器端渲染 HTML 的页面。
URLS = [
"https://news.ycombinator.com/item?id=41694044",
"https://news.ycombinator.com/item?id=41696046",
"https://news.ycombinator.com/item?id=41693087",
"https://news.ycombinator.com/item?id=41695756",
"https://news.ycombinator.com/item?id=41666269",
"https://news.ycombinator.com/item?id=41697137",
"https://news.ycombinator.com/item?id=41695840",
"https://news.ycombinator.com/item?id=41694712",
"https://news.ycombinator.com/item?id=41690302",
"https://news.ycombinator.com/item?id=41695076",
"https://news.ycombinator.com/item?id=41669747",
"https://news.ycombinator.com/item?id=41694504",
"https://news.ycombinator.com/item?id=41697032",
"https://news.ycombinator.com/item?id=41694025",
"https://news.ycombinator.com/item?id=41652935",
"https://news.ycombinator.com/item?id=41693979",
"https://news.ycombinator.com/item?id=41696236",
"https://news.ycombinator.com/item?id=41696434",
"https://news.ycombinator.com/item?id=41688469",
"https://news.ycombinator.com/item?id=41646782",
"https://news.ycombinator.com/item?id=41689332",
"https://news.ycombinator.com/item?id=41688018",
"https://news.ycombinator.com/item?id=41668896",
"https://news.ycombinator.com/item?id=41690087",
"https://news.ycombinator.com/item?id=41679497",
"https://news.ycombinator.com/item?id=41687739",
"https://news.ycombinator.com/item?id=41686722",
"https://news.ycombinator.com/item?id=41689138",
"https://news.ycombinator.com/item?id=41691530",
]
接下来我们将定义一个基本事件,因为事件是 LlamaIndex 工作流中步骤之间传递数据的标准方式。
class CompletedDocumentRetrievalEvent(Event):
name: str
documents: list[Node]
之后我们将定义工作流本身。在本例中,它将有一个步骤用于从 Web 摄取文档,一个步骤用于将它们摄取到 Airtrain,还有一个步骤用于结束工作流。
class IngestToAirtrainWorkflow(Workflow):
@step
async def ingest_documents(
self, ctx: Context, ev: StartEvent
) -> CompletedDocumentRetrievalEvent | None:
if not ev.get("urls"):
return None
reader = AsyncWebPageReader(html_to_text=True)
documents = await reader.aload_data(urls=ev.get("urls"))
return CompletedDocumentRetrievalEvent(
name=ev.get("name"), documents=documents
)
@step
async def ingest_documents_to_airtrain(
self, ctx: Context, ev: CompletedDocumentRetrievalEvent
) -> StopEvent | None:
dataset_meta = upload_from_llama_nodes(ev.documents, name=ev.name)
return StopEvent(result=dataset_meta)
由于 workflow API 将异步代码视为一等公民,我们将定义一个异步主函数来驱动工作流。
async def main() -> None:
workflow = IngestToAirtrainWorkflow()
result = await workflow.run(
name="My HN Discussions Dataset",
urls=URLS,
)
print(
f"Uploaded {result.size} rows to {result.name}. View at: {result.url}"
)
最后,我们将使用 asyncio 事件循环执行异步主函数。
⚠️ 注意 ⚠️:如果您正在使用 Airtrain 试用版,并且已经运行了上面的示例,则需要在上传新数据集之前删除生成的数据集。
asyncio.run(main()) # actually run the main & the workflow
error fetching page from https://news.ycombinator.com/item?id=41693087 error fetching page from https://news.ycombinator.com/item?id=41666269 error fetching page from https://news.ycombinator.com/item?id=41697137 error fetching page from https://news.ycombinator.com/item?id=41697032 error fetching page from https://news.ycombinator.com/item?id=41652935 error fetching page from https://news.ycombinator.com/item?id=41696434 error fetching page from https://news.ycombinator.com/item?id=41688469 error fetching page from https://news.ycombinator.com/item?id=41646782 error fetching page from https://news.ycombinator.com/item?id=41668896
Uploaded 20 rows to My HN Discussions Dataset. View at: https://app.airtrain.ai/dataset/bd330f0a-6ff1-4e51-9fe2-9900a1a42308