Queue broker
The data-driven architecture is based on unidirectional message flows between agents. Messages are assumed to be exchanged through an intermediary rather than directly.
This intermediary, the Queue Broker, implements the producer/consumer pattern. The broker provides guaranteed and consistent delivery of messages from the producer to the consumer, many to one (or according to the broker’s own logic). A message has a free structure that is defined entirely on the user side.
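The many-to-one flow described above can be sketched with the standard library alone. In this minimal illustration an asyncio.Queue stands in for the broker; the function names produce, consume and demo are hypothetical and are not part of microagent.

```python
import asyncio


async def produce(queue: asyncio.Queue, producer_id: int, count: int) -> None:
    """Put `count` free-form messages on the shared queue."""
    for n in range(count):
        await queue.put({'producer': producer_id, 'n': n})


async def consume(queue: asyncio.Queue, expected: int) -> list:
    """Read exactly `expected` messages from the queue, in arrival order."""
    received = []
    for _ in range(expected):
        received.append(await queue.get())
        queue.task_done()
    return received


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()  # stands in for the broker
    # Two producers write, a single consumer reads: many to one.
    results = await asyncio.gather(
        produce(queue, 1, 2),
        produce(queue, 2, 2),
        consume(queue, 4),
    )
    return results[2]


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

The producers never reference the consumer; they only know the queue, which is the property the broker provides.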
Implementations:
Using QueueBroker separately (sending only)
from microagent import load_queues
from microagent.tools.redis import RedisBroker
queues = load_queues('file://queues.json')
broker = RedisBroker('redis://localhost/7')
await broker.user_created.send({'user_id': 1})
Using with MicroAgent
from microagent import MicroAgent, consumer, load_queues
from microagent.tools.redis import RedisBroker
queues = load_queues('file://queues.json')
class EmailAgent(MicroAgent):
    @consumer(queues.mailer)
    async def example_read_queue(self, **kwargs):
        await self.broker.email_sended.send({'user_id': 1})
broker = RedisBroker('redis://localhost/7')
email_agent = EmailAgent(broker=broker)
await email_agent.start()
- microagent.load_queues(source: str) -> NamedTuple
Load Queue entities from a file or from a URL.
from microagent import load_queues
queues_from_file = load_queues('file://queues.json')
queues_from_web = load_queues('http://example.com/queues.json')
Queues declarations (queues.json).
{
    "queues": [
        {"name": "mailer"},
        {"name": "pusher"}
    ]
}
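To illustrate how such a declaration could become a tuple of named queue entities, here is a minimal sketch. It is not the library's loader: parse_queues and QueueDecl are hypothetical names, and the file is replaced by an inline string.

```python
import json
from collections import namedtuple
from dataclasses import dataclass


@dataclass(frozen=True)
class QueueDecl:
    """Hypothetical stand-in for a queue entity; not the library's Queue class."""
    name: str


def parse_queues(text: str):
    """Build a named tuple with one field per declared queue."""
    data = json.loads(text)
    names = [item['name'] for item in data.get('queues', [])]
    Queues = namedtuple('Queues', names)
    return Queues(*(QueueDecl(name) for name in names))


DECLARATION = '{"queues": [{"name": "mailer"}, {"name": "pusher"}]}'
queues = parse_queues(DECLARATION)
print(queues.mailer.name)
```

Strict JSON parsers such as json.loads reject a trailing comma after the last list item, so the declaration file should not contain one.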
- microagent.consumer(queue: Queue, timeout: int = 60, dto_class: type | None = None, dto_name: str | None = None, **options: Any) -> Callable[[Callable[[...], Awaitable[None]]], Callable[[...], Awaitable[None]]]
Binding for consuming messages from queue.
Only one handler can be bound to one queue.
- Parameters:
queue – Queue - source of data
timeout – Calling timeout in seconds
dto_class – DTO-class, wrapper for consuming data
dto_name – DTO name in consumer method kwargs
@consumer(queue_1)
async def handler_1(self, **kwargs):
    log.info('Called handler 1 %s', kwargs)

@consumer(queue_2, timeout=30)
async def handle_2(self, **kwargs):
    log.info('Called handler 2 %s', kwargs)

@consumer(queue_3, dto_class=MyDTO)
async def handle_3(self, dto: MyDTO, **kwargs):
    log.info('Called handler 3 %s', dto)  # dto = MyDTO(**kwargs)

@consumer(queue_4, timeout=30, dto_class=MyDTO, dto_name='obj')
async def handle_4(self, obj: MyDTO, **kwargs):
    log.info('Called handler 4 %s', obj)  # obj = MyDTO(**kwargs)
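The effect of the timeout, dto_class and dto_name parameters can be approximated with a small stand-alone decorator. This is a sketch under assumptions, not the library's implementation: consumer_sketch is a hypothetical name, the default keyword 'dto' is inferred from the examples above, and passing the raw kwargs alongside the DTO is a guess.

```python
import asyncio
import functools
from dataclasses import dataclass


def consumer_sketch(timeout=60, dto_class=None, dto_name=None):
    """Hypothetical decorator: wraps kwargs into a DTO and enforces a timeout."""
    def decorator(handler):
        @functools.wraps(handler)
        async def wrapper(self, **kwargs):
            if dto_class is not None:
                key = dto_name or 'dto'  # assumed default keyword name
                # Assumption: raw kwargs are still passed next to the DTO.
                kwargs = {**kwargs, key: dto_class(**kwargs)}
            # Cancel the handler if it runs longer than `timeout` seconds.
            await asyncio.wait_for(handler(self, **kwargs), timeout)
        return wrapper
    return decorator


@dataclass
class MyDTO:
    user_id: int


class Agent:
    def __init__(self):
        self.seen = []

    @consumer_sketch(timeout=1, dto_class=MyDTO, dto_name='obj')
    async def handle(self, obj: MyDTO, **kwargs):
        self.seen.append(obj)

    @consumer_sketch(timeout=0.01)
    async def slow(self, **kwargs):
        await asyncio.sleep(1)  # exceeds the timeout on purpose
```

Calling the slow handler raises asyncio.TimeoutError, while handle receives a MyDTO instance under the name obj.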
- class microagent.broker.AbstractQueueBroker(dsn: str, uid: str = <factory>, log: logging.Logger = <Logger microagent.broker (WARNING)>, _bindings: dict[str, microagent.queue.Consumer] = <factory>)
The broker is an abstract interface with two basic methods: send and bind.
The send method writes a message to the queue.
The bind method connects to a queue for reading.
All registered queues are available on the broker object as attributes with the names specified in the declaration.
Queue(name='user_created')
broker = RedisBroker('redis://localhost/7')
await broker.user_created.send({'user_id': 1})
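One way attribute-style access such as broker.user_created.send(...) can be built is with __getattr__ returning a small bound helper. The sketch below is an illustration only: BrokerSketch, BoundQueueSketch and ListBroker are hypothetical classes, and JSON is an assumed serialization format.

```python
import asyncio
import json
from abc import ABC, abstractmethod


class BoundQueueSketch:
    """Hypothetical helper pairing a broker with one queue name."""

    def __init__(self, broker, name):
        self.broker = broker
        self.name = name

    async def send(self, message: dict, **kwargs) -> None:
        # Serialize the free-form message, then hand it to the raw send.
        await self.broker.send(self.name, json.dumps(message), **kwargs)


class BrokerSketch(ABC):
    def __init__(self, queue_names):
        self._queue_names = set(queue_names)

    def __getattr__(self, name):
        # Called only when normal attribute lookup fails.
        if name.startswith('_') or name not in self._queue_names:
            raise AttributeError(name)
        return BoundQueueSketch(self, name)

    @abstractmethod
    async def send(self, name: str, message: str, **kwargs) -> None:
        """Write a raw, already serialized message to the named queue."""


class ListBroker(BrokerSketch):
    """Toy implementation that records messages in a list."""

    def __init__(self, queue_names):
        super().__init__(queue_names)
        self.sent = []

    async def send(self, name, message, **kwargs):
        self.sent.append((name, message))


broker = ListBroker(['user_created'])
asyncio.run(broker.user_created.send({'user_id': 1}))
print(broker.sent)
```

Accessing a name that was never declared raises AttributeError, which surfaces typos in queue names early.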
- dsn
The broker has only one required parameter: the DSN string (data source name), which provides the details for establishing a connection to the mediator service.
- log
Provided or default logger
- uid
UUID, id of broker instance (for debugging)
- _bindings
Dict of all bound consumers
- abstract async send(name: str, message: str, **kwargs: Any) -> None
Write a raw message to queue.
- Parameters:
name – string, queue name
message – string, serialized object
**kwargs – specific parameters for each broker implementation
- abstract async bind(name: str) -> None
Start reading queue.
- Parameters:
name – string, queue name
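To show how the two methods fit together, here is a minimal in-memory sketch in which bind starts a background reader that passes each raw message to a handler. MemoryBroker and its register, drain and stop helpers are hypothetical and are not part of the library's interface.

```python
import asyncio
from collections import defaultdict


class MemoryBroker:
    """Hypothetical in-memory broker with send and bind methods."""

    def __init__(self):
        self._queues = defaultdict(asyncio.Queue)
        self._handlers = {}
        self._tasks = []

    def register(self, name, handler):
        # One handler per queue, mirroring the consumer binding rule.
        if name in self._handlers:
            raise ValueError(f'queue {name!r} already has a handler')
        self._handlers[name] = handler

    async def send(self, name: str, message: str, **kwargs) -> None:
        await self._queues[name].put(message)

    async def bind(self, name: str) -> None:
        # Start a background task that reads the queue.
        self._tasks.append(asyncio.create_task(self._read(name)))

    async def _read(self, name):
        while True:
            message = await self._queues[name].get()
            await self._handlers[name](message)
            self._queues[name].task_done()

    async def drain(self, name):
        # Wait until every message sent so far has been handled.
        await self._queues[name].join()

    async def stop(self):
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)


async def demo():
    received = []

    async def handler(message):
        received.append(message)

    broker = MemoryBroker()
    broker.register('mailer', handler)
    await broker.bind('mailer')
    await broker.send('mailer', '{"user_id": 1}')
    await broker.drain('mailer')
    await broker.stop()
    return received


if __name__ == '__main__':
    print(asyncio.run(demo()))
```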
- class microagent.queue.Queue(name: str)
Dataclass (declaration) for a queue entity with a unique name. Each instance is registered at creation. Usually, you don’t need to work with the Queue class directly.
- name
String, queue name, project-wide unique, [a-z_]+
Declaration with config-file (queues.json)
{ "queues": [ {"name": "mailer"}, {"name": "pusher"}, ] }
Manual declaration (not recommended)
some_queue = Queue(name='some_queue')
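Registration at creation and the [a-z_]+ naming rule can be pictured with a small dataclass. This is a sketch, not the library's Queue class: QueueSketch is a hypothetical name, and rejecting duplicate names with ValueError is an assumption about how uniqueness might be enforced.

```python
import re
from dataclasses import dataclass
from typing import ClassVar

NAME_RE = re.compile(r'[a-z_]+')


@dataclass(frozen=True)
class QueueSketch:
    """Hypothetical queue declaration that registers itself when created."""
    name: str
    _registry: ClassVar[dict] = {}  # shared by all instances, not a field

    def __post_init__(self):
        if not NAME_RE.fullmatch(self.name):
            raise ValueError(f'invalid queue name: {self.name!r}')
        if self.name in self._registry:
            # Assumption: a duplicate name is treated as an error.
            raise ValueError(f'queue {self.name!r} is already registered')
        self._registry[self.name] = self


some_queue = QueueSketch(name='some_queue')
print(QueueSketch._registry)
```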
Internals for binding queues to a broker
- class microagent.broker.BoundQueue(broker: 'AbstractQueueBroker', queue: microagent.queue.Queue)
- class microagent.broker.Consumer(agent: 'MicroAgent', handler: collections.abc.Callable[..., collections.abc.Awaitable[None]], queue: microagent.queue.Queue, timeout: float, options: dict, dto_class: type | None = None, dto_name: str | None = None)
Exceptions