Added ntfy router #2

Squashed commit of the following:

commit 3ba0c9b4c6
Author: Ashish D'Souza <sudouser512@gmail.com>
Date:   Sun Nov 19 05:21:27 2023 -0600

    Another bugfix

commit 8a76bddbf1
Author: Ashish D'Souza <sudouser512@gmail.com>
Date:   Sun Nov 19 05:12:40 2023 -0600

    Added ntfy router to ansible scripts

commit 7b51846e1b
Author: Ashish D'Souza <sudouser512@gmail.com>
Date:   Sun Nov 19 05:12:14 2023 -0600

    Fixed inconsistent variable names bug

commit a96a14e261
Author: Ashish D'Souza <sudouser512@gmail.com>
Date:   Sun Nov 19 04:53:09 2023 -0600

    Added ntfy router to docker compose

commit d2f30faa5e
Author: Ashish D'Souza <sudouser512@gmail.com>
Date:   Sun Nov 19 04:52:07 2023 -0600

    Added ntfy routing
This commit is contained in:
Ashish D'Souza 2023-11-19 06:17:01 -06:00
parent 1640a7a590
commit a14ae8581a
9 changed files with 219 additions and 0 deletions

View File

@ -41,6 +41,22 @@ services:
target: /etc/localtime
read_only: true
network_mode: host
ntfy-router:
container_name: broadcast-ntfy-router
image: broadcast-ntfy-router:latest
restart: unless-stopped
depends_on:
rabbitmq:
condition: service_healthy
environment:
RABBITMQ_HOST: localhost
RABBITMQ_PORT: 10000
volumes:
- type: bind
source: /etc/localtime
target: /etc/localtime
read_only: true
network_mode: host
nginx:
container_name: broadcast-nginx
image: nginx:1.25.2

View File

@ -39,6 +39,30 @@
path: '{{docker_build_dir.path}}'
state: absent
- name: Create temporary Docker build directory for broadcast-ntfy-router
ansible.builtin.tempfile:
state: directory
register: docker_build_dir
- name: Copy Docker build directory
ansible.builtin.copy:
src: ntfy_router/
dest: '{{docker_build_dir.path}}'
mode: preserve
- name: Build broadcast-ntfy-router Docker image
ansible.builtin.docker_image:
build:
path: '{{docker_build_dir.path}}'
name: broadcast-ntfy-router
tag: latest
source: build
force_source: true
state: present
- name: Remove temporary Docker build directory
become: true
ansible.builtin.file:
path: '{{docker_build_dir.path}}'
state: absent
- name: Read homelab config
ansible.builtin.slurp:
src: '{{ansible_user_dir}}/.homelab.json'

12
ntfy_router/Dockerfile Normal file
View File

@ -0,0 +1,12 @@
FROM python:3.11
LABEL author="Ashish D'Souza"
WORKDIR /code
ENTRYPOINT ["python3.11", "router.py"]
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY src .

View File

@ -0,0 +1,3 @@
aiohttp==3.8.6
pika==1.3.2
PyYAML==6.0.1

View File

@ -0,0 +1,25 @@
notification:
jenkins:
system:
title: '.*'
message:
message: '.*'
uptime_kuma:
system:
title: '(?<=\[).+(?=\])'
message:
title: '^.+(?=\s\[)'
message: '[^\n]+$'
Dsatp58_alerts:
system:
title: '(?<=\s)[\w.]+'
message:
message: '[^\n]+(?=\n---------)'
call:
call_ashish:
caller:
title: '.*'
user:
topic: '(?<=_)\w+$'
message:
message: '.*'

View File

@ -0,0 +1,21 @@
import json
import asyncio
from typing import AsyncGenerator
import aiohttp
class NtfySubscriber:
def __init__(self, topics: list[str]):
self._topics = topics
def _generate_ntfy_url(self):
return 'https://ntfy.homelab.net/' + ','.join(self._topics) + '/json'
async def subscribe(self) -> AsyncGenerator[tuple[str, str, str], None]:
async with aiohttp.ClientSession(raise_for_status=True, timeout=aiohttp.ClientTimeout()) as session:
async with session.get(self._generate_ntfy_url(), verify_ssl=False) as response:
async for line in response.content:
msg = json.loads(line.decode())
if msg['event'] == 'message':
yield msg['topic'], msg['title'], msg['message']

View File

