"""Module for activating a Darth Vader figurine by turning on LEDs on his suit
and playing sounds, all done via a Raspberry Pi (RPi).
The LEDs illuminate Darth Vader's lightsaber and the three slots in the chest
control box. 3 push buttons control the following sounds and LEDs:
1. Some of his famous quotes
2. The Imperial march theme song
3. The lightsaber drawing, hum and retraction sounds
4. The lightsaber illumination (3 LEDs)
His iconic breathing sound plays in the background indefinitely, starting
almost as soon as the script is run on the RPi.
.. URLs
.. default cfg files
.. _default logging configuration file: https://github.com/raul23/archive/blob/master/SimulRPi/v0.1.0a0/default_logging_cfg.json
.. _default main configuration file: https://github.com/raul23/archive/blob/master/SimulRPi/v0.1.0a0/default_main_cfg.json
.. _default values: https://github.com/raul23/archive/blob/master/SimulRPi/v0.1.0a0/default_main_cfg.json#L1
.. _logging config file: https://github.com/raul23/Darth-Vader-RPi/blob/master/darth_vader_rpi/configs/default_logging_cfg.json
.. _main config file: https://github.com/raul23/archive/blob/master/SimulRPi/v0.1.0a0/default_main_cfg.json
.. _gpio_channels: https://github.com/raul23/archive/blob/master/SimulRPi/v0.1.0a0/default_main_cfg.json#L11
.. external links
.. _Are dictionaries ordered in Python 3.6+? (stackoverflow): https://stackoverflow.com/a/39980744
.. _Darth-Vader-RPi GitHub: https://github.com/raul23/Darth-Vader-RPi
.. _RPi.GPIO: https://pypi.org/project/RPi.GPIO/
.. _SimulRPi GitHub: https://github.com/raul23/SimulRPi
.. _SimulRPi.GPIO: https://pypi.org/project/SimulRPi/
.. _YouTube video: https://youtu.be/E2J_xl2MbGU?t=333
.. internal links
.. _installed: README_docs.html#installation-instructions-label
"""
import logging
import os
import threading
import time
from logging import NullHandler
os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = "hide"
import pygame
from darth_vader_rpi.utils import add_spaces_to_msg, SoundWrapper
from darth_vader_rpi.ledutils import turn_on_led, turn_off_led, turn_on_slot_leds
try:
import RPi.GPIO as GPIO
except ImportError:
import SimulRPi.GPIO as GPIO
# Module-level logger named after this module. A ``NullHandler`` is attached so
# that this module does not configure any output by itself; the application
# importing it decides how (and whether) the log records are handled.
logger = logging.getLogger(__name__)
logger.addHandler(NullHandler())
class ExceptionThread(threading.Thread):
    """A subclass of :class:`threading.Thread` that defines threads that can
    catch errors if their target functions raise an exception.

    Parameters
    ----------
    verbose : bool, optional
        If `True`, log the traceback when there is an exception. Otherwise,
        log just a one-line error message, e.g. ``KeyError: 'test'``
    args : tuple, optional
        Positional arguments forwarded to :class:`threading.Thread`.
    kwargs : dict, optional
        Keyword arguments forwarded to :class:`threading.Thread`, e.g.
        ``name``, ``target``, ``args`` and ``kwargs``.

    Attributes
    ----------
    exc : :class:`Exception`
        The exception raised by the target function, or `None` if the target
        has not raised anything.

    References
    ----------
    * `stackoverflow <https://stackoverflow.com/a/51270466>`__

    """

    def __init__(self, verbose=False, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.verbose = verbose
        self.exc = None

    def run(self):
        """Method representing the thread's activity.

        Overridden from the base class :class:`threading.Thread`. This method
        invokes the callable object passed to the object's constructor as the
        target argument, if any, with sequential and keyword arguments taken
        from the args and kwargs arguments, respectively.

        **It also saves and logs any error that the target function might
        raise.** The exception is stored in :attr:`exc` and is not re-raised.

        """
        try:
            # A thread may be created without a target; like the base class,
            # do nothing in that case instead of recording a bogus TypeError
            # from calling ``None``.
            if self._target is not None:
                self._target(*self._args, **self._kwargs)
        except Exception as e:
            self.exc = e
            if self.verbose:
                # Full traceback
                logger.exception(add_spaces_to_msg("Error: {}".format(e)))
            else:
                # One-line message: "<exception class>: <message>"
                # TODO: add next line in a utility function
                err_msg = "{}: {}".format(str(e.__class__).split("'")[1], e)
                logger.error(add_spaces_to_msg(err_msg))
class DarthVader:
    """Class for activating a Darth Vader figurine by turning on LEDs on his
    suit and playing sounds, all done via a Raspberry Pi (RPi).

    The `main config file`_ is used to setup the :mod:`start_dv` script, such
    as the GPIO pins and the sound files.

    Parameters
    ----------
    main_cfg : dict
        Dictionary containing the configuration data to setup the
        :mod:`start_dv` script, such as the GPIO pins and the sound files. See
        `main config file`_ for a detailed look into its content.

    Attributes
    ----------
    th_slot_leds : start_dv.ExceptionThread
        Thread responsible for turning on the three slot LEDs in a precise
        sequence.

        Its target function is :meth:`ledutils.turn_on_slot_leds`.

    """

    def __init__(self, main_cfg):
        self.main_cfg = main_cfg
        # Created and started by activate(); stopped and joined by cleanup().
        self.th_slot_leds = None

    def activate(self):
        """Activate a Darth Vader figurine by turning on LEDs on his suit and
        playing sounds, all done via an RPi.

        While the method waits for a pressed button, you can exit by pressing
        ``ctrl`` + ``c``.

        Returns
        -------
        retcode : int
            If the method is run without any :exc:`Exception`, the return code
            is 0. Otherwise, it is 1. The return code is also 1 if the slot
            LEDs thread stops running while waiting for a pressed button.

            Also, even if there is an :exc:`Exception`, the method will try to
            clean up before exiting.

        """
        retcode = 0
        # Both dicts are filled in place by the helpers so that whatever was
        # set up before an error is still visible to the handlers below and to
        # cleanup().
        gpio_channels = {}
        loaded_sounds = {}
        try:
            logger.debug("pygame mixer initialization")
            pygame.mixer.init()
            logger.debug("RPi initialization")
            logger.debug("")
            self._setup_gpio(gpio_channels)
            self._setup_audio_channels()
            self._load_sounds(loaded_sounds)
            # An empty list (no quotes configured) is tolerated; the quotes
            # button is then ignored.
            quotes = list(loaded_sounds.get('quotes', {}).values())
            self._start_slot_leds_thread(gpio_channels)
            logger.info("")
            logger.info(add_spaces_to_msg("Press buttons"))
            retcode = self._poll_buttons(gpio_channels, loaded_sounds, quotes)
        except Exception as e:
            retcode = 1
            # .get(): the handler must not raise a KeyError of its own if the
            # 'verbose' setting is missing from the configuration.
            if self.main_cfg.get('verbose'):
                logger.exception(add_spaces_to_msg("Error: {}".format(e)))
            else:
                logger.error(add_spaces_to_msg(self._format_error(e)))
        except KeyboardInterrupt:
            logger.info(add_spaces_to_msg("Exiting..."))
            closing_sound = loaded_sounds.get('closing_sound')
            if closing_sound and not closing_sound.mute:
                closing_sound.play()
                # Leave some time for the closing sound before cleanup()
                # stops the audio channels.
                time.sleep(1)
        finally:
            self.cleanup(gpio_channels)
        return retcode

    @staticmethod
    def _format_error(exc):
        """Return a one-line ``<exception class>: <message>`` string."""
        return "{}: {}".format(str(exc.__class__).split("'")[1], exc)

    def _setup_gpio(self, gpio_channels):
        """Configure the GPIO pins and fill `gpio_channels` in place."""
        # Set the numbering system used to identify the I/O pins on an RPi
        modes = {'BOARD': GPIO.BOARD, 'BCM': GPIO.BCM}
        GPIO.setmode(modes[self.main_cfg['mode'].upper()])
        GPIO.setwarnings(False)
        # Setup LEDs and buttons
        for gpio_ch in self.main_cfg['gpio_channels']:
            # TODO: IMPORTANT add channel_type in main_cfg so you don't
            # have to check '_led'
            if gpio_ch['channel_id'].endswith("_led"):
                # LEDs
                GPIO.setup(gpio_ch['channel_number'], GPIO.OUT)
            else:
                # Buttons: input with pull-up, so a pressed button reads low
                GPIO.setup(gpio_ch['channel_number'], GPIO.IN,
                           pull_up_down=GPIO.PUD_UP)
            gpio_channels[gpio_ch['channel_id']] = {
                'channel_number': gpio_ch['channel_number'],
                'channel_name': gpio_ch['channel_name'],
                'key': gpio_ch.get('key'),
                'led_symbols': gpio_ch.get('led_symbols'),
            }

    def _setup_audio_channels(self):
        """Set the volume of each audio channel listed in the configuration."""
        # Create separate channel
        # Ref.: stackoverflow.com/a/59742418
        for ch_dict in self.main_cfg['audio_channels']:
            channel = pygame.mixer.Channel(ch_dict['channel_id'])
            channel.set_volume(ch_dict['volume'])

    def _load_sounds(self, loaded_sounds):
        """Load the configured sounds and fill `loaded_sounds` in place.

        Quotes are stored in a sub-dictionary under the key ``'quotes'``;
        songs and sound effects are stored directly under their sound id.
        The breathing sound starts playing as soon as it is loaded, unless
        it is muted.
        """
        sounds_dir = self.main_cfg['sounds_directory']
        logger.info('Loading sounds...')
        logger.info("")
        for sound_type in ['quotes', 'songs', 'sound_effects']:
            logger.debug('Loading {}'.format(sound_type.replace("_", " ")))
            for sound in self.main_cfg[sound_type]:
                sound_id = sound['id']
                sound_name = sound['name']
                filepath = os.path.join(sounds_dir, sound['filename'])
                logger.debug('Loading "{}": {}'.format(sound_name, filepath))
                sw = SoundWrapper(
                    sound_id=sound_id,
                    sound_name=sound_name,
                    sound_filepath=filepath,
                    channel_id=sound['audio_channel_id'],
                    mute=sound.get('mute', False))
                if sound_type == "quotes":
                    loaded_sounds.setdefault("quotes", {})
                    loaded_sounds['quotes'].setdefault(sound_id, sw)
                else:
                    loaded_sounds.setdefault(sound_id, sw)
                    # Only sounds stored at the top level can be looked up
                    # by their id here.
                    if sw.sound_id == 'breathing_sound' and not sw.mute:
                        loops = sound.get('loops', 0)
                        loaded_sounds[sound_id].play(loops)
            logger.debug("")

    def _start_slot_leds_thread(self, gpio_channels):
        """Create and start the thread driving the three slot LEDs."""
        slot_leds_cfg = self.main_cfg['slot_leds']
        self.th_slot_leds = ExceptionThread(
            name="thread_slot_leds",
            target=turn_on_slot_leds,
            verbose=self.main_cfg['verbose'],
            kwargs=dict(
                top_led=gpio_channels['top_led']['channel_number'],
                middle_led=gpio_channels['middle_led']['channel_number'],
                bottom_led=gpio_channels['bottom_led']['channel_number'],
                leds_sequence=slot_leds_cfg['sequence'],
                delay_between_steps=slot_leds_cfg['delay_between_steps'],
                time_per_step=slot_leds_cfg['time_per_step']))
        self.th_slot_leds.start()

    def _poll_buttons(self, gpio_channels, loaded_sounds, quotes):
        """Poll the three push buttons and react to presses.

        The loop only ends by itself when the slot LEDs thread is no longer
        alive, in which case 1 is returned. Exceptions (including
        :exc:`KeyboardInterrupt`) propagate to the caller.
        """
        # Channel numbers do not change while polling; look them up once.
        lightsaber_button = gpio_channels['lightsaber_button']['channel_number']
        song_button = gpio_channels['song_button']['channel_number']
        quotes_button = gpio_channels['quotes_button']['channel_number']
        lightsaber_led = gpio_channels['lightsaber_led']['channel_number']
        pressed_lightsaber = False
        quote_idx = 0
        while True:
            # Buttons are wired with a pull-up: a low reading means "pressed".
            if not GPIO.input(lightsaber_button):
                if pressed_lightsaber:
                    # Lightsaber is drawn: retract it
                    pressed_lightsaber = False
                    loaded_sounds['lightsaber_retraction_sound'].play()
                    time.sleep(0.1)
                    # Turn off the configured lightsaber LED channel (it was
                    # previously a hard-coded channel number).
                    turn_off_led(lightsaber_led)
                else:
                    # Lightsaber is retracted: draw it
                    pressed_lightsaber = True
                    loaded_sounds['lightsaber_drawing_sound'].play()
                    loaded_sounds['lightsaber_hum_sound'].play(-1)
                    time.sleep(0.1)
                    turn_on_led(lightsaber_led)
                time.sleep(0.2)
            elif not GPIO.input(song_button):
                loaded_sounds['imperial_march_song'].play()
                time.sleep(0.2)
            elif not GPIO.input(quotes_button):
                if quotes:
                    # Cycle through the quotes, one per button press
                    quote = quotes[quote_idx % len(quotes)]
                    quote_idx += 1
                    quote.play()
                time.sleep(0.2)
            elif not self.th_slot_leds.is_alive():
                # The slot LEDs thread is not expected to stop while we are
                # waiting for buttons; treat it as a failure and exit.
                logger.info(add_spaces_to_msg("Exiting..."))
                return 1

    def cleanup(self, gpio_channels):
        """Clean up any resources such as threads and GPIO channels.

        The cleanup consists in the following actions:

        * turn off each LED
        * stop the thread ``th_slot_leds``
        * stop each audio channel (only if the pygame mixer is initialized)
        * call ``RPi.GPIO.cleanup()`` which will return all GPIO channels back
          to inputs with no pull up/down
        * If in simulation mode, :obj:`SimulRPi.GPIO.cleanup` is called to
          stop the threads among other things

        ``GPIO.cleanup()`` is always called, even if one of the previous
        actions raises an exception.

        Parameters
        ----------
        gpio_channels : dict
            Dictionary mapping channel id (:obj:`str`) to channel attributes
            (:obj:`dict`). The channel attributes consist in the following:

            * ``channel_number``
            * ``channel_name``
            * ``key``
            * ``led_symbols``

            .. note::

                These channel attributes are those found in the setting
                `gpio_channels`_ from the main configuration file.

        """
        try:
            # ``setprinting`` is not available in every GPIO module, hence
            # the check before calling it.
            if hasattr(GPIO, "setprinting"):
                GPIO.setprinting(False)
                time.sleep(0.1)
            if gpio_channels:
                for channel_id, channel_info in gpio_channels.items():
                    if channel_id.endswith("_led"):
                        turn_off_led(channel_info['channel_number'])
            logger.info(add_spaces_to_msg("Cleanup..."))
            if self.th_slot_leds:
                # Flag set on the thread object to ask the slot LEDs thread
                # to stop.
                self.th_slot_leds.do_run = False
                # Joining a thread that was never started raises a
                # RuntimeError, so only wait for a running thread.
                if self.th_slot_leds.is_alive():
                    self.th_slot_leds.join()
                logger.debug(add_spaces_to_msg("Thread stopped: {}".format(
                    self.th_slot_leds.name)))
            # If activate() failed before (or while) initializing the mixer,
            # touching a mixer channel would raise and hide the first error.
            if pygame.mixer.get_init():
                for ch in self.main_cfg.get('audio_channels', []):
                    pygame.mixer.Channel(ch['channel_id']).stop()
        finally:
            GPIO.cleanup()