跳到内容

Deployment #

一个部署由运行中的服务和核心组件实例组成。

每个部署都是独立的,运行控制平面和消息队列的专用实例,以及配置对象中定义的任何服务。

源代码位于 llama_deploy/apiserver/deployment.py

client 属性 #
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
class Deployment:
    """A Deployment consists of running services and core component instances.

    Every Deployment is self contained, running a dedicated instance of the control plane
    and the message queue along with any service defined in the configuration object.
    """

    def __init__(self, *, config: DeploymentConfig, root_path: Path) -> None:
        """Creates a Deployment instance.

        Args:
            config: The configuration object defining this deployment
            root_path: The path on the filesystem used to store deployment data
        """
        self._name = config.name
        self._path = root_path / config.name
        self._queue_client = self._load_message_queue_client(config.message_queue)
        self._control_plane_config = config.control_plane
        self._control_plane = ControlPlaneServer(
            self._queue_client,
            SimpleOrchestrator(**SimpleOrchestratorConfig().model_dump()),
            config=config.control_plane,
        )
        self._client = Client(control_plane_url=config.control_plane.url)
        self._default_service: str | None = None
        self._running = False
        self._service_tasks: list[asyncio.Task] = []
        self._service_startup_complete = asyncio.Event()
        # Ready to load services
        self._workflow_services: dict[str, WorkflowService] = self._load_services(
            config
        )

    @property
    def default_service(self) -> str | None:
        """Returns the id of the default service, or None when not configured."""
        return self._default_service

    @property
    def client(self) -> Client:
        """Returns an async client to interact with this deployment."""
        return self._client

    @property
    def name(self) -> str:
        """Returns the name of this deployment."""
        return self._name

    @property
    def path(self) -> Path:
        """Returns the absolute path to the root of this deployment."""
        return self._path.resolve()

    @property
    def service_names(self) -> list[str]:
        """Returns the list of service names in this deployment."""
        return list(self._workflow_services.keys())

    async def start(self) -> None:
        """The task that will be launched in this deployment asyncio loop.

        This task is responsible for launching asyncio tasks for the core components and the services.
        All the tasks are gathered before returning.
        """
        self._running = True

        # Control Plane
        tasks = await self._start_control_plane()

        # Start the services. It makes no sense for a deployment to have no services but
        # the configuration allows it, so let's be defensive here.
        if self._workflow_services:
            tasks.append(asyncio.create_task(self._run_services()))

        # Run all the things
        await asyncio.gather(*tasks)
        self._running = False

    async def reload(self, config: DeploymentConfig) -> None:
        """Reload this deployment by restarting its services.

        The reload process consists in cancelling the services tasks
        and rely on the fact that _run_services() will restart them
        with the new configuration. This function won't return until
        _run_services will trigger the _service_startup_complete signal.
        """
        self._workflow_services = self._load_services(config)
        self._default_service = config.default_service

        for t in self._service_tasks:
            # t is awaited in _run_services(), we don't need to await here
            t.cancel()

        # Hold until _run_services() has restarted all the tasks
        await self._service_startup_complete.wait()

    async def _start_control_plane(self) -> list[asyncio.Task]:
        """Launches the control plane server and its message queue consumer.

        Returns:
            The asyncio tasks keeping the control plane alive.

        Raises:
            DeploymentError: If the control plane cannot be reached.
        """
        tasks = []
        cp_consumer_fn = await self._control_plane.register_to_message_queue()
        tasks.append(asyncio.create_task(self._control_plane.launch_server()))
        tasks.append(asyncio.create_task(cp_consumer_fn()))
        # Wait for the Control Plane to boot
        try:
            # NOTE(review): no `stop=` condition is set, so AsyncRetrying retries
            # forever and RetryError can never be raised — confirm whether a
            # stop_after_attempt/stop_after_delay was intended here.
            async for attempt in AsyncRetrying(
                wait=wait_exponential(min=1, max=10),
            ):
                with attempt:
                    async with httpx.AsyncClient() as client:
                        response = await client.get(self._control_plane_config.url)
                        response.raise_for_status()
        except RetryError:
            msg = f"Unable to reach Control Plane at {self._control_plane_config.url}"
            raise DeploymentError(msg)

        return tasks

    async def _run_services(self) -> None:
        """Start an asyncio task for each service and gather them.

        For the time self._running holds true, the tasks will be restarted
        if they are all cancelled. This is to support the reload process
        (see reload() for more details).

        Raises:
            DeploymentError: If a service cannot be reached.
        """
        while self._running:
            self._service_tasks = []
            # If this is a reload, self._workflow_services contains the updated configurations
            for wfs in self._workflow_services.values():
                service_task = asyncio.create_task(wfs.launch_server())
                self._service_tasks.append(service_task)
                consumer_fn = await wfs.register_to_message_queue()
                await wfs.register_to_control_plane(self._control_plane_config.url)
                consumer_task = asyncio.create_task(consumer_fn())
                # Make sure the service is up and running before proceeding
                url = wfs.config.url
                try:
                    # NOTE(review): as in _start_control_plane(), no `stop=` is set,
                    # so this retry loop can block forever — confirm intended.
                    async for attempt in AsyncRetrying(
                        wait=wait_exponential(min=1, max=10),
                    ):
                        with attempt:
                            async with httpx.AsyncClient() as client:
                                response = await client.get(url)
                                response.raise_for_status()
                except RetryError:
                    msg = f"Unable to reach WorkflowService at {url}"
                    raise DeploymentError(msg)

                self._service_tasks.append(consumer_task)

            # If this is a reload, unblock the reload() function signalling that tasks are up and running
            self._service_startup_complete.set()
            await asyncio.gather(*self._service_tasks)

    def _load_services(self, config: DeploymentConfig) -> dict[str, WorkflowService]:
        """Creates WorkflowService instances according to the configuration object.

        Raises:
            ValueError: If a service definition misses the path, port or host fields.
        """
        workflow_services = {}
        for service_id, service_config in config.services.items():
            source = service_config.source
            if source is None:
                # this is a default service, skip for now
                # TODO: check the service name is valid and supported
                # TODO: possibly start the default service if not running already
                continue

            # FIXME: Momentarily assuming everything is a workflow
            if service_config.path is None:
                msg = "path field in service definition must be set"
                raise ValueError(msg)

            if service_config.port is None:
                # This won't happen if we arrive here from Manager.deploy(), the manager will assign a port
                msg = "port field in service definition must be set"
                raise ValueError(msg)

            if service_config.host is None:
                # This won't happen if we arrive here from Manager.deploy(), the manager will assign a host
                msg = "host field in service definition must be set"
                raise ValueError(msg)

            # Sync the service source
            destination = (self._path / service_id).resolve()

            if destination.exists():
                # FIXME: this could be managed at the source manager level, so that
                # each implementation can decide what to do with existing data. For
                # example, the git source manager might decide to perform a git pull
                # instead of a brand new git clone. Leaving these optimizations for
                # later, for the time being having an empty data folder works smoothly
                # for any source manager currently supported.
                rmtree(str(destination))

            source_manager = SOURCE_MANAGERS[source.type](config)
            source_manager.sync(source.name, str(destination))

            # Install dependencies
            self._install_dependencies(service_config)

            # Set environment variables
            self._set_environment_variables(service_config, destination)

            # Search for a workflow instance in the service path.
            # NOTE(review): entries appended to sys.path are never removed, so
            # repeated reloads keep growing it — confirm this is acceptable.
            pythonpath = (destination / service_config.path).parent.resolve()
            sys.path.append(str(pythonpath))
            # The `path` field is expected in the form "some/dir/module:workflow_name";
            # a missing ":" would make this unpacking raise ValueError.
            module_name, workflow_name = Path(service_config.path).name.split(":")
            module = importlib.import_module(module_name)

            workflow = getattr(module, workflow_name)
            workflow_config = WorkflowServiceConfig(
                host=service_config.host,
                port=service_config.port,
                internal_host="0.0.0.0",
                internal_port=service_config.port,
                service_name=service_id,
            )
            workflow_services[service_id] = WorkflowService(
                workflow=workflow,
                message_queue=self._queue_client,
                config=workflow_config,
            )

        # Only resolve the default service when one was configured: this avoids
        # logging a spurious "no service with id 'None'" warning when the
        # `default-service` field is simply absent from the config.
        if config.default_service:
            if config.default_service in workflow_services:
                self._default_service = config.default_service
            else:
                msg = f"There is no service with id '{config.default_service}' in this deployment, cannot set default."
                logger.warning(msg)

        return workflow_services

    @staticmethod
    def _set_environment_variables(
        service_config: Service, root: Path | None = None
    ) -> None:
        """Sets environment variables for the service."""
        env_vars: dict[str, str | None] = {}

        if service_config.env:
            env_vars.update(**service_config.env)

        if service_config.env_files:
            for env_file in service_config.env_files:
                # use dotenv to parse env_file
                env_file_path = root / env_file if root else Path(env_file)
                env_vars.update(**dotenv_values(env_file_path))

        for k, v in env_vars.items():
            # NOTE: falsy values (None but also empty strings) are skipped here.
            if v:
                os.environ[k] = v

    @staticmethod
    def _install_dependencies(service_config: Service) -> None:
        """Runs `pip install` on the items listed under `python-dependencies` in the service configuration.

        Raises:
            DeploymentError: If the pip command exits with a non-zero status.
        """
        if not service_config.python_dependencies:
            return

        try:
            subprocess.check_call(
                [
                    sys.executable,
                    "-m",
                    "pip",
                    "install",
                    *service_config.python_dependencies,
                ]
            )
        except subprocess.CalledProcessError as e:
            # check_call() does not capture the child's output, so `e.stderr` is
            # always None: report the exception itself instead of a literal "None".
            msg = f"Unable to install service dependencies using command '{e.cmd}': {e}"
            raise DeploymentError(msg) from None

    def _load_message_queue_client(
        self, cfg: MessageQueueConfig | None
    ) -> AbstractMessageQueue:
        """Returns the message queue client matching the given configuration.

        Raises:
            ValueError: If the configured message queue type is not supported.
        """
        # Use the SimpleMessageQueue as the default when no config is provided
        if cfg is None:
            cfg = SimpleMessageQueueConfig()

        if cfg.type == "aws":
            return AWSMessageQueue(cfg)
        elif cfg.type == "kafka":
            return KafkaMessageQueue(cfg)
        elif cfg.type == "rabbitmq":
            return RabbitMQMessageQueue(cfg)
        elif cfg.type == "redis":
            return RedisMessageQueue(cfg)
        elif cfg.type == "simple":
            return SimpleMessageQueue(cfg)
        elif cfg.type == "solace":
            return SolaceMessageQueue(cfg)  # pragma: no cover
        else:
            msg = f"Unsupported message queue: {cfg.type}"
            raise ValueError(msg)

