Agent examples
user_agent.py
# mypy: ignore-errors
import os
from microagent import MicroAgent, on, periodic, receiver, cron, load_stuff
cur_dir = os.path.dirname(os.path.realpath(__file__))
signals, queues = load_stuff('file://' + os.path.join(cur_dir, 'signals.json'))
class UserAgent(MicroAgent):
@on('pre_start')
def setup(self):
self.log.info('Run ...\n %s', self.info())
@cron('* * * * *', timeout=5)
async def example_cron_send_message(self):
self.log.info('Run cron task')
await self.broker.mailer.send({'text': 'Report text', 'email': 'admin@lwr.pw'})
@periodic(period=15, timeout=10, start_after=3)
async def example_periodic_task_send_signal(self):
self.log.info('Run periodic task')
await self.bus.user_comment.send('user_agent', user_id=1, text='informer text')
@receiver(signals.user_created)
async def example_signal_receiver_send_message(self, signal, sender, **kwargs):
self.log.info('Catch signal %s from %s with %s', signal, sender, kwargs)
await self.broker.mailer.send({'text': 'Welcome text', 'email': 'user@lwr.pw'})
@receiver(signals.user_comment)
async def example_rpc_call(self, **kwargs):
self.log.info('Catch signal %s', kwargs)
value = await self.bus.rpc_comments_count.call('user_agent', user_id=1)
self.log.info('Get value = %s', value)
comment_agent.py
# mypy: ignore-errors
import os
import asyncio
from collections import defaultdict
from microagent import MicroAgent, on, receiver, load_stuff
cur_dir = os.path.dirname(os.path.realpath(__file__))
signals, queues = load_stuff('file://' + os.path.join(cur_dir, 'signals.json'))
class CommentAgent(MicroAgent):
@on('pre_start')
async def setup(self):
self.log.info('Run ...\n %s', self.info())
self.comments_cache = defaultdict(lambda: 0)
self.counter = 0
@receiver(signals.rpc_comments_count)
async def example_rpc_handler(self, user_id, **kwargs):
self.log.info('Catch signal %s', kwargs)
await asyncio.sleep(1)
return self.comments_cache[user_id]
@receiver(signals.user_comment)
async def example_signal_receiver_send_message(self, user_id, **kwargs):
self.log.info('Catch signal %s', kwargs)
self.comments_cache[user_id] += 1
await self.broker.mailer.send({'text': 'Comment', 'email': 'user@lwr.pw'})
await self.broker.mailer.length()
email_agent.py
# mypy: ignore-errors
import os
from microagent import MicroAgent, on, consumer, load_stuff
cur_dir = os.path.dirname(os.path.realpath(__file__))
signals, queues = load_stuff('file://' + os.path.join(cur_dir, 'signals.json'))
class EmailAgent(MicroAgent):
@on('pre_start')
async def setup(self):
self.log.info('Run ...\n %s', self.info())
@consumer(queues.mailer)
async def example_read_queue(self, **kwargs):
self.log.info('Catch emailer %s', kwargs)
signals.json
{
"version": 2,
"signals": [
{"name": "started", "providing_args": []},
{"name": "user_created", "providing_args": ["user_id"]},
{"name": "user_comment", "providing_args": ["user_id"]},
{"name": "rpc_comments_count", "providing_args": ["user_id"]},
{"name": "comment_created", "providing_args": {
"comment_id": ["number", "null"],
"user_id": "number"
}}
],
"queues": [
{"name": "mailer"}
]
}