Added camera uptime monitoring system #10
Squashed commit of the following:
commit 3c26ec1cf2c69f60f8dfc5db3f2ec292166d8f4e
Author: Ashish D'Souza <sudouser512@gmail.com>
Date:   Sun Feb 18 02:46:18 2024 -0600
    Added camera uptime monitoring system
			
			
This commit is contained in:
		
							parent
							
								
									20fe151435
								
							
						
					
					
						commit
						eab8f224e3
					
				| 
						 | 
				
			
			@ -115,6 +115,20 @@ services:
 | 
			
		|||
    - frigate
 | 
			
		||||
    ports:
 | 
			
		||||
    - 127.0.0.1:10001:80
 | 
			
		||||
  uptime:
 | 
			
		||||
    container_name: frigate-uptime
 | 
			
		||||
    image: frigate-uptime:latest
 | 
			
		||||
    restart: unless-stopped
 | 
			
		||||
    depends_on:
 | 
			
		||||
      frigate:
 | 
			
		||||
        condition: service_healthy
 | 
			
		||||
    volumes:
 | 
			
		||||
    - type: bind
 | 
			
		||||
      source: /etc/localtime
 | 
			
		||||
      target: /etc/localtime
 | 
			
		||||
      read_only: true
 | 
			
		||||
    networks:
 | 
			
		||||
    - frigate
 | 
			
		||||
  mqtt:
 | 
			
		||||
    container_name: frigate-mqtt
 | 
			
		||||
    image: eclipse-mosquitto:2.0.18
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										27
									
								
								install.yaml
								
								
								
								
							
							
						
						
									
										27
									
								
								install.yaml
								
								
								
								
							| 
						 | 
				
			
			@ -73,7 +73,7 @@
 | 
			
		|||
      dest: '{{docker_build_dir.path}}'
 | 
			
		||||
      mode: preserve
 | 
			
		||||
  - name: Build frigate-notify Docker image
 | 
			
		||||
    ansible.builtin.docker_image:
 | 
			
		||||
    community.docker.docker_image:
 | 
			
		||||
      build:
 | 
			
		||||
        path: '{{docker_build_dir.path}}'
 | 
			
		||||
      name: frigate-notify
 | 
			
		||||
| 
						 | 
				
			
			@ -96,7 +96,7 @@
 | 
			
		|||
      dest: '{{docker_build_dir.path}}'
 | 
			
		||||
      mode: preserve
 | 
			
		||||
  - name: Build frigate-webcontrol Docker image
 | 
			
		||||
    ansible.builtin.docker_image:
 | 
			
		||||
    community.docker.docker_image:
 | 
			
		||||
      build:
 | 
			
		||||
        path: '{{docker_build_dir.path}}'
 | 
			
		||||
      name: frigate-webcontrol
 | 
			
		||||
| 
						 | 
				
			
			@ -109,6 +109,29 @@
 | 
			
		|||
      path: '{{docker_build_dir.path}}'
 | 
			
		||||
      state: absent
 | 
			
		||||
 | 
			
		||||
  - name: Create temporary Docker build directory for frigate-uptime
 | 
			
		||||
    ansible.builtin.tempfile:
 | 
			
		||||
      state: directory
 | 
			
		||||
    register: docker_build_dir
 | 
			
		||||
  - name: Copy Docker build directory
 | 
			
		||||
    ansible.builtin.copy:
 | 
			
		||||
      src: uptime/
 | 
			
		||||
      dest: '{{docker_build_dir.path}}'
 | 
			
		||||
      mode: preserve
 | 
			
		||||
  - name: Build frigate-uptime Docker image
 | 
			
		||||
    community.docker.docker_image:
 | 
			
		||||
      build:
 | 
			
		||||
        path: '{{docker_build_dir.path}}'
 | 
			
		||||
      name: frigate-uptime
 | 
			
		||||
      tag: latest
 | 
			
		||||
      source: build
 | 
			
		||||
      force_source: true
 | 
			
		||||
      state: present
 | 
			
		||||
  - name: Remove temporary Docker build directory for frigate-uptime
 | 
			
		||||
    ansible.builtin.file:
 | 
			
		||||
      path: '{{docker_build_dir.path}}'
 | 
			
		||||
      state: absent
 | 
			
		||||
 | 
			
		||||
  - name: Read homelab config
 | 
			
		||||
    ansible.builtin.slurp:
 | 
			
		||||
      src: '{{ansible_user_dir}}/.homelab.json'
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,9 @@
 | 
			
		|||
