Source code for microagent.agent

'''
MicroAgent is a **container** for **signal receivers**, **message consumers**
and **periodic tasks**. The MicroAgent links the abstract declarations of receivers
and consumers with the provided bus and broker. The MicroAgent initiates
periodic tasks and starts the servers.

The microagent can be launched using the supplied launcher.
Or it can be used as a stand-alone entity running in python-shell or a custom script.
It may be useful for testing, exploring, debugging and launching in specific purposes.


Agent declaration.

.. code-block:: python

    from microagent import MicroAgent, receiver, consumer, periodic, cron, on

    class Agent(MicroAgent):

        @on('pre_start')
        async def setup(self):
            pass

        @periodic(period=5)
        async def periodic_handler(self):
            pass

        @receiver(signals.send_mail)
        async def send_mail_handler(self, **kwargs):
            pass

        @consumer(queues.mailer)
        async def mail_handler(self, **kwargs):
            pass


Agent initiation.

.. code-block:: python

    import logging
    from microagent.tools.redis import RedisSignalBus, RedisBroker

    # Initialize bus, broker and logger
    bus = RedisSignalBus('redis://localhost/7')
    broker = RedisBroker('redis://localhost/7')
    log = logging.getLogger('my_log')
    settings = {'secret': 'my_secret'}

    # Initialize MicroAgent, all arguments optional
    agent = Agent(bus=bus, broker=broker, log=logger, settings=settings)


Manual launching.

.. code-block:: python

    await user_agent.start()  # freezed here if sub-servers running

    while True:
        await asyncio.sleep(60)


Using MicroAgent resources.

.. code-block:: python

    class Agent(MicroAgent):

        async def setup(self):
            self.log.info('Setup called!')  # write log
            await self.bus.my_sugnal.send(sender='agent', param=1)  # use bus
            await self.broker.my_queue.send({'text': 'Hello world!'})  # use broker
            secret = self.settings['secret']  # user settings
            print(self.info())  # serializable dict of agent structure
'''

import asyncio
import logging

from collections.abc import Callable, Iterable
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import TypeVar

from .broker import AbstractQueueBroker
from .bus import AbstractSignalBus
from .hooks import Hook, Hooks
from .queue import Consumer
from .signal import Receiver
from .timer import CRONTask, PeriodicTask


HandlerTypes = TypeVar('HandlerTypes', Receiver, Consumer, PeriodicTask, CRONTask, Hook)


class MissConfig(Exception):
    pass


