Agent examples

signals.json

{
    "version": 2,
    "signals": [
        {"name": "started", "providing_args": []},
        {"name": "user_created", "providing_args": ["user_id"]},
        {"name": "user_comment", "providing_args": ["user_id"]},
        {"name": "rpc_comments_count", "providing_args": ["user_id"]},
        {"name": "comment_created", "providing_args": {
            "comment_id": ["number", "null"],
            "user_id": "number"
        }}
    ],
    "queues": [
        {"name": "mailer"},
        {"name": "android", "exchange": "push3", "topics": ["*.android"]},
        {"name": "ios", "exchange": "push3", "topics": ["*.ios"]},
        {"name": "android_a", "exchange": "push"},
        {"name": "ios_a", "exchange": "push"}
    ]
}

user_agent.py

# mypy: ignore-errors
import os

from microagent import MicroAgent, cron, load_stuff, on, periodic, receiver

cur_dir = os.path.dirname(os.path.realpath(__file__))
signals, queues = load_stuff('file://' + os.path.join(cur_dir, 'signals.json'))


class UserAgent(MicroAgent):
    @on('pre_start')
    def setup(self):
        self.log.info('Run ...\n %s', self.info())

    @cron('* * * * *', timeout=5)
    async def example_cron_send_message(self):
        self.log.info('Run cron task')
        await self.broker.mailer.send({'text': 'Report text', 'email': 'admin@lwr.pw'})

    @periodic(period=15, timeout=10, start_after=3)
    async def example_periodic_task_send_signal(self):
        self.log.info('Run periodic task')
        await self.bus.user_comment.send('user_agent', user_id=1, text='informer text')

    @receiver(signals.user_created)
    async def example_signal_receiver_send_message(self, signal, sender, **kwargs):
        self.log.info('Catch signal %s from %s with %s', signal, sender, kwargs)
        await self.broker.mailer.send({'text': 'Welcome text', 'email': 'user@lwr.pw'})

    @receiver(signals.user_comment)
    async def example_rpc_call(self, **kwargs):
        self.log.info('Catch signal %s', kwargs)
        value = await self.bus.rpc_comments_count.call('user_agent', user_id=1)
        self.log.info('Get value = %s', value)

comment_agent.py

# mypy: ignore-errors
import asyncio
import os
from collections import defaultdict

from microagent import MicroAgent, load_stuff, on, receiver

cur_dir = os.path.dirname(os.path.realpath(__file__))
signals, queues = load_stuff('file://' + os.path.join(cur_dir, 'signals.json'))


class CommentAgent(MicroAgent):
    @on('pre_start')
    async def setup(self):
        self.log.info('Run ...\n %s', self.info())
        self.comments_cache = defaultdict(lambda: 0)
        self.counter = 0

    @receiver(signals.rpc_comments_count)
    async def example_rpc_handler(self, user_id, **kwargs):
        self.log.info('Catch signal %s', kwargs)
        await asyncio.sleep(1)
        return self.comments_cache[user_id]

    @receiver(signals.user_comment)
    async def example_signal_receiver_send_message(self, user_id, **kwargs):
        self.log.info('Catch signal %s', kwargs)
        self.comments_cache[user_id] += 1
        await self.broker.mailer.send({'text': 'Comment', 'email': 'user@lwr.pw'})
        await self.broker.mailer.length()

email_agent.py

# mypy: ignore-errors
import os

from microagent import MicroAgent, consumer, load_stuff, on

cur_dir = os.path.dirname(os.path.realpath(__file__))
signals, queues = load_stuff('file://' + os.path.join(cur_dir, 'signals.json'))


class EmailAgent(MicroAgent):
    @on('pre_start')
    async def setup(self):
        self.log.info('Run ...\n %s', self.info())

    @consumer(queues.mailer)
    async def example_read_queue(self, **kwargs):
        self.log.info('Catch emailer %s', kwargs)

push_agent.py

# mypy: ignore-errors
import asyncio
import logging
import os
import sys

from microagent import MicroAgent, consumer, load_stuff, on, periodic
from microagent.tools import amqp, redis

logging.basicConfig(format=(
    '%(levelname)-8s [pid#%(process)d] %(asctime)s %(name)s '
    '%(filename)s:%(lineno)d %(message)s'
), stream=sys.stdout, level=logging.INFO)

cur_dir = os.path.dirname(os.path.realpath(__file__))
signals, queues = load_stuff('file://' + os.path.join(cur_dir, 'signals.json'))


class UserAgent(MicroAgent):
    @on('pre_start')
    def setup(self):
        self.log.info('Run ...\n %s', self.info())

    @periodic(period=15, timeout=10, start_after=3)
    async def example_periodic_task_send_push(self):
        self.log.info('Run periodic task')
        await self.broker.push3.send({'text': 'informer text'}, topic='msg.ios')
        await self.broker.push.send({'text': 'informer 1'})

    @consumer(queues.android)
    async def example_read_queue_android(self, **kwargs):
        self.log.info('Catch android %s', kwargs)

    @consumer(queues.ios)
    async def example_read_queue_ios(self, **kwargs):
        self.log.info('Catch ios %s', kwargs)

    @consumer(queues.android_a)
    async def example_read_queue_android_a(self, **kwargs):
        self.log.info('Catch android_a %s', kwargs)

    @consumer(queues.ios_a)
    async def example_read_queue_ios_a(self, **kwargs):
        self.log.info('Catch ios_a %s', kwargs)


async def main():
    bus = redis.RedisSignalBus('redis://localhost/7')
    broker = amqp.AMQPBroker('amqp://guest:guest@localhost:5672/')
    await broker.push3.send({'q':1}, topic='msg.android')

    agent = UserAgent(bus=bus, broker=broker)
    await agent.start()

    while True:
        await asyncio.sleep(1)


asyncio.run(main())