diff --git a/ntfy_router/Dockerfile b/ntfy_router/Dockerfile new file mode 100644 index 0000000..f4dcab5 --- /dev/null +++ b/ntfy_router/Dockerfile @@ -0,0 +1,12 @@ +FROM python:3.11 + +LABEL author="Ashish D'Souza" + +WORKDIR /code + +ENTRYPOINT ["python3.11", "router.py"] + +COPY requirements.txt . +RUN pip install -r requirements.txt + +COPY src . diff --git a/ntfy_router/requirements.txt b/ntfy_router/requirements.txt new file mode 100644 index 0000000..a183264 --- /dev/null +++ b/ntfy_router/requirements.txt @@ -0,0 +1,3 @@ +aiohttp==3.8.6 +pika==1.3.2 +PyYAML==6.0.1 diff --git a/ntfy_router/src/config.yaml b/ntfy_router/src/config.yaml new file mode 100644 index 0000000..383afff --- /dev/null +++ b/ntfy_router/src/config.yaml @@ -0,0 +1,26 @@ +notification: + test2: + system: + title: '(?<=\[).+(?=\])' + message: + title: '^.+(?=\s\[)' + message: '[^\n]+$' + jenkins: + system: + title: '.*' + message: + message: '.*' + uptime_kuma: + system: + title: '(?<=\[).+(?=\])' + message: + title: '^.+(?=\s\[)' + message: '[^\n]+$' +call: + call_ashish: + caller: + title: '.*' + user: + topic: '(?<=_)\w+$' + message: + message: '.*' diff --git a/ntfy_router/src/ntfy_subscriber.py b/ntfy_router/src/ntfy_subscriber.py new file mode 100644 index 0000000..014b0c3 --- /dev/null +++ b/ntfy_router/src/ntfy_subscriber.py @@ -0,0 +1,21 @@ +import json +import asyncio +from typing import AsyncGenerator + +import aiohttp + + +class NtfySubscriber: + def __init__(self, topics: list[str]): + self._topics = topics + + def _generate_ntfy_url(self): + return 'https://ntfy.homelab.net/' + ','.join(self._topics) + '/json' + + async def subscribe(self) -> AsyncGenerator[tuple[str, str, str], None]: + async with aiohttp.ClientSession(raise_for_status=True, timeout=aiohttp.ClientTimeout()) as session: + async with session.get(self._generate_ntfy_url(), verify_ssl=False) as response: + async for line in response.content: + msg = json.loads(line.decode()) + if msg['event'] == 'message': + yield msg['topic'], msg['title'], msg['message'] diff --git a/ntfy_router/src/rabbitmq_publisher.py b/ntfy_router/src/rabbitmq_publisher.py new file mode 100644 index 0000000..3c0cc12 --- /dev/null +++ b/ntfy_router/src/rabbitmq_publisher.py @@ -0,0 +1,50 @@ +import json +import asyncio + +import pika +from pika.adapters.asyncio_connection import AsyncioConnection + + +class RabbitMQPublisher: + def __init__(self, rabbitmq_host: str, rabbitmq_port: int): + self._connection = AsyncioConnection( + pika.ConnectionParameters(host=rabbitmq_host, port=rabbitmq_port), + on_open_callback=self._on_connection_open, + on_open_error_callback=self._on_connection_open_error, + on_close_callback=self._on_connection_closed, + ) + self._channel = None + + def _on_connection_open(self, connection: AsyncioConnection) -> None: + self._connection.channel(on_open_callback=self._on_channel_open) + + def _on_connection_open_error(self, connection: AsyncioConnection, error: Exception) -> None: + print(f'Failed to open connection') + self._connection.ioloop.stop() + raise error + + def _on_connection_closed(self, connection: AsyncioConnection, reason: Exception) -> None: + print(f'Connection closed') + raise reason + self._connection.ioloop.stop() + + def _on_channel_open(self, channel: pika.channel.Channel) -> None: + self._channel = channel + + def _publish(self, topic: str, data: str) -> None: + self._channel.basic_publish(exchange='broadcast', routing_key=topic, body=data) + + def send_notification(self, system: str, user: str, priority: str, message: str) -> None: + self._publish(f'notification.{system}.{user}.{priority}', message) + + def send_call() -> None: + self._publish(f'call.{caller}.{user}', json.dumps({ + 'ringtone': { + 'filename': 'Magic.mp3', + 'trim': { + 'start_time': '00:01:34.64', + 'end_time': '00:01:41.53' + } + }, + 'message': message + })) diff --git a/ntfy_router/src/router.py b/ntfy_router/src/router.py new file mode 100644 index 0000000..6fb8c1a --- /dev/null +++ b/ntfy_router/src/router.py @@ -0,0 +1,66 @@ +import os +import re +import asyncio +import functools +from typing import Callable + +import yaml + +from ntfy_subscriber import NtfySubscriber +from rabbitmq_publisher import RabbitMQPublisher + + +class NtfyRouter: + def __init__(self, config_filename: str): + self._formatters = None + self._ntfy_subscriber = None + self._rabbitmq_publisher = None + + with open(config_filename) as config_file: + config = yaml.safe_load(config_file) + self._formatters = self._get_formatters(config) + + def _get_formatters(self, config: dict) -> dict: + def parse_arg(compiled_regex: re.Pattern, text: str) -> str: + return compiled_regex.search(text).group().lower() + + def format_field(parsers: dict[str, Callable], **args) -> str: + return '. '.join(parser(args[arg_name]) for arg_name, parser in parsers.items()) + + return { + broadcast_type: { + topic: { + field_name: functools.partial( + format_field, + { + arg: functools.partial( + parse_arg, + re.compile(regex_pattern) + ) for arg, regex_pattern in config[broadcast_type][topic][field_name].items() + } + ) for field_name in config[broadcast_type][topic] + } for topic in config[broadcast_type] + } for broadcast_type in config + } + + def route_message(self, topic: str, title: str, message: str): + if topic in self._formatters['notification']: + notification_args = {notification_arg: self._formatters['notification'][topic][output](topic=topic, title=title, message=message) for notification_arg in self._formatters['notification'][topic]} + self._send_notification(user='ashish', priority='high', **notification_args) + elif topic in self._call_topics: + call_args = {call_arg: self._formatters['call'][topic][output](topic=topic, title=title, message=message) for call_arg in self._formatters['call'][topic]} + self._send_call(user='ashish', **call_args) + + async def start(self, rabbitmq_host: str, rabbitmq_port: int): + self._ntfy_subscriber = NtfySubscriber([topic for message_type in self._formatters for topic in self._formatters[message_type]]) + self._rabbitmq_publisher = RabbitMQPublisher(rabbitmq_host, rabbitmq_port) + + async for topic, title, message in self._ntfy_subscriber.subscribe(): + self.route_message(topic, title, message) + + +if __name__ == '__main__': + ntfy_router = NtfyRouter(os.path.join(os.path.abspath(os.path.dirname(__file__)), 'config.yaml')) + rabbitmq_host = os.environ.get('RABBITMQ_HOST', 'localhost') + rabbitmq_port = int(os.environ.get('RABBITMQ_PORT', '5672')) + asyncio.run(ntfy_router.start(rabbitmq_host, rabbitmq_port))