Source code for microagent.broker

'''
The data-driven architecture is based on unidirectional message flows between agents.
Here we assume that messages are exchanged through an intermediary, not directly.

Here, an intermediary called Queue Broker implements the producer / consumer pattern.
The broker performs the functions of guaranteed and consistent transmission of messages
from the product to the consumer, many to one (or according to the broker's own logic).
The message has a free structure, fully defined in the user area.

Implementations:

* :ref:`redis <tools-redis>`
* :ref:`aioamqp <tools-amqp>`
* :ref:`kafka <tools-kafka>`


Using QueueBroker separately (sending only)

.. code-block:: python

    from microagent import load_queues
    from microagent.tools.redis import RedisBroker

    queues = load_queues('file://queues.json')

    broker = RedisBroker('redis://localhost/7')
    await broker.user_created.send({'user_id': 1})


Using with MicroAgent

.. code-block:: python

    from microagent import MicroAgent, load_queues
    from microagent.tools.redis import RedisSignalBus

    queues = load_queues('file://queues.json')

    class EmailAgent(MicroAgent):
        @consumer(queues.mailer)
        async def example_read_queue(self, **kwargs):
            await self.broker.email_sended.send({'user_id': 1})

    broker = RedisBroker('redis://localhost/7')
    email_agent = EmailAgent(broker=broker)
    await email_agent.start()
'''
import logging
import uuid

from abc import abstractmethod
from dataclasses import dataclass, field
from typing import Any

from .abc import BrokerProtocol, QueueProtocol
from .queue import Consumer, Queue


[docs] @dataclass(slots=True) class AbstractQueueBroker(BrokerProtocol): ''' Broker is an abstract interface with two basic methods - send and bind. `send`-method allows to write a message to the queue. `bind`-method allows to connect to queue for reading. All registered Queue are available in the broker object as attributes with the names specified in the declaration. .. code-block:: python Queue(name='user_created') broker = RedisBroker('redis://localhost/7') await broker.user_created.send({'user_id': 1}) .. attribute:: dsn Broker has only one required parameter - dsn-string (data source name), which provide details for establishing connection to mediator-service. .. attribute:: log Provided or defaul logger .. attribute:: uid UUID, id of broker instance (for debugging) .. attribute:: _bindings Dict of all bound consumers ''' dsn: str uid: str = field(default_factory=lambda: uuid.uuid4().hex) log: logging.Logger = logging.getLogger('microagent.broker') _bindings: dict[str, Consumer] = field(default_factory=dict) def __getattr__(self, name: str) -> 'BoundQueue': return BoundQueue(self, Queue.get(name))
[docs] @abstractmethod async def send(self, name: str, message: str, **kwargs: Any) -> None: ''' Write a raw message to queue. :param name: string, queue name :param message: string, serialized object :param \*\*kwargs: specific parameters for each broker implementation ''' # noqa: W605 ...
[docs] @abstractmethod async def bind(self, name: str) -> None: ''' Start reading queue. :param name: string, queue name ''' ...
[docs] async def bind_consumer(self, consumer: Consumer) -> None: ''' Bind bounded to agent consumer to current broker. ''' if consumer.queue.name in self._bindings: self.log.warning('Handler to queue "%s" already binded. Ignoring', consumer) return self._bindings[consumer.queue.name] = consumer await self.bind(consumer.queue.name) self.log.debug('Bind %s to queue "%s"', consumer, consumer.queue.name)
@staticmethod def prepared_data(consumer: Consumer, raw_data: str | bytes) -> dict: data = consumer.queue.deserialize(raw_data) if consumer.dto_class: data[consumer.dto_name or 'dto'] = consumer.dto_class(**data) return data
[docs] @abstractmethod async def queue_length(self, name: str, **options: Any) -> int: ''' Get the current queue length. :param name: string, queue name :param \*\*options: specific parameters for each broker implementation ''' # noqa: W605 ...
[docs] @dataclass(slots=True, frozen=True) class BoundQueue(QueueProtocol): broker: 'AbstractQueueBroker' queue: Queue async def send(self, message: dict, **options: Any) -> None: await self.broker.send(self.queue.name, self.queue.serialize(message), **options) async def length(self) -> int: return await self.broker.queue_length(self.queue.name)