broadcast/ntfy_router/src/router.py

67 lines
2.9 KiB
Python

import os
import re
import asyncio
import functools
from typing import Callable
import yaml
from ntfy_subscriber import NtfySubscriber
from rabbitmq_publisher import RabbitMQPublisher
class NtfyRouter:
    """Route messages received from ntfy topics to a RabbitMQ publisher.

    The routing config is a nested mapping of the form::

        <broadcast_type>:      # 'notification' and 'call' are the types routed
          <topic>:
            <field_name>:
              <arg>: <regex>   # arg names a route_message input:
                               # 'topic', 'title' or 'message'

    For every ``field_name`` a formatter is built that extracts the regex
    match from each named input, lower-cases it and joins the pieces with
    ``'. '``. The resulting fields are passed to the publisher as keyword
    arguments.
    """

    def __init__(self, config_filename: str):
        """Load the YAML config and build the per-topic field formatters.

        Args:
            config_filename: Path to the YAML routing config file.
        """
        self._formatters = None
        # The subscriber and publisher are only created in start().
        self._ntfy_subscriber = None
        self._rabbitmq_publisher = None
        with open(config_filename) as config_file:
            config = yaml.safe_load(config_file)
        self._formatters = self._get_formatters(config)

    def _get_formatters(self, config: dict) -> dict:
        """Compile the config into nested dicts of field formatter callables.

        Args:
            config: Mapping ``broadcast_type -> topic -> field_name ->
                {arg: regex_pattern}``.

        Returns:
            A mapping ``broadcast_type -> topic -> field_name -> callable``.
            Each callable accepts keyword arguments (``route_message`` passes
            ``topic``, ``title`` and ``message``) and returns the formatted
            field text.
        """
        def parse_arg(compiled_regex: re.Pattern, text: str) -> str:
            # NOTE(review): search() returns None when the pattern does not
            # match, so a non-matching message raises AttributeError here.
            # Confirm whether that should instead be skipped or reported.
            return compiled_regex.search(text).group().lower()

        def format_field(parsers: dict[str, Callable], **args) -> str:
            # Apply each parser to the input it is keyed by, in config order.
            return '. '.join(parser(args[arg_name]) for arg_name, parser in parsers.items())

        return {
            broadcast_type: {
                topic: {
                    field_name: functools.partial(
                        format_field,
                        {
                            # Regexes are compiled once here, not per message.
                            arg: functools.partial(parse_arg, re.compile(regex_pattern))
                            for arg, regex_pattern in arg_patterns.items()
                        },
                    )
                    for field_name, arg_patterns in fields.items()
                }
                for topic, fields in topics.items()
            }
            for broadcast_type, topics in config.items()
        }

    def route_message(self, topic: str, title: str, message: str):
        """Format a received message and hand it to the RabbitMQ publisher.

        A topic configured under ``'notification'`` is sent with
        ``send_notification``; otherwise a topic configured under ``'call'``
        is sent with ``send_call``. Messages on any other topic are ignored.

        Args:
            topic: The ntfy topic the message arrived on.
            title: The message title.
            message: The message body.
        """
        # Either section may be absent from the config; treat that as "no
        # topics of this type" rather than failing on every message.
        notification_formatters = self._formatters.get('notification', {})
        call_formatters = self._formatters.get('call', {})

        if topic in notification_formatters:
            notification_args = {
                field_name: formatter(topic=topic, title=title, message=message)
                for field_name, formatter in notification_formatters[topic].items()
            }
            self._rabbitmq_publisher.send_notification(user='ashish', priority='high', **notification_args)
        elif topic in call_formatters:
            # Call topics live in the 'call' section of the formatters; the
            # class has no separate `_call_topics` attribute.
            call_args = {
                field_name: formatter(topic=topic, title=title, message=message)
                for field_name, formatter in call_formatters[topic].items()
            }
            self._rabbitmq_publisher.send_call(user='ashish', **call_args)

    async def start(self, rabbitmq_host: str, rabbitmq_port: int):
        """Subscribe to every configured topic and route incoming messages.

        Args:
            rabbitmq_host: Host passed to ``RabbitMQPublisher``.
            rabbitmq_port: Port passed to ``RabbitMQPublisher``.
        """
        # Subscribe to the topics of every broadcast type in the config.
        topics = [
            topic
            for message_type in self._formatters
            for topic in self._formatters[message_type]
        ]
        self._ntfy_subscriber = NtfySubscriber(topics)
        self._rabbitmq_publisher = RabbitMQPublisher(rabbitmq_host, rabbitmq_port)
        async for topic, title, message in self._ntfy_subscriber.subscribe():
            self.route_message(topic, title, message)
if __name__ == '__main__':
    # The routing config is expected to sit next to this module.
    module_dir = os.path.abspath(os.path.dirname(__file__))
    config_path = os.path.join(module_dir, 'config.yaml')
    ntfy_router = NtfyRouter(config_path)

    # RabbitMQ connection settings come from the environment, falling back
    # to localhost:5672 when unset.
    rabbitmq_host = os.getenv('RABBITMQ_HOST', 'localhost')
    rabbitmq_port = int(os.getenv('RABBITMQ_PORT', '5672'))

    # Run the subscribe-and-route loop until the subscription ends.
    asyncio.run(ntfy_router.start(rabbitmq_host, rabbitmq_port))