Queue broker

The data-driven architecture is based on unidirectional message flows between agents. Here we assume that messages are exchanged through an intermediary, not directly.

An intermediary called the Queue Broker implements the producer/consumer pattern. The broker provides guaranteed and consistent delivery of messages from the producer to the consumer, many to one (or according to the broker's own logic). The message structure is free-form and fully defined by the user.
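
The many-to-one flow through an intermediary can be illustrated without any external service. The sketch below is not microagent code: it uses a plain asyncio.Queue as a stand-in for the broker, and the names producer, consumer, and run_demo are made up for the illustration.

```python
import asyncio


async def producer(queue: asyncio.Queue, producer_id: int, count: int) -> None:
    # A producer only writes to the queue; it never talks to the consumer.
    for n in range(count):
        await queue.put({'producer': producer_id, 'n': n})


async def consumer(queue: asyncio.Queue, received: list) -> None:
    # A single consumer drains the messages of all producers.
    while True:
        message = await queue.get()
        received.append(message)
        queue.task_done()


async def run_demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()  # stands in for the broker
    received: list = []
    worker = asyncio.create_task(consumer(queue, received))
    # Three producers, one consumer: the many-to-one flow.
    await asyncio.gather(*(producer(queue, i, 2) for i in range(3)))
    await queue.join()  # wait until every message has been handled
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    return received
```

Calling asyncio.run(run_demo()) returns the six messages collected by the single consumer. A real broker adds persistence and delivery guarantees that an in-process queue does not have.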

Implementations:

Using QueueBroker separately (sending only)

from microagent import load_queues
from microagent.tools.redis import RedisBroker

queues = load_queues('file://queues.json')

broker = RedisBroker('redis://localhost/7')
await broker.user_created.send({'user_id': 1})

Using with MicroAgent

from microagent import MicroAgent, consumer, load_queues
from microagent.tools.redis import RedisBroker

queues = load_queues('file://queues.json')

class EmailAgent(MicroAgent):
    @consumer(queues.mailer)
    async def example_read_queue(self, **kwargs):
        await self.broker.email_sended.send({'user_id': 1})

broker = RedisBroker('redis://localhost/7')
email_agent = EmailAgent(broker=broker)
await email_agent.start()

microagent.load_queues(source: str) → NamedTuple

Load Queue entities from a file or from a web URL.

from microagent import load_queues

queues_from_file = load_queues('file://queues.json')
queues_from_web = load_queues('http://example.com/queues.json')

Queues declarations (queues.json).

{
    "queues": [
        {"name": "mailer"},
        {"name": "pusher"}
    ]
}
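
To show how such a declaration file can map onto attribute access like queues.mailer, here is a minimal sketch. It is not the library's implementation of load_queues: QueueDecl and parse_queues are hypothetical names, and the JSON is passed as a string instead of being read from a file or URL.

```python
import json
from collections import namedtuple
from dataclasses import dataclass


@dataclass(frozen=True)
class QueueDecl:
    # Hypothetical stand-in for microagent.queue.Queue.
    name: str


def parse_queues(raw: str):
    # Build one attribute per declared queue, e.g. result.mailer.
    declarations = json.loads(raw)['queues']
    names = [item['name'] for item in declarations]
    Queues = namedtuple('Queues', names)
    return Queues(*(QueueDecl(name) for name in names))


RAW = '{"queues": [{"name": "mailer"}, {"name": "pusher"}]}'
queues = parse_queues(RAW)
print(queues.mailer.name)  # mailer
```

Because the field names of the named tuple come from the declarations, queue names have to be valid Python identifiers in this sketch.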

microagent.consumer(queue: Queue, timeout: int = 60, dto_class: type | None = None, dto_name: str | None = None, **options: Any) → Callable[[Callable[[...], Awaitable[None]]], Callable[[...], Awaitable[None]]]

Binding for consuming messages from queue.

Only one handler can be bound to one queue.

Parameters:
  • queue – Queue - source of data

  • timeout – Calling timeout in seconds

  • dto_class – DTO-class, wrapper for consuming data

  • dto_name – DTO name in consumer method kwargs

@consumer(queue_1)
async def handler_1(self, **kwargs):
    log.info('Called handler 1 %s', kwargs)

@consumer(queue_2, timeout=30)
async def handle_2(self, **kwargs):
    log.info('Called handler 2 %s', kwargs)

@consumer(queue_3, dto_class=MyDTO)
async def handle_3(self, dto: MyDTO, **kwargs):
    log.info('Called handler 3 %s', dto)  # dto = MyDTO(**kwargs)

@consumer(queue_4, timeout=30, dto_class=MyDTO, dto_name='obj')
async def handle_4(self, obj: MyDTO, **kwargs):
    log.info('Called handler 4 %s', obj)  # obj = MyDTO(**kwargs)
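
The dto_class and dto_name behaviour described above can be pictured with a small standalone decorator. This is only a sketch under assumptions: with_dto is a hypothetical helper and not part of microagent, MyDTO is an example dataclass, and the handler is a plain function rather than an agent method.

```python
import asyncio
import functools
from dataclasses import dataclass


@dataclass
class MyDTO:
    user_id: int
    email: str


def with_dto(dto_class: type, dto_name: str = 'dto'):
    # Hypothetical helper: builds the DTO from the message fields and
    # passes it to the handler under the keyword `dto_name`.
    def decorator(handler):
        @functools.wraps(handler)
        async def wrapper(**kwargs):
            kwargs[dto_name] = dto_class(**kwargs)
            return await handler(**kwargs)
        return wrapper
    return decorator


@with_dto(MyDTO, dto_name='obj')
async def handle(obj: MyDTO, **kwargs):
    # The raw message fields are still available in kwargs.
    return obj
```

Calling asyncio.run(handle(user_id=1, email='a@b.c')) returns MyDTO(user_id=1, email='a@b.c'), mirroring the obj = MyDTO(**kwargs) comment in the examples above.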

class microagent.broker.AbstractQueueBroker(dsn: str, uid: str = <factory>, log: logging.Logger = <Logger microagent.broker (WARNING)>, _bindings: dict[str, microagent.queue.Consumer] = <factory>)

The broker is an abstract interface with two basic methods: send and bind.

The send method writes a message to the queue.

The bind method connects to a queue for reading.

All registered queues are available on the broker object as attributes, under the names specified in the declaration.

Queue(name='user_created')

broker = RedisBroker('redis://localhost/7')
await broker.user_created.send({'user_id': 1})
dsn

The broker has only one required parameter: a DSN string (data source name), which provides the details for establishing a connection to the mediator service.

log

Provided or default logger

uid

UUID, id of broker instance (for debugging)

_bindings

Dict of all bound consumers

abstract async send(name: str, message: str, **kwargs: Any) → None

Write a raw message to queue.

Parameters:
  • name – string, queue name

  • message – string, serialized object

  • **kwargs – specific parameters for each broker implementation

abstract async bind(name: str) → None

Start reading queue.

Parameters:

name – string, queue name

async bind_consumer(consumer: Consumer) → None

Bind a consumer that is already bound to an agent to the current broker.

abstract async queue_length(name: str, **options: Any) → int

Get the current queue length.

Parameters:
  • name – string, queue name

  • **options – specific parameters for each broker implementation
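
To make the shape of the interface concrete, here is a toy in-memory class with methods named like the ones above. It is an illustration only: it deliberately does not subclass AbstractQueueBroker, it stores messages in process memory, and pop is a hypothetical read helper standing in for the reading loop that bind would start in a real broker.

```python
import asyncio
from collections import defaultdict, deque


class InMemoryBroker:
    # Illustrative only: keeps messages in process memory instead of
    # an external mediator service.
    def __init__(self, dsn: str) -> None:
        self.dsn = dsn
        self._queues = defaultdict(deque)

    async def send(self, name: str, message: str, **kwargs) -> None:
        # Append the already serialized message to the named queue.
        self._queues[name].append(message)

    async def queue_length(self, name: str, **options) -> int:
        return len(self._queues[name])

    async def pop(self, name: str):
        # Hypothetical read helper; returns None when the queue is empty.
        queue = self._queues[name]
        return queue.popleft() if queue else None
```

Messages come out in the order they were sent, and queue_length reflects how many are still waiting.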

class microagent.queue.Queue(name: str)

Dataclass (declaration) for a queue entity with a unique name. Each instance is registered at creation. Usually, you don't need to work directly with the Queue class.

name

String, queue name, project-wide unique, [a-z_]+

Declaration with config-file (queues.json)

{
    "queues": [
        {"name": "mailer"},
        {"name": "pusher"}
    ]
}

Manual declaration (not recommended)

some_queue = Queue(
    name='some_queue'
)

classmethod get(name: str) → Queue

Get the queue instance by name

classmethod get_all() → dict[str, Queue]

All registered queues
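
The "registered at creation" behaviour and the get / get_all lookups can be pictured with a small registry pattern. This is a sketch, not the library's code: RegisteredQueue is a hypothetical class, and both the ValueError on bad names and the use of the [a-z_]+ pattern for validation are assumptions made for the example.

```python
import re
from dataclasses import dataclass
from typing import ClassVar, Dict

NAME_RE = re.compile(r'[a-z_]+')


@dataclass(frozen=True)
class RegisteredQueue:
    # Hypothetical stand-in for Queue; the real class may differ.
    name: str
    _registry: ClassVar[Dict[str, 'RegisteredQueue']] = {}

    def __post_init__(self) -> None:
        if not NAME_RE.fullmatch(self.name):
            raise ValueError(f'invalid queue name: {self.name!r}')
        # Register the instance under its name at creation time.
        self._registry[self.name] = self

    @classmethod
    def get(cls, name: str) -> 'RegisteredQueue':
        return cls._registry[name]

    @classmethod
    def get_all(cls) -> Dict[str, 'RegisteredQueue']:
        # Return a copy so callers cannot mutate the registry.
        return dict(cls._registry)
```

After RegisteredQueue('some_queue') has been created, RegisteredQueue.get('some_queue') returns that same instance.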

serialize(data: dict) → str

Data serializing method

Parameters:

data – dict of transferred data

deserialize(data: str | bytes) → dict

Data deserializing method

Parameters:

data – serialized transferred data
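
A round trip through a pair of functions with these signatures might look like the sketch below. The wire format is an assumption: the text above does not say which format the library uses, so JSON is picked here purely for illustration, and these are free functions rather than Queue methods.

```python
import json
from typing import Union


def serialize(data: dict) -> str:
    # Assumed format: JSON text.
    return json.dumps(data)


def deserialize(data: Union[str, bytes]) -> dict:
    # Brokers often hand back bytes; decode before parsing.
    if isinstance(data, bytes):
        data = data.decode('utf-8')
    return json.loads(data)


message = serialize({'user_id': 1})
restored = deserialize(message.encode('utf-8'))
```

Here restored equals the original dict, whether the serialized message comes back as str or as bytes.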

Internal classes for binding queues to a broker

class microagent.broker.BoundQueue(broker: 'AbstractQueueBroker', queue: microagent.queue.Queue)

class microagent.broker.Consumer(agent: 'MicroAgent', handler: collections.abc.Callable[..., collections.abc.Awaitable[None]], queue: microagent.queue.Queue, timeout: float, options: dict, dto_class: type | None = None, dto_name: str | None = None)

Exceptions

class microagent.queue.QueueException

Base queue exception

class microagent.queue.QueueNotFound

class microagent.queue.SerializingError