'''
The MicroAgent method can be run periodically after a certain period of time or
on a schedule (cron).
Periodic calls are implemented with *asyncio.call_later* chains.
Before each method call, the next call is initiated.
Each call is independent, and previous calls do not affect subsequent calls.
Exceptions raised by a handler are written to the logger of the associated
MicroAgent.

.. code-block:: python

    class Agent(MicroAgent):

        @periodic(period=3, timeout=10, start_after=2)  # in seconds
        async def periodic_handler(self):
            pass  # code here

        @cron('*/10 * * * *', timeout=10)  # in seconds
        async def cron_handler(self):
            pass  # code here
'''
import asyncio
import inspect
import re
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, ClassVar, NamedTuple, TypedDict
from .abc import BoundKey, PeriodicFunc
if TYPE_CHECKING:
from .agent import MicroAgent
# Inclusive (low, high) bounds of the five cron fields, in the order in which
# they appear in a spec and in which cron_parser consumes them.
RANGES = (
    (0, 59),  # minutes
    (0, 23),  # hours
    (1, 31),  # days of month
    (1, 12),  # months
    (0, 7),   # weekdays
)

# Twelve per-month day counts, January first (28 for February).
# NOTE(review): not referenced in this part of the file - confirm it is used.
DAYS = (
    31, 28, 31, 30, 31, 30,
    31, 31, 30, 31, 30, 31,
)

# Two 365-day years expressed in seconds; next_moment refuses to work with a
# moment that lies farther than this from the current time.
MAX_DIFF = 2 * 365 * 24 * 60 * 60
class CRON(NamedTuple):
    '''
    Parsed cron schedule.

    Keeps the original spec string next to the list of allowed values for
    each of the five cron fields.
    '''

    spec: str
    minutes: list[int]  # 0-59
    hours: list[int]  # 0-23
    days: list[int]  # 1-31
    months: list[int]  # 1-12
    weekdays: list[int]  # 0-7

    def next(self) -> datetime:  # noqa A003
        '''Return the next moment of this schedule, counted from the current UTC time.'''
        current = datetime.now(tz=timezone.utc)
        return next_moment(self, current)

    def __str__(self) -> str:
        # the spec wrapped in square brackets
        return f'[{self.spec}]'
class PeriodicState(TypedDict, total=False):
    '''
    Mutable runtime bookkeeping of a scheduled task.

    Both keys are optional (``total=False``): each one appears only after
    the corresponding object has been created.
    '''

    # handle returned by loop.call_later for the upcoming run
    timer_handle: asyncio.TimerHandle
    # task wrapping the most recently started handler call
    running_task: asyncio.Task
class PeriodicMixin:
    '''
    Behaviour shared by periodic and cron tasks: run the handler once,
    schedule the first run, and cancel whatever is pending or running.

    The attributes below are expected to be provided by the class that
    mixes this in.
    '''

    agent: 'MicroAgent'
    handler: Callable
    timeout: float
    _state: PeriodicState

    async def call(self) -> None:
        '''
        Invoke the handler once.

        An awaitable result is awaited for at most ``timeout`` seconds.
        A timeout and any other ``Exception`` are logged through the agent
        logger and are not propagated.
        '''
        try:
            outcome = self.handler()
            if not inspect.isawaitable(outcome):
                return  # plain synchronous handler: nothing to wait for
            try:
                await asyncio.wait_for(outcome, self.timeout)
            except asyncio.TimeoutError:
                self.agent.log.exception(f'TimeoutError: {self}')
        except Exception as exc:
            self.agent.log.exception(f'Periodic Exception: {exc}')

    def start(self, start_after: float) -> None:
        '''Schedule the first run ``start_after`` seconds from now on the running loop.'''
        loop = asyncio.get_running_loop()
        handle = loop.call_later(start_after, _periodic, self)  # type: ignore[arg-type]
        self._state['timer_handle'] = handle

    def cancel(self) -> None:
        '''Cancel the pending timer and the tracked handler task, if it is still running.'''
        pending_timer = self._state.get('timer_handle')
        if pending_timer is not None:
            pending_timer.cancel()

        active_task = self._state.get('running_task')
        if active_task is not None and not active_task.done():
            active_task.cancel()
class PeriodicArgs(TypedDict):
    '''Settings of a periodic task; every key is required.'''

    period: float  # delay between consecutive runs, seconds
    timeout: float  # time limit for one handler run, seconds
    start_after: float  # delay before the first run, seconds
