Rewrite with asyncio #5
Squashed commit of the following:
commit 45d9f20c20a480c6b0aa5c67a1bea967207aba93
Author: Ashish D'Souza <sudouser512@gmail.com>
Date:   Sun Mar 24 03:17:38 2024 -0500
    Converted to asyncio
			
			
This commit is contained in:
		
							parent
							
								
									b3a2076416
								
							
						
					
					
						commit
						847c584db9
					
				| 
						 | 
				
			
			@ -106,7 +106,12 @@ services:
 | 
			
		|||
      timeout: 30s
 | 
			
		||||
    environment:
 | 
			
		||||
      IPGEOLOCATION_API_KEY: ${IPGEOLOCATION_API_KEY}
 | 
			
		||||
      FRIGATE_CONFIG_FILE: /frigate_config/config.yaml
 | 
			
		||||
    volumes:
 | 
			
		||||
    - type: volume
 | 
			
		||||
      source: config
 | 
			
		||||
      target: /frigate_config
 | 
			
		||||
      read_only: true
 | 
			
		||||
    - type: bind
 | 
			
		||||
      source: /etc/localtime
 | 
			
		||||
      target: /etc/localtime
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -14,10 +14,10 @@ class FrigateEventNotifier:
 | 
			
		|||
        self.mqtt_client.on_connect = self._on_connect
 | 
			
		||||
        self.mqtt_client.on_message = self._on_message
 | 
			
		||||
 | 
			
		||||
        frigate_api_response = requests.get('http://frigate:5000/api/config')
 | 
			
		||||
        frigate_api_response = requests.get("http://frigate:5000/api/config")
 | 
			
		||||
        frigate_api_response.raise_for_status()
 | 
			
		||||
        frigate_config = json.loads(frigate_api_response.content)
 | 
			
		||||
        self.camera_zones = {camera_name: {required_zone: {object_label for object_label in camera_config['zones'][required_zone]['objects']} for required_zone in camera_config['record']['events']['required_zones']} for camera_name, camera_config in frigate_config['cameras'].items()}
 | 
			
		||||
        self.camera_zones = {camera_name: {required_zone: {object_label for object_label in camera_config["zones"][required_zone]["objects"]} for required_zone in camera_config["record"]["events"]["required_zones"]} for camera_name, camera_config in frigate_config["cameras"].items()}
 | 
			
		||||
 | 
			
		||||
        self.quiet_period = quiet_period
 | 
			
		||||
        self.last_notification_time = {}
 | 
			
		||||
