apache_kafka#

Apache Kafka message queue.

KafkaMessageQueueConfig #

Bases: BaseSettings

Kafka message queue configuration.

Parameters

Name   Type               Description                                        Default
type   Literal['kafka']   Config type discriminator (excluded from dumps).   'kafka'
url    str                Broker URL for connecting to the Kafka server.     'localhost:9092'
host   str | None         Optional host; with port, overrides url.           None
port   int | None         Optional port; with host, overrides url.           None
Source code in llama_deploy/message_queues/apache_kafka.py
class KafkaMessageQueueConfig(BaseSettings):
    """Kafka message queue configuration."""

    model_config = SettingsConfigDict(env_prefix="KAFKA_")

    type: Literal["kafka"] = Field(default="kafka", exclude=True)
    url: str = DEFAULT_URL
    host: str | None = None
    port: int | None = None

    @model_validator(mode="after")
    def update_url(self) -> "KafkaMessageQueueConfig":
        if self.host and self.port:
            self.url = f"{self.host}:{self.port}"
        return self
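
For illustration, a minimal sketch of how the config resolves url; the host value below is hypothetical, and fields can also be set through KAFKA_-prefixed environment variables per model_config:

from llama_deploy.message_queues.apache_kafka import KafkaMessageQueueConfig

config = KafkaMessageQueueConfig()  # url defaults to 'localhost:9092'

# When both host and port are set, the model validator rebuilds url from them
config = KafkaMessageQueueConfig(host="kafka.internal", port=9093)
assert config.url == "kafka.internal:9093"

# Equivalent via the environment: KAFKA_HOST=kafka.internal KAFKA_PORT=9093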

KafkaMessageQueue #

Bases: AbstractMessageQueue

Apache Kafka integration with aiokafka.

This class implements a traditional message broker using Apache Kafka:

- Topics are created with N partitions.
- Consumers are registered to a single group to implement a competing-consumer scheme, where only one of the consumers subscribed to a topic receives a given message.
- The default round-robin partition assignment is used.

Attributes

Name   Type   Description
url    str    The broker URL string used to connect to the Kafka server.

Examples

from llama_deploy.message_queues.apache_kafka import KafkaMessageQueue

message_queue = KafkaMessageQueue()  # uses the default url
Source code in llama_deploy/message_queues/apache_kafka.py
class KafkaMessageQueue(AbstractMessageQueue):
    """Apache Kafka integration with aiokafka.

    This class implements a traditional message broker using Apache Kafka.
        - Topics are created with N partitions
        - Consumers are registered to a single group to implement a competing
        consumer scheme where only one consumer subscribed to a topic gets the
        message
            - Default round-robin assignment is used

    Attributes:
        url (str): The broker url string to connect to the Kafka server

    Examples:
        ```python
        from llama_deploy.message_queues.apache_kafka import KafkaMessageQueue

        message_queue = KafkaMessageQueue()  # uses the default url
        ```
    """

    def __init__(self, config: KafkaMessageQueueConfig | None = None) -> None:
        self._config = config or KafkaMessageQueueConfig()
        self._kafka_consumers: dict[str, Any] = {}
        self._registered_topics: set[str] = set()

    @classmethod
    def from_url_params(
        cls,
        host: str,
        port: int | None = None,
    ) -> "KafkaMessageQueue":
        """Convenience constructor from url params.

        Args:
            host (str): host for the Kafka server
            port (Optional[int], optional): port for the Kafka server. Defaults to None.

        Returns:
            KafkaMessageQueue: An Apache Kafka MessageQueue integration.
        """
        url = f"{host}:{port}" if port else f"{host}"
        return cls(KafkaMessageQueueConfig(url=url))

    def _create_new_topic(
        self,
        topic_name: str,
        num_partitions: int | None = None,
        replication_factor: int | None = None,
        **kwargs: Dict[str, Any],
    ) -> None:
        """Create a new topic.

        Use kafka-python-ng instead of aiokafka, as the latter has issues
        resolving api_version with the broker.

        TODO: convert to aiokafka once this is resolved there.
        """
        try:
            from kafka.admin import KafkaAdminClient, NewTopic
            from kafka.errors import TopicAlreadyExistsError
        except ImportError:
            raise ImportError(
                "kafka-python-ng is not installed. "
                "Please install it using `pip install kafka-python-ng`."
            )

        admin_client = KafkaAdminClient(bootstrap_servers=self._config.url)
        try:
            topic = NewTopic(
                name=topic_name,
                num_partitions=num_partitions or DEFAULT_TOPIC_PARTITIONS,
                replication_factor=replication_factor
                or DEFAULT_TOPIC_REPLICATION_FACTOR,
                **kwargs,
            )
            admin_client.create_topics(new_topics=[topic])
            self._registered_topics.add(topic_name)
            logger.info(f"New topic {topic_name} created.")
        except TopicAlreadyExistsError:
            logger.info(f"Topic {topic_name} already exists.")

    async def _publish(self, message: QueueMessage, topic: str) -> Any:
        """Publish message to the queue."""
        try:
            from aiokafka import AIOKafkaProducer
        except ImportError:
            raise ImportError(
                "aiokafka is not installed. "
                "Please install it using `pip install aiokafka`."
            )

        producer = AIOKafkaProducer(bootstrap_servers=self._config.url)
        await producer.start()
        try:
            message_body = json.dumps(message.model_dump()).encode("utf-8")
            await producer.send_and_wait(topic, message_body)
            logger.info(f"published message {message.id_}")
        finally:
            await producer.stop()

    async def cleanup(self, *args: Any, **kwargs: Dict[str, Any]) -> None:
        """Cleanup for local runs.

        Use kafka-python-ng instead of aiokafka, as the latter has issues
        resolving api_version with the broker when using the admin client.

        TODO: convert to aiokafka once this is resolved there.
        """
        try:
            from kafka.admin import KafkaAdminClient
        except ImportError:
            raise ImportError(
                "aiokafka is not installed. "
                "Please install it using `pip install aiokafka`."
            )

        admin_client = KafkaAdminClient(bootstrap_servers=self._config.url)
        active_topics = admin_client.list_topics()
        topics_to_delete = [el for el in self._registered_topics if el in active_topics]
        admin_client.delete_consumer_groups([DEFAULT_GROUP_ID])
        if topics_to_delete:
            admin_client.delete_topics(topics_to_delete)

    async def deregister_consumer(self, consumer: BaseMessageQueueConsumer) -> Any:
        """Deregister a consumer."""
        if consumer.id_ in self._kafka_consumers:
            await self._kafka_consumers[consumer.id_].stop()

    async def register_consumer(
        self, consumer: BaseMessageQueueConsumer, topic: str
    ) -> StartConsumingCallable:
        """Register a new consumer."""
        try:
            from aiokafka import AIOKafkaConsumer
        except ImportError:
            raise ImportError(
                "aiokafka is not installed. "
                "Please install it using `pip install aiokafka`."
            )

        if consumer.id_ in self._kafka_consumers:
            msg = f"Consumer {consumer.id_} already registered for topic {topic}"
            raise ValueError(msg)

        self._create_new_topic(topic)
        kafka_consumer = AIOKafkaConsumer(
            topic,
            bootstrap_servers=self._config.url,
            group_id=DEFAULT_GROUP_ID,
            auto_offset_reset="earliest",
        )
        self._kafka_consumers[consumer.id_] = kafka_consumer

        await kafka_consumer.start()

        logger.info(
            f"Registered consumer {consumer.id_} for message type '{consumer.message_type}' on topic '{topic}'",
        )

        async def start_consuming_callable() -> None:
            """StartConsumingCallable."""
            try:
                async for msg in kafka_consumer:
                    if msg.value is None:
                        raise RuntimeError("msg.value is None")
                    decoded_message = json.loads(msg.value.decode("utf-8"))
                    queue_message = QueueMessage.model_validate(decoded_message)
                    await consumer.process_message(queue_message)
            finally:
                stop_task = asyncio.create_task(kafka_consumer.stop())
                stop_task.add_done_callback(
                    lambda _: logger.info(
                        f"stopped kafka consumer {consumer.id_}: {consumer.message_type} on topic {topic}"
                    )
                )
                await asyncio.shield(stop_task)
                del self._kafka_consumers[consumer.id_]

        return start_consuming_callable

    def as_config(self) -> BaseModel:
        return self._config

from_url_params classmethod #

from_url_params(host: str, port: int | None = None) -> KafkaMessageQueue

Convenience constructor from URL params.

Parameters

Name   Type            Description                                     Default
host   str             Host for the Kafka server.                      required
port   Optional[int]   Port for the Kafka server. Defaults to None.    None

Returns

Type                Description
KafkaMessageQueue   An Apache Kafka MessageQueue integration.

Source code in llama_deploy/message_queues/apache_kafka.py
@classmethod
def from_url_params(
    cls,
    host: str,
    port: int | None = None,
) -> "KafkaMessageQueue":
    """Convenience constructor from url params.

    Args:
        host (str): host for the Kafka server
        port (Optional[int], optional): port for the Kafka server. Defaults to None.

    Returns:
        KafkaMessageQueue: An Apache Kafka MessageQueue integration.
    """
    url = f"{host}:{port}" if port else f"{host}"
    return cls(KafkaMessageQueueConfig(url=url))
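
For example, the following two constructions are equivalent (host and port values are illustrative):

from llama_deploy.message_queues.apache_kafka import (
    KafkaMessageQueue,
    KafkaMessageQueueConfig,
)

mq = KafkaMessageQueue.from_url_params(host="localhost", port=9092)
# ...is the same as:
mq = KafkaMessageQueue(KafkaMessageQueueConfig(url="localhost:9092"))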

cleanup async #

cleanup(*args: Any, **kwargs: Dict[str, Any]) -> None

Cleanup for local runs.

Use kafka-python-ng instead of aiokafka, as the latter has issues resolving api_version with the broker when using the admin client.

TODO: convert to aiokafka once this is resolved there.

Source code in llama_deploy/message_queues/apache_kafka.py
async def cleanup(self, *args: Any, **kwargs: Dict[str, Any]) -> None:
    """Cleanup for local runs.

    Use kafka-python-ng instead of aiokafka, as the latter has issues
    resolving api_version with the broker when using the admin client.

    TODO: convert to aiokafka once this is resolved there.
    """
    try:
        from kafka.admin import KafkaAdminClient
    except ImportError:
        raise ImportError(
            "aiokafka is not installed. "
            "Please install it using `pip install aiokafka`."
        )

    admin_client = KafkaAdminClient(bootstrap_servers=self._config.url)
    active_topics = admin_client.list_topics()
    topics_to_delete = [el for el in self._registered_topics if el in active_topics]
    admin_client.delete_consumer_groups([DEFAULT_GROUP_ID])
    if topics_to_delete:
        admin_client.delete_topics(topics_to_delete)
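
A minimal teardown sketch for local development, assuming the topics were created through this same queue instance:

from llama_deploy.message_queues.apache_kafka import KafkaMessageQueue

async def teardown(message_queue: KafkaMessageQueue) -> None:
    # Drops the topics this instance registered and the default consumer group
    await message_queue.cleanup()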

deregister_consumer async #

deregister_consumer(consumer: BaseMessageQueueConsumer) -> Any

Deregister a consumer.

Source code in llama_deploy/message_queues/apache_kafka.py
async def deregister_consumer(self, consumer: BaseMessageQueueConsumer) -> Any:
    """Deregister a consumer."""
    if consumer.id_ in self._kafka_consumers:
        await self._kafka_consumers[consumer.id_].stop()

register_consumer async #

register_consumer(consumer: BaseMessageQueueConsumer, topic: str) -> StartConsumingCallable

Register a new consumer.

Source code in llama_deploy/message_queues/apache_kafka.py
async def register_consumer(
    self, consumer: BaseMessageQueueConsumer, topic: str
) -> StartConsumingCallable:
    """Register a new consumer."""
    try:
        from aiokafka import AIOKafkaConsumer
    except ImportError:
        raise ImportError(
            "aiokafka is not installed. "
            "Please install it using `pip install aiokafka`."
        )

    if consumer.id_ in self._kafka_consumers:
        msg = f"Consumer {consumer.id_} already registered for topic {topic}"
        raise ValueError(msg)

    self._create_new_topic(topic)
    kafka_consumer = AIOKafkaConsumer(
        topic,
        bootstrap_servers=self._config.url,
        group_id=DEFAULT_GROUP_ID,
        auto_offset_reset="earliest",
    )
    self._kafka_consumers[consumer.id_] = kafka_consumer

    await kafka_consumer.start()

    logger.info(
        f"Registered consumer {consumer.id_} for message type '{consumer.message_type}' on topic '{topic}'",
    )

    async def start_consuming_callable() -> None:
        """StartConsumingCallable."""
        try:
            async for msg in kafka_consumer:
                if msg.value is None:
                    raise RuntimeError("msg.value is None")
                decoded_message = json.loads(msg.value.decode("utf-8"))
                queue_message = QueueMessage.model_validate(decoded_message)
                await consumer.process_message(queue_message)
        finally:
            stop_task = asyncio.create_task(kafka_consumer.stop())
            stop_task.add_done_callback(
                lambda _: logger.info(
                    f"stopped kafka consumer {consumer.id_}: {consumer.message_type} on topic {topic}"
                )
            )
            await asyncio.shield(stop_task)
            del self._kafka_consumers[consumer.id_]

    return start_consuming_callable
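
A consumption sketch using only the calls documented on this page; the consumer object is any BaseMessageQueueConsumer implementation, whose construction is application-specific and omitted here. Because every consumer joins DEFAULT_GROUP_ID, registering several consumers on the same topic yields the competing-consumer behavior described above:

import asyncio

from llama_deploy.message_queues.apache_kafka import KafkaMessageQueue


async def consume_for_a_while(consumer) -> None:
    message_queue = KafkaMessageQueue()  # default broker at localhost:9092

    # Creates the topic if needed, starts an AIOKafkaConsumer in
    # DEFAULT_GROUP_ID, and returns the consume-loop callable.
    start_consuming = await message_queue.register_consumer(consumer, topic="my_topic")

    # The loop calls consumer.process_message() for each message on the topic.
    task = asyncio.create_task(start_consuming())
    try:
        await asyncio.sleep(60)  # consume for a while
    finally:
        # Stopping the underlying Kafka consumer ends the async-for loop;
        # the loop's own finally block then finishes cleanup.
        await message_queue.deregister_consumer(consumer)
        await task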