Signal bus

Event-driven architecture is based on objects exchanging a non-directed messages - events. Here we assume that events (signals) that are not stored anywhere, everyone can receive and send them, like a radio-transfer.

Here, the intermediary that manages messages routing is called the Signal Bus and implements the publish / subscribe pattern. A signal is a message with a strict fixed structure. The Bus contains many channels that are different for each type of signal. We can send a signal from many sources and listen it with many receivers.

Implementations:

Using SignalBus separately (sending only)

from microagent import load_signals
from microagent.tools.redis import RedisSignalBus

signals = load_signals('file://signals.json')

bus = RedisSignalBus('redis://localhost/7')
await bus.user_created.send('user_agent', user_id=1)

Using with MicroAgent

from microagent import MicroAgent, load_signals
from microagent.tools.redis import RedisSignalBus

signals = load_signals('file://signals.json')

class UserAgent(MicroAgent):
    @receiver(signals.user_created)
    async def example(self, user_id, **kwargs):
        await self.bus.user_created.send('some_signal', user_id=1)

bus = RedisSignalBus('redis://localhost/7')
user_agent = UserAgent(bus=bus)
await user_agent.start()
microagent.load_signals(source: str) NamedTuple[source]

Load Signal-entities from file or by web.

from microagent import load_signals

signals_from_file = load_signals('file://signals.json')
signals_from_web = load_signals('http://example.com/signals.json')

Signals declarations (signals.json).

{
    "signals": [
        {"name": "started", "providing_args": []},
        {"name": "user_created", "providing_args": ["user_id"]},
        {"name": "typed_signal", "providing_args": {
            "uuid": "string",
            "code": ["number", "null"],
            "flag": "boolean",
            "ids": "array"
        }}
    ]
}
microagent.receiver(*signals: Signal, timeout: int = 60) Callable[[Callable[[...], Awaitable[None | int | str]]], Callable[[...], Awaitable[None | int | str]]][source]

Binding for signals receiving.

Handler can receive many signals, and many handlers can receiver same signal.

Parameters:
  • signals – List of receiving signals

  • timeout – Calling timeout in seconds

@receiver(signal_1, signal_2)
async def handler_1(self, **kwargs):
    log.info('Called handler 1 %s', kwargs)

@receiver(signal_1)
async def handle_2(self, **kwargs):
    log.info('Called handler 2 %s', kwargs)

@receiver(signal_2, timeout=30)
async def handle_3(self, **kwargs):
    log.info('Called handler 3 %s', kwargs)
class microagent.bus.AbstractSignalBus(dsn: str, uid: str = <factory>, prefix: str = 'PUBSUB', log: ~logging.Logger = <Logger microagent.bus (WARNING)>, receivers: dict[str, list[~microagent.signal.Receiver]] = <factory>, _responses: dict[str, ~microagent.utils.IterQueue] = <factory>)[source]

Signal bus is an abstract interface with two basic methods - send and bind.

send-method allows to publish some signal in the channel for subscribers.

bind-method allows to subscribe to the channel(s) for receive the signal(s).

call-method allows to use RPC based on send and bind.

All registered Signals are available in the bus object as attributes with the names specified in the declaration.

Signal(name='user_created', providing_args=['user_id'])

bus = RedisSignalBus('redis://localhost/7')
await bus.user_created.send('user_agent', user_id=1)
dsn

Bus has only one required parameter - dsn-string (data source name), which provide details for establishing a connection with the mediator-service.

prefix

Channel prefix, string, one for project. It is allowing use same redis for different projects.

log

Provided or defaul logger

uid

UUID, id of bus instance (for debugging)

receivers

Dict of all binded receivers

abstract async send(channel: str, message: str) None[source]

Send raw message to channel. Available optional type checking for input data.

Parameters:
  • channel – string, channel name

  • message – string, serialized object

abstract async bind(signal: str) None[source]

Subscribe to channel.

Parameters:

signal – string, signal name for subscribe

async bind_receiver(receiver: Receiver) None[source]

Bind bounded to agent receiver to current bus.

call(channel: str, message: str, timeout: int) AsyncIterator[IterQueue][source]

RPC over pub/sub. Pair of signals - sending and responsing. Response-signal is an internal construction enabled by default. When we call call we send a ordinary declared by user signal with a unique id and awaiting a response with same id. The response can contain a string value or an integer that is returned by the signal receiver.

Signal-attached method call will catch only first value. To process multiple responses, you can use async context waiter, which will return an async generator of response data. You can break it or return value when it needed. waiter-method has the timeout argument set to 60 by default.

Answer: `{"<Class>.<method>": <value>}`

Available optional type checking for input data.

class CommentAgent(MicroAgent):
    @receiver(signals.rpc_comments_count)
    async def example_rpc_handler(self, user_id, **kwargs):
        return 1

response = await bus.rpc_comments_count.call('user_agent', user_id=1)
value = response['CommentAgent.example_rpc_handler']

async with bus.rpc_comments_count.waiter('user_agent', user_id=1) as queue:
    async for x in queue:
        logging.info('Get response %s', x)
        break
receiver(channel: str, message: str) None[source]

Handler of raw incoming messages. Available optional type checking for input data.

class microagent.bus.Signal(name: str, providing_args: list[str], type_map: dict[str, tuple[type, ...]] | None = None)[source]

Dataclass (declaration) for a signal entity with a unique name. Each instance registered at creation. Usually, you don’t need to work directly with the Signal-class.

name

String, signal name, project-wide unique, [a-z_]+

providing_args

All available and required parameters of message, can be simple list of argument names, or dictionary with declared types for each argument. If types declared, will be enabled soft type checking (warning log) for input data in runtime. Type checking works in bus.send, bus.call and on receiving signals. Supported only json-types: string, number, boolean, array, object, null.

Declaration with config-file (signals.json).

{
    "signals": [
        {"name": "started", "providing_args": []},
        {"name": "user_created", "providing_args": ["user_id"]},
        {"name": "typed_signal", "providing_args": {
            "uuid": "string",
            "code": ["number", "null"],
            "flag": "boolean",
            "ids": "array"
        }}
    ]
}

Manual declaration (not recommended)

some_signal = Signal(
    name='some_signal',
    providing_args=['some_arg']
)
classmethod get(name: str) Signal[source]

Get the signal instance by name

classmethod get_all() dict[str, Signal][source]

All registered signals

make_channel_name(channel_prefix: str, sender: str = '*') str[source]

Construct a channel name by the signal description

Parameters:
  • channel_prefix – prefix, often project name

  • sender – name of signal sender

serialize(data: dict) str[source]

Data serializing method

Parameters:

data – dict of transfered data

deserialize(data: str) dict[source]

Data deserializing method

Parameters:

data – serialized transfered data

Internals stuff for signal bus binding

class microagent.bus.BoundSignal(bus: microagent.bus.AbstractSignalBus, signal: microagent.signal.Signal)[source]
waiter(sender: str, timeout: int = 60, **kwargs: Any) AbstractAsyncContextManager[source]
async with bus.iter(sender=’name’, a=1, timeout=10) as queue:
async for x in queue:

logging.info(‘Get response %s’, x) break

class microagent.bus.Receiver(agent: 'MicroAgent', handler: collections.abc.Callable[..., collections.abc.Awaitable[None | int | str]], signal: microagent.signal.Signal, timeout: float)[source]

Exceptions

class microagent.signal.SignalException[source]

Base signal exception

class microagent.signal.SignalNotFound[source]
class microagent.signal.SerializingError[source]