返回一个异步客户端,用于与此部署交互。

client: Client

name 属性 #

返回此部署的名称。

name: str

path 属性 #

返回此部署根目录的绝对路径。

path: Path

service_names 属性 #

返回此部署中的服务名称列表。

service_names: list[str]

start 异步 #

将在此部署的asyncio循环中启动的任务。

start() -> None

此任务负责为核心组件和服务启动asyncio任务。所有任务在返回前都会被收集。

reload 异步 #

client 属性 #
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
async def start(self) -> None:
    """Entry-point task for this deployment's asyncio loop.

    Launches asyncio tasks for the core components and the services,
    then gathers them all before returning.
    """
    self._running = True

    # Boot the control plane first; it hands back the tasks keeping it alive.
    component_tasks = await self._start_control_plane()

    # A deployment with no services is pointless but the configuration allows
    # it, so only schedule the service runner when there is something to run.
    if self._workflow_services:
        component_tasks.append(asyncio.create_task(self._run_services()))

    # Block until every component task has finished.
    await asyncio.gather(*component_tasks)
    self._running = False

通过重启其服务来重新加载此部署。

reload(config: DeploymentConfig) -> None

重新加载过程包括取消服务任务,并依赖于_run_services()将使用新配置重新启动它们。此函数直到_run_services触发_service_startup_complete信号才会返回。

