Rewrite notifier microservice #23
This commit is contained in:
parent
44b61402b1
commit
749f6fcf4f
|
@ -75,9 +75,9 @@ services:
|
|||
networks:
|
||||
- wyze-bridge
|
||||
- frigate
|
||||
notify:
|
||||
container_name: frigate-notify
|
||||
image: frigate-notify:latest
|
||||
notifier:
|
||||
container_name: frigate-notifier
|
||||
image: frigate-notifier:latest
|
||||
restart: unless-stopped
|
||||
depends_on:
|
||||
- frigate
|
||||
|
|
10
install.yaml
10
install.yaml
|
@ -69,25 +69,25 @@
|
|||
dest: /data/frigate-config/config.yaml
|
||||
mode: preserve
|
||||
|
||||
- name: Create temporary Docker build directory for frigate-notify
|
||||
- name: Create temporary Docker build directory for frigate-notifier
|
||||
ansible.builtin.tempfile:
|
||||
state: directory
|
||||
register: docker_build_dir
|
||||
- name: Copy Docker build directory
|
||||
ansible.builtin.copy:
|
||||
src: notify/
|
||||
src: notifier/
|
||||
dest: '{{docker_build_dir.path}}'
|
||||
mode: preserve
|
||||
- name: Build frigate-notify Docker image
|
||||
- name: Build frigate-notifier Docker image
|
||||
community.docker.docker_image:
|
||||
build:
|
||||
path: '{{docker_build_dir.path}}'
|
||||
name: frigate-notify
|
||||
name: frigate-notifier
|
||||
tag: latest
|
||||
source: build
|
||||
force_source: true
|
||||
state: present
|
||||
- name: Remove temporary Docker build directory for frigate-notify
|
||||
- name: Remove temporary Docker build directory for frigate-notifier
|
||||
ansible.builtin.file:
|
||||
path: '{{docker_build_dir.path}}'
|
||||
state: absent
|
||||
|
|
|
@ -1,8 +1,7 @@
|
|||
FROM python:3.11
|
||||
FROM python:3.12
|
||||
WORKDIR /code
|
||||
|
||||
COPY docker_entrypoint.sh /
|
||||
ENTRYPOINT ["/docker_entrypoint.sh"]
|
||||
ENTRYPOINT ["python", "-m", "notifier"]
|
||||
|
||||
COPY requirements.txt .
|
||||
RUN pip3 install -r requirements.txt
|
|
@ -1,2 +1,3 @@
|
|||
requests==2.30.0
|
||||
paho-mqtt==1.6.1
|
||||
PyYAML==6.0.1
|
|
@ -0,0 +1,23 @@
|
|||
version: 1
|
||||
disable_existing_loggers: false
|
||||
formatters:
|
||||
default:
|
||||
format: '[{asctime}.{msecs:03.0f}][{levelname}] {name}:{lineno} - {message}'
|
||||
style: '{'
|
||||
datefmt: '%Y-%m-%d %H:%M:%S'
|
||||
handlers:
|
||||
stdout:
|
||||
class: logging.StreamHandler
|
||||
level: INFO
|
||||
formatter: default
|
||||
stream: ext://sys.stdout
|
||||
stderr:
|
||||
class: logging.StreamHandler
|
||||
level: ERROR
|
||||
formatter: default
|
||||
stream: ext://sys.stderr
|
||||
root:
|
||||
level: DEBUG
|
||||
handlers:
|
||||
- stdout
|
||||
- stderr
|
|
@ -0,0 +1,34 @@
|
|||
import datetime as dt
|
||||
import os
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from notifier.frigate.config import FrigateConfig, EventType
|
||||
from notifier.frigate.mqtt import MQTTSubscriber
|
||||
from notifier.notify.ntfy import NtfyNotification, NtfyNotificationSender
|
||||
from notifier.utils.logging import configure_logging
|
||||
|
||||
configure_logging()
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def on_event(payload: dict[str, Any]) -> None:
|
||||
event_type = EventType(payload["type"])
|
||||
camera = payload["after"]["camera"]
|
||||
object_label = payload["after"]["label"]
|
||||
new_zones = set(payload["after"]["entered_zones"]) - set(payload["before"]["entered_zones"])
|
||||
for new_zone in new_zones:
|
||||
if frigate_config.is_alert(event_type, camera, new_zone, object_label):
|
||||
event_id = payload["after"]["id"]
|
||||
score = payload["after"]["top_score"]
|
||||
|
||||
notification = NtfyNotification(event_id, camera, object_label, score)
|
||||
notification_sender.send(notification)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
frigate_config = FrigateConfig()
|
||||
notification_sender = NtfyNotificationSender(quiet_period=dt.timedelta(seconds=3 * 60))
|
||||
mqtt_subscriber = MQTTSubscriber(on_event, username=os.getenv("MQTT_USERNAME"), password=os.getenv("MQTT_PASSWORD"))
|
||||
mqtt_subscriber.subscribe()
|
||||
|
|
@ -0,0 +1,53 @@
|
|||
import json
|
||||
import logging
|
||||
from enum import StrEnum
|
||||
from functools import cache
|
||||
|
||||
import requests
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EventType(StrEnum):
|
||||
NEW = "new"
|
||||
UPDATE = "update"
|
||||
END = "end"
|
||||
|
||||
|
||||
class FrigateConfig:
|
||||
def __init__(self, base_url: str = "http://frigate:5000") -> None:
|
||||
self._base_url = base_url
|
||||
self._camera_zones = {}
|
||||
|
||||
def fetch(self) -> None:
|
||||
response = requests.get(self._base_url)
|
||||
response.raise_for_status()
|
||||
config = json.loads(response.content)
|
||||
self._camera_zones = {
|
||||
camera_name: {
|
||||
required_zone: {
|
||||
object_label
|
||||
for object_label in camera_config["zones"][required_zone]["objects"]
|
||||
}
|
||||
for required_zone in camera_config["review"]["alerts"]["required_zones"]
|
||||
}
|
||||
for camera_name, camera_config in config["cameras"].items()
|
||||
}
|
||||
logger.info("Fetched Frigate camera zone configuration")
|
||||
|
||||
@cache
|
||||
def is_alert(self, event_type: EventType, camera: str, new_zone: str, object_label: str) -> bool:
|
||||
if zones := self._camera_zones[camera]:
|
||||
if (object_labels := zones.get(new_zone)) is None:
|
||||
# New zone is not a required zone
|
||||
return False
|
||||
else:
|
||||
if object_labels:
|
||||
# Alert if event has required object label
|
||||
return object_label in object_labels
|
||||
else:
|
||||
# No required object labels
|
||||
return True
|
||||
else:
|
||||
# No required zones, new event is an alert
|
||||
return event_type == EventType.NEW
|
|
@ -0,0 +1,30 @@
|
|||
import json
|
||||
import logging
|
||||
from typing import Any, Callable
|
||||
|
||||
from paho.mqtt.client import Client as MQTTClient, MQTTMessage
|
||||
from paho.mqtt.reasoncodes import ReasonCodes
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MQTTSubscriber:
|
||||
def __init__(self, callback: Callable[[dict[str, Any]], None], host: str = "mqtt", port: int = 1883, username: str | None = None, password: str | None = None) -> None:
|
||||
self._callback = callback
|
||||
self._host = host
|
||||
self._port = port
|
||||
self._client = MQTTClient()
|
||||
if username and password:
|
||||
self._client.username_pw_set(username, password=password)
|
||||
|
||||
def subscribe(self) -> None:
|
||||
self._client.connect(host=self._host, port=self._port)
|
||||
self._client.loop_forever()
|
||||
|
||||
def _on_connect(self, client: MQTTClient, userdata: Any, flags: dict[str, Any], rc: ReasonCodes) -> None:
|
||||
logger.info("Connected with return code {rc}")
|
||||
client.subscribe("frigate/events")
|
||||
|
||||
def _on_message(self, client: MQTTClient, userdata: Any, message: MQTTMessage) -> None:
|
||||
payload = json.loads(message.payload.decode())
|
||||
self._callback(payload)
|
|
@ -0,0 +1,48 @@
|
|||
import datetime as dt
|
||||
import time
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass
|
||||
from enum import StrEnum
|
||||
from functools import cache, cached_property
|
||||
|
||||
|
||||
class MobileOS(StrEnum):
|
||||
IOS = "ios"
|
||||
ANDROID = "android"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Notification:
|
||||
event_id: str
|
||||
camera: str
|
||||
object_label: str
|
||||
score: float
|
||||
|
||||
@cached_property
|
||||
def camera_display_name(self) -> str:
|
||||
return " ".join(word.capitalize() for word in self.camera.split("_"))
|
||||
|
||||
@cached_property
|
||||
def camera_url(self) -> str:
|
||||
return f"https://frigate.homelab.net/cameras/{self.camera}"
|
||||
|
||||
@cache
|
||||
def get_image_url(self, mobile_os: MobileOS = MobileOS.ANDROID) -> str:
|
||||
return f"https://frigate.homelab.net/api/events/{self.event_id}/thumbnail.jpg?format={mobile_os}"
|
||||
|
||||
|
||||
class NotificationSender[N: Notification](ABC):
|
||||
def __init__(self, quiet_period: dt.timedelta) -> None:
|
||||
self._quiet_period = quiet_period
|
||||
self._camera_to_last_notification_time = {}
|
||||
|
||||
@abstractmethod
|
||||
def _send(self, notification: N) -> None:
|
||||
...
|
||||
|
||||
def send(self, notification: N) -> None:
|
||||
now = time.time()
|
||||
last_notification_time = self._camera_to_last_notification_time.setdefault(notification.camera, 0)
|
||||
if now - last_notification_time >= self._quiet_period.total_seconds():
|
||||
# Quiet period has passed since the last notification for this camera
|
||||
self._send(notification)
|
|
@ -0,0 +1,79 @@
|
|||
import datetime as dt
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from enum import IntEnum
|
||||
from typing import Any
|
||||
|
||||
import requests
|
||||
|
||||
from notifier.notify.notification import Notification, NotificationSender, MobileOS
|
||||
|
||||
|
||||
class NtfyPriority(IntEnum):
|
||||
MIN = 1
|
||||
LOW = 2
|
||||
DEFAULT = 3
|
||||
HIGH = 4
|
||||
MAX = 5
|
||||
|
||||
|
||||
@dataclass
|
||||
class NtfyNotification(Notification):
|
||||
priority: int = NtfyPriority.DEFAULT
|
||||
|
||||
def generate_payload(self, mobile_os: MobileOS, dad: bool = False) -> dict[str, Any]:
|
||||
match mobile_os, dad:
|
||||
case _, True:
|
||||
topic = "frigate_notifications_dad"
|
||||
case MobileOS.ANDROID, False:
|
||||
topic = "frigate_notifications"
|
||||
case MobileOS.IOS, False:
|
||||
topic = "frigate_notifications_ios"
|
||||
|
||||
return {
|
||||
"topic": topic,
|
||||
"title": "Frigate",
|
||||
"message": f"{self.object_label.capitalize()} at {self.camera_display_name} ({self.score:.0%})",
|
||||
"priority": NtfyPriority.DEFAULT,
|
||||
"click": self.camera_url,
|
||||
"icon": "https://cdn.jsdelivr.net/gh/walkxcode/dashboard-icons/png/frigate.png",
|
||||
"attach": self.get_image_url(mobile_os),
|
||||
"actions": [
|
||||
{
|
||||
"action": "http",
|
||||
"label": "DBL (30m)" if dad else "Disable (30m)",
|
||||
"url": f"https://frigate.homelab.net/webcontrol/camera/{self.camera}/detection",
|
||||
"method": "POST",
|
||||
"headers": {
|
||||
"Content-Type": "application/json"
|
||||
},
|
||||
"body": json.dumps({
|
||||
"detection": False,
|
||||
"duration": 30
|
||||
}),
|
||||
"clear": not dad,
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
class NtfyNotificationSender(NotificationSender[NtfyNotification]):
|
||||
def __init__(self, quiet_period: dt.timedelta, base_url: str = "https://ntfy.homelab.net") -> None:
|
||||
super().__init__(quiet_period)
|
||||
self._base_url = base_url
|
||||
|
||||
def _send(self, notification: NtfyNotification) -> None:
|
||||
# Send to Android topic
|
||||
payload = notification.generate_payload(MobileOS.ANDROID)
|
||||
response = requests.post(self._base_url, json=payload)
|
||||
response.raise_for_status()
|
||||
|
||||
# Send to iOS topic
|
||||
payload = notification.generate_payload(MobileOS.IOS)
|
||||
response = requests.post(self._base_url, json=payload)
|
||||
response.raise_for_status()
|
||||
|
||||
# Send to Dad topic
|
||||
payload = notification.generate_payload(MobileOS.ANDROID, dad=True)
|
||||
response = requests.post(self._base_url, json=payload)
|
||||
response.raise_for_status()
|
|
@ -0,0 +1,12 @@
|
|||
import logging
|
||||
import logging.config
|
||||
|
||||
import yaml
|
||||
|
||||
|
||||
def configure_logging() -> None:
|
||||
with open("configs/logging.yaml", "r") as logging_yaml:
|
||||
logging_config = yaml.safe_load(logging_yaml)
|
||||
|
||||
logging.config.dictConfig(logging_config)
|
||||
|
|
@ -1,6 +0,0 @@
|
|||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
update-ca-certificates
|
||||
|
||||
python3 frigate_event_notifier.py "$@"
|
Loading…
Reference in New Issue