import json import asyncio from typing import AsyncGenerator import aiohttp class NtfySubscriber: def __init__(self, topics: list[str]): self._topics = topics def _generate_ntfy_url(self): return 'https://ntfy.homelab.net/' + ','.join(self._topics) + '/json' async def subscribe(self) -> AsyncGenerator[tuple[str, str, str], None]: async with aiohttp.ClientSession(raise_for_status=True, timeout=aiohttp.ClientTimeout()) as session: async with session.get(self._generate_ntfy_url(), verify_ssl=False) as response: async for line in response.content: msg = json.loads(line.decode()) if msg['event'] == 'message': yield msg['topic'], msg['title'], msg['message']