| 
						 | 
				
			
			@ -27,55 +27,55 @@ class FrigateEventNotifier:
 | 
			
		|||
        if now - self.last_notification_time.get(camera, dt.datetime.min) >= dt.timedelta(seconds=self.quiet_period):
 | 
			
		||||
            # Quiet period has passed since the last notification for this camera
 | 
			
		||||
            self.last_notification_time[camera] = now
 | 
			
		||||
            camera_location = ' '.join(word.capitalize() for word in camera.split('_'))
 | 
			
		||||
            camera_location = " ".join(word.capitalize() for word in camera.split("_"))
 | 
			
		||||
 | 
			
		||||
            ntfy_api_response = requests.post('https://ntfy.homelab.net', json={
 | 
			
		||||
                'topic': 'frigate_notifications',
 | 
			
		||||
                'title': 'Frigate',
 | 
			
		||||
                'message': f'{object_label.capitalize()} at {camera_location} ({score:.0%})',
 | 
			
		||||
                'priority': priority,
 | 
			
		||||
                'click': f'https://frigate.homelab.net/cameras/{camera}',
 | 
			
		||||
                'icon': 'https://cdn.jsdelivr.net/gh/walkxcode/dashboard-icons/png/frigate.png',
 | 
			
		||||
                'attach': f'https://frigate.homelab.net/api/events/{event_id}/thumbnail.jpg?format=android',
 | 
			
		||||
                'actions': [
 | 
			
		||||
            ntfy_api_response = requests.post("https://ntfy.homelab.net", json={
 | 
			
		||||
                "topic": "frigate_notifications",
 | 
			
		||||
                "title": "Frigate",
 | 
			
		||||
                "message": f"{object_label.capitalize()} at {camera_location} ({score:.0%})",
 | 
			
		||||
                "priority": priority,
 | 
			
		||||
                "click": f"https://frigate.homelab.net/cameras/{camera}",
 | 
			
		||||
                "icon": "https://cdn.jsdelivr.net/gh/walkxcode/dashboard-icons/png/frigate.png",
 | 
			
		||||
                "attach": f"https://frigate.homelab.net/api/events/{event_id}/thumbnail.jpg?format=android",
 | 
			
		||||
                "actions": [
 | 
			
		||||
                    {
 | 
			
		||||
                        'action': 'http',
 | 
			
		||||
                        'label': 'Disable (30m)',
 | 
			
		||||
                        'url': f'https://frigate.homelab.net/webcontrol/camera/{camera}/detect',
 | 
			
		||||
                        'method': 'POST',
 | 
			
		||||
                        'headers': {
 | 
			
		||||
                            'Content-Type': 'application/json'
 | 
			
		||||
                        "action": "http",
 | 
			
		||||
                        "label": "Disable (30m)",
 | 
			
		||||
                        "url": f"https://frigate.homelab.net/webcontrol/camera/{camera}/detection",
 | 
			
		||||
                        "method": "POST",
 | 
			
		||||
                        "headers": {
 | 
			
		||||
                            "Content-Type": "application/json"
 | 
			
		||||
                        },
 | 
			
		||||
                        'body': json.dumps({
 | 
			
		||||
                            'value': False,
 | 
			
		||||
                            'duration': 30
 | 
			
		||||
                        "body": json.dumps({
 | 
			
		||||
                            "detection": False,
 | 
			
		||||
                            "duration": 30
 | 
			
		||||
                        }),
 | 
			
		||||
                        'clear': True
 | 
			
		||||
                        "clear": True
 | 
			
		||||
                    }
 | 
			
		||||
                ]
 | 
			
		||||
            })
 | 
			
		||||
            ntfy_api_response.raise_for_status()
 | 
			
		||||
 | 
			
		||||
            ntfy_api_response = requests.post('https://ntfy.homelab.net', json={
 | 
			
		||||
                'topic': 'frigate_notifications_dad',
 | 
			
		||||
                'title': 'Frigate',
 | 
			
		||||
                'message': f'{object_label.capitalize()} at {camera_location} ({score:.0%})',
 | 
			
		||||
                'priority': priority,
 | 
			
		||||
                'click': f'https://frigate.homelab.net/cameras/{camera}',
 | 
			
		||||
                'icon': 'https://cdn.jsdelivr.net/gh/walkxcode/dashboard-icons/png/frigate.png',
 | 
			
		||||
                'attach': f'https://frigate.homelab.net/api/events/{event_id}/thumbnail.jpg?format=android',
 | 
			
		||||
                'actions': [
 | 
			
		||||
            ntfy_api_response = requests.post("https://ntfy.homelab.net", json={
 | 
			
		||||
                "topic": "frigate_notifications_dad",
 | 
			
		||||
                "title": "Frigate",
 | 
			
		||||
                "message": f"{object_label.capitalize()} at {camera_location} ({score:.0%})",
 | 
			
		||||
                "priority": priority,
 | 
			
		||||
                "click": f"https://frigate.homelab.net/cameras/{camera}",
 | 
			
		||||
                "icon": "https://cdn.jsdelivr.net/gh/walkxcode/dashboard-icons/png/frigate.png",
 | 
			
		||||
                "attach": f"https://frigate.homelab.net/api/events/{event_id}/thumbnail.jpg?format=android",
 | 
			
		||||
                "actions": [
 | 
			
		||||
                    {
 | 
			
		||||
                        'action': 'http',
 | 
			
		||||
                        'label': 'DBL (30m)',
 | 
			
		||||
                        'url': f'https://frigate.homelab.net/webcontrol/camera/{camera}/detect',
 | 
			
		||||
                        'method': 'POST',
 | 
			
		||||
                        'headers': {
 | 
			
		||||
                            'Content-Type': 'application/json'
 | 
			
		||||
                        "action": "http",
 | 
			
		||||
                        "label": "DBL (30m)",
 | 
			
		||||
                        "url": f"https://frigate.homelab.net/webcontrol/camera/{camera}/detection",
 | 
			
		||||
                        "method": "POST",
 | 
			
		||||
                        "headers": {
 | 
			
		||||
                            "Content-Type": "application/json"
 | 
			
		||||
                        },
 | 
			
		||||
                        'body': json.dumps({
 | 
			
		||||
                            'value': False,
 | 
			
		||||
                            'duration': 30
 | 
			
		||||
                        "body": json.dumps({
 | 
			
		||||
                            "detection": False,
 | 
			
		||||
                            "duration": 30
 | 
			
		||||
                        })
 | 
			
		||||
                    }
 | 
			
		||||
                ]
 | 
			
		||||
| 
						 | 
				
			
			@ -83,36 +83,36 @@ class FrigateEventNotifier:
 | 
			
		|||
            ntfy_api_response.raise_for_status()
 | 
			
		||||
 | 
			
		||||
    def start(self):
 | 
			
		||||
        self.mqtt_client.connect(host='mqtt', port=1883)
 | 
			
		||||
        self.mqtt_client.connect(host="mqtt", port=1883)
 | 
			
		||||
        self.mqtt_client.loop_forever()
 | 
			
		||||
 | 
			
		||||
    def _on_connect(self, client, userdata, flags, rc):
 | 
			
		||||
        print(f'Connected with return code {rc}')
 | 
			
		||||
        client.subscribe('frigate/events')
 | 
			
		||||
        print(f"Connected with return code {rc}")
 | 
			
		||||
        client.subscribe("frigate/events")
 | 
			
		||||
 | 
			
		||||
    def _on_message(self, client, userdata, message):
 | 
			
		||||
        payload = json.loads(message.payload.decode())
 | 
			
		||||
        camera = payload['after']['camera']
 | 
			
		||||
        object_label = payload['after']['label']
 | 
			
		||||
        camera = payload["after"]["camera"]
 | 
			
		||||
        object_label = payload["after"]["label"]
 | 
			
		||||
 | 
			
		||||
        if not self.camera_zones[camera]:
 | 
			
		||||
            # No required zones, send notification on receipt of new event
 | 
			
		||||
            if payload['type'] == 'new':
 | 
			
		||||
                event_id = payload['after']['id']
 | 
			
		||||
                score = payload['after']['top_score']
 | 
			
		||||
            if payload["type"] == "new":
 | 
			
		||||
                event_id = payload["after"]["id"]
 | 
			
		||||
                score = payload["after"]["top_score"]
 | 
			
		||||
 | 
			
		||||
                self.send_notification(event_id, camera, object_label, score)
 | 
			
		||||
        else:
 | 
			
		||||
            new_zones = set(payload['after']['entered_zones']) - set(payload['before']['entered_zones'])
 | 
			
		||||
            new_zones = set(payload["after"]["entered_zones"]) - set(payload["before"]["entered_zones"])
 | 
			
		||||
            for zone in new_zones:
 | 
			
		||||
                if zone in self.camera_zones[camera] and (not self.camera_zones[camera][zone] or object_label in self.camera_zones[camera][zone]):
 | 
			
		||||
                    event_id = payload['after']['id']
 | 
			
		||||
                    score = payload['after']['top_score']
 | 
			
		||||
                    event_id = payload["after"]["id"]
 | 
			
		||||
                    score = payload["after"]["top_score"]
 | 
			
		||||
 | 
			
		||||
                    self.send_notification(event_id, camera, object_label, score)
 | 
			
		||||
                    break
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == '__main__':
 | 
			
		||||
    frigate_event_notifier = FrigateEventNotifier(os.environ.get('MQTT_USERNAME'), os.environ.get('MQTT_PASSWORD'))
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    frigate_event_notifier = FrigateEventNotifier(os.environ.get("MQTT_USERNAME"), os.environ.get("MQTT_PASSWORD"))
 | 
			
		||||
    frigate_event_notifier.start()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2,7 +2,7 @@ version: 1
 | 
			
		|||
disable_existing_loggers: false
 | 
			
		||||
formatters:
 | 
			
		||||
  default:
 | 
			
		||||
    format: '[{asctime}.{msecs:03.0f}][{levelname}] {name}:{lineno}:{funcName} - {message}'
 | 
			
		||||
    format: '[{asctime}.{msecs:03.0f}][{levelname}] {name}:{lineno} - {message}'
 | 
			
		||||
    style: '{'
 | 
			
		||||
    datefmt: '%Y-%m-%d %H:%M:%S'
 | 
			
		||||
handlers:
 | 
			
		||||
| 
						 | 
				
			
			@ -17,9 +17,9 @@ def entrypoint() -> None:
 | 
			
		|||
    try:
 | 
			
		||||
        asyncio.run(main())
 | 
			
		||||
    except KeyboardInterrupt:
 | 
			
		||||
        logger.info('Received Ctrl+C, exiting...')
 | 
			
		||||
        logger.info("Received Ctrl+C, exiting...")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == '__main__':
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    entrypoint()
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2,8 +2,7 @@ import re
 | 
			
		|||
import json
 | 
			
		||||
import logging
 | 
			
		||||
from contextlib import AsyncExitStack
 | 
			
		||||
from types import TracebackType
 | 
			
		||||
from typing import Any, ClassVar, Pattern, Self
 | 
			
		||||
from typing import ClassVar, Pattern
 | 
			
		||||
 | 
			
		||||
import aiohttp
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -11,58 +10,47 @@ logger = logging.getLogger(__name__)
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
class FrigateConfig:
 | 
			
		||||
    _URL_ADDRESS_REGEX: ClassVar[Pattern[str]] = re.compile('(^|(?<=://)|(?<=@))[a-z0-9.\\-]+(:[0-9]+)?($|(?=/))')
 | 
			
		||||
    _WYZE_CAMERAS: ClassVar[dict[str, str]] = {'back_yard_cam': '192.168.0.202:554'}
 | 
			
		||||
    _URL_ADDRESS_REGEX: ClassVar[Pattern[str]] = re.compile("(^|(?<=://)|(?<=@))[a-z0-9.\\-]+(:[0-9]+)?($|(?=/))")
 | 
			
		||||
    _WYZE_CAMERAS: ClassVar[dict[str, str]] = {"back_yard_cam": "192.168.0.202:554"}
 | 
			
		||||
 | 
			
		||||
    def __init__(self, frigate_base_url: str = 'http://frigate:5000') -> None:
 | 
			
		||||
        self._frigate_config_url = f'{frigate_base_url}/api/config'
 | 
			
		||||
    def __init__(self, frigate_base_url: str = "http://frigate:5000") -> None:
 | 
			
		||||
        self._frigate_config_url = f"{frigate_base_url}/api/config"
 | 
			
		||||
        self._config = {}
 | 
			
		||||
        self._aiohttp_session: aiohttp.ClientSession
 | 
			
		||||
        self._async_exit_stack: AsyncExitStack
 | 
			
		||||
 | 
			
		||||
    async def __aenter__(self) -> Self:
 | 
			
		||||
        async with AsyncExitStack() as async_exit_stack:
 | 
			
		||||
            self._aiohttp_session = await async_exit_stack.enter_async_context(aiohttp.ClientSession(raise_for_status=True))
 | 
			
		||||
            self._async_exit_stack = async_exit_stack.pop_all()
 | 
			
		||||
 | 
			
		||||
            await self.refresh()
 | 
			
		||||
        return self
 | 
			
		||||
 | 
			
		||||
    async def __aexit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None:
 | 
			
		||||
        await self._async_exit_stack.aclose()
 | 
			
		||||
 | 
			
		||||
    async def refresh(self) -> None:
 | 
			
		||||
        logger.debug('Fetching Frigate config...')
 | 
			
		||||
        logger.debug("Fetching Frigate configuration")
 | 
			
		||||
        try:
 | 
			
		||||
            async with self._aiohttp_session.get(self._frigate_config_url) as response:
 | 
			
		||||
            async with AsyncExitStack() as exit_stack:
 | 
			
		||||
                aiohttp_session = await exit_stack.enter_async_context(aiohttp.ClientSession(raise_for_status=True))
 | 
			
		||||
                response = await exit_stack.enter_async_context(aiohttp_session.get(self._frigate_config_url))
 | 
			
		||||
                self._config = json.loads(await response.read())
 | 
			
		||||
            logger.debug('Finished fetching Frigate config')
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            if self._config:
 | 
			
		||||
                logger.warning('Failed to fetch Frigate config, falling back to previous value', exc_info=e)
 | 
			
		||||
                logger.error("Failed to fetch Frigate config, falling back to previous value", exc_info=e)
 | 
			
		||||
            else:
 | 
			
		||||
                raise
 | 
			
		||||
        logger.debug("Fetched Frigate configuration")
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def active_cameras(self) -> list[str]:
 | 
			
		||||
        if 'cameras' not in self._config:
 | 
			
		||||
            raise ValueError('Configuration not yet fetched from Frigate')
 | 
			
		||||
        if "cameras" not in self._config:
 | 
			
		||||
            raise ValueError("Configuration not yet fetched from Frigate")
 | 
			
		||||
 | 
			
		||||
        return [camera for camera in self._config['cameras'] if self._config['cameras'][camera].get('enabled', True)]
 | 
			
		||||
        return [camera for camera in self._config["cameras"] if self._config["cameras"][camera].get("enabled", True)]
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def active_camera_addresses(self) -> dict[str, str]:
 | 
			
		||||
        active_cameras = self.active_cameras
 | 
			
		||||
        return {camera: self._get_address_from_url(self._config['cameras'][camera]['ffmpeg']['inputs'][0]['path']) for camera in active_cameras}
 | 
			
		||||
        return {camera: self._get_address_from_url(self._config["cameras"][camera]["ffmpeg"]["inputs"][0]["path"]) for camera in active_cameras}
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def _get_address_from_url(cls, url: str) -> str:
 | 
			
		||||
        match = cls._URL_ADDRESS_REGEX.search(url.lower())
 | 
			
		||||
        if match is None:
 | 
			
		||||
            raise ValueError(f'Failed to retrieve address from {url=}')
 | 
			
		||||
            raise ValueError(f"Failed to retrieve address from {url=}")
 | 
			
		||||
 | 
			
		||||
        # Handle case of wyze-bridge and hardcode cameras
 | 
			
		||||
        if match.group().startswith('wyze-bridge'):
 | 
			
		||||
            wyze_camera = url.lower().rsplit('/', 1)[-1].replace('-', '_')
 | 
			
		||||
        if match.group().startswith("wyze-bridge"):
 | 
			
		||||
            wyze_camera = url.lower().rsplit("/", 1)[-1].replace("-", "_")
 | 
			
		||||
            return cls._WYZE_CAMERAS[wyze_camera]
 | 
			
		||||
        return match.group()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -17,11 +17,11 @@ class CameraMonitor:
 | 
			
		|||
        self._wait_time = wait_time
 | 
			
		||||
        self._consecutive_down_threshold = consecutive_down_threshold
 | 
			
		||||
        self._camera_downtime = Counter()
 | 
			
		||||
        self._frigate_config: FrigateConfig
 | 
			
		||||
        self._ntfy_notifier: NtfyNotifier
 | 
			
		||||
        self._frigate_config = FrigateConfig()
 | 
			
		||||
        self._ntfy_notifier = NtfyNotifier
 | 
			
		||||
 | 
			
		||||
    async def _on_camera_up(self, camera: str) -> None:
 | 
			
		||||
        logger.info(f'Camera {camera} is back online')
 | 
			
		||||
        logger.info(f"Camera {camera} is back online")
 | 
			
		||||
        await self._ntfy_notifier.send_notification(camera, True)
 | 
			
		||||
 | 
			
		||||
    async def _on_camera_down(self, camera: str) -> None:
 | 
			
		||||
| 
						 | 
				
			
			@ -29,7 +29,7 @@ class CameraMonitor:
 | 
			
		|||
        if camera not in self._frigate_config.active_cameras:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        logger.info(f'Camera {camera} is down')
 | 
			
		||||
        logger.info(f"Camera {camera} is down")
 | 
			
		||||
        await self._ntfy_notifier.send_notification(camera, False)
 | 
			
		||||
 | 
			
		||||
    async def _on_camera_ping(self, camera: str, success: bool) -> None:
 | 
			
		||||
| 
						 | 
				
			
			@ -43,16 +43,12 @@ class CameraMonitor:
 | 
			
		|||
            self._camera_downtime[camera] += 1
 | 
			
		||||
 | 
			
		||||
    async def run(self) -> None:
 | 
			
		||||
        async with AsyncExitStack() as async_exit_stack:
 | 
			
		||||
            self._frigate_config = await async_exit_stack.enter_async_context(FrigateConfig())
 | 
			
		||||
            self._ntfy_notifier = await async_exit_stack.enter_async_context(NtfyNotifier())
 | 
			
		||||
        while True:
 | 
			
		||||
            camera_ips = {camera: address.split(":", 1)[0] for camera, address in frigate_config.active_camera_addresses.items()}
 | 
			
		||||
            ping_results = await ip_ping_all(*camera_ips.values())
 | 
			
		||||
            for i, camera in enumerate(camera_ips):
 | 
			
		||||
                await self._on_camera_ping(camera, ping_results[i])
 | 
			
		||||
 | 
			
		||||
            while True:
 | 
			
		||||
                camera_ips = {camera: address.split(':', 1)[0] for camera, address in self._frigate_config.active_camera_addresses.items()}
 | 
			
		||||
                ping_results = await ip_ping_all(*camera_ips.values())
 | 
			
		||||
                for i, camera in enumerate(camera_ips):
 | 
			
		||||
                    await self._on_camera_ping(camera, ping_results[i])
 | 
			
		||||
 | 
			
		||||
                logger.debug(f'Sleeping for {self._wait_time} seconds...')
 | 
			
		||||
                await asyncio.sleep(self._wait_time)
 | 
			
		||||
            logger.debug(f"Sleeping for {self._wait_time} seconds...")
 | 
			
		||||
            await asyncio.sleep(self._wait_time)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,7 +1,4 @@
 | 
			
		|||
import logging
 | 
			
		||||
from contextlib import AsyncExitStack
 | 
			
		||||
from types import TracebackType
 | 
			
		||||
from typing import Self
 | 
			
		||||
 | 
			
		||||
import aiohttp
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -9,29 +6,19 @@ logger = logging.getLogger(__name__)
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
class NtfyNotifier:
 | 
			
		||||
    def __init__(self, ntfy_url: str = 'https://ntfy.homelab.net') -> None:
 | 
			
		||||
    def __init__(self, ntfy_url: str = "https://ntfy.homelab.net") -> None:
 | 
			
		||||
        self._ntfy_url = ntfy_url
 | 
			
		||||
        self._aiohttp_session: aiohttp.ClientSession
 | 
			
		||||
        self._async_exit_stack: AsyncExitStack
 | 
			
		||||
 | 
			
		||||
    async def __aenter__(self) -> Self:
 | 
			
		||||
        async with AsyncExitStack() as async_exit_stack:
 | 
			
		||||
            self._aiohttp_session = await async_exit_stack.enter_async_context(aiohttp.ClientSession(raise_for_status=True))
 | 
			
		||||
            self._async_exit_stack = async_exit_stack.pop_all()
 | 
			
		||||
        return self
 | 
			
		||||
 | 
			
		||||
    async def __aexit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None:
 | 
			
		||||
        await self._async_exit_stack.aclose()
 | 
			
		||||
 | 
			
		||||
    async def send_notification(self, camera: str, status: bool) -> None:
 | 
			
		||||
        logger.debug(f'Sending notification for {camera=}...')
 | 
			
		||||
        message = f'{camera} is back online' if status else f'{camera} is offline'
 | 
			
		||||
        await self._aiohttp_session.post(self._ntfy_url, ssl=False, json={
 | 
			
		||||
            'topic': 'frigate_camera_uptime',
 | 
			
		||||
            'title': 'Frigate',
 | 
			
		||||
            'message': message,
 | 
			
		||||
            'priority': 3,
 | 
			
		||||
            'click': f'https://frigate.homelab.net/cameras/{camera}',
 | 
			
		||||
            'icon': 'https://cdn.jsdelivr.net/gh/walkxcode/dashboard-icons/png/frigate.png',
 | 
			
		||||
        })
 | 
			
		||||
        logger.debug(f'Sent notification for {camera=}')
 | 
			
		||||
        async with aiohttp.ClientSession(raise_for_status=True) as aiohttp_session:
 | 
			
		||||
            logger.debug(f"Sending notification for {camera=}...")
 | 
			
		||||
            message = f"{camera} is back online" if status else f"{camera} is offline"
 | 
			
		||||
            await aiohttp_session.post(self._ntfy_url, ssl=False, json={
 | 
			
		||||
                "topic": "frigate_camera_uptime",
 | 
			
		||||
                "title": "Frigate",
 | 
			
		||||
                "message": message,
 | 
			
		||||
                "priority": 3,
 | 
			
		||||
                "click": f"https://frigate.homelab.net/cameras/{camera}",
 | 
			
		||||
                "icon": "https://cdn.jsdelivr.net/gh/walkxcode/dashboard-icons/png/frigate.png",
 | 
			
		||||
            })
 | 
			
		||||
            logger.debug(f"Sent notification for {camera=}")
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -5,10 +5,10 @@ logger = logging.getLogger(__name__)
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
async def ip_ping(host: str) -> bool:
 | 
			
		||||
    logger.debug(f'Pinging {host}...')
 | 
			
		||||
    process = await asyncio.create_subprocess_exec('ping', '-w', '3', '-c', '1', host, stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL)
 | 
			
		||||
    logger.debug(f"Pinging {host}")
 | 
			
		||||
    process = await asyncio.create_subprocess_exec("ping", "-w", "3", "-c", "1", host, stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL)
 | 
			
		||||
    return_code = await process.wait()
 | 
			
		||||
    logger.debug(f'Finished pinging {host}')
 | 
			
		||||
    logger.debug(f"Finished pinging {host}")
 | 
			
		||||
    return return_code == 0
 | 
			
		||||
 | 
			
		||||
async def ip_ping_all(*hosts: str) -> list[bool]:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,10 +1,11 @@
 | 
			
		|||
import logging
 | 
			
		||||
import logging.config
 | 
			
		||||
 | 
			
		||||
import yaml
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def configure_logging() -> None:
 | 
			
		||||
    with open('logging.yaml', 'r') as logging_yaml:
 | 
			
		||||
    with open("configs/logging.yaml", "r") as logging_yaml:
 | 
			
		||||
        logging_config = yaml.safe_load(logging_yaml)
 | 
			
		||||
 | 
			
		||||
    logging.config.dictConfig(logging_config)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,7 +1,8 @@
 | 
			
		|||
FROM python:3.11
 | 
			
		||||
WORKDIR /code
 | 
			
		||||
 | 
			
		||||
ENTRYPOINT ["python3", "server.py"]
 | 
			
		||||
ENTRYPOINT ["uvicorn", "webcontrol:api", "--host", "0.0.0.0", "--port", "80"]
 | 
			
		||||
CMD ["--log-level", "warning"]
 | 
			
		||||
 | 
			
		||||
RUN pip3 install --upgrade pip
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,3 +1,6 @@
 | 
			
		|||
flask==2.3.2
 | 
			
		||||
PyYAML==6.0.1
 | 
			
		||||
pydantic==2.6.4
 | 
			
		||||
fastapi==0.110.0
 | 
			
		||||
uvicorn[standard]==0.29.0
 | 
			
		||||
aiohttp==3.8.6
 | 
			
		||||
paho-mqtt==1.6.1
 | 
			
		||||
requests==2.31.0
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,23 @@
 | 
			
		|||
version: 1
 | 
			
		||||
disable_existing_loggers: false
 | 
			
		||||
formatters:
 | 
			
		||||
  default:
 | 
			
		||||
    format: '[{asctime}.{msecs:03.0f}][{levelname}] {name}:{lineno} - {message}'
 | 
			
		||||
    style: '{'
 | 
			
		||||
    datefmt: '%Y-%m-%d %H:%M:%S'
 | 
			
		||||
handlers:
 | 
			
		||||
  stdout:
 | 
			
		||||
    class: logging.StreamHandler
 | 
			
		||||
    level: INFO
 | 
			
		||||
    formatter: default
 | 
			
		||||
    stream: ext://sys.stdout
 | 
			
		||||
  stderr:
 | 
			
		||||
    class: logging.StreamHandler
 | 
			
		||||
    level: ERROR
 | 
			
		||||
    formatter: default
 | 
			
		||||
    stream: ext://sys.stderr
 | 
			
		||||
root:
 | 
			
		||||
  level: DEBUG
 | 
			
		||||
  handlers:
 | 
			
		||||
  - stdout
 | 
			
		||||
  - stderr
 | 
			
		||||
| 
						 | 
				
			
			@ -1,92 +0,0 @@
 | 
			
		|||
import os
 | 
			
		||||
import sys
 | 
			
		||||
import json
 | 
			
		||||
import time
 | 
			
		||||
import traceback
 | 
			
		||||
from threading import Thread
 | 
			
		||||
import datetime as dt
 | 
			
		||||
 | 
			
		||||
import requests
 | 
			
		||||
from flask import Blueprint, request, jsonify
 | 
			
		||||
import paho.mqtt.publish as mqtt_publish
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
blueprint = Blueprint('detection', __name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_sunset_time() -> dt.datetime:
 | 
			
		||||
    sunset_date = dt.datetime.now().date()
 | 
			
		||||
    try:
 | 
			
		||||
        IPGEOLOCATION_API_KEY = os.environ['IPGEOLOCATION_API_KEY']
 | 
			
		||||
        ipgeolocation_api_response = requests.get(f'https://api.ipgeolocation.io/astronomy?apiKey={IPGEOLOCATION_API_KEY}&location=Winter+Haven,+FL')
 | 
			
		||||
        ipgeolocation_api_response.raise_for_status()
 | 
			
		||||
 | 
			
		||||
        astronomical_json = json.loads(ipgeolocation_api_response.content)
 | 
			
		||||
        sunset_time = dt.datetime.strptime(astronomical_json['sunset'], '%H:%M').time()
 | 
			
		||||
    except Exception:
 | 
			
		||||
        traceback.print_exc()
 | 
			
		||||
        sunset_time = dt.time(20, 00, 00)
 | 
			
		||||
    finally:
 | 
			
		||||
        if sunset_time < dt.datetime.now().time():
 | 
			
		||||
            # Sunset has already passed today
 | 
			
		||||
            sunset_date += dt.timedelta(days=1)
 | 
			
		||||
        return dt.datetime.combine(sunset_date, sunset_time)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def reset_all_camera_detection_at_sunset() -> None:
 | 
			
		||||
    while True:
 | 
			
		||||
        try:
 | 
			
		||||
            # Get names of cameras with detection enabled in configuration
 | 
			
		||||
            frigate_api_response = requests.get('http://frigate:5000/api/config')
 | 
			
		||||
            frigate_api_response.raise_for_status()
 | 
			
		||||
            frigate_camera_config = json.loads(frigate_api_response.content)['cameras']
 | 
			
		||||
 | 
			
		||||
            sunset_time = get_sunset_time() + dt.timedelta(minutes=30)
 | 
			
		||||
            print(f'Waiting until {sunset_time} to reset detection for all cameras...', file=sys.stderr)
 | 
			
		||||
            seconds_until_sunset = (sunset_time - dt.datetime.now()).total_seconds()
 | 
			
		||||
            time.sleep(seconds_until_sunset)
 | 
			
		||||
 | 
			
		||||
            for camera_name in frigate_camera_config:
 | 
			
		||||
                camera_enabled = frigate_camera_config[camera_name].get('enabled', True)
 | 
			
		||||
                detection_enabled = frigate_camera_config[camera_name].get('detect', {}).get('enabled', True)
 | 
			
		||||
 | 
			
		||||
                if camera_enabled and detection_enabled:
 | 
			
		||||
                    set_camera_detection(camera_name, True)
 | 
			
		||||
        except Exception:
 | 
			
		||||
            traceback.print_exc()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def set_camera_detection(camera: str, value: bool, delay: int = 0) -> None:
 | 
			
		||||
    time.sleep(delay)
 | 
			
		||||
 | 
			
		||||
    mqtt_auth = {'username': os.environ.get('MQTT_USERNAME'), 'password': os.environ.get('MQTT_PASSWORD')}
 | 
			
		||||
    if not all(mqtt_auth.values()):
 | 
			
		||||
        mqtt_auth = None
 | 
			
		||||
 | 
			
		||||
    mqtt_publish.single(f'frigate/{camera}/detect/set', 'ON' if value else 'OFF', hostname='mqtt', port=1883, auth=mqtt_auth)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@blueprint.route('/camera/<string:camera>/detect', methods=['POST'])
 | 
			
		||||
def camera_detect_POST(camera):
 | 
			
		||||
    if not request.json:
 | 
			
		||||
        return jsonify({
 | 
			
		||||
            'status': 'failure',
 | 
			
		||||
            'description': 'Request body needs to be in JSON format'
 | 
			
		||||
        }), 400
 | 
			
		||||
 | 
			
		||||
    value = request.json.get('value', True)
 | 
			
		||||
    set_camera_detection(camera, value)
 | 
			
		||||
 | 
			
		||||
    duration = request.json.get('duration', 0)
 | 
			
		||||
    if duration > 0:
 | 
			
		||||
        # Start sleeping thread to revert value after duration
 | 
			
		||||
        thread = Thread(target=set_camera_detection, args=(camera, not value, duration * 60))
 | 
			
		||||
        thread.start()
 | 
			
		||||
 | 
			
		||||
    return jsonify({
 | 
			
		||||
        'status': 'success'
 | 
			
		||||
    }), 200
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
sunset_reset_thread = Thread(target=reset_all_camera_detection_at_sunset, args=())
 | 
			
		||||
sunset_reset_thread.start()
 | 
			
		||||
| 
						 | 
				
			
			@ -1,20 +0,0 @@
 | 
			
		|||
from flask import Flask, jsonify
 | 
			
		||||
 | 
			
		||||
import detection
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
app = Flask(__name__)
 | 
			
		||||
app.register_blueprint(detection.blueprint)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@app.route('/', methods=['GET'])
 | 
			
		||||
def home_GET():
 | 
			
		||||
    return jsonify({
 | 
			
		||||
        'status': 'success',
 | 
			
		||||
        'name': 'webcontrol',
 | 
			
		||||
        'description': 'This is a custom webcontrol API for Frigate that allows minimal control over the NVR system through an HTTP API'
 | 
			
		||||
    }), 200
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == '__main__':
 | 
			
		||||
    app.run(host='0.0.0.0', port=80, debug=False)
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,4 @@
 | 
			
		|||
from webcontrol.api import api
 | 
			
		||||
from webcontrol.utils.logging import configure_logging
 | 
			
		||||
 | 
			
		||||
configure_logging()
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,18 @@
 | 
			
		|||
import logging
 | 
			
		||||
 | 
			
		||||
from fastapi import FastAPI
 | 
			
		||||
 | 
			
		||||
from webcontrol.api.detection import api as detection_api
 | 
			
		||||
 | 
			
		||||
logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
api = FastAPI()
 | 
			
		||||
api.include_router(detection_api)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@api.get("/")
 | 
			
		||||
async def get_root() -> dict[str, str]:
 | 
			
		||||
    return {
 | 
			
		||||
        "message": "This is a Frigate webcontrol API"
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,36 @@
 | 
			
		|||
import datetime as dt
 | 
			
		||||
import logging
 | 
			
		||||
 | 
			
		||||
from fastapi import APIRouter
 | 
			
		||||
from pydantic import BaseModel
 | 
			
		||||
 | 
			
		||||
from webcontrol.detection import set_camera_detection, reset_all_cameras_detection_after_sunset
 | 
			
		||||
from webcontrol.utils.asyncio import create_task, schedule_coroutine
 | 
			
		||||
 | 
			
		||||
logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
api = APIRouter()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@api.on_event("startup")
 | 
			
		||||
def schedule_reset_all_cameras_detection_after_sunset() -> None:
 | 
			
		||||
    create_task(reset_all_cameras_detection_after_sunset())
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TemporaryDetectionSettings(BaseModel):
 | 
			
		||||
    detection: bool = True
 | 
			
		||||
    duration: int = 0
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@api.post("/camera/{camera}/detection")
 | 
			
		||||
async def post_camera_detection(camera: str, temporary_detection_settings: TemporaryDetectionSettings) -> dict[str, str]:
 | 
			
		||||
    await set_camera_detection(camera, temporary_detection_settings.detection)
 | 
			
		||||
    if temporary_detection_settings.duration > 0:
 | 
			
		||||
        schedule_coroutine(
 | 
			
		||||
            set_camera_detection(camera, not temporary_detection_settings.detection),
 | 
			
		||||
            dt.timedelta(minutes=temporary_detection_settings.duration)
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    return {
 | 
			
		||||
        "status": "success"
 | 
			
		||||
    }
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,33 @@
 | 
			
		|||
import asyncio
 | 
			
		||||
import datetime as dt
 | 
			
		||||
import logging
 | 
			
		||||
import os
 | 
			
		||||
 | 
			
		||||
import paho.mqtt.publish as mqtt_publish
 | 
			
		||||
 | 
			
		||||
from webcontrol.frigate_config import FrigateConfigFile
 | 
			
		||||
from webcontrol.nighttime import get_nighttimes
 | 
			
		||||
 | 
			
		||||
logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def set_camera_detection(camera: str, detection: bool) -> None:
 | 
			
		||||
    mqtt_auth = {"username": os.environ.get("MQTT_USERNAME"), "password": os.environ.get("MQTT_PASSWORD")}
 | 
			
		||||
    if not all(mqtt_auth.values()):
 | 
			
		||||
        mqtt_auth = None
 | 
			
		||||
 | 
			
		||||
    logger.debug(f"Setting {camera} camera {detection=}")
 | 
			
		||||
    mqtt_publish.single(f"frigate/{camera}/detect/set", "ON" if detection else "OFF", hostname="mqtt", port=1883, auth=mqtt_auth)
 | 
			
		||||
    logger.info(f"Set {camera} camera {detection=}")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def reset_all_cameras_detection_after_sunset() -> None:
 | 
			
		||||
    frigate_config_file = FrigateConfigFile()
 | 
			
		||||
    async for nighttime in get_nighttimes("Winter Haven, FL"):
 | 
			
		||||
        logger.info(f"Waiting until {nighttime} to reset detection for all cameras")
 | 
			
		||||
        await asyncio.sleep((nighttime - dt.datetime.now()).total_seconds())
 | 
			
		||||
 | 
			
		||||
        await frigate_config_file.reload()
 | 
			
		||||
        active_and_detection_enabled_cameras = set(frigate_config_file.active_cameras).intersection(frigate_config_file.detection_enabled_cameras)
 | 
			
		||||
        for camera in active_and_detection_enabled_cameras:
 | 
			
		||||
            await set_camera_detection(camera, True)
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,46 @@
 | 
			
		|||
import logging
 | 
			
		||||
import os
 | 
			
		||||
 | 
			
		||||
import yaml
 | 
			
		||||
 | 
			
		||||
logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class FrigateConfigFile:
 | 
			
		||||
    def __init__(self, filepath: str = os.environ["FRIGATE_CONFIG_FILE"]) -> None:
 | 
			
		||||
        self._filepath = filepath
 | 
			
		||||
        self._config = {}
 | 
			
		||||
 | 
			
		||||
    async def reload(self) -> None:
 | 
			
		||||
        logger.debug("Loading Frigate configuration file")
 | 
			
		||||
        try:
 | 
			
		||||
            with open(self._filepath, "r") as frigate_config_yaml:
 | 
			
		||||
                self._config = yaml.safe_load(frigate_config_yaml)
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            if self._config:
 | 
			
		||||
                logger.error("Failed to load Frigate config file, falling back to previous value", exc_info=e)
 | 
			
		||||
            else:
 | 
			
		||||
                raise
 | 
			
		||||
        logger.debug("Loaded Frigate configuration file")
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def active_cameras(self) -> list[str]:
 | 
			
		||||
        if not self._config:
 | 
			
		||||
            raise ValueError("Configuration file not yet loaded from Frigate")
 | 
			
		||||
 | 
			
		||||
        return [
 | 
			
		||||
            camera
 | 
			
		||||
            for camera in self._config["cameras"]
 | 
			
		||||
            if self._config["cameras"][camera].get("enabled", True)
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def detection_enabled_cameras(self) -> list[str]:
 | 
			
		||||
        if not self._config:
 | 
			
		||||
            raise ValueError("Configuration file not yet loaded from Frigate")
 | 
			
		||||
 | 
			
		||||
        return [
 | 
			
		||||
            camera
 | 
			
		||||
            for camera in self._config["cameras"]
 | 
			
		||||
            if self._config["cameras"][camera].get("detect", {}).get("enabled", True)
 | 
			
		||||
        ]
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,39 @@
 | 
			
		|||
import datetime as dt
 | 
			
		||||
import json
 | 
			
		||||
import logging
 | 
			
		||||
import os
 | 
			
		||||
import urllib.parse
 | 
			
		||||
from collections.abc import AsyncGenerator
 | 
			
		||||
 | 
			
		||||
import aiohttp
 | 
			
		||||
 | 
			
		||||
logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
SUNSET_FALLBACK = dt.time(19, 0, 0)
 | 
			
		||||
DARKNESS_DELAY = dt.timedelta(minutes=30)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def get_nighttimes(location: str) -> AsyncGenerator[dt.datetime, None]:
 | 
			
		||||
    async with aiohttp.ClientSession(raise_for_status=True) as aiohttp_session:
 | 
			
		||||
        date = dt.date.today()
 | 
			
		||||
        sunset_time = SUNSET_FALLBACK
 | 
			
		||||
 | 
			
		||||
        while True:
 | 
			
		||||
            try:
 | 
			
		||||
                ipgeolocation_api_url = "https://api.ipgeolocation.io/astronomy?apiKey={api_key}&location={location}&date={date}".format(
 | 
			
		||||
                    api_key=urllib.parse.quote(os.environ["IPGEOLOCATION_API_KEY"]),
 | 
			
		||||
                    location=urllib.parse.quote(location),
 | 
			
		||||
                    date=urllib.parse.quote(date.strftime("%Y-%m-%d"))
 | 
			
		||||
                )
 | 
			
		||||
                async with aiohttp_session.get(ipgeolocation_api_url) as response:
 | 
			
		||||
                    response_json = json.loads(await response.read())
 | 
			
		||||
                    sunset_time = dt.datetime.strptime(response_json["sunset"], "%H:%M").time()
 | 
			
		||||
            except Exception as e:
 | 
			
		||||
                logger.error(f"Failed to query sunset time, falling back to {sunset_time}", exc_info=e)
 | 
			
		||||
 | 
			
		||||
            nighttime = dt.datetime.combine(date, sunset_time) + DARKNESS_DELAY
 | 
			
		||||
            date += dt.timedelta(days=1)
 | 
			
		||||
            if nighttime < dt.datetime.now():
 | 
			
		||||
                continue
 | 
			
		||||
            yield nighttime
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,18 @@
 | 
			
		|||
import asyncio
 | 
			
		||||
import datetime as dt
 | 
			
		||||
from collections.abc import Awaitable
 | 
			
		||||
 | 
			
		||||
tasks = set()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def create_task(coroutine: Awaitable[None]) -> None:
 | 
			
		||||
    task = asyncio.create_task(coroutine)  # type: ignore
 | 
			
		||||
    task.add_done_callback(tasks.discard)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def schedule_coroutine(coroutine: Awaitable[None], delay: dt.timedelta) -> None:
 | 
			
		||||
    async def wrapper() -> None:
 | 
			
		||||
        await asyncio.sleep(delay.seconds)
 | 
			
		||||
        await coroutine
 | 
			
		||||
 | 
			
		||||
    create_task(wrapper())
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,12 @@
 | 
			
		|||
import logging
 | 
			
		||||
import logging.config
 | 
			
		||||
 | 
			
		||||
import yaml
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def configure_logging() -> None:
 | 
			
		||||
    with open("configs/logging.yaml", "r") as logging_yaml:
 | 
			
		||||
        logging_config = yaml.safe_load(logging_yaml)
 | 
			
		||||
 | 
			
		||||
    logging.config.dictConfig(logging_config)
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue