AMQP (RabbitMQ)
Queue broker based on aiormq.
- class microagent.tools.amqp.AMQPBroker(dsn: str, uid: str = <factory>, log: logging.Logger = <Logger microagent.broker (WARNING)>, _bindings: dict[str, microagent.queue.Consumer] = <factory>, sending_channel: aiormq.abc.AbstractChannel | None = None)
The broker is built on the AMQP basic_consume method and automatically sends an acknowledgement when the handler completes without errors. Each consumer takes an exclusive channel, while sending reuses channels.
- Parameters:
dsn – string, data source name for the connection, e.g. amqp://guest@localhost:5672/
log – logging.Logger (optional)
    from microagent.tools.amqp import AMQPBroker

    broker = AMQPBroker('amqp://guest:guest@localhost:5672/')
    await broker.user_created.send({'user_id': 1})
The @consumer decorator for this broker has an additional option, autoack, which enables or disables automatic acknowledgements.
    class EmailAgent(MicroAgent):

        @consumer(queues.mailer, autoack=False)
        async def example_read_queue(self, amqp, **data):
            await amqp.channel.basic_client_ack(delivery_tag=amqp.delivery_tag)
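The acknowledgement rule described above (acknowledge only when the handler finishes without errors, and only if autoack is enabled) can be sketched roughly as follows. This is an illustration, not the broker's actual code: `FakeMessage` and `dispatch` are hypothetical names, and the fake message stands in for a real delivered message so the snippet runs without a RabbitMQ server.

```python
from typing import Any, Awaitable, Callable

Handler = Callable[..., Awaitable[None]]


class FakeMessage:
    """Hypothetical stand-in for a delivered message; records whether it was acked."""

    def __init__(self, delivery_tag: int) -> None:
        self.delivery_tag = delivery_tag
        self.acked = False

    async def ack(self) -> None:
        self.acked = True


async def dispatch(handler: Handler, message: FakeMessage,
                   autoack: bool = True, **data: Any) -> None:
    """Run the handler and acknowledge only after it completes without raising."""
    try:
        await handler(message, **data)
    except Exception:
        # Assumption for this sketch: a failed handler means no acknowledgement.
        return
    if autoack:
        await message.ack()
```

With `autoack=False` the handler itself is responsible for acknowledging the message, as in the `EmailAgent` example.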
The handler takes one required positional argument, a pamqp.DeliveredMessage. On disconnect, the consumer reconnects and resubscribes to the queue, making 3 reconnection attempts after 1, 4, and 9 seconds. If the queue does not exist, it is declared with the default parameters when binding.
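The delays of 1, 4, and 9 seconds are consistent with squaring the attempt number. A minimal sketch of such a retry loop is shown below, assuming that formula. The names `reconnect_delays` and `reconnect` are hypothetical, and the `sleep` parameter exists only so the loop can be exercised without real waiting.

```python
import asyncio
from typing import Awaitable, Callable


def reconnect_delays(attempts: int = 3) -> list[int]:
    """Delay before each attempt: the square of the attempt number (assumed formula)."""
    return [n ** 2 for n in range(1, attempts + 1)]


async def reconnect(connect: Callable[[], Awaitable[None]],
                    attempts: int = 3,
                    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep) -> bool:
    """Try to reconnect, waiting before each attempt; return True on success."""
    for delay in reconnect_delays(attempts):
        await sleep(delay)
        try:
            await connect()
        except ConnectionError:
            # This attempt failed; move on to the next, longer delay.
            continue
        return True
    # All attempts were used up without a successful connection.
    return False
```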
- async declare_queue(name: str, **options: Any) → None
Declare a queue with the queue_declare method.
- Parameters:
name – string, queue name
**options – other queue_declare options
- async get_channel() → AbstractChannel
Takes a channel from the pool or opens a new one, connecting lazily if required.
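The pool-or-new behaviour with a lazy connection might look roughly like the sketch below. It is an assumption about the general pattern, not the broker's implementation: `ChannelPool`, `get`, and `release` are hypothetical names, and `connect` is any coroutine function returning an object with an awaitable `channel()` method.

```python
from typing import Any, Awaitable, Callable


class ChannelPool:
    """Hypothetical pool: reuse idle channels, open new ones on demand."""

    def __init__(self, connect: Callable[[], Awaitable[Any]]) -> None:
        self._connect = connect
        self._connection: Any = None  # established lazily on first use
        self._idle: list[Any] = []

    async def get(self) -> Any:
        # Prefer a channel that has already been opened and returned.
        if self._idle:
            return self._idle.pop()
        # Connect only when a channel is first needed.
        if self._connection is None:
            self._connection = await self._connect()
        return await self._connection.channel()

    def release(self, channel: Any) -> None:
        """Return a channel to the pool so a later get() can reuse it."""
        self._idle.append(channel)
```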
- async static putout(amqp: DeliveredMessage) → None
Send an acknowledgement to the broker with basic_client_ack.
- Parameters:
amqp – pamqp.DeliveredMessage
- async queue_length(name: str, **options: Any) → int
Get the length of a queue using the queue_declare method.
- Parameters:
name – string, queue name
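In AMQP, the reply to queue_declare carries a message count, which is presumably how a length check based on queue_declare works. The sketch below illustrates the idea against any channel-like object. The helper `queue_length_via_declare` is hypothetical, and passing `passive=True` (check the queue without creating or altering it) is an assumption rather than something this documentation states.

```python
from typing import Any


async def queue_length_via_declare(channel: Any, name: str, **options: Any) -> int:
    """Return the number of ready messages reported by the declare reply."""
    # Assumption: a passive declare is used so the queue is only inspected.
    options.setdefault("passive", True)
    declare_ok = await channel.queue_declare(name, **options)
    # The declare reply exposes the count of messages waiting in the queue.
    return declare_ok.message_count
```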
- async send(name: str, message: str, exchange: str = '', properties: dict | None = None, **kwargs: Any) → None
Send a raw message.
- Parameters:
name – string, target queue name (routing_key)
message – string, serialized message
exchange – string, target exchange name
properties – dict, for Basic.Properties
**kwargs – dict, other basic_publish options