@dataclass(frozen=True)
class PeriodicTask(PeriodicMixin):
    '''
    Handler of an agent that is run again every ``period`` seconds.

    The instance is frozen; the only mutable part is the ``_state`` dict,
    which holds the pending timer handle and the latest running task.
    '''

    agent: 'MicroAgent'
    handler: PeriodicFunc
    period: float  # delay between consecutive runs, seconds
    timeout: float  # limit passed to asyncio.wait_for for one run, seconds
    start_after: float  # delay before the first run, seconds
    _state: PeriodicState = field(default_factory=PeriodicState)

    # Class-level registry of periodic settings keyed by BoundKey.
    # NOTE(review): presumably filled by the @periodic decorator, which is
    # defined outside this part of the file - confirm.
    _register: ClassVar[dict[BoundKey, PeriodicArgs]] = {}

    def __repr__(self) -> str:
        return f'<PeriodicTask {self.handler.__name__} of {self.agent} every {self.period} sec>'
class CRONArgs(TypedDict):
    '''Settings of a cron task; every key is required.'''

    cron: CRON  # parsed schedule
    timeout: float  # time limit for one handler run, seconds
@dataclass(frozen=True)
class CRONTask(PeriodicMixin):
    '''
    Handler of an agent that is run at the moments described by a cron spec.

    The instance is frozen; the only mutable part is the ``_state`` dict,
    which holds the pending timer handle and the latest running task.
    '''

    agent: 'MicroAgent'
    handler: PeriodicFunc
    cron: CRON
    timeout: float  # limit passed to asyncio.wait_for for one run, seconds
    _state: PeriodicState = field(default_factory=PeriodicState)

    # Class-level registry of cron settings keyed by BoundKey.
    # NOTE(review): presumably filled by the @cron decorator, which is
    # defined outside this part of the file - confirm.
    _register: ClassVar[dict[BoundKey, CRONArgs]] = {}

    def __repr__(self) -> str:
        return f'<CRONTask {self.handler.__name__} of {self.agent} every {self.cron}>'

    @property
    def start_after(self) -> float:
        '''
        Seconds from now until the next moment matching the cron spec.

        The value is recomputed from the current time on every access, so
        two consecutive reads may differ.
        '''
        return self.cron.next().timestamp() - time.time()  # initial delay

    @property
    def period(self) -> float:
        '''
        Seconds from now until the next moment matching the cron spec.

        Computed the same way as *start_after*; this is the delay read when
        the following run is scheduled.
        '''
        return self.cron.next().timestamp() - time.time()  # next step delay
def _periodic(task: PeriodicTask | CRONTask) -> asyncio.Task:
    '''
    Timer callback: arm the timer for the following run, then start this one.

    The next call is scheduled before the handler task is created, so the
    chain of calls does not depend on how the current run ends.

    NOTE(review): only the most recently created task is kept in ``_state``;
    a run that is still in progress when the next one starts is no longer
    tracked there.
    '''
    loop = asyncio.get_running_loop()
    state = task._state
    state['timer_handle'] = loop.call_later(task.period, _periodic, task)
    state['running_task'] = current_run = asyncio.create_task(task.call())
    return current_run
def cron_parser(spec: str) -> CRON:
    '''
    Parse a five-field cron expression (``* * * * *``) into a CRON tuple.

    Fields come in the order minute, hour, day of month, month, weekday.
    Each field may be written as

    * ``*`` - every value of the field range;
    * ``*/N`` - the whole field range with step ``N``;
    * ``A-B`` or ``A-B/N`` - an inclusive range with an optional step;
    * ``A`` or ``A,B,C`` - an explicit number or list of numbers.

    A step keeps the values of the range that are divisible by ``N``,
    e.g. ``0-23/5`` gives ``[0, 5, 10, 15, 20]`` and ``5-20/3`` gives
    ``[6, 9, 12, 15, 18]``.

    :param spec: cron expression with whitespace-separated fields.
    :returns: CRON holding *spec* and the allowed values of every field.
    :raises ValueError: if the number of fields is wrong, a field is not
        numeric, a step is zero, a field selects no values at all, or a
        value lies outside the range allowed for its field.
    '''
    parts = spec.split()
    if len(parts) != len(RANGES):
        raise ValueError(
            f'Cron spec {spec!r} must have {len(RANGES)} fields, got {len(parts)}')

    values: list[list[int]] = []
    for (low, high), raw in zip(RANGES, parts, strict=True):
        # Bring the shorthand forms to the canonical "A-B/N" notation.
        if raw == '*':  # * -> 0-23/1
            val = f'{low}-{high}/1'
        elif (stepped := re.fullmatch(r'\*(/.+)', raw)):  # */2 -> 0-23/2
            val = f'{low}-{high}{stepped.group(1)}'
        elif re.fullmatch(r'\d+-\d+', raw):  # 5-20 -> 5-20/1
            val = f'{raw}/1'
        else:
            val = raw

        match = re.fullmatch(r'([^-]+)-([^-/]+)/(\d+)', val)
        if match:  # 0-23/5 -> [0, 5, 10, 15, 20]
            first, last, step = map(int, match.groups())
            if step == 0:
                raise ValueError(f'Zero step in field {raw!r} of cron spec {spec!r}')
            expanded = [x for x in range(first, last + 1) if not x % step]
        else:  # 4,7,12 -> [4, 7, 12]
            expanded = [int(x) for x in val.split(',')]

        # A field that is empty or out of range can never match a real
        # moment; reject it here instead of failing later during scheduling.
        if not expanded:
            raise ValueError(f'Field {raw!r} of cron spec {spec!r} selects no values')
        if min(expanded) < low or max(expanded) > high:
            raise ValueError(
                f'Field {raw!r} of cron spec {spec!r} is outside the range {low}-{high}')
        values.append(expanded)

    minutes, hours, days, months, weekdays = values
    return CRON(
        spec=spec,
        minutes=minutes,
        hours=hours,
        days=days,
        months=months,
        weekdays=weekdays,
    )
