Add files via upload

There, hopefully that's all where it's meant to be.
This commit is contained in:
not-matt 2017-12-18 20:50:27 +00:00 committed by GitHub
parent c90fc83224
commit b0b45efcf4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1697 additions and 0 deletions

122
python/config.py Normal file
View File

@ -0,0 +1,122 @@
"""Settings for audio reactive LED strip"""
from __future__ import print_function
from __future__ import division
import os
DEVICE = 'stripless'
"""Device used to control LED strip. Must be 'pi', 'esp8266' or 'blinkstick'
'esp8266' means that you are using an ESP8266 module to control the LED strip
and commands will be sent to the ESP8266 over WiFi.
'pi' means that you are using a Raspberry Pi as a standalone unit to process
audio input and control the LED strip directly.
'blinkstick' means that a BlinkstickPro is connected to this PC which will be used
to control the leds connected to it.
'stripless' means that the program will run without sending data to a strip.
Useful for development etc, but doesn't look half as good ;)
"""
if DEVICE == 'esp8266':
AUTO_DETECT = False
"""Set to true if the ip address of the device changes. This is the case if it's connecting
through windows hotspot for instance. If so, give the mac address of the device."""
MAC_ADDR = "5c-cf-7f-f0-8c-f3"
"""MAC address of the ESP8266."""
UDP_IP = "192.168.1.68"
"""IP address of the ESP8266.
Unless using auto detect, it must match IP in ws2812_controller.ino"""
UDP_PORT = 7778
"""Port number used for socket communication between Python and ESP8266"""
SOFTWARE_GAMMA_CORRECTION = False
"""Set to False because the firmware handles gamma correction + dither"""
elif DEVICE == 'pi':
LED_PIN = 18
"""GPIO pin connected to the LED strip pixels (must support PWM)"""
LED_FREQ_HZ = 800000
"""LED signal frequency in Hz (usually 800kHz)"""
LED_DMA = 5
"""DMA channel used for generating PWM signal (try 5)"""
BRIGHTNESS = 255
"""Brightness of LED strip between 0 and 255"""
LED_INVERT = True
"""Set True if using an inverting logic level converter"""
SOFTWARE_GAMMA_CORRECTION = True
"""Set to True because Raspberry Pi doesn't use hardware dithering"""
elif DEVICE == 'blinkstick':
SOFTWARE_GAMMA_CORRECTION = True
"""Set to True because blinkstick doesn't use hardware dithering"""
elif DEVICE == 'stripless':
pass
else:
raise ValueError("Invalid device selected. Device {} not known.".format(DEVICE))
USE_GUI = True
"""Whether or not to display a PyQtGraph GUI plot of visualization"""
DISPLAY_FPS = False
"""Whether to display the FPS when running (can reduce performance)"""
N_PIXELS = 242
"""Number of pixels in the LED strip (must match ESP8266 firmware)"""
GAMMA_TABLE_PATH = os.path.join(os.path.dirname(__file__), 'gamma_table.npy')
"""Location of the gamma correction table"""
MIC_RATE = 48000
"""Sampling frequency of the microphone in Hz"""
FPS = 40
"""Desired refresh rate of the visualization (frames per second)
FPS indicates the desired refresh rate, or frames-per-second, of the audio
visualization. The actual refresh rate may be lower if the computer cannot keep
up with desired FPS value.
Higher framerates improve "responsiveness" and reduce the latency of the
visualization but are more computationally expensive.
Low framerates are less computationally expensive, but the visualization may
appear "sluggish" or out of sync with the audio being played if it is too low.
The FPS should not exceed the maximum refresh rate of the LED strip, which
depends on how long the LED strip is.
"""
_max_led_FPS = int(((N_PIXELS * 30e-6) + 50e-6)**-1.0)
assert FPS <= _max_led_FPS, 'FPS must be <= {}'.format(_max_led_FPS)
MIN_FREQUENCY = 20
"""Frequencies below this value will be removed during audio processing"""
MAX_FREQUENCY = 18000
"""Frequencies above this value will be removed during audio processing"""
LOGARITHMIC_SCALING = True
"""Scale frequencies logarithmically to match perceived pitch of human ear"""
N_FFT_BINS = 24
"""Number of frequency bins to use when transforming audio to frequency domain
Fast Fourier transforms are used to transform time-domain audio data to the
frequency domain. The frequencies present in the audio signal are assigned
to their respective frequency bins. This value indicates the number of
frequency bins to use.
A small number of bins reduces the frequency resolution of the visualization
but improves amplitude resolution. The opposite is true when using a large
number of bins. More bins is not always better!
There is no point using more bins than there are pixels on the LED strip.
"""
N_ROLLING_HISTORY = 4
"""Number of past audio frames to include in the rolling window"""
MIN_VOLUME_THRESHOLD = 1e-3
"""No music visualization displayed if recorded audio volume below threshold"""

53
python/dsp.py Normal file
View File

@ -0,0 +1,53 @@
from __future__ import print_function
import numpy as np
import config
import melbank
class ExpFilter:
"""Simple exponential smoothing filter"""
def __init__(self, val=0.0, alpha_decay=0.5, alpha_rise=0.5):
"""Small rise / decay factors = more smoothing"""
assert 0.0 < alpha_decay < 1.0, 'Invalid decay smoothing factor'
assert 0.0 < alpha_rise < 1.0, 'Invalid rise smoothing factor'
self.alpha_decay = alpha_decay
self.alpha_rise = alpha_rise
self.value = val
def update(self, value):
if isinstance(self.value, (list, np.ndarray, tuple)):
alpha = value - self.value
alpha[alpha > 0.0] = self.alpha_rise
alpha[alpha <= 0.0] = self.alpha_decay
else:
alpha = self.alpha_rise if value > self.value else self.alpha_decay
self.value = alpha * value + (1.0 - alpha) * self.value
return self.value
def rfft(data, window=None):
window = 1.0 if window is None else window(len(data))
ys = np.abs(np.fft.rfft(data * window))
xs = np.fft.rfftfreq(len(data), 1.0 / config.MIC_RATE)
return xs, ys
def fft(data, window=None):
window = 1.0 if window is None else window(len(data))
ys = np.fft.fft(data * window)
xs = np.fft.fftfreq(len(data), 1.0 / config.MIC_RATE)
return xs, ys
def create_mel_bank():
global samples, mel_y, mel_x
samples = int(config.MIC_RATE * config.N_ROLLING_HISTORY / (2.0 * config.FPS))
mel_y, (_, mel_x) = melbank.compute_melmat(num_mel_bands=config.N_FFT_BINS,
freq_min=config.MIN_FREQUENCY,
freq_max=config.MAX_FREQUENCY,
num_fft_bands=samples,
sample_rate=config.MIC_RATE)
samples = None
mel_y = None
mel_x = None
create_mel_bank()

BIN
python/gamma_table.npy Normal file

Binary file not shown.

51
python/gui.py Normal file
View File

@ -0,0 +1,51 @@
from __future__ import print_function
from __future__ import division
import time
import numpy as np
from pyqtgraph.Qt import QtGui
import pyqtgraph as pg
from pyqtgraph.dockarea import *
class GUI:
plot = []
curve = []
def __init__(self, width=800, height=450, title=''):
# Create GUI window
self.app = QtGui.QApplication([])
self.win = pg.GraphicsWindow(title)
self.win.resize(width, height)
self.win.setWindowTitle(title)
# Create GUI layout
self.layout = QtGui.QVBoxLayout()
self.win.setLayout(self.layout)
def add_plot(self, title):
new_plot = pg.PlotWidget()
self.layout.addWidget(new_plot)
self.plot.append(new_plot)
self.curve.append([])
def add_curve(self, plot_index, pen=(255, 255, 255)):
self.curve[plot_index].append(self.plot[plot_index].plot(pen=pen))
if __name__ == '__main__':
# Example test gui
N = 48
gui = GUI(title='Test')
# Sin plot
gui.add_plot(title='Sin Plot')
gui.add_curve(plot_index=0)
gui.win.nextRow()
# Cos plot
gui.add_plot(title='Cos Plot')
gui.add_curve(plot_index=1)
while True:
t = time.time()
x = np.linspace(t, 2 * np.pi + t, N)
gui.curve[0][0].setData(x=x, y=np.sin(x))
gui.curve[1][0].setData(x=x, y=np.cos(x))
gui.app.processEvents()
time.sleep(1.0 / 30.0)

