From da4c1ba2c41cc3fae22deb67ee518bf422086740 Mon Sep 17 00:00:00 2001 From: Ashish D'Souza Date: Sun, 5 Nov 2023 15:42:03 -0600 Subject: [PATCH] Created MVP --- .gitignore | 3 + conf/docker-compose.yaml | 63 +++++++++++++ player/Dockerfile | 17 ++++ player/requirements.txt | 3 + player/src/audio_utils.py | 153 +++++++++++++++++++++++++++++++ player/src/google_home_player.py | 42 +++++++++ player/src/main.py | 30 ++++++ player/src/player.py | 119 ++++++++++++++++++++++++ 8 files changed, 430 insertions(+) create mode 100644 .gitignore create mode 100644 conf/docker-compose.yaml create mode 100644 player/Dockerfile create mode 100644 player/requirements.txt create mode 100644 player/src/audio_utils.py create mode 100644 player/src/google_home_player.py create mode 100644 player/src/main.py create mode 100644 player/src/player.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e83ba85 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.idea/ +__pycache__/ +*.swp diff --git a/conf/docker-compose.yaml b/conf/docker-compose.yaml new file mode 100644 index 0000000..289bbf5 --- /dev/null +++ b/conf/docker-compose.yaml @@ -0,0 +1,63 @@ +services: + rabbitmq: + container_name: broadcast-rabbitmq + image: rabbitmq:3.12.6 + restart: unless-stopped + healthcheck: + test: rabbitmq-diagnostics -q ping + interval: 30s + retries: 3 + timeout: 10s + start_period: 30s + start_interval: 5s + volumes: + - type: bind + source: /etc/localtime + target: /etc/localtime + read_only: true + ports: + - 5672:5672 + bedroom-speaker-player: + container_name: broadcast-bedroom-speaker-player + image: broadcast-player:latest + restart: unless-stopped + depends_on: + rabbitmq: + condition: service_healthy + command: ['google_home', 'bedroom_speaker'] + environment: + RABBITMQ_HOST: localhost + RABBITMQ_PORT: 5672 + RABBITMQ_TOPICS: 'notification.*.ashish.*,call.*.ashish,alarm.*.ashish' + GOOGLE_HOME_NAME: Bedroom speaker + HTTP_IP: 192.168.0.10 + HTTP_PORT: 24984 + volumes: + - type: volume + source: bedroom-speaker-audio + target: /mnt/audio + - type: bind + source: /etc/localtime + target: /etc/localtime + read_only: true + network_mode: host + nginx: + container_name: broadcast-nginx + image: nginx:1.25.2 + restart: unless-stopped + volumes: + - type: volume + source: bedroom-speaker-audio + target: /usr/share/nginx/html + read_only: true + - type: bind + source: /etc/localtime + target: /etc/localtime + read_only: true + ports: + - 192.168.0.10:24984:80 + +volumes: + bedroom-speaker-audio: + name: broadcast-bedroom-speaker-audio + external: true diff --git a/player/Dockerfile b/player/Dockerfile new file mode 100644 index 0000000..8d327bf --- /dev/null +++ b/player/Dockerfile @@ -0,0 +1,17 @@ +FROM python:3.11 +LABEL author="Ashish D'Souza" + +WORKDIR /code + +ENTRYPOINT ["python", "main.py"] + +ENV AUDIO_ROOT_PATH=/mnt/audio + +RUN apt update --fix-missing +RUN apt install -y ffmpeg + +COPY requirements.txt . +RUN pip install -r requirements.txt + +COPY audio /audio +COPY src . diff --git a/player/requirements.txt b/player/requirements.txt new file mode 100644 index 0000000..2116ec6 --- /dev/null +++ b/player/requirements.txt @@ -0,0 +1,3 @@ +pychromecast==13.0.7 +gTTS==2.3.2 +pika==1.3.2 diff --git a/player/src/audio_utils.py b/player/src/audio_utils.py new file mode 100644 index 0000000..e82feba --- /dev/null +++ b/player/src/audio_utils.py @@ -0,0 +1,153 @@ +import os +import math +import shutil +import tempfile +import datetime +import subprocess +from enum import IntEnum +from typing import Self + +from gtts import gTTS + + +class LinkType(IntEnum): + SOFT = 1 + HARD = 2 + + +class Audio: + def __init__(self): + self._audio_dir = None + self._audio_filename = None + + @classmethod + def from_file(cls, audio_filepath: str) -> Self: + audio_filepath = os.path.abspath(audio_filepath) + if not os.path.isfile(audio_filepath): + raise FileNotFoundError(f'No audio file {audio_filepath}') + + audio = cls() + audio._audio_dir = tempfile.TemporaryDirectory() + audio._audio_filename = os.path.basename(audio_filepath) + + cls._copy_file(audio_filepath, audio._get_filepath(), link_type=None) + return audio + + @classmethod + def from_tts(cls, text: str) -> Self: + audio = cls() + audio._audio_dir = tempfile.TemporaryDirectory() + audio._audio_filename = 'tts.mp3' + + tts_audio = gTTS(text=text, lang='en') + tts_audio.save(audio._get_filepath()) + return audio + + @staticmethod + def _copy_file(src_filepath: str, dest_filepath: str, link_type: LinkType | None = None) -> None: + match link_type: + case LinkType.SOFT: # Symbolic link to file + os.symlink(src_filepath, dest_filepath) + case LinkType.HARD: # Hard link to file + os.link(src_filepath, dest_filepath) + case _: # Copy file + shutil.copyfile(src_filepath, dest_filepath) + + @staticmethod + def _move_file(src_filepath: str, dest_filepath: str, link_type: LinkType | None = None) -> None: + match link_type: + case LinkType.HARD: # Hard link to file + os.link(src_filepath, dest_filepath) + os.remove(src_filepath) + case _: # Move file + shutil.move(src_filepath, dest_filepath) + + def _get_filepath(self) -> str: + return os.path.join(self._audio_dir.name, self._audio_filename) + + def save(self, save_path: str) -> None: + if self._audio_dir is None or self._audio_filename is None: + raise ValueError('Cannot save empty Audio object') + + self._copy_file(self._get_filepath(), save_path, link_type=None) + + def get_duration(self) -> float: + ffprobe_process = subprocess.run([ + 'ffprobe', + '-i', self._get_filepath(), + '-show_entries', 'format=duration', + '-v', 'quiet', + '-of','csv=p=0' + ], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True) + ffprobe_process.check_returncode() + return float(ffprobe_process.stdout.strip()) + + def loop(self, count: int = 1, duration: float = None) -> Self: + output_filename = '.ffmpeg'.join(os.path.splitext(self._audio_filename)) + ffmpeg_args = ['ffmpeg'] + if duration is None: + ffmpeg_args.extend([ + '-stream_loop', str(count - 1), + '-i', self._get_filepath(), + '-c', 'copy' + ]) + else: + ffmpeg_args.extend([ + '-stream_loop', str(math.ceil(duration / self.get_duration()) - 1), + '-i', self._get_filepath(), + '-c', 'copy', + '-t', str(duration) + ]) + ffmpeg_args.append(os.path.join(self._audio_dir.name, output_filename)) + + ffmpeg_process = subprocess.run(ffmpeg_args, stderr=subprocess.DEVNULL, text=True) + ffmpeg_process.check_returncode() + + # Move output file back to original location + os.remove(self._get_filepath()) + self._move_file(os.path.join(self._audio_dir.name, output_filename), self._get_filepath(), link_type=LinkType.HARD) + return self + + def concat(self, *args: Self) -> Self: + concat_filepath = os.path.join(self._audio_dir.name, 'concat.txt') + with open(concat_filepath, 'w') as concat_file: + concat_file.writelines([f'file \'{audio._get_filepath()}\'\n' for audio in (self,) + args]) + + output_filename = '.ffmpeg'.join(os.path.splitext(self._audio_filename)) + ffmpeg_process = subprocess.run([ + 'ffmpeg', + '-f', 'concat', + '-safe', '0', + '-i', concat_filepath, + '-c', 'copy', + os.path.join(self._audio_dir.name, output_filename) + ], stderr=subprocess.DEVNULL, text=True) + os.remove(concat_filepath) + ffmpeg_process.check_returncode() + + # Move outupt file back to original location + os.remove(self._get_filepath()) + self._move_file(os.path.join(self._audio_dir.name, output_filename), self._get_filepath(), link_type=LinkType.HARD) + return self + + def trim(self, start: datetime.time = datetime.time(0, 0, 0, 0), end: datetime.time = None) -> Self: + output_filename = '.ffmpeg'.join(os.path.splitext(self._audio_filename)) + ffmpeg_args = [ + 'ffmpeg', + '-i', self._get_filepath(), + '-c', 'copy', + '-ss', start.strftime('%H:%M:%S.%f') + ] + if end is not None: + ffmpeg_args.extend([ + '-to', end.strftime('%H:%M:%S.%f'), + ]) + ffmpeg_args.append(os.path.join(self._audio_dir.name, output_filename)) + + ffmpeg_process = subprocess.run(ffmpeg_args, stderr=subprocess.DEVNULL, text=True) + ffmpeg_process.check_returncode() + + # Move outupt file back to original location + os.remove(self._get_filepath()) + self._move_file(os.path.join(self._audio_dir.name, output_filename), self._get_filepath(), link_type=LinkType.HARD) + return self diff --git a/player/src/google_home_player.py b/player/src/google_home_player.py new file mode 100644 index 0000000..959c974 --- /dev/null +++ b/player/src/google_home_player.py @@ -0,0 +1,42 @@ +import os +import shutil +import time +import datetime + +import pychromecast + +from player import Player +from audio_utils import Audio + + +class GoogleHomePlayer(Player): + def __init__(self, name: str) -> None: + super().__init__(name) + google_home_mini = pychromecast.get_listed_chromecasts(friendly_names=[os.environ['GOOGLE_HOME_NAME']])[0][0] + google_home_mini.wait() + + self._media_controller = google_home_mini.media_controller + + def _generate_audio_url(self, audio_filepath: str) -> str: + http_ip = os.environ['HTTP_IP'] + http_port = os.environ['HTTP_PORT'] + audio_url_path = os.path.abspath(audio_filepath)[len(os.environ['AUDIO_ROOT_PATH']):].lstrip('/') + timestamp = int(datetime.datetime.now().timestamp() * 1e6) + return f'http://{http_ip}:{http_port}/{audio_url_path}?t={timestamp}' + + def play(self, audio_filepath: str) -> None: + duration = Audio.from_file(audio_filepath).get_duration() + self._media_controller.play_media(self._generate_audio_url(audio_filepath), 'audio/mpeg') + self._media_controller.block_until_active() + time.sleep(duration) + #time.sleep(self._media_controller.status.duration) + + def pause(self) -> None: + self._media_controller.pause() + + def stop(self) -> None: + self._media_controller.stop() + + def play_message(self, message: str) -> None: + self.interrupt = True + super().play_message(message) diff --git a/player/src/main.py b/player/src/main.py new file mode 100644 index 0000000..954f070 --- /dev/null +++ b/player/src/main.py @@ -0,0 +1,30 @@ +import os +import sys + +from player import Player +#from local_player import LocalPlayer +from google_home_player import GoogleHomePlayer + + +def get_player(player_type: str, player_name: str) -> Player: + match player_type: + case 'google_home': + player = GoogleHomePlayer(player_name) + case 'local': + #player = LocalPlayer(player_name) + pass + case _: + raise ValueError(f'No player type "{player_type}"') + return player + + +if __name__ == '__main__': + if len(sys.argv) < 3: + raise ValueError('Missing arguments for player type/name') + + player = get_player(player_type=sys.argv[1], player_name=sys.argv[2]) + + rabbitmq_host = os.environ.get('RABBITMQ_HOST', 'localhost') + rabbitmq_port = int(os.environ.get('RABBITMQ_PORT', '5672')) + rabbitmq_topics = os.environ['RABBITMQ_TOPICS'].split(',') + player.subscribe(rabbitmq_host, rabbitmq_port, rabbitmq_topics) diff --git a/player/src/player.py b/player/src/player.py new file mode 100644 index 0000000..65c3aa9 --- /dev/null +++ b/player/src/player.py @@ -0,0 +1,119 @@ +import os +import json +import datetime +import functools +import threading +from abc import ABC, abstractmethod + +import pika + +from audio_utils import Audio + + +class Player(ABC): + def __init__(self, name: str) -> None: + self.name = name + + @abstractmethod + def play(self, audio_filepath: str) -> None: + pass + + @abstractmethod + def pause(self) -> None: + pass + + @abstractmethod + def stop(self) -> None: + pass + + def subscribe(self, rabbitmq_host: str, rabbitmq_port: str, rabbitmq_topics: list[str]) -> None: + def handle_message(connection: pika.adapters.blocking_connection.BlockingConnection, channel: pika.channel.Channel, method: pika.spec.Basic.Deliver, body: bytes): + topic = method.routing_key + message_type = topic.split('.', 1)[0] + + match message_type: + case 'notification': + _, system, user, priority = topic.split('.') + notification_text = body.decode() + + notification_audio = Audio.from_tts(f'Notification from {system}: {notification_text}') + + audio_save_path = os.path.join(os.environ['AUDIO_ROOT_PATH'], 'notification.mp3') + notification_audio.save(audio_save_path) + self.play(audio_save_path) + case 'call': + _, caller, user = topic.split('.') + payload = json.loads(body.decode()) + + call_audio = Audio.from_tts(f'Call from: {caller}') + ringtone_audio = Audio.from_file(os.path.join('/audio', payload['ringtone']['filename'])) + if 'duration' in payload['ringtone']: + ringtone_audio.loop(duration=payload['ringtone']['duration']) + if 'trim' in payload['ringtone']: + start_time = datetime.datetime.strptime(payload['ringtone']['trim']['start_time'], '%H:%M:%S.%f').time() + end_time = datetime.datetime.strptime(payload['ringtone']['trim']['end_time'], '%H:%M:%S.%f').time() + ringtone_audio.trim(start_time, end_time) + call_audio.concat(ringtone_audio) + call_audio.loop(count=3) + if 'message' in payload: + message = payload['message'] + call_audio.concat(Audio.from_tts(f'{caller} says: {message}')) + + audio_save_path = os.path.join(os.environ['AUDIO_ROOT_PATH'], 'call.mp3') + call_audio.save(audio_save_path) + self.play(audio_save_path) + case 'alarm': + _, alarm_type, user = topic.split('.') + payload = json.loads(body.decode()) + + alarm_audio = Audio.from_file(os.path.join('/audio', payload['ringtone']['filename'])) + if 'duration' in payload['ringtone']: + alarm_audio.loop(duration=payload['ringtone']['duration']) + if 'trim' in payload['ringtone']: + start_time = datetime.datetime.strptime(payload['ringtone']['trim']['start_time'], '%H:%M:%S.%f').time() + end_time = datetime.datetime.strptime(payload['ringtone']['trim']['end_time'], '%H:%M:%S.%f').time() + alarm_audio.trim(start_time, end_time) + + match alarm_type: + case 'wakeup': + message_audio = Audio.from_tts(payload['message']) + alarm_audio.concat(message_audio) + case 'reminder': + reminder_text = payload['reminder'] + reminder_audio = Audio.from_tts(f'Reminder to {reminder_text}') + alarm_audio.concat(reminder_audio) + case _: + # Send NACK, do not requeue + connection.add_callback_threadsafe(functools.partial(channel.basic_nack, delivery_tag=method.delivery_tag, requeue=False)) + + audio_save_path = os.path.join(os.environ['AUDIO_ROOT_PATH'], 'alarm.mp3') + ringtone_audio.save(audio_save_path) + self.play(audio_save_path) + case _: + # Send NACK, do not requeue + connection.add_callback_threadsafe(functools.partial(channel.basic_nack, delivery_tag=method.delivery_tag, requeue=False)) + + # Send ACK + connection.add_callback_threadsafe(functools.partial(channel.basic_ack, delivery_tag=method.delivery_tag)) + + def on_message(connection: pika.adapters.blocking_connection.BlockingConnection, channel: pika.channel.Channel, method: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, body: bytes) -> None: + thread = threading.Thread(target=handle_message, args=(connection, channel, method, body)) + thread.start() + + # Connect to RabbitMQ broker + connection_parameters = pika.ConnectionParameters(host=rabbitmq_host, port=rabbitmq_port) + connection = pika.BlockingConnection(connection_parameters) + channel = connection.channel() + + # Create topic exchange + channel.exchange_declare(exchange='broadcast', exchange_type='topic') + + # Create queue + channel.queue_declare(queue=self.name, auto_delete=True) + for topic in rabbitmq_topics: + channel.queue_bind(queue=self.name, exchange='broadcast', routing_key=topic) + + # Listen for messages + channel.basic_qos(prefetch_count=1) # Prevent simultaneous handling of messages (wait for ACK before accepting next message) + channel.basic_consume(queue=self.name, on_message_callback=functools.partial(on_message, connection), auto_ack=False) + channel.start_consuming()