Rewrite notifier microservice #23

This commit is contained in:
Ashish D'Souza 2025-04-18 20:24:08 -05:00
parent 44b61402b1
commit e5c1560177
20 changed files with 296 additions and 17 deletions

View File

@ -75,9 +75,9 @@ services:
networks:
- wyze-bridge
- frigate
notify:
container_name: frigate-notify
image: frigate-notify:latest
notifier:
container_name: frigate-notifier
image: frigate-notifier:latest
restart: unless-stopped
depends_on:
- frigate

View File

@ -69,25 +69,25 @@
dest: /data/frigate-config/config.yaml
mode: preserve
- name: Create temporary Docker build directory for frigate-notify
- name: Create temporary Docker build directory for frigate-notifier
ansible.builtin.tempfile:
state: directory
register: docker_build_dir
- name: Copy Docker build directory
ansible.builtin.copy:
src: notify/
src: notifier/
dest: '{{docker_build_dir.path}}'
mode: preserve
- name: Build frigate-notify Docker image
- name: Build frigate-notifier Docker image
community.docker.docker_image:
build:
path: '{{docker_build_dir.path}}'
name: frigate-notify
name: frigate-notifier
tag: latest
source: build
force_source: true
state: present
- name: Remove temporary Docker build directory for frigate-notify
- name: Remove temporary Docker build directory for frigate-notifier
ansible.builtin.file:
path: '{{docker_build_dir.path}}'
state: absent

View File

@ -1,8 +1,7 @@
FROM python:3.11
FROM python:3.12
WORKDIR /code
COPY docker_entrypoint.sh /
ENTRYPOINT ["/docker_entrypoint.sh"]
ENTRYPOINT ["python", "-m", "notifier"]
COPY requirements.txt .
RUN pip3 install -r requirements.txt

View File

@ -1,2 +1,3 @@
requests==2.30.0
paho-mqtt==1.6.1
PyYAML==6.0.1

View File

@ -0,0 +1,23 @@
version: 1
disable_existing_loggers: false
formatters:
default:
format: '[{asctime}.{msecs:03.0f}][{levelname}] {name}:{lineno} - {message}'
style: '{'
datefmt: '%Y-%m-%d %H:%M:%S'
handlers:
stdout:
class: logging.StreamHandler
level: INFO
formatter: default
stream: ext://sys.stdout
stderr:
class: logging.StreamHandler
level: ERROR
formatter: default
stream: ext://sys.stderr
root:
level: DEBUG
handlers:
- stdout
- stderr

View File

View File

@ -0,0 +1,34 @@
import datetime as dt
import os
import logging
from typing import Any
from notifier.frigate.config import FrigateConfig, EventType
from notifier.frigate.mqtt import MQTTSubscriber
from notifier.notify.ntfy import NtfyNotification, NtfyNotificationSender
from notifier.utils.logging import configure_logging
configure_logging()
logger = logging.getLogger(__name__)
def on_event(payload: dict[str, Any]) -> None:
event_type = EventType(payload["type"])
camera = payload["after"]["camera"]
object_label = payload["after"]["label"]
new_zones = set(payload["after"]["entered_zones"]) - set(payload["before"]["entered_zones"])
for new_zone in new_zones:
if frigate_config.is_alert(event_type, camera, new_zone, object_label):
event_id = payload["after"]["id"]
score = payload["after"]["top_score"]
notification = NtfyNotification(event_id, camera, object_label, score)
notification_sender.send(notification)
if __name__ == "__main__":
frigate_config = FrigateConfig()
notification_sender = NtfyNotificationSender(quiet_period=dt.timedelta(seconds=3 * 60))
mqtt_subscriber = MQTTSubscriber(on_event, username=os.getenv("MQTT_USERNAME"), password=os.getenv("MQTT_PASSWORD"))
mqtt_subscriber.subscribe()

View File

@ -0,0 +1,53 @@
import json
import logging
from enum import StrEnum
from functools import cache
import requests
logger = logging.getLogger(__name__)
class EventType(StrEnum):
NEW = "new"
UPDATE = "update"
END = "end"
class FrigateConfig:
def __init__(self, base_url: str = "http://frigate:5000") -> None:
self._base_url = base_url
self._camera_zones = {}
def fetch(self) -> None:
response = requests.get(self._base_url)
response.raise_for_status()
config = json.loads(response.content)
self._camera_zones = {
camera_name: {
required_zone: {
object_label
for object_label in camera_config["zones"][required_zone]["objects"]
}
for required_zone in camera_config["review"]["alerts"]["required_zones"]
}
for camera_name, camera_config in config["cameras"].items()
}
logger.info("Fetched Frigate camera zone configuration")
@cache
def is_alert(self, event_type: EventType, camera: str, new_zone: str, object_label: str) -> bool:
if zones := self._camera_zones[camera]:
if (object_labels := zones.get(new_zone)) is None:
# New zone is not a required zone
return False
else:
if object_labels:
# Alert if event has required object label
return object_label in object_labels
else:
# No required object labels
return True
else:
# No required zones, new event is an alert
return event_type == EventType.NEW

View File