183
python/led.py Normal file
View File

@ -0,0 +1,183 @@
from __future__ import print_function
from __future__ import division
import platform
import numpy as np
import config
# ESP8266 uses WiFi communication
if config.DEVICE == 'esp8266':
import socket
from subprocess import check_output
from time import sleep
# Find the audio strip automagically
if config.AUTO_DETECT:
ip_addr = False
while not ip_addr:
arp_out = check_output(['arp', '-a']).splitlines()
for i in arp_out:
if config.MAC_ADDR in str(i):
ip_addr = i.split()[0].decode("utf-8")
break
else:
print("Device not found at physical address {}, retrying in 1s".format(config.MAC_ADDR))
sleep(1)
print("Found device {}, with IP address {}".format(config.MAC_ADDR, ip_addr))
config.UDP_IP = ip_addr
_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
_sock.settimeout(0.005)
# Raspberry Pi controls the LED strip directly
elif config.DEVICE == 'pi':
import neopixel
strip = neopixel.Adafruit_NeoPixel(config.N_PIXELS, config.LED_PIN,
config.LED_FREQ_HZ, config.LED_DMA,
config.LED_INVERT, config.BRIGHTNESS)
strip.begin()
elif config.DEVICE == 'blinkstick':
from blinkstick import blinkstick
import signal
import sys
#Will turn all leds off when invoked.
def signal_handler(signal, frame):
all_off = [0]*(config.N_PIXELS*3)
stick.set_led_data(0, all_off)
sys.exit(0)
stick = blinkstick.find_first()
# Create a listener that turns the leds off when the program terminates
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
_gamma = np.load(config.GAMMA_TABLE_PATH)
"""Gamma lookup table used for nonlinear brightness correction"""
_prev_pixels = np.tile(253, (3, config.N_PIXELS))
"""Pixel values that were most recently displayed on the LED strip"""
pixels = np.tile(1, (3, config.N_PIXELS))
"""Pixel values for the LED strip"""
_is_python_2 = int(platform.python_version_tuple()[0]) == 2
def _update_esp8266():
"""Sends UDP packets to ESP8266 to update LED strip values
The ESP8266 will receive and decode the packets to determine what values
to display on the LED strip. The communication protocol supports LED strips
with a maximum of 256 LEDs.
The packet encoding scheme is:
|i|r|g|b|
where
i (0 to 255): Index of LED to change (zero-based)
r (0 to 255): Red value of LED
g (0 to 255): Green value of LED
b (0 to 255): Blue value of LED
"""
global pixels, _prev_pixels
# Truncate values and cast to integer
pixels = np.clip(pixels, 0, 200).astype(int)
# Optionally apply gamma correc tio
p = _gamma[pixels] if config.SOFTWARE_GAMMA_CORRECTION else np.copy(pixels)
MAX_PIXELS_PER_PACKET = 256
# Pixel indices
idx = range(pixels.shape[1])
#idx = [i for i in idx if not np.array_equal(p[:, i], _prev_pixels[:, i])]
n_packets = len(idx) // MAX_PIXELS_PER_PACKET + 1
idx = np.array_split(idx, n_packets)
for packet_indices in idx:
m = '' if _is_python_2 else []
for i in packet_indices:
if _is_python_2:
m += chr(i) + chr(pixels[0][i]) + chr(pixels[1][i]) + chr(pixels[2][i])
else:
m.append(i) # Index of pixel to change
m.append(pixels[0][i]) # Pixel red value
m.append(pixels[1][i]) # Pixel green value
m.append(pixels[2][i]) # Pixel blue value
m = m if _is_python_2 else bytes(m)
_sock.sendto(m, (config.UDP_IP, config.UDP_PORT))
_prev_pixels = np.copy(pixels)
def _update_pi():
"""Writes new LED values to the Raspberry Pi's LED strip
Raspberry Pi uses the rpi_ws281x to control the LED strip directly.
This function updates the LED strip with new values.
"""
global pixels, _prev_pixels
# Truncate values and cast to integer
pixels = np.clip(pixels, 0, 255).astype(int)
# Optional gamma correction
p = _gamma[pixels] if config.SOFTWARE_GAMMA_CORRECTION else np.copy(pixels)
# Encode 24-bit LED values in 32 bit integers
r = np.left_shift(p[0][:].astype(int), 8)
g = np.left_shift(p[1][:].astype(int), 16)
b = p[2][:].astype(int)
rgb = np.bitwise_or(np.bitwise_or(r, g), b)
# Update the pixels
for i in range(config.N_PIXELS):
# Ignore pixels if they haven't changed (saves bandwidth)
if np.array_equal(p[:, i], _prev_pixels[:, i]):
continue
strip._led_data[i] = rgb[i]
_prev_pixels = np.copy(p)
strip.show()
def _update_blinkstick():
"""Writes new LED values to the Blinkstick.
This function updates the LED strip with new values.
"""
global pixels
# Truncate values and cast to integer
pixels = np.clip(pixels, 0, 250).astype(int)
# Optional gamma correction
p = _gamma[pixels] if config.SOFTWARE_GAMMA_CORRECTION else np.copy(pixels)
# Read the rgb values
r = p[0][:].astype(int)
g = p[1][:].astype(int)
b = p[2][:].astype(int)
#create array in which we will store the led states
newstrip = [None]*(config.N_PIXELS*3)
for i in range(config.N_PIXELS):
# blinkstick uses GRB format
newstrip[i*3] = g[i]
newstrip[i*3+1] = r[i]
newstrip[i*3+2] = b[i]
#send the data to the blinkstick
stick.set_led_data(0, newstrip)
def update():
"""Updates the LED strip values"""
if config.DEVICE == 'esp8266':
_update_esp8266()
elif config.DEVICE == 'pi':
_update_pi()
elif config.DEVICE == 'blinkstick':
_update_blinkstick()
elif config.DEVICE == 'stripless':
pass
# Execute this file to run a LED strand test
# If everything is working, you should see a red, green, and blue pixel scroll
# across the LED strip continously
if __name__.endswith('__main__'):
import time
# Turn all pixels off
pixels *= 0
pixels[0, 0] = 255 # Set 1st pixel red
pixels[1, 1] = 255 # Set 2nd pixel green
pixels[2, 2] = 255 # Set 3rd pixel blue
print('Starting LED strand test')
while True:
pixels = np.roll(pixels, 1, axis=1)
update()
time.sleep(.1)

151
python/melbank.py Normal file
View File

@ -0,0 +1,151 @@
"""This module implements a Mel Filter Bank.
In other words it is a filter bank with triangular shaped bands
arnged on the mel frequency scale.
An example ist shown in the following figure:
.. plot::
from pylab import plt
import melbank
f1, f2 = 1000, 8000
melmat, (melfreq, fftfreq) = melbank.compute_melmat(6, f1, f2, num_fft_bands=4097)
fig, ax = plt.subplots(figsize=(8, 3))
ax.plot(fftfreq, melmat.T)
ax.grid(True)
ax.set_ylabel('Weight')
ax.set_xlabel('Frequency / Hz')
ax.set_xlim((f1, f2))
ax2 = ax.twiny()
ax2.xaxis.set_ticks_position('top')
ax2.set_xlim((f1, f2))
ax2.xaxis.set_ticks(melbank.mel_to_hertz(melfreq))
ax2.xaxis.set_ticklabels(['{:.0f}'.format(mf) for mf in melfreq])
ax2.set_xlabel('Frequency / mel')
plt.tight_layout()
fig, ax = plt.subplots()
ax.matshow(melmat)
plt.axis('equal')
plt.axis('tight')
plt.title('Mel Matrix')
plt.tight_layout()
Functions
---------
"""
from numpy import abs, append, arange, insert, linspace, log10, round, zeros
def hertz_to_mel(freq):
"""Returns mel-frequency from linear frequency input.
Parameter
---------
freq : scalar or ndarray
Frequency value or array in Hz.
Returns
-------
mel : scalar or ndarray
Mel-frequency value or ndarray in Mel
"""
return 2595.0 * log10(1 + (freq / 700.0))
def mel_to_hertz(mel):
"""Returns frequency from mel-frequency input.
Parameter
---------
mel : scalar or ndarray
Mel-frequency value or ndarray in Mel
Returns
-------
freq : scalar or ndarray
Frequency value or array in Hz.
"""
return 700.0 * (10**(mel / 2595.0)) - 700.0
def melfrequencies_mel_filterbank(num_bands, freq_min, freq_max, num_fft_bands):
"""Returns centerfrequencies and band edges for a mel filter bank
Parameters
----------
num_bands : int
Number of mel bands.
freq_min : scalar
Minimum frequency for the first band.
freq_max : scalar
Maximum frequency for the last band.
num_fft_bands : int
Number of fft bands.
Returns
-------
center_frequencies_mel : ndarray
lower_edges_mel : ndarray
upper_edges_mel : ndarray
"""
mel_max = hertz_to_mel(freq_max)
mel_min = hertz_to_mel(freq_min)
delta_mel = abs(mel_max - mel_min) / (num_bands + 1.0)
frequencies_mel = mel_min + delta_mel * arange(0, num_bands + 2)
lower_edges_mel = frequencies_mel[:-2]
upper_edges_mel = frequencies_mel[2:]
center_frequencies_mel = frequencies_mel[1:-1]
return center_frequencies_mel, lower_edges_mel, upper_edges_mel
def compute_melmat(num_mel_bands=12, freq_min=64, freq_max=8000,
num_fft_bands=513, sample_rate=16000):
"""Returns tranformation matrix for mel spectrum.
Parameters
----------
num_mel_bands : int
Number of mel bands. Number of rows in melmat.
Default: 24
freq_min : scalar
Minimum frequency for the first band.
Default: 64
freq_max : scalar
Maximum frequency for the last band.
Default: 8000
num_fft_bands : int
Number of fft-frequenc bands. This ist NFFT/2+1 !
number of columns in melmat.
Default: 513 (this means NFFT=1024)
sample_rate : scalar
Sample rate for the signals that will be used.
Default: 44100
Returns
-------
melmat : ndarray
Transformation matrix for the mel spectrum.
Use this with fft spectra of num_fft_bands_bands length
and multiply the spectrum with the melmat
this will tranform your fft-spectrum
to a mel-spectrum.
frequencies : tuple (ndarray <num_mel_bands>, ndarray <num_fft_bands>)
Center frequencies of the mel bands, center frequencies of fft spectrum.
"""
center_frequencies_mel, lower_edges_mel, upper_edges_mel = \
melfrequencies_mel_filterbank(
num_mel_bands,
freq_min,
freq_max,
num_fft_bands
)
center_frequencies_hz = mel_to_hertz(center_frequencies_mel)
lower_edges_hz = mel_to_hertz(lower_edges_mel)
upper_edges_hz = mel_to_hertz(upper_edges_mel)
freqs = linspace(0.0, sample_rate / 2.0, num_fft_bands)
melmat = zeros((num_mel_bands, num_fft_bands))
for imelband, (center, lower, upper) in enumerate(zip(
center_frequencies_hz, lower_edges_hz, upper_edges_hz)):
left_slope = (freqs >= lower) == (freqs <= center)
melmat[imelband, left_slope] = (
(freqs[left_slope] - lower) / (center - lower)
)
right_slope = (freqs >= center) == (freqs <= upper)
melmat[imelband, right_slope] = (
(upper - freqs[right_slope]) / (upper - center)
)
return melmat, (center_frequencies_mel, freqs)

32
python/microphone.py Normal file
View File

@ -0,0 +1,32 @@
import time
import numpy as np
import pyaudio
import config
def start_stream(callback):
p = pyaudio.PyAudio()
frames_per_buffer = int(config.MIC_RATE / config.FPS)
stream = p.open(format=pyaudio.paInt16,
channels=1,
rate=config.MIC_RATE,
input=True,
frames_per_buffer=frames_per_buffer)
overflows = 0
prev_ovf_time = time.time()
while True:
try:
y = np.fromstring(stream.read(frames_per_buffer), dtype=np.int16)
y = y.astype(np.float32)
callback(y)
except IOError:
overflows += 1
if time.time() > prev_ovf_time + 1:
prev_ovf_time = time.time()
if config.USE_GUI:
gui.label_error.setText('Audio buffer has overflowed {} times'.format(overflows))
else:
print('Audio buffer has overflowed {} times'.format(overflows))
stream.stop_stream()
stream.close()
p.terminate()

61
python/qfloatslider.py Normal file
View File

@ -0,0 +1,61 @@
from PyQt5 import QtCore, QtGui, QtWidgets
__all__ = ['QFloatSlider']
class QFloatSlider(QtWidgets.QSlider):
"""
Subclass of QtWidgets.QSlider
Horizontal slider giving floating point values.
Usage: QFloatSlider(min, max, step, default)
where min = minimum value of slider
max = maximum value of slider
step = interval between values. Must be a factor of (max-min)
default = default (starting) value of slider
"""
def __init__(self, min_value, max_value, step, default):
super().__init__(QtCore.Qt.Horizontal)
self.precision = 0.001
self.min_value = min_value
self.max_value = max_value
self.step = step
self.default = default
self.quotient, self.remainder = self._float_divmod(\
self.max_value-self.min_value, self.step)
if self.remainder:
raise ValueError("{} does not fit evenly between {} and {}"\
.format(step, min_value, max_value))
super().setMinimum(0)
super().setMaximum(self.quotient)
super().setSingleStep(1)
super().setValue(self._float_to_int(self.default))
super().valueChanged.connect(self._value_handler)
self.slider_value = 2.0
# This is mostly disgusting python i hate floating points >:(
def _float_divmod(self,a,b):
"""
Basically the divmod function but it works for floats (try 0.3 % 0.1 smh)
Returns the quotient, and a remainder.
"""
a = abs(a)
b = abs(b)
n = 1
while True:
c = a - b
c = abs(c)
if c < self.precision:
return (n, 0)
elif c > a:
return (n-1, a)
a = c
n += 1
def _float_to_int(self, a):
return int(round(a/self.step))
def _int_to_float(self, a):
return self.min_value+a*self.step
def _value_handler(self):
self.slider_value = self._int_to_float(super().value())

311
python/qrangeslider.py Normal file
View File

