Agent examples
signals.json
{
"version": 2,
"signals": [
{"name": "started", "providing_args": []},
{"name": "user_created", "providing_args": ["user_id"]},
{"name": "user_comment", "providing_args": ["user_id"]},
{"name": "rpc_comments_count", "providing_args": ["user_id"]},
{"name": "comment_created", "providing_args": {
"comment_id": ["number", "null"],
"user_id": "number"
}}
],
"queues": [
{"name": "mailer"},
{"name": "android", "exchange": "push3", "topics": ["*.android"]},
{"name": "ios", "exchange": "push3", "topics": ["*.ios"]},
{"name": "android_a", "exchange": "push"},
{"name": "ios_a", "exchange": "push"}
]
}
user_agent.py
# mypy: ignore-errors
import os
from microagent import MicroAgent, cron, load_stuff, on, periodic, receiver
cur_dir = os.path.dirname(os.path.realpath(__file__))
signals, queues = load_stuff('file://' + os.path.join(cur_dir, 'signals.json'))
class UserAgent(MicroAgent):
@on('pre_start')
def setup(self):
self.log.info('Run ...\n %s', self.info())
@cron('* * * * *', timeout=5)
async def example_cron_send_message(self):
self.log.info('Run cron task')
await self.broker.mailer.send({'text': 'Report text', 'email': 'admin@lwr.pw'})
@periodic(period=15, timeout=10, start_after=3)
async def example_periodic_task_send_signal(self):
self.log.info('Run periodic task')
await self.bus.user_comment.send('user_agent', user_id=1, text='informer text')
@receiver(signals.user_created)
async def example_signal_receiver_send_message(self, signal, sender, **kwargs):
self.log.info('Catch signal %s from %s with %s', signal, sender, kwargs)
await self.broker.mailer.send({'text': 'Welcome text', 'email': 'user@lwr.pw'})
@receiver(signals.user_comment)
async def example_rpc_call(self, **kwargs):
self.log.info('Catch signal %s', kwargs)
value = await self.bus.rpc_comments_count.call('user_agent', user_id=1)
self.log.info('Get value = %s', value)
comment_agent.py
# mypy: ignore-errors
import asyncio
import os
from collections import defaultdict
from microagent import MicroAgent, load_stuff, on, receiver
cur_dir = os.path.dirname(os.path.realpath(__file__))
signals, queues = load_stuff('file://' + os.path.join(cur_dir, 'signals.json'))
class CommentAgent(MicroAgent):
@on('pre_start')
async def setup(self):
self.log.info('Run ...\n %s', self.info())
self.comments_cache = defaultdict(lambda: 0)
self.counter = 0
@receiver(signals.rpc_comments_count)
async def example_rpc_handler(self, user_id, **kwargs):
self.log.info('Catch signal %s', kwargs)
await asyncio.sleep(1)
return self.comments_cache[user_id]
@receiver(signals.user_comment)
async def example_signal_receiver_send_message(self, user_id, **kwargs):
self.log.info('Catch signal %s', kwargs)
self.comments_cache[user_id] += 1
await self.broker.mailer.send({'text': 'Comment', 'email': 'user@lwr.pw'})
await self.broker.mailer.length()
email_agent.py
# mypy: ignore-errors
import os
from microagent import MicroAgent, consumer, load_stuff, on
cur_dir = os.path.dirname(os.path.realpath(__file__))
signals, queues = load_stuff('file://' + os.path.join(cur_dir, 'signals.json'))
class EmailAgent(MicroAgent):
@on('pre_start')
async def setup(self):
self.log.info('Run ...\n %s', self.info())
@consumer(queues.mailer)
async def example_read_queue(self, **kwargs):
self.log.info('Catch emailer %s', kwargs)
push_agent.py
# mypy: ignore-errors
import asyncio
import logging
import os
import sys
from microagent import MicroAgent, consumer, load_stuff, on, periodic
from microagent.tools import amqp, redis
logging.basicConfig(format=(
'%(levelname)-8s [pid#%(process)d] %(asctime)s %(name)s '
'%(filename)s:%(lineno)d %(message)s'
), stream=sys.stdout, level=logging.INFO)
cur_dir = os.path.dirname(os.path.realpath(__file__))
signals, queues = load_stuff('file://' + os.path.join(cur_dir, 'signals.json'))
class UserAgent(MicroAgent):
@on('pre_start')
def setup(self):
self.log.info('Run ...\n %s', self.info())
@periodic(period=15, timeout=10, start_after=3)
async def example_periodic_task_send_push(self):
self.log.info('Run periodic task')
await self.broker.push3.send({'text': 'informer text'}, topic='msg.ios')
await self.broker.push.send({'text': 'informer 1'})
@consumer(queues.android)
async def example_read_queue_android(self, **kwargs):
self.log.info('Catch android %s', kwargs)
@consumer(queues.ios)
async def example_read_queue_ios(self, **kwargs):
self.log.info('Catch ios %s', kwargs)
@consumer(queues.android_a)
async def example_read_queue_android_a(self, **kwargs):
self.log.info('Catch android_a %s', kwargs)
@consumer(queues.ios_a)
async def example_read_queue_ios_a(self, **kwargs):
self.log.info('Catch ios_a %s', kwargs)
async def main():
bus = redis.RedisSignalBus('redis://localhost/7')
broker = amqp.AMQPBroker('amqp://guest:guest@localhost:5672/')
await broker.push3.send({'q':1}, topic='msg.android')
agent = UserAgent(bus=bus, broker=broker)
await agent.start()
while True:
await asyncio.sleep(1)
asyncio.run(main())