Kafka

Queue Broker based on kafka.

class microagent.tools.kafka.KafkaBroker(dsn: str, uid: str = <factory>, log: ~logging.Logger = <Logger microagent.broker (WARNING)>, _bindings: dict[str, ~microagent.queue.Consumer] = <factory>)[source]

Experimental broker based on the Apache Kafka distributed stream processing system.

Parameters:
  • dsn – string, data source name for connection kafka://localhost:9092

  • log – logging.Logger (optional)

Sending messages.

from microagent.tools.kafka import KafkaBroker

broker = KafkaBroker('kafka://localhost:9092')

await broker.user_created.send({'user_id': 1})

Consuming messages.

class EmailAgent(MicroAgent):
    @consumer(queues.mailer)
    async def example_read_queue(self, kafka, **data):
        # kafka: AIOKafkaConsumer
        process(data)
async send(name: str, message: str, **kwargs: Any) None[source]

Write a raw message to queue.

Parameters:
  • name – string, queue name

  • message – string, serialized object

  • **kwargs – specific parameters for each broker implementation

async bind(name: str) None[source]

Start reading queue.

Parameters:

name – string, queue name

async queue_length(name: str, **options: Any) int[source]

Get the current queue length.

Parameters:
  • name – string, queue name

  • **options – specific parameters for each broker implementation