Python SDK#
LlamaDeploy 提供了一个 Python SDK,用于与已部署系统交互。该 SDK 通过统一的客户端接口支持同步和异步操作。建议在生产环境中使用异步 API。
入门#
创建客户端非常简单,如下所示
import llama_deploy
client = llama_deploy.Client()
客户端配置#
客户端可以通过构造函数参数或环境变量进行配置。要通过环境变量设置配置参数,其名称应为参数名称的大写版本,并加上前缀字符串 LLAMA_DEPLOY_
import os
import llama_deploy
# Set `disable_ssl` to False with an environment variable
os.environ["LLAMA_DEPLOY_DISABLE_SSL"] = "False"
async def check_status():
# Pass other config settings to the Client constructor
client = llama_deploy.Client(
api_server_url="http://localhost:4501", timeout=10
)
status = await client.apiserver.status()
print(status)
注意
有关所有可用配置参数的列表,请参阅专门的API 参考部分。
客户端组件#
客户端提供对两个主要组件的访问
每个组件都公开了用于管理已部署系统和与之交互的特定方法。
重要
要使用 apiserver
功能,API 服务器必须正在运行,并且其 URL(默认为 http://localhost:4501
)必须可被执行客户端代码的主机访问。
重要
要使用 core
功能,控制平面必须正在运行,并且其 URL(默认为 http://localhost:8000
)必须可被执行客户端代码的主机访问。
有关可用方法和详细 API 参考的完整列表,请参阅API 参考部分。
使用示例#
异步操作#
import llama_deploy
async def check_status():
client = llama_deploy.Client()
status = await client.apiserver.status()
print(status)
同步操作#
import llama_deploy
client = llama_deploy.Client()
status = client.sync.apiserver.status()
print(status)
重要
同步 API (client.sync
) 不能在异步事件循环中使用。在这种情况下,请直接使用异步方法。
更复杂的示例#
这是一个示例,说明如何使用推荐的客户端异步版本运行已部署的工作流并收集其流式事件。
import llama_deploy
async def stream_events(services):
client = llama_deploy.Client(timeout=10)
# Create a new session
session = await client.core.sessions.create()
# Assuming there's a workflow called `streaming_workflow`, run it in the background
task_id = await session.run_nowait(
"streaming_workflow", arg="Hello, world!"
)
# The workflow is supposed to stream events signalling its progress
async for event in session.get_task_result_stream(task_id):
if "progress" in event:
print(f'Workflow Progress: {event["progress"]}')
# When done, collect the workflow output
final_result = await session.get_task_result(task_id)
print(final_result)
# Clean up the session
await client.core.sessions.delete(session.id)
等效的同步版本如下
import llama_deploy
def stream_events(services):
client = llama_deploy.Client(timeout=10)
# Create a new session
session = client.sync.core.sessions.create()
# Assuming there's a workflow called `streaming_workflow`, run it
task_id = session.run_nowait("streaming_workflow", arg1="hello_world")
# The workflow is supposed to stream events signalling its progress.
# Since this is a synchronous call, by this time all the events were
# streamed and collected in a list.
for event in session.get_task_result_stream(task_id):
if "progress" in event:
print(f'Workflow Progress: {event["progress"]}')
# Collect the workflow output
final_result = session.get_task_result(task_id)
print(final_result)
# Clean up the session
client.sync.core.sessions.delete(session.id)