# Source code for microagent

# Version string of the microagent package.
__version__ = '1.7.2'

import importlib
import json
import urllib.request

from collections import abc
from typing import Any, NamedTuple

from .abc import ConsumerFunc, HookFunc, PeriodicFunc, ReceiverFunc
from .agent import MicroAgent
from .hooks import Hook, HookArgs
from .launcher import ServerInterrupt
from .queue import Consumer, ConsumerArgs, Queue
from .signal import Receiver, ReceiverArgs, Signal
from .timer import CRONArgs, CRONTask, PeriodicArgs, PeriodicTask, cron_parser
from .utils import make_bound_key


# Names exported by ``from microagent import *``.
__all__ = ['Signal', 'Queue', 'MicroAgent', 'ServerInterrupt', 'receiver', 'consumer',
           'periodic', 'cron', 'on', 'load_stuff', 'load_signals', 'load_queues']

# Exclusive lower bound, in seconds, that the ``periodic`` and ``cron`` decorators
# enforce on their ``period`` / ``timeout`` arguments.
MIN_TIMEOUT_SEC = .001
# Python type that stands for each JSON type name allowed in a typed
# ``providing_args`` declaration.
# NOTE(review): 'number' maps to ``int`` only, so ``float`` is not covered - confirm intended.
JSON_TYPES = {
    'string': str,
    'number': int,
    'object': dict,
    'array': list,
    'boolean': bool,
    'null': type(None)
}


def get_types(value: str | list[str]) -> tuple[type, ...]:
    '''
        Translate JSON type name(s) into a tuple of Python types.

        :param value: A single type name or a list of type names (keys of ``JSON_TYPES``)
        :return: Tuple with the matching Python type for every given name, in order
        :raises KeyError: If a name is not present in ``JSON_TYPES``
    '''
    # A bare string is one type name, not a sequence of characters.
    names = [value] if isinstance(value, str) else value
    return tuple(map(JSON_TYPES.__getitem__, names))


def load_stuff(source: str) -> tuple[Any, Any]:
    '''
        Init signals and queues from a json-document loaded from disk or by http request.

        Every entry of the ``signals`` list is passed to ``Signal(...)`` and every
        entry of the ``queues`` list to ``Queue(...)``. If the document has a
        ``jsonlib`` key, the module with that name is imported and handed to
        ``Signal.set_jsonlib`` and ``Queue.set_jsonlib``.

        :param source: ``file://<path>`` for a local file, otherwise a URL
            passed to ``urllib.request.urlopen``
        :return: Pair ``(signals, queues)`` of named tuples built from
            ``Signal.get_all()`` and ``Queue.get_all()``, one field per key
        :raises KeyError: If a declaration lacks ``name``, a signal lacks
            ``providing_args``, or a type name is missing from ``JSON_TYPES``
    '''

    data: dict[str, abc.Iterable[dict[str, Any]]] = {}

    if source.startswith('file://'):
        # Drop only the leading scheme: str.replace would also cut 'file://'
        # out of the middle of a path that happens to contain it.
        with open(source.removeprefix('file://'), encoding='utf8') as f:
            # Line breaks are removed before parsing (kept from the original behaviour).
            data.update(json.loads(f.read().replace('\n', '')))
    else:
        with urllib.request.urlopen(source) as response:
            data.update(json.loads(response.read()))

    for _data in data.get('signals', []):
        providing_args, type_map = _data['providing_args'], None

        # Dict form declares types: {"arg_name": "type" | ["type", ...]}.
        if isinstance(providing_args, dict):
            type_map = {name: get_types(_type) for name, _type in providing_args.items()}
            providing_args = list(providing_args)

        # The instance is not kept here; it is read back through Signal.get_all() below.
        Signal(name=_data['name'], providing_args=providing_args, type_map=type_map)

    for _data in data.get('queues', []):
        Queue(name=_data['name'])

    jsonlib_name = data.get('jsonlib')
    if jsonlib_name:
        # NOTE(review): the module name comes from the loaded document, so
        # declarations should only be loaded from trusted sources.
        jsonlib = importlib.import_module(jsonlib_name)  # type: ignore
        Signal.set_jsonlib(jsonlib)
        Queue.set_jsonlib(jsonlib)

    all_signals = Signal.get_all()
    all_queues = Queue.get_all()

    # mypy: https://github.com/python/mypy/issues/848
    SignalList = NamedTuple('signals', [(name, Signal) for name in all_signals])  # type: ignore
    QueueList = NamedTuple('queues', [(name, Queue) for name in all_queues])  # type: ignore

    return (
        SignalList(*all_signals.values()),
        QueueList(*all_queues.values())
    )


