Added video ping to uptime monitor

This commit is contained in:
Ashish D'Souza 2024-08-09 01:58:43 -05:00
parent 6481511dcd
commit de4b7792d1
5 changed files with 60 additions and 18 deletions

View File

@ -4,7 +4,7 @@ WORKDIR /code
ENTRYPOINT ["python", "-m", "uptime"]
RUN apt update --fix-missing
RUN apt install -y iputils-ping
RUN apt install -y iputils-ping libgl1
COPY requirements.txt .
RUN pip3 install -r requirements.txt

View File

@ -1,2 +1,3 @@
aiohttp==3.8.6
PyYAML==6.0.1
aiohttp>=3.8.6
PyYAML>=6.0.1
opencv-python>=4.10.0.84

View File

@ -10,6 +10,7 @@ logger = logging.getLogger(__name__)
class FrigateConfig:
_LOCALHOST_REGEX: ClassVar[Pattern[str]] = re.compile("127\\.0\\.0\\.1|localhost")
_URL_ADDRESS_REGEX: ClassVar[Pattern[str]] = re.compile("(^|(?<=://)|(?<=@))[a-z0-9.\\-]+(:[0-9]+)?($|(?=/))")
_WYZE_CAMERAS: ClassVar[dict[str, str]] = {"back_yard_cam": "192.168.0.202:554"}
@ -38,10 +39,18 @@ class FrigateConfig:
return [camera for camera in self._config["cameras"] if self._config["cameras"][camera].get("enabled", True)]
@property
def active_camera_urls(self) -> dict[str, str]:
return {camera: self._replace_localhost(self._config["cameras"][camera]["ffmpeg"]["inputs"][0]["path"]) for camera in self.active_cameras}
@property
def active_camera_addresses(self) -> dict[str, str]:
active_cameras = self.active_cameras
return {camera: self._get_address_from_url(self._config["cameras"][camera]["ffmpeg"]["inputs"][0]["path"]) for camera in active_cameras}
return {camera: self._get_address_from_url(url) for camera, url in self.active_camera_urls.items()}
@classmethod
def _replace_localhost(cls, url: str) -> str:
return cls._LOCALHOST_REGEX.sub("frigate", url, 1)
@classmethod
def _get_address_from_url(cls, url: str) -> str:

View File

@ -1,24 +1,30 @@
import time
import asyncio
import logging
import datetime as dt
from contextlib import AsyncExitStack
from types import TracebackType
from typing import Self
from typing import ClassVar, Self
from collections import Counter
from concurrent.futures import ProcessPoolExecutor
from uptime.frigate_config import FrigateConfig
from uptime.notify import NtfyNotifier
from uptime.ping import ip_ping_all
from uptime.ping import video_ping
logger = logging.getLogger(__name__)
class CameraMonitor:
def __init__(self, wait_time: int = 60, consecutive_down_threshold: int = 3):
self._wait_time = wait_time
_FRIGATE_CONFIG_REFRESH_INTERVAL: ClassVar[dt.timedelta] = dt.timedelta(minutes=60)
def __init__(self, ping_interval: dt.timedelta = dt.timedelta(seconds=60), consecutive_down_threshold: int = 3):
self._ping_interval = ping_interval
self._consecutive_down_threshold = consecutive_down_threshold
self._camera_downtime = Counter()
self._frigate_config = FrigateConfig()
self._ntfy_notifier = NtfyNotifier()
self._process_pool = ProcessPoolExecutor()
async def _on_camera_up(self, camera: str) -> None:
logger.info(f"Camera {camera} is back online")
@ -42,14 +48,35 @@ class CameraMonitor:
await self._on_camera_down(camera)
self._camera_downtime[camera] += 1
async def _ping_all_cameras(self) -> None:
async def video_ping_async(url: str) -> bool:
event_loop = asyncio.get_running_loop()
return await event_loop.run_in_executor(
self._process_pool,
video_ping,
url,
)
ping_tasks = {}
async with asyncio.TaskGroup() as tg:
event_loop = asyncio.get_running_loop()
for camera, url in self._frigate_config.active_camera_urls.items():
ping_tasks[camera] = tg.create_task(video_ping_async(url))
for camera, task in ping_tasks.items():
await self._on_camera_ping(camera, task.result())
async def run(self) -> None:
await self._frigate_config.refresh()
last_refresh = time.time()
while True:
camera_ips = {camera: address.split(":", 1)[0] for camera, address in self._frigate_config.active_camera_addresses.items()}
ping_results = await ip_ping_all(*camera_ips.values())
for i, camera in enumerate(camera_ips):
await self._on_camera_ping(camera, ping_results[i])
logger.debug(f"Sleeping for {self._wait_time} seconds...")
await asyncio.sleep(self._wait_time)
start_time = time.time()
if time.time() - last_refresh >= self._FRIGATE_CONFIG_REFRESH_INTERVAL.total_seconds():
logger.info("Refreshing Frigate config...")
await self._frigate_config.refresh()
last_refresh = time.time()
await self._ping_all_cameras()
sleep_duration = self._ping_interval.total_seconds() - (time.time() - start_time)
logger.debug(f"Sleeping for {sleep_duration} seconds...")
await asyncio.sleep(sleep_duration)

View File

@ -1,6 +1,8 @@
import asyncio
import logging
import cv2
logger = logging.getLogger(__name__)
@ -11,9 +13,12 @@ async def ip_ping(host: str) -> bool:
logger.debug(f"Finished pinging {host}")
return return_code == 0
async def ip_ping_all(*hosts: str) -> list[bool]:
return await asyncio.gather(*[ip_ping(host) for host in hosts])
async def tcp_ping(host: str, port: int) -> bool:
...
def video_ping(url: str) -> bool:
capture = cv2.VideoCapture(url)
success, _ = capture.read()
capture.release()
return success