Manager #

client 属性 #
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
async def reload(self, config: DeploymentConfig) -> None:
    """Reload this deployment by restarting its services.

    The reload process consists in cancelling the services tasks
    and rely on the fact that _run_services() will restart them
    with the new configuration. This function won't return until
    _run_services will trigger the _service_startup_complete signal.
    """
    self._workflow_services = self._load_services(config)
    self._default_service = config.default_service

    for t in self._service_tasks:
        # t is awaited in _run_services(), we don't need to await here
        t.cancel()

    # Hold until _run_services() has restarted all the tasks
    await self._service_startup_complete.wait()

Manager编排部署及其运行时。

使用示例

deployment_names 属性 #
config = Config.from_yaml(data_path / "git_service.yaml")
manager = Manager(tmp_path)
t = threading.Thread(target=asyncio.run, args=(manager.serve(),))
t.start()
manager.deploy(config)
t.join()
client 属性 #
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
class Manager:
    """The Manager orchestrates deployments and their runtime.

    Usage example:
        ```python
        config = Config.from_yaml(data_path / "git_service.yaml")
        manager = Manager(tmp_path)
        t = threading.Thread(target=asyncio.run, args=(manager.serve(),))
        t.start()
        manager.deploy(config)
        t.join()
        ```
    """

    def __init__(
        self, deployments_path: Path = Path(".deployments"), max_deployments: int = 10
    ) -> None:
        """Creates a Manager instance.

        Args:
            deployments_path: The filesystem path where deployments will create their root path.
            max_deployments: The maximum number of deployments supported by this manager.
        """
        # Active deployments, keyed by deployment name.
        self._deployments: dict[str, Deployment] = {}
        self._deployments_path = deployments_path
        self._max_deployments = max_deployments
        # Each deployment runs its own asyncio loop inside a worker thread from
        # this pool (see deploy(), which dispatches `asyncio.run(start())`).
        self._pool = ThreadPool(processes=max_deployments)
        # Next TCP port handed out to services missing one; incremented per
        # assignment in _assign_control_plane_address().
        self._last_control_plane_port = 8002
        # Task running the shared SimpleMessageQueue server, lazily spawned by
        # deploy() the first time a simple queue config is encountered.
        self._simple_message_queue_server: asyncio.Task | None = None

    @property
    def deployment_names(self) -> list[str]:
        """Return a list of names for the active deployments."""
        return list(self._deployments.keys())

    def get_deployment(self, deployment_name: str) -> Deployment | None:
        """Returns the deployment with the given name, or None if it doesn't exist."""
        return self._deployments.get(deployment_name)

    async def serve(self) -> None:
        """The server loop, it keeps the manager running."""
        event = asyncio.Event()
        try:
            # Waits indefinitely since `event` will never be set
            await event.wait()
        except asyncio.CancelledError:
            # On shutdown, tear down the message queue server if one was spawned.
            if self._simple_message_queue_server is not None:
                self._simple_message_queue_server.cancel()
                await self._simple_message_queue_server

    async def deploy(self, config: DeploymentConfig, reload: bool = False) -> None:
        """Creates a Deployment instance and starts the relative runtime.

        Args:
            config: The deployment configuration.
            reload: Reload an existing deployment instead of raising an error.

        Raises:
            ValueError: If a deployment with the same name already exists or the maximum number of deployment exceeded.
            DeploymentError: If it wasn't possible to create a deployment.
        """
        if not reload:
            # Raise an error if deployment already exists
            if config.name in self._deployments:
                msg = f"Deployment already exists: {config.name}"
                raise ValueError(msg)

            # Raise an error if we can't create any new deployment
            if len(self._deployments) == self._max_deployments:
                msg = "Reached the maximum number of deployments, cannot schedule more"
                raise ValueError(msg)

            # Set the control plane TCP port in the config where not specified
            self._assign_control_plane_address(config)

            # Get the message queue configuration
            msg_queue = config.message_queue or SimpleMessageQueueConfig()

            # Spawn SimpleMessageQueue server if needed. It is spawned at most
            # once and shared by every deployment this manager handles.
            if (
                isinstance(msg_queue, SimpleMessageQueueConfig)
                and self._simple_message_queue_server is None
            ):
                self._simple_message_queue_server = asyncio.create_task(
                    SimpleMessageQueueServer(msg_queue).launch_server()
                )

                # the other components need the queue to run in order to start, give the queue some time to start
                # NOTE(review): no `stop=` condition is set on AsyncRetrying, so it
                # retries forever and the RetryError branch looks unreachable — confirm.
                try:
                    async for attempt in AsyncRetrying(
                        wait=wait_exponential(min=1, max=10),
                    ):
                        with attempt:
                            async with httpx.AsyncClient() as client:
                                response = await client.get(msg_queue.base_url)
                                response.raise_for_status()
                except RetryError:
                    msg = f"Unable to reach SimpleMessageQueueServer at {msg_queue.base_url}"
                    raise DeploymentError(msg)

            # Register the deployment, then run its asyncio loop in a pool thread.
            # NOTE(review): the entry is registered before start() is known to
            # succeed; a failed startup leaves the entry behind — confirm intended.
            deployment = Deployment(config=config, root_path=self._deployments_path)
            self._deployments[config.name] = deployment
            self._pool.apply_async(func=asyncio.run, args=(deployment.start(),))
        else:
            if config.name not in self._deployments:
                msg = f"Cannot find deployment to reload: {config.name}"
                raise ValueError(msg)

            deployment = self._deployments[config.name]
            await deployment.reload(config)

    def _assign_control_plane_address(self, config: DeploymentConfig) -> None:
        """Fills in host/port for every service in the config that lacks them.

        NOTE(review): despite the name, this assigns addresses to the *services*
        of the deployment, not to the control plane itself — confirm naming intent.
        """
        for service in config.services.values():
            if not service.port:
                service.port = self._last_control_plane_port
                self._last_control_plane_port += 1
            if not service.host:
                service.host = "localhost"