[docs] @dataclass class MicroAgent: ''' MicroAgent is a **container** for **signal receivers**, **message consumers** and **periodic tasks**. The magic of declarations binding to an agent-object is implemented in *__new__*. Attaching the bus, broker and logger is implemented in *__init__*. Subscribing and initiating periodic tasks and servers is implemented in *start* method. To create specialized MicroAgent classes, you can override *__init__*, which is safe for the constructor logic. But it is usually sufficient to use *@on('pre_start')* decorator for sync or async methods for initializing resources and etc. :param bus: signal bus, object of subclass :class:`AbstractSignalBus`, required for receive or send the signals :param broker: queue broker, object of subclass :class:`AbstractQueueBroker`, required for consume or send the messages :param log: prepared :class:`logging.Logger`, or use default logger if not provided :param settings: dict of user settings storing in object .. attribute:: log Prepared python logger:: self.log.info('Hellow world') .. attribute:: bus Signal bus, provided on initializing:: await self.bus.send_mail.send('agent', user_id=1) .. attribute:: broker Queue broker, provided on initializing:: await self.broker.mailer.send({'text': 'Hello world'}) .. attribute:: settings Dict, user settings, provided on initializing, or empty. ''' bus: AbstractSignalBus | None = None broker: AbstractQueueBroker | None = None log: logging.Logger = field(default_factory=lambda: logging.getLogger('microagent')) settings: dict = field(default_factory=dict) periodic_tasks: Iterable[PeriodicTask] = field(init=False) cron_tasks: Iterable[CRONTask] = field(init=False) receivers: Iterable[Receiver] = field(init=False) consumers: Iterable[Consumer] = field(init=False) hook: Hooks = field(init=False) def __post_init__(self) -> None: self.periodic_tasks = self._bound_handler(PeriodicTask) self.cron_tasks = self._bound_handler(CRONTask) self.receivers = self._bound_handler(Receiver) self.consumers = self._bound_handler(Consumer) self.hook = Hooks(self._bound_handler(Hook)) if self.receivers and not isinstance(self.bus, AbstractSignalBus): raise MissConfig(f'Bus must be AbstractSignalBus instance {self.bus}') if self.consumers and not isinstance(self.broker, AbstractQueueBroker): raise MissConfig(f'Broker must be AbstractQueueBroker instance {self.broker}') self.log.debug('%s initialized', self) def __repr__(self) -> str: return f'<MicroAgent {self.__class__.__name__}>'
[docs] async def start(self, enable_periodic_tasks: bool = True, enable_receiving_signals: bool = True, enable_consuming_messages: bool = True, enable_servers_running: bool = True ) -> None: ''' Starting MicroAgent to receive signals, consume messages and initiate periodic running. :param enable_periodic_tasks: default enabled :param enable_consuming_messages: default enabled :param enable_receiving_signals: default enabled :param enable_servers_running: default enabled ''' await self.hook.pre_start() if enable_receiving_signals: await self.bind_receivers(self.receivers) if enable_consuming_messages: await self.bind_consumers(self.consumers) if enable_periodic_tasks: self.run_periodic_tasks(self.periodic_tasks, self.cron_tasks) await self.hook.post_start() if enable_servers_running: await self.run_servers(self.hook.servers)
async def stop(self) -> None: await self.hook.pre_stop() @staticmethod async def run_servers(servers: Iterable[Callable]) -> None: await asyncio.gather(*[server() for server in servers]) def run_periodic_tasks(self, periodic_tasks: Iterable[PeriodicTask], cron_tasks: Iterable[CRONTask]) -> None: for task in [*periodic_tasks, *cron_tasks]: start_after: float = getattr(task, 'start_after', None) or 0.0 if start_after > 100: # noqa PLR2004 start_at = datetime.now(tz=timezone.utc) + timedelta(seconds=start_after) self.log.debug('Set %s at %s', task, f'{start_at:%H:%M:%S}') else: self.log.debug('Set %s after %d sec', task, start_after) task.start(start_after)
[docs] async def bind_receivers(self, receivers: Iterable[Receiver]) -> None: ''' Bind signal receivers to bus subscribers ''' if self.bus: for receiver in receivers: await self.bus.bind_receiver(receiver)
[docs] async def bind_consumers(self, consumers: Iterable[Consumer]) -> None: ''' Bind message consumers to queues ''' if self.broker: for consumer in consumers: await self.broker.bind_consumer(consumer)
[docs] def info(self) -> dict: ''' Information about MicroAgent in json-serializable dict ''' return { 'name': self.__class__.__name__, 'bus': str(self.bus) if self.bus else None, 'broker': str(self.broker) if self.broker else None, 'periodic': [ { 'name': task.handler.__name__, 'period': task.period, } for task in self.periodic_tasks ], 'cron': [ { 'name': task.handler.__name__, 'cron': str(task.cron), } for task in self.cron_tasks ], 'receivers': [ { 'name': receiver.handler.__name__, 'signal': receiver.signal.name, } for receiver in self.receivers ], 'consumers': [ { 'name': consumer.handler.__name__, 'queue': consumer.queue.name, } for consumer in self.consumers ], }
def _bound_handler(self, handler_type: type[HandlerTypes]) -> list[HandlerTypes]: classes, result = [], [] for _cls in self.__class__.__mro__: if _cls.__module__ != 'builtins' and _cls.__name__ != 'MicroAgent': classes.append( (_cls.__module__, *_cls.__qualname__.split('.')) # type: ignore[has-type] ) for lookup_key, args in handler_type._register.items(): if tuple(lookup_key[:-1]) in classes: result.append(handler_type( agent=self, handler=getattr(self, lookup_key[-1].split(':')[0]), **args )) return result