跳到内容

deploy

deploy_core (async)

deploy_core(control_plane_config: ControlPlaneConfig | None = None, message_queue_config: BaseSettings | None = None, orchestrator_config: SimpleOrchestratorConfig | None = None, disable_message_queue: bool = False, disable_control_plane: bool = False) -> None

部署 llama_deploy 系统的核心组件。

此函数设置并启动消息队列、控制平面和编排器。它处理这些核心组件的初始化和连接。

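参数

| 名称 | 类型 | 描述 | 默认值 |
| --- | --- | --- | --- |
| `control_plane_config` | `Optional[ControlPlaneConfig]` | 控制平面的配置。未提供时使用默认的 `ControlPlaneConfig`。 | `None` |
| `message_queue_config` | `Optional[BaseSettings]` | 消息队列的配置。默认为本地 SimpleMessageQueue。 | `None` |
| `orchestrator_config` | `Optional[SimpleOrchestratorConfig]` | 编排器的配置。如果未提供,将使用默认的 `SimpleOrchestratorConfig`。 | `None` |
| `disable_message_queue` | `bool` | 是否禁用消息队列的部署。默认为 False。 | `False` |
| `disable_control_plane` | `bool` | 是否禁用控制平面的部署。默认为 False。 | `False` |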

引发

| 类型 | 描述 |
| --- | --- |
| `ValueError` | 如果在配置中指定了未知的消息队列类型。 |
| `Exception` | 如果任何已启动的任务遇到错误。 |

源代码位于 `llama_deploy/deploy/deploy.py`(第 69–133 行)
async def deploy_core(
    control_plane_config: ControlPlaneConfig | None = None,
    message_queue_config: BaseSettings | None = None,
    orchestrator_config: SimpleOrchestratorConfig | None = None,
    disable_message_queue: bool = False,
    disable_control_plane: bool = False,
) -> None:
    """
    Deploy the core components of the llama_deploy system.

    This function sets up and launches the message queue, control plane, and orchestrator.
    It handles the initialization and connection of these core components, then waits
    on every launched task.

    If anything fails (during startup or while running), or this coroutine is
    cancelled, the message queue client is cleaned up. All launched tasks are then
    cancelled and awaited, and the original error is re-raised.

    Args:
        control_plane_config (Optional[ControlPlaneConfig]): Configuration for the control plane.
            If not provided, a default ControlPlaneConfig is used.
        message_queue_config (Optional[BaseSettings]): Configuration for the message queue.
            Defaults to a local SimpleMessageQueue.
        orchestrator_config (Optional[SimpleOrchestratorConfig]): Configuration for the orchestrator.
            If not provided, a default SimpleOrchestratorConfig will be used.
        disable_message_queue (bool): Whether to disable deploying the message queue. Defaults to False.
        disable_control_plane (bool): Whether to disable deploying the control plane. Defaults to False.

    Raises:
        ValueError: If an unknown message queue type is specified in the config.
        asyncio.CancelledError: If this coroutine is cancelled while running.
        Exception: If any of the launched tasks, or a startup step, encounter an error.
    """
    control_plane_config = control_plane_config or ControlPlaneConfig()
    message_queue_config = message_queue_config or SimpleMessageQueueConfig()
    orchestrator_config = orchestrator_config or SimpleOrchestratorConfig()

    tasks: list[asyncio.Task] = []

    message_queue_client = _get_message_queue_client(message_queue_config)

    # The try block covers startup too: a failure (or cancellation) while
    # booting/registering must not orphan servers that were already launched.
    try:
        # Only a SimpleMessageQueueConfig causes a queue server to be started here.
        if (
            isinstance(message_queue_config, SimpleMessageQueueConfig)
            and not disable_message_queue
        ):
            queue = SimpleMessageQueueServer(message_queue_config)
            tasks.append(asyncio.create_task(queue.launch_server()))
            # let message queue boot up
            await asyncio.sleep(2)

        if not disable_control_plane:
            control_plane = ControlPlaneServer(
                message_queue_client,
                SimpleOrchestrator(**orchestrator_config.model_dump()),
                config=control_plane_config,
            )
            tasks.append(asyncio.create_task(control_plane.launch_server()))
            # let service spin up
            await asyncio.sleep(2)
            # register the control plane as a consumer
            control_plane_consumer_fn = await control_plane.register_to_message_queue()
            tasks.append(asyncio.create_task(control_plane_consumer_fn()))

        # let things run; gather() raises as soon as one task fails
        await asyncio.gather(*tasks)
    except (Exception, asyncio.CancelledError):
        try:
            await message_queue_client.cleanup()
        finally:
            # Cancel the remaining tasks even if cleanup() itself fails.
            for task in tasks:
                if not task.done():
                    task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
        # Surface the failure/cancellation to the caller instead of swallowing it.
        raise

deploy_workflow (async)

deploy_workflow(workflow: Workflow, workflow_config: WorkflowServiceConfig, control_plane_config: ControlPlaneConfig | None = None) -> None

在 llama_deploy 系统内将工作流部署为服务。

此函数将工作流设置为服务,将其连接到消息队列,并将其注册到控制平面。

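参数

| 名称 | 类型 | 描述 | 默认值 |
| --- | --- | --- | --- |
| `workflow` | `Workflow` | 要部署为服务的工作流。 | 必需 |
| `workflow_config` | `WorkflowServiceConfig` | 工作流服务的配置。 | 必需 |
| `control_plane_config` | `Optional[ControlPlaneConfig]` | 控制平面的配置。未提供时使用默认的 `ControlPlaneConfig`。 | `None` |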

引发

| 类型 | 描述 |
| --- | --- |
| `httpx.HTTPError` | 如果与控制平面通信时发生错误。 |
| `ValueError` | 如果遇到无效的消息队列配置。 |
| `Exception` | 如果任何已启动的任务遇到错误。 |

源代码位于 `llama_deploy/deploy/deploy.py`(第 136–202 行)
async def deploy_workflow(
    workflow: Workflow,
    workflow_config: WorkflowServiceConfig,
    control_plane_config: ControlPlaneConfig | None = None,
) -> None:
    """
    Deploy a workflow as a service within the llama_deploy system.

    This function sets up a workflow as a service, connects it to the message queue,
    and registers it with the control plane. It then waits on the service's server
    task and its message-queue consumer task.

    If either task fails, or this coroutine is cancelled, every remaining task is
    cancelled and awaited. The error is then propagated to the caller.

    Args:
        workflow (Workflow): The workflow to be deployed as a service.
        workflow_config (WorkflowServiceConfig): Configuration for the workflow service.
        control_plane_config (Optional[ControlPlaneConfig]): Configuration for the control plane.
            If not provided, a default ControlPlaneConfig is used.

    Raises:
        httpx.HTTPError: If there's an error communicating with the control plane,
            including a non-2xx response from its ``/queue_config`` endpoint.
        ValueError: If an invalid message queue config is encountered.
        asyncio.CancelledError: If this coroutine is cancelled while running.
        Exception: If any of the launched tasks encounter an error.
    """
    control_plane_config = control_plane_config or ControlPlaneConfig()
    control_plane_url = control_plane_config.url

    async with httpx.AsyncClient() as client:
        response = await client.get(f"{control_plane_url}/queue_config")
        # Fail with an HTTPStatusError (an httpx.HTTPError) rather than trying to
        # interpret an error body as a queue config.
        response.raise_for_status()
        queue_config_dict = response.json()

    message_queue_config = _get_message_queue_config(queue_config_dict)
    message_queue_client = _get_message_queue_client(message_queue_config)

    # override the service manager, while maintaining dict of existing services
    workflow._service_manager = NetworkServiceManager(
        workflow._service_manager._services
    )

    service = WorkflowService(
        workflow=workflow,
        message_queue=message_queue_client,
        config=workflow_config,
    )

    tasks: list[asyncio.Task] = [asyncio.create_task(service.launch_server())]

    # Startup is inside the try so a failed registration does not orphan the
    # already-running server task.
    try:
        # let service spin up
        await asyncio.sleep(1)

        # register to control plane
        await service.register_to_control_plane(control_plane_url)

        # register to message queue and start the consumer task
        consumer_fn = await service.register_to_message_queue()
        tasks.append(asyncio.create_task(consumer_fn()))

        # let things sync up
        await asyncio.sleep(1)

        # Propagate the first exception raised by any task: without
        # return_exceptions, gather() raises as soon as one task fails.
        await asyncio.gather(*tasks)
    finally:
        # No-op when every task already finished; otherwise stop the survivors
        # and wait for them so nothing keeps running after we return/raise.
        for task in tasks:
            if not task.done():
                task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)