AMQP (RabbitMQ)

Queue broker based on aiormq.

class microagent.tools.amqp.AMQPBroker(dsn: str, uid: str = <factory>, log: logging.Logger = <Logger microagent.broker (WARNING)>, _bindings: dict[str, microagent.queue.Consumer] = <factory>, sending_channel: aiormq.abc.AbstractChannel | None = None)

The broker is based on the AMQP basic_consume method and sends an acknowledgement automatically if the handler completes without errors. Each consumer takes an exclusive channel. Sending reuses channels.

Parameters:
  • dsn – string, data source name for the connection, e.g. amqp://guest@localhost:5672/

  • log – logging.Logger (optional)
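To make the layout of the dsn string concrete, the following sketch splits the example DSN with the standard library. It only illustrates the URL structure; how AMQPBroker itself parses the string is not described here, so the variable names below are purely illustrative.

```python
from urllib.parse import urlsplit

# Example DSN in the same form as the one passed to AMQPBroker.
dsn = 'amqp://guest:guest@localhost:5672/'

parts = urlsplit(dsn)
scheme = parts.scheme      # protocol, 'amqp'
user = parts.username      # login name
password = parts.password  # login password
host = parts.hostname      # broker host
port = parts.port          # broker port as an int
vhost_path = parts.path    # path component that carries the virtual host
```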

from microagent.tools.amqp import AMQPBroker

broker = AMQPBroker('amqp://guest:guest@localhost:5672/')

await broker.user_created.send({'user_id': 1})

The @consumer decorator for this broker has an additional option, autoack, which enables or disables automatic acknowledgements.

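from microagent import MicroAgent, consumer

class EmailAgent(MicroAgent):
    # `queues` is assumed to be loaded from the project's queue schema
    @consumer(queues.mailer, autoack=False)
    async def example_read_queue(self, amqp, **data):
        # autoack is disabled, so acknowledge the delivery manually
        await amqp.channel.basic_client_ack(delivery_tag=amqp.delivery_tag)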
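The difference between the two acknowledgement modes can be sketched without a running broker. The code below is a simplified model, not the library's implementation: FakeChannel and dispatch are hypothetical names, and what the real broker does with a failed message beyond not acknowledging it is not covered.

```python
import asyncio
from types import SimpleNamespace


class FakeChannel:
    """Stand-in for an AMQP channel; records acknowledged delivery tags."""

    def __init__(self):
        self.acked = []

    async def basic_client_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


async def dispatch(handler, amqp, autoack=True):
    """Hypothetical dispatcher: acknowledge only if the handler did not raise."""
    try:
        await handler(amqp)
    except Exception:
        return False  # a failed handler gets no automatic acknowledgement
    if autoack:
        await amqp.channel.basic_client_ack(delivery_tag=amqp.delivery_tag)
    return True


async def ok_handler(amqp):
    pass  # processing succeeded


async def failing_handler(amqp):
    raise ValueError('cannot process message')


async def demo():
    channel = FakeChannel()
    # Succeeds with autoack enabled: acknowledged.
    await dispatch(ok_handler, SimpleNamespace(channel=channel, delivery_tag=1))
    # Raises: not acknowledged.
    await dispatch(failing_handler, SimpleNamespace(channel=channel, delivery_tag=2))
    # Succeeds with autoack disabled: the handler itself would have to acknowledge.
    await dispatch(ok_handler, SimpleNamespace(channel=channel, delivery_tag=3), autoack=False)
    return channel.acked


acked = asyncio.run(demo())
```

Only the first delivery ends up acknowledged in this model: the second handler fails, and the third runs with autoack disabled and never acknowledges on its own.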

The handler takes one required positional argument, pamqp.DeliveredMessage. The consumer reconnects and resubscribes to the queue on disconnect, making 3 reconnect attempts after 1, 4 and 9 seconds. If the queue does not exist, it is declared with the default parameters when binding.
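The reconnect schedule can be illustrated with a small sketch. It assumes the delays are the squares of the attempt number, which matches the 1, 4, 9 sequence but is an inference rather than something stated by the library; reconnect_delays and reconnect are hypothetical helpers, not part of AMQPBroker.

```python
import asyncio


def reconnect_delays(attempts=3):
    """Delay in seconds before each attempt, assumed to be attempt squared."""
    return [attempt ** 2 for attempt in range(1, attempts + 1)]


async def reconnect(connect, attempts=3, sleep=asyncio.sleep):
    """Wait, then call `connect`; repeat until it succeeds or attempts run out."""
    for delay in reconnect_delays(attempts):
        await sleep(delay)
        try:
            return await connect()
        except ConnectionError:
            continue  # try again after the next, longer delay
    raise ConnectionError(f'gave up after {attempts} attempts')
```

Passing `sleep` as a parameter keeps the sketch testable without real waiting.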

async bind(name: str) → None

Start reading from the queue.

Parameters:

name – string, queue name

async declare_queue(name: str, **options: Any) → None

Declare a queue with the queue_declare method.

Parameters:
  • name – string, queue name

  • **options – other queue_declare options

async get_channel() → AbstractChannel

Takes a channel from the pool or opens a new one, performing a lazy connection if required.
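The pooling behaviour described here can be modelled roughly as follows. This is a sketch under assumptions, not the broker's actual code: ChannelPool and release are hypothetical names, and the connection object is only assumed to expose an awaitable channel() method, as aiormq connections do.

```python
class ChannelPool:
    """Hypothetical pool: connects lazily and reuses released channels."""

    def __init__(self, connect):
        self._connect = connect   # coroutine function returning a connection
        self._connection = None   # nothing is connected until first use
        self._free = []           # channels returned by release()

    async def get_channel(self):
        if self._connection is None:
            # Lazy connection: established on the first request only.
            self._connection = await self._connect()
        if self._free:
            # Reuse a pooled channel when one is available.
            return self._free.pop()
        # Otherwise open a new channel on the shared connection.
        return await self._connection.channel()

    def release(self, channel):
        """Return a channel to the pool so later calls can reuse it."""
        self._free.append(channel)
```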

async static putout(amqp: DeliveredMessage) → None

Send an acknowledgement to the broker with basic_client_ack.

Parameters:

amqp – pamqp.DeliveredMessage

async queue_length(name: str, **options: Any) → int

Get the queue length with the queue_declare method.

Parameters:

name – string, queue name
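Reading a queue length through queue_declare relies on the message count that the server returns in its declare reply. A minimal sketch, assuming an aiormq-style channel whose queue_declare returns an object with a message_count field; passing passive=True is this sketch's choice and may differ from what the broker method does with **options.

```python
async def queue_length(channel, name):
    """Return the number of messages reported for the queue `name`."""
    # passive=True only checks that the queue exists; it does not create
    # or modify it (assumption: the channel follows aiormq's signature).
    declare_ok = await channel.queue_declare(name, passive=True)
    return declare_ok.message_count
```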

async send(name: str, message: str, exchange: str = '', properties: dict | None = None, **kwargs: Any) → None

Raw message sending.

Parameters:
  • name – string, target queue name (routing_key)

  • message – string, serialized message

  • exchange – string, target exchange name

  • properties – dict, for Basic.Properties

  • **kwargs – dict, other basic_publish options
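Because send expects an already serialized string, the caller is responsible for encoding the payload. The sketch below shows one way to do that with JSON; publish_user_created is a hypothetical helper, the queue name is borrowed from the example above, and content_type and delivery_mode are standard Basic.Properties fields chosen for illustration.

```python
import json


async def publish_user_created(broker, user_id):
    """Serialize a payload and publish it through the broker's raw send."""
    message = json.dumps({'user_id': user_id})
    await broker.send(
        'user_created',      # routing key, i.e. the target queue name
        message,             # serialized message body
        exchange='',         # default exchange routes by queue name
        properties={
            'content_type': 'application/json',
            'delivery_mode': 2,  # 2 marks the message as persistent
        },
    )
    return message
```

Any object with a compatible async send method can be passed as `broker`, which keeps the helper easy to exercise with a stub.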