@ -0,0 +1,50 @@
import json
import asyncio
import pika
from pika.adapters.asyncio_connection import AsyncioConnection
class RabbitMQPublisher:
def __init__(self, rabbitmq_host: str, rabbitmq_port: int):
self._connection = AsyncioConnection(
pika.ConnectionParameters(host=rabbitmq_host, port=rabbitmq_port),
on_open_callback=self._on_connection_open,
on_open_error_callback=self._on_connection_open_error,
on_close_callback=self._on_connection_closed,
)
self._channel = None
def _on_connection_open(self, connection: AsyncioConnection) -> None:
self._connection.channel(on_open_callback=self._on_channel_open)
def _on_connection_open_error(self, connection: AsyncioConnection, error: Exception) -> None:
print(f'Failed to open connection')
self._connection.ioloop.stop()
raise error
def _on_connection_closed(self, connection: AsyncioConnection, reason: Exception) -> None:
print(f'Connection closed')
raise reason
self._connection.ioloop.stop()
def _on_channel_open(self, channel: pika.channel.Channel) -> None:
self._channel = channel
def _publish(self, topic: str, data: str) -> None:
self._channel.basic_publish(exchange='broadcast', routing_key=topic, body=data)
def send_notification(self, system: str, user: str, priority: str, message: str) -> None:
self._publish(f'notification.{system}.{user}.{priority}', message)
def send_call(self, caller: str, user: str, message: str) -> None:
self._publish(f'call.{caller}.{user}', json.dumps({
'ringtone': {
'filename': 'Magic.mp3',
'trim': {
'start_time': '00:01:34.64',
'end_time': '00:01:41.53'
}
},
'message': message
}))

66
ntfy_router/src/router.py Normal file
View File

@ -0,0 +1,66 @@
import os
import re
import asyncio
import functools
from typing import Callable
import yaml
from ntfy_subscriber import NtfySubscriber
from rabbitmq_publisher import RabbitMQPublisher
class NtfyRouter:
def __init__(self, config_filename: str):
self._formatters = None
self._ntfy_subscriber = None
self._rabbitmq_publisher = None
with open(config_filename) as config_file:
config = yaml.safe_load(config_file)
self._formatters = self._get_formatters(config)
def _get_formatters(self, config: dict) -> dict:
def parse_arg(compiled_regex: re.Pattern, text: str) -> str:
return compiled_regex.search(text).group().lower()
def format_field(parsers: dict[str, Callable], **args) -> str:
return '. '.join(parser(args[arg_name]) for arg_name, parser in parsers.items())
return {
broadcast_type: {
topic: {
field_name: functools.partial(
format_field,
{
arg: functools.partial(
parse_arg,
re.compile(regex_pattern)
) for arg, regex_pattern in config[broadcast_type][topic][field_name].items()
}
) for field_name in config[broadcast_type][topic]
} for topic in config[broadcast_type]
} for broadcast_type in config
}
def route_message(self, topic: str, title: str, message: str):
if topic in self._formatters['notification']:
notification_args = {notification_arg: self._formatters['notification'][topic][notification_arg](topic=topic, title=title, message=message) for notification_arg in self._formatters['notification'][topic]}
self._rabbitmq_publisher.send_notification(user='ashish', priority='high', **notification_args)
elif topic in self._call_topics:
call_args = {call_arg: self._formatters['call'][topic][call_arg](topic=topic, title=title, message=message) for call_arg in self._formatters['call'][topic]}
self._rabbitmq_publisher.send_call(user='ashish', **call_args)
async def start(self, rabbitmq_host: str, rabbitmq_port: int):
self._ntfy_subscriber = NtfySubscriber([topic for message_type in self._formatters for topic in self._formatters[message_type]])
self._rabbitmq_publisher = RabbitMQPublisher(rabbitmq_host, rabbitmq_port)
async for topic, title, message in self._ntfy_subscriber.subscribe():
self.route_message(topic, title, message)
if __name__ == '__main__':
ntfy_router = NtfyRouter(os.path.join(os.path.abspath(os.path.dirname(__file__)), 'config.yaml'))
rabbitmq_host = os.environ.get('RABBITMQ_HOST', 'localhost')
rabbitmq_port = int(os.environ.get('RABBITMQ_PORT', '5672'))
asyncio.run(ntfy_router.start(rabbitmq_host, rabbitmq_port))

View File

@ -12,6 +12,8 @@
mode: preserve
- name: Docker Compose down Broadcast
environment:
SERVER_IP: '{{ansible_default_ipv4.address}}'
community.docker.docker_compose:
project_name: broadcast
project_src: '{{docker_compose_dir.path}}'