Rewrite notifier microservice #23
This commit is contained in:
parent
44b61402b1
commit
a52d0c59cb
|
@ -75,9 +75,9 @@ services:
|
||||||
networks:
|
networks:
|
||||||
- wyze-bridge
|
- wyze-bridge
|
||||||
- frigate
|
- frigate
|
||||||
notify:
|
notifier:
|
||||||
container_name: frigate-notify
|
container_name: frigate-notifier
|
||||||
image: frigate-notify:latest
|
image: frigate-notifier:latest
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
depends_on:
|
depends_on:
|
||||||
- frigate
|
- frigate
|
||||||
|
|
10
install.yaml
10
install.yaml
|
@ -69,25 +69,25 @@
|
||||||
dest: /data/frigate-config/config.yaml
|
dest: /data/frigate-config/config.yaml
|
||||||
mode: preserve
|
mode: preserve
|
||||||
|
|
||||||
- name: Create temporary Docker build directory for frigate-notify
|
- name: Create temporary Docker build directory for frigate-notifier
|
||||||
ansible.builtin.tempfile:
|
ansible.builtin.tempfile:
|
||||||
state: directory
|
state: directory
|
||||||
register: docker_build_dir
|
register: docker_build_dir
|
||||||
- name: Copy Docker build directory
|
- name: Copy Docker build directory
|
||||||
ansible.builtin.copy:
|
ansible.builtin.copy:
|
||||||
src: notify/
|
src: notifier/
|
||||||
dest: '{{docker_build_dir.path}}'
|
dest: '{{docker_build_dir.path}}'
|
||||||
mode: preserve
|
mode: preserve
|
||||||
- name: Build frigate-notify Docker image
|
- name: Build frigate-notifier Docker image
|
||||||
community.docker.docker_image:
|
community.docker.docker_image:
|
||||||
build:
|
build:
|
||||||
path: '{{docker_build_dir.path}}'
|
path: '{{docker_build_dir.path}}'
|
||||||
name: frigate-notify
|
name: frigate-notifier
|
||||||
tag: latest
|
tag: latest
|
||||||
source: build
|
source: build
|
||||||
force_source: true
|
force_source: true
|
||||||
state: present
|
state: present
|
||||||
- name: Remove temporary Docker build directory for frigate-notify
|
- name: Remove temporary Docker build directory for frigate-notifier
|
||||||
ansible.builtin.file:
|
ansible.builtin.file:
|
||||||
path: '{{docker_build_dir.path}}'
|
path: '{{docker_build_dir.path}}'
|
||||||
state: absent
|
state: absent
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
FROM python:3.11
|
FROM python:3.12
|
||||||
WORKDIR /code
|
WORKDIR /code
|
||||||
|
|
||||||
COPY docker_entrypoint.sh /
|
COPY docker_entrypoint.sh /
|
||||||
ENTRYPOINT ["/docker_entrypoint.sh"]
|
ENTRYPOINT ["python", "-m", "notifier"]
|
||||||
|
|
||||||
COPY requirements.txt .
|
COPY requirements.txt .
|
||||||
RUN pip3 install -r requirements.txt
|
RUN pip3 install -r requirements.txt
|
|
@ -0,0 +1,23 @@
|
||||||
|
version: 1
|
||||||
|
disable_existing_loggers: false
|
||||||
|
formatters:
|
||||||
|
default:
|
||||||
|
format: '[{asctime}.{msecs:03.0f}][{levelname}] {name}:{lineno} - {message}'
|
||||||
|
style: '{'
|
||||||
|
datefmt: '%Y-%m-%d %H:%M:%S'
|
||||||
|
handlers:
|
||||||
|
stdout:
|
||||||
|
class: logging.StreamHandler
|
||||||
|
level: INFO
|
||||||
|
formatter: default
|
||||||
|
stream: ext://sys.stdout
|
||||||
|
stderr:
|
||||||
|
class: logging.StreamHandler
|
||||||
|
level: ERROR
|
||||||
|
formatter: default
|
||||||
|
stream: ext://sys.stderr
|
||||||
|
root:
|
||||||
|
level: DEBUG
|
||||||
|
handlers:
|
||||||
|
- stdout
|
||||||
|
- stderr
|
|
@ -0,0 +1,29 @@
|
||||||
|
import datetime as dt
|
||||||
|
import os
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from notifier.frigate.config import FrigateConfig, EventType
|
||||||
|
from notifier.frigate.mqtt import MQTTSubscriber
|
||||||
|
from notifier.notify.ntfy import NtfyNotification, NtfyNotificationSender
|
||||||
|
|
||||||
|
|
||||||
|
def on_event(payload: dict[str, Any]) -> None:
|
||||||
|
event_type = EventType(payload["type"])
|
||||||
|
camera = payload["after"]["camera"]
|
||||||
|
object_label = payload["after"]["label"]
|
||||||
|
new_zones = set(payload["after"]["entered_zones"]) - set(payload["before"]["entered_zones"])
|
||||||
|
for new_zone in new_zones:
|
||||||
|
if frigate_config.is_alert(event_type, camera, new_zone, object_label):
|
||||||
|
event_id = payload["after"]["id"]
|
||||||
|
score = payload["after"]["top_score"]
|
||||||
|
|
||||||
|
notification = NtfyNotification(event_id, camera, object_label, score)
|
||||||
|
notification_sender.send(notification)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
frigate_config = FrigateConfig()
|
||||||
|
notification_sender = NtfyNotificationSender(quiet_period=dt.timedelta(seconds=3 * 60))
|
||||||
|
mqtt_subscriber = MQTTSubscriber(on_event, username=os.getenv("MQTT_USERNAME"), password=os.getenv("MQTT_PASSWORD"))
|
||||||
|
mqtt_subscriber.subscribe()
|
||||||
|
|
|
@ -0,0 +1,53 @@
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
from enum import StrEnum
|
||||||
|
from functools import cache
|
||||||
|
|
||||||
|
import requests
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class EventType(StrEnum):
|
||||||
|
NEW = "new"
|
||||||
|
UPDATE = "update"
|
||||||
|
END = "end"
|
||||||
|
|
||||||
|
|
||||||
|
class FrigateConfig:
|
||||||
|
def __init__(self, base_url: str = "http://frigate:5000") -> None:
|
||||||
|
self._base_url = base_url
|
||||||
|
self._camera_zones = {}
|
||||||
|
|
||||||
|
def fetch(self) -> None:
|
||||||
|
response = requests.get(self._base_url)
|
||||||
|
response.raise_for_status()
|
||||||
|
config = json.loads(response.content)
|
||||||
|
self._camera_zones = {
|
||||||
|
camera_name: {
|
||||||
|
required_zone: {
|
||||||
|
object_label
|
||||||
|
for object_label in camera_config["zones"][required_zone]["objects"]
|
||||||
|
}
|
||||||
|
for required_zone in camera_config["review"]["alerts"]["required_zones"]
|
||||||
|
}
|
||||||
|
for camera_name, camera_config in config["cameras"].items()
|
||||||
|
}
|
||||||
|
logger.info("Fetched Frigate camera zone configuration")
|
||||||
|
|
||||||
|
@cache
|
||||||
|
def is_alert(self, event_type: EventType, camera: str, new_zone: str, object_label: str) -> bool:
|
||||||
|
if zones := self._camera_zones[camera]:
|
||||||
|
if (object_labels := zones.get(new_zone)) is None:
|
||||||
|
# New zone is not a required zone
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
if object_labels:
|
||||||
|
# Alert if event has required object label
|
||||||
|
return object_label in object_labels
|
||||||
|
else:
|
||||||
|
# No required object labels
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
# No required zones, new event is an alert
|
||||||
|
return event_type == EventType.NEW
|
|
@ -0,0 +1,30 @@
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
from typing import Any, Callable
|
||||||
|
|
||||||
|
from paho.mqtt.client import Client as MQTTClient, MQTTMessage
|
||||||
|
from paho.mqtt.reasoncodes import ReasonCodes
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class MQTTSubscriber:
|
||||||
|
def __init__(self, callback: Callable[[dict[str, Any]], None], host: str = "mqtt", port: int = 1883, username: str | None = None, password: str | None = None) -> None:
|
||||||
|
self._callback = callback
|
||||||
|
self._host = host
|
||||||
|
self._port = port
|
||||||
|
self._client = MQTTClient()
|
||||||
|
if username and password:
|
||||||
|
self._client.username_pw_set(username, password=password)
|
||||||
|
|
||||||
|
def subscribe(self) -> None:
|
||||||
|
self._client.connect(host=self._host, port=self._port)
|
||||||
|
self._client.loop_forever()
|
||||||
|
|
||||||
|
def _on_connect(self, client: MQTTClient, userdata: Any, flags: dict[str, Any], rc: ReasonCodes) -> None:
|
||||||
|
logger.info("Connected with return code {rc}")
|
||||||
|
client.subscribe("frigate/events")
|
||||||
|
|
||||||
|
def _on_message(self, client: MQTTClient, userdata: Any, message: MQTTMessage) -> None:
|
||||||
|
payload = json.loads(message.payload.decode())
|
||||||
|
self._callback(payload)
|
|
@ -0,0 +1,48 @@
|
||||||
|
import datetime as dt
|
||||||
|
import time
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from enum import StrEnum
|
||||||
|
from functools import cache, cached_property
|
||||||
|
|
||||||
|
|
||||||
|
class MobileOS(StrEnum):
|
||||||
|
IOS = "ios"
|
||||||
|
ANDROID = "android"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Notification:
|
||||||
|
event_id: str
|
||||||
|
camera: str
|
||||||
|
object_label: str
|
||||||
|
score: float
|
||||||
|
|
||||||
|
@cached_property
|
||||||
|
def camera_display_name(self) -> str:
|
||||||
|
return " ".join(word.capitalize() for word in self.camera.split("_"))
|
||||||
|
|
||||||
|
@cached_property
|
||||||
|
def camera_url(self) -> str:
|
||||||
|
return f"https://frigate.homelab.net/cameras/{self.camera}"
|
||||||
|
|
||||||
|
@cache
|
||||||
|
def get_image_url(self, mobile_os: MobileOS = MobileOS.ANDROID) -> str:
|
||||||
|
return f"https://frigate.homelab.net/api/events/{self.event_id}/thumbnail.jpg?format={mobile_os}"
|
||||||
|
|
||||||
|
|
||||||
|
class NotificationSender[N: Notification](ABC):
|
||||||
|
def __init__(self, quiet_period: dt.timedelta) -> None:
|
||||||
|
self._quiet_period = quiet_period
|
||||||
|
self._camera_to_last_notification_time = {}
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def _send(self, notification: N) -> None:
|
||||||
|
...
|
||||||
|
|
||||||
|
def send(self, notification: N) -> None:
|
||||||
|
now = time.time()
|
||||||
|
last_notification_time = self._camera_to_last_notification_time.setdefault(notification.camera, 0)
|
||||||
|
if now - last_notification_time >= self._quiet_period.total_seconds():
|
||||||
|
# Quiet period has passed since the last notification for this camera
|
||||||
|
self._send(notification)
|
|
@ -0,0 +1,79 @@
|
||||||
|
import datetime as dt
|
||||||
|
import json
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from enum import IntEnum
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import requests
|
||||||
|
|
||||||
|
from notifier.notify.notification import Notification, NotificationSender, MobileOS
|
||||||
|
|
||||||
|
|
||||||
|
class NtfyPriority(IntEnum):
|
||||||
|
MIN = 1
|
||||||
|
LOW = 2
|
||||||
|
DEFAULT = 3
|
||||||
|
HIGH = 4
|
||||||
|
MAX = 5
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class NtfyNotification(Notification):
|
||||||
|
priority: int = NtfyPriority.DEFAULT
|
||||||
|
|
||||||
|
def generate_payload(self, mobile_os: MobileOS, dad: bool = False) -> dict[str, Any]:
|
||||||
|
match mobile_os, dad:
|
||||||
|
case _, True:
|
||||||
|
topic = "frigate_notifications_dad"
|
||||||
|
case MobileOS.ANDROID, False:
|
||||||
|
topic = "frigate_notifications"
|
||||||
|
case MobileOS.IOS, False:
|
||||||
|
topic = "frigate_notifications_ios"
|
||||||
|
|
||||||
|
return {
|
||||||
|
"topic": topic,
|
||||||
|
"title": "Frigate",
|
||||||
|
"message": f"{self.object_label.capitalize()} at {self.camera_display_name} ({self.score:.0%})",
|
||||||
|
"priority": NtfyPriority.DEFAULT,
|
||||||
|
"click": self.camera_url,
|
||||||
|
"icon": "https://cdn.jsdelivr.net/gh/walkxcode/dashboard-icons/png/frigate.png",
|
||||||
|
"attach": self.get_image_url(mobile_os),
|
||||||
|
"actions": [
|
||||||
|
{
|
||||||
|
"action": "http",
|
||||||
|
"label": "DBL (30m)" if dad else "Disable (30m)",
|
||||||
|
"url": f"https://frigate.homelab.net/webcontrol/camera/{self.camera}/detection",
|
||||||
|
"method": "POST",
|
||||||
|
"headers": {
|
||||||
|
"Content-Type": "application/json"
|
||||||
|
},
|
||||||
|
"body": json.dumps({
|
||||||
|
"detection": False,
|
||||||
|
"duration": 30
|
||||||
|
}),
|
||||||
|
"clear": not dad,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class NtfyNotificationSender(NotificationSender[NtfyNotification]):
|
||||||
|
def __init__(self, quiet_period: dt.timedelta, base_url: str = "https://ntfy.homelab.net") -> None:
|
||||||
|
super().__init__(quiet_period)
|
||||||
|
self._base_url = base_url
|
||||||
|
|
||||||
|
def _send(self, notification: NtfyNotification) -> None:
|
||||||
|
# Send to Android topic
|
||||||
|
payload = notification.generate_payload(MobileOS.ANDROID)
|
||||||
|
response = requests.post(self._base_url, json=payload)
|
||||||
|
response.raise_for_status()
|
||||||
|
|
||||||
|
# Send to iOS topic
|
||||||
|
payload = notification.generate_payload(MobileOS.IOS)
|
||||||
|
response = requests.post(self._base_url, json=payload)
|
||||||
|
response.raise_for_status()
|
||||||
|
|
||||||
|
# Send to Dad topic
|
||||||
|
payload = notification.generate_payload(MobileOS.ANDROID, dad=True)
|
||||||
|
response = requests.post(self._base_url, json=payload)
|
||||||
|
response.raise_for_status()
|
|
@ -0,0 +1,12 @@
|
||||||
|
import logging
|
||||||
|
import logging.config
|
||||||
|
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
|
||||||
|
def configure_logging() -> None:
|
||||||
|
with open("configs/logging.yaml", "r") as logging_yaml:
|
||||||
|
logging_config = yaml.safe_load(logging_yaml)
|
||||||
|
|
||||||
|
logging.config.dictConfig(logging_config)
|
||||||
|
|
Loading…
Reference in New Issue