Signal bus
Event-driven architecture is based on objects exchanging a non-directed messages - events. Here we assume that events (signals) that are not stored anywhere, everyone can receive and send them, like a radio-transfer.
Here, the intermediary that manages messages routing is called the Signal Bus and implements the publish / subscribe pattern. A signal is a message with a strict fixed structure. The Bus contains many channels that are different for each type of signal. We can send a signal from many sources and listen it with many receivers.
Implementations:
Using SignalBus separately (sending only)
from microagent import load_signals
from microagent.tools.redis import RedisSignalBus
signals = load_signals('file://signals.json')
bus = RedisSignalBus('redis://localhost/7')
await bus.user_created.send('user_agent', user_id=1)
Using with MicroAgent
from microagent import MicroAgent, load_signals
from microagent.tools.redis import RedisSignalBus
signals = load_signals('file://signals.json')
class UserAgent(MicroAgent):
@receiver(signals.user_created)
async def example(self, user_id, **kwargs):
await self.bus.user_created.send('some_signal', user_id=1)
bus = RedisSignalBus('redis://localhost/7')
user_agent = UserAgent(bus=bus)
await user_agent.start()
- microagent.load_signals(source: str) NamedTuple [source]
Load Signal-entities from file or by web.
from microagent import load_signals signals_from_file = load_signals('file://signals.json') signals_from_web = load_signals('http://example.com/signals.json')
Signals declarations (signals.json).
{ "signals": [ {"name": "started", "providing_args": []}, {"name": "user_created", "providing_args": ["user_id"]}, {"name": "typed_signal", "providing_args": { "uuid": "string", "code": ["number", "null"], "flag": "boolean", "ids": "array" }} ] }
- microagent.receiver(*signals: Signal, timeout: int = 60) Callable[[Callable[[...], Awaitable[None | int | str]]], Callable[[...], Awaitable[None | int | str]]] [source]
Binding for signals receiving.
Handler can receive many signals, and many handlers can receiver same signal.
- Parameters:
signals – List of receiving signals
timeout – Calling timeout in seconds
@receiver(signal_1, signal_2) async def handler_1(self, **kwargs): log.info('Called handler 1 %s', kwargs) @receiver(signal_1) async def handle_2(self, **kwargs): log.info('Called handler 2 %s', kwargs) @receiver(signal_2, timeout=30) async def handle_3(self, **kwargs): log.info('Called handler 3 %s', kwargs)
- class microagent.bus.AbstractSignalBus(dsn: str, uid: str = <factory>, prefix: str = 'PUBSUB', log: ~logging.Logger = <Logger microagent.bus (WARNING)>, receivers: dict[str, list[~microagent.signal.Receiver]] = <factory>, _responses: dict[str, ~microagent.utils.IterQueue] = <factory>)[source]
Signal bus is an abstract interface with two basic methods - send and bind.
send-method allows to publish some signal in the channel for subscribers.
bind-method allows to subscribe to the channel(s) for receive the signal(s).
call-method allows to use RPC based on send and bind.
All registered Signals are available in the bus object as attributes with the names specified in the declaration.
Signal(name='user_created', providing_args=['user_id']) bus = RedisSignalBus('redis://localhost/7') await bus.user_created.send('user_agent', user_id=1)
- dsn
Bus has only one required parameter - dsn-string (data source name), which provide details for establishing a connection with the mediator-service.
- prefix
Channel prefix, string, one for project. It is allowing use same redis for different projects.
- log
Provided or defaul logger
- uid
UUID, id of bus instance (for debugging)
- receivers
Dict of all binded receivers
- abstract async send(channel: str, message: str) None [source]
Send raw message to channel. Available optional type checking for input data.
- Parameters:
channel – string, channel name
message – string, serialized object
- abstract async bind(signal: str) None [source]
Subscribe to channel.
- Parameters:
signal – string, signal name for subscribe
- async bind_receiver(receiver: Receiver) None [source]
Bind bounded to agent receiver to current bus.
- call(channel: str, message: str, timeout: int) AsyncIterator[IterQueue] [source]
RPC over pub/sub. Pair of signals - sending and responsing. Response-signal is an internal construction enabled by default. When we call call we send a ordinary declared by user signal with a unique id and awaiting a response with same id. The response can contain a string value or an integer that is returned by the signal receiver.
Signal-attached method call will catch only first value. To process multiple responses, you can use async context waiter, which will return an async generator of response data. You can break it or return value when it needed. waiter-method has the timeout argument set to 60 by default.
Answer:
`{"<Class>.<method>": <value>}`
Available optional type checking for input data.
class CommentAgent(MicroAgent): @receiver(signals.rpc_comments_count) async def example_rpc_handler(self, user_id, **kwargs): return 1 response = await bus.rpc_comments_count.call('user_agent', user_id=1) value = response['CommentAgent.example_rpc_handler'] async with bus.rpc_comments_count.waiter('user_agent', user_id=1) as queue: async for x in queue: logging.info('Get response %s', x) break
- class microagent.bus.Signal(name: str, providing_args: list[str], type_map: dict[str, tuple[type, ...]] | None = None)[source]
Dataclass (declaration) for a signal entity with a unique name. Each instance registered at creation. Usually, you don’t need to work directly with the Signal-class.
- name
String, signal name, project-wide unique, [a-z_]+
- providing_args
All available and required parameters of message, can be simple list of argument names, or dictionary with declared types for each argument. If types declared, will be enabled soft type checking (warning log) for input data in runtime. Type checking works in bus.send, bus.call and on receiving signals. Supported only json-types: string, number, boolean, array, object, null.
Declaration with config-file (signals.json).
{ "signals": [ {"name": "started", "providing_args": []}, {"name": "user_created", "providing_args": ["user_id"]}, {"name": "typed_signal", "providing_args": { "uuid": "string", "code": ["number", "null"], "flag": "boolean", "ids": "array" }} ] }
Manual declaration (not recommended)
some_signal = Signal( name='some_signal', providing_args=['some_arg'] )
- make_channel_name(channel_prefix: str, sender: str = '*') str [source]
Construct a channel name by the signal description
- Parameters:
channel_prefix – prefix, often project name
sender – name of signal sender
Internals stuff for signal bus binding
- class microagent.bus.BoundSignal(bus: microagent.bus.AbstractSignalBus, signal: microagent.signal.Signal)[source]
- class microagent.bus.Receiver(agent: 'MicroAgent', handler: collections.abc.Callable[..., collections.abc.Awaitable[None | int | str]], signal: microagent.signal.Signal, timeout: float)[source]
Exceptions