跳到内容

Python SDK#

客户端#

LlamaDeploy Python 客户端。

客户端提供了对 asyncio 和非 asyncio API 的访问。要访问同步 API,只需使用 client.sync 的方法。

使用示例

from llama_deploy.client import Client

# Create the client once and reuse the same instance everywhere.
# The functions below refer to it by the name `client`.
client = Client()

async def an_async_function():
    # Inside a coroutine, await the async API directly
    status = await client.apiserver.status()

def normal_function():
    # Outside of an event loop, use `client.sync` for the blocking API
    status = client.sync.apiserver.status()

源代码位于 llama_deploy/client/client.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class Client(_BaseClient):
    """Entry point of the LlamaDeploy Python SDK.

    One instance exposes both flavours of the API: the attributes of the
    instance itself are asyncio-based, while `client.sync` gives access to
    the blocking equivalents.

    Example usage:
    ```py
    from llama_deploy.client import Client

    # One instance serves both the async and the sync API
    client = Client()

    async def an_async_function():
        status = await client.apiserver.status()

    def normal_function():
        status = client.sync.apiserver.status()
    ```
    """

    @property
    def sync(self) -> "_SyncClient":
        """Builds the blocking counterpart of this client.

        The sync client is created from this client's dumped fields.

        Raises:
            RuntimeError: If accessed while an asyncio event loop is running.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # get_running_loop() raises when no loop is running
            inside_event_loop = False
        else:
            inside_event_loop = True

        if inside_event_loop:
            raise RuntimeError(
                "You cannot use the sync client within an async event loop - just await the async methods directly."
            )

        return _SyncClient(**self.model_dump())

    @property
    def apiserver(self) -> ApiServer:
        """Gives access to the API Server model bound to this client."""
        return ApiServer(id="apiserver", client=self)

    @property
    def core(self) -> Core:
        """Gives access to the Core model bound to this client."""
        return Core(id="core", client=self)

sync property #

sync: _SyncClient

返回客户端 API 的同步版本。

apiserver property #

apiserver: ApiServer

返回 ApiServer 模型。

core property #

core: Core

返回 Core 模型。

API Server 功能#

SessionCollection #

基类: Collection

表示给定 Deployment 的 Session 集合的模型。

参数

名称 类型 描述 默认值
deployment_id str
必需
源代码位于 llama_deploy/client/models/apiserver.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class SessionCollection(Collection):
    """Collection model exposing the sessions of a single deployment."""

    # Identifier of the deployment whose sessions this collection manages
    deployment_id: str

    async def delete(self, session_id: str) -> None:
        """Removes a session from the deployment.

        Args:
            session_id: Identifier of the session to remove; it is sent as a
                query parameter.

        Raises:
            HTTPException: If no session with the given id could be found.
        """
        api = self.client
        endpoint = f"{api.api_server_url}/deployments/{self.deployment_id}/sessions/delete"

        await api.request(
            "POST",
            endpoint,
            params={"session_id": session_id},
            timeout=api.timeout,
            verify=not api.disable_ssl,
        )

    async def create(self) -> SessionDefinition:
        """Creates a new session in the deployment.

        Returns:
            The `SessionDefinition` built from the JSON body of the response.
        """
        api = self.client
        endpoint = f"{api.api_server_url}/deployments/{self.deployment_id}/sessions/create"

        response = await api.request(
            "POST",
            endpoint,
            timeout=api.timeout,
            verify=not api.disable_ssl,
        )
        payload = response.json()

        return SessionDefinition(**payload)

    async def list(self) -> list[SessionDefinition]:
        """Fetches every session of the deployment.

        Returns:
            The decoded JSON body of the response, returned as-is.
        """
        api = self.client
        endpoint = f"{api.api_server_url}/deployments/{self.deployment_id}/sessions"

        response = await api.request(
            "GET",
            endpoint,
            timeout=api.timeout,
            verify=not api.disable_ssl,
        )

        return response.json()

delete async #

delete(session_id: str) -> None

删除具有提供的 session_id 的会话。

参数

名称 类型 描述 默认值
session_id str

将要删除的会话 ID

必需

引发

类型 描述
HTTPException

如果找不到具有提供的 ID 的会话。

源代码位于 llama_deploy/client/models/apiserver.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
async def delete(self, session_id: str) -> None:
    """Removes a session from the deployment.

    Args:
        session_id: Identifier of the session to remove; it is sent as a
            query parameter.

    Raises:
        HTTPException: If no session with the given id could be found.
    """
    api = self.client
    endpoint = f"{api.api_server_url}/deployments/{self.deployment_id}/sessions/delete"
    request_options = {
        "params": {"session_id": session_id},
        "verify": not api.disable_ssl,
        "timeout": api.timeout,
    }

    await api.request("POST", endpoint, **request_options)

create async #

create() -> SessionDefinition

在给定部署中创建一个新会话,并返回该会话的定义。

源代码位于 llama_deploy/client/models/apiserver.py
44
45
46
47
48
49
50
51
52
53
54
55
async def create(self) -> SessionDefinition:
    """Creates a new session in the deployment.

    Returns:
        The `SessionDefinition` built from the JSON body of the response.
    """
    api = self.client
    endpoint = f"{api.api_server_url}/deployments/{self.deployment_id}/sessions/create"

    response = await api.request(
        "POST",
        endpoint,
        timeout=api.timeout,
        verify=not api.disable_ssl,
    )
    payload = response.json()

    return SessionDefinition(**payload)

list async #

返回给定部署中所有会话的集合。

源代码位于 llama_deploy/client/models/apiserver.py
57
58
59
60
61
62
63
64
65
66
67
68
69
async def list(self) -> list[SessionDefinition]:
    """Fetches every session of the deployment.

    Returns:
        The decoded JSON body of the response, returned as-is.
    """
    api = self.client
    endpoint = f"{api.api_server_url}/deployments/{self.deployment_id}/sessions"

    response = await api.request(
        "GET",
        endpoint,
        timeout=api.timeout,
        verify=not api.disable_ssl,
    )

    return response.json()

Task #

基类: Model

表示给定部署中属于给定会话的任务的模型。

参数

名称 类型 描述 默认值
deployment_id str
必需
session_id str
必需
源代码位于 llama_deploy/client/models/apiserver.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
class Task(Model):
    """A model representing a task belonging to a given session in the given deployment.

    All requests target `{api_server_url}/deployments/{deployment_id}/tasks/{id}/...`
    and carry the session id as the `session_id` query parameter.
    """

    # Identifier of the deployment the task runs in
    deployment_id: str
    # Identifier of the session the task belongs to
    session_id: str

    async def results(self) -> TaskResult:
        """Returns the result of this task.

        Returns:
            The `TaskResult` validated from the JSON body of the response.
        """
        results_url = f"{self.client.api_server_url}/deployments/{self.deployment_id}/tasks/{self.id}/results"

        r = await self.client.request(
            "GET",
            results_url,
            verify=not self.client.disable_ssl,
            params={"session_id": self.session_id},
            timeout=self.client.timeout,
        )
        return TaskResult.model_validate(r.json())

    async def send_event(self, ev: Event, service_name: str) -> EventDefinition:
        """Sends an event (e.g. a human response) to this task.

        Args:
            ev: The event to send; it is serialized with `JsonSerializer`.
            service_name: Used as the `agent_id` of the event definition.

        Returns:
            The `EventDefinition` validated from the JSON body of the response.
        """
        url = f"{self.client.api_server_url}/deployments/{self.deployment_id}/tasks/{self.id}/events"

        # Wrap the serialized event together with the name of the target service
        serializer = JsonSerializer()
        event_def = EventDefinition(
            event_obj_str=serializer.serialize(ev), agent_id=service_name
        )

        r = await self.client.request(
            "POST",
            url,
            verify=not self.client.disable_ssl,
            params={"session_id": self.session_id},
            json=event_def.model_dump(),
            timeout=self.client.timeout,
        )
        return EventDefinition.model_validate(r.json())

    async def events(self) -> AsyncGenerator[dict[str, Any], None]:  # pragma: no cover
        """Returns a generator object to consume the events streamed from a service.

        Each line of the streamed response is decoded as JSON and yielded.
        While the endpoint answers 404 the request is retried every
        `client.poll_interval` seconds, with no upper bound on the number of
        attempts; any other HTTP error status is re-raised.
        """
        events_url = f"{self.client.api_server_url}/deployments/{self.deployment_id}/tasks/{self.id}/events"

        while True:
            try:
                # A dedicated httpx client is used here because the response is streamed
                async with httpx.AsyncClient(
                    verify=not self.client.disable_ssl
                ) as client:
                    async with client.stream(
                        "GET", events_url, params={"session_id": self.session_id}
                    ) as response:
                        response.raise_for_status()
                        async for line in response.aiter_lines():
                            json_line = json.loads(line)
                            yield json_line
                        break  # Exit the function if successful
            except httpx.HTTPStatusError as e:
                if e.response.status_code != 404:
                    raise  # Re-raise if it's not a 404 error
                # 404: wait before the next attempt
                await asyncio.sleep(self.client.poll_interval)

results async #

results() -> TaskResult

返回给定任务的结果。

源代码位于 llama_deploy/client/models/apiserver.py
78
79
80
81
82
83
84
85
86
87
88
89
async def results(self) -> TaskResult:
    """Fetches the result of this task.

    Returns:
        The `TaskResult` validated from the JSON body of the response.
    """
    api = self.client
    endpoint = f"{api.api_server_url}/deployments/{self.deployment_id}/tasks/{self.id}/results"

    response = await api.request(
        "GET",
        endpoint,
        params={"session_id": self.session_id},
        timeout=api.timeout,
        verify=not api.disable_ssl,
    )
    payload = response.json()

    return TaskResult.model_validate(payload)

send_event async #

send_event(ev: Event, service_name: str) -> EventDefinition

发送人工响应事件。

源代码位于 llama_deploy/client/models/apiserver.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
async def send_event(self, ev: Event, service_name: str) -> EventDefinition:
    """Sends an event (e.g. a human response) to this task.

    Args:
        ev: The event to send; it is serialized with `JsonSerializer`.
        service_name: Used as the `agent_id` of the event definition.

    Returns:
        The `EventDefinition` validated from the JSON body of the response.
    """
    api = self.client

    # Wrap the serialized event together with the name of the target service
    outgoing = EventDefinition(
        event_obj_str=JsonSerializer().serialize(ev),
        agent_id=service_name,
    )
    endpoint = f"{api.api_server_url}/deployments/{self.deployment_id}/tasks/{self.id}/events"

    response = await api.request(
        "POST",
        endpoint,
        json=outgoing.model_dump(),
        params={"session_id": self.session_id},
        timeout=api.timeout,
        verify=not api.disable_ssl,
    )
    payload = response.json()

    return EventDefinition.model_validate(payload)

events async #

events() -> AsyncGenerator[dict[str, Any], None]

返回一个生成器对象,用于消费从服务流式传输的事件。

源代码位于 llama_deploy/client/models/apiserver.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
async def events(self) -> AsyncGenerator[dict[str, Any], None]:  # pragma: no cover
    """Returns a generator object to consume the events streamed from a service.

    Each line of the streamed response is decoded as JSON and yielded.
    While the endpoint answers 404 the request is retried every
    `client.poll_interval` seconds, with no upper bound on the number of
    attempts; any other HTTP error status is re-raised.
    """
    events_url = f"{self.client.api_server_url}/deployments/{self.deployment_id}/tasks/{self.id}/events"

    while True:
        try:
            # A dedicated httpx client is used here because the response is streamed
            async with httpx.AsyncClient(
                verify=not self.client.disable_ssl
            ) as client:
                async with client.stream(
                    "GET", events_url, params={"session_id": self.session_id}
                ) as response:
                    response.raise_for_status()
                    async for line in response.aiter_lines():
                        json_line = json.loads(line)
                        yield json_line
                    break  # Exit the function if successful
        except httpx.HTTPStatusError as e:
            if e.response.status_code != 404:
                raise  # Re-raise if it's not a 404 error
            # 404: wait before the next attempt
            await asyncio.sleep(self.client.poll_interval)

TaskCollection #

基类: Collection

表示给定部署的任务集合的模型。

参数

名称 类型 描述 默认值
deployment_id str
必需
源代码位于 llama_deploy/client/models/apiserver.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
class TaskCollection(Collection):
    """A model representing a collection of tasks for a given deployment."""

    # Identifier of the deployment the tasks belong to
    deployment_id: str

    async def run(self, task: TaskDefinition) -> Any:
        """Runs a task and returns the results once it's done.

        Args:
            task: The definition of the task we want to run. When it carries a
                `session_id`, the id is forwarded as a query parameter.

        Returns:
            The decoded JSON body of the response.
        """
        run_url = (
            f"{self.client.api_server_url}/deployments/{self.deployment_id}/tasks/run"
        )
        request_kwargs: dict[str, Any] = {
            "verify": not self.client.disable_ssl,
            "json": task.model_dump(),
            "timeout": self.client.timeout,
        }
        if task.session_id:
            # Let the HTTP layer build and encode the query string rather than
            # concatenating the raw value onto the URL.
            request_kwargs["params"] = {"session_id": task.session_id}

        r = await self.client.request("POST", run_url, **request_kwargs)

        return r.json()

    async def create(self, task: TaskDefinition) -> Task:
        """Creates a task and returns it immediately, without waiting for the results.

        Args:
            task: The definition of the task we want to create.

        Returns:
            A `Task` built from the `task_id` and `session_id` fields of the response.
        """
        create_url = f"{self.client.api_server_url}/deployments/{self.deployment_id}/tasks/create"

        r = await self.client.request(
            "POST",
            create_url,
            verify=not self.client.disable_ssl,
            json=task.model_dump(),
            timeout=self.client.timeout,
        )
        response_fields = r.json()

        model_class = self._prepare(Task)
        return model_class(
            client=self.client,
            deployment_id=self.deployment_id,
            id=response_fields["task_id"],
            session_id=response_fields["session_id"],
        )

    async def list(self) -> list[Task]:
        """Returns the tasks from this collection.

        Each element of the response is expected to be a JSON object carrying
        the `task_id` and `session_id` fields, the same fields `create` reads.

        Returns:
            A `TaskCollection` whose `items` map each task id to its `Task`.
            NOTE(review): the return annotation says `list[Task]`, but a
            collection object is what has always been returned; it is left
            unchanged so the signature stays the same.
        """
        tasks_url = (
            f"{self.client.api_server_url}/deployments/{self.deployment_id}/tasks"
        )
        r = await self.client.request(
            "GET",
            tasks_url,
            verify=not self.client.disable_ssl,
            timeout=self.client.timeout,
        )
        task_model_class = self._prepare(Task)
        # Key every task by its own id: a constant key would make each entry
        # overwrite the previous one. The decoded JSON elements are dicts, so
        # fields are read by key, not by attribute.
        items = {
            task_def["task_id"]: task_model_class(
                client=self.client,
                id=task_def["task_id"],
                session_id=task_def["session_id"],
                deployment_id=self.deployment_id,
            )
            for task_def in r.json()
        }
        model_class = self._prepare(TaskCollection)
        return model_class(
            client=self.client, deployment_id=self.deployment_id, items=items
        )

run async #

run(task: TaskDefinition) -> Any

运行一个任务并在完成后返回结果。

参数

名称 类型 描述 默认值
task TaskDefinition

我们想要运行的任务定义。

必需
源代码位于 llama_deploy/client/models/apiserver.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
async def run(self, task: TaskDefinition) -> Any:
    """Runs a task and returns the results once it's done.

    Args:
        task: The definition of the task we want to run. When it carries a
            `session_id`, the id is forwarded as a query parameter.

    Returns:
        The decoded JSON body of the response.
    """
    run_url = (
        f"{self.client.api_server_url}/deployments/{self.deployment_id}/tasks/run"
    )
    request_kwargs: dict[str, Any] = {
        "verify": not self.client.disable_ssl,
        "json": task.model_dump(),
        "timeout": self.client.timeout,
    }
    if task.session_id:
        # Let the HTTP layer build and encode the query string rather than
        # concatenating the raw value onto the URL.
        request_kwargs["params"] = {"session_id": task.session_id}

    r = await self.client.request("POST", run_url, **request_kwargs)

    return r.json()

create async #

create(task: TaskDefinition) -> Task

运行一个任务并立即返回,不等待结果。

源代码位于 llama_deploy/client/models/apiserver.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
async def create(self, task: TaskDefinition) -> Task:
    """Creates a task and hands it back right away, without waiting for its results.

    Args:
        task: The definition of the task to create.

    Returns:
        A `Task` built from the `task_id` and `session_id` fields of the response.
    """
    api = self.client
    endpoint = f"{api.api_server_url}/deployments/{self.deployment_id}/tasks/create"

    response = await api.request(
        "POST",
        endpoint,
        json=task.model_dump(),
        timeout=api.timeout,
        verify=not api.disable_ssl,
    )
    created = response.json()

    task_cls = self._prepare(Task)
    return task_cls(
        id=created["task_id"],
        session_id=created["session_id"],
        deployment_id=self.deployment_id,
        client=api,
    )

list async #

list() -> list[Task]

返回此集合中的任务列表。

源代码位于 llama_deploy/client/models/apiserver.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
async def list(self) -> list[Task]:
    """Returns the tasks from this collection.

    Each element of the response is expected to be a JSON object carrying
    the `task_id` and `session_id` fields.

    Returns:
        A `TaskCollection` whose `items` map each task id to its `Task`.
        NOTE(review): the return annotation says `list[Task]`, but a
        collection object is what has always been returned; it is left
        unchanged so the signature stays the same.
    """
    tasks_url = (
        f"{self.client.api_server_url}/deployments/{self.deployment_id}/tasks"
    )
    r = await self.client.request(
        "GET",
        tasks_url,
        verify=not self.client.disable_ssl,
        timeout=self.client.timeout,
    )
    task_model_class = self._prepare(Task)
    # Key every task by its own id: a constant key would make each entry
    # overwrite the previous one. The decoded JSON elements are dicts, so
    # fields are read by key, not by attribute.
    items = {
        task_def["task_id"]: task_model_class(
            client=self.client,
            id=task_def["task_id"],
            session_id=task_def["session_id"],
            deployment_id=self.deployment_id,
        )
        for task_def in r.json()
    }
    model_class = self._prepare(TaskCollection)
    return model_class(
        client=self.client, deployment_id=self.deployment_id, items=items
    )

Deployment #

基类: Model

表示部署的模型。

源代码位于 llama_deploy/client/models/apiserver.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
class Deployment(Model):
    """Model of a single deployment, identified by its `id`."""

    @property
    def tasks(self) -> TaskCollection:
        """Gives access to the tasks of this deployment, across all its sessions.

        The returned collection starts with no items.
        """
        collection_cls = self._prepare(TaskCollection)
        return collection_cls(items={}, deployment_id=self.id, client=self.client)

    @property
    def sessions(self) -> SessionCollection:
        """Gives access to the sessions of this deployment.

        The returned collection starts with no items.
        """
        collection_cls = self._prepare(SessionCollection)
        return collection_cls(items={}, deployment_id=self.id, client=self.client)

tasks property #

返回给定部署中所有会话的任务集合。

sessions property #

返回给定部署中所有会话的集合。

DeploymentCollection #

基类: Collection

表示当前活动部署集合的模型。

源代码位于 llama_deploy/client/models/apiserver.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
class DeploymentCollection(Collection):
    """A model representing a collection of deployments currently active."""

    async def create(self, config: TextIO, reload: bool = False) -> Deployment:
        """Creates a new deployment from a deployment file.

        If `reload` is true, an existing deployment will be reloaded, otherwise
        an error will be raised.

        Args:
            config: An open text file holding the deployment definition; its
                whole content is read and uploaded as `config_file`.
            reload: Forwarded to the server as the `reload` query parameter.

        Returns:
            A `Deployment` whose id is the `name` field of the response.

        Example:
            ```
            with open("deployment.yml") as f:
                await client.apiserver.deployments.create(f)
            ```
        """
        create_url = f"{self.client.api_server_url}/deployments/create"

        files = {"config_file": config.read()}
        r = await self.client.request(
            "POST",
            create_url,
            files=files,
            params={"reload": reload},
            verify=not self.client.disable_ssl,
            timeout=self.client.timeout,
        )

        model_class = self._prepare(Deployment)
        return model_class(client=self.client, id=r.json().get("name"))

    async def get(self, id: str) -> Deployment:
        """Gets a deployment by id.

        Args:
            id: The id of the deployment to look up.

        Returns:
            A `Deployment` bound to this client with the requested id.
        """
        get_url = f"{self.client.api_server_url}/deployments/{id}"
        # The current version of the apiserver doesn't return anything useful
        # from this endpoint, so the response body is ignored: the request is
        # still performed.
        await self.client.request(
            "GET",
            get_url,
            verify=not self.client.disable_ssl,
            timeout=self.client.timeout,
        )
        model_class = self._prepare(Deployment)
        return model_class(client=self.client, id=id)

    async def list(self) -> list[Deployment]:
        """Returns one `Deployment` per name listed by the `/deployments/` endpoint."""
        deployments_url = f"{self.client.api_server_url}/deployments/"
        # Pass the same SSL and timeout settings as every other request in this
        # module, so `disable_ssl` is honoured here too.
        r = await self.client.request(
            "GET",
            deployments_url,
            verify=not self.client.disable_ssl,
            timeout=self.client.timeout,
        )
        model_class = self._prepare(Deployment)
        return [model_class(client=self.client, id=name) for name in r.json()]

create async #

create(config: TextIO, reload: bool = False) -> Deployment

从部署文件创建一个新的部署。

如果 reload 为 true,则将重新加载现有部署,否则将引发错误。

示例
with open("deployment.yml") as f:
    await client.apiserver.deployments.create(f)
源代码位于 llama_deploy/client/models/apiserver.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
async def create(self, config: TextIO, reload: bool = False) -> Deployment:
    """Registers a new deployment described by a deployment file.

    When `reload` is true an already existing deployment is reloaded,
    otherwise an error is raised.

    Args:
        config: An open text file holding the deployment definition; its
            whole content is read and uploaded as `config_file`.
        reload: Forwarded to the server as the `reload` query parameter.

    Returns:
        A `Deployment` whose id is the `name` field of the response.

    Example:
        ```
        with open("deployment.yml") as f:
            await client.apiserver.deployments.create(f)
        ```
    """
    api = self.client
    endpoint = f"{api.api_server_url}/deployments/create"
    upload = {"config_file": config.read()}

    response = await api.request(
        "POST",
        endpoint,
        files=upload,
        params={"reload": reload},
        timeout=api.timeout,
        verify=not api.disable_ssl,
    )

    deployment_cls = self._prepare(Deployment)
    deployment_name = response.json().get("name")
    return deployment_cls(client=api, id=deployment_name)

get async #

get(id: str) -> Deployment

按 ID 获取部署。

源代码位于 llama_deploy/client/models/apiserver.py
256
257
258
259
260
261
262
263
264
265
266
267
async def get(self, id: str) -> Deployment:
    """Looks up a deployment by its id.

    Args:
        id: The id of the deployment to look up.

    Returns:
        A `Deployment` bound to this client with the requested id.
    """
    api = self.client
    endpoint = f"{api.api_server_url}/deployments/{id}"

    # The response body carries nothing useful in the current apiserver
    # version, so it is discarded: the request is still performed.
    await api.request(
        "GET",
        endpoint,
        timeout=api.timeout,
        verify=not api.disable_ssl,
    )

    deployment_cls = self._prepare(Deployment)
    return deployment_cls(id=id, client=api)

ApiServer #

基类: Model

表示 API 服务器实例的模型。

源代码位于 llama_deploy/client/models/apiserver.py
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
class ApiServer(Model):
    """A model representing the API Server instance."""

    async def status(self) -> Status:
        """Returns the status of the API Server.

        Returns:
            A `Status` that is:
            - `DOWN` when the connection to the server cannot be established;
            - `UNHEALTHY` when the server answers with a status code >= 400,
              with the raw response text as message;
            - `HEALTHY` otherwise, with a message listing the active
              deployments reported by the server.
        """
        status_url = f"{self.client.api_server_url}/status/"

        try:
            r = await self.client.request(
                "GET",
                status_url,
                verify=not self.client.disable_ssl,
                timeout=self.client.timeout,
            )
        except httpx.ConnectError:
            return Status(
                status=StatusEnum.DOWN,
                status_message="API Server is down",
            )

        if r.status_code >= 400:
            # Only the raw text is reported. The body is deliberately not
            # decoded as JSON here, because decoding an unexpected error body
            # could raise instead of returning UNHEALTHY.
            return Status(status=StatusEnum.UNHEALTHY, status_message=r.text)

        body = r.json()
        deployments = body.get("deployments") or []

        lines = ["LlamaDeploy is up and running."]
        if deployments:
            lines.append("Active deployments:")
            lines.extend(f"- {d}" for d in deployments)
        else:
            lines.append("Currently there are no active deployments")

        return Status(
            status=StatusEnum.HEALTHY,
            status_message="\n".join(lines),
            deployments=deployments,
        )

    @property
    def deployments(self) -> DeploymentCollection:
        """Returns a collection of deployments currently active in the API Server."""
        model_class = self._prepare(DeploymentCollection)
        return model_class(client=self.client, items={})

deployments property #

deployments: DeploymentCollection

返回 API 服务器中当前活动部署的集合。

status async #

status() -> Status

返回 API 服务器的状态。

源代码位于 llama_deploy/client/models/apiserver.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
async def status(self) -> Status:
    """Returns the status of the API Server.

    Returns:
        A `Status` that is:
        - `DOWN` when the connection to the server cannot be established;
        - `UNHEALTHY` when the server answers with a status code >= 400,
          with the raw response text as message;
        - `HEALTHY` otherwise, with a message listing the active
          deployments reported by the server.
    """
    status_url = f"{self.client.api_server_url}/status/"

    try:
        r = await self.client.request(
            "GET",
            status_url,
            verify=not self.client.disable_ssl,
            timeout=self.client.timeout,
        )
    except httpx.ConnectError:
        return Status(
            status=StatusEnum.DOWN,
            status_message="API Server is down",
        )

    if r.status_code >= 400:
        # Only the raw text is reported. The body is deliberately not
        # decoded as JSON here, because decoding an unexpected error body
        # could raise instead of returning UNHEALTHY.
        return Status(status=StatusEnum.UNHEALTHY, status_message=r.text)

    body = r.json()
    deployments = body.get("deployments") or []

    lines = ["LlamaDeploy is up and running."]
    if deployments:
        lines.append("Active deployments:")
        lines.extend(f"- {d}" for d in deployments)
    else:
        lines.append("Currently there are no active deployments")

    return Status(
        status=StatusEnum.HEALTHY,
        status_message="\n".join(lines),
        deployments=deployments,
    )

控制平面功能#

Session #

基类: Model

源代码位于 llama_deploy/client/models/core.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
class Session(Model):
    """A model representing a session.

    All requests target `{client.control_plane_url}/sessions/{id}/...`.
    """

    async def run(self, service_name: str, **run_kwargs: Any) -> str:
        """Implements the workflow-based run API for a session.

        Creates a task for `service_name` whose input is the JSON dump of
        `run_kwargs`, then polls for its result every `client.poll_interval`
        seconds.

        Args:
            service_name: Used as the `agent_id` of the task definition.
            **run_kwargs: Keyword arguments serialized as the task input.

        Returns:
            The `result` of the task, or an empty string when it is falsy.

        Raises:
            asyncio.TimeoutError: If no result is available within
                `client.timeout` seconds.
        """
        task_input = json.dumps(run_kwargs)
        task_def = TaskDefinition(input=task_input, agent_id=service_name)
        task_id = await self._do_create_task(task_def)

        # wait for task to complete, up to timeout seconds
        async def _get_result() -> str:
            while True:
                task_result = await self._do_get_task_result(task_id)

                if isinstance(task_result, TaskResult):
                    return task_result.result or ""
                await asyncio.sleep(self.client.poll_interval)

        return await asyncio.wait_for(_get_result(), timeout=self.client.timeout)

    async def run_nowait(self, service_name: str, **run_kwargs: Any) -> str:
        """Implements the workflow-based run API for a session, but does not wait for the task to complete.

        Args:
            service_name: Used as the `agent_id` of the task definition.
            **run_kwargs: Keyword arguments serialized as the task input.

        Returns:
            The ID of the created task.
        """

        task_input = json.dumps(run_kwargs)
        task_def = TaskDefinition(input=task_input, agent_id=service_name)
        task_id = await self._do_create_task(task_def)

        return task_id

    async def create_task(self, task_def: TaskDefinition) -> str:
        """Create a new task in this session.

        Args:
            task_def (TaskDefinition): The task definition. Its `session_id`
                is overwritten with the id of this session.

        Returns:
            str: The ID of the created task.
        """
        return await self._do_create_task(task_def)

    async def _do_create_task(self, task_def: TaskDefinition) -> str:
        """Async-only version of create_task, to be used internally from other methods.

        Note that `task_def` is modified in place: its `session_id` is set to
        the id of this session before the request is sent. The decoded JSON
        body of the response is returned.
        """
        task_def.session_id = self.id
        url = f"{self.client.control_plane_url}/sessions/{self.id}/tasks"
        response = await self.client.request("POST", url, json=task_def.model_dump())
        return response.json()

    async def get_task_result(self, task_id: str) -> TaskResult | None:
        """Get the result of a task in this session if it has one.

        Args:
            task_id (str): The ID of the task to get the result for.

        Returns:
            Optional[TaskResult]: The result of the task if it has one, otherwise None.
        """
        return await self._do_get_task_result(task_id)

    async def _do_get_task_result(self, task_id: str) -> TaskResult | None:
        """Async-only version of get_task_result, to be used internally from other methods.

        An empty (falsy) response body is reported as `None`.
        """
        url = (
            f"{self.client.control_plane_url}/sessions/{self.id}/tasks/{task_id}/result"
        )
        response = await self.client.request("GET", url)
        data = response.json()
        return TaskResult(**data) if data else None

    async def get_tasks(self) -> list[TaskDefinition]:
        """Get all tasks in this session.

        Returns:
            list[TaskDefinition]: A list of task definitions in the session.
        """
        url = f"{self.client.control_plane_url}/sessions/{self.id}/tasks"
        response = await self.client.request("GET", url)
        return [TaskDefinition(**task) for task in response.json()]

    async def send_event(self, service_name: str, task_id: str, ev: Event) -> None:
        """Send event to a Workflow service.

        Args:
            service_name (str): Used as the `agent_id` of the event definition.
            task_id (str): The ID of the task the event is sent to.
            ev (Event): The event to be submitted to the workflow; it is
                serialized with `JsonSerializer`.

        Returns:
            None
        """
        serializer = JsonSerializer()
        event_def = EventDefinition(
            event_obj_str=serializer.serialize(ev), agent_id=service_name
        )

        url = f"{self.client.control_plane_url}/sessions/{self.id}/tasks/{task_id}/send_event"
        await self.client.request("POST", url, json=event_def.model_dump())

    async def send_event_def(self, task_id: str, ev_def: EventDefinition) -> None:
        """Send an already built event definition to a Workflow service.

        Args:
            task_id (str): The ID of the task the event is sent to.
            ev_def (EventDefinition): The event definition to submit; it is
                sent as-is.

        Returns:
            None
        """
        url = f"{self.client.control_plane_url}/sessions/{self.id}/tasks/{task_id}/send_event"
        await self.client.request("POST", url, json=ev_def.model_dump())

    async def get_task_result_stream(
        self, task_id: str
    ) -> AsyncGenerator[dict[str, Any], None]:
        """Stream the results of a task in this session.

        While the endpoint answers 404 the request is retried every
        `client.poll_interval` seconds, until `client.timeout` seconds have
        elapsed since the first attempt (a timeout of `None` means retrying
        forever). Any other HTTP error status is re-raised.

        Args:
            task_id (str): The ID of the task to get the result for.

        Returns:
            AsyncGenerator[dict[str, Any], None]: A generator that yields each
            line of the streamed response decoded as JSON.

        Raises:
            TimeoutError: If the endpoint still answers 404 once the timeout
                has elapsed.
        """
        url = f"{self.client.control_plane_url}/sessions/{self.id}/tasks/{task_id}/result_stream"
        # Reference point used to enforce the overall timeout across retries
        start_time = time.time()
        while True:
            try:
                async with httpx.AsyncClient(timeout=self.client.timeout) as client:
                    async with client.stream("GET", url) as response:
                        response.raise_for_status()
                        async for line in response.aiter_lines():
                            json_line = json.loads(line)
                            yield json_line
                        break  # Exit the function if successful
            except httpx.HTTPStatusError as e:
                if e.response.status_code != 404:
                    raise  # Re-raise if it's not a 404 error
                if (
                    self.client.timeout is None  # means no timeout, always poll
                    or time.time() - start_time < self.client.timeout
                ):
                    await asyncio.sleep(self.client.poll_interval)
                else:
                    raise TimeoutError(
                        f"Task result not available after waiting for {self.client.timeout} seconds"
                    )

run async #

run(service_name: str, **run_kwargs: Any) -> str

为会话实现基于工作流的 run API。

源代码位于 llama_deploy/client/models/core.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
async def run(self, service_name: str, **run_kwargs: Any) -> str:
    """Create a task for `service_name` and wait for its result.

    The keyword arguments are JSON-encoded and used as the task input.

    Args:
        service_name (str): Used as the `agent_id` of the task definition.
        **run_kwargs (Any): Arguments serialized with `json.dumps` as task input.

    Returns:
        str: The `result` of the task, or an empty string when it is falsy.

    Raises:
        asyncio.TimeoutError: If no result arrives within `client.timeout` seconds.
    """
    definition = TaskDefinition(input=json.dumps(run_kwargs), agent_id=service_name)
    created_id = await self._do_create_task(definition)

    async def _poll_until_done() -> str:
        # Keep asking for the result, pausing between attempts, until a
        # TaskResult instance comes back.
        outcome = await self._do_get_task_result(created_id)
        while not isinstance(outcome, TaskResult):
            await asyncio.sleep(self.client.poll_interval)
            outcome = await self._do_get_task_result(created_id)
        return outcome.result or ""

    # The overall wait is bounded by the client timeout.
    return await asyncio.wait_for(_poll_until_done(), timeout=self.client.timeout)

run_nowait async #

run_nowait(service_name: str, **run_kwargs: Any) -> str

为会话实现基于工作流的 run API,但不等待任务完成,而是立即返回任务 ID。

源代码位于 llama_deploy/client/models/core.py
38
39
40
41
42
43
44
45
async def run_nowait(self, service_name: str, **run_kwargs: Any) -> str:
    """Create a task for `service_name` without waiting for it to complete.

    Args:
        service_name (str): Used as the `agent_id` of the task definition.
        **run_kwargs (Any): Arguments serialized with `json.dumps` as task input.

    Returns:
        str: The ID of the created task.
    """
    definition = TaskDefinition(input=json.dumps(run_kwargs), agent_id=service_name)
    return await self._do_create_task(definition)

create_task async #

create_task(task_def: TaskDefinition) -> str

在此会话中创建一个新任务。

参数

名称 类型 描述 默认值
task_def TaskDefinition

要创建的任务定义。

必需

返回

名称 类型 描述
str str

创建的任务 ID。

源代码位于 llama_deploy/client/models/core.py
47
48
49
50
51
52
53
54
55
56
async def create_task(self, task_def: TaskDefinition) -> str:
    """Create a new task in this session.

    Args:
        task_def (TaskDefinition): The definition of the task to create.

    Returns:
        str: The ID of the created task.
    """
    # Thin public wrapper around the internal implementation.
    created_id = await self._do_create_task(task_def)
    return created_id

get_task_result async #

get_task_result(task_id: str) -> TaskResult | None

如果任务存在结果,则在此会话中获取该任务的结果。

参数

名称 类型 描述 默认值
task_id str

要获取结果的任务 ID。

必需

返回

类型 描述
TaskResult | None

Optional[TaskResult]: 任务的结果(如果存在),否则为 None。

源代码位于 llama_deploy/client/models/core.py
65
66
67
68
69
70
71
72
73
74
async def get_task_result(self, task_id: str) -> TaskResult | None:
    """Fetch the result of a task in this session, if one is available.

    Args:
        task_id (str): The ID of the task to get the result for.

    Returns:
        TaskResult | None: The task result when there is one, otherwise None.
    """
    # Thin public wrapper around the internal implementation.
    outcome = await self._do_get_task_result(task_id)
    return outcome

get_tasks async #

get_tasks() -> list[TaskDefinition]

获取此会话中的所有任务。

返回

类型 描述
list[TaskDefinition]

list[TaskDefinition]: 会话中的任务定义列表。

源代码位于 llama_deploy/client/models/core.py
85
86
87
88
89
90
91
92
93
async def get_tasks(self) -> list[TaskDefinition]:
    """Retrieve every task that belongs to this session.

    Returns:
        list[TaskDefinition]: One task definition per entry of the JSON
            payload returned by the control plane.
    """
    endpoint = f"{self.client.control_plane_url}/sessions/{self.id}/tasks"
    raw_tasks = (await self.client.request("GET", endpoint)).json()
    # Each entry is expanded into keyword arguments of TaskDefinition.
    return [TaskDefinition(**entry) for entry in raw_tasks]

send_event async #

send_event(service_name: str, task_id: str, ev: Event) -> None

向工作流服务发送事件。

参数

名称 类型 描述 默认值
ev Event

要提交给工作流的事件。

必需

返回

类型 描述
None

None

源代码位于 llama_deploy/client/models/core.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
async def send_event(self, service_name: str, task_id: str, ev: Event) -> None:
    """Send an event to a Workflow service.

    The event is serialized with `JsonSerializer`, wrapped in an
    `EventDefinition` and posted to the task's `send_event` endpoint.

    Args:
        service_name (str): Used as the `agent_id` of the event definition.
        task_id (str): The ID of the task the event is sent to.
        ev (Event): The event to be submitted to the workflow.

    Returns:
        None
    """
    serialized_event = JsonSerializer().serialize(ev)
    definition = EventDefinition(
        event_obj_str=serialized_event, agent_id=service_name
    )

    endpoint = f"{self.client.control_plane_url}/sessions/{self.id}/tasks/{task_id}/send_event"
    payload = definition.model_dump()
    await self.client.request("POST", endpoint, json=payload)

send_event_def async #

send_event_def(task_id: str, ev_def: EventDefinition) -> None

向工作流服务发送一个已构造好的事件定义(EventDefinition)。

参数

名称 类型 描述 默认值
ev_def EventDefinition

要提交给工作流的事件定义。

必需

返回

类型 描述
None

None

源代码位于 llama_deploy/client/models/core.py
112
113
114
115
116
117
118
119
120
121
122
async def send_event_def(self, task_id: str, ev_def: EventDefinition) -> None:
    """Send an already-built event definition to a Workflow service.

    Args:
        task_id (str): The ID of the task the event is sent to.
        ev_def (EventDefinition): The event definition; its `model_dump()`
            output is posted as the JSON body.

    Returns:
        None
    """
    endpoint = f"{self.client.control_plane_url}/sessions/{self.id}/tasks/{task_id}/send_event"
    payload = ev_def.model_dump()
    await self.client.request("POST", endpoint, json=payload)

get_task_result_stream async #

get_task_result_stream(task_id: str) -> AsyncGenerator[dict[str, Any], None]

如果任务存在结果,则在此会话中获取该任务的结果。

参数

名称 类型 描述 默认值
task_id str

要获取结果的任务 ID。

必需

返回

类型 描述
AsyncGenerator[dict[str, Any], None]

AsyncGenerator[dict[str, Any], None]: 一个异步生成器,逐条生成任务结果流中已解析的 JSON 对象。

源代码位于 llama_deploy/client/models/core.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
async def get_task_result_stream(
    self, task_id: str
) -> AsyncGenerator[dict[str, Any], None]:
    """Stream the results of a task in this session as they become available.

    The result-stream endpoint is requested again for as long as it answers
    404, sleeping `client.poll_interval` seconds between attempts, until
    `client.timeout` seconds have elapsed (forever when the timeout is None).

    Args:
        task_id (str): The ID of the task to stream results for.

    Yields:
        dict[str, Any]: One JSON-decoded line of the response body (a dict
            according to the return annotation).

    Raises:
        httpx.HTTPStatusError: If the endpoint answers with an error status
            other than 404.
        TimeoutError: If the endpoint still answers 404 once the timeout
            has elapsed.
    """
    url = f"{self.client.control_plane_url}/sessions/{self.id}/tasks/{task_id}/result_stream"
    # Use a monotonic clock so wall-clock adjustments made while polling
    # cannot shorten or extend the waiting period.
    start_time = time.monotonic()
    while True:
        try:
            async with httpx.AsyncClient(timeout=self.client.timeout) as client:
                async with client.stream("GET", url) as response:
                    response.raise_for_status()
                    async for line in response.aiter_lines():
                        json_line = json.loads(line)
                        yield json_line
                    break  # Stream fully consumed, stop polling
        except httpx.HTTPStatusError as e:
            if e.response.status_code != 404:
                raise  # Re-raise if it's not a 404 error
            if (
                self.client.timeout is None  # means no timeout, always poll
                or time.monotonic() - start_time < self.client.timeout
            ):
                await asyncio.sleep(self.client.poll_interval)
            else:
                raise TimeoutError(
                    f"Task result not available after waiting for {self.client.timeout} seconds"
                ) from e

SessionCollection #

基类: Collection

源代码位于 llama_deploy/client/models/core.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
class SessionCollection(Collection):
    """Collection giving access to the sessions exposed by the control plane."""

    async def list(self) -> list[Session]:  # type: ignore
        """Returns a list of all the sessions in the collection.

        Returns:
            list[Session]: One Session per session ID reported by the control plane.
        """
        sessions_url = f"{self.client.control_plane_url}/sessions"
        response = await self.client.request("GET", sessions_url)
        model_class = self._prepare(Session)
        # Only the keys (session IDs) of the returned mapping are needed.
        return [
            model_class(client=self.client, id=session_id)
            for session_id in response.json()
        ]

    async def create(self) -> Session:
        """Creates a new session and returns a Session object.

        Returns:
            Session: A Session object representing the newly created session.
        """
        return await self._create()

    async def _create(self) -> Session:
        """Async-only version of create, to be used internally from other methods."""
        create_url = f"{self.client.control_plane_url}/sessions/create"
        response = await self.client.request("POST", create_url)
        # The JSON body of the response is used as the new session's ID.
        session_id = response.json()
        model_class = self._prepare(Session)
        return model_class(client=self.client, id=session_id)

    async def get(self, id: str) -> Session:
        """Gets a session by ID.

        Args:
            id: The ID of the session to get.

        Returns:
            Session: A Session object representing the specified session.

        Raises:
            httpx.HTTPStatusError: Presumably raised by the client when the
                session does not exist (`get_or_create` handles a 404 coming
                from this same lookup) - confirm against the client's
                `request` implementation.
        """
        return await self._get(id)

    async def _get(self, id: str) -> Session:
        """Async-only version of get, to be used internally from other methods."""

        get_url = f"{self.client.control_plane_url}/sessions/{id}"
        # The response body is not used: the request only checks that the
        # control plane accepts the ID.
        await self.client.request("GET", get_url)
        model_class = self._prepare(Session)
        return model_class(client=self.client, id=id)

    async def get_or_create(self, id: str) -> Session:
        """Gets a session by ID, or creates a new one if it doesn't exist.

        Note that the requested `id` is not sent when the fallback session is
        created: the new session carries whatever ID the create endpoint returns.

        Args:
            id: The ID of the session to look up.

        Returns:
            Session: The existing session, or a newly created one when the
                lookup answers 404.

        Raises:
            httpx.HTTPStatusError: If the lookup fails with a status other than 404.
        """
        try:
            return await self._get(id)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 404:
                return await self._create()
            # Bare raise keeps the original traceback untouched.
            raise

    async def delete(self, session_id: str) -> None:
        """Deletes a session by ID.

        Args:
            session_id: The ID of the session to delete.
        """
        delete_url = f"{self.client.control_plane_url}/sessions/{session_id}/delete"
        await self.client.request("POST", delete_url)

list async #

list() -> list[Session]

返回集合中所有会话的列表。

源代码位于 llama_deploy/client/models/core.py
161
162
163
164
165
166
167
168
169
async def list(self) -> list[Session]:  # type: ignore
    """Return a Session object for every session known to the control plane.

    Returns:
        list[Session]: One Session per key of the JSON mapping returned by
            the sessions endpoint.
    """
    endpoint = f"{self.client.control_plane_url}/sessions"
    response = await self.client.request("GET", endpoint)
    session_model = self._prepare(Session)
    # The mapping values (session definitions) are not used, only the IDs.
    return [
        session_model(client=self.client, id=session_id)
        for session_id, _ in response.json().items()
    ]

create async #

create() -> Session

创建一个新会话并返回一个 Session 对象。

返回

名称 类型 描述
Session Session

表示新创建会话的 Session 对象。

源代码位于 llama_deploy/client/models/core.py
171
172
173
174
175
176
177
async def create(self) -> Session:
    """Create a new session.

    Returns:
        Session: A Session object representing the newly created session.
    """
    # Thin public wrapper around the internal implementation.
    new_session = await self._create()
    return new_session

get async #

get(id: str) -> Session

按 ID 获取会话。

参数

名称 类型 描述 默认值
id str

要获取的会话 ID。

必需

返回

名称 类型 描述
Session Session

表示指定会话的 Session 对象。

引发

类型 描述
ValueError

如果会话不存在。

源代码位于 llama_deploy/client/models/core.py
187
188
189
190
191
192
193
194
195
196
197
198
199
async def get(self, id: str) -> Session:
    """Look up a session by its ID.

    Args:
        id: The ID of the session to get.

    Returns:
        Session: A Session object representing the specified session.

    Raises:
        Exception: Whatever the internal lookup raises is propagated
            unchanged; NOTE(review): a missing session presumably surfaces
            as an HTTP status error from the client - confirm.
    """
    # Thin public wrapper around the internal implementation.
    session = await self._get(id)
    return session

get_or_create async #

get_or_create(id: str) -> Session

按 ID 获取会话,如果不存在则创建一个新会话。

返回

名称 类型 描述
Session Session

表示指定会话的 Session 对象。

源代码位于 llama_deploy/client/models/core.py
209
210
211
212
213
214
215
216
217
218
219
220
async def get_or_create(self, id: str) -> Session:
    """Return the session with the given ID, creating a new one on a 404.

    Args:
        id: The ID of the session to look up.

    Returns:
        Session: The session found by the lookup, or the session produced by
            the internal create call when the lookup answers 404.

    Raises:
        httpx.HTTPStatusError: If the lookup fails with a status other than 404.
    """
    try:
        return await self._get(id)
    except httpx.HTTPStatusError as exc:
        # Anything but "not found" is a real failure and is passed on.
        if exc.response.status_code != 404:
            raise exc
        return await self._create()

delete async #

delete(session_id: str) -> None

按 ID 删除会话。

参数

名称 类型 描述 默认值
session_id str

要删除的会话 ID。

必需
源代码位于 llama_deploy/client/models/core.py
222
223
224
225
226
227
228
229
async def delete(self, session_id: str) -> None:
    """Ask the control plane to delete a session.

    Args:
        session_id: The ID of the session to delete.
    """
    endpoint = f"{self.client.control_plane_url}/sessions/{session_id}/delete"
    # Deletion is requested with a POST; the response is not inspected here.
    await self.client.request("POST", endpoint)

Service #

基类: Model

源代码位于 llama_deploy/client/models/core.py
232
233
class Service(Model):
    """Model representing a single service; it adds nothing to what `Model` provides."""

    pass

ServiceCollection #

基类: Collection

源代码位于 llama_deploy/client/models/core.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
class ServiceCollection(Collection):
    async def list(self) -> list[Service]:  # type: ignore
        """Return every service registered with the control plane.

        Returns:
            list[Service]: One Service per key of the JSON mapping returned
                by the services endpoint.
        """
        endpoint = f"{self.client.control_plane_url}/services"
        response = await self.client.request("GET", endpoint)
        service_model = self._prepare(Service)
        # The mapping values are not used, only the service names.
        return [
            service_model(client=self.client, id=name)
            for name, _ in response.json().items()
        ]

    async def register(self, service: ServiceDefinition) -> Service:
        """Register a service with the control plane.

        Args:
            service: Definition of the Service to register.

        Returns:
            Service: The model created for the service, also stored in
                `self.items` under the service name.
        """
        endpoint = f"{self.client.control_plane_url}/services/register"
        await self.client.request("POST", endpoint, json=service.model_dump())
        registered = self._prepare(Service)(
            id=service.service_name, client=self.client
        )
        self.items[service.service_name] = registered
        return registered

    async def deregister(self, service_name: str) -> None:
        """Deregister a service from the control plane.

        Args:
            service_name: The name of the Service to deregister.
        """
        endpoint = f"{self.client.control_plane_url}/services/deregister"
        # The service name travels as a query parameter, not in the body.
        query = {"service_name": service_name}
        await self.client.request("POST", endpoint, params=query)

list async #

list() -> list[Service]

返回包含所有注册到控制平面的服务的列表。

返回

类型 描述
list[Service]

list[Service]: 注册到控制平面的服务列表。

源代码位于 llama_deploy/client/models/core.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
async def list(self) -> list[Service]:  # type: ignore
    """Return every service registered with the control plane.

    Returns:
        list[Service]: One Service per key of the JSON mapping returned by
            the services endpoint.
    """
    endpoint = f"{self.client.control_plane_url}/services"
    response = await self.client.request("GET", endpoint)
    service_model = self._prepare(Service)
    # The mapping values are not used, only the service names.
    return [
        service_model(client=self.client, id=name)
        for name, _ in response.json().items()
    ]

register async #

register(service: ServiceDefinition) -> Service

向控制平面注册服务。

参数

名称 类型 描述 默认值
service ServiceDefinition

要注册的服务定义。

必需
源代码位于 llama_deploy/client/models/core.py
253
254
255
256
257
258
259
260
261
262
263
264
async def register(self, service: ServiceDefinition) -> Service:
    """Register a service with the control plane.

    Args:
        service: Definition of the Service to register.

    Returns:
        Service: The model created for the service, also stored in
            `self.items` under the service name.
    """
    endpoint = f"{self.client.control_plane_url}/services/register"
    await self.client.request("POST", endpoint, json=service.model_dump())
    registered = self._prepare(Service)(
        id=service.service_name, client=self.client
    )
    self.items[service.service_name] = registered
    return registered

deregister async #

deregister(service_name: str) -> None

从控制平面注销服务。

参数

名称 类型 描述 默认值
service_name str

要注销的服务名称。

必需
源代码位于 llama_deploy/client/models/core.py
266
267
268
269
270
271
272
273
274
275
276
277
async def deregister(self, service_name: str) -> None:
    """Deregister a service from the control plane.

    Args:
        service_name: The name of the Service to deregister.
    """
    endpoint = f"{self.client.control_plane_url}/services/deregister"
    # The service name travels as a query parameter, not in the body.
    query = {"service_name": service_name}
    await self.client.request("POST", endpoint, params=query)

Core #

基类: Model

源代码位于 llama_deploy/client/models/core.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
class Core(Model):
    @property
    def services(self) -> ServiceCollection:
        """Collection used to access the services registered with the control plane.

        A new, empty collection bound to this model's client is built on
        every access.

        Returns:
            ServiceCollection: Collection of services registered with the control plane.
        """
        return self._prepare(ServiceCollection)(client=self.client, items={})

    @property
    def sessions(self) -> SessionCollection:
        """Collection used to access the sessions registered with the control plane.

        A new, empty collection bound to this model's client is built on
        every access.

        Returns:
            SessionCollection: Collection of sessions registered with the control plane.
        """
        return self._prepare(SessionCollection)(client=self.client, items={})

services property #

返回包含所有注册到控制平面的服务的集合。

返回

名称 类型 描述
ServiceCollection ServiceCollection

注册到控制平面的服务集合。

sessions property #

返回一个集合,用于访问所有注册到控制平面的会话。

返回

名称 类型 描述
SessionCollection SessionCollection

注册到控制平面的会话集合。