Source code for microagent.tools.kafka

'''
:ref:`Queue Broker <broker>` based on :kafka:`kafka <>`.
'''
import asyncio
import time

from dataclasses import dataclass, field
from typing import Any
from urllib import parse

import aiokafka

from ..broker import AbstractQueueBroker, Consumer


[docs] @dataclass class KafkaBroker(AbstractQueueBroker): ''' Experimental broker based on the Apache Kafka distributed stream processing system. :param dsn: string, data source name for connection kafka://localhost:9092 :param log: logging.Logger (optional) Sending messages. .. code-block:: python from microagent.tools.kafka import KafkaBroker broker = KafkaBroker('kafka://localhost:9092') await broker.user_created.send({'user_id': 1}) Consuming messages. .. code-block:: python class EmailAgent(MicroAgent): @consumer(queues.mailer) async def example_read_queue(self, kafka, **data): # kafka: AIOKafkaConsumer process(data) ''' addr: str = field(init=False) producer: aiokafka.AIOKafkaProducer = field(init=False) def __post_init__(self) -> None: self.addr = parse.urlparse(self.dsn).netloc self.producer = aiokafka.AIOKafkaProducer(bootstrap_servers=self.addr)
[docs] async def send(self, name: str, message: str, **kwargs: Any) -> None: await self.producer.start() try: await self.producer.send_and_wait(name, bytes(message, 'utf8'), **kwargs) finally: await self.producer.stop() _loop = asyncio.get_running_loop() self.producer = aiokafka.AIOKafkaProducer(loop=_loop, bootstrap_servers=self.addr)
[docs] async def bind(self, name: str) -> None: loop = asyncio.get_running_loop() kafka_consumer = aiokafka.AIOKafkaConsumer(name, loop=loop, bootstrap_servers=self.addr) asyncio.create_task(self._kafka_wrapper(kafka_consumer, name))
async def _kafka_wrapper(self, kafka_consumer: aiokafka.AIOKafkaConsumer, name: str) -> None: consumer = self._bindings[name] await kafka_consumer.start() try: async for msg in kafka_consumer: data = self.prepared_data(consumer, msg.value) data['kafka'] = msg asyncio.create_task(self._handle(consumer, data)) finally: await kafka_consumer.stop() async def _handle(self, consumer: Consumer, data: dict) -> None: self.log.debug('Calling %s by %s with %s', consumer, consumer.queue.name, data) timer = time.monotonic() try: await asyncio.wait_for(consumer.handler(**data), consumer.timeout) except TypeError: self.log.exception('Call %s failed', consumer) except asyncio.TimeoutError: self.log.error('TimeoutError: %s %.2f', consumer, time.monotonic() - timer)
[docs] async def queue_length(self, name: str, **options: Any) -> int: # noqa return 0