audio-reactive-led-strip/python/dsp.py

196 lines
6.4 KiB
Python

from __future__ import print_function
#from __future__ import division
import numpy as np
from scipy.interpolate import interp1d
import scipy.fftpack
import config
class ExponentialFilter:
"""Simple exponential smoothing filter"""
def __init__(self, val=0.0, alpha_decay=0.5, alpha_rise=0.5):
"""Small rise / decay factors = more smoothing"""
assert 0.0 < alpha_decay < 1.0, 'Invalid decay smoothing factor'
assert 0.0 < alpha_rise < 1.0, 'Invalid rise smoothing factor'
self.alpha_decay = alpha_decay
self.alpha_rise = alpha_rise
self.value = val
def update(self, value):
if not isinstance(self.value, (int, long, float)):
alpha = value - self.value
alpha[alpha > 0.0] = self.alpha_rise
alpha[alpha <= 0.0] = self.alpha_decay
else:
alpha = self.alpha_rise if value > self.value else self.alpha_decay
self.value = alpha * value + (1.0 - alpha) * self.value
return self.value
# FFT statistics for a few previous updates
_ys_historical_energy = np.tile(1.0, (config.N_SUBBANDS, config.N_HISTORY))
def beat_detect(ys):
"""Detect beats using an energy and variance theshold
Parameters
----------
ys : numpy.array
Array containing the magnitudes for each frequency bin of the
fast fourier transformed audio data.
Returns
-------
has_beat : numpy.array
Array of booleans indicating a beat was detected in each of the
frequency bins of ys.
current_energy / mean_energy : numpy.array
Array containing the ratios of the energies relative to the
historical average energy for each of the frequency bins. The energies
are calculated as the square of the real FFT magnitudes
ys_variance : numpy.array
The historical variance of the energies associated with each frequency
bin in ys.
"""
global _ys_historical_energy
# Beat energy criterion
current_energy = ys * ys
mean_energy = np.mean(_ys_historical_energy, axis=1)
has_beat_energy = current_energy > mean_energy * config.ENERGY_THRESHOLD
_ys_historical_energy = np.roll(_ys_historical_energy, shift=1, axis=1)
_ys_historical_energy[:, 0] = current_energy
# Beat variance criterion
ys_variance = np.var(_ys_historical_energy, axis=1)
has_beat_variance = ys_variance > config.VARIANCE_THRESHOLD
# Combined energy + variance detection
has_beat = has_beat_energy * has_beat_variance
return has_beat, current_energy / mean_energy, ys_variance
def wrap_phase(phase):
"""Converts phases in the range [0, 2 pi] to [-pi, pi]"""
return (phase + np.pi) % (2 * np.pi) - np.pi
ys_prev = None
phase_prev = None
dphase_prev = None
def onset(yt):
"""Detects onsets in the given audio time series data
Onset detection is perfomed using an ensemble of three onset detection
functions.
The first onset detection function uses the rectified spectral flux (SF)
of successive FFT data frames.
The second onset detection function uses the normalized weighted phase
difference (NWPD) of successive FFT data frames.
The third is a rectified complex domain onset detection function.
The product of these three functions forms an ensemble onset detection
function that returns continuous valued onset detection estimates.
Parameters
----------
yt : numpy.array
Array of time series data to perform onset detection on
Returns
-------
SF : numpy.array
Array of rectified spectral flux values
NWPD : numpy.array
Array of normalized weighted phase difference values
RCD : numpy.array
Array of rectified complex domain values
References
----------
Dixon, Simon "Onset Detection Revisted"
"""
global ys_prev, phase_prev, dphase_prev
xs, ys = fft_log_partition(yt,
subbands=config.N_SUBBANDS,
window=np.hamming,
fmin=1,
fmax=14000)
#ys = music_fft(yt)
magnitude = np.abs(ys)
phase = wrap_phase(np.angle(ys))
# Special case for initialization
if ys_prev is None:
ys_prev = ys
phase_prev = phase
dphase_prev = phase
# Rectified spectral flux
SF = np.abs(ys) - np.abs(ys_prev)
SF[SF < 0.0] = 0.0
# First difference of phase
dphase = wrap_phase(phase - phase_prev)
# Second difference of phase
ddphase = wrap_phase(dphase - dphase_prev)
# Normalized weighted phase deviation
NWPD = np.abs(ddphase * magnitude) / magnitude
# Rectified complex domain onset detection function
RCD = np.abs(ys - ys_prev * dphase_prev)
RCD[RCD < 0.0] = 0.0
RCD = RCD
# Update previous values
ys_prev = ys
phase_prev = phase
dphase_prev = dphase
# Replace NaN values with zero
SF = np.nan_to_num(SF)
NWPD = np.nan_to_num(NWPD)
RCD = np.nan_to_num(RCD)
return SF, NWPD, RCD
def rfft(data, window=None):
if window is None:
window = 1.0
else:
window = window(len(data))
ys = np.abs(np.fft.rfft(data*window))
xs = np.fft.rfftfreq(len(data), 1.0 / config.MIC_RATE)
return xs, ys
def rfft_log_partition(data, fmin=30, fmax=20000, subbands=64, window=None):
"""Returns FFT partitioned into subbands that are logarithmically spaced"""
xs, ys = rfft(data, window=window)
xs_log = np.logspace(np.log10(fmin), np.log10(fmax), num=subbands * 32)
f = interp1d(xs, ys)
ys_log = f(xs_log)
X, Y = [], []
for i in range(0, subbands * 32, 32):
X.append(np.mean(xs_log[i:i + 32]))
Y.append(np.mean(ys_log[i:i + 32]))
return np.array(X), np.array(Y)
def fft(data, window=None):
if window is None:
window = 1.0
else:
window = window(len(data))
ys = np.fft.fft(data*window)
xs = np.fft.fftfreq(len(data), 1.0 / config.MIC_RATE)
return xs, ys
def fft_log_partition(data, fmin=30, fmax=20000, subbands=64, window=None):
"""Returns FFT partitioned into subbands that are logarithmically spaced"""
xs, ys = fft(data, window=window)
xs_log = np.logspace(np.log10(fmin), np.log10(fmax), num=subbands * 32)
f = interp1d(xs, ys)
ys_log = f(xs_log)
X, Y = [], []
for i in range(0, subbands * 32, 32):
X.append(np.mean(xs_log[i:i + 32]))
Y.append(np.mean(ys_log[i:i + 32]))
return np.array(X), np.array(Y)