@ -0,0 +1,30 @@
import json
import logging
from typing import Any, Callable
from paho.mqtt.client import Client as MQTTClient, MQTTMessage
from paho.mqtt.reasoncodes import ReasonCodes
logger = logging.getLogger(__name__)
class MQTTSubscriber:
def __init__(self, callback: Callable[[dict[str, Any]], None], host: str = "mqtt", port: int = 1883, username: str | None = None, password: str | None = None) -> None:
self._callback = callback
self._host = host
self._port = port
self._client = MQTTClient()
if username and password:
self._client.username_pw_set(username, password=password)
def subscribe(self) -> None:
self._client.connect(host=self._host, port=self._port)
self._client.loop_forever()
def _on_connect(self, client: MQTTClient, userdata: Any, flags: dict[str, Any], rc: ReasonCodes) -> None:
logger.info("Connected with return code {rc}")
client.subscribe("frigate/events")
def _on_message(self, client: MQTTClient, userdata: Any, message: MQTTMessage) -> None:
payload = json.loads(message.payload.decode())
self._callback(payload)

View File

View File

View File

@ -0,0 +1,48 @@
import datetime as dt
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import StrEnum
from functools import cache, cached_property
class MobileOS(StrEnum):
IOS = "ios"
ANDROID = "android"
@dataclass
class Notification:
event_id: str
camera: str
object_label: str
score: float
@cached_property
def camera_display_name(self) -> str:
return " ".join(word.capitalize() for word in self.camera.split("_"))
@cached_property
def camera_url(self) -> str:
return f"https://frigate.homelab.net/cameras/{self.camera}"
@cache
def get_image_url(self, mobile_os: MobileOS = MobileOS.ANDROID) -> str:
return f"https://frigate.homelab.net/api/events/{self.event_id}/thumbnail.jpg?format={mobile_os}"
class NotificationSender[N: Notification](ABC):
def __init__(self, quiet_period: dt.timedelta) -> None:
self._quiet_period = quiet_period
self._camera_to_last_notification_time = {}
@abstractmethod
def _send(self, notification: N) -> None:
...
def send(self, notification: N) -> None:
now = time.time()
last_notification_time = self._camera_to_last_notification_time.setdefault(notification.camera, 0)
if now - last_notification_time >= self._quiet_period.total_seconds():
# Quiet period has passed since the last notification for this camera
self._send(notification)

View File

@ -0,0 +1,79 @@
import datetime as dt
import json
from dataclasses import dataclass
from enum import IntEnum
from typing import Any
import requests
from notifier.notify.notification import Notification, NotificationSender, MobileOS
class NtfyPriority(IntEnum):
MIN = 1
LOW = 2
DEFAULT = 3
HIGH = 4
MAX = 5
@dataclass
class NtfyNotification(Notification):
priority: int = NtfyPriority.DEFAULT
def generate_payload(self, mobile_os: MobileOS, dad: bool = False) -> dict[str, Any]:
match mobile_os, dad:
case _, True:
topic = "frigate_notifications_dad"
case MobileOS.ANDROID, False:
topic = "frigate_notifications"
case MobileOS.IOS, False:
topic = "frigate_notifications_ios"
return {
"topic": topic,
"title": "Frigate",
"message": f"{self.object_label.capitalize()} at {self.camera_display_name} ({self.score:.0%})",
"priority": NtfyPriority.DEFAULT,
"click": self.camera_url,
"icon": "https://cdn.jsdelivr.net/gh/walkxcode/dashboard-icons/png/frigate.png",
"attach": self.get_image_url(mobile_os),
"actions": [
{
"action": "http",
"label": "DBL (30m)" if dad else "Disable (30m)",
"url": f"https://frigate.homelab.net/webcontrol/camera/{self.camera}/detection",
"method": "POST",
"headers": {
"Content-Type": "application/json"
},
"body": json.dumps({
"detection": False,
"duration": 30
}),
"clear": not dad,
},
],
}
class NtfyNotificationSender(NotificationSender[NtfyNotification]):
def __init__(self, quiet_period: dt.timedelta, base_url: str = "https://ntfy.homelab.net") -> None:
super().__init__(quiet_period)
self._base_url = base_url
def _send(self, notification: NtfyNotification) -> None:
# Send to Android topic
payload = notification.generate_payload(MobileOS.ANDROID)
response = requests.post(self._base_url, json=payload)
response.raise_for_status()
# Send to iOS topic
payload = notification.generate_payload(MobileOS.IOS)
response = requests.post(self._base_url, json=payload)
response.raise_for_status()
# Send to Dad topic
payload = notification.generate_payload(MobileOS.ANDROID, dad=True)
response = requests.post(self._base_url, json=payload)
response.raise_for_status()

View File

View File

@ -0,0 +1,12 @@
import logging
import logging.config
import yaml
def configure_logging() -> None:
with open("configs/logging.yaml", "r") as logging_yaml:
logging_config = yaml.safe_load(logging_yaml)
logging.config.dictConfig(logging_config)

View File

@ -1,6 +0,0 @@
#!/bin/bash
set -e
update-ca-certificates
python3 frigate_event_notifier.py "$@"

View File

@ -56,3 +56,9 @@
ansible.builtin.file:
path: '{{docker_compose_dir.path}}'
state: absent
- name: Update CA certificates for frigate-notifier
community.docker.docker_container_exec:
container: frigate-notifier
command: update-ca-certificates
user: root