手动编排#
LlamaDeploy 提供了不同的抽象层,以实现最大程度的灵活性。例如,如果您不需要 API 服务器,可以降低一层,自行编排核心组件。LlamaDeploy 提供了一种简单的方法,可以使用配置对象和辅助函数来自行管理所需的服务。
使用 Python 封装器进行手动编排#
LlamaDeploy 提供了一组实用函数,它们封装了较低级别的 Python API,以简化在编排不同核心组件时常见的某些操作,下面我们看看如何使用它们。
运行核心系统#
注意
手动编排 LlamaDeploy 实例时,通常希望从各自的 Python 脚本(或 Docker 镜像等)部署核心组件和工作流服务。
要手动编排实例,首先要做的是运行核心服务:消息队列、控制平面和编排器。为此,您可以使用 deploy_core 函数:
from llama_deploy import (
deploy_core,
ControlPlaneConfig,
SimpleMessageQueueConfig,
)
async def main():
    """Start the core services using the default configuration objects."""
    control_plane = ControlPlaneConfig()
    message_queue = SimpleMessageQueueConfig()

    # Blocks until the process is interrupted, e.g. by pressing CTRL+C.
    await deploy_core(
        control_plane_config=control_plane,
        message_queue_config=message_queue,
    )


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
这将为您的 LlamaDeploy 实例设置基本基础设施。您可以自定义配置以调整 TCP 端口号和基本设置,并在当前支持的消息队列后端中选择您想要的,例如 Redis、Kafka 或 RabbitMQ。
部署工作流#
要将工作流作为 LlamaDeploy 服务运行,您需要另一个 Python 进程。您可以像下面这样调用 deploy_workflow 函数,轻松让 LlamaDeploy 为您的工作流提供服务:
from llama_deploy import (
deploy_workflow,
WorkflowServiceConfig,
ControlPlaneConfig,
SimpleMessageQueueConfig,
)
from llama_index.core.workflow import (
Context,
Event,
Workflow,
StartEvent,
StopEvent,
step,
)
class ProgressEvent(Event):
    """Event written to the event stream while the workflow step runs."""

    progress: str
# create a dummy workflow
class MyWorkflow(Workflow):
    """Minimal single-step workflow used to demonstrate deployment."""

    @step()
    async def run_step(self, ctx: Context, ev: StartEvent) -> StopEvent:
        """Append " result" to the ``arg1`` input and stream a progress event."""
        # Your workflow logic here
        text = str(ev.get("arg1", ""))
        output = f"{text} result"

        # stream events as steps run
        progress = ProgressEvent(progress="I am doing something!")
        ctx.write_event_to_stream(progress)

        return StopEvent(result=output)
async def main():
    """Serve ``MyWorkflow`` as the "my_workflow" service on 127.0.0.1:8002."""
    workflow = MyWorkflow()
    service_config = WorkflowServiceConfig(
        host="127.0.0.1", port=8002, service_name="my_workflow"
    )

    # Blocks until the process is interrupted, e.g. by pressing CTRL+C.
    await deploy_workflow(
        workflow=workflow,
        workflow_config=service_config,
        control_plane_config=ControlPlaneConfig(),
    )


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
假设之前的 Python 代码片段仍在运行,这将把您的工作流作为服务运行,并将其注册到现有的控制平面和消息队列。
与您的部署交互#
在所有构建块都运行起来后,您可以使用 Python SDK 中的 Client 与您的工作流服务进行交互。在另一个 Python 脚本中:
from llama_deploy import Client, ControlPlaneConfig
# point the client to the running control plane from the previous steps
client = Client(control_plane_url="http://localhost:8001")


async def run_task():
    """Run the "my_workflow" service in a new session and print its result."""
    # Sessions are created through the `client` object defined above.
    session = await client.core.sessions.create()
    # The keyword must be `arg1`: that is the input key the workflow reads.
    result = await session.run("my_workflow", arg1="Hello World!")
    print(result.result)
    # prints 'Hello World! result'
如果您也想查看事件流,可以这样做
async def run_task_and_stream():
    """Start "my_workflow" without blocking, print streamed events, then the result."""
    # create a session
    session = await client.core.sessions.create()

    # kick off task run; run_nowait is a coroutine, so it must be awaited to
    # obtain the task id. The keyword must be `arg1`, the key the workflow reads.
    task_id = await session.run_nowait("my_workflow", arg1="Hello Streaming!")

    # stream events
    async for event in session.get_task_result_stream(task_id):
        print(event)

    # get final result
    final_result = await session.get_task_result(task_id)
    print(final_result.result)
    # prints 'Hello Streaming! result'
