Fix and cleanup notifier
This commit is contained in:
parent
e6ccd8f1d7
commit
468a0b02f3
|
@ -16,18 +16,19 @@ def on_event(payload: dict[str, Any]) -> None:
|
|||
event_type = EventType(payload["type"])
|
||||
camera = payload["after"]["camera"]
|
||||
object_label = payload["after"]["label"]
|
||||
new_zones = set(payload["after"]["entered_zones"]) - set(payload["before"]["entered_zones"])
|
||||
new_zones = {None} | set(payload["after"]["entered_zones"]) - set(payload["before"]["entered_zones"])
|
||||
for new_zone in new_zones:
|
||||
if frigate_config.is_alert(event_type, camera, new_zone, object_label):
|
||||
if frigate_config.is_alert(event_type, camera, object_label, new_zone):
|
||||
event_id = payload["after"]["id"]
|
||||
score = payload["after"]["top_score"]
|
||||
|
||||
logger.debug(f"New alert for {object_label=} at {camera=} in {new_zone=}")
|
||||
notification = NtfyNotification(event_id, camera, object_label, score)
|
||||
notification_sender.send(notification)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
frigate_config = FrigateConfig()
|
||||
frigate_config.fetch()
|
||||
notification_sender = NtfyNotificationSender(quiet_period=dt.timedelta(seconds=3 * 60))
|
||||
mqtt_subscriber = MQTTSubscriber(on_event, username=os.getenv("MQTT_USERNAME"), password=os.getenv("MQTT_PASSWORD"))
|
||||
mqtt_subscriber.subscribe()
|
||||
|
|
|
@ -20,7 +20,7 @@ class FrigateConfig:
|
|||
self._camera_zones = {}
|
||||
|
||||
def fetch(self) -> None:
|
||||
response = requests.get(self._base_url)
|
||||
response = requests.get(f"{self._base_url}/api/config")
|
||||
response.raise_for_status()
|
||||
config = json.loads(response.content)
|
||||
self._camera_zones = {
|
||||
|
@ -36,13 +36,15 @@ class FrigateConfig:
|
|||
logger.info("Fetched Frigate camera zone configuration")
|
||||
|
||||
@cache
|
||||
def is_alert(self, event_type: EventType, camera: str, new_zone: str, object_label: str) -> bool:
|
||||
def is_alert(self, event_type: EventType, camera: str, object_label: str, new_zone: str | None = None) -> bool:
|
||||
if zones := self._camera_zones[camera]:
|
||||
if (object_labels := zones.get(new_zone)) is None:
|
||||
if not new_zone:
|
||||
# No zone change
|
||||
return False
|
||||
elif (object_labels := zones.get(new_zone)) is None:
|
||||
# New zone is not a required zone
|
||||
return False
|
||||
else:
|
||||
if object_labels:
|
||||
elif object_labels:
|
||||
# Alert if event has required object label
|
||||
return object_label in object_labels
|
||||
else:
|
||||
|
|
|
@ -16,13 +16,15 @@ class MQTTSubscriber:
|
|||
self._client = MQTTClient()
|
||||
if username and password:
|
||||
self._client.username_pw_set(username, password=password)
|
||||
self._client.on_connect = self._on_connect
|
||||
self._client.on_message = self._on_message
|
||||
|
||||
def subscribe(self) -> None:
|
||||
self._client.connect(host=self._host, port=self._port)
|
||||
self._client.loop_forever()
|
||||
|
||||
def _on_connect(self, client: MQTTClient, userdata: Any, flags: dict[str, Any], rc: ReasonCodes) -> None:
|
||||
logger.info("Connected with return code {rc}")
|
||||
logger.info(f"Connected with return code {rc}")
|
||||
client.subscribe("frigate/events")
|
||||
|
||||
def _on_message(self, client: MQTTClient, userdata: Any, message: MQTTMessage) -> None:
|
||||
|
|
|
@ -1,118 +0,0 @@
|
|||
import os
|
||||
import json
|
||||
import datetime as dt
|
||||
|
||||
import requests
|
||||
import paho.mqtt.client as mqtt
|
||||
|
||||
|
||||
class FrigateEventNotifier:
|
||||
def __init__(self, mqtt_username, mqtt_password, quiet_period=3 * 60):
|
||||
self._mqtt_client = mqtt.Client()
|
||||
if mqtt_username is not None and mqtt_password is not None:
|
||||
self._mqtt_client.username_pw_set(mqtt_username, password=mqtt_password)
|
||||
self._mqtt_client.on_connect = self._on_connect
|
||||
self._mqtt_client.on_message = self._on_message
|
||||
|
||||
frigate_api_response = requests.get("http://frigate:5000/api/config")
|
||||
frigate_api_response.raise_for_status()
|
||||
frigate_config = json.loads(frigate_api_response.content)
|
||||
self._camera_zones = {camera_name: {required_zone: {object_label for object_label in camera_config["zones"][required_zone]["objects"]} for required_zone in camera_config["review"]["alerts"]["required_zones"]} for camera_name, camera_config in frigate_config["cameras"].items()}
|
||||
|
||||
self._quiet_period = quiet_period
|
||||
self._camera_to_last_notification_time = {}
|
||||
|
||||
def send_notification(self, event_id, camera, object_label, score, priority=3):
|
||||
now = dt.datetime.now()
|
||||
if now - self._camera_to_last_notification_time.get(camera, dt.datetime.min) >= dt.timedelta(seconds=self._quiet_period):
|
||||
# Quiet period has passed since the last notification for this camera
|
||||
self._camera_to_last_notification_time[camera] = now
|
||||
camera_location = " ".join(word.capitalize() for word in camera.split("_"))
|
||||
|
||||
ntfy_api_response = requests.post("https://ntfy.homelab.net", json={
|
||||
"topic": "frigate_notifications",
|
||||
"title": "Frigate",
|
||||
"message": f"{object_label.capitalize()} at {camera_location} ({score:.0%})",
|
||||
"priority": priority,
|
||||
"click": f"https://frigate.homelab.net/cameras/{camera}",
|
||||
"icon": "https://cdn.jsdelivr.net/gh/walkxcode/dashboard-icons/png/frigate.png",
|
||||
"attach": f"https://frigate.homelab.net/api/events/{event_id}/thumbnail.jpg?format=android",
|
||||
"actions": [
|
||||
{
|
||||
"action": "http",
|
||||
"label": "Disable (30m)",
|
||||
"url": f"https://frigate.homelab.net/webcontrol/camera/{camera}/detection",
|
||||
"method": "POST",
|
||||
"headers": {
|
||||
"Content-Type": "application/json"
|
||||
},
|
||||
"body": json.dumps({
|
||||
"detection": False,
|
||||
"duration": 30
|
||||
}),
|
||||
"clear": True
|
||||
}
|
||||
]
|
||||
})
|
||||
ntfy_api_response.raise_for_status()
|
||||
|
||||
ntfy_api_response = requests.post("https://ntfy.homelab.net", json={
|
||||
"topic": "frigate_notifications_dad",
|
||||
"title": "Frigate",
|
||||
"message": f"{object_label.capitalize()} at {camera_location} ({score:.0%})",
|
||||
"priority": priority,
|
||||
"click": f"https://frigate.homelab.net/cameras/{camera}",
|
||||
"icon": "https://cdn.jsdelivr.net/gh/walkxcode/dashboard-icons/png/frigate.png",
|
||||
"attach": f"https://frigate.homelab.net/api/events/{event_id}/thumbnail.jpg?format=android",
|
||||
"actions": [
|
||||
{
|
||||
"action": "http",
|
||||
"label": "DBL (30m)",
|
||||
"url": f"https://frigate.homelab.net/webcontrol/camera/{camera}/detection",
|
||||
"method": "POST",
|
||||
"headers": {
|
||||
"Content-Type": "application/json"
|
||||
},
|
||||
"body": json.dumps({
|
||||
"detection": False,
|
||||
"duration": 30
|
||||
})
|
||||
}
|
||||
]
|
||||
})
|
||||
ntfy_api_response.raise_for_status()
|
||||
|
||||
def start(self):
|
||||
self._mqtt_client.connect(host="mqtt", port=1883)
|
||||
self._mqtt_client.loop_forever()
|
||||
|
||||
def _on_connect(self, client, userdata, flags, rc):
|
||||
print(f"Connected with return code {rc}")
|
||||
client.subscribe("frigate/events")
|
||||
|
||||
def _on_message(self, client, userdata, message):
|
||||
payload = json.loads(message.payload.decode())
|
||||
camera = payload["after"]["camera"]
|
||||
object_label = payload["after"]["label"]
|
||||
|
||||
if not self._camera_zones[camera]:
|
||||
# No required zones, send notification on receipt of new event
|
||||
if payload["type"] == "new":
|
||||
event_id = payload["after"]["id"]
|
||||
score = payload["after"]["top_score"]
|
||||
|
||||
self.send_notification(event_id, camera, object_label, score)
|
||||
else:
|
||||
new_zones = set(payload["after"]["entered_zones"]) - set(payload["before"]["entered_zones"])
|
||||
for zone in new_zones:
|
||||
if zone in self._camera_zones[camera] and (not self._camera_zones[camera][zone] or object_label in self._camera_zones[camera][zone]):
|
||||
event_id = payload["after"]["id"]
|
||||
score = payload["after"]["top_score"]
|
||||
|
||||
self.send_notification(event_id, camera, object_label, score)
|
||||
break
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
frigate_event_notifier = FrigateEventNotifier(os.environ.get("MQTT_USERNAME"), os.environ.get("MQTT_PASSWORD"))
|
||||
frigate_event_notifier.start()
|
|
@ -11,7 +11,7 @@ class MobileOS(StrEnum):
|
|||
ANDROID = "android"
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass(frozen=True)
|
||||
class Notification:
|
||||
event_id: str
|
||||
camera: str
|
||||
|
|
|
@ -17,7 +17,7 @@ class NtfyPriority(IntEnum):
|
|||
MAX = 5
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass(frozen=True)
|
||||
class NtfyNotification(Notification):
|
||||
priority: int = NtfyPriority.DEFAULT
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
cameras:
|
||||
back_door: 'rtsp://frigate:8554/back_door'
|
||||
doorbell: 'rtsp://frigate:8554/doorbell'
|
||||
front_door: 'rtsp://frigate:8554/front_door'
|
||||
#front_door: 'rtsp://frigate:8554/front_door'
|
||||
garage: 'rtsp://frigate:8554/garage'
|
||||
ping_interval_s: 60
|
||||
ping_timeout_s: 30
|
||||
|
|
Loading…
Reference in New Issue