Redis

Signal bus and queue broker based on Redis.

class microagent.tools.redis.RedisSignalBus(dsn: str, uid: str = <factory>, prefix: str = 'PUBSUB', log: logging.Logger = <Logger microagent.bus (WARNING)>, receivers: dict[str, list[microagent.signal.Receiver]] = <factory>, _responses: dict[str, microagent.utils.IterQueue] = <factory>, _pubsub_lock: asyncio.locks.Lock = <factory>)

The bus is built on the Redis publish/subscribe feature. Channel names are formed by the rule `{prefix}:{signal_name}:{sender_name}#{message_id}`.
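
The naming rule can be illustrated with a small helper. This is only a sketch of the pattern quoted above, not the library's own code: the function name `channel_name` is hypothetical, and the sample message id is made up for the illustration.

```python
def channel_name(prefix: str, signal_name: str, sender_name: str, message_id: str) -> str:
    """Build a channel name following the documented pattern (hypothetical helper)."""
    return f"{prefix}:{signal_name}:{sender_name}#{message_id}"


# Sample values; "42" is an arbitrary message id chosen for this example.
name = channel_name("MYAPP", "user_created", "user_agent", "42")
print(name)
```

With the default prefix from the class signature, the first segment would be `PUBSUB` instead of `MYAPP`.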

Example:

from microagent.tools.redis import RedisSignalBus

bus = RedisSignalBus('redis://localhost/7', prefix='MYAPP', log=custom_logger)

await bus.user_created.send('user_agent', user_id=1)

async send(channel: str, message: str) -> None

Send a raw message to a channel. Optional type checking of the input data is available.

Parameters:
  • channel – string, channel name

  • message – string, serialized object

async bind(channel: str) -> None

Subscribe to a channel.

Parameters:
  • channel – string, name of the signal to subscribe to

class microagent.tools.redis.RedisBroker(dsn: str, uid: str = <factory>, log: logging.Logger = <Logger microagent.broker (WARNING)>, _bindings: dict = <factory>, WAIT_TIME: int = 15, BIND_TIME: float = 1, ROLLBACK_ATTEMPTS: int = 3, _rollbacks: dict = <factory>)

The broker is built on Redis lists and the RPUSH and BLPOP commands. The queue name is used as the key. If handling fails, the message is returned to the queue up to 3 times (by default) and then dropped.
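
The return-and-drop behaviour can be sketched with an in-memory queue. This is an illustration under stated assumptions, not the broker's actual implementation: a `deque` stands in for the Redis list (`append` for RPUSH, `popleft` for BLPOP), the function name `drain` is hypothetical, and the sketch assumes a message is returned to the queue up to `ROLLBACK_ATTEMPTS` times after its first failed attempt. Whether the library counts the first attempt is not stated here.

```python
from collections import deque

ROLLBACK_ATTEMPTS = 3  # default value named in the text above


def drain(queue, handler, rollback_attempts=ROLLBACK_ATTEMPTS):
    """Process every message, requeueing failures a limited number of times."""
    rollbacks = {}  # message -> how many times it was returned to the queue
    dropped = []
    while queue:
        message = queue.popleft()  # stands in for BLPOP
        try:
            handler(message)
        except Exception:
            count = rollbacks.get(message, 0)
            if count < rollback_attempts:
                rollbacks[message] = count + 1
                queue.append(message)  # stands in for RPUSH
            else:
                dropped.append(message)  # limit reached: give up on the message
    return dropped


calls = []


def handler(message):
    """Record each attempt and fail for the message 'bad'."""
    calls.append(message)
    if message == "bad":
        raise ValueError("cannot handle message")


dropped = drain(deque(["ok", "bad"]), handler)
print(dropped, calls.count("bad"))
```

Under the assumption above, the failing message is attempted once, returned to the queue three times, and then dropped.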

Example:

from microagent.tools.redis import RedisBroker

broker = RedisBroker('redis://localhost/7', log=custom_logger)

await broker.user_created.send({'user_id': 1})

ROLLBACK_ATTEMPTS

Number of attempts to handle a message before it is dropped (default: 3)

WAIT_TIME

Timeout passed to BLPOP, in seconds (default: 15)

async send(name: str, message: str, **kwargs: Any) -> None

Write a raw message to a queue.

Parameters:
  • name – string, queue name

  • message – string, serialized object

  • **kwargs – specific parameters for each broker implementation

async queue_length(name: str, **options: Any) -> int

Get the current queue length.

Parameters:
  • name – string, queue name

  • **options – specific parameters for each broker implementation
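
The `send` and `queue_length` signatures above can be tried out without a Redis server by using an in-memory stand-in. The sketch below is an illustration only: `InMemoryBroker` is a hypothetical class that merely mirrors the two documented signatures, and JSON is assumed as the serialization format for the raw message.

```python
import asyncio
import json
from collections import defaultdict, deque
from typing import Any


class InMemoryBroker:
    """Hypothetical stand-in that mirrors the documented method signatures."""

    def __init__(self) -> None:
        self._queues = defaultdict(deque)  # queue name -> pending raw messages

    async def send(self, name: str, message: str, **kwargs: Any) -> None:
        # Append the already-serialized message to the named queue.
        self._queues[name].append(message)

    async def queue_length(self, name: str, **options: Any) -> int:
        # Number of messages currently waiting in the named queue.
        return len(self._queues[name])


async def main() -> int:
    broker = InMemoryBroker()
    payload = json.dumps({"user_id": 1})  # raw message: a serialized object
    await broker.send("user_created", payload)
    await broker.send("user_created", payload)
    return await broker.queue_length("user_created")


length = asyncio.run(main())
print(length)
```

With a real `RedisBroker`, the same two calls would be awaited on the broker instance, with the queue name used as the Redis key.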

async bind(name: str) -> None

Start reading from the queue.

Parameters:
  • name – string, queue name