MicroAgent
MicroAgent is a container for signal receivers, message consumers and periodic tasks. The MicroAgent links the abstract declarations of receivers and consumers with the provided bus and broker. The MicroAgent initiates periodic tasks and starts the servers.
The MicroAgent can be launched with the supplied launcher, or used as a stand-alone entity in a Python shell or a custom script. Stand-alone use is handy for testing, exploring, debugging, and launching for special purposes.
Agent declaration.
from microagent import MicroAgent, receiver, consumer, periodic, cron, on

class Agent(MicroAgent):

    @on('pre_start')
    async def setup(self):
        pass

    @periodic(period=5)
    async def periodic_handler(self):
        pass

    @receiver(signals.send_mail)
    async def send_mail_handler(self, **kwargs):
        pass

    @consumer(queues.mailer)
    async def mail_handler(self, **kwargs):
        pass
Agent initiation.
import logging
from microagent.tools.redis import RedisSignalBus, RedisBroker
# Initialize bus, broker and logger
bus = RedisSignalBus('redis://localhost/7')
broker = RedisBroker('redis://localhost/7')
log = logging.getLogger('my_log')
settings = {'secret': 'my_secret'}
# Initialize MicroAgent, all arguments optional
agent = Agent(bus=bus, broker=broker, log=log, settings=settings)
Manual launching.
await agent.start()  # blocks here if sub-servers are running

while True:
    await asyncio.sleep(60)
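In a stand-alone script these calls have to run inside a coroutine, because `await` is not valid at module level. A minimal sketch, assuming a hypothetical `StubAgent` that stands in for a configured MicroAgent subclass (it is not part of microagent), and a bounded loop in place of `while True` so that the example terminates:

```python
import asyncio


class StubAgent:
    """Hypothetical stand-in for a configured MicroAgent subclass."""

    def __init__(self) -> None:
        self.started = False

    async def start(self) -> None:
        # A real agent would subscribe handlers and start periodic tasks here.
        self.started = True


async def main(agent: StubAgent, ticks: int = 3, pause: float = 0.01) -> int:
    await agent.start()
    # Keep the event loop alive so background tasks can run.
    # A long-running script would use `while True` with a longer sleep.
    done = 0
    for _ in range(ticks):
        await asyncio.sleep(pause)
        done += 1
    return done


if __name__ == '__main__':
    print(asyncio.run(main(StubAgent())))
```

With a real agent, only the object passed to `main` changes; the entry point stays `asyncio.run(main(agent))`.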
Using MicroAgent resources.
class Agent(MicroAgent):

    async def setup(self):
        self.log.info('Setup called!')  # write log
        await self.bus.my_signal.send(sender='agent', param=1)  # use bus
        await self.broker.my_queue.send({'text': 'Hello world!'})  # use broker
        secret = self.settings['secret']  # user settings
        print(self.info())  # serializable dict of agent structure
Decorators details.
- microagent.consumer(queue: Queue, timeout: int = 60, dto_class: type | None = None, dto_name: str | None = None, **options: Any) -> Callable[[Callable[[...], Awaitable[None]]], Callable[[...], Awaitable[None]]]
Binding for consuming messages from queue.
Only one handler can be bound to one queue.
- Parameters:
queue – Queue - source of data
timeout – Calling timeout in seconds
dto_class – DTO-class, wrapper for consuming data
dto_name – DTO name in consumer method kwargs
@consumer(queue_1)
async def handler_1(self, **kwargs):
    log.info('Called handler 1 %s', kwargs)

@consumer(queue_2, timeout=30)
async def handle_2(self, **kwargs):
    log.info('Called handler 2 %s', kwargs)

@consumer(queue_3, dto_class=MyDTO)
async def handle_3(self, dto: MyDTO, **kwargs):
    log.info('Called handler 3 %s', dto)  # dto = MyDTO(**kwargs)

@consumer(queue_4, timeout=30, dto_class=MyDTO, dto_name='obj')
async def handle_4(self, obj: MyDTO, **kwargs):
    log.info('Called handler 4 %s', obj)  # obj = MyDTO(**kwargs)
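The comments in the consumer example (`dto = MyDTO(**kwargs)`) suggest that `dto_class` is instantiated from the message fields and passed to the handler under the name given by `dto_name`. A minimal sketch of that idea in plain Python, assuming a hypothetical `with_dto` helper and a hypothetical `MyDTO` dataclass; it illustrates the wrapping only and is not how microagent implements it:

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class MyDTO:
    """Hypothetical message shape used only for this sketch."""
    user_id: int
    text: str


def with_dto(dto_class: type, dto_name: str = 'dto') -> Callable:
    """Hypothetical helper: build dto_class(**kwargs) and pass it by name."""
    def decorator(handler: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
        async def wrapper(**kwargs: Any) -> Any:
            # The raw fields stay in kwargs; the DTO is added alongside them.
            kwargs[dto_name] = dto_class(**kwargs)
            return await handler(**kwargs)
        return wrapper
    return decorator


@with_dto(MyDTO, dto_name='obj')
async def handle(obj: MyDTO, **kwargs: Any) -> str:
    return f'{obj.user_id}: {obj.text}'


result = asyncio.run(handle(user_id=1, text='Hello world'))
print(result)
```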
- microagent.cron(spec: str, timeout: int | float = 1) -> Callable[[Callable[[Any], Awaitable[None]]], Callable[[Any], Awaitable[None]]]
Run the decorated function on a cron schedule.
- Parameters:
spec – Run schedule in cron format
timeout – Function timeout in seconds
@cron('0 */4 * * *')
async def handler_1(self):
    log.info('Called handler 1')

@cron('*/15 * * * *', timeout=10)
async def handler_2(self):
    log.info('Called handler 2')
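To read the specs in the cron example, it may help to see how a spec is matched against a point in time. The sketch below assumes the common five-field cron layout (minute, hour, day of month, month, day of week) and supports only `*`, `*/n` and plain numbers. The names `field_matches` and `spec_matches` are hypothetical, and this is not the parser microagent uses:

```python
from datetime import datetime


def field_matches(field: str, value: int, start: int) -> bool:
    """Match one cron field; `start` is the first value of the field's range."""
    if field == '*':
        return True
    if field.startswith('*/'):
        # Steps are counted from the start of the range (0 or 1).
        return (value - start) % int(field[2:]) == 0
    return value == int(field)


def spec_matches(spec: str, moment: datetime) -> bool:
    """Check whether `moment` satisfies a five-field cron spec."""
    fields = spec.split()
    if len(fields) != 5:
        raise ValueError('expected five fields')
    cron_weekday = (moment.weekday() + 1) % 7  # cron counts Sunday as 0
    # (current value, first value of the range) for each field, in order
    values = [
        (moment.minute, 0),
        (moment.hour, 0),
        (moment.day, 1),
        (moment.month, 1),
        (cron_weekday, 0),
    ]
    return all(field_matches(f, v, s) for f, (v, s) in zip(fields, values))


# '0 */4 * * *' reads as: minute 0 of every fourth hour
print(spec_matches('0 */4 * * *', datetime(2024, 1, 1, 8, 0)))
# '*/15 * * * *' reads as: every fifteenth minute of every hour
print(spec_matches('*/15 * * * *', datetime(2024, 1, 1, 9, 50)))
```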
- microagent.on(label: str) -> Callable[[Callable[[Any], Awaitable[None]]], Callable[[Any], Awaitable[None]]]
Hooks for internal events (pre_start, post_start, pre_stop) or for run-forever servers (server).
A server function is called as a run-forever asyncio task.
- Parameters:
label – Hook type label string (pre_start, post_start, pre_stop, server)
@on('pre_start')
async def handler_1(self):
    log.info('Called handler 1')

@on('post_start')
async def handler_2(self):
    log.info('Called handler 2')

@on('pre_stop')
async def handler_3(self):
    log.info('Called handler 3')

@on('server')
async def run_server(self):
    await Server().start()  # run forever
    raise ServerInterrupt('Exit')  # graceful exit
- microagent.periodic(period: int | float, timeout: int | float = 1, start_after: int | float = 0) -> Callable[[Callable[[Any], Awaitable[None]]], Callable[[Any], Awaitable[None]]]
Run decorated handler periodically.
- Parameters:
period – Interval between handler runs, in seconds
timeout – Function timeout in seconds
start_after – Delay before the first run, in seconds
@periodic(period=5)
async def handler_1(self):
    log.info('Called handler 1')

@periodic(5, timeout=4)
async def handler_2(self):
    log.info('Called handler 2')

@periodic(period=5, start_after=10)
async def handler_3(self):
    log.info('Called handler 3')
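The three parameters of `periodic` can be pictured with a plain asyncio loop. This is a sketch under stated assumptions, not the library's implementation: `run_periodically` and its `runs` bound are hypothetical, and whether microagent measures the period from the start or the end of a call is not stated here (the sketch sleeps after each call).

```python
import asyncio
from typing import Awaitable, Callable


async def run_periodically(
    handler: Callable[[], Awaitable[None]],
    period: float,
    timeout: float = 1,
    start_after: float = 0,
    runs: int = 3,  # hypothetical bound so the sketch terminates
) -> int:
    """Call handler `runs` times; return how many calls timed out."""
    timed_out = 0
    await asyncio.sleep(start_after)  # initial delay before the first call
    for _ in range(runs):
        try:
            # Cancel the handler if it runs longer than `timeout` seconds.
            await asyncio.wait_for(handler(), timeout)
        except asyncio.TimeoutError:
            timed_out += 1
        await asyncio.sleep(period)  # wait before the next call
    return timed_out


calls = []


async def fast() -> None:
    calls.append('fast')


async def slow() -> None:
    await asyncio.sleep(0.2)  # longer than the timeout used below


fast_timeouts = asyncio.run(run_periodically(fast, period=0.01))
slow_timeouts = asyncio.run(run_periodically(slow, period=0.01, timeout=0.05))
print(fast_timeouts, slow_timeouts)
```

The practical point is that a handler slower than `timeout` is cut off on every run, so `timeout` should exceed the handler's expected duration.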
- microagent.receiver(*signals: Signal, timeout: int = 60) -> Callable[[Callable[[...], Awaitable[None | int | str]]], Callable[[...], Awaitable[None | int | str]]]
Binding for signals receiving.
A handler can receive many signals, and many handlers can receive the same signal.
- Parameters:
signals – List of receiving signals
timeout – Calling timeout in seconds
@receiver(signal_1, signal_2)
async def handler_1(self, **kwargs):
    log.info('Called handler 1 %s', kwargs)

@receiver(signal_1)
async def handle_2(self, **kwargs):
    log.info('Called handler 2 %s', kwargs)

@receiver(signal_2, timeout=30)
async def handle_3(self, **kwargs):
    log.info('Called handler 3 %s', kwargs)
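The many-to-many relation between signals and handlers can be sketched with a small registry. Everything here (`bind`, `send`, `registry`, string signal names) is hypothetical and only mirrors the shape of the `receiver` examples; microagent's own binding and delivery are not shown:

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, List, Tuple

Handler = Callable[..., Awaitable[None]]
registry: DefaultDict[str, List[Handler]] = defaultdict(list)
received: List[Tuple[str, str]] = []


def bind(*signals: str) -> Callable[[Handler], Handler]:
    """Hypothetical stand-in: register one handler for several signal names."""
    def decorator(handler: Handler) -> Handler:
        for name in signals:
            registry[name].append(handler)
        return handler
    return decorator


@bind('signal_1', 'signal_2')
async def handler_1(signal: str, **kwargs: Any) -> None:
    received.append(('handler_1', signal))


@bind('signal_1')
async def handler_2(signal: str, **kwargs: Any) -> None:
    received.append(('handler_2', signal))


async def send(signal: str, **kwargs: Any) -> None:
    # Deliver the signal to every handler bound to it, in binding order.
    for handler in registry[signal]:
        await handler(signal=signal, **kwargs)


async def demo() -> None:
    await send('signal_1', user_id=1)
    await send('signal_2', user_id=1)


asyncio.run(demo())
print(received)
```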
- class microagent.MicroAgent(bus: microagent.bus.AbstractSignalBus | None = None, broker: microagent.broker.AbstractQueueBroker | None = None, log: logging.Logger = <factory>, settings: dict = <factory>)
MicroAgent is a container for signal receivers, message consumers and periodic tasks.
The binding of declarations to an agent object is implemented in __new__. Attaching the bus, broker and logger is implemented in __init__. Subscribing, and initiating periodic tasks and servers, is implemented in the start method.
To create specialized MicroAgent classes you can override __init__, which is safe for constructor logic. It is usually sufficient, however, to use the @on('pre_start') decorator on sync or async methods to initialize resources and the like.
- Parameters:
bus – signal bus, an object of a subclass of AbstractSignalBus, required to receive or send signals
broker – queue broker, an object of a subclass of AbstractQueueBroker, required to consume or send messages
log – prepared logging.Logger; the default logger is used if not provided
settings – dict of user settings stored in the object
- log
Prepared python logger:
self.log.info('Hello world')
- bus
Signal bus, provided on initializing:
await self.bus.send_mail.send('agent', user_id=1)
- broker
Queue broker, provided on initializing:
await self.broker.mailer.send({'text': 'Hello world'})
- settings
Dict, user settings, provided on initializing, or empty.
- async start(enable_periodic_tasks: bool = True, enable_receiving_signals: bool = True, enable_consuming_messages: bool = True, enable_servers_running: bool = True) -> None
Starts the MicroAgent: begins receiving signals, consuming messages and running periodic tasks.
- Parameters:
enable_periodic_tasks – default enabled
enable_consuming_messages – default enabled
enable_receiving_signals – default enabled
enable_servers_running – default enabled