Agent examples

user_agent.py

# mypy: ignore-errors
import os
from microagent import MicroAgent, on, periodic, receiver, cron, load_stuff

cur_dir = os.path.dirname(os.path.realpath(__file__))
signals, queues = load_stuff('file://' + os.path.join(cur_dir, 'signals.json'))


class UserAgent(MicroAgent):
    @on('pre_start')
    def setup(self):
        self.log.info('Run ...\n %s', self.info())

    @cron('* * * * *', timeout=5)
    async def example_cron_send_message(self):
        self.log.info('Run cron task')
        await self.broker.mailer.send({'text': 'Report text', 'email': 'admin@lwr.pw'})

    @periodic(period=15, timeout=10, start_after=3)
    async def example_periodic_task_send_signal(self):
        self.log.info('Run periodic task')
        await self.bus.user_comment.send('user_agent', user_id=1, text='informer text')

    @receiver(signals.user_created)
    async def example_signal_receiver_send_message(self, signal, sender, **kwargs):
        self.log.info('Catch signal %s from %s with %s', signal, sender, kwargs)
        await self.broker.mailer.send({'text': 'Welcome text', 'email': 'user@lwr.pw'})

    @receiver(signals.user_comment)
    async def example_rpc_call(self, **kwargs):
        self.log.info('Catch signal %s', kwargs)
        value = await self.bus.rpc_comments_count.call('user_agent', user_id=1)
        self.log.info('Get value = %s', value)

comment_agent.py

# mypy: ignore-errors
import os
import asyncio
from collections import defaultdict
from microagent import MicroAgent, on, receiver, load_stuff

cur_dir = os.path.dirname(os.path.realpath(__file__))
signals, queues = load_stuff('file://' + os.path.join(cur_dir, 'signals.json'))


class CommentAgent(MicroAgent):
    @on('pre_start')
    async def setup(self):
        self.log.info('Run ...\n %s', self.info())
        self.comments_cache = defaultdict(lambda: 0)
        self.counter = 0

    @receiver(signals.rpc_comments_count)
    async def example_rpc_handler(self, user_id, **kwargs):
        self.log.info('Catch signal %s', kwargs)
        await asyncio.sleep(1)
        return self.comments_cache[user_id]

    @receiver(signals.user_comment)
    async def example_signal_receiver_send_message(self, user_id, **kwargs):
        self.log.info('Catch signal %s', kwargs)
        self.comments_cache[user_id] += 1
        await self.broker.mailer.send({'text': 'Comment', 'email': 'user@lwr.pw'})
        await self.broker.mailer.length()

email_agent.py

# mypy: ignore-errors
import os
from microagent import MicroAgent, on, consumer, load_stuff

cur_dir = os.path.dirname(os.path.realpath(__file__))
signals, queues = load_stuff('file://' + os.path.join(cur_dir, 'signals.json'))


class EmailAgent(MicroAgent):
    @on('pre_start')
    async def setup(self):
        self.log.info('Run ...\n %s', self.info())

    @consumer(queues.mailer)
    async def example_read_queue(self, **kwargs):
        self.log.info('Catch emailer %s', kwargs)

signals.json

{
    "version": 2,
    "signals": [
        {"name": "started", "providing_args": []},
        {"name": "user_created", "providing_args": ["user_id"]},
        {"name": "user_comment", "providing_args": ["user_id"]},
        {"name": "rpc_comments_count", "providing_args": ["user_id"]},
        {"name": "comment_created", "providing_args": {
            "comment_id": ["number", "null"],
            "user_id": "number"
        }}
    ],
    "queues": [
        {"name": "mailer"}
    ]
}