diff --git a/conf/docker-compose.yaml b/conf/docker-compose.yaml index 126fd40..3c4730d 100644 --- a/conf/docker-compose.yaml +++ b/conf/docker-compose.yaml @@ -41,6 +41,22 @@ services: target: /etc/localtime read_only: true network_mode: host + ntfy-router: + container_name: broadcast-ntfy-router + image: broadcast-ntfy-router:latest + restart: unless-stopped + depends_on: + rabbitmq: + condition: service_healthy + environment: + RABBITMQ_HOST: localhost + RABBITMQ_PORT: 10000 + volumes: + - type: bind + source: /etc/localtime + target: /etc/localtime + read_only: true + network_mode: host nginx: container_name: broadcast-nginx image: nginx:1.25.2 diff --git a/install.yaml b/install.yaml index fd78ea1..9799d8c 100644 --- a/install.yaml +++ b/install.yaml @@ -39,6 +39,30 @@ path: '{{docker_build_dir.path}}' state: absent + - name: Create temporary Docker build directory for broadcast-ntfy-router + ansible.builtin.tempfile: + state: directory + register: docker_build_dir + - name: Copy Docker build directory + ansible.builtin.copy: + src: ntfy_router/ + dest: '{{docker_build_dir.path}}' + mode: preserve + - name: Build broadcast-ntfy-router Docker image + ansible.builtin.docker_image: + build: + path: '{{docker_build_dir.path}}' + name: broadcast-ntfy-router + tag: latest + source: build + force_source: true + state: present + - name: Remove temporary Docker build directory + become: true + ansible.builtin.file: + path: '{{docker_build_dir.path}}' + state: absent + - name: Read homelab config ansible.builtin.slurp: src: '{{ansible_user_dir}}/.homelab.json' diff --git a/ntfy_router/Dockerfile b/ntfy_router/Dockerfile new file mode 100644 index 0000000..f4dcab5 --- /dev/null +++ b/ntfy_router/Dockerfile @@ -0,0 +1,12 @@ +FROM python:3.11 + +LABEL author="Ashish D'Souza" + +WORKDIR /code + +ENTRYPOINT ["python3.11", "router.py"] + +COPY requirements.txt . +RUN pip install -r requirements.txt + +COPY src . diff --git a/ntfy_router/requirements.txt b/ntfy_router/requirements.txt new file mode 100644 index 0000000..a183264 --- /dev/null +++ b/ntfy_router/requirements.txt @@ -0,0 +1,3 @@ +aiohttp==3.8.6 +pika==1.3.2 +PyYAML==6.0.1 diff --git a/ntfy_router/src/config.yaml b/ntfy_router/src/config.yaml new file mode 100644 index 0000000..a63e73e --- /dev/null +++ b/ntfy_router/src/config.yaml @@ -0,0 +1,25 @@ +notification: + jenkins: + system: + title: '.*' + message: + message: '.*' + uptime_kuma: + system: + title: '(?<=\[).+(?=\])' + message: + title: '^.+(?=\s\[)' + message: '[^\n]+$' + Dsatp58_alerts: + system: + title: '(?<=\s)[\w.]+' + message: + message: '[^\n]+(?=\n---------)' +call: + call_ashish: + caller: + title: '.*' + user: + topic: '(?<=_)\w+$' + message: + message: '.*' diff --git a/ntfy_router/src/ntfy_subscriber.py b/ntfy_router/src/ntfy_subscriber.py new file mode 100644 index 0000000..014b0c3 --- /dev/null +++ b/ntfy_router/src/ntfy_subscriber.py @@ -0,0 +1,21 @@ +import json +import asyncio +from typing import AsyncGenerator + +import aiohttp + + +class NtfySubscriber: + def __init__(self, topics: list[str]): + self._topics = topics + + def _generate_ntfy_url(self): + return 'https://ntfy.homelab.net/' + ','.join(self._topics) + '/json' + + async def subscribe(self) -> AsyncGenerator[tuple[str, str, str], None]: + async with aiohttp.ClientSession(raise_for_status=True, timeout=aiohttp.ClientTimeout()) as session: + async with session.get(self._generate_ntfy_url(), verify_ssl=False) as response: + async for line in response.content: + msg = json.loads(line.decode()) + if msg['event'] == 'message': + yield msg['topic'], msg['title'], msg['message'] diff --git a/ntfy_router/src/rabbitmq_publisher.py b/ntfy_router/src/rabbitmq_publisher.py new file mode 100644 index 0000000..caf4b1e --- /dev/null +++ b/ntfy_router/src/rabbitmq_publisher.py @@ -0,0 +1,50 @@ +import json +import asyncio + +import pika +from pika.adapters.asyncio_connection import AsyncioConnection + + +class RabbitMQPublisher: + def __init__(self, rabbitmq_host: str, rabbitmq_port: int): + self._connection = AsyncioConnection( + pika.ConnectionParameters(host=rabbitmq_host, port=rabbitmq_port), + on_open_callback=self._on_connection_open, + on_open_error_callback=self._on_connection_open_error, + on_close_callback=self._on_connection_closed, + ) + self._channel = None + + def _on_connection_open(self, connection: AsyncioConnection) -> None: + self._connection.channel(on_open_callback=self._on_channel_open) + + def _on_connection_open_error(self, connection: AsyncioConnection, error: Exception) -> None: + print(f'Failed to open connection') + self._connection.ioloop.stop() + raise error + + def _on_connection_closed(self, connection: AsyncioConnection, reason: Exception) -> None: + print(f'Connection closed') + raise reason + self._connection.ioloop.stop() + + def _on_channel_open(self, channel: pika.channel.Channel) -> None: + self._channel = channel + + def _publish(self, topic: str, data: str) -> None: + self._channel.basic_publish(exchange='broadcast', routing_key=topic, body=data) + + def send_notification(self, system: str, user: str, priority: str, message: str) -> None: + self._publish(f'notification.{system}.{user}.{priority}', message) + + def send_call(self, caller: str, user: str, message: str) -> None: + self._publish(f'call.{caller}.{user}', json.dumps({ + 'ringtone': { + 'filename': 'Magic.mp3', + 'trim': { + 'start_time': '00:01:34.64', + 'end_time': '00:01:41.53' + } + }, + 'message': message + })) diff --git a/ntfy_router/src/router.py b/ntfy_router/src/router.py new file mode 100644 index 0000000..133ea5c --- /dev/null +++ b/ntfy_router/src/router.py @@ -0,0 +1,66 @@ +import os +import re +import asyncio +import functools +from typing import Callable + +import yaml + +from ntfy_subscriber import NtfySubscriber +from rabbitmq_publisher import RabbitMQPublisher + + +class NtfyRouter: + def __init__(self, config_filename: str): + self._formatters = None + self._ntfy_subscriber = None + self._rabbitmq_publisher = None + + with open(config_filename) as config_file: + config = yaml.safe_load(config_file) + self._formatters = self._get_formatters(config) + + def _get_formatters(self, config: dict) -> dict: + def parse_arg(compiled_regex: re.Pattern, text: str) -> str: + return compiled_regex.search(text).group().lower() + + def format_field(parsers: dict[str, Callable], **args) -> str: + return '. '.join(parser(args[arg_name]) for arg_name, parser in parsers.items()) + + return { + broadcast_type: { + topic: { + field_name: functools.partial( + format_field, + { + arg: functools.partial( + parse_arg, + re.compile(regex_pattern) + ) for arg, regex_pattern in config[broadcast_type][topic][field_name].items() + } + ) for field_name in config[broadcast_type][topic] + } for topic in config[broadcast_type] + } for broadcast_type in config + } + + def route_message(self, topic: str, title: str, message: str): + if topic in self._formatters['notification']: + notification_args = {notification_arg: self._formatters['notification'][topic][notification_arg](topic=topic, title=title, message=message) for notification_arg in self._formatters['notification'][topic]} + self._rabbitmq_publisher.send_notification(user='ashish', priority='high', **notification_args) + elif topic in self._call_topics: + call_args = {call_arg: self._formatters['call'][topic][call_arg](topic=topic, title=title, message=message) for call_arg in self._formatters['call'][topic]} + self._rabbitmq_publisher.send_call(user='ashish', **call_args) + + async def start(self, rabbitmq_host: str, rabbitmq_port: int): + self._ntfy_subscriber = NtfySubscriber([topic for message_type in self._formatters for topic in self._formatters[message_type]]) + self._rabbitmq_publisher = RabbitMQPublisher(rabbitmq_host, rabbitmq_port) + + async for topic, title, message in self._ntfy_subscriber.subscribe(): + self.route_message(topic, title, message) + + +if __name__ == '__main__': + ntfy_router = NtfyRouter(os.path.join(os.path.abspath(os.path.dirname(__file__)), 'config.yaml')) + rabbitmq_host = os.environ.get('RABBITMQ_HOST', 'localhost') + rabbitmq_port = int(os.environ.get('RABBITMQ_PORT', '5672')) + asyncio.run(ntfy_router.start(rabbitmq_host, rabbitmq_port)) diff --git a/stop.yaml b/stop.yaml index c697474..5210a07 100644 --- a/stop.yaml +++ b/stop.yaml @@ -12,6 +12,8 @@ mode: preserve - name: Docker Compose down Broadcast + environment: + SERVER_IP: '{{ansible_default_ipv4.address}}' community.docker.docker_compose: project_name: broadcast project_src: '{{docker_compose_dir.path}}'