Major changes to onset detection and visualization

Changes to config.py:
> Increased STFT rolling window size
> Added configurable options for setting the max and min FFT frequency

Changes to dsp.py:
> Significantly improved the normalized weighted phase deviation code.
This noticeably improves the onset detection accuracy.
> Logarithmic partitioning of the FFT bins now happens after onset
detection instead of before onset detection. This improves onset
detection accuracy at the expense of CPU time.
> Fixed a bug in the log_partition function which would sometimes cause
out of bounds errors
> Refactored and removed some functions that aren't needed anymore

Changes to sandbox.py:
> Sweeping changes to the visualization functions. The onset detection
functions are now combined after applying Gaussian blur to each onset
function individually. This improves the overall agreement between the
onset functions.
> Hyperbolic tan function is used to map saturated pixels to the range
[0, 1]
> Added a new rainbow generation function with more options than the old
one.
> Refactored most of the led update functions.
> The LED brightness is now being plotted instead of onsets
This commit is contained in:
Scott Lawson 2016-10-24 16:42:03 -07:00
parent a38a8e1680
commit 262206d359
3 changed files with 104 additions and 86 deletions

View File

@ -18,7 +18,7 @@ UDP_PORT = 7777
MIC_RATE = 44100
"""Sampling frequency of the microphone in Hz"""
FPS = 45
FPS = 70
"""Desired LED strip update rate in frames (updates) per second
This is the desired update rate of the LED strip. The actual refresh rate of
@ -30,6 +30,15 @@ the duration of the short-time Fourier transform. This can negatively affect
low frequency (bass) response.
"""
MIN_FREQUENCY = 5
"""Frequencies below this value will be removed during onset detection"""
MAX_FREQUENCY = 12000
"""Frequencies above this value will be removed during onset detection"""
ENERGY_THRESHOLD = 14.0
"""Energy threshold for determining whether a beat has been detected
@ -56,7 +65,7 @@ One downside to using a variance threshold is that it is an absolute threshold
which is affected by the current volume.
"""
N_SUBBANDS = 80 # 240 #48
N_SUBBANDS = 60 # 240 #48
"""Number of frequency bins to use for beat detection
More subbands improve beat detection sensitivity but it may become more
@ -86,9 +95,9 @@ GAMMA_CORRECTION = True
"""Whether to correct LED brightness for nonlinear brightness perception"""
N_CURVES = 4
N_CURVES = 2
"""Number of curves to plot in the visualization window"""
N_ROLLING_HISTORY = 2
N_ROLLING_HISTORY = 8
"""Number of past audio frames to include in the rolling window"""

View File

@ -1,8 +1,6 @@
from __future__ import print_function
#from __future__ import division
import numpy as np
from scipy.interpolate import interp1d
import scipy.fftpack
import config
@ -33,13 +31,13 @@ _ys_historical_energy = np.tile(1.0, (config.N_SUBBANDS, config.N_HISTORY))
def beat_detect(ys):
"""Detect beats using an energy and variance theshold
Parameters
----------
ys : numpy.array
Array containing the magnitudes for each frequency bin of the
fast fourier transformed audio data.
Returns
-------
has_beat : numpy.array
@ -83,7 +81,7 @@ def onset(yt):
Onset detection is perfomed using an ensemble of three onset detection
functions.
The first onset detection function uses the rectified spectral flux (SF)
of successive FFT data frames.
The second onset detection function uses the normalized weighted phase
@ -106,34 +104,31 @@ def onset(yt):
Array of normalized weighted phase difference values
RCD : numpy.array
Array of rectified complex domain values
References
----------
Dixon, Simon "Onset Detection Revisted"
"""
global ys_prev, phase_prev, dphase_prev
xs, ys = fft_log_partition(yt,
subbands=config.N_SUBBANDS,
window=np.hamming,
fmin=1,
fmax=14000)
#ys = music_fft(yt)
xs, ys = fft(yt, window=np.hamming)
ys = ys[(xs >= config.MIN_FREQUENCY) * (xs <= config.MAX_FREQUENCY)]
xs = xs[(xs >= config.MIN_FREQUENCY) * (xs <= config.MAX_FREQUENCY)]
magnitude = np.abs(ys)
phase = wrap_phase(np.angle(ys))
phase = np.angle(ys)
# Special case for initialization
if ys_prev is None:
ys_prev = ys
phase_prev = phase
dphase_prev = phase
# Rectified spectral flux
SF = np.abs(ys) - np.abs(ys_prev)
SF = magnitude - np.abs(ys_prev)
SF[SF < 0.0] = 0.0
# First difference of phase
dphase = wrap_phase(phase - phase_prev)
dphase = phase - phase_prev
# Second difference of phase
ddphase = wrap_phase(dphase - dphase_prev)
ddphase = dphase - dphase_prev
# Normalized weighted phase deviation
NWPD = np.abs(ddphase * magnitude) / magnitude
NWPD = np.abs(ddphase) * magnitude
# Rectified complex domain onset detection function
RCD = np.abs(ys - ys_prev * dphase_prev)
RCD[RCD < 0.0] = 0.0
@ -146,50 +141,35 @@ def onset(yt):
SF = np.nan_to_num(SF)
NWPD = np.nan_to_num(NWPD)
RCD = np.nan_to_num(RCD)
_, SF = log_partition(xs, SF, subbands=config.N_SUBBANDS)
_, NWPD = log_partition(xs, NWPD, subbands=config.N_SUBBANDS)
_, RCD = log_partition(xs, RCD, subbands=config.N_SUBBANDS)
return SF, NWPD, RCD
def rfft(data, window=None):
if window is None:
window = 1.0
else:
window = window(len(data))
window = 1.0 if window is None else window(len(data))
ys = np.abs(np.fft.rfft(data*window))
xs = np.fft.rfftfreq(len(data), 1.0 / config.MIC_RATE)
return xs, ys
def rfft_log_partition(data, fmin=30, fmax=20000, subbands=64, window=None):
"""Returns FFT partitioned into subbands that are logarithmically spaced"""
xs, ys = rfft(data, window=window)
xs_log = np.logspace(np.log10(fmin), np.log10(fmax), num=subbands * 32)
f = interp1d(xs, ys)
ys_log = f(xs_log)
X, Y = [], []
for i in range(0, subbands * 32, 32):
X.append(np.mean(xs_log[i:i + 32]))
Y.append(np.mean(ys_log[i:i + 32]))
return np.array(X), np.array(Y)
def fft(data, window=None):
if window is None:
window = 1.0
else:
window = window(len(data))
window = 1.0 if window is None else window(len(data))
ys = np.fft.fft(data*window)
xs = np.fft.fftfreq(len(data), 1.0 / config.MIC_RATE)
return xs, ys
def fft_log_partition(data, fmin=30, fmax=20000, subbands=64, window=None):
"""Returns FFT partitioned into subbands that are logarithmically spaced"""
xs, ys = fft(data, window=window)
xs_log = np.logspace(np.log10(fmin), np.log10(fmax), num=subbands * 32)
def log_partition(xs, ys, subbands):
f = interp1d(xs, ys)
xs_log = np.logspace(np.log10(xs[0]), np.log10(xs[-1]), num=subbands * 24)
xs_log[0] = xs[0]
xs_log[-1] = xs[-1]
ys_log = f(xs_log)
X, Y = [], []
for i in range(0, subbands * 32, 32):
X.append(np.mean(xs_log[i:i + 32]))
Y.append(np.mean(ys_log[i:i + 32]))
return np.array(X), np.array(Y)
for i in range(0, subbands * 24, 24):
X.append(np.mean(xs_log[i:i + 24]))
Y.append(np.mean(ys_log[i:i + 24]))
return np.array(X), np.array(Y)

View File

@ -8,6 +8,7 @@ import config
import microphone
import dsp
import led
from scipy.ndimage.filters import gaussian_filter1d
def rainbow(length, speed=1.0 / 5.0):
@ -53,10 +54,25 @@ def rainbow(length, speed=1.0 / 5.0):
return x
def rainbow_gen(length, speed=1./5., center=0.5, width=0.5, f=[1, 1, 1]):
dt = 2.0 * np.pi / length
t = time.time() * speed
phi = 2.0 / 3.0 * np.pi
def r(t): return np.clip(np.sin(f[0] * t + 0. * phi) * width + center, 0., 1.)
def g(t): return np.clip(np.sin(f[1] * t + 1. * phi) * width + center, 0., 1.)
def b(t): return np.clip(np.sin(f[2] * t + 2. * phi) * width + center, 0., 1.)
x = np.tile(0.0, (length, 3))
for i in range(length):
x[i][0] = r(i * dt + t)
x[i][1] = g(i * dt + t)
x[i][2] = b(i * dt + t)
return x
_time_prev = time.time() * 1000.0
"""The previous time that the frames_per_second() function was called"""
_fps = dsp.ExponentialFilter(val=config.FPS, alpha_decay=0.01, alpha_rise=0.01)
_fps = dsp.ExponentialFilter(val=config.FPS, alpha_decay=0.05, alpha_rise=0.05)
"""The low-pass filter used to estimate frames-per-second"""
@ -110,7 +126,7 @@ def update_plot_1(x, y):
curves[i].setPen((colors[i][0], colors[i][1], colors[i][2]))
curves[i].setData(x=x, y=y[i])
p1.autoRange()
p1.setRange(yRange=(0, 2.0))
p1.setRange(yRange=(0.0, 2.0))
def interpolate(y, new_length):
@ -188,21 +204,10 @@ def update_leds_6(y):
Array containing the onset energies that should be visualized.
"""
y = np.copy(y)**1.25
y = np.abs(y)**1.25
# Update normalization constants and then normalize each bin
_EA_norm.update(y)
y /= _EA_norm.value
"""Force saturated pixels to leak brighness into neighbouring pixels"""
def smooth():
for n in range(1, len(y) - 1):
excess = y[n] - 1.0
if excess > 0.0:
y[n] = 1.0
y[n - 1] += excess / 2.0
y[n + 1] += excess / 2.0
# Several iterations because the adjacent pixels could also be saturated
for i in range(6):
smooth()
# Update the onset energy low-pass filter and discard value too dim
_EA_smooth.update(y)
_EA_smooth.value[_EA_smooth.value < .1] = 0.0
@ -220,9 +225,8 @@ _prev_energy = 0.0
def update_leds_5(y):
global _prev_energy
y = np.copy(y)
EF = y - _prev_energy
EF = np.max(y - _prev_energy, 0.0)
_prev_energy = np.copy(y)
EF[EF < 0] = 0.0
_EF_norm.update(EF)
EF /= _EF_norm.value
_EF_smooth.update(EF)
@ -231,28 +235,33 @@ def update_leds_5(y):
pixels = np.copy(_EF_smooth.value)
return pixels
_energy_flux = dsp.ExponentialFilter(1.0, alpha_decay=0.025, alpha_rise=0.9)
_energy_norm = dsp.ExponentialFilter(10.0, alpha_decay=.15, alpha_rise=.9)
_energy_smooth = dsp.ExponentialFilter(10.0, alpha_decay=0.1, alpha_rise=0.8)
# Modulate brightness of the entire strip with no individual addressing
# Modulate brightness by relative average rectified onset flux
def update_leds_4(y):
y = np.copy(y)
energy = np.sum(y)
_energy_flux.update(energy)
pixels = energy / _energy_flux.value
global _prev_energy
energy = np.sum(y**1.0)
EF = max(energy - _prev_energy, 0.0)
_prev_energy = energy
_energy_norm.update(EF)
_energy_smooth.update(min(EF / _energy_norm.value, 1.0))
pixels = np.tile(_energy_smooth.value, y.shape[0])
return pixels
# Energy flux based motion across the LED strip
def update_leds_3(y):
global pixels, color, _prev_energy, _energy_flux
global pixels, _prev_energy
y = np.copy(y)
# Calculate energy flux
energy = np.sum(y)
energy_flux = max(energy - _prev_energy, 0)
_prev_energy = energy
# Normalize energy flux
_energy_flux.update(energy_flux)
_energy_norm.update(energy_flux)
# Update and return pixels
pixels = np.roll(pixels, 1)
pixels[0] = energy_flux
@ -279,7 +288,7 @@ def update_leds_1(y):
def microphone_update(stream):
global y_roll, median, onset, SF_peak, NWPD_peak, RCD_peak, onset_peak
global y_roll
# Retrieve new audio samples and construct the rolling window
y = np.fromstring(stream.read(samples_per_frame), dtype=np.int16)
y = y / 2.0**15
@ -288,6 +297,10 @@ def microphone_update(stream):
y_data = np.concatenate(y_roll, axis=0)
# Calculate onset detection functions
SF, NWPD, RCD = dsp.onset(y_data)
# Apply Gaussian blur to improve agreement between onset functions
SF = gaussian_filter1d(SF, 1.0)
NWPD = gaussian_filter1d(NWPD, 1.0)
RCD = gaussian_filter1d(RCD, 1.0)
# Update and normalize peak followers
SF_peak.update(np.max(SF))
NWPD_peak.update(np.max(NWPD))
@ -296,8 +309,10 @@ def microphone_update(stream):
NWPD /= NWPD_peak.value
RCD /= RCD_peak.value
# Normalize and update onset spectrum
# onset = np.sqrt(SF**2.0 + NWPD**2.0 + RCD**2.0)
# onset = SF * NWPD * RCD
onset = SF + RCD
onset = SF + NWPD + RCD
# onset = SF + RCD
onset_peak.update(np.max(onset))
onset /= onset_peak.value
onsets.update(onset)
@ -306,13 +321,16 @@ def microphone_update(stream):
onset_values = interpolate(onsets.value, config.N_PIXELS)
else:
onset_values = np.copy(onsets.value)
led_visualization(onset_values)
brightness = led_visualization(onset_values)
# Plot the onsets
plot_x = np.array(range(1, len(onsets.value) + 1))
plot_y = [onsets.value**i for i in np.linspace(1.5, 0.25, config.N_CURVES)]
plot_y = [0*onsets.value**i for i in np.linspace(2.0, 0.25, config.N_CURVES)]
if brightness is not None:
plot_y = np.array([brightness, onsets.value])
#plot_y = brightness
update_plot_1(plot_x, plot_y)
app.processEvents()
print('FPS {:.2f} / {:.2f}'.format(frames_per_second(), config.FPS))
print('FPS {:.0f} / {:.0f}'.format(frames_per_second(), config.FPS))
# Create plot and window
@ -343,7 +361,7 @@ median = dsp.ExponentialFilter(val=config.N_SUBBANDS / 2.0,
alpha_decay=0.1, alpha_rise=0.1)
# Smooths the decay of the onset detection function
onsets = dsp.ExponentialFilter(val=np.tile(0.0, (config.N_SUBBANDS)),
alpha_decay=0.05, alpha_rise=0.75)
alpha_decay=0.15, alpha_rise=0.75)
# Peak followers used for normalization
SF_peak = dsp.ExponentialFilter(1.0, alpha_decay=0.01, alpha_rise=0.99)
@ -366,27 +384,38 @@ y_roll = np.random.rand(config.N_ROLLING_HISTORY, samples_per_frame) / 100.0
# Low pass filter for the LEDs being output to the strip
pixels_filt = dsp.ExponentialFilter(np.tile(0., (config.N_PIXELS, 3)), .35, .9)
pixels_filt = dsp.ExponentialFilter(np.tile(0., (config.N_PIXELS, 3)), .2, .8)
def hyperbolic_tan(x):
return 1.0 - 2.0 / (np.exp(2.0 * x) + 1.0)
# This is the function responsible for updating LED values
# Edit this function to change the visualization
def led_visualization(onset_values):
# Visualizations that we want to use (normalized to ~[0, 1])
pixels_A = update_leds_6(onset_values)
pixels_B = update_leds_4(onset_values)
# Take the product of the visualizations and scale by 255
pixels = pixels_A * pixels_B
# Combine the effects by taking the product
brightness = pixels_A * pixels_B
brightness = gaussian_filter1d(brightness, 1.0)**1.5
brightness = hyperbolic_tan(brightness)
# Combine pixels with color map
color = rainbow(onset_values.shape[0]) * 255.0
pixels = (pixels * color.T).T
color = rainbow_gen(onset_values.shape[0],
speed=1.,
center=0.5,
width=0.5,
f=[1.0, 1.0, 1.]) * 255.0
# color = rainbow(onset_values.shape[0]) * 255.0
pixels = (brightness * color.T).T
pixels = leak_saturated_pixels(pixels)
pixels = np.clip(pixels, 0.0, 255.0)
pixels = np.clip(pixels, 0., 255.)
# Apply low-pass filter to the output
pixels_filt.update(np.copy(pixels))
# Display values on the LED strip
led.pixels = np.round(pixels_filt.value).astype(int)
led.update()
return brightness
if __name__ == '__main__':