返回活动部署的名称列表。

deployment_names: list[str]

serve 异步 #

服务器循环,它使Manager保持运行。

serve() -> None

deploy 异步 #

client 属性 #
383
384
385
386
387
388
389
390
391
392
async def serve(self) -> None:
    """The server loop, it keeps the manager running."""
    never_set = asyncio.Event()
    try:
        # Block forever: nothing ever sets this event.
        await never_set.wait()
    except asyncio.CancelledError:
        # On shutdown, tear down the message queue server if we spawned one.
        if self._simple_message_queue_server is not None:
            self._simple_message_queue_server.cancel()
            await self._simple_message_queue_server

创建一个Deployment实例并启动相应的运行时。

deploy(config: DeploymentConfig, reload: bool = False) -> None

参数

| 名称 | 类型 | 描述 | 默认值 |
| --- | --- | --- | --- |
| config | DeploymentConfig | 部署配置。 | 必需 |
| reload | bool | 重新加载现有部署,而不是引发错误。 | False |

引发

| 类型 | 描述 |
| --- | --- |
| ValueError | 如果已存在同名部署或超出最大部署数量。 |
| DeploymentError | 如果无法创建部署。 |

DeploymentConfig #

client 属性 #
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
async def deploy(self, config: DeploymentConfig, reload: bool = False) -> None:
    """Creates a Deployment instance and starts the relative runtime.

    Args:
        config: The deployment configuration.
        reload: Reload an existing deployment instead of raising an error.

    Raises:
        ValueError: If a deployment with the same name already exists or the maximum number of deployment exceeded.
        DeploymentError: If it wasn't possible to create a deployment.
    """
    if not reload:
        # Raise an error if deployment already exists
        if config.name in self._deployments:
            msg = f"Deployment already exists: {config.name}"
            raise ValueError(msg)

        # Raise an error if we can't create any new deployment
        if len(self._deployments) == self._max_deployments:
            msg = "Reached the maximum number of deployments, cannot schedule more"
            raise ValueError(msg)

        # Set the control plane TCP port in the config where not specified
        self._assign_control_plane_address(config)

        # Get the message queue configuration
        msg_queue = config.message_queue or SimpleMessageQueueConfig()

        # Spawn SimpleMessageQueue server if needed. It is spawned at most once
        # and shared by every deployment this manager handles.
        if (
            isinstance(msg_queue, SimpleMessageQueueConfig)
            and self._simple_message_queue_server is None
        ):
            self._simple_message_queue_server = asyncio.create_task(
                SimpleMessageQueueServer(msg_queue).launch_server()
            )

            # the other components need the queue to run in order to start, give the queue some time to start
            # NOTE(review): no `stop=` condition is set on AsyncRetrying, so it
            # retries forever and the RetryError branch looks unreachable — confirm.
            try:
                async for attempt in AsyncRetrying(
                    wait=wait_exponential(min=1, max=10),
                ):
                    with attempt:
                        async with httpx.AsyncClient() as client:
                            response = await client.get(msg_queue.base_url)
                            response.raise_for_status()
            except RetryError:
                msg = f"Unable to reach SimpleMessageQueueServer at {msg_queue.base_url}"
                raise DeploymentError(msg)

        # Register the deployment, then run its asyncio loop in a pool thread.
        # NOTE(review): the entry is registered before start() is known to
        # succeed; a failed startup leaves the entry behind — confirm intended.
        deployment = Deployment(config=config, root_path=self._deployments_path)
        self._deployments[config.name] = deployment
        self._pool.apply_async(func=asyncio.run, args=(deployment.start(),))
    else:
        if config.name not in self._deployments:
            msg = f"Cannot find deployment to reload: {config.name}"
            raise ValueError(msg)

        deployment = self._deployments[config.name]
        await deployment.reload(config)

基类: BaseModel

映射部署配置文件的模型定义。

属性:

| 名称 | 类型 | 默认值 |
| --- | --- | --- |
| name | str | 必需 |
| control_plane | ControlPlaneConfig | 必需 |
| message_queue | Annotated[Union[AWSMessageQueueConfig, KafkaMessageQueueConfig, RabbitMQMessageQueueConfig, RedisMessageQueueConfig, SimpleMessageQueueConfig, SolaceMessageQueueConfig], FieldInfo] \| None | None |
| default_service | str \| None | None |
| services | dict[str, llama_deploy.apiserver.deployment_config_parser.Service] | 必需 |
| base_path | Path | PosixPath('.') |
源代码位于 llama_deploy/apiserver/deployment_config_parser.py
from_yaml_bytes 类方法 #
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
class DeploymentConfig(BaseModel):
    """Model definition mapping a deployment config file."""

    name: str
    control_plane: ControlPlaneConfig = Field(alias="control-plane")
    message_queue: MessageQueueConfig | None = Field(None, alias="message-queue")
    default_service: str | None = Field(None, alias="default-service")
    services: dict[str, Service]
    base_path: Path = Path()

    @classmethod
    def from_yaml_bytes(cls, src: bytes) -> Self:
        """Build a config instance from a bytes object holding YAML markup."""
        parsed = yaml.safe_load(src)
        return cls(**(parsed or {}))

    @classmethod
    def from_yaml(cls, path: Path) -> Self:
        """Build a config instance from a YAML file on disk."""
        with open(path, "r") as f:
            parsed = yaml.safe_load(f)
        return cls(**(parsed or {}), base_path=path.parent)

