Redis
Signal bus and queue broker based on Redis.
- class microagent.tools.redis.RedisSignalBus(dsn: str, uid: str = <factory>, prefix: str = 'PUBSUB', log: logging.Logger = <Logger microagent.bus (WARNING)>, receivers: dict[str, list[microagent.signal.Receiver]] = <factory>, _responses: dict[str, microagent.utils.IterQueue] = <factory>, _pubsub_lock: asyncio.locks.Lock = <factory>)
The bus is built on the Redis publish/subscribe feature. Channel names are formed by the rule
`{prefix}:{signal_name}:{sender_name}#{message_id}`
Example:
    from microagent.tools.redis import RedisSignalBus
    bus = RedisSignalBus('redis://localhost/7', prefix='MYAPP', log=custom_logger)
    await bus.user_created.send('user_agent', user_id=1)
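The channel naming rule above can be illustrated with a small standalone sketch. The helper names `channel_name` and `parse_channel` are hypothetical and are not part of microagent; the sketch also assumes that no component contains `:` or `#`, and that the message id is an opaque string.

```python
def channel_name(prefix: str, signal_name: str, sender_name: str, message_id: str) -> str:
    """Build a channel name following {prefix}:{signal_name}:{sender_name}#{message_id}."""
    return f'{prefix}:{signal_name}:{sender_name}#{message_id}'


def parse_channel(channel: str) -> tuple[str, str, str, str]:
    """Split a channel name back into its four components.

    Assumes no component contains ':' or '#'.
    """
    head, _, message_id = channel.rpartition('#')
    prefix, signal_name, sender_name = head.split(':', 2)
    return prefix, signal_name, sender_name, message_id


# Values mirror the example above; 'abc123' is a made-up message id.
name = channel_name('MYAPP', 'user_created', 'user_agent', 'abc123')
parts = parse_channel(name)
```

With these inputs, `name` would be `MYAPP:user_created:user_agent#abc123`, which shows how the `prefix` argument namespaces all channels of one application.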
- class microagent.tools.redis.RedisBroker(dsn: str, uid: str = <factory>, log: logging.Logger = <Logger microagent.broker (WARNING)>, _bindings: dict = <factory>, WAIT_TIME: int = 15, BIND_TIME: float = 1, ROLLBACK_ATTEMPTS: int = 3, _rollbacks: dict = <factory>)
The broker is built on Redis lists and the RPUSH and BLPOP commands. The queue name is used as the list key. If handling fails, the message is returned to the queue, up to 3 times by default, and then dropped.
Example:
    from microagent.tools.redis import RedisBroker
    broker = RedisBroker('redis://localhost/7', log=custom_logger)
    await broker.user_created.send({'user_id': 1})
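The return-to-queue behaviour described above can be modelled without a Redis server. The following is a minimal in-memory sketch, not the library's implementation: a `deque` stands in for the Redis list, `popleft` for BLPOP and `append` for RPUSH. The `consume` function, the per-message counter keyed by message content, and the choice to re-queue a failed message up to `ROLLBACK_ATTEMPTS` times (so the handler runs at most `ROLLBACK_ATTEMPTS + 1` times) are all assumptions; the real broker may count attempts differently.

```python
from collections import deque
from typing import Callable

ROLLBACK_ATTEMPTS = 3  # mirrors the documented default


def consume(queue: deque, handler: Callable[[str], None],
            rollback_attempts: int = ROLLBACK_ATTEMPTS) -> list[str]:
    """Drain the queue and return the messages that were dropped."""
    rollbacks: dict[str, int] = {}  # assumption: counter keyed by message content
    dropped: list[str] = []
    while queue:
        message = queue.popleft()  # stands in for BLPOP
        try:
            handler(message)
        except Exception:
            count = rollbacks.get(message, 0)
            if count < rollback_attempts:
                rollbacks[message] = count + 1
                queue.append(message)  # stands in for RPUSH (return to queue)
            else:
                rollbacks.pop(message, None)
                dropped.append(message)  # give up on this message
        else:
            rollbacks.pop(message, None)
    return dropped


calls: list[str] = []


def flaky_handler(message: str) -> None:
    """Hypothetical handler that always fails on the message 'bad'."""
    calls.append(message)
    if message == 'bad':
        raise ValueError('cannot handle message')


dropped = consume(deque(['ok', 'bad']), flaky_handler)
```

In this model the message `'bad'` is handled once, re-queued three times, and then dropped, while `'ok'` is handled exactly once.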
- ROLLBACK_ATTEMPTS
Number of attempts to handle a message before it is dropped (default: 3)
- WAIT_TIME
Timeout passed to BLPOP, in seconds (default: 15)
- async send(name: str, message: str, **kwargs: Any) -> None
Write a raw message to the queue.
- Parameters:
name – string, queue name
message – string, serialized object
**kwargs – specific parameters for each broker implementation
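Because `send` takes an already serialized string, the caller is responsible for encoding the object first. The sketch below shows that step using a hypothetical `InMemoryBroker` stand-in that only copies the documented `send(name, message, **kwargs)` signature, so it runs without Redis or microagent. JSON is an assumed serialization format; the documentation above only says "serialized object".

```python
import asyncio
import json
from typing import Any


class InMemoryBroker:
    """Hypothetical stand-in exposing the documented send() signature."""

    def __init__(self) -> None:
        self.queues: dict[str, list[str]] = {}

    async def send(self, name: str, message: str, **kwargs: Any) -> None:
        # Mimics RPUSH: append the raw string to the tail of the named queue.
        self.queues.setdefault(name, []).append(message)


async def main() -> list[str]:
    broker = InMemoryBroker()
    payload = json.dumps({'user_id': 1})  # serialize before sending (assumed JSON)
    await broker.send('user_created', payload)
    return broker.queues['user_created']


queued = asyncio.run(main())
```

With a real `RedisBroker`, the same call shape would presumably be `await broker.send('user_created', payload)`, with the queue name used as the Redis list key.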