部署嵌套工作流#
每个 Workflow 都能够注入并运行嵌套工作流。例如:
from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step
class InnerWorkflow(Workflow):
    """Workflow that suffixes its ``arg1`` input with "_result"."""

    @step()
    async def run_step(self, ev: StartEvent) -> StopEvent:
        """Return ``arg1`` + "_result"; raise ValueError when ``arg1`` is missing or falsy."""
        value = ev.get("arg1")
        if value:
            return StopEvent(result=f"{value!s}_result")
        raise ValueError("arg1 is required.")
class OuterWorkflow(Workflow):
    """Workflow that runs the injected ``inner`` workflow and suffixes its output."""

    @step()
    async def run_step(
        self, ev: StartEvent, inner: InnerWorkflow
    ) -> StopEvent:
        """Run ``inner`` with ``arg1`` and append "_result" to what it returns."""
        value = ev.get("arg1")
        if not value:
            raise ValueError("arg1 is required.")

        inner_output = await inner.run(arg1=value)
        return StopEvent(result=f"{inner_output!s}_result")
# Instantiate both workflows.
inner = InnerWorkflow()
outer = OuterWorkflow()
# Register an InnerWorkflow under the name "inner", matching the `inner`
# parameter of OuterWorkflow.run_step.
outer.add_workflows(inner=InnerWorkflow())
LlamaDeploy 使您能够轻松地将上述每个工作流作为服务启动,并且无需对代码进行任何更改即可运行一切!
只需部署每个工作流
注意
这段代码从同一个脚本启动了两个工作流,但它们也可以轻松地成为独立的脚本、机器或 Docker 容器!
import asyncio
from llama_deploy import (
WorkflowServiceConfig,
ControlPlaneConfig,
deploy_workflow,
)
async def main():
    """Deploy the inner and outer workflows as two services from this process."""
    # (workflow, service name, port) for each service, in launch order.
    deployments = [
        (inner, "inner", 8003),
        (outer, "outer", 8002),
    ]
    tasks = [
        asyncio.create_task(
            deploy_workflow(
                workflow,
                WorkflowServiceConfig(
                    host="127.0.0.1", port=port, service_name=name
                ),
                ControlPlaneConfig(),
            )
        )
        for workflow, name, port in deployments
    ]

    # Blocks until the process is interrupted, e.g. by pressing CTRL+C.
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
然后像以前一样使用它
from llama_deploy import Client
# points to deployed control plane
client = Client(control_plane_url="http://localhost:8001")


async def run_task():
    """Run the "outer" service in a new session and print its result."""
    # Sessions are created through the `client` object defined above.
    session = await client.core.sessions.create()
    # `arg1` is the key both workflows read; they raise ValueError without it.
    result = await session.run("outer", arg1="Hello World!")
    print(result.result)
    # prints 'Hello World!_result_result'
使用较低级别的 Python API 进行手动编排#
要对 LlamaDeploy 设置过程进行更多控制,您可以使用较低级别的 API。在本节中,我们将了解当您使用上一节中看到的 deploy_core 和 deploy_workflow 等封装器时,幕后发生了什么。
deploy_core#
deploy_core 函数设置消息队列、控制平面和编排器。它执行以下操作:
async def deploy_core(
    control_plane_config: ControlPlaneConfig,
    message_queue_config: BaseSettings,
    orchestrator_config: Optional[SimpleOrchestratorConfig] = None,
) -> None:
    """Launch the control plane and, for a simple message queue, the queue itself.

    Args:
        control_plane_config: Settings unpacked into ``ControlPlaneServer``.
        message_queue_config: Settings used to build the message queue client.
            When it is a ``SimpleMessageQueueConfig``, a local message queue
            is deployed as well.
        orchestrator_config: Settings unpacked into ``SimpleOrchestrator``;
            a default ``SimpleOrchestratorConfig()`` is used when omitted.

    Raises:
        Exception: The exception of any launched task that finished with an error.
    """
    orchestrator_config = orchestrator_config or SimpleOrchestratorConfig()

    message_queue_client = _get_message_queue_client(message_queue_config)

    control_plane = ControlPlaneServer(
        message_queue_client,
        SimpleOrchestrator(**orchestrator_config.model_dump()),
        **control_plane_config.model_dump(),
    )

    # Only the simple message queue is started from here; for any other
    # config type no queue task is created.
    message_queue_task = None
    if isinstance(message_queue_config, SimpleMessageQueueConfig):
        message_queue_task = _deploy_local_message_queue(message_queue_config)

    control_plane_task = asyncio.create_task(control_plane.launch_server())

    # let services spin up
    await asyncio.sleep(1)

    # register the control plane as a consumer
    control_plane_consumer_fn = await control_plane.register_to_message_queue()

    consumer_task = asyncio.create_task(control_plane_consumer_fn())

    # let things sync up
    await asyncio.sleep(1)

    # let things run
    if message_queue_task:
        all_tasks = [control_plane_task, consumer_task, message_queue_task]
    else:
        all_tasks = [control_plane_task, consumer_task]

    shutdown_handler = _get_shutdown_handler(all_tasks)
    loop = asyncio.get_event_loop()
    # Poll every 0.1 s for as long as the event loop is running.
    while loop.is_running():
        await asyncio.sleep(0.1)
        # The SIGINT handler is re-installed on every iteration.
        # NOTE(review): presumably so a handler installed later by one of the
        # servers cannot permanently replace it; confirm before hoisting.
        signal.signal(signal.SIGINT, shutdown_handler)

        # Surface the failure of any task instead of letting it pass silently.
        for task in all_tasks:
            if task.done() and task.exception():  # type: ignore
                raise task.exception()  # type: ignore
