Source code for microagent.bus

'''
Event-driven architecture is based on objects exchanging a non-directed messages - events.
Here we assume that events (signals) that are not stored anywhere, everyone can
receive and send them, like a radio-transfer.

Here, the intermediary that manages messages routing is called the Signal Bus and
implements the publish / subscribe pattern. A signal is a message with a strict fixed structure.
The Bus contains many channels that are different for each type of signal.
We can send a signal from many sources and listen it with many receivers.

Implementations:

* :ref:`redis <tools-redis>`


Using SignalBus separately (sending only)

.. code-block:: python

    from microagent import load_signals
    from microagent.tools.redis import RedisSignalBus

    signals = load_signals('file://signals.json')

    bus = RedisSignalBus('redis://localhost/7')
    await bus.user_created.send('user_agent', user_id=1)


Using with MicroAgent

.. code-block:: python

    from microagent import MicroAgent, load_signals
    from microagent.tools.redis import RedisSignalBus

    signals = load_signals('file://signals.json')

    class UserAgent(MicroAgent):
        @receiver(signals.user_created)
        async def example(self, user_id, **kwargs):
            await self.bus.user_created.send('some_signal', user_id=1)

    bus = RedisSignalBus('redis://localhost/7')
    user_agent = UserAgent(bus=bus)
    await user_agent.start()
'''
import asyncio
import contextlib
import logging
import time
import uuid

from abc import abstractmethod
from collections import abc, defaultdict
from dataclasses import dataclass, field
from typing import Any

from .abc import BusProtocol, SignalProtocol
from .signal import Receiver, SerializingError, Signal
from .utils import IterQueue, raise_timeout