def load_signals(source: str) -> NamedTuple:
    '''
        Load Signal-entities from file or by web.

        :param source: ``file://<path>`` or URL of the json-document with declarations
        :return: The signals part of the ``load_stuff(source)`` result

        .. code-block:: python

            from microagent import load_signals

            signals_from_file = load_signals('file://signals.json')
            signals_from_web = load_signals('http://example.com/signals.json')

        Signals declarations (signals.json).

        .. code-block:: json

            {
                "signals": [
                    {"name": "started", "providing_args": []},
                    {"name": "user_created", "providing_args": ["user_id"]},
                    {"name": "typed_signal", "providing_args": {
                        "uuid": "string",
                        "code": ["number", "null"],
                        "flag": "boolean",
                        "ids": "array"
                    }}
                ]
            }
    '''
    signals, _queues = load_stuff(source)
    return signals
def load_queues(source: str) -> NamedTuple:
    '''
        Load Queue-entities from file or by web.

        :param source: ``file://<path>`` or URL of the json-document with declarations
        :return: The queues part of the ``load_stuff(source)`` result

        .. code-block:: python

            from microagent import load_queues

            queues_from_file = load_queues('file://queues.json')
            queues_from_web = load_queues('http://example.com/queues.json')

        Queues declarations (queues.json).

        .. code-block:: json

            {
                "queues": [
                    {"name": "mailer"},
                    {"name": "pusher"}
                ]
            }
    '''
    _signals, queues = load_stuff(source)
    return queues
def periodic(
    period: int | float,
    timeout: int | float = 1,
    start_after: int | float = 0,
) -> abc.Callable[[PeriodicFunc], PeriodicFunc]:
    '''
        Run decorated handler periodically.

        :param period: Period of running functions in seconds
        :param timeout: Function timeout in seconds
        :param start_after: Delay for running loop in seconds
        :raises AssertionError: If ``period`` or ``timeout`` is not above
            ``MIN_TIMEOUT_SEC``, or ``start_after`` is negative

        .. code-block:: python

            @periodic(period=5)
            async def handler_1(self):
                log.info('Called handler 1')

            @periodic(5, timeout=4)
            async def handler_2(self):
                log.info('Called handler 2')

            @periodic(period=5, start_after=10)
            async def handler_3(self):
                log.info('Called handler 3')
    '''
    # NOTE(review): these checks are assert statements, so they are skipped under ``python -O``.
    assert period > MIN_TIMEOUT_SEC, 'period must be more than 0.001 s'
    assert timeout > MIN_TIMEOUT_SEC, 'timeout must be more than 0.001 s'
    assert start_after >= 0, 'start_after must be a positive'

    def _bind(func: PeriodicFunc) -> PeriodicFunc:
        # Settings are stored as floats, keyed by the handler's bound key.
        settings = PeriodicArgs(
            period=float(period),
            timeout=float(timeout),
            start_after=float(start_after),
        )
        PeriodicTask._register[make_bound_key(func)] = settings
        return func

    return _bind
def cron(spec: str, timeout: int | float = 1) -> abc.Callable[[PeriodicFunc], PeriodicFunc]:
    '''
        Run decorated function by schedule (cron)

        :param spec: Specified running scheduling in cron format
        :param timeout: Function timeout in seconds
        :raises AssertionError: If ``timeout`` is not above ``MIN_TIMEOUT_SEC``

        .. code-block:: python

            @cron('0 */4 * * *')
            async def handler_1(self):
                log.info('Called handler 1')

            @cron('*/15 * * * *', timeout=10)
            async def handler_2(self):
                log.info('Called handler 2')
    '''
    # NOTE(review): this check is an assert statement, so it is skipped under ``python -O``.
    assert timeout > MIN_TIMEOUT_SEC, 'timeout must be more than 0.001 s'

    def _bind(func: PeriodicFunc) -> PeriodicFunc:
        # ``spec`` is parsed when the decorator is applied to a handler.
        schedule = cron_parser(spec)
        settings = CRONArgs(cron=schedule, timeout=float(timeout))
        CRONTask._register[make_bound_key(func)] = settings
        return func

    return _bind