此函数
- 设置消息队列客户端
- 创建控制平面服务器
- 启动消息队列(如果使用 SimpleMessageQueue)
- 启动控制平面服务器
- 将控制平面注册为消费者
- 设置关闭处理程序并保持事件循环运行
deploy_workflow#
deploy_workflow 函数将工作流部署为服务。它执行以下操作:
async def deploy_workflow(
    workflow: Workflow,
    workflow_config: WorkflowServiceConfig,
    control_plane_config: ControlPlaneConfig,
) -> None:
    """Serve ``workflow`` as a service registered with the control plane.

    Args:
        workflow: The workflow instance wrapped in a ``WorkflowService``.
        workflow_config: Settings unpacked into ``WorkflowService``.
        control_plane_config: Locates the control plane, which is queried for
            the message queue configuration and then registered with.

    Raises:
        Exception: The exception of any launched task that finished with an error.
    """
    control_plane_url = control_plane_config.url

    # The message queue settings are fetched from the control plane rather
    # than being passed in by the caller.
    async with httpx.AsyncClient() as client:
        response = await client.get(f"{control_plane_url}/queue_config")
        queue_config_dict = response.json()

    message_queue_config = _get_message_queue_config(queue_config_dict)
    message_queue_client = _get_message_queue_client(message_queue_config)

    service = WorkflowService(
        workflow=workflow,
        message_queue=message_queue_client,
        **workflow_config.model_dump(),
    )

    service_task = asyncio.create_task(service.launch_server())

    # let service spin up
    await asyncio.sleep(1)

    # register to message queue
    consumer_fn = await service.register_to_message_queue()

    # register to control plane
    # NOTE(review): the URL is rebuilt from host/port here instead of reusing
    # control_plane_config.url from above; confirm the two are equivalent.
    control_plane_url = (
        f"http://{control_plane_config.host}:{control_plane_config.port}"
    )
    await service.register_to_control_plane(control_plane_url)

    # create consumer task
    consumer_task = asyncio.create_task(consumer_fn())

    # let things sync up
    await asyncio.sleep(1)

    all_tasks = [consumer_task, service_task]

    shutdown_handler = _get_shutdown_handler(all_tasks)
    loop = asyncio.get_event_loop()
    # Poll every 0.1 s for as long as the event loop is running.
    while loop.is_running():
        await asyncio.sleep(0.1)
        # The SIGINT handler is re-installed on every iteration.
        # NOTE(review): presumably so a handler installed later by the service
        # server cannot permanently replace it; confirm before hoisting.
        signal.signal(signal.SIGINT, shutdown_handler)

        # Surface the failure of any task instead of letting it pass silently.
        for task in all_tasks:
            if task.done() and task.exception():  # type: ignore
                raise task.exception()  # type: ignore
此函数
- 设置消息队列客户端
- 使用提供的工作流创建一个 WorkflowService
- 启动服务服务器
- 将服务注册到消息队列
- 将服务注册到控制平面
- 为服务设置一个消费者任务
- 设置关闭处理程序并保持事件循环运行