import json
from dataclasses import dataclass
from types import ModuleType
from typing import TYPE_CHECKING, ClassVar, TypedDict
from .abc import BoundKey, ConsumerFunc
if TYPE_CHECKING:
from .agent import MicroAgent
[docs]
class QueueException(Exception):
''' Base queue exception '''
pass
[docs]
class QueueNotFound(QueueException):
pass
[docs]
class SerializingError(QueueException):
pass
[docs]
@dataclass(frozen=True)
class Queue:
'''
Dataclass (declaration) for a queue entity with a unique name.
Each instance registered at creation.
Usually, you don't need to work directly with the Queue-class.
.. attribute:: name
String, queue name, project-wide unique, `[a-z_]+`
Declaration with config-file (queues.json)
.. code-block:: json
{
"queues": [
{"name": "mailer"},
{"name": "pusher"},
]
}
Manual declaration (not recommended)
.. code-block:: python
some_queue = Queue(
name='some_queue'
)
'''
name: str
_queues: ClassVar[dict[str, 'Queue']] = {}
_jsonlib: ClassVar[ModuleType] = json
def __post_init__(self) -> None:
self._queues[self.name] = self
def __repr__(self) -> str:
return f'<Queue {self.name}>'
def __eq__(self, other: object) -> bool:
if not isinstance(other, Queue):
return NotImplemented
return self.name == other.name
def __hash__(self) -> int:
return id(self)
@classmethod
def set_jsonlib(cls, jsonlib: ModuleType) -> None:
cls._jsonlib = jsonlib
[docs]
@classmethod
def get(cls, name: str) -> 'Queue':
''' Get the queue instance by name '''
try:
return cls._queues[name]
except KeyError as exc:
raise QueueNotFound(f'No such queue {name}') from exc
[docs]
@classmethod
def get_all(cls) -> dict[str, 'Queue']:
''' All registered queues '''
return cls._queues
[docs]
def serialize(self, data: dict) -> str:
'''
Data serializing method
:param data: dict of transfered data
'''
try:
return self._jsonlib.dumps(data)
except (ValueError, TypeError, OverflowError) as exc:
raise SerializingError(exc) from exc
[docs]
def deserialize(self, data: str | bytes) -> dict:
'''
Data deserializing method
:param data: serialized transfered data
'''
try:
return self._jsonlib.loads(data)
except (ValueError, TypeError, OverflowError) as exc:
raise SerializingError(exc) from exc
class ConsumerArgs(TypedDict):
queue: Queue
timeout: float
dto_class: type | None
dto_name: str | None
options: dict
[docs]
@dataclass(frozen=True)
class Consumer:
agent: 'MicroAgent'
handler: ConsumerFunc
queue: Queue
timeout: float
options: dict
dto_class: type | None = None
dto_name: str | None = None
_register: ClassVar[dict[BoundKey, ConsumerArgs]] = {}
def __repr__(self) -> str:
name = getattr(self.handler, '__name__', 'unknown')
return f'<Consumer {name} of {self.agent} for {self.queue}>'