MicroAgent

MicroAgent is a container for signal receivers, message consumers and periodic tasks. The MicroAgent links the abstract declarations of receivers and consumers with the provided bus and broker. The MicroAgent initiates periodic tasks and starts the servers.

The microagent can be launched using the supplied launcher. Or it can be used as a stand-alone entity running in python-shell or a custom script. It may be useful for testing, exploring, debugging and launching in specific purposes.

Agent declaration.

from microagent import MicroAgent, receiver, consumer, periodic, cron, on

class Agent(MicroAgent):

    @on('pre_start')
    async def setup(self):
        pass

    @periodic(period=5)
    async def periodic_handler(self):
        pass

    @receiver(signals.send_mail)
    async def send_mail_handler(self, **kwargs):
        pass

    @consumer(queues.mailer)
    async def mail_handler(self, **kwargs):
        pass

Agent initiation.

import logging
from microagent.tools.redis import RedisSignalBus, RedisBroker

# Initialize bus, broker and logger
bus = RedisSignalBus('redis://localhost/7')
broker = RedisBroker('redis://localhost/7')
log = logging.getLogger('my_log')
settings = {'secret': 'my_secret'}

# Initialize MicroAgent, all arguments optional
agent = Agent(bus=bus, broker=broker, log=logger, settings=settings)

Manual launching.

await user_agent.start()  # freezed here if sub-servers running

while True:
    await asyncio.sleep(60)

Using MicroAgent resources.

class Agent(MicroAgent):

    async def setup(self):
        self.log.info('Setup called!')  # write log
        await self.bus.my_sugnal.send(sender='agent', param=1)  # use bus
        await self.broker.my_queue.send({'text': 'Hello world!'})  # use broker
        secret = self.settings['secret']  # user settings
        print(self.info())  # serializable dict of agent structure

Decorators details.

microagent.consumer(queue: Queue, timeout: int = 60, dto_class: type | None = None, dto_name: str | None = None, **options: Any) Callable[[Callable[[...], Awaitable[None]]], Callable[[...], Awaitable[None]]][source]

Binding for consuming messages from queue.

Only one handler can be bound to one queue.

Parameters:
  • queue – Queue - source of data

  • timeout – Calling timeout in seconds

  • dto_class – DTO-class, wrapper for consuming data

  • dto_name – DTO name in consumer method kwargs

@consumer(queue_1)
async def handler_1(self, **kwargs):
    log.info('Called handler 1 %s', kwargs)

@consumer(queue_2, timeout=30)
async def handle_2(self, **kwargs):
    log.info('Called handler 2 %s', kwargs)

@consumer(queue_3, dto_class=MyDTO)
async def handle_3(self, dto: MyDTO, **kwargs):
    log.info('Called handler 3 %s', dto)  # dto = MyDTO(**kwargs)

@consumer(queue_4, timeout=30, dto_class=MyDTO, dto_name='obj')
async def handle_4(self, obj: MyDTO, **kwargs):
    log.info('Called handler 4 %s', obj)  # obj = MyDTO(**kwargs)
microagent.cron(spec: str, timeout: int | float = 1) Callable[[Callable[[Any], Awaitable[None]]], Callable[[Any], Awaitable[None]]][source]

Run decorated function by schedule (cron)

Parameters:
  • spec – Specified running scheduling in cron format

  • timeout – Function timeout in seconds

@periodic('0 */4 * * *')
async def handler_1(self):
    log.info('Called handler 1')

@periodic('*/15 * * * *', timeout=10)
async def handler_2(self):
    log.info('Called handler 2')
microagent.on(label: str) Callable[[Callable[[Any], Awaitable[None]]], Callable[[Any], Awaitable[None]]][source]

Hooks for internal events (pre_start, post_start, pre_stop) or running forever servers (server).

Server-function will be call as run-forever asyncio task.

Parameters:

label – Hook type label string (pre_start, post_start, pre_stop, server)

@on('pre_start')
async def handler_1(self):
    log.info('Called handler 1')

@on('post_start')
async def handler_2(self):
    log.info('Called handler 2')

@on('pre_stop')
async def handler_3(self):
    log.info('Called handler 3')

@on('server')
async def run_server(self):
    await Server().start()  # run forever
    raise ServerInterrupt('Exit')  # graceful exit
microagent.periodic(period: int | float, timeout: int | float = 1, start_after: int | float = 0) Callable[[Callable[[Any], Awaitable[None]]], Callable[[Any], Awaitable[None]]][source]

Run decorated handler periodically.

Parameters:
  • period – Period of running functions in seconds

  • timeout – Function timeout in seconds

  • start_after – Delay for running loop in seconds

@periodic(period=5)
async def handler_1(self):
    log.info('Called handler 1')

@periodic(5, timeout=4)
async def handler_2(self):
    log.info('Called handler 2')

@periodic(period=5, start_after=10)
async def handler_3(self):
    log.info('Called handler 3')
microagent.receiver(*signals: Signal, timeout: int = 60) Callable[[Callable[[...], Awaitable[None | int | str]]], Callable[[...], Awaitable[None | int | str]]][source]

Binding for signals receiving.

Handler can receive many signals, and many handlers can receiver same signal.

Parameters:
  • signals – List of receiving signals

  • timeout – Calling timeout in seconds

@receiver(signal_1, signal_2)
async def handler_1(self, **kwargs):
    log.info('Called handler 1 %s', kwargs)

@receiver(signal_1)
async def handle_2(self, **kwargs):
    log.info('Called handler 2 %s', kwargs)

@receiver(signal_2, timeout=30)
async def handle_3(self, **kwargs):
    log.info('Called handler 3 %s', kwargs)
class microagent.MicroAgent(bus: ~microagent.bus.AbstractSignalBus | None = None, broker: ~microagent.broker.AbstractQueueBroker | None = None, log: ~logging.Logger = <factory>, settings: dict = <factory>)[source]

MicroAgent is a container for signal receivers, message consumers and periodic tasks.

The magic of declarations binding to an agent-object is implemented in __new__. Attaching the bus, broker and logger is implemented in __init__. Subscribing and initiating periodic tasks and servers is implemented in start method.

To create specialized MicroAgent classes, you can override __init__, which is safe for the constructor logic. But it is usually sufficient to use @on(‘pre_start’) decorator for sync or async methods for initializing resources and etc.

Parameters:
  • bus – signal bus, object of subclass AbstractSignalBus, required for receive or send the signals

  • broker – queue broker, object of subclass AbstractQueueBroker, required for consume or send the messages

  • log – prepared logging.Logger, or use default logger if not provided

  • settings – dict of user settings storing in object

log

Prepared python logger:

self.log.info('Hellow world')
bus

Signal bus, provided on initializing:

await self.bus.send_mail.send('agent', user_id=1)
broker

Queue broker, provided on initializing:

await self.broker.mailer.send({'text': 'Hello world'})
settings

Dict, user settings, provided on initializing, or empty.

async start(enable_periodic_tasks: bool = True, enable_receiving_signals: bool = True, enable_consuming_messages: bool = True, enable_servers_running: bool = True) None[source]

Starting MicroAgent to receive signals, consume messages and initiate periodic running.

Parameters:
  • enable_periodic_tasks – default enabled

  • enable_consuming_messages – default enabled

  • enable_receiving_signals – default enabled

  • enable_servers_running – default enabled

async bind_receivers(receivers: Iterable[Receiver]) None[source]

Bind signal receivers to bus subscribers

async bind_consumers(consumers: Iterable[Consumer]) None[source]

Bind message consumers to queues

info() dict[source]

Information about MicroAgent in json-serializable dict