def receiver(*signals: Signal, timeout: int = 60) -> abc.Callable[[ReceiverFunc], ReceiverFunc]:
    '''
        Binding for signals receiving.

        Handler can receive **many** signals, and **many** handlers can receive the same signal.

        :param signals: List of receiving signals
        :param timeout: Calling timeout in seconds

        .. code-block:: python

            @receiver(signal_1, signal_2)
            async def handler_1(self, **kwargs):
                log.info('Called handler 1 %s', kwargs)

            @receiver(signal_1)
            async def handle_2(self, **kwargs):
                log.info('Called handler 2 %s', kwargs)

            @receiver(signal_2, timeout=30)
            async def handle_3(self, **kwargs):
                log.info('Called handler 3 %s', kwargs)
    '''

    def _bind(func: ReceiverFunc) -> ReceiverFunc:
        # Key path is (module, *qualname parts); the last part is the handler name.
        *scope, handler_name = (func.__module__, *func.__qualname__.split('.'))

        for sig in signals:
            # One registry entry per signal: the handler name is suffixed with
            # ':<signal name>' so the same handler can be bound to several signals.
            entry_key = (*scope, f'{handler_name}:{sig.name}')
            Receiver._register[entry_key] = ReceiverArgs(signal=sig, timeout=timeout)

        return func

    return _bind
def consumer(
    queue: Queue,
    timeout: int = 60,
    dto_class: type | None = None,
    dto_name: str | None = None,
    **options: Any,
) -> abc.Callable[[ConsumerFunc], ConsumerFunc]:
    '''
        Binding for consuming messages from queue.

        Only **one** handler can be bound to **one** queue.

        :param queue: Queue - source of data
        :param timeout: Calling timeout in seconds
        :param dto_class: DTO-class, wrapper for consuming data
        :param dto_name: DTO name in consumer method kwargs
        :param options: Extra keyword arguments, stored as ``options`` in ``ConsumerArgs``

        .. code-block:: python

            @consumer(queue_1)
            async def handler_1(self, **kwargs):
                log.info('Called handler 1 %s', kwargs)

            @consumer(queue_2, timeout=30)
            async def handle_2(self, **kwargs):
                log.info('Called handler 2 %s', kwargs)

            @consumer(queue_3, dto_class=MyDTO)
            async def handle_3(self, dto: MyDTO, **kwargs):
                log.info('Called handler 3 %s', dto)  # dto = MyDTO(**kwargs)

            @consumer(queue_4, timeout=30, dto_class=MyDTO, dto_name='obj')
            async def handle_4(self, obj: MyDTO, **kwargs):
                log.info('Called handler 4 %s', obj)  # obj = MyDTO(**kwargs)
    '''

    def _bind(func: ConsumerFunc) -> ConsumerFunc:
        binding = ConsumerArgs(
            queue=queue,
            timeout=timeout,
            dto_class=dto_class,
            dto_name=dto_name,
            options=options,
        )
        Consumer._register[make_bound_key(func)] = binding
        return func

    return _bind
def on(label: str) -> abc.Callable[[HookFunc], HookFunc]:
    '''
        Hooks for internal events *(pre_start, post_start, pre_stop)*
        or running forever servers *(server)*.

        Server-function will be called as run-forever asyncio task.

        :param label: Hook type label string *(pre_start, post_start, pre_stop, server)*
        :raises AssertionError: If ``label`` is not one of the supported labels

        .. code-block:: python

            @on('pre_start')
            async def handler_1(self):
                log.info('Called handler 1')

            @on('post_start')
            async def handler_2(self):
                log.info('Called handler 2')

            @on('pre_stop')
            async def handler_3(self):
                log.info('Called handler 3')

            @on('server')
            async def run_server(self):
                await Server().start()  # run forever
                raise ServerInterrupt('Exit')  # graceful exit
    '''
    # NOTE(review): this check is an assert statement, so it is skipped under ``python -O``.
    assert label in {'pre_start', 'post_start', 'pre_stop', 'server'}, 'Bad label'

    def _bind(func: HookFunc) -> HookFunc:
        hook_args = HookArgs(label=label)
        Hook._register[make_bound_key(func)] = hook_args
        return func

    return _bind