跳到内容

手动编排#

LlamaDeploy 提供了不同的抽象层,以实现最大程度的灵活性。例如,如果您不需要API 服务器,可以降低一层,自行编排核心组件。LlamaDeploy 提供了一种简单的方法,可以使用配置对象和辅助函数来自行管理所需的服务。

使用 Python 封装器进行手动编排#

LlamaDeploy 提供了一组实用函数,它们封装了较低级别的 Python API,以简化在编排不同核心组件时常见的某些操作,下面我们看看如何使用它们。

运行核心系统#

注意

手动编排 LlamaDeploy 实例时,通常希望从各自的 Python 脚本(或 Docker 镜像等)部署核心组件和工作流服务。

要手动编排实例,首先要做的是运行核心服务:消息队列、控制平面和编排器。为此,您可以使用 deploy_core 函数

from llama_deploy import (
    deploy_core,
    ControlPlaneConfig,
    SimpleMessageQueueConfig,
)


async def main():
    # This will run forever until you interrupt the process, like by pressing CTRL+C
    await deploy_core(
        control_plane_config=ControlPlaneConfig(),
        message_queue_config=SimpleMessageQueueConfig(),
    )


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

这将为您的 LlamaDeploy 实例设置基本基础设施。您可以自定义配置以调整 TCP 端口号和基本设置,并在当前支持的消息队列后端中选择您想要的,例如 Redis、Kafka 或 RabbiMQ。

部署工作流#

要将工作流作为 LlamaDeploy 服务运行,您需要另一个 Python 进程。您可以通过像这样调用 deploy_workflow 函数,轻松让 LlamaDeploy 为您的工作流提供服务

from llama_deploy import (
    deploy_workflow,
    WorkflowServiceConfig,
    ControlPlaneConfig,
    SimpleMessageQueueConfig,
)
from llama_index.core.workflow import (
    Context,
    Event,
    Workflow,
    StartEvent,
    StopEvent,
    step,
)


class ProgressEvent(Event):
    progress: str


# create a dummy workflow
class MyWorkflow(Workflow):
    @step()
    async def run_step(self, ctx: Context, ev: StartEvent) -> StopEvent:
        # Your workflow logic here
        arg1 = str(ev.get("arg1", ""))
        result = arg1 + " result"

        # stream events as steps run
        ctx.write_event_to_stream(
            ProgressEvent(progress="I am doing something!")
        )

        return StopEvent(result=result)


async def main():
    # This will run forever until you interrupt the process, like by pressing CTRL+C
    await deploy_workflow(
        workflow=MyWorkflow(),
        workflow_config=WorkflowServiceConfig(
            host="127.0.0.1", port=8002, service_name="my_workflow"
        ),
        control_plane_config=ControlPlaneConfig(),
    )


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

假设之前的 Python 代码片段仍在运行,这将把您的工作流作为服务运行,并将其注册到现有的控制平面和消息队列。

与您的部署交互#

在所有构建块都运行起来后,您可以使用 Python SDK 中的 Client 与您的工作流服务进行交互。从另一个 Python 代码片段中

from llama_deploy import Client, ControlPlaneConfig

# point the client to the running control plane from the previous steps
client = Client(control_plane_url="http://localhost:8001")


async def run_task():
    session = await c1.core.sessions.create()
    result = await session.run("my_workflow", arg="Hello World!")
    print(result.result)
    # prints 'Hello World! result'

如果您也想查看事件流,可以这样做

async def run_task_and_stream():
    # create a session
    session = await c1.core.sessions.create()

    # kick off task run
    task_id = session.run_nowait("my_workflow", arg="Hello Streaming!")

    # stream events
    async for event in session.get_task_result_stream(task_id):
        print(event)

    # get final result
    final_result = await session.get_task_result(task_id)

    print(final_result.result)
    # prints 'Hello Streaming! result'

部署嵌套工作流#

每个 Workflow 都能够注入并运行嵌套工作流。例如

from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step


class InnerWorkflow(Workflow):
    @step()
    async def run_step(self, ev: StartEvent) -> StopEvent:
        arg1 = ev.get("arg1")
        if not arg1:
            raise ValueError("arg1 is required.")

        return StopEvent(result=str(arg1) + "_result")


class OuterWorkflow(Workflow):
    @step()
    async def run_step(
        self, ev: StartEvent, inner: InnerWorkflow
    ) -> StopEvent:
        arg1 = ev.get("arg1")
        if not arg1:
            raise ValueError("arg1 is required.")

        arg1 = await inner.run(arg1=arg1)

        return StopEvent(result=str(arg1) + "_result")


inner = InnerWorkflow()
outer = OuterWorkflow()
outer.add_workflows(inner=InnerWorkflow())

LlamaDeploy 使您能够轻松地将上述每个工作流作为服务启动,并且无需对代码进行任何更改即可运行一切!

只需部署每个工作流

注意

这段代码从同一个脚本启动了两个工作流,但它们也可以轻松地成为独立的脚本、机器或 Docker 容器!

import asyncio
from llama_deploy import (
    WorkflowServiceConfig,
    ControlPlaneConfig,
    deploy_workflow,
)


async def main():
    inner_task = asyncio.create_task(
        deploy_workflow(
            inner,
            WorkflowServiceConfig(
                host="127.0.0.1", port=8003, service_name="inner"
            ),
            ControlPlaneConfig(),
        )
    )

    outer_task = asyncio.create_task(
        deploy_workflow(
            outer,
            WorkflowServiceConfig(
                host="127.0.0.1", port=8002, service_name="outer"
            ),
            ControlPlaneConfig(),
        )
    )

    # This will run forever until you interrupt the process, like by pressing CTRL+C
    await asyncio.gather(inner_task, outer_task)


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

然后像以前一样使用它

from llama_deploy import Client

# points to deployed control plane
client = Client(control_plane_url="http://localhost:8001")


async def run_task():
    session = await c1.core.sessions.create()
    result = await session.run("outer", arg="Hello World!")
    print(result.result)
    # prints 'Hello World! result result'

使用较低级别的 Python API 进行手动编排#

要对 LlamaDeploy 设置过程进行更多控制,您可以使用较低级别的 API。在本节中,我们将了解当您使用上一节中看到的 deploy_coredeploy_workflow 等封装器时,幕后发生了什么。

deploy_core#

deploy_core 函数设置消息队列、控制平面和编排器。它执行以下操作

async def deploy_core(
    control_plane_config: ControlPlaneConfig,
    message_queue_config: BaseSettings,
    orchestrator_config: Optional[SimpleOrchestratorConfig] = None,
) -> None:
    orchestrator_config = orchestrator_config or SimpleOrchestratorConfig()

    message_queue_client = _get_message_queue_client(message_queue_config)

    control_plane = ControlPlaneServer(
        message_queue_client,
        SimpleOrchestrator(**orchestrator_config.model_dump()),
        **control_plane_config.model_dump(),
    )

    message_queue_task = None
    if isinstance(message_queue_config, SimpleMessageQueueConfig):
        message_queue_task = _deploy_local_message_queue(message_queue_config)

    control_plane_task = asyncio.create_task(control_plane.launch_server())

    # let services spin up
    await asyncio.sleep(1)

    # register the control plane as a consumer
    control_plane_consumer_fn = await control_plane.register_to_message_queue()

    consumer_task = asyncio.create_task(control_plane_consumer_fn())

    # let things sync up
    await asyncio.sleep(1)

    # let things run
    if message_queue_task:
        all_tasks = [control_plane_task, consumer_task, message_queue_task]
    else:
        all_tasks = [control_plane_task, consumer_task]

    shutdown_handler = _get_shutdown_handler(all_tasks)
    loop = asyncio.get_event_loop()
    while loop.is_running():
        await asyncio.sleep(0.1)
        signal.signal(signal.SIGINT, shutdown_handler)

        for task in all_tasks:
            if task.done() and task.exception():  # type: ignore
                raise task.exception()  # type: ignore

此函数

  1. 设置消息队列客户端
  2. 创建控制平面服务器
  3. 启动消息队列(如果使用 SimpleMessageQueue)
  4. 启动控制平面服务器
  5. 将控制平面注册为消费者
  6. 设置关闭处理程序并保持事件循环运行

deploy_workflow#

deploy_workflow 函数将工作流部署为服务。它执行以下操作

async def deploy_workflow(
    workflow: Workflow,
    workflow_config: WorkflowServiceConfig,
    control_plane_config: ControlPlaneConfig,
) -> None:
    control_plane_url = control_plane_config.url

    async with httpx.AsyncClient() as client:
        response = await client.get(f"{control_plane_url}/queue_config")
        queue_config_dict = response.json()

    message_queue_config = _get_message_queue_config(queue_config_dict)
    message_queue_client = _get_message_queue_client(message_queue_config)

    service = WorkflowService(
        workflow=workflow,
        message_queue=message_queue_client,
        **workflow_config.model_dump(),
    )

    service_task = asyncio.create_task(service.launch_server())

    # let service spin up
    await asyncio.sleep(1)

    # register to message queue
    consumer_fn = await service.register_to_message_queue()

    # register to control plane
    control_plane_url = (
        f"http://{control_plane_config.host}:{control_plane_config.port}"
    )
    await service.register_to_control_plane(control_plane_url)

    # create consumer task
    consumer_task = asyncio.create_task(consumer_fn())

    # let things sync up
    await asyncio.sleep(1)

    all_tasks = [consumer_task, service_task]

    shutdown_handler = _get_shutdown_handler(all_tasks)
    loop = asyncio.get_event_loop()
    while loop.is_running():
        await asyncio.sleep(0.1)
        signal.signal(signal.SIGINT, shutdown_handler)

        for task in all_tasks:
            if task.done() and task.exception():  # type: ignore
                raise task.exception()  # type: ignore

此函数

  1. 设置消息队列客户端
  2. 使用提供的工作流创建一个 WorkflowService
  3. 启动服务服务器
  4. 将服务注册到消息队列
  5. 将服务注册到控制平面
  6. 为服务设置一个消费者任务
  7. 设置关闭处理程序并保持事件循环运行