119 lines
5.5 KiB
Python
119 lines
5.5 KiB
Python
import os
|
|
import json
|
|
import datetime as dt
|
|
|
|
import requests
|
|
import paho.mqtt.client as mqtt
|
|
|
|
|
|
class FrigateEventNotifier:
    """Relay Frigate events received over MQTT to ntfy as push notifications.

    On construction the Frigate config is fetched once to learn, for every
    camera, which zones are required and which object labels each of those
    zones lists.  Messages arriving on the ``frigate/events`` topic are then
    filtered against that map, and at most one notification per camera is
    sent within ``quiet_period`` seconds.
    """

    # Seconds to wait for any single HTTP request.  Without a timeout,
    # ``requests`` waits indefinitely, which would stall the MQTT loop that
    # invokes ``_on_message``.
    DEFAULT_HTTP_TIMEOUT = 10

    # One entry per ntfy topic: (topic, action button label, send "clear").
    # Only the first topic asks ntfy to clear the notification once the
    # action button has been used.
    NOTIFICATION_TARGETS = (
        ("frigate_notifications", "Disable (30m)", True),
        ("frigate_notifications_dad", "DBL (30m)", False),
    )

    def __init__(self, mqtt_username, mqtt_password, quiet_period=3 * 60,
                 http_timeout=DEFAULT_HTTP_TIMEOUT):
        """Create the MQTT client and load the zone configuration from Frigate.

        Args:
            mqtt_username: MQTT user name, or ``None``.
            mqtt_password: MQTT password, or ``None``.  Credentials are only
                applied when both values are given.
            quiet_period: Minimum number of seconds between two notifications
                for the same camera.
            http_timeout: Timeout in seconds for every HTTP request made by
                this object.

        Raises:
            requests.RequestException: The Frigate config request failed,
                timed out, or returned an error status.
            KeyError: The returned config lacks one of the expected keys.
        """
        self.mqtt_client = mqtt.Client()
        if mqtt_username is not None and mqtt_password is not None:
            self.mqtt_client.username_pw_set(mqtt_username, password=mqtt_password)
        self.mqtt_client.on_connect = self._on_connect
        self.mqtt_client.on_message = self._on_message

        self.http_timeout = http_timeout
        self.quiet_period = quiet_period
        # camera name -> time (UTC, timezone-aware) of its last notification.
        self.last_notification_time = {}

        frigate_api_response = requests.get(
            "http://frigate:5000/api/config", timeout=self.http_timeout
        )
        frigate_api_response.raise_for_status()
        frigate_config = json.loads(frigate_api_response.content)
        # camera name -> {required zone name -> set of object labels}.
        self.camera_zones = self._parse_camera_zones(frigate_config)

    @staticmethod
    def _parse_camera_zones(frigate_config):
        """Map each camera to its required zones and the labels they list.

        Args:
            frigate_config: Parsed Frigate config; must contain ``cameras``,
                and per camera ``record.events.required_zones`` plus a
                ``zones`` entry with ``objects`` for every required zone.

        Returns:
            ``{camera_name: {zone_name: {object_label, ...}}}``.  A camera
            without required zones maps to an empty dict.
        """
        camera_zones = {}
        for camera_name, camera_config in frigate_config["cameras"].items():
            required_zones = camera_config["record"]["events"]["required_zones"]
            camera_zones[camera_name] = {
                zone: set(camera_config["zones"][zone]["objects"])
                for zone in required_zones
            }
        return camera_zones

    @staticmethod
    def _build_payload(topic, action_label, clear_after_action, event_id,
                       camera, object_label, score, priority):
        """Build the JSON body published to ntfy for one topic.

        The camera name is turned into a display name by replacing
        underscores with spaces and capitalizing each word.  The ``clear``
        key is only added to the action when ``clear_after_action`` is true.
        """
        camera_location = " ".join(word.capitalize() for word in camera.split("_"))
        action = {
            "action": "http",
            "label": action_label,
            "url": f"https://frigate.homelab.net/webcontrol/camera/{camera}/detection",
            "method": "POST",
            "headers": {
                "Content-Type": "application/json"
            },
            "body": json.dumps({
                "detection": False,
                "duration": 30
            }),
        }
        if clear_after_action:
            action["clear"] = True

        return {
            "topic": topic,
            "title": "Frigate",
            "message": f"{object_label.capitalize()} at {camera_location} ({score:.0%})",
            "priority": priority,
            "click": f"https://frigate.homelab.net/cameras/{camera}",
            "icon": "https://cdn.jsdelivr.net/gh/walkxcode/dashboard-icons/png/frigate.png",
            "attach": f"https://frigate.homelab.net/api/events/{event_id}/thumbnail.jpg?format=android",
            "actions": [action],
        }

    def _post(self, payload):
        """POST one payload to ntfy, raising on failure or error status."""
        ntfy_api_response = requests.post(
            "https://ntfy.homelab.net", json=payload, timeout=self.http_timeout
        )
        ntfy_api_response.raise_for_status()

    def send_notification(self, event_id, camera, object_label, score, priority=3):
        """Publish a notification for an event unless the camera is in its quiet period.

        Args:
            event_id: Frigate event id, used for the thumbnail attachment URL.
            camera: Frigate camera name.
            object_label: Label of the detected object.
            score: Detection score as a fraction; rendered as a percentage.
            priority: ntfy priority of the message.

        Raises:
            requests.RequestException: At least one topic could not be
                notified.  Every topic is attempted before the first error
                is re-raised.
        """
        # Timezone-aware UTC: naive local time can jump backwards (e.g. at a
        # DST change), which would stretch the quiet period.
        now = dt.datetime.now(dt.timezone.utc)
        previous = self.last_notification_time.get(camera)
        if previous is not None and now - previous < dt.timedelta(seconds=self.quiet_period):
            return

        # Recorded before sending, so a failed delivery also starts the quiet
        # period for this camera.
        self.last_notification_time[camera] = now

        first_error = None
        for topic, action_label, clear_after_action in self.NOTIFICATION_TARGETS:
            payload = self._build_payload(
                topic, action_label, clear_after_action,
                event_id, camera, object_label, score, priority,
            )
            try:
                self._post(payload)
            except requests.RequestException as error:
                # Keep going so one failing topic does not block the others.
                print(f"Failed to notify topic {topic}: {error}")
                if first_error is None:
                    first_error = error
        if first_error is not None:
            raise first_error

    def start(self):
        """Connect to the MQTT broker and process messages until stopped."""
        self.mqtt_client.connect(host="mqtt", port=1883)
        self.mqtt_client.loop_forever()

    def _on_connect(self, client, userdata, flags, rc):
        """Subscribe to Frigate events once the broker accepts the connection."""
        print(f"Connected with return code {rc}")
        if rc != 0:
            # Non-zero return code: the connection was not accepted, so there
            # is nothing to subscribe on.
            return
        client.subscribe("frigate/events")

    def _on_message(self, client, userdata, message):
        """Decide whether an incoming Frigate event warrants a notification.

        For a camera without required zones, a notification is sent when the
        event type is ``new``.  Otherwise one is sent when the object has just
        entered a required zone whose label set is empty or contains the
        object's label.
        """
        payload = json.loads(message.payload.decode())
        after = payload["after"]
        camera = after["camera"]
        object_label = after["label"]
        required_zones = self.camera_zones[camera]

        if not required_zones:
            # No required zones, send notification on receipt of new event
            should_notify = payload["type"] == "new"
        else:
            # Only zones entered since the previous update count, so an object
            # lingering in a zone is not evaluated again on every update.
            newly_entered = set(after["entered_zones"]) - set(payload["before"]["entered_zones"])
            should_notify = any(
                zone in required_zones
                and (not required_zones[zone] or object_label in required_zones[zone])
                for zone in newly_entered
            )

        if should_notify:
            self.send_notification(after["id"], camera, object_label, after["top_score"])
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: read the (optional) MQTT credentials from the
    # environment, build the notifier and hand control to its MQTT loop.
    username = os.environ.get("MQTT_USERNAME")
    password = os.environ.get("MQTT_PASSWORD")
    notifier = FrigateEventNotifier(username, password)
    notifier.start()
|