从包含yaml代码的字节中读取配置数据。

from_yaml_bytes(src: bytes) -> Self

from_yaml 类方法 #

from_yaml_bytes 类方法 #
76
77
78
79
80
@classmethod
def from_yaml_bytes(cls, src: bytes) -> Self:
    """Build a config instance from a bytes object holding YAML markup."""
    parsed = yaml.safe_load(src)
    return cls(**(parsed or {}))

从yaml文件中读取配置数据。

from_yaml(path: Path) -> Self

SourceManager #

from_yaml_bytes 类方法 #
82
83
84
85
86
87
@classmethod
def from_yaml(cls, path: Path) -> Self:
    """Build a config instance from a YAML file on disk."""
    with open(path, "r") as f:
        parsed = yaml.safe_load(f)
    return cls(**(parsed or {}), base_path=path.parent)

基类: ABC

负责管理部署源的类应实现的协议。

源代码位于 llama_deploy/apiserver/source_managers/base.py

sync 抽象方法 #
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
class SourceManager(ABC):
    """Protocol to be implemented by classes responsible for managing Deployment sources."""

    def __init__(self, config: DeploymentConfig) -> None:
        # Keep the deployment configuration around for concrete implementations.
        self._config = config

    @abstractmethod
    def sync(self, source: str, destination: str | None = None) -> None:  # pragma: no cover
        """Fetch resources from `source` so a deployment can use them.

        `destination` is honored as a storage location whenever it is
        meaningful for the concrete source type.
        """

source 获取资源,以便它们可以在部署中使用。

sync(source: str, destination: str | None = None) -> None

对于特定源类型有意义时,可选地使用 destination 存储数据。

GitSourceManager #

sync 抽象方法 #
12
13
14
15
16
17
18
19
20
@abstractmethod
def sync(self, source: str, destination: str | None = None) -> None:  # pragma: no cover
    """Fetch resources from `source` so a deployment can use them.

    `destination` is honored as a storage location whenever it is
    meaningful for the concrete source type.
    """