def _cron_day_matches(cron: CRON, moment: datetime) -> bool:
    '''
    Tell whether the calendar day of *moment* is allowed by *cron*.

    When both the day-of-month and the weekday fields are restricted, a
    match of either one is enough (POSIX rule); when only one of them is
    restricted, that one decides; when neither is, every day matches.
    '''
    # cron numbers weekdays from Sunday (0, with 7 as an alias), while
    # datetime.weekday() counts from Monday. isoweekday() % 7 gives
    # Sunday -> 0, Monday -> 1, ..., Saturday -> 6, which is the cron scale.
    weekdays = {0 if wday == 7 else wday for wday in cron.weekdays}  # noqa PLR2004

    days_all = set(cron.days) == set(range(1, 32))  # run every calendar day
    wdays_all = weekdays >= set(range(7))  # run every week day
    day_ok = moment.day in cron.days
    wday_ok = moment.isoweekday() % 7 in weekdays

    if days_all:
        return wdays_all or wday_ok
    if wdays_all:
        return day_ok
    return day_ok or wday_ok  # POSIX: both restricted -> OR


def next_moment(cron: CRON, now: datetime) -> datetime:
    '''
    Return the first whole minute at or after *now* that matches *cron*.

    The search is recursive: the first field that does not match (month,
    then day, hour, minute) is moved to its next value with all smaller
    units reset, and the search restarts from that moment.

    :param cron: parsed schedule.
    :param now: timezone-aware starting point; its date and time fields are
        used as UTC values.
    :returns: UTC datetime with zero seconds and microseconds.
    :raises ValueError: if *now* is more than ``MAX_DIFF`` seconds away from
        the current time. This also stops the search for a schedule that
        never matches (for example the 31st of February).
    '''
    if abs((datetime.now(tz=timezone.utc) - now).total_seconds()) > MAX_DIFF:
        raise ValueError(
            f'{now.isoformat()} is more than {MAX_DIFF} seconds away from the '
            f'current time; no matching moment found for {cron}')

    if now.second or now.microsecond:
        # if moment passed several seconds ago, go to next minute
        now += timedelta(minutes=1)
    now = datetime(year=now.year, month=now.month, day=now.day,
                   hour=now.hour, minute=now.minute, tzinfo=timezone.utc)

    if now.month not in cron.months:
        # jump to midnight of the first day of the following month
        if now.month == 12:  # noqa PLR2004
            now = datetime(year=now.year + 1, month=1, day=1, tzinfo=timezone.utc)
        else:
            now = datetime(year=now.year, month=now.month + 1, day=1, tzinfo=timezone.utc)
        return next_moment(cron, now)

    if not _cron_day_matches(cron, now):
        # jump to midnight of the following day
        now += timedelta(days=1)
        now = datetime(year=now.year, month=now.month, day=now.day, tzinfo=timezone.utc)
        return next_moment(cron, now)

    if now.hour not in cron.hours:
        # jump to the start of the following hour
        now += timedelta(hours=1)
        now = datetime(year=now.year, month=now.month, day=now.day, hour=now.hour,
                       tzinfo=timezone.utc)
        return next_moment(cron, now)

    if now.minute not in cron.minutes:
        return next_moment(cron, now + timedelta(minutes=1))

    # already truncated to a whole UTC minute above
    return now