FROM python:3.11
 | 
			
		||||
WORKDIR /code
 | 
			
		||||
 | 
			
		||||
ENTRYPOINT ["python", "-m", "uptime"]
 | 
			
		||||
 | 
			
		||||
COPY requirements.txt .
 | 
			
		||||
RUN pip3 install -r requirements.txt
 | 
			
		||||
 | 
			
		||||
COPY src .
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,2 @@
 | 
			
		|||
aiohttp==3.8.6
 | 
			
		||||
PyYAML==6.0.1
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,23 @@
 | 
			
		|||
version: 1
 | 
			
		||||
disable_existing_loggers: false
 | 
			
		||||
formatters:
 | 
			
		||||
  default:
 | 
			
		||||
    format: '[{asctime}.{msecs:03.0f}][{levelname}] {name}:{lineno}:{funcName} - {message}'
 | 
			
		||||
    style: '{'
 | 
			
		||||
    datefmt: '%Y-%m-%d %H:%M:%S'
 | 
			
		||||
handlers:
 | 
			
		||||
  stdout:
 | 
			
		||||
    class: logging.StreamHandler
 | 
			
		||||
    level: INFO
 | 
			
		||||
    formatter: default
 | 
			
		||||
    stream: ext://sys.stdout
 | 
			
		||||
  stderr:
 | 
			
		||||
    class: logging.StreamHandler
 | 
			
		||||
    level: ERROR
 | 
			
		||||
    formatter: default
 | 
			
		||||
    stream: ext://sys.stderr
 | 
			
		||||
root:
 | 
			
		||||
  level: DEBUG
 | 
			
		||||
  handlers:
 | 
			
		||||
  - stdout
 | 
			
		||||
  - stderr
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,25 @@
 | 
			
		|||
import asyncio
 | 
			
		||||
import logging
 | 
			
		||||
import logging.config
 | 
			
		||||
 | 
			
		||||
from uptime.utils.logging import configure_logging
 | 
			
		||||
from uptime.monitor import CameraMonitor
 | 
			
		||||
 | 
			
		||||
configure_logging()
 | 
			
		||||
logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def main() -> None:
 | 
			
		||||
    camera_monitor = CameraMonitor()
 | 
			
		||||
    await camera_monitor.run()
 | 
			
		||||
 | 
			
		||||
def entrypoint() -> None:
 | 
			
		||||
    try:
 | 
			
		||||
        asyncio.run(main())
 | 
			
		||||
    except KeyboardInterrupt:
 | 
			
		||||
        logger.info('Received Ctrl+C, exiting...')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == '__main__':
 | 
			
		||||
    entrypoint()
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,62 @@
 | 
			
		|||
import re
 | 
			
		||||
import json
 | 
			
		||||
import logging
 | 
			
		||||
from contextlib import AsyncExitStack
 | 
			
		||||
from types import TracebackType
 | 
			
		||||
from typing import Any, ClassVar, Pattern, Self
 | 
			
		||||
 | 
			
		||||
import aiohttp
 | 
			
		||||
 | 
			
		||||
logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class FrigateConfig:
 | 
			
		||||
    _URL_ADDRESS_REGEX: ClassVar[Pattern[str]] = re.compile('(^|(?<=://)|(?<=@))[a-z0-9.\\-]+(:[0-9]+)?($|(?=/))')
 | 
			
		||||
 | 
			
		||||
    def __init__(self, frigate_base_url: str = 'http://frigate:5000') -> None:
 | 
			
		||||
        self._frigate_config_url = f'{frigate_base_url}/api/config'
 | 
			
		||||
        self._config = {}
 | 
			
		||||
        self._aiohttp_session: aiohttp.ClientSession
 | 
			
		||||
        self._async_exit_stack: AsyncExitStack
 | 
			
		||||
 | 
			
		||||
    async def __aenter__(self) -> Self:
 | 
			
		||||
        async with AsyncExitStack() as async_exit_stack:
 | 
			
		||||
            self._aiohttp_session = await async_exit_stack.enter_async_context(aiohttp.ClientSession(raise_for_status=True))
 | 
			
		||||
            self._async_exit_stack = async_exit_stack.pop_all()
 | 
			
		||||
 | 
			
		||||
            await self.refresh()
 | 
			
		||||
        return self
 | 
			
		||||
 | 
			
		||||
    async def __aexit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None:
 | 
			
		||||
        await self._async_exit_stack.aclose()
 | 
			
		||||
 | 
			
		||||
    async def refresh(self) -> None:
 | 
			
		||||
        logger.debug('Fetching Frigate config...')
 | 
			
		||||
        try:
 | 
			
		||||
            async with self._aiohttp_session.get(self._frigate_config_url) as response:
 | 
			
		||||
                self._config = json.loads(await response.read())
 | 
			
		||||
            logger.debug('Finished fetching Frigate config')
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            if self._config:
 | 
			
		||||
                logger.warning('Failed to fetch Frigate config, falling back to previous value', exc_info=e)
 | 
			
		||||
            else:
 | 
			
		||||
                raise
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def active_cameras(self) -> list[str]:
 | 
			
		||||
        if 'cameras' not in self._config:
 | 
			
		||||
            raise ValueError('Configuration not yet fetched from Frigate')
 | 
			
		||||
 | 
			
		||||
        return [camera for camera in self._config['cameras'] if self._config['cameras'][camera].get('enabled', True)]
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def active_camera_addresses(self) -> dict[str, str]:
 | 
			
		||||
        active_cameras = self.active_cameras
 | 
			
		||||
        return {camera: self._get_address_from_url(self._config['cameras'][camera]['ffmpeg']['inputs'][0]['path']) for camera in active_cameras}
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def _get_address_from_url(cls, url: str) -> str:
 | 
			
		||||
        match = cls._URL_ADDRESS_REGEX.search(url)
 | 
			
		||||
        if match is None:
 | 
			
		||||
            raise ValueError(f'Failed to retrieve address from {url=}')
 | 
			
		||||
        return match.group()
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,58 @@
 | 
			
		|||
import asyncio
 | 
			
		||||
import logging
 | 
			
		||||
from contextlib import AsyncExitStack
 | 
			
		||||
from types import TracebackType
 | 
			
		||||
from typing import Self
 | 
			
		||||
from collections import Counter
 | 
			
		||||
 | 
			
		||||
from uptime.frigate_config import FrigateConfig
 | 
			
		||||
from uptime.notify import NtfyNotifier
 | 
			
		||||
from uptime.ping import ip_ping_all
 | 
			
		||||
 | 
			
		||||
logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class CameraMonitor:
 | 
			
		||||
    def __init__(self, wait_time: int = 60, consecutive_down_threshold: int = 3):
 | 
			
		||||
        self._wait_time = wait_time
 | 
			
		||||
        self._consecutive_down_threshold = consecutive_down_threshold
 | 
			
		||||
        self._camera_downtime = Counter()
 | 
			
		||||
        self._frigate_config: FrigateConfig
 | 
			
		||||
        self._ntfy_notifier: NtfyNotifier
 | 
			
		||||
 | 
			
		||||
    async def _on_camera_up(self, camera: str) -> None:
 | 
			
		||||
        logger.info(f'Camera {camera} is back online')
 | 
			
		||||
        await self._ntfy_notifier.send_notification(camera, True)
 | 
			
		||||
 | 
			
		||||
    async def _on_camera_down(self, camera: str) -> None:
 | 
			
		||||
        await self._frigate_config.refresh()
 | 
			
		||||
        if camera not in self._frigate_config.active_cameras:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        logger.info(f'Camera {camera} is down')
 | 
			
		||||
        await self._ntfy_notifier.send_notification(camera, False)
 | 
			
		||||
 | 
			
		||||
    async def _on_camera_ping(self, camera: str, success: bool) -> None:
 | 
			
		||||
        if success:
 | 
			
		||||
            if self._camera_downtime[camera] >= self._consecutive_down_threshold:
 | 
			
		||||
                await self._on_camera_up(camera)
 | 
			
		||||
            self._camera_downtime[camera] = 0
 | 
			
		||||
        else:
 | 
			
		||||
            if self._camera_downtime[camera] == self._consecutive_down_threshold:
 | 
			
		||||
                await self._on_camera_down(camera)
 | 
			
		||||
            self._camera_downtime[camera] += 1
 | 
			
		||||
 | 
			
		||||
    async def run(self) -> None:
 | 
			
		||||
        async with AsyncExitStack() as async_exit_stack:
 | 
			
		||||
            self._frigate_config = await async_exit_stack.enter_async_context(FrigateConfig())
 | 
			
		||||
            self._ntfy_notifier = await async_exit_stack.enter_async_context(NtfyNotifier())
 | 
			
		||||
 | 
			
		||||
            while True:
 | 
			
		||||
                camera_ips = {camera: address.split(':', 1)[0] for camera, address in self._frigate_config.active_camera_addresses.items()}
 | 
			
		||||
                ping_results = await ip_ping_all(*camera_ips.values())
 | 
			
		||||
                for i, camera in enumerate(camera_ips):
 | 
			
		||||
                    await self._on_camera_ping(camera, ping_results[i])
 | 
			
		||||
 | 
			
		||||
                logger.debug(f'Sleeping for {self._wait_time} seconds...')
 | 
			
		||||
                await asyncio.sleep(self._wait_time)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,37 @@
 | 
			
		|||
