Kafka
Queue Broker based on kafka.
- class microagent.tools.kafka.KafkaBroker(dsn: str, uid: str = <factory>, log: ~logging.Logger = <Logger microagent.broker (WARNING)>, _bindings: dict[str, ~microagent.queue.Consumer] = <factory>)[source]
Experimental broker based on the Apache Kafka distributed stream processing system.
- Parameters:
dsn – string, data source name for connection kafka://localhost:9092
log – logging.Logger (optional)
Sending messages.
from microagent.tools.kafka import KafkaBroker broker = KafkaBroker('kafka://localhost:9092') await broker.user_created.send({'user_id': 1})
Consuming messages.
class EmailAgent(MicroAgent): @consumer(queues.mailer) async def example_read_queue(self, kafka, **data): # kafka: AIOKafkaConsumer process(data)