[docs] @dataclass(slots=True) class AbstractSignalBus(BusProtocol): ''' Signal bus is an abstract interface with two basic methods - send and bind. `send`-method allows to publish some signal in the channel for subscribers. `bind`-method allows to subscribe to the channel(s) for receive the signal(s). `call`-method allows to use RPC based on `send` and `bind`. All registered Signals are available in the bus object as attributes with the names specified in the declaration. .. code-block:: python Signal(name='user_created', providing_args=['user_id']) bus = RedisSignalBus('redis://localhost/7') await bus.user_created.send('user_agent', user_id=1) .. attribute:: dsn Bus has only one required parameter - dsn-string (data source name), which provide details for establishing a connection with the mediator-service. .. attribute:: prefix Channel prefix, string, one for project. It is allowing use same redis for different projects. .. attribute:: log Provided or defaul logger .. attribute:: uid UUID, id of bus instance (for debugging) .. attribute:: receivers Dict of all binded receivers ''' dsn: str uid: str = field(default_factory=lambda: uuid.uuid4().hex) prefix: str = 'PUBSUB' log: logging.Logger = logging.getLogger('microagent.bus') receivers: dict[str, list[Receiver]] = field(default_factory=lambda: defaultdict(list)) _responses: dict[str, IterQueue] = field(default_factory=dict) def __post_init__(self) -> None: response_signal = Signal(name='response', providing_args=[]) asyncio.create_task(self.bind(response_signal.make_channel_name(self.prefix))) self.log.debug('%s initialized', self) def __repr__(self) -> str: return f'<Bus {self.__class__.__name__} {self.dsn} {self.prefix}#{self.uid}>' def __getattr__(self, name: str) -> 'BoundSignal': signal = Signal.get(name) return BoundSignal(self, signal)
[docs] @abstractmethod async def send(self, channel: str, message: str) -> None: ''' Send raw message to channel. Available optional type checking for input data. :param channel: string, channel name :param message: string, serialized object ''' ...
[docs] @abstractmethod async def bind(self, signal: str) -> None: ''' Subscribe to channel. :param signal: string, signal name for subscribe ''' ...
[docs] async def bind_receiver(self, receiver: Receiver) -> None: ''' Bind bounded to agent receiver to current bus. ''' self.log.info('Bind %s to %s: %s', receiver.signal, self, receiver) if receiver.signal.name not in self.receivers: await self.bind(receiver.signal.make_channel_name(self.prefix)) self.receivers[receiver.signal.name].append(receiver)
[docs] @contextlib.asynccontextmanager async def call(self, channel: str, message: str, timeout: int) -> abc.AsyncIterator[IterQueue]: ''' RPC over pub/sub. Pair of signals - sending and responsing. Response-signal is an internal construction enabled by default. When we call `call` we send a ordinary declared by user signal with a unique id and awaiting a response with same id. The response can contain a string value or an integer that is returned by the signal receiver. Signal-attached method `call` will catch only first value. To process multiple responses, you can use async context `waiter`, which will return an async generator of response data. You can break it or return value when it needed. `waiter`-method has the `timeout` argument set to 60 by default. Answer: ```{"<Class>.<method>": <value>}``` Available optional type checking for input data. .. code-block:: python class CommentAgent(MicroAgent): @receiver(signals.rpc_comments_count) async def example_rpc_handler(self, user_id, **kwargs): return 1 response = await bus.rpc_comments_count.call('user_agent', user_id=1) value = response['CommentAgent.example_rpc_handler'] async with bus.rpc_comments_count.waiter('user_agent', user_id=1) as queue: async for x in queue: logging.info('Get response %s', x) break ''' queue: IterQueue = IterQueue() request_id = uuid.uuid4().hex self._responses[request_id] = queue await self.send(f'{channel}#{request_id}', message) try: raise_timeout(timeout) yield queue finally: self._responses.pop(request_id, None)
[docs] def receiver(self, channel: str, message: str) -> None: ''' Handler of raw incoming messages. Available optional type checking for input data. ''' signal_id: str | None = None if '#' in channel: channel, signal_id = channel.split('#') _, name, sender = channel.split(':') # prefix:name:sender signal = Signal.get(name) try: data = signal.deserialize(message) # type: dict except SerializingError: self.log.error('Invalid pubsub message: %s', message) return None if name == 'response' and signal_id: return self.handle_response(signal_id, data) if signal.type_map: check_types(signal, data, self.log) diff_args = set(signal.providing_args) ^ set(data.keys()) if diff_args: self.log.warning('Pubsub mismatch arguments %s %s', channel, diff_args) asyncio.create_task(self.handle_signal(signal, sender, signal_id, data)) return None
def handle_response(self, signal_id: str, message: dict[str, int | str | None]) -> None: if queue := self._responses.get(signal_id): queue.put_nowait(message) async def handle_signal(self, signal: Signal, sender: str, signal_id: str | None, message: dict) -> None: receivers: list[Receiver] = self.receivers.get(signal.name, []) responses: list[int | str | None] = await asyncio.gather(*[ self.broadcast(receiver, signal, sender, message) for receiver in receivers ]) if signal_id: await self.send( f'{self.prefix}:response:{self.uid}#{signal_id}', Signal._jsonlib.dumps({ rec.key: res for rec, res in zip(receivers, responses, strict=True) }) ) async def broadcast(self, receiver: Receiver, signal: Signal, sender: str, message: dict) -> int | str | None: self.log.debug('Calling %s by %s:%s with %s', receiver.handler, signal.name, sender, str(message).encode('utf-8')) timer = time.monotonic() # type: float try: response = await asyncio.wait_for( receiver.handler(signal=signal, sender=sender, **message), receiver.timeout ) if isinstance(response, int | str): return response except TypeError: self.log.exception('Call %s failed', signal.name) except asyncio.TimeoutError: self.log.error( 'TimeoutError: %s %.2f', receiver.handler, time.monotonic() - timer) return None
[docs] @dataclass(slots=True, frozen=True) class BoundSignal(SignalProtocol): bus: AbstractSignalBus signal: Signal async def send(self, sender: str, **kwargs: Any) -> None: if self.signal.type_map: check_types(self.signal, kwargs, self.bus.log) await self.bus.send( self.signal.make_channel_name(self.bus.prefix, sender), self.signal.serialize(kwargs)) async def call(self, sender: str, timeout: int = 60, **kwargs: Any ) -> dict[str, int | str | None]: if self.signal.type_map: check_types(self.signal, kwargs, self.bus.log) gen = self.bus.call( self.signal.make_channel_name(self.bus.prefix, sender), self.signal.serialize(kwargs), timeout=timeout ) async with gen as queue: async for value in queue: return value return {}
[docs] def waiter(self, sender: str, timeout: int = 60, **kwargs: Any ) -> contextlib.AbstractAsyncContextManager: ''' async with bus.iter(sender='name', a=1, timeout=10) as queue: async for x in queue: logging.info('Get response %s', x) break ''' return self.bus.call( self.signal.make_channel_name(self.bus.prefix, sender), self.signal.serialize(kwargs), timeout=timeout )
def check_types(signal: Signal, data: dict, log: logging.Logger) -> None: if signal.type_map: for key, value in data.items(): if key not in signal.type_map: log.warning('Receiver get unknown arg "%s" %s', key, value) elif not isinstance(value, signal.type_map[key]): log.warning('Receiver get wrong type for "%s" %s', key, value)