
orchestrators#

BaseOrchestrator #

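Bases: ABC

Base class for an orchestrator.

The general idea of an orchestrator is to manage the flow of messages between services.

Given some state and a task, it determines the next batch of messages to publish. Once those messages have been processed, it updates the state with the results.

Source code in llama_deploy/orchestrators/base.py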
class BaseOrchestrator(ABC):
    """Base class for an orchestrator.

    The general idea for an orchestrator is to manage the flow of messages between services.

    Given some state, and task, figure out the next messages to publish. Then, once
    the messages are processed, update the state with the results.
    """

    @abstractmethod
    async def get_next_messages(
        self, task_def: TaskDefinition, state: Dict[str, Any]
    ) -> Tuple[List[QueueMessage], Dict[str, Any]]:
        """Get the next message to process. Returns the message and the new state."""
        ...

    @abstractmethod
    async def add_result_to_state(
        self, result: TaskResult, state: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Add the result of processing a message to the state. Returns the new state."""
        ...
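
The two-method contract above can be illustrated with a minimal sketch. It does not import llama_deploy: `Task`, `Result`, `Message`, `Orchestrator`, `OneShotOrchestrator` and `run_task` are hypothetical stand-ins invented for this example, and the "service" is simulated by upper-casing the input. It only shows the intended loop of asking for messages, processing them, and folding the results back into the state.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple


# Hypothetical stand-ins for TaskDefinition, TaskResult and QueueMessage;
# the real classes carry more fields.
@dataclass
class Task:
    task_id: str
    agent_id: str
    input: str


@dataclass
class Result:
    task_id: str
    result: str


@dataclass
class Message:
    type: str  # name of the destination queue
    action: str
    data: Dict[str, Any] = field(default_factory=dict)


class Orchestrator(ABC):
    """Same two-method contract as BaseOrchestrator, on the stand-in types."""

    @abstractmethod
    async def get_next_messages(
        self, task_def: Task, state: Dict[str, Any]
    ) -> Tuple[List[Message], Dict[str, Any]]: ...

    @abstractmethod
    async def add_result_to_state(
        self, result: Result, state: Dict[str, Any]
    ) -> Dict[str, Any]: ...


class OneShotOrchestrator(Orchestrator):
    """Sends the task to its service once, then publishes nothing more."""

    async def get_next_messages(self, task_def, state):
        if task_def.task_id in state:  # a result is already recorded
            return [], state
        msg = Message(
            type=task_def.agent_id,
            action="new_task",
            data={"input": task_def.input},
        )
        return [msg], state

    async def add_result_to_state(self, result, state):
        state[result.task_id] = result.result
        return state


async def run_task(orchestrator: Orchestrator, task: Task) -> Dict[str, Any]:
    """Drive the loop: ask for messages, 'process' them, fold results back."""
    state: Dict[str, Any] = {}
    messages, state = await orchestrator.get_next_messages(task, state)
    while messages:
        for msg in messages:
            # Simulated service: upper-case the input.
            result = Result(task.task_id, msg.data["input"].upper())
            state = await orchestrator.add_result_to_state(result, state)
        messages, state = await orchestrator.get_next_messages(task, state)
    return state


final_state = asyncio.run(
    run_task(OneShotOrchestrator(), Task("t1", "echo", "hi"))
)
print(final_state)
```

In this sketch the loop ends when `get_next_messages` returns an empty list, which is one simple way for an orchestrator to signal that a task needs no further messages.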

get_next_messages abstractmethod async #

get_next_messages(task_def: TaskDefinition, state: Dict[str, Any]) -> Tuple[List[QueueMessage], Dict[str, Any]]

Get the next messages to process. Returns the list of messages and the new state.

Source code in llama_deploy/orchestrators/base.py
@abstractmethod
async def get_next_messages(
    self, task_def: TaskDefinition, state: Dict[str, Any]
) -> Tuple[List[QueueMessage], Dict[str, Any]]:
    """Get the next message to process. Returns the message and the new state."""
    ...

add_result_to_state abstractmethod async #

add_result_to_state(result: TaskResult, state: Dict[str, Any]) -> Dict[str, Any]

Add the result of processing a message to the state. Returns the new state.

Source code in llama_deploy/orchestrators/base.py
@abstractmethod
async def add_result_to_state(
    self, result: TaskResult, state: Dict[str, Any]
) -> Dict[str, Any]:
    """Add the result of processing a message to the state. Returns the new state."""
    ...

SimpleOrchestrator #

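Bases: BaseOrchestrator

A simple orchestrator that handles orchestration between a service and a user.

Currently, the final message is published to the `human` message queue for final processing.

Source code in llama_deploy/orchestrators/simple.py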
class SimpleOrchestrator(BaseOrchestrator):
    """A simple orchestrator that handles orchestration between a service and a user.

    Currently, the final message is published to the `human` message queue for final processing.
    """

    def __init__(self, max_retries: int = 3, final_message_type: str = "human") -> None:
        self.max_retries = max_retries
        self.final_message_type = final_message_type

    async def get_next_messages(
        self, task_def: TaskDefinition, state: Dict[str, Any]
    ) -> Tuple[List[QueueMessage], Dict[str, Any]]:
        """Get the next message to process. Returns the message and the new state.

        Assumes the agent_id (i.e. the service name) is the destination for the next message.

        Runs the required service, then sends the result to the final message type.
        """

        destination_messages = []

        if task_def.agent_id is None:
            raise ValueError(
                "Task definition must have an agent_id specified as a service name"
            )

        if task_def.task_id not in state:
            state[task_def.task_id] = {}

        result_key = get_result_key(task_def.task_id)
        if state.get(result_key, None) is not None:
            result = state[result_key]
            if not isinstance(result, TaskResult):
                if isinstance(result, str):
                    result = TaskResult(**json.loads(result))
                elif isinstance(result, dict):
                    result = TaskResult(**result)
                else:
                    raise ValueError(f"Result must be a TaskResult, not {type(result)}")

            assert isinstance(result, TaskResult), "Result must be a TaskResult"

            if self.final_message_type is not None:
                destination = self.final_message_type

                destination_messages = [
                    QueueMessage(
                        type=destination,
                        action=ActionTypes.COMPLETED_TASK,
                        data=result.model_dump(),
                    )
                ]
        else:
            destination = task_def.agent_id
            destination_messages = [
                QueueMessage(
                    type=destination,
                    action=ActionTypes.NEW_TASK,
                    data=task_def.model_dump(),
                )
            ]

        return destination_messages, state

    async def add_result_to_state(
        self, result: TaskResult, state: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Add the result of processing a message to the state. Returns the new state."""

        # TODO: detect failures + retries
        cur_retries = state.get("retries", -1) + 1
        state["retries"] = cur_retries

        # add result to state
        state[get_result_key(result.task_id)] = result

        return state
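
The routing decision in `get_next_messages` above reduces to a small branch, sketched below on plain values. The function `route`, the helper `result_key` with its `result_<task_id>` key format, and the action strings are assumptions made for illustration; they stand in for `get_result_key` and `ActionTypes` and are not taken from the library.

```python
from typing import Any, Dict, Optional, Tuple


def result_key(task_id: str) -> str:
    # Assumed key format; stands in for the get_result_key helper.
    return f"result_{task_id}"


def route(
    task_id: str,
    agent_id: str,
    state: Dict[str, Any],
    final_message_type: Optional[str] = "human",
) -> Optional[Tuple[str, str]]:
    """Return (destination, action) for the next message, or None for none."""
    if state.get(result_key(task_id)) is not None:
        # A result exists: forward it to the final consumer, if configured.
        if final_message_type is None:
            return None
        return final_message_type, "completed_task"
    # No result yet: dispatch the task to the service named by agent_id.
    return agent_id, "new_task"


state: Dict[str, Any] = {}
first = route("t1", "my_service", state)
state[result_key("t1")] = {"task_id": "t1", "result": "done"}
second = route("t1", "my_service", state)
silent = route("t1", "my_service", state, final_message_type=None)
print(first, second, silent)
```

The last call shows the case where a result is present but `final_message_type` is `None`: in the source above, `destination_messages` then stays an empty list.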

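get_next_messages async #

get_next_messages(task_def: TaskDefinition, state: Dict[str, Any]) -> Tuple[List[QueueMessage], Dict[str, Any]]

Get the next messages to process. Returns the list of messages and the new state.

Assumes the agent_id (i.e. the service name) is the destination for the next message.

Runs the required service, then sends the result to the final message type.

Source code in llama_deploy/orchestrators/simple.py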
async def get_next_messages(
    self, task_def: TaskDefinition, state: Dict[str, Any]
) -> Tuple[List[QueueMessage], Dict[str, Any]]:
    """Get the next message to process. Returns the message and the new state.

    Assumes the agent_id (i.e. the service name) is the destination for the next message.

    Runs the required service, then sends the result to the final message type.
    """

    destination_messages = []

    if task_def.agent_id is None:
        raise ValueError(
            "Task definition must have an agent_id specified as a service name"
        )

    if task_def.task_id not in state:
        state[task_def.task_id] = {}

    result_key = get_result_key(task_def.task_id)
    if state.get(result_key, None) is not None:
        result = state[result_key]
        if not isinstance(result, TaskResult):
            if isinstance(result, str):
                result = TaskResult(**json.loads(result))
            elif isinstance(result, dict):
                result = TaskResult(**result)
            else:
                raise ValueError(f"Result must be a TaskResult, not {type(result)}")

        assert isinstance(result, TaskResult), "Result must be a TaskResult"

        if self.final_message_type is not None:
            destination = self.final_message_type

            destination_messages = [
                QueueMessage(
                    type=destination,
                    action=ActionTypes.COMPLETED_TASK,
                    data=result.model_dump(),
                )
            ]
    else:
        destination = task_def.agent_id
        destination_messages = [
            QueueMessage(
                type=destination,
                action=ActionTypes.NEW_TASK,
                data=task_def.model_dump(),
            )
        ]

    return destination_messages, state
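
The method above accepts a stored result in three forms: a `TaskResult` instance, a JSON string, or a dict. A minimal sketch of that normalization step follows. `Result` is a hypothetical two-field stand-in for `TaskResult`, and `coerce_result` is a name invented for this example.

```python
import json
from dataclasses import dataclass
from typing import Any


@dataclass
class Result:
    # Hypothetical stand-in for TaskResult with only two fields.
    task_id: str
    result: str


def coerce_result(raw: Any) -> Result:
    """Rebuild a Result from the forms a state store might hand back."""
    if isinstance(raw, Result):
        return raw
    if isinstance(raw, str):
        # Serialized form: parse the JSON, then build the object.
        return Result(**json.loads(raw))
    if isinstance(raw, dict):
        return Result(**raw)
    raise ValueError(f"Result must be a Result, not {type(raw)}")


as_object = coerce_result(Result("t1", "done"))
as_dict = coerce_result({"task_id": "t1", "result": "done"})
as_json = coerce_result('{"task_id": "t1", "result": "done"}')
print(as_object == as_dict == as_json)
```

Accepting all three forms is presumably useful when the state passes through a store that serializes its values, though the source does not say which store motivates it.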

add_result_to_state async #

add_result_to_state(result: TaskResult, state: Dict[str, Any]) -> Dict[str, Any]

Add the result of processing a message to the state. Returns the new state.

Source code in llama_deploy/orchestrators/simple.py
async def add_result_to_state(
    self, result: TaskResult, state: Dict[str, Any]
) -> Dict[str, Any]:
    """Add the result of processing a message to the state. Returns the new state."""

    # TODO: detect failures + retries
    cur_retries = state.get("retries", -1) + 1
    state["retries"] = cur_retries

    # add result to state
    state[get_result_key(result.task_id)] = result

    return state
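
To make the bookkeeping above concrete, here is a small sketch of the same two updates on plain values. `add_result` and `result_key` (with its assumed `result_<task_id>` format) are hypothetical names for this example and stand in for the method and for `get_result_key`.

```python
from typing import Any, Dict


def result_key(task_id: str) -> str:
    # Assumed key format; stands in for the get_result_key helper.
    return f"result_{task_id}"


def add_result(task_id: str, result: Any, state: Dict[str, Any]) -> Dict[str, Any]:
    """Same bookkeeping as add_result_to_state, on plain values."""
    # The counter starts at -1 so the first recorded result yields 0.
    state["retries"] = state.get("retries", -1) + 1
    # A later result for the same task overwrites the earlier one.
    state[result_key(task_id)] = result
    return state


state: Dict[str, Any] = {}
add_result("t1", "first answer", state)
after_first = state["retries"]
add_result("t1", "second answer", state)
after_second = state["retries"]
print(after_first, after_second, state[result_key("t1")])
```

As the `TODO` in the source indicates, the counter is incremented on every recorded result and is not compared against `max_retries` in the code shown here.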

SimpleOrchestratorConfig #

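Bases: BaseSettings

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| max_retries | int | | 3 |
| final_message_type | str \| None | | None |

Source code in llama_deploy/orchestrators/simple.py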
class SimpleOrchestratorConfig(BaseSettings):
    model_config = SettingsConfigDict(env_prefix="SIMPLE_ORCHESTRATOR_")

    max_retries: int = 3
    final_message_type: Optional[str] = None
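
With `env_prefix="SIMPLE_ORCHESTRATOR_"`, each field is read from an environment variable named after the prefix plus the field name, for example `SIMPLE_ORCHESTRATOR_MAX_RETRIES`. The sketch below mimics that lookup using only the standard library so the naming is easy to see. `OrchestratorSettings` and `settings_from_env` are hypothetical stand-ins; the sketch does not use pydantic-settings and does not reproduce its validation or name-matching rules.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

ENV_PREFIX = "SIMPLE_ORCHESTRATOR_"


@dataclass
class OrchestratorSettings:
    # Hypothetical stand-in mirroring SimpleOrchestratorConfig's two fields.
    max_retries: int = 3
    final_message_type: Optional[str] = None


def settings_from_env(env: Mapping[str, str] = os.environ) -> OrchestratorSettings:
    """Read prefixed variables, falling back to the defaults above."""
    settings = OrchestratorSettings()
    if ENV_PREFIX + "MAX_RETRIES" in env:
        settings.max_retries = int(env[ENV_PREFIX + "MAX_RETRIES"])
    if ENV_PREFIX + "FINAL_MESSAGE_TYPE" in env:
        settings.final_message_type = env[ENV_PREFIX + "FINAL_MESSAGE_TYPE"]
    return settings


defaults = settings_from_env({})
overridden = settings_from_env(
    {
        "SIMPLE_ORCHESTRATOR_MAX_RETRIES": "5",
        "SIMPLE_ORCHESTRATOR_FINAL_MESSAGE_TYPE": "human",
    }
)
print(defaults, overridden)
```

Passing the environment as a mapping keeps the example deterministic; calling `settings_from_env()` with no argument would read the real process environment instead.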