Enabled NACK on exception

This commit is contained in:
Ashish D'Souza 2023-11-10 00:18:57 -06:00
parent da4c1ba2c4
commit b1c16a00ff
1 changed files with 64 additions and 59 deletions

View File

@ -1,8 +1,9 @@
import os import os
import json import json
import datetime import datetime
import functools
import threading import threading
import traceback
import functools
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
import pika import pika
@ -28,73 +29,77 @@ class Player(ABC):
def subscribe(self, rabbitmq_host: str, rabbitmq_port: str, rabbitmq_topics: list[str]) -> None: def subscribe(self, rabbitmq_host: str, rabbitmq_port: str, rabbitmq_topics: list[str]) -> None:
def handle_message(connection: pika.adapters.blocking_connection.BlockingConnection, channel: pika.channel.Channel, method: pika.spec.Basic.Deliver, body: bytes): def handle_message(connection: pika.adapters.blocking_connection.BlockingConnection, channel: pika.channel.Channel, method: pika.spec.Basic.Deliver, body: bytes):
topic = method.routing_key try:
message_type = topic.split('.', 1)[0] topic = method.routing_key
message_type = topic.split('.', 1)[0]
match message_type: match message_type:
case 'notification': case 'notification':
_, system, user, priority = topic.split('.') _, system, user, priority = topic.split('.')
notification_text = body.decode() notification_text = body.decode()
notification_audio = Audio.from_tts(f'Notification from {system}: {notification_text}') notification_audio = Audio.from_tts(f'Notification from {system}: {notification_text}')
audio_save_path = os.path.join(os.environ['AUDIO_ROOT_PATH'], 'notification.mp3') audio_save_path = os.path.join(os.environ['AUDIO_ROOT_PATH'], 'notification.mp3')
notification_audio.save(audio_save_path) notification_audio.save(audio_save_path)
self.play(audio_save_path) self.play(audio_save_path)
case 'call': case 'call':
_, caller, user = topic.split('.') _, caller, user = topic.split('.')
payload = json.loads(body.decode()) payload = json.loads(body.decode())
call_audio = Audio.from_tts(f'Call from: {caller}') call_audio = Audio.from_tts(f'Call from: {caller}')
ringtone_audio = Audio.from_file(os.path.join('/audio', payload['ringtone']['filename'])) ringtone_audio = Audio.from_file(os.path.join('/audio', payload['ringtone']['filename']))
if 'duration' in payload['ringtone']: if 'duration' in payload['ringtone']:
ringtone_audio.loop(duration=payload['ringtone']['duration']) ringtone_audio.loop(duration=payload['ringtone']['duration'])
if 'trim' in payload['ringtone']: if 'trim' in payload['ringtone']:
start_time = datetime.datetime.strptime(payload['ringtone']['trim']['start_time'], '%H:%M:%S.%f').time() start_time = datetime.datetime.strptime(payload['ringtone']['trim']['start_time'], '%H:%M:%S.%f').time()
end_time = datetime.datetime.strptime(payload['ringtone']['trim']['end_time'], '%H:%M:%S.%f').time() end_time = datetime.datetime.strptime(payload['ringtone']['trim']['end_time'], '%H:%M:%S.%f').time()
ringtone_audio.trim(start_time, end_time) ringtone_audio.trim(start_time, end_time)
call_audio.concat(ringtone_audio) call_audio.concat(ringtone_audio)
call_audio.loop(count=3) call_audio.loop(count=3)
if 'message' in payload: if 'message' in payload:
message = payload['message'] message = payload['message']
call_audio.concat(Audio.from_tts(f'{caller} says: {message}')) call_audio.concat(Audio.from_tts(f'{caller} says: {message}'))
audio_save_path = os.path.join(os.environ['AUDIO_ROOT_PATH'], 'call.mp3') audio_save_path = os.path.join(os.environ['AUDIO_ROOT_PATH'], 'call.mp3')
call_audio.save(audio_save_path) call_audio.save(audio_save_path)
self.play(audio_save_path) self.play(audio_save_path)
case 'alarm': case 'alarm':
_, alarm_type, user = topic.split('.') _, alarm_type, user = topic.split('.')
payload = json.loads(body.decode()) payload = json.loads(body.decode())
alarm_audio = Audio.from_file(os.path.join('/audio', payload['ringtone']['filename'])) alarm_audio = Audio.from_file(os.path.join('/audio', payload['ringtone']['filename']))
if 'duration' in payload['ringtone']: if 'duration' in payload['ringtone']:
alarm_audio.loop(duration=payload['ringtone']['duration']) alarm_audio.loop(duration=payload['ringtone']['duration'])
if 'trim' in payload['ringtone']: if 'trim' in payload['ringtone']:
start_time = datetime.datetime.strptime(payload['ringtone']['trim']['start_time'], '%H:%M:%S.%f').time() start_time = datetime.datetime.strptime(payload['ringtone']['trim']['start_time'], '%H:%M:%S.%f').time()
end_time = datetime.datetime.strptime(payload['ringtone']['trim']['end_time'], '%H:%M:%S.%f').time() end_time = datetime.datetime.strptime(payload['ringtone']['trim']['end_time'], '%H:%M:%S.%f').time()
alarm_audio.trim(start_time, end_time) alarm_audio.trim(start_time, end_time)
match alarm_type: match alarm_type:
case 'wakeup': case 'wakeup':
message_audio = Audio.from_tts(payload['message']) message_audio = Audio.from_tts(payload['message'])
alarm_audio.concat(message_audio) alarm_audio.concat(message_audio)
case 'reminder': case 'reminder':
reminder_text = payload['reminder'] reminder_text = payload['reminder']
reminder_audio = Audio.from_tts(f'Reminder to {reminder_text}') reminder_audio = Audio.from_tts(f'Reminder to {reminder_text}')
alarm_audio.concat(reminder_audio) alarm_audio.concat(reminder_audio)
case _: case _:
# Send NACK, do not requeue raise ValueError(f'No alarm type {_}')
connection.add_callback_threadsafe(functools.partial(channel.basic_nack, delivery_tag=method.delivery_tag, requeue=False))
audio_save_path = os.path.join(os.environ['AUDIO_ROOT_PATH'], 'alarm.mp3') audio_save_path = os.path.join(os.environ['AUDIO_ROOT_PATH'], 'alarm.mp3')
ringtone_audio.save(audio_save_path) alarm_audio.save(audio_save_path)
self.play(audio_save_path) self.play(audio_save_path)
case _: case _:
# Send NACK, do not requeue raise ValueError(f'No message type {_}')
connection.add_callback_threadsafe(functools.partial(channel.basic_nack, delivery_tag=method.delivery_tag, requeue=False))
# Send ACK # Send ACK
connection.add_callback_threadsafe(functools.partial(channel.basic_ack, delivery_tag=method.delivery_tag)) connection.add_callback_threadsafe(functools.partial(channel.basic_ack, delivery_tag=method.delivery_tag))
except Exception:
traceback.print_exc()
# Send NACK, do not requeue
connection.add_callback_threadsafe(functools.partial(channel.basic_nack, delivery_tag=method.delivery_tag, requeue=False))
def on_message(connection: pika.adapters.blocking_connection.BlockingConnection, channel: pika.channel.Channel, method: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, body: bytes) -> None: def on_message(connection: pika.adapters.blocking_connection.BlockingConnection, channel: pika.channel.Channel, method: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, body: bytes) -> None:
thread = threading.Thread(target=handle_message, args=(connection, channel, method, body)) thread = threading.Thread(target=handle_message, args=(connection, channel, method, body))