import logging
 | 
			
		||||
from contextlib import AsyncExitStack
 | 
			
		||||
from types import TracebackType
 | 
			
		||||
from typing import Self
 | 
			
		||||
 | 
			
		||||
import aiohttp
 | 
			
		||||
 | 
			
		||||
logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class NtfyNotifier:
 | 
			
		||||
    def __init__(self, ntfy_url: str = 'https://ntfy.homelab.net') -> None:
 | 
			
		||||
        self._ntfy_url = ntfy_url
 | 
			
		||||
        self._aiohttp_session: aiohttp.ClientSession
 | 
			
		||||
        self._async_exit_stack: AsyncExitStack
 | 
			
		||||
 | 
			
		||||
    async def __aenter__(self) -> Self:
 | 
			
		||||
        async with AsyncExitStack() as async_exit_stack:
 | 
			
		||||
            self._aiohttp_session = await async_exit_stack.enter_async_context(aiohttp.ClientSession(raise_for_status=True))
 | 
			
		||||
            self._async_exit_stack = async_exit_stack.pop_all()
 | 
			
		||||
        return self
 | 
			
		||||
 | 
			
		||||
    async def __aexit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None:
 | 
			
		||||
        await self._async_exit_stack.aclose()
 | 
			
		||||
 | 
			
		||||
    async def send_notification(self, camera: str, status: bool) -> None:
 | 
			
		||||
        logger.debug(f'Sending notification for {camera=}...')
 | 
			
		||||
        message = f'{camera} is back online' if status else f'{camera} is offline'
 | 
			
		||||
        await self._aiohttp_session.post(self._ntfy_url, json={
 | 
			
		||||
            'topic': 'frigate_camera_uptime',
 | 
			
		||||
            'title': 'Frigate',
 | 
			
		||||
            'message': message,
 | 
			
		||||
            'priority': 3,
 | 
			
		||||
            'click': f'https://frigate.homelab.net/cameras/{camera}',
 | 
			
		||||
            'icon': 'https://cdn.jsdelivr.net/gh/walkxcode/dashboard-icons/png/frigate.png',
 | 
			
		||||
        })
 | 
			
		||||
        logger.debug(f'Sent notification for {camera=}')
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,19 @@
 | 
			
		|||
import asyncio
 | 
			
		||||
import logging
 | 
			
		||||
 | 
			
		||||
logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def ip_ping(host: str) -> bool:
 | 
			
		||||
    logger.debug(f'Pinging {host}...')
 | 
			
		||||
    process = await asyncio.create_subprocess_exec('ping', '-w', '3', '-c', '1', host, stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL)
 | 
			
		||||
    return_code = await process.wait()
 | 
			
		||||
    logger.debug(f'Finished pinging {host}')
 | 
			
		||||
    return return_code == 0
 | 
			
		||||
 | 
			
		||||
async def ip_ping_all(*hosts: str) -> list[bool]:
 | 
			
		||||
    return await asyncio.gather(*[ip_ping(host) for host in hosts])
 | 
			
		||||
 | 
			
		||||
async def tcp_ping(host: str, port: int) -> bool:
 | 
			
		||||
    ...
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,11 @@
 | 
			
		|||
import logging
 | 
			
		||||
 | 
			
		||||
import yaml
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def configure_logging() -> None:
 | 
			
		||||
    with open('logging.yaml', 'r') as logging_yaml:
 | 
			
		||||
        logging_config = yaml.safe_load(logging_yaml)
 | 
			
		||||
 | 
			
		||||
    logging.config.dictConfig(logging_config)
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue