message_consumers

BaseMessageQueueConsumer

Bases: BaseModel, ABC

Consumer of a MessageQueue.

Process messages from a MessageQueue for a specific message type.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| id_ | str |  | '14686d04-83db-4704-b6e0-5aed9df561d1' |
| message_type | str | Type of the message to consume. | 'default' |
| channel | Any | The channel, if any, for which to receive messages. | None |
| consuming_callable | Callable[..., Coroutine[Any, Any, None]] |  | <function default_start_consuming_callable at 0x7e52a2d24cc0> |
Source code in llama_deploy/message_consumers/base.py
class BaseMessageQueueConsumer(BaseModel, ABC):
    """Consumer of a MessageQueue.

    Process messages from a MessageQueue for a specific message type.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)
    id_: str = Field(default_factory=generate_id)
    message_type: str = Field(
        default="default", description="Type of the message to consume."
    )
    channel: Any = Field(
        default=None, description="The channel if any for which to receive messages."
    )
    consuming_callable: StartConsumingCallable = Field(
        default=default_start_consuming_callable
    )

    @abstractmethod
    async def _process_message(self, message: QueueMessage, **kwargs: Any) -> Any:
        """Subclasses should implement logic here."""

    async def process_message(self, message: QueueMessage, **kwargs: Any) -> Any:
        """Logic for processing message."""
        if message.type != self.message_type:
            msg = f"Consumer cannot process messages of type '{message.type}'."
            raise ValueError(msg)
        return await self._process_message(message, **kwargs)

    async def start_consuming(
        self,
    ) -> None:
        """Begin consuming messages."""
        await self.consuming_callable()

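Concrete consumers subclass BaseMessageQueueConsumer and implement only _process_message; the inherited process_message performs the message_type check first. The sketch below is a minimal illustration: the EchoConsumer name is hypothetical, and the import paths and the QueueMessage constructor arguments are assumptions based on this page rather than a verbatim llama_deploy example.

import asyncio
from typing import Any

from llama_deploy.message_consumers.base import BaseMessageQueueConsumer
from llama_deploy.messages import QueueMessage  # assumed import path

class EchoConsumer(BaseMessageQueueConsumer):
    """Hypothetical consumer that just prints what it receives."""

    async def _process_message(self, message: QueueMessage, **kwargs: Any) -> None:
        # Only reached after the base class has checked message.type.
        print(f"received a {message.type!r} message")

async def main() -> None:
    consumer = EchoConsumer(message_type="echo")
    await consumer.process_message(QueueMessage(type="echo"))
    # A message of any other type raises ValueError in process_message.

if __name__ == "__main__":
    asyncio.run(main())
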
process_message async

process_message(message: QueueMessage, **kwargs: Any) -> Any

Logic for processing a message.

Source code in llama_deploy/message_consumers/base.py
async def process_message(self, message: QueueMessage, **kwargs: Any) -> Any:
    """Logic for processing message."""
    if message.type != self.message_type:
        msg = f"Consumer cannot process messages of type '{message.type}'."
        raise ValueError(msg)
    return await self._process_message(message, **kwargs)

start_consuming async

start_consuming() -> None

Begin consuming messages.

Source code in llama_deploy/message_consumers/base.py
async def start_consuming(
    self,
) -> None:
    """Begin consuming messages."""
    await self.consuming_callable()
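
start_consuming just awaits whatever consuming_callable the consumer was built with (the default is default_start_consuming_callable), so a queue integration can supply its own polling loop. A hedged sketch, assuming the field accepts any zero-argument coroutine function as suggested by the Callable[..., Coroutine[Any, Any, None]] annotation above; DrainOnce and poll_once are hypothetical names, and the import paths are assumptions.

import asyncio
from typing import Any

from llama_deploy.message_consumers.base import BaseMessageQueueConsumer
from llama_deploy.messages import QueueMessage  # assumed import path

class DrainOnce(BaseMessageQueueConsumer):
    async def _process_message(self, message: QueueMessage, **kwargs: Any) -> None:
        print(f"processed a {message.type!r} message")

async def poll_once() -> None:
    # Stand-in for the polling loop a real queue integration would provide.
    await asyncio.sleep(0)

consumer = DrainOnce(message_type="task", consuming_callable=poll_once)
# start_consuming() awaits the configured callable and returns when it does.
asyncio.run(consumer.start_consuming())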

CallableMessageConsumer

Bases: BaseMessageQueueConsumer

Message consumer for a callable handler.

For a given message, it will call the handler with the message as input.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| handler | Callable |  | required |
Source code in llama_deploy/message_consumers/callable.py
class CallableMessageConsumer(BaseMessageQueueConsumer):
    """Message consumer for a callable handler.

    For a given message, it will call the handler with the message as input.
    """

    handler: Callable

    async def _process_message(self, message: QueueMessage, **kwargs: Any) -> None:
        if asyncio.iscoroutinefunction(self.handler):
            await self.handler(message, **kwargs)
        else:
            self.handler(message, **kwargs)
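
A short usage sketch: a plain function or an async function can serve as the handler, and a message whose type does not match the consumer's message_type is rejected by the inherited process_message. Import paths and the QueueMessage construction are assumptions based on this page.

import asyncio

from llama_deploy.message_consumers.callable import CallableMessageConsumer
from llama_deploy.messages import QueueMessage  # assumed import path

def on_message(message: QueueMessage) -> None:
    # A synchronous handler; an async def handler would be awaited instead.
    print(f"handled a {message.type!r} message")

async def main() -> None:
    consumer = CallableMessageConsumer(message_type="ping", handler=on_message)

    await consumer.process_message(QueueMessage(type="ping"))  # calls on_message

    try:
        await consumer.process_message(QueueMessage(type="pong"))
    except ValueError as exc:
        print(exc)  # raised by the inherited process_message type check

asyncio.run(main())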