@ -0,0 +1,311 @@
import sys, os
from PyQt5 import QtCore, QtGui, QtWidgets
__all__ = ['QRangeSlider']
DEFAULT_CSS = """
QRangeSlider * {
border: 0px;
padding: 0px;
}
QRangeSlider #Head {
background: #222;
}
QRangeSlider #Span {
background: #393;
}
QRangeSlider #Span:active {
background: #282;
}
QRangeSlider #Tail {
background: #222;
}
QRangeSlider > QSplitter::handle {
background: #393;
}
QRangeSlider > QSplitter::handle:vertical {
height: 4px;
}
QRangeSlider > QSplitter::handle:pressed {
background: #ca5;
}
"""
def scale(val, src, dst):
return int(((val - src[0]) / float(src[1]-src[0])) * (dst[1]-dst[0]) + dst[0])
class Ui_Form(object):
def setupUi(self, Form):
Form.setObjectName("QRangeSlider")
Form.resize(300, 30)
Form.setStyleSheet(DEFAULT_CSS)
self.gridLayout = QtWidgets.QGridLayout(Form)
self.gridLayout.setContentsMargins(0, 0, 0, 0)
self.gridLayout.setSpacing(0)
self.gridLayout.setObjectName("gridLayout")
self._splitter = QtWidgets.QSplitter(Form)
self._splitter.setMinimumSize(QtCore.QSize(0, 0))
self._splitter.setMaximumSize(QtCore.QSize(16777215, 16777215))
self._splitter.setOrientation(QtCore.Qt.Horizontal)
self._splitter.setObjectName("splitter")
self._head = QtWidgets.QGroupBox(self._splitter)
self._head.setTitle("")
self._head.setObjectName("Head")
self._handle = QtWidgets.QGroupBox(self._splitter)
self._handle.setTitle("")
self._handle.setObjectName("Span")
self._tail = QtWidgets.QGroupBox(self._splitter)
self._tail.setTitle("")
self._tail.setObjectName("Tail")
self.gridLayout.addWidget(self._splitter, 0, 0, 1, 1)
self.retranslateUi(Form)
QtCore.QMetaObject.connectSlotsByName(Form)
def retranslateUi(self, Form):
_translate = QtCore.QCoreApplication.translate
Form.setWindowTitle(_translate("QRangeSlider", "QRangeSlider"))
class Element(QtWidgets.QGroupBox):
def __init__(self, parent, main):
super(Element, self).__init__(parent)
self.main = main
def setStyleSheet(self, style):
self.parent().setStyleSheet(style)
def textColor(self):
return getattr(self, '__textColor', QtGui.QColor(125, 125, 125))
def setTextColor(self, color):
if type(color) == tuple and len(color) == 3:
color = QtGui.QColor(color[0], color[1], color[2])
elif type(color) == int:
color = QtGui.QColor(color, color, color)
setattr(self, '__textColor', color)
def paintEvent(self, event):
qp = QtGui.QPainter()
qp.begin(self)
if self.main.drawValues():
self.drawText(event, qp)
qp.end()
class Head(Element):
def __init__(self, parent, main):
super(Head, self).__init__(parent, main)
def drawText(self, event, qp):
qp.setPen(self.textColor())
qp.setFont(QtGui.QFont('Arial', 10))
qp.drawText(event.rect(), QtCore.Qt.AlignLeft, str(self.main.min()))
class Tail(Element):
def __init__(self, parent, main):
super(Tail, self).__init__(parent, main)
def drawText(self, event, qp):
qp.setPen(self.textColor())
qp.setFont(QtGui.QFont('Arial', 10))
qp.drawText(event.rect(), QtCore.Qt.AlignRight, str(self.main.max()))
class Handle(Element):
def __init__(self, parent, main):
super(Handle, self).__init__(parent, main)
def drawText(self, event, qp):
qp.setPen(self.textColor())
qp.setFont(QtGui.QFont('Arial', 10))
qp.drawText(event.rect(), QtCore.Qt.AlignLeft, str(self.main.start()))
qp.drawText(event.rect(), QtCore.Qt.AlignRight, str(self.main.end()))
def mouseMoveEvent(self, event):
event.accept()
mx = event.globalX()
_mx = getattr(self, '__mx', None)
if not _mx:
setattr(self, '__mx', mx)
dx = 0
else:
dx = mx - _mx
setattr(self, '__mx', mx)
if dx == 0:
event.ignore()
return
elif dx > 0:
dx = 1
elif dx < 0:
dx = -1
s = self.main.start() + dx
e = self.main.end() + dx
if s >= self.main.min() and e <= self.main.max():
self.main.setRange(s, e)
class QRangeSlider(QtWidgets.QWidget, Ui_Form):
endValueChanged = QtCore.pyqtSignal(int)
maxValueChanged = QtCore.pyqtSignal(int)
minValueChanged = QtCore.pyqtSignal(int)
startValueChanged = QtCore.pyqtSignal(int)
minValueChanged = QtCore.pyqtSignal(int)
maxValueChanged = QtCore.pyqtSignal(int)
startValueChanged = QtCore.pyqtSignal(int)
endValueChanged = QtCore.pyqtSignal(int)
_SPLIT_START = 1
_SPLIT_END = 2
def __init__(self, parent=None):
super(QRangeSlider, self).__init__(parent)
self.setupUi(self)
self.setMouseTracking(False)
self._splitter.splitterMoved.connect(self._handleMoveSplitter)
self._head_layout = QtWidgets.QHBoxLayout()
self._head_layout.setSpacing(0)
self._head_layout.setContentsMargins(0, 0, 0, 0)
self._head.setLayout(self._head_layout)
self.head = Head(self._head, main=self)
self._head_layout.addWidget(self.head)
self._handle_layout = QtWidgets.QHBoxLayout()
self._handle_layout.setSpacing(0)
self._handle_layout.setContentsMargins(0, 0, 0, 0)
self._handle.setLayout(self._handle_layout)
self.handle = Handle(self._handle, main=self)
self.handle.setTextColor((150, 255, 150))
self._handle_layout.addWidget(self.handle)
self._tail_layout = QtWidgets.QHBoxLayout()
self._tail_layout.setSpacing(0)
self._tail_layout.setContentsMargins(0, 0, 0, 0)
self._tail.setLayout(self._tail_layout)
self.tail = Tail(self._tail, main=self)
self._tail_layout.addWidget(self.tail)
self.setMin(0)
self.setMax(99)
self.setStart(0)
self.setEnd(99)
self.setDrawValues(True)
def min(self):
return getattr(self, '__min', None)
def max(self):
return getattr(self, '__max', None)
def setMin(self, value):
setattr(self, '__min', value)
self.minValueChanged.emit(value)
def setMax(self, value):
setattr(self, '__max', value)
self.maxValueChanged.emit(value)
def start(self):
return getattr(self, '__start', None)
def end(self):
return getattr(self, '__end', None)
def _setStart(self, value):
setattr(self, '__start', value)
self.startValueChanged.emit(value)
def setStart(self, value):
v = self._valueToPos(value)
self._splitter.splitterMoved.disconnect()
self._splitter.moveSplitter(v, self._SPLIT_START)
self._splitter.splitterMoved.connect(self._handleMoveSplitter)
self._setStart(value)
def _setEnd(self, value):
setattr(self, '__end', value)
self.endValueChanged.emit(value)
def setEnd(self, value):
v = self._valueToPos(value)
self._splitter.splitterMoved.disconnect()
self._splitter.moveSplitter(v, self._SPLIT_END)
self._splitter.splitterMoved.connect(self._handleMoveSplitter)
self._setEnd(value)
def drawValues(self):
return getattr(self, '__drawValues', None)
def setDrawValues(self, draw):
setattr(self, '__drawValues', draw)
def getRange(self):
return (self.start(), self.end())
def setRange(self, start, end):
self.setStart(start)
self.setEnd(end)
def keyPressEvent(self, event):
key = event.key()
if key == QtCore.Qt.Key_Left:
s = self.start()-1
e = self.end()-1
elif key == QtCore.Qt.Key_Right:
s = self.start()+1
e = self.end()+1
else:
event.ignore()
return
event.accept()
if s >= self.min() and e <= self.max():
self.setRange(s, e)
def setBackgroundStyle(self, style):
self._tail.setStyleSheet(style)
self._head.setStyleSheet(style)
def setSpanStyle(self, style):
self._handle.setStyleSheet(style)
def _valueToPos(self, value):
return scale(value, (self.min(), self.max()), (0, self.width()))
def _posToValue(self, xpos):
return scale(xpos, (0, self.width()), (self.min(), self.max()))
def _handleMoveSplitter(self, xpos, index):
hw = self._splitter.handleWidth()
def _lockWidth(widget):
width = widget.size().width()
widget.setMinimumWidth(width)
widget.setMaximumWidth(width)
def _unlockWidth(widget):
widget.setMinimumWidth(0)
widget.setMaximumWidth(16777215)
v = self._posToValue(xpos)
if index == self._SPLIT_START:
_lockWidth(self._tail)
if v >= self.end():
return
offset = -20
w = xpos + offset
self._setStart(v)
elif index == self._SPLIT_END:
_lockWidth(self._head)
if v <= self.start():
return
offset = -40
w = self.width() - xpos + offset
self._setEnd(v)
_unlockWidth(self._tail)
_unlockWidth(self._head)
_unlockWidth(self._handle)
if __name__ == '__main__':
app = QtWidgets.QApplication(sys.argv)
rs = QRangeSlider()
rs.show()
rs.setRange(15, 35)
rs.setBackgroundStyle('background: qlineargradient(x1:0, y1:0, x2:0, y2:1, stop:0 #222, stop:1 #333);')
rs.handle.setStyleSheet('background: qlineargradient(x1:0, y1:0, x2:0, y2:1, stop:0 #282, stop:1 #393);')
app.exec_()

733
python/visualization.py Normal file
View File

@ -0,0 +1,733 @@
from __future__ import print_function
from __future__ import division
import time
import sys
import numpy as np
from scipy.ndimage.filters import gaussian_filter1d
from collections import deque
from qrangeslider import QRangeSlider
from qfloatslider import QFloatSlider
import config
import microphone
import dsp
import led
if config.USE_GUI:
import pyqtgraph as pg
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *
class Visualizer():
def __init__(self):
self.effects = {"Scroll":self.visualize_scroll,
"Energy":self.visualize_energy,
"Spectrum":self.visualize_spectrum,
#"Power":self.visualize_power,
"Wavelength":self.visualize_wavelength,
"Beat":self.visualize_beat,
"Wave":self.visualize_wave,}
#"Auto":self.visualize_auto}
self.colors = {"Red":(255,0,0),
"Orange":(255,40,0),
"Yellow":(255,255,0),
"Green":(0,255,0),
"Blue":(0,0,255),
"Light blue":(1,247,161),
"Purple":(80,5,252),
"Pink":(255,0,178)}
self.wavelength_color_modes = {"Spectral":"rgb",
"Dancefloor":"rpb",
"Brilliance":"ywb",
"Jungle":"ryg"}
self.current_effect = "Wavelength"
# Setup for frequency detection algorithm
self.freq_channel_history = 40
self.beat_count = 0
self.freq_channels = [deque(maxlen=self.freq_channel_history) for i in range(config.N_FFT_BINS)]
self.prev_output = np.array([[0 for i in range(config.N_PIXELS)] for i in range(3)])
self.prev_spectrum = [0 for i in range(config.N_PIXELS//2)]
self.current_freq_detects = {"beat":False,
"low":False,
"mid":False,
"high":False}
self.prev_freq_detects = {"beat":0,
"low":0,
"mid":0,
"high":0}
self.detection_ranges = {"beat":(0,1),
"low":(1,int(config.N_FFT_BINS*0.2)),
"mid":(int(config.N_FFT_BINS*0.4),int(config.N_FFT_BINS*0.6)),
"high":(int(config.N_FFT_BINS*0.7),int(config.N_FFT_BINS))}
self.min_detect_amplitude = {"beat":0.7,
"low":0.5,
"mid":0.3,
"high":0.05}
# Configurable options for effects go in here.
# Usage: self.effect_opts[effect][option]
self.effect_opts = {"Energy":{"blur": 1, # Amount of blur to apply
"scale":0.9}, # Width of effect on strip
"Wave":{"color_wave": "Red", # Colour of moving bit
"wipe_len":5, # Initial length of colour bit after beat
"wipe_speed":2}, # Number of pixels added to colour bit every frame
"Wavelength":{"roll": False, # Cycle colour overlay across strip
"color_mode": "Spectral", # Colour mode of overlay (rgb, rpb, ywb, ryg)
"mirror": False} # Reflect output down centre of strip?
}
# Configurations for dynamic ui generation. Effect options can be changed by widgets created at runtime,
# meaning that you don't need to worry about the user interface - it's all done for you.
# Each effect key points to a list. Each list contains lists giving config for each option.
# Syntax: effect:[variable, label_text, ui_element, opts]
# effect - the effect which you want to change options for. MUST have a key in self.effect_opts
# variable - the key of thing you want to be changed. MUST be in self.effect_opts[effect], otherwise it won't work.
# label - the text displayed on the ui
# ui_element - how you want the variable to be changed
# opts - options for the ui element. Must be a tuple.
# UI Elements + opts:
# slider, (min, max, interval, default) (for integer values in a given range)
# float_slider, (min, max, interval, default) (for floating point values in a given range)
# checkbox, (default) (for True/False values)
# dropdown, (dict, default) (dict example see self.colors above)
#
self.dynamic_effects_config = {"Energy":[["blur", "Blur", "float_slider", (0.1,4.0,0.1,1.0)],
["scale", "Scale", "float_slider", (0.4,1.0,0.05,0.9)]],
"Wave":[["color_wave", "Wave Color", "dropdown", self.colors],
["wipe_len", "Wave Start Length", "slider", (0,config.N_PIXELS//4,1,5)],
["wipe_speed", "Wave Speed", "slider", (1,10,1,2)]],
"Wavelength":[["roll", "Roll Colors", "checkbox", False],
["color_mode", "Color Mode", "dropdown", self.wavelength_color_modes]]
}
# Setup for "Wave" (don't change these)
self.wave_wipe_count = 0
# Setup for "Wavelength" (don't change these)
self._wavelength_set_color_mode(self.effect_opts["Wavelength"]["color_mode"])
def _wavelength_set_color_mode(self, mode):
# chunks of colour gradients
self.rgb_overlay = np.zeros((3,242))
# used to construct rgb overlay. [0-255,255...] whole length of strip
_gradient_whole = [int(i*255/(config.N_PIXELS//2)) for i in range(config.N_PIXELS//2)] +\
[255 for i in range(config.N_PIXELS//2)]
# used to construct rgb overlay. [0-255,255...] 1/2 length of strip
_gradient_half = _gradient_whole[::2]
if self.wavelength_color_modes[self.effect_opts["Wavelength"]["color_mode"]] == "rgb":
self.rgb_overlay[0, :config.N_PIXELS//2] = _gradient_half[::-1]
self.rgb_overlay[1, :] = _gradient_half + _gradient_half[::-1]
self.rgb_overlay[2, :] = np.flipud(self.rgb_overlay[0])
elif self.wavelength_color_modes[self.effect_opts["Wavelength"]["color_mode"]] == "rpb":
self.rgb_overlay[0, :] = _gradient_whole[::-1]
self.rgb_overlay[2, :] = _gradient_whole
elif self.wavelength_color_modes[self.effect_opts["Wavelength"]["color_mode"]] == "ywb":
self.rgb_overlay[0, :] = _gradient_whole[::-1]
self.rgb_overlay[1, :] = 255
self.rgb_overlay[2, :] = _gradient_whole
elif self.wavelength_color_modes[self.effect_opts["Wavelength"]["color_mode"]] == "ryg":
self.rgb_overlay[0, :] = _gradient_whole[::-1]
self.rgb_overlay[1, :] = _gradient_whole
else:
raise ValueError("Colour mode '{}' not known. Leave an issue on github if you want it added!".format(mode))
self.effect_opts["Wavelength"]["color_mode"] = mode
def get_vis(self, y):
self.update_freq_channels(y)
self.detect_freqs()
self.prev_output = np.copy(self.effects[self.current_effect](y))
return self.prev_output
def _split_equal(self, value, parts):
value = float(value)
return [int(round(i*value/parts)) for i in range(1,parts+1)]
def update_freq_channels(self, y):
for i in range(len(y)):
self.freq_channels[i].appendleft(y[i])
def detect_freqs(self):
"""
Function that updates current_freq_detects. Any visualisation algorithm can check if
there is currently a beat, low, mid, or high by querying the self.current_freq_detects dict.
"""
channel_avgs = []
differences = []
for i in range(config.N_FFT_BINS):
channel_avgs.append(sum(self.freq_channels[i])/len(self.freq_channels[i]))
differences.append(((self.freq_channels[i][0]-channel_avgs[i])*100)//channel_avgs[i])
for i in ["beat", "low", "mid", "high"]:
if any(differences[j] >= 100 and self.freq_channels[j][0] >= self.min_detect_amplitude[i]\
for j in range(*self.detection_ranges[i]))\
and (time.time() - self.prev_freq_detects[i] > 0.15)\
and len(self.freq_channels[0]) == self.freq_channel_history:
self.prev_freq_detects[i] = time.time()
self.current_freq_detects[i] = True
#print(i)
else:
self.current_freq_detects[i] = False
#if self.current_freq_detects["beat"]:
# print(time.time(),"Beat")
#pass
#print(differences[0], channel_avgs[0])
#print("{1: <{0}}{2: <{0}}{4: <{0}}{4}".format(7, self.current_freq_detects["beat"],
# self.current_freq_detects["low"],
# self.current_freq_detects["mid"],
# self.current_freq_detects["high"]))
def visualize_scroll(self, y):
"""Effect that originates in the center and scrolls outwards"""
global p
y = y**2.0
gain.update(y)
y /= gain.value
y *= 255.0
r = int(np.max(y[:len(y) // 3]))
g = int(np.max(y[len(y) // 3: 2 * len(y) // 3]))
b = int(np.max(y[2 * len(y) // 3:]))
# Scrolling effect window
p[:, 1:] = p[:, :-1]
p *= 0.98
p = gaussian_filter1d(p, sigma=0.2)
# Create new color originating at the center
p[0, 0] = r
p[1, 0] = g
p[2, 0] = b
# Update the LED strip
return np.concatenate((p[:, ::-1], p), axis=1)
def visualize_energy(self, y):
"""Effect that expands from the center with increasing sound energy"""
global p
y = np.copy(y)
gain.update(y)
y /= gain.value
scale = self.effect_opts["Energy"]["scale"]
# Scale by the width of the LED strip
y *= float((config.N_PIXELS * scale) - 1)
# Map color channels according to energy in the different freq bands
r = int(np.mean(y[:len(y) // 3]**scale))
g = int(np.mean(y[len(y) // 3: 2 * len(y) // 3]**scale))
b = int(np.mean(y[2 * len(y) // 3:]**scale))
# Assign color to different frequency regions
p[0, :r] = 255.0
p[0, r:] = 0.0
p[1, :g] = 255.0
p[1, g:] = 0.0
p[2, :b] = 255.0
p[2, b:] = 0.0
p_filt.update(p)
p = np.round(p_filt.value)
# Apply blur to smooth the edges
p[0, :] = gaussian_filter1d(p[0, :], sigma=self.effect_opts["Energy"]["blur"])
p[1, :] = gaussian_filter1d(p[1, :], sigma=self.effect_opts["Energy"]["blur"])
p[2, :] = gaussian_filter1d(p[2, :], sigma=self.effect_opts["Energy"]["blur"])
# Set the new pixel value
return np.concatenate((p[:, ::-1], p), axis=1)
def visualize_wavelength(self, y):
y = np.copy(interpolate(y, config.N_PIXELS // 2))
common_mode.update(y)
diff = y - self.prev_spectrum
self.prev_spectrum = np.copy(y)
# Color channel mappings
r = r_filt.update(y - common_mode.value)
g = np.abs(diff)
b = b_filt.update(np.copy(y))
if self.effect_opts["Wavelength"]["mirror"]:
r = r.extend(r[::-1])
r = r.extend(r[::-1])
else:
# stretch (double) r so it covers the entire spectrum
r = np.array([j for i in zip(r,r) for j in i])
b = np.array([j for i in zip(b,b) for j in i])
output = [self.rgb_overlay[0]*r,self.rgb_overlay[1]*r,self.rgb_overlay[2]*r]
self.prev_spectrum = y
if self.effect_opts["Wavelength"]["roll"]:
self.rgb_overlay = np.roll(self.rgb_overlay,1,axis=1)
output[0] = gaussian_filter1d(output[0], sigma=4.0)
output[1] = gaussian_filter1d(output[1], sigma=4.0)
output[2] = gaussian_filter1d(output[2], sigma=4.0)
return output
#return np.concatenate((p[:, ::-1], p), axis=1)
def visualize_power(self, y):
"""Effect that pulses different reqions of the strip increasing sound energy"""
global p
_p = np.copy(p)
y = np.copy(interpolate(y, config.N_PIXELS // 2))
common_mode.update(y)
diff = y - self.prev_spectrum
self.prev_spectrum = np.copy(y)
# Color channel mappings
r = r_filt.update(y - common_mode.value)
g = np.abs(diff)
b = b_filt.update(np.copy(y))
# I have no idea what any of this does but it looks cool
r = [int(i*255) for i in r[::3]]
g = [int(i*255) for i in g[::3]]
b = [int(i*255) for i in b[::3]]
_p[0, 0:len(r)] = r
_p[1, len(r):len(r)+len(g)] = g
_p[2, len(r)+len(g):config.N_PIXELS] = b[:39]
p_filt.update(_p)
# Clip it into range
_p = np.clip(p, 0, 255).astype(int)
# Apply substantial blur to smooth the edges
_p[0, :] = gaussian_filter1d(_p[0, :], sigma=3.0)
_p[1, :] = gaussian_filter1d(_p[1, :], sigma=3.0)
_p[2, :] = gaussian_filter1d(_p[2, :], sigma=3.0)
self.prev_spectrum = y
return np.concatenate((_p[:, ::-1], _p), axis=1)
def visualize_spectrum(self, y):
"""Effect that maps the Mel filterbank frequencies onto the LED strip"""
global p
#print(len(y))
#print(y)
y = np.copy(interpolate(y, config.N_PIXELS // 2))
common_mode.update(y)
diff = y - self.prev_spectrum
self.prev_spectrum = np.copy(y)
# Color channel mappings
r = r_filt.update(y - common_mode.value)
g = np.abs(diff)
b = b_filt.update(np.copy(y))
# Mirror the color channels for symmetric output
r = np.concatenate((r[::-1], r))
g = np.concatenate((g[::-1], g))
b = np.concatenate((b[::-1], b))
output = np.array([r, g,b]) * 255
self.prev_spectrum = y
return output
def visualize_auto(self,y):
"""Automatically (intelligently?) cycle through effects"""
return self.visualize_beat(y) # real intelligent
def visualize_wave(self, y):
"""Effect that flashes to the beat with scrolling coloured bits"""
if self.current_freq_detects["beat"]:
output = np.array([[255 for i in range(config.N_PIXELS)] for i in range(3)])
self.wave_wipe_count = self.effect_opts["Wave"]["wipe_len"]
else:
output = np.copy(self.prev_output)
#for i in range(len(self.prev_output)):
# output[i] = np.hsplit(self.prev_output[i],2)[0]
output = np.multiply(self.prev_output,0.7)
for i in range(self.wave_wipe_count):
output[0][i]=self.colors[self.effect_opts["Wave"]["color_wave"]][0]
output[0][-i]=self.colors[self.effect_opts["Wave"]["color_wave"]][0]
output[1][i]=self.colors[self.effect_opts["Wave"]["color_wave"]][1]
output[1][-i]=self.colors[self.effect_opts["Wave"]["color_wave"]][1]
output[2][i]=self.colors[self.effect_opts["Wave"]["color_wave"]][2]
output[2][-i]=self.colors[self.effect_opts["Wave"]["color_wave"]][2]
#output = np.concatenate([output,np.fliplr(output)], axis=1)
self.wave_wipe_count += self.effect_opts["Wave"]["wipe_speed"]
if self.wave_wipe_count > config.N_PIXELS//2:
self.wave_wipe_count = config.N_PIXELS//2
return output
def visualize_beat(self, y):
"""Effect that flashes to the beat"""
if self.current_freq_detects["beat"]:
output = np.array([[255 for i in range(config.N_PIXELS)] for i in range(3)])
else:
output = np.copy(self.prev_output)
output = np.multiply(self.prev_output,0.7)
return output
class GUI(QWidget):
def __init__(self):
super().__init__()
self.initUI()
def initUI(self):
# ==================================== Set up window and wrapping layout
self.setWindowTitle("Visualization")
wrapper = QVBoxLayout()
# ========================================== Set up FPS and error labels
labels_layout = QHBoxLayout()
self.label_error = QLabel("")
self.label_fps = QLabel("")
self.label_fps.setAlignment(Qt.AlignRight | Qt.AlignVCenter)
labels_layout.addWidget(self.label_error)
labels_layout.addStretch()
labels_layout.addWidget(self.label_fps)
# ================================================== Set up graph layout
graph_view = pg.GraphicsView()
graph_layout = pg.GraphicsLayout(border=(100,100,100))
graph_view.setCentralItem(graph_layout)
# Mel filterbank plot
fft_plot = graph_layout.addPlot(title='Filterbank Output', colspan=3)
fft_plot.setRange(yRange=[-0.1, 1.2])
fft_plot.disableAutoRange(axis=pg.ViewBox.YAxis)
x_data = np.array(range(1, config.N_FFT_BINS + 1))
self.mel_curve = pg.PlotCurveItem()
self.mel_curve.setData(x=x_data, y=x_data*0)
fft_plot.addItem(self.mel_curve)
# Visualization plot
graph_layout.nextRow()
led_plot = graph_layout.addPlot(title='Visualization Output', colspan=3)
led_plot.setRange(yRange=[-5, 260])
led_plot.disableAutoRange(axis=pg.ViewBox.YAxis)
# Pen for each of the color channel curves
r_pen = pg.mkPen((255, 30, 30, 200), width=4)
g_pen = pg.mkPen((30, 255, 30, 200), width=4)
b_pen = pg.mkPen((30, 30, 255, 200), width=4)
# Color channel curves
self.r_curve = pg.PlotCurveItem(pen=r_pen)
self.g_curve = pg.PlotCurveItem(pen=g_pen)
self.b_curve = pg.PlotCurveItem(pen=b_pen)
# Define x data
x_data = np.array(range(1, config.N_PIXELS + 1))
self.r_curve.setData(x=x_data, y=x_data*0)
self.g_curve.setData(x=x_data, y=x_data*0)
self.b_curve.setData(x=x_data, y=x_data*0)
# Add curves to plot
led_plot.addItem(self.r_curve)
led_plot.addItem(self.g_curve)
led_plot.addItem(self.b_curve)
# ================================================= Set up button layout
label_active = QLabel("Active Effect")
button_grid = QGridLayout()
buttons = {}
connecting_funcs = {}
grid_width = 4
i = 0
j = 0
# Dynamically layout buttons and connect them to the visualisation effects
def connect_generator(effect):
def func():
visualizer.current_effect = effect
func.__name__ = effect
return func
# Where the magic happens
for effect in visualizer.effects:
connecting_funcs[effect] = connect_generator(effect)
buttons[effect] = QPushButton(effect)
buttons[effect].clicked.connect(connecting_funcs[effect])
button_grid.addWidget(buttons[effect], j, i)
i += 1
if i % grid_width == 0:
i = 0
j += 1
# ============================================== Set up frequency slider
# Frequency range label
label_slider = QLabel("Frequency Range")
# Frequency slider
def freq_slider_change(tick):
minf = freq_slider.tickValue(0)**2.0 * (config.MIC_RATE / 2.0)
maxf = freq_slider.tickValue(1)**2.0 * (config.MIC_RATE / 2.0)
t = 'Frequency range: {:.0f} - {:.0f} Hz'.format(minf, maxf)
freq_label.setText(t)
config.MIN_FREQUENCY = minf
config.MAX_FREQUENCY = maxf
dsp.create_mel_bank()
def set_freq_min():
config.MIN_FREQUENCY = freq_slider.start()
dsp.create_mel_bank()
def set_freq_max():
config.MAX_FREQUENCY = freq_slider.end()
dsp.create_mel_bank()
freq_slider = QRangeSlider()
freq_slider.show()
freq_slider.setMin(0)
freq_slider.setMax(20000)
freq_slider.setRange(config.MIN_FREQUENCY, config.MAX_FREQUENCY)
freq_slider.setBackgroundStyle('background: qlineargradient(x1:0, y1:0, x2:0, y2:1, stop:0 #222, stop:1 #333);')
freq_slider.setSpanStyle('background: qlineargradient(x1:0, y1:0, x2:0, y2:1, stop:0 #282, stop:1 #393);')
freq_slider.setDrawValues(True)
freq_slider.endValueChanged.connect(set_freq_max)
freq_slider.startValueChanged.connect(set_freq_min)
freq_slider.setStyleSheet("""
QRangeSlider * {
border: 0px;
padding: 0px;
}
QRangeSlider > QSplitter::handle {
background: #fff;
}
QRangeSlider > QSplitter::handle:vertical {
height: 3px;
}
QRangeSlider > QSplitter::handle:pressed {
background: #ca5;
}
""")
# ============================================ Set up option tabs layout
label_options = QLabel("Effect Options")
opts_tabs = QTabWidget()
# Dynamically set up tabs
tabs = {}
grid_layouts = {}
self.grid_layout_widgets = {}
options = visualizer.effect_opts.keys()
for effect in visualizer.effects:
# Make the tab
self.grid_layout_widgets[effect] = {}
tabs[effect] = QWidget()
grid_layouts[effect] = QGridLayout()
tabs[effect].setLayout(grid_layouts[effect])
opts_tabs.addTab(tabs[effect],effect)
# These functions make functions for the dynamic ui generation
# YOU WANT-A DYNAMIC I GIVE-A YOU DYNAMIC!
def gen_slider_valuechanger(effect, key):
def func():
visualizer.effect_opts[effect][key] = self.grid_layout_widgets[effect][key].value()
return func
def gen_float_slider_valuechanger(effect, key):
def func():
visualizer.effect_opts[effect][key] = self.grid_layout_widgets[effect][key].slider_value
return func
def gen_combobox_valuechanger(effect, key):
def func():
visualizer.effect_opts[effect][key] = self.grid_layout_widgets[effect][key].currentText()
visualizer._wavelength_set_color_mode(visualizer.effect_opts[effect][key])
return func
def gen_checkbox_valuechanger(effect, key):
def func():
visualizer.effect_opts[effect][key] = self.grid_layout_widgets[effect][key].isChecked()
return func
# Dynamically generate ui for settings
if effect in visualizer.dynamic_effects_config:
i = 0
connecting_funcs[effect] = {}
for key, label, ui_element, opts in visualizer.dynamic_effects_config[effect][:]:
if ui_element == "slider":
connecting_funcs[effect][key] = gen_slider_valuechanger(effect, key)
self.grid_layout_widgets[effect][key] = QSlider(Qt.Horizontal)
self.grid_layout_widgets[effect][key].setMinimum(opts[0])
self.grid_layout_widgets[effect][key].setMaximum(opts[1])
self.grid_layout_widgets[effect][key].setValue(opts[2])
self.grid_layout_widgets[effect][key].valueChanged.connect(
connecting_funcs[effect][key])
grid_layouts[effect].addWidget(QLabel(label),i,0)
grid_layouts[effect].addWidget(self.grid_layout_widgets[effect][key],i,1)
elif ui_element == "float_slider":
connecting_funcs[effect][key] = gen_float_slider_valuechanger(effect, key)
self.grid_layout_widgets[effect][key] = QFloatSlider(*opts)
self.grid_layout_widgets[effect][key].valueChanged.connect(
connecting_funcs[effect][key])
grid_layouts[effect].addWidget(QLabel(label),i,0)
grid_layouts[effect].addWidget(self.grid_layout_widgets[effect][key],i,1)
elif ui_element == "dropdown":
connecting_funcs[effect][key] = gen_combobox_valuechanger(effect, key)
self.grid_layout_widgets[effect][key] = QComboBox()
self.grid_layout_widgets[effect][key].addItems(opts.keys())
self.grid_layout_widgets[effect][key].currentIndexChanged.connect(
connecting_funcs[effect][key])
grid_layouts[effect].addWidget(QLabel(label),i,0)
grid_layouts[effect].addWidget(self.grid_layout_widgets[effect][key],i,1)
elif ui_element == "checkbox":
connecting_funcs[effect][key] = gen_checkbox_valuechanger(effect, key)
self.grid_layout_widgets[effect][key] = QCheckBox()
#self.grid_layout_widgets[effect][key].addItems(opts.keys())
self.grid_layout_widgets[effect][key].stateChanged.connect(
connecting_funcs[effect][key])
grid_layouts[effect].addWidget(QLabel(label),i,0)
grid_layouts[effect].addWidget(self.grid_layout_widgets[effect][key],i,1)
i += 1
#visualizer.effect_settings[effect]
else:
grid_layouts[effect].addWidget(QLabel("No customisable options for this effect :("),0,0)
# ============================================= Add layouts into wrapper
self.setLayout(wrapper)
wrapper.addLayout(labels_layout)
wrapper.addWidget(graph_view)
wrapper.addWidget(label_active)
wrapper.addLayout(button_grid)
wrapper.addWidget(label_slider)
wrapper.addWidget(freq_slider)
wrapper.addWidget(label_options)
wrapper.addWidget(opts_tabs)
self.show()
def frames_per_second():
"""Return the estimated frames per second
Returns the current estimate for frames-per-second (FPS).
FPS is estimated by measured the amount of time that has elapsed since
this function was previously called. The FPS estimate is low-pass filtered
to reduce noise.
This function is intended to be called one time for every iteration of
the program's main loop.
Returns
-------
fps : float
Estimated frames-per-second. This value is low-pass filtered
to reduce noise.
"""
global _time_prev, _fps
time_now = time.time() * 1000.0
dt = time_now - _time_prev
_time_prev = time_now
if dt == 0.0:
return _fps.value
return _fps.update(1000.0 / dt)
def memoize(function):
"""Provides a decorator for memoizing functions"""
from functools import wraps
memo = {}
@wraps(function)
def wrapper(*args):
if args in memo:
return memo[args]
else:
rv = function(*args)
memo[args] = rv
return rv
return wrapper
@memoize
def _normalized_linspace(size):
return np.linspace(0, 1, size)
def interpolate(y, new_length):
"""Intelligently resizes the array by linearly interpolating the values
Parameters
----------
y : np.array
Array that should be resized
new_length : int
The length of the new interpolated array
Returns
-------
z : np.array
New array with length of new_length that contains the interpolated
values of y.
"""
if len(y) == new_length:
return y
x_old = _normalized_linspace(len(y))
x_new = _normalized_linspace(new_length)
z = np.interp(x_new, x_old, y)
return z
def microphone_update(audio_samples):
global y_roll, prev_rms, prev_exp, prev_fps_update
# Normalize samples between 0 and 1
y = audio_samples / 2.0**15
# Construct a rolling window of audio samples
y_roll[:-1] = y_roll[1:]
y_roll[-1, :] = np.copy(y)
y_data = np.concatenate(y_roll, axis=0).astype(np.float32)
vol = np.max(np.abs(y_data))
if vol < config.MIN_VOLUME_THRESHOLD:
if config.USE_GUI:
gui.label_error.setText("No audio input. Volume below threshold.")
else:
print("No audio input. Volume below threshold. Volume: {}".format(vol))
visualizer.prev_output = np.multiply(visualizer.prev_output,0.95)
led.pixels = visualizer.prev_output
led.update()
else:
# Transform audio input into the frequency domain
N = len(y_data)
N_zeros = 2**int(np.ceil(np.log2(N))) - N
# Pad with zeros until the next power of two
y_data *= fft_window
y_padded = np.pad(y_data, (0, N_zeros), mode='constant')
YS = np.abs(np.fft.rfft(y_padded)[:N // 2])
# Construct a Mel filterbank from the FFT data
mel = np.atleast_2d(YS).T * dsp.mel_y.T
# Scale data to values more suitable for visualization
# mel = np.sum(mel, axis=0)
mel = np.sum(mel, axis=0)
mel = mel**2.0
# Gain normalization
mel_gain.update(np.max(gaussian_filter1d(mel, sigma=1.0)))
mel /= mel_gain.value
mel = mel_smoothing.update(mel)
# Map filterbank output onto LED strip
led.pixels = visualizer.get_vis(mel)
led.update()
if config.USE_GUI:
# Plot filterbank output
x = np.linspace(config.MIN_FREQUENCY, config.MAX_FREQUENCY, len(mel))
gui.mel_curve.setData(x=x, y=fft_plot_filter.update(mel))
gui.label_error.setText("")
if config.USE_GUI:
fps = frames_per_second()
if time.time() - 0.5 > prev_fps_update:
prev_fps_update = time.time()
app.processEvents()
# Plot the color channels
gui.r_curve.setData(y=led.pixels[0])
gui.g_curve.setData(y=led.pixels[1])
gui.b_curve.setData(y=led.pixels[2])
# Update fps counter
gui.label_fps.setText('{:.0f} / {:.0f} FPS'.format(fps, config.FPS))
if config.DISPLAY_FPS:
print('FPS {:.0f} / {:.0f}'.format(fps, config.FPS))
# Initialise visualiser and GUI
visualizer = Visualizer()
if config.USE_GUI:
# Create GUI window
app = QApplication([])
app.setApplicationName('Visualization')
gui = GUI()
app.processEvents()
# Initialise filter stuff
fft_plot_filter = dsp.ExpFilter(np.tile(1e-1, config.N_FFT_BINS),
alpha_decay=0.5, alpha_rise=0.99)
mel_gain = dsp.ExpFilter(np.tile(1e-1, config.N_FFT_BINS),
alpha_decay=0.01, alpha_rise=0.99)
mel_smoothing = dsp.ExpFilter(np.tile(1e-1, config.N_FFT_BINS),
alpha_decay=0.5, alpha_rise=0.99)
volume = dsp.ExpFilter(config.MIN_VOLUME_THRESHOLD,
alpha_decay=0.02, alpha_rise=0.02)
fft_window = np.hamming(int(config.MIC_RATE / config.FPS) * config.N_ROLLING_HISTORY)
prev_fps_update = time.time()
# Initialise more filter stuff
r_filt = dsp.ExpFilter(np.tile(0.01, config.N_PIXELS // 2),
alpha_decay=0.2, alpha_rise=0.99)
g_filt = dsp.ExpFilter(np.tile(0.01, config.N_PIXELS // 2),
alpha_decay=0.05, alpha_rise=0.3)
b_filt = dsp.ExpFilter(np.tile(0.01, config.N_PIXELS // 2),
alpha_decay=0.1, alpha_rise=0.5)
common_mode = dsp.ExpFilter(np.tile(0.01, config.N_PIXELS // 2),
alpha_decay=0.99, alpha_rise=0.01)
p_filt = dsp.ExpFilter(np.tile(1, (3, config.N_PIXELS // 2)),
alpha_decay=0.1, alpha_rise=0.99)
p = np.tile(1.0, (3, config.N_PIXELS // 2))
gain = dsp.ExpFilter(np.tile(0.01, config.N_FFT_BINS),
alpha_decay=0.001, alpha_rise=0.99)
# The previous time that the frames_per_second() function was called
_time_prev = time.time() * 1000.0
# The low-pass filter used to estimate frames-per-second
_fps = dsp.ExpFilter(val=config.FPS, alpha_decay=0.2, alpha_rise=0.2)
# Number of audio samples to read every time frame
samples_per_frame = int(config.MIC_RATE / config.FPS)
# Array containing the rolling audio sample window
y_roll = np.random.rand(config.N_ROLLING_HISTORY, samples_per_frame) / 1e16
# Initialize LEDs
led.update()
# Start listening to live audio stream
microphone.start_stream(microphone_update)