基类: SourceManager

专用于 git 类型源的SourceManager。

源代码位于 llama_deploy/apiserver/source_managers/git.py

sync #
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class GitSourceManager(SourceManager):
    """A SourceManager specialized for sources of type `git`."""

    def sync(self, source: str, destination: str | None = None) -> None:
        """Clone the repository at URL `source` into the local path `destination`.

        Args:
            source: The URL of the git repository. A branch target may be appended
                using the convention `git_repo_url@branch_name`, e.g.
                "https://example.com/llama_deploy.git@branch_name".
            destination: The local filesystem path where the repository is cloned.
        """
        if not destination:
            raise ValueError("Destination cannot be empty")

        repo_url, branch = self._parse_source(source)
        clone_args: dict[str, Any] = {"url": repo_url, "to_path": destination}
        if branch:
            # Restrict the clone to the requested branch only.
            clone_args["multi_options"] = [f"-b {branch}", "--single-branch"]

        Repo.clone_from(**clone_args)

    @staticmethod
    def _parse_source(source: str) -> tuple[str, str | None]:
        """Split a `url@branch` source string into (url, branch-or-None)."""
        parts = source.split("@")
        return parts[0], (parts[1] if len(parts) > 1 else None)

将位于URL source 的仓库克隆到本地路径 destination

sync(source: str, destination: str | None = None) -> None

参数:

| 名称 | 类型 | 描述 | 默认值 |
| --- | --- | --- | --- |
| source | str | Git仓库的URL。它可以选择包含一个分支目标,使用命名约定 git_repo_url@branch_name。例如,"https://example.com/llama_deploy.git@branch_name"。 | 必需 |
| destination | str \| None | 本地文件系统中克隆git仓库的路径。 | None |

LocalSourceManager #

sync #
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def sync(self, source: str, destination: str | None = None) -> None:
    """Clone the repository at URL `source` into the local path `destination`.

    Args:
        source: The URL of the git repository. A branch target may be appended
            using the convention `git_repo_url@branch_name`, e.g.
            "https://example.com/llama_deploy.git@branch_name".
        destination: The local filesystem path where the repository is cloned.
    """
    if not destination:
        raise ValueError("Destination cannot be empty")

    repo_url, branch = self._parse_source(source)
    clone_args: dict[str, Any] = {"url": repo_url, "to_path": destination}
    if branch:
        # Restrict the clone to the requested branch only.
        clone_args["multi_options"] = [f"-b {branch}", "--single-branch"]

    Repo.clone_from(**clone_args)

专用于 local 类型源的SourceManager。

基类: SourceManager

源代码位于 llama_deploy/apiserver/source_managers/local.py

sync #
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class LocalSourceManager(SourceManager):
    """A SourceManager specialized for sources of type `local`."""

    def sync(self, source: str, destination: str | None = None) -> None:
        """Copy the folder at path `source` into the local path `destination`.

        Args:
            source: The filesystem path to the folder containing the source code.
            destination: The local filesystem path the source directory is copied to.
        """
        if not destination:
            raise ValueError("Destination cannot be empty")

        try:
            # `source` is resolved relative to the configured base path.
            shutil.copytree(
                self._config.base_path / source, destination, dirs_exist_ok=True
            )
        except Exception as e:
            raise ValueError(f"Unable to copy {source} into {destination}: {e}") from e

将路径为 source 的文件夹复制到本地路径 destination

sync(source: str, destination: str | None = None) -> None

参数:

| 名称 | 类型 | 描述 | 默认值 |
| --- | --- | --- | --- |
| source | str | 包含源代码的文件夹的文件系统路径。 | 必需 |
| destination | str \| None | 本地文件系统中复制源代码目录的路径。 | None |

返回顶部

sync #
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def sync(self, source: str, destination: str | None = None) -> None:
    """Copies the folder with path `source` into a local path `destination`.

    Args:
        source: The filesystem path to the folder containing the source code.
        destination: The path in the local filesystem where to copy the source directory.
    """
    if not destination:
        raise ValueError("Destination cannot be empty")

    try:
        final_path = self._config.base_path / source
        shutil.copytree(final_path, destination, dirs_exist_ok=True)
    except Exception as e:
        msg = f"Unable to copy {source} into {destination}: {e}"
        raise ValueError(msg) from e