From b1c16a00ff0462e679b1c7852631f67f71132e0d Mon Sep 17 00:00:00 2001 From: Ashish D'Souza Date: Fri, 10 Nov 2023 00:18:57 -0600 Subject: [PATCH] Enabled NACK on exception --- player/src/player.py | 123 ++++++++++++++++++++++--------------------- 1 file changed, 64 insertions(+), 59 deletions(-) diff --git a/player/src/player.py b/player/src/player.py index 65c3aa9..d0060cd 100644 --- a/player/src/player.py +++ b/player/src/player.py @@ -1,8 +1,9 @@ import os import json import datetime -import functools import threading +import traceback +import functools from abc import ABC, abstractmethod import pika @@ -28,73 +29,77 @@ class Player(ABC): def subscribe(self, rabbitmq_host: str, rabbitmq_port: str, rabbitmq_topics: list[str]) -> None: def handle_message(connection: pika.adapters.blocking_connection.BlockingConnection, channel: pika.channel.Channel, method: pika.spec.Basic.Deliver, body: bytes): - topic = method.routing_key - message_type = topic.split('.', 1)[0] + try: + topic = method.routing_key + message_type = topic.split('.', 1)[0] - match message_type: - case 'notification': - _, system, user, priority = topic.split('.') - notification_text = body.decode() + match message_type: + case 'notification': + _, system, user, priority = topic.split('.') + notification_text = body.decode() - notification_audio = Audio.from_tts(f'Notification from {system}: {notification_text}') + notification_audio = Audio.from_tts(f'Notification from {system}: {notification_text}') - audio_save_path = os.path.join(os.environ['AUDIO_ROOT_PATH'], 'notification.mp3') - notification_audio.save(audio_save_path) - self.play(audio_save_path) - case 'call': - _, caller, user = topic.split('.') - payload = json.loads(body.decode()) + audio_save_path = os.path.join(os.environ['AUDIO_ROOT_PATH'], 'notification.mp3') + notification_audio.save(audio_save_path) + self.play(audio_save_path) + case 'call': + _, caller, user = topic.split('.') + payload = json.loads(body.decode()) - call_audio = Audio.from_tts(f'Call from: {caller}') - ringtone_audio = Audio.from_file(os.path.join('/audio', payload['ringtone']['filename'])) - if 'duration' in payload['ringtone']: - ringtone_audio.loop(duration=payload['ringtone']['duration']) - if 'trim' in payload['ringtone']: - start_time = datetime.datetime.strptime(payload['ringtone']['trim']['start_time'], '%H:%M:%S.%f').time() - end_time = datetime.datetime.strptime(payload['ringtone']['trim']['end_time'], '%H:%M:%S.%f').time() - ringtone_audio.trim(start_time, end_time) - call_audio.concat(ringtone_audio) - call_audio.loop(count=3) - if 'message' in payload: - message = payload['message'] - call_audio.concat(Audio.from_tts(f'{caller} says: {message}')) + call_audio = Audio.from_tts(f'Call from: {caller}') + ringtone_audio = Audio.from_file(os.path.join('/audio', payload['ringtone']['filename'])) + if 'duration' in payload['ringtone']: + ringtone_audio.loop(duration=payload['ringtone']['duration']) + if 'trim' in payload['ringtone']: + start_time = datetime.datetime.strptime(payload['ringtone']['trim']['start_time'], '%H:%M:%S.%f').time() + end_time = datetime.datetime.strptime(payload['ringtone']['trim']['end_time'], '%H:%M:%S.%f').time() + ringtone_audio.trim(start_time, end_time) + call_audio.concat(ringtone_audio) + call_audio.loop(count=3) + if 'message' in payload: + message = payload['message'] + call_audio.concat(Audio.from_tts(f'{caller} says: {message}')) - audio_save_path = os.path.join(os.environ['AUDIO_ROOT_PATH'], 'call.mp3') - call_audio.save(audio_save_path) - self.play(audio_save_path) - case 'alarm': - _, alarm_type, user = topic.split('.') - payload = json.loads(body.decode()) + audio_save_path = os.path.join(os.environ['AUDIO_ROOT_PATH'], 'call.mp3') + call_audio.save(audio_save_path) + self.play(audio_save_path) + case 'alarm': + _, alarm_type, user = topic.split('.') + payload = json.loads(body.decode()) - alarm_audio = Audio.from_file(os.path.join('/audio', payload['ringtone']['filename'])) - if 'duration' in payload['ringtone']: - alarm_audio.loop(duration=payload['ringtone']['duration']) - if 'trim' in payload['ringtone']: - start_time = datetime.datetime.strptime(payload['ringtone']['trim']['start_time'], '%H:%M:%S.%f').time() - end_time = datetime.datetime.strptime(payload['ringtone']['trim']['end_time'], '%H:%M:%S.%f').time() - alarm_audio.trim(start_time, end_time) + alarm_audio = Audio.from_file(os.path.join('/audio', payload['ringtone']['filename'])) + if 'duration' in payload['ringtone']: + alarm_audio.loop(duration=payload['ringtone']['duration']) + if 'trim' in payload['ringtone']: + start_time = datetime.datetime.strptime(payload['ringtone']['trim']['start_time'], '%H:%M:%S.%f').time() + end_time = datetime.datetime.strptime(payload['ringtone']['trim']['end_time'], '%H:%M:%S.%f').time() + alarm_audio.trim(start_time, end_time) - match alarm_type: - case 'wakeup': - message_audio = Audio.from_tts(payload['message']) - alarm_audio.concat(message_audio) - case 'reminder': - reminder_text = payload['reminder'] - reminder_audio = Audio.from_tts(f'Reminder to {reminder_text}') - alarm_audio.concat(reminder_audio) - case _: - # Send NACK, do not requeue - connection.add_callback_threadsafe(functools.partial(channel.basic_nack, delivery_tag=method.delivery_tag, requeue=False)) + match alarm_type: + case 'wakeup': + message_audio = Audio.from_tts(payload['message']) + alarm_audio.concat(message_audio) + case 'reminder': + reminder_text = payload['reminder'] + reminder_audio = Audio.from_tts(f'Reminder to {reminder_text}') + alarm_audio.concat(reminder_audio) + case _: + raise ValueError(f'No alarm type {_}') - audio_save_path = os.path.join(os.environ['AUDIO_ROOT_PATH'], 'alarm.mp3') - ringtone_audio.save(audio_save_path) - self.play(audio_save_path) - case _: - # Send NACK, do not requeue - connection.add_callback_threadsafe(functools.partial(channel.basic_nack, delivery_tag=method.delivery_tag, requeue=False)) + audio_save_path = os.path.join(os.environ['AUDIO_ROOT_PATH'], 'alarm.mp3') + alarm_audio.save(audio_save_path) + self.play(audio_save_path) + case _: + raise ValueError(f'No message type {_}') - # Send ACK - connection.add_callback_threadsafe(functools.partial(channel.basic_ack, delivery_tag=method.delivery_tag)) + # Send ACK + connection.add_callback_threadsafe(functools.partial(channel.basic_ack, delivery_tag=method.delivery_tag)) + except Exception: + traceback.print_exc() + + # Send NACK, do not requeue + connection.add_callback_threadsafe(functools.partial(channel.basic_nack, delivery_tag=method.delivery_tag, requeue=False)) def on_message(connection: pika.adapters.blocking_connection.BlockingConnection, channel: pika.channel.Channel, method: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, body: bytes) -> None: thread = threading.Thread(target=handle_message, args=(connection, channel, method, body))