From d9f28198d81cd879f2973c07e9346f975137bd00 Mon Sep 17 00:00:00 2001 From: Robin Scheibler Date: Mon, 2 Dec 2024 08:39:36 +0900 Subject: [PATCH] Replaces cosine octave filter bank by that of Antoni 2009 (#384) * Adds implementation of Antoni's octave band filters. * Replaces the old cosine octave filter bank by the energy preserving perfect reconstruction filter bank from Antoni 2009. --- CHANGELOG.rst | 15 +- examples/octave_band_filterbank.py | 218 ++++++ pyroomacoustics/acoustics.py | 727 ++++++++++++++++++--- pyroomacoustics/directivities/measured.py | 31 +- pyroomacoustics/room.py | 246 +------ pyroomacoustics/simulation/ism.py | 4 +- pyroomacoustics/tests/test_octave_bands.py | 111 ++++ 7 files changed, 1008 insertions(+), 344 deletions(-) create mode 100644 examples/octave_band_filterbank.py create mode 100644 pyroomacoustics/tests/test_octave_bands.py diff --git a/CHANGELOG.rst b/CHANGELOG.rst index a2b8d305..86ebf4e3 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -11,7 +11,20 @@ adheres to `Semantic Versioning `_. `Unreleased`_ ------------- -Nothing yet +This new release introduces source and receiver directivities for the ray +tracing simulation engine. + +Added +~~~~~ + +- New ``pyroomacoustics.random`` module that provides some primitives for sampling + at random from arbitrary distributions on the sphere. This is used for source directivities + in the ray tracing simulator. + +- New octave filter bank with energy conservation and perfect reconstruction described + in Antoni, "Orthogonal-like fractional-octave-band filters," 2009. + The filter bank is implemented in ``pyroomacoustics.acoustics.AntoniOctaveFilterBank``. + `0.8.2`_ - 2024-11-06 --------------------- diff --git a/examples/octave_band_filterbank.py b/examples/octave_band_filterbank.py new file mode 100644 index 00000000..87682344 --- /dev/null +++ b/examples/octave_band_filterbank.py @@ -0,0 +1,218 @@ +""" +# Octave Band Filter Banks + +Multi-frequency simulation relies on octave-band filterbanks +to run the simulation in different perceptually relevant frequency bands +before merging the results into a single room impulse response. + +This scripts demonstrate some of the octave band filters available +in pyroomacoustics. + +Two ocatave band filter banks are implemented in pyroomacoustics. + +## Cosine Filterbank + +This filterbank uses a number of overlapping cosine filters to +cover the octaves. +It guarantees perfect reconstruction, but does not conserve the energy +in the bands. + +## Antoni's Orthogonal-like Fractional Octave Bands + +This class implements a type of fractional octave filter bank with +both perfect reconstruction and energy conservation. + +J. Antoni, Orthogonal-like fractional-octave-band filters, J. Acoust. Soc. +Am., 127, 2, February 2010 +""" + +import matplotlib.pyplot as plt +import numpy as np +from scipy.signal import chirp + +import pyroomacoustics as pra + +if __name__ == "__main__": + + # Test unit energy. + fs = 16000 + n_fft = 2**10 # 1024 + base_freq = 125.0 # Hertz, default frequency in pyroomacoustics. + + # The cosine filter bank + octave_bands = pra.OctaveBandsFactory( + fs=fs, + n_fft=n_fft, + keep_dc=True, + base_frequency=base_freq, + ) + + # The orthogonal-like filterbankd with perfect reconstruction and energy + # conservation. + # The `band_overlap_ratio` and `slope` parameters control the transition + # between adjacent bands. + antoni_octave_bands = pra.AntoniOctaveFilterBank( + fs=fs, + base_frequency=base_freq, + band_overlap_ratio=0.5, + slope=0, + n_fft=n_fft, + third=False, + ) + + fig, axes = plt.subplots(2, 2, figsize=(10, 5)) + + filters_orth = np.zeros(n_fft) + filters_orth[n_fft // 2] = 1.0 + filters_orth = antoni_octave_bands.analysis(filters_orth) + + for i, (lbl, ob) in enumerate( + {"Cosine": octave_bands.filters, "Antoni": filters_orth}.items() + ): + filters_spectrum = np.fft.rfft(ob, axis=0) + n_freq = filters_spectrum.shape[0] + freq = np.arange(n_freq) / n_freq * (fs / 2) + time = np.arange(ob.shape[0]) / fs + + sum_reconstruction = np.sum(abs(filters_spectrum), axis=1) + + for b in range(ob.shape[-1]): + c = antoni_octave_bands.centers[b] + line_label = ( + (f"{c if c < 1000 else c / 1000:.0f}{'k' if c >= 1000 else ''}Hz") + if lbl == "Antoni" + else None + ) + axes[0, i].plot(freq, abs(filters_spectrum[:, b]) ** 2, label=line_label) + axes[1, i].plot(time, ob[:, b]) + axes[0, i].plot( + freq, sum_reconstruction, label="sum magnitude" if lbl == "Antoni" else None + ) + axes[0, i].set_title(f"{lbl} - energy response") + axes[1, i].set_title(f"{lbl} - impulse response") + axes[1, i].set_xlim(0.025, 0.04) + + fig.legend(loc="lower right") + fig.tight_layout() + + # Test octave bands interpolation + coeffs = np.arange(antoni_octave_bands.n_bands)[::-1] + 1 + + mat_interp = { + "Cosine": octave_bands.synthesis(coeffs, min_phase=True), + "Cosine, min. phase": octave_bands.synthesis(coeffs, min_phase=False), + "Antoni": antoni_octave_bands.synthesis(coeffs, min_phase=False), + "Antoni, min. phase": antoni_octave_bands.synthesis(coeffs, min_phase=True), + } + + # Compare the energy of the original coefficients to those after filtering. + energy = (octave_bands.get_bw() / octave_bands.fs * 2.0) * coeffs**2 + bar_labels = [ + f"{c if c < 1000 else c / 1000:.0f}{'k' if c >= 1000 else ''}" + for c in antoni_octave_bands.centers + ] + ["Total"] + bar_energy_original = energy.tolist() + [energy.sum()] + + bar_width = 0.9 / (len(mat_interp) + 2) + + def bar_x(idx): + bar_space = 0.1 / (len(mat_interp) + 2) + x = np.arange(len(bar_energy_original)) + return bar_width / 2.0 + idx * (bar_width + bar_space) + x + + fig, axes = plt.subplots(1, 3, figsize=(15, 5)) + axes[0].set_title("Impulse response") + axes[1].set_title("Magnitude response") + axes[2].set_title("Per-band energy") + for idx, (lbl, values) in enumerate(mat_interp.items()): + axes[0].plot(np.arange(values.shape[-1]) / fs, values, label=lbl) + axes[0].set_xlabel("Time (s)") + + H = abs(np.fft.rfft(values)) + axes[1].plot(np.arange(H.shape[-1]) * fs / values.shape[-1], H, label=lbl) + axes[1].set_xlabel("Frequency (Hz)") + + energy_bands = antoni_octave_bands.energy(values, oversampling=4).tolist() + energy_bands += [sum(energy_bands)] + axes[2].bar(bar_x(idx), energy_bands, label=lbl, width=bar_width) + + axes[2].bar( + bar_x(len(mat_interp)), bar_energy_original, label="True", width=bar_width + ) + axes[2].set_xticks(np.arange(len(bar_energy_original)) + 0.5, bar_labels) + axes[2].set_xlabel("Band centers (Hz)") + axes[2].legend() + fig.tight_layout() + + # Test reconstruction + time = np.arange(fs * 5) / fs + f0 = 1.0 + x = np.zeros_like(time) + x[fs : 3 * fs] = np.sin(2 * np.pi * f0 * time[fs : 3 * fs]) + x = chirp(time, 100, time[-1], 7500, method="linear") + + x_bands = octave_bands.analysis(x) + x_rec = x_bands.sum(axis=-1) + + band_energy = antoni_octave_bands.energy(x, oversampling=4) + print( + f"Energy of input signal: {np.square(x).sum()}, " + f"sum of band energies: {band_energy.sum()}" + ) + print(f"Reconstruction error: {abs(x - x_rec).max()}") + + low = None + high = None + + low = 0 if low is None else low + high = x.shape[-1] if high is None else high + + num_plots = x_bands.shape[-1] + 1 + freq = np.arange(x_bands.shape[-2] // 2 + 1) / x_bands.shape[-2] * fs + time = np.arange(x.shape[-1]) / fs + fig, axes = plt.subplots(num_plots, 2, figsize=(10, 6)) + axes[0, 0].plot(time[low:high], x_rec[low:high], label="reconstructed") + axes[0, 0].plot(time[low:high], x[low:high], label="original") + axes[0, 0].plot(time[low:high], x[low:high] - x_rec[low:high], label="error") + L = axes[0, 1].magnitude_spectrum(x, label="reconstructed", Fs=fs) + L = axes[0, 1].magnitude_spectrum(x_rec, label="reconstructed", Fs=fs) + axes[0, 0].legend(fontsize="xx-small") + + ylim_time = x.min(), x.max() + ylim = -0.01 * max(L[0]), max(L[0]) + axes[0, 0].set_title("Filtered signal") + axes[0, 1].set_title("Magnitude response") + axes[0, 1].set_xlabel("") + axes[0, 1].set_ylabel("") + axes[0, 1].set_ylim(ylim) + axes[0, 1].set_xticks([]) + axes[0, 0].set_ylim(ylim_time) + axes[0, 0].set_xticks([]) + axes[0, 0].set_ylabel("Sum") + for b in range(1, num_plots): + axes[b, 0].plot(time[low:high], x_bands[low:high, b - 1]) + axes[b, 0].set_ylim(ylim_time) + axes[b, 1].magnitude_spectrum(x_bands[low:high, b - 1], Fs=fs) + axes[b, 1].set_ylim(ylim) + axes[b, 1].set_xlabel("") + axes[b, 1].set_ylabel("") + if b < num_plots - 1: + axes[b, 0].set_xticks([]) + axes[b, 1].set_xticks([]) + else: + axes[b, 0].set_xlabel("Time (s)") + ticks = np.arange(0, fs / 2 + 1, 1000) + ticklabels = [ + f"{c if c < 1000 else c / 1000:.0f}{'k' if c > 1000 else ''} Hz" + for c in ticks + ] + axes[b, 1].set_xticks(ticks, ticklabels) + if b > 0: + c = antoni_octave_bands.centers[b - 1] + axes[b, 0].set_ylabel( + f"{c if c < 1000 else c / 1000:.0f}{'k' if c > 1000 else ''} Hz" + ) + axes[b, 0].set_xlabel("Frequency (Hz)") + fig.tight_layout() + + plt.show() diff --git a/pyroomacoustics/acoustics.py b/pyroomacoustics/acoustics.py index 26bc176f..e2bf055d 100644 --- a/pyroomacoustics/acoustics.py +++ b/pyroomacoustics/acoustics.py @@ -25,13 +25,16 @@ # not, see . from __future__ import division +import abc +import dataclasses import itertools import math +from typing import List import numpy as np from scipy.fftpack import dct from scipy.interpolate import interp1d -from scipy.signal import butter, fftconvolve +from scipy.signal import butter, fftconvolve, hilbert from .parameters import constants from .transform import stft @@ -126,7 +129,233 @@ def octave_bands(fc=1000, third=False, start=0.0, n=8): return bands, fcentre -class OctaveBandsFactory(object): +def magnitude_response_to_minimum_phase(magnitude_response, n_fft, axis=-1, eps=1e-5): + """ + Creates a minimum phase filter from its magnitude response following + the method proposed here. + https://ccrma.stanford.edu/~jos/sasp/Minimum_Phase_Filter_Design.html + + Parameters + ---------- + magnitude_response: np.ndarray + The response + n_fft: int + The FFT size to use + axis: int + The axis where to make the transformation + + Returns + ------- + The minimum phase impulse response with given magnitude response. + """ + magnitude_response = np.moveaxis(magnitude_response, axis, -1) + + n_freq = n_fft // 2 + 1 + if n_fft % 2 == 0: + padding = n_fft - 2 * (magnitude_response.shape[-1] - 1) + else: + padding = n_fft - 2 * (magnitude_response.shape[-1] - 1) - 1 + + if padding < 0: + raise ValueError( + "The FFT size should at least twice the frequency response size." + ) + + zero_pad = np.zeros( + magnitude_response.shape[:-1] + (padding,), dtype=magnitude_response.dtype + ) + freq_resp = np.concatenate( + (magnitude_response, zero_pad, magnitude_response[..., :0:-1]), axis=-1 + ) + + freq_resp = np.maximum(freq_resp, eps) + m_p = np.imag(-hilbert(np.log(freq_resp), axis=-1)) + freq_resp = freq_resp[..., :n_freq] * np.exp(1j * m_p[..., :n_freq]) + + freq_resp = np.moveaxis(freq_resp, -1, axis) + filters = np.fft.irfft(freq_resp, n=n_fft, axis=axis) + return filters + + +def cosine_magnitude_octave_filter_response(n_fft, centers, fs, keep_dc=True): + """ + Creates the magnitude response of a cosine octave-band filterbank as + described in D. Schroeder's PhD thesis. + """ + + # This seems to work only for Octave bands out of the box + n = len(centers) + + new_bands = [[centers[0] / 2, centers[1]]] + + for i in range(1, n - 1): + new_bands.append([centers[i - 1], centers[i + 1]]) + new_bands.append([centers[-2], fs / 2]) + + n_freq = n_fft // 2 + 1 + mag_resp = np.zeros((n_freq, n)) + + freq = np.arange(n_freq) / n_fft * fs # This only contains positive newfrequencies + + for b, (band, center) in enumerate(zip(new_bands, centers)): + lo = np.logical_and(band[0] <= freq, freq < center) + + mag_resp[lo, b] = 0.5 * (1 + np.cos(2 * np.pi * freq[lo] / center)) + + if b == 0 and keep_dc: + # Converting Octave bands so that the minimum phase filters do not + # have ripples. + make_one = freq < center + mag_resp[make_one, b] = 1.0 + + if b != n - 1: + hi = np.logical_and(center <= freq, freq < band[1]) + mag_resp[hi, b] = 0.5 * (1 - np.cos(2 * np.pi * freq[hi] / band[1])) + else: + hi = center <= freq + mag_resp[hi, b] = 1.0 + + n_freq = n_fft // 2 + 1 + km = np.round(centers / fs * n_fft).astype(int) + k1, k2 = [], [] + freq = np.arange(mag_resp.shape[0]) + for band in range(mag_resp.shape[1]): + f_nz = freq[mag_resp[:, band] > 0] + k1.append(f_nz[0]) + k2.append(f_nz[-1] + 1) + k1 = np.array(k1) + k2 = np.array(k2) + + return mag_resp.T, n_freq, k1, km, k2 + + +def antoni_magnitude_octave_filter_response( + n_fft, centers, bands, fs, overlap_ratio, slope +): + """ + Implementation adapted from + https://github.com/pyfar/pyfar/blob/main/pyfar/dsp/filter/fractional_octaves.py#L339 + MIT License. + """ + n_freq = n_fft // 2 + 1 + + # Discretize the bands boundaries. + k1 = np.round(bands[:, 0] / fs * n_fft).astype(int) + km = np.round(centers / fs * n_fft).astype(int) + k2 = np.round(bands[:, 1] / fs * n_fft).astype(int) + + G = np.ones((km.shape[0], n_freq)) + + P = np.round(overlap_ratio * (k2 - km)).astype(int) + + # Corrects the start and end of the first and last bands, respectively. + k1[0] = 0 + k2[-1] = n_freq + + k_low, k_high = [0], [] + for band in range(1, km.shape[0]): + + if P[band] > 0: + p = np.arange(-P[band], P[band] + 1) + + # Compute phi according to eq. (18) and (19). + phi = p / P[band] + for _ in range(slope): + phi = np.sin(np.pi / 2 * phi) + phi = 0.5 * (phi + 1) + + # Build the decreasing part of the previous band. + G[band - 1, k1[band] - P[band] : k1[band] + P[band] + 1] = np.cos( + np.pi / 2 * phi + ) + # apply fade in in to next channel + G[band, k1[band] - P[band] : k1[band] + P[band] + 1] = np.sin( + np.pi / 2 * phi + ) + + # set current and next channel to zero outside their range + G[band - 1, k1[band] + P[band] :] = 0.0 + G[band, : k1[band] - P[band]] = 0.0 + + k_high.append(k1[band] + P[band]) + k_low.append(k1[band] - P[band]) + k_high.append(n_freq) + + return G, n_freq, np.array(k_low), km, np.array(k_high) + + +class BaseOctaveFilterBank(metaclass=abc.ABCMeta): + """ + A base class for octave filter banks. + """ + + @abc.abstractmethod + def get_bw(self): + pass + + @abc.abstractmethod + def analysis(self, x, band=None, **kwargs): + pass + + @abc.abstractmethod + def synthesis(self, coeffs, min_phase=False, **kwargs): + pass + + @abc.abstractmethod + def energy(self, x, **kwargs): + pass + + def __call__(self, coeffs=0.0, center_freqs=None, interp_kind="linear", **kwargs): + """ + Takes as input a list of values with optional corresponding center frequency. + Returns a list with the correct number of octave bands. Interpolation and + extrapolation are used to fill in the missing values. + + Parameters + ---------- + coeffs: list + A list of values to use for the octave bands + center_freqs: list, optional + The optional list of center frequencies + interp_kind: str + Specifies the kind of interpolation as a string (‘linear’, + ‘nearest’, ‘zero’, ‘slinear’, ‘quadratic’, ‘cubic’, ‘previous’, + ‘next’, where ‘zero’, ‘slinear’, ‘quadratic’ and ‘cubic’ refer to a + spline interpolation of zeroth, first, second or third order; + ‘previous’ and ‘next’ simply return the previous or next value of + the point) or as an integer specifying the order of the spline + interpolator to use. Default is ‘linear’. + """ + + if not isinstance(coeffs, (list, np.ndarray)): + # when the parameter is a scalar just do flat extrapolation + ret = [coeffs] * self.n_bands + + if len(coeffs) == 1: + ret = coeffs * int(self.n_bands) + + else: + # by default infer the center freq to be the low ones + if center_freqs is None: + center_freqs = self.centers[: len(coeffs)] + + # create the interpolator in log domain + interpolator = interp1d( + np.log2(center_freqs), + coeffs, + fill_value="extrapolate", + kind=interp_kind, + ) + ret = interpolator(np.log2(self.centers)) + + # now clip between 0. and 1. + ret[ret < 0.0] = 0.0 + ret[ret > 1.0] = 1.0 + + return ret + + +class OctaveBandsFactory(BaseOctaveFilterBank): """ A class to process uniformly all properties that are defined on octave bands. @@ -146,8 +375,6 @@ class OctaveBandsFactory(object): The list of bin boundaries for the octave bands centers The list of band centers - all_materials: list of Material - The list of all Material objects created by the factory Parameters ---------- @@ -155,15 +382,22 @@ class OctaveBandsFactory(object): The center frequency of the first octave band (default: 125 Hz) fs: float, optional The sampling frequency used (default: 16000 Hz) - third_octave: bool, optional - Use third octave bands if True (default: False) + n_fft: bool, optional + The FFT size to use + keep_dc: bool + If True, include all the lower frequencies in the first filter + min_phase: bool + If True, make the filters minimum phase """ - def __init__(self, base_frequency=125.0, fs=16000, n_fft=512, keep_dc=False): + def __init__( + self, base_frequency=125.0, fs=16000, n_fft=512, keep_dc=False, min_phase=False + ): self.base_freq = base_frequency self.fs = fs self.n_fft = n_fft self.keep_dc = keep_dc + self.min_phase = min_phase # compute the number of bands self.n_bands = math.floor(np.log2(fs / base_frequency)) @@ -172,13 +406,18 @@ def __init__(self, base_frequency=125.0, fs=16000, n_fft=512, keep_dc=False): fc=self.base_freq, n=self.n_bands, third=False ) - self._make_filters() + self.filters, self.magnitude_response = self._make_filters() def get_bw(self): """Returns the bandwidth of the bands""" - return np.array([b2 - b1 for b1, b2 in self.bands]) + bands = self.bands + if self.keep_dc: + bands[0] = [0.0, bands[0][1]] + bands[-1] = [bands[-1][0], self.fs / 2] + + return np.array([min(b2, self.fs // 2) - max(b1, 0) for b1, b2 in bands]) - def analysis(self, x, band=None): + def analysis(self, x, band=None, mode="same"): """ Process a signal x through the filter bank @@ -186,126 +425,438 @@ def analysis(self, x, band=None): ---------- x: ndarray (n_samples) The input signal + band: + The index of the band to transform. If ``None``, all the bands are + analyzed and returned. + mode: + The mode to use for fftconvolve. Returns ------- ndarray (n_samples, n_bands) The input signal filters through all the bands """ - if band is None: - bands = range(self.filters.shape[1]) - else: + bands = np.arange(self.n_bands) + elif isinstance(band, int): bands = [band] + elif isinstance(band, list) and all(isinstance(b, int) for b in band): + bands = band + else: + raise ValueError(f"band should be an int or list of int (got {band}).") - output = np.zeros((x.shape[0], len(bands)), dtype=x.dtype) + filters = self.filters[:, bands] - for i, b in enumerate(bands): - output[:, i] = fftconvolve(x, self.filters[:, b], mode="same") + x = np.stack([x] * len(bands), axis=-1) + output = fftconvolve(x, filters, mode=mode, axes=(-2,)) if output.shape[1] == 1: return output[:, 0] else: return output - def __call__(self, coeffs=0.0, center_freqs=None, interp_kind="linear", **kwargs): + def synthesis(self, coeffs, min_phase=False): """ - Takes as input a list of values with optional corresponding center frequency. - Returns a list with the correct number of octave bands. Interpolation and - extrapolation are used to fill in the missing values. + Creates a filter with the desired band amplitudes. Parameters ---------- - coeffs: list - A list of values to use for the octave bands - center_freqs: list, optional - The optional list of center frequencies - interp_kind: str - Specifies the kind of interpolation as a string (‘linear’, - ‘nearest’, ‘zero’, ‘slinear’, ‘quadratic’, ‘cubic’, ‘previous’, - ‘next’, where ‘zero’, ‘slinear’, ‘quadratic’ and ‘cubic’ refer to a - spline interpolation of zeroth, first, second or third order; - ‘previous’ and ‘next’ simply return the previous or next value of - the point) or as an integer specifying the order of the spline - interpolator to use. Default is ‘linear’. + band_magnitudes: np.ndarray + The band amplitude coefficents (..., n_bands) + min_phase: bool + The filters are made minimum phase if ``True``. + + Returns + ------- + The impulse responses with the correct levels (..., n_fft) """ + ir = np.einsum("...b,tb->...t", coeffs, self.filters) + if min_phase: + mag_resp = np.abs(np.fft.rfft(ir, axis=-1)) + return magnitude_response_to_minimum_phase( + mag_resp, self.n_fft, axis=-1, eps=1e-7 + ) + else: + return ir - if not isinstance(coeffs, (list, np.ndarray)): - # when the parameter is a scalar just do flat extrapolation - ret = [coeffs] * self.n_bands + def energy(self, x): + """ + Computes the per-band energy of the input signal. - if len(coeffs) == 1: - ret = coeffs * int(self.n_bands) + Parameters + ---------- + x: np.ndarray (..., n_samples) + The signal to analyze. - else: - # by default infer the center freq to be the low ones - if center_freqs is None: - center_freqs = self.centers[: len(coeffs)] + Returns + ------- + np.ndarray (..., n_bands) + The per-band energy of the input signal. + """ + x_bands = self.analysis(x) + return np.sum(x_bands**2, axis=-1) - # create the interpolator in log domain - interpolator = interp1d( - np.log2(center_freqs), - coeffs, - fill_value="extrapolate", - kind=interp_kind, + def _make_filters(self): + """ + Creates the band-pass filters for the octave bands + """ + mag_resp, *_ = cosine_magnitude_octave_filter_response( + self.n_fft, self.centers, self.fs, self.keep_dc + ) + mag_resp = mag_resp.T + + # Delay the filters to match mode="same" of fftconvolve. + n_freq = self.n_fft // 2 + 1 + delay = np.exp( + 2j * np.pi * np.arange(n_freq) * (self.n_fft // 2 + 1) / self.n_fft + ) + filters = np.fft.irfft(mag_resp * delay[:, None], n=self.n_fft, axis=0) + + if self.min_phase: + magnitude_response = np.abs(np.fft.rfft(filters, axis=0)) + filters = magnitude_response_to_minimum_phase( + magnitude_response, self.n_fft, axis=0, eps=2e-2 ) - ret = interpolator(np.log2(self.centers)) - # now clip between 0. and 1. - ret[ret < 0.0] = 0.0 - ret[ret > 1.0] = 1.0 + # Octave band filters in frequency domain + self.filters_freq_domain = np.fft.fft(filters, axis=0, n=self.n_fft) - return ret + return filters, mag_resp - def _make_filters(self): + +@dataclasses.dataclass(frozen=True) +class AntoniOctaveFilterBankParameters: + """ + A data structure to hold the paramters used for the analysis + of a signal with the Antoni octave filter bank. + """ + + windows: List[np.ndarray] + n_fft: int + analyzed_band_indices: List[int] + bands_lower_bins: np.ndarray + bands_center_bins: np.ndarray + bands_upper_bins: np.ndarray + output_length: int + output_dtype: type + padded_length: int + + +class AntoniOctaveFilterBank(BaseOctaveFilterBank): + """ + This class implements a type of fractional octave filter bank with + both perfect reconstruction and energy conservation. + + J. Antoni, Orthogonal-like fractional-octave-band filters, J. Acoust. Soc. + Am., 127, 2, February 2010 + + Attributes + ---------- + base_freq: float + The center frequency of the first octave band + fs: float + The target sampling frequency + n_bands: int + The number of octave bands needed to cover from base_freq to fs / 2 + (i.e. floor(log2(fs / base_freq))) + bands: list of tuple + The list of bin boundaries for the octave bands + centers + The list of band centers + + Parameters + ---------- + base_frequency: float, optional + The center frequency of the first octave band (default: 125 Hz) + fs: float, optional + The sampling frequency used (default: 16000 Hz) + n_fft: bool, optional + The FFT size to use + band_overlap_ratio: float + The overlap between bands. It should be between 0.0 and 0.5. + slope: int + A parameter controlling the transition between bands. + The larger, the sharper the transition. + third: bool + If set to True, a third Octave band filter bank is created. + """ + + def __init__( + self, + base_frequency: float = 125.0, + fs: float = 16000, + n_fft: int = 512, + band_overlap_ratio: float = 0.5, + slope: int = 0, + third: bool = False, + ): + if not (0.0 <= band_overlap_ratio <= 0.5): + raise ValueError("The band overlap ratio should be in [0, 0.5].") + + self.base_freq = base_frequency + self.fs = fs + self.n_fft = n_fft + self.overlap_ratio = band_overlap_ratio + self.slope = slope + + # Compute the number of octaves. + n_octaves = math.floor(np.log2(fs / base_frequency)) + self.bands, self.centers = octave_bands( + fc=self.base_freq, n=n_octaves, third=third + ) + self.n_bands = self.centers.shape[0] + + G, *_ = self._make_window_function(self.n_fft) + self.filters = G.T + + def get_bw(self, n_fft=None): + """Returns the bandwidth of the bands""" + if n_fft is None: + n_fft = self.n_fft + k1 = np.round(self.bands[:, 0] / self.fs * n_fft).astype(int) + k2 = np.round(self.bands[:, 1] / self.fs * n_fft).astype(int) + + # Corrects the start and end of the first and last bands, respectively. + k1[0] = 0 + k2[-1] = n_fft // 2 + 1 + + diff = k2 - k1 + ratio = diff / diff.sum() + return ratio * self.fs / 2.0 + + def _make_window_function(self, n_fft) -> np.ndarray: """ - Creates the band-pass filters for the octave bands + Implementation adapted from + https://github.com/pyfar/pyfar/blob/main/pyfar/dsp/filter/fractional_octaves.py#L339 + MIT License. """ - # This seems to work only for Octave bands out of the box - centers = self.centers - n = len(self.centers) + return antoni_magnitude_octave_filter_response( + n_fft, self.centers, self.bands, self.fs, self.overlap_ratio, self.slope + ) - new_bands = [[centers[0] / 2, centers[1]]] + def wavelet_analysis(self, x, band=None, oversampling=2): + """ + Compute the decomposition proposed by Antoni 2008. - for i in range(1, n - 1): - new_bands.append([centers[i - 1], centers[i + 1]]) - new_bands.append([centers[-2], self.fs / 2]) + Parameters + ---------- + x: ndarray (..., n_samples) + The input signal + band: + The index of the band to transform. If ``None``, all the bands are + analyzed and returned. + oversampling: int + Oversampling of FFT to use (default: 2). - n_freq = self.n_fft // 2 + 1 - freq_resp = np.zeros((n_freq, n)) + Returns + ------- + signal: list[np.ndarray] + The coefficients of the input signal filters obtained by + time-frequency analysis. + parameters: AntoniOctaveFilterBankParameters + A data structure that contains the parameters used during + the analysis. + """ + if band is None: + bands = np.arange(self.n_bands) + elif isinstance(band, int): + bands = [band] + elif isinstance(band, list) and all(isinstance(b, int) for b in band): + bands = band + else: + raise ValueError(f"band should be an int or list of int (got {band}).") + + n_fft = 2 ** math.ceil(math.log2(oversampling * x.shape[-1])) + X = np.fft.rfft(x, axis=-1, n=n_fft) / np.sqrt(n_fft) + + G, n_freq, k1, km, k2 = self._make_window_function(n_fft) + + Nm = np.ceil((k2 - k1) / 2.0).astype(int) # index of Nm is i in the paper. + upper = np.max(k1 + 2 * Nm) + padded_length = X.shape[-1] + if upper > n_freq: + padding = np.zeros(X.shape[:-1] + (upper - n_freq,), dtype=X.dtype) + padded_length += upper - n_freq + X = np.concatenate((X, padding), axis=-1) + padding = np.zeros(G.shape[:-1] + (upper - n_freq,), dtype=G.dtype) + G = np.concatenate((G, padding), axis=-1) + + signal = [] # X_ij in the paper + windows_nonzero = [] + for band in bands: + N = Nm[band] + k = k1[band] + np.arange(2 * N, dtype=int) + delta = np.sqrt(1 / (2.0 * N)) + windows_nonzero.append(G[band, k]) + + # Analysis + j = np.arange(-N + 1, N + 1) + factor = 1j**band * delta + cexp = np.exp(1j * np.pi * (km[band] - k1[band]) * j / N) + W_pos = np.fft.fft(G[band, k] * X[..., k], axis=-1) + W_neg = np.conj(np.fft.fft(G[band, k] * np.conj(X[..., k]), axis=-1)) + W_np = np.concatenate( + (W_neg[..., N - 1 : 0 : -1], W_pos[..., : N + 1]), axis=-1 + ) + signal.append(factor * cexp * W_np) + + return signal, AntoniOctaveFilterBankParameters( + windows=windows_nonzero, + n_fft=n_fft, + analyzed_band_indices=bands, + bands_lower_bins=k1, + bands_center_bins=km, + bands_upper_bins=k2, + output_length=x.shape[-1], + output_dtype=X.dtype, + padded_length=padded_length, + ) + + def wavelet_synthesis( + self, signal: List[np.ndarray], parameters: AntoniOctaveFilterBankParameters + ) -> np.ndarray: + """ + Given the decomposition of the signal by Antoni 2008, compute + the octave band signals. + + Paramters + --------- + coeffs: list[np.ndarray] + A list containing the coefficients corresponsing to every octave band. + parameters: AntoniOctaveFilterBankParameters + The parameters of the analysis filterbank. + + Returns + ------- + np.ndarray (..., num_samples, num_bands) + The time domain representation of the octave bands at the original sampling rate. + """ + + W = signal + n_freq = parameters.n_fft // 2 + 1 + G = parameters.windows + k1 = parameters.bands_lower_bins + km = parameters.bands_center_bins + + X_filt = np.zeros( + W[0].shape[:-1] + (parameters.padded_length, len(W)), + dtype=parameters.output_dtype, + ) + + for idx, band in enumerate(parameters.analyzed_band_indices): + coeffs = W[idx] + window = G[idx] + N = coeffs.shape[-1] // 2 + k = k1[band] + np.arange(2 * N, dtype=int) + delta = np.sqrt(1 / (2.0 * N)) + + # Synthesis + factor = (-1j) ** band * window * delta + cexp1 = np.exp(-1j * np.pi * (k - km[band]) * (N - 1) / N) + cexp2 = np.exp(-1j * np.pi * (km[band] - k1[band]) * np.arange(2 * N) / N) + Y = np.conj(np.fft.fft(np.conj(cexp2 * coeffs))) + X_filt[..., k1[band] : k1[band] + 2 * N, idx] = factor * cexp1 * Y + + # Synthesize the output band-pass signals. + y = np.fft.irfft(X_filt[..., :n_freq, :], axis=-2) * np.sqrt(parameters.n_fft) + return y[..., : parameters.output_length, :] + + def energy(self, x, oversampling=2): + """ + Computes the per-band energy of the input signal. - freq = ( - np.arange(n_freq) / self.n_fft * self.fs - ) # This only contains positive newfrequencies + Parameters + ---------- + x: np.ndarray (..., n_samples) + The signal to analyze. + oversampling: int, optional + The oversampling to use in the analysis (default 2). - for b, (band, center) in enumerate(zip(new_bands, centers)): - if b == 0 and self.keep_dc: - # Converting Octave bands so that the minimum phase filters do not - # have ripples. - make_one = freq < center - freq_resp[make_one, b] = 1.0 + Returns + ------- + np.ndarray (..., n_bands) + The per-band energy of the input signal. + """ + coeffs, _ = self.wavelet_analysis(x, oversampling=oversampling) + energy = 2.0 * np.array([(abs(w) ** 2).sum() for w in coeffs]) + return energy - lo = np.logical_and(band[0] <= freq, freq < center) + def analysis(self, x, band=None, oversampling=2): + """ + Process a signal x through the filter bank - freq_resp[lo, b] = 0.5 * (1 + np.cos(2 * np.pi * freq[lo] / center)) + Parameters + ---------- + x: ndarray (..., n_samples) + The input signal + band: int + The index of the band to transform. If ``None``, all the bands are + analyzed and returned. + oversampling: int + Oversampling of FFT to use (default: 2). - if b != n - 1: - hi = np.logical_and(center <= freq, freq < band[1]) - freq_resp[hi, b] = 0.5 * (1 - np.cos(2 * np.pi * freq[hi] / band[1])) - else: - hi = center <= freq - freq_resp[hi, b] = 1.0 + Returns + ------- + ndarray (..., n_samples, n_bands) + The input signal filters through all the bands + """ - filters = np.fft.fftshift( - np.fft.irfft(freq_resp, n=self.n_fft, axis=0), - axes=[0], + coeffs, parameters = self.wavelet_analysis( + x, band=band, oversampling=oversampling ) - # remove the first sample to make them odd-length symmetric filters - self.filters = filters[1:, :] + output = self.wavelet_synthesis(coeffs, parameters) + if output.shape[-1] == 1: + return output[..., 0] + else: + return output - # Octave band filters in frequency domain - self.filters_freq_domain = np.fft.fft(filters, axis=0, n=self.n_fft) + def synthesis( + self, band_magnitudes, min_phase=False, filter_length=None, oversampling=2 + ): + """ + Creates a filter with the desired band amplitudes. + + Parameters + ---------- + band_magnitudes: np.ndarray + The band amplitude coefficents (..., n_bands) + min_phase: bool + The filters are made minimum phase if ``True``. + filter_length: int + The length of the filters. + oversampling: int + The oversampling to use in the analysis. + + Returns + ------- + The impulse responses with the correct levels (..., n_fft) + """ + coeffs = np.array(band_magnitudes) + + if filter_length is None: + filter_length = self.n_fft + + # We will reshape the response of a unit impulse filter. + ir = np.zeros(coeffs.shape[:-1] + (filter_length,), dtype=coeffs.dtype) + ir[..., filter_length // 2] = 1.0 + + # Analyze the signal and reshape each band by the target magnitude. + signal, paramters = self.wavelet_analysis(ir, oversampling=oversampling) + for band in range(coeffs.shape[-1]): + signal[band] *= coeffs[..., [band]] + + # Construct the filter. + ir = self.wavelet_synthesis(signal, paramters).sum(axis=-1) + + if min_phase: + # Transform the filter to be minimum phase. + mag_resp = np.abs(np.fft.rfft(ir, axis=-1)) + return magnitude_response_to_minimum_phase( + mag_resp, ir.shape[-1], axis=-1, eps=1e-7 + ) + else: + return ir def critical_bands(): @@ -407,7 +958,7 @@ def bands_hz2s(bands_hz, Fs, N, transform="dft"): else: j += 1 - return np.array(bands_s, dtype=np.int) + return np.array(bands_s, dtype=int) def melscale(f): diff --git a/pyroomacoustics/directivities/measured.py b/pyroomacoustics/directivities/measured.py index d4b1f4fd..643ca3da 100644 --- a/pyroomacoustics/directivities/measured.py +++ b/pyroomacoustics/directivities/measured.py @@ -76,8 +76,10 @@ from scipy.spatial import cKDTree from .. import random +from ..acoustics import OctaveBandsFactory from ..datasets import SOFADatabase from ..doa import Grid, GridSphere, cart2spher, fibonacci_spherical_sampling, spher2cart +from ..parameters import constants from ..utilities import requires_matplotlib from .base import Directivity from .direction import Rotation3D @@ -92,13 +94,10 @@ class MeasuredDirectivitySampler(random.sampler.DirectionalSampler): Parameters ---------- - loc: array_like - The unit vector pointing in the main direction of the cardioid - p: float - Parameter of the cardioid pattern. A value of 0 corresponds to a - figure-eight pattern, 0.5 to a cardioid pattern, and 1 to an omni - pattern - The parameter must be between 0 and 1 + kdtree: array_like + A kd-tree for the measurement points in cartesian coordinates. + energy: array_like + The energy measured at each measurement point. """ def __init__(self, kdtree, energy): @@ -143,6 +142,18 @@ def __init__(self, orientation, grid, impulse_responses, fs): # set the initial orientation self.set_orientation(orientation) + def _compute_octave_bands_energy(self): + self.octave_bands = OctaveBandsFactory( + fs=self.fs, + n_fft=constants.get("octave_bands_n_fft"), + keep_dc=constants.get("octave_bands_keep_dc"), + base_frequency=constants.get("octave_bands_base_freq"), + ) + + ir_per_band = self.octave_bands.analysis(self._irs) + + self.energy = np.square(ir_per_band).sum(axis=0) + @property def is_impulse_response(self): return True @@ -172,9 +183,7 @@ def set_orientation(self, orientation): ) # create the kd-tree self._kdtree = cKDTree(self._grid.cartesian.T) - - # create the ray sampler - ir_energy = np.square(self._irs).mean(axis=-1) + ir_energy = np.square(self._irs).sum(axis=-1) self._ray_sampler = MeasuredDirectivitySampler(self._kdtree, ir_energy) def get_response( @@ -234,13 +243,11 @@ def plot(self, freq_bin=0, n_grid=100, ax=None, depth=False, offset=None): The axes on which the directivity is plotted """ import matplotlib.pyplot as plt - from matplotlib import cm cart = self._grid.cartesian.T length = np.abs(np.fft.rfft(self._irs, axis=-1)[:, freq_bin]) # regrid the data on a 2D grid - g = np.linspace(-1, 1, n_grid) AZ, COL = np.meshgrid( np.linspace(0, 2 * np.pi, n_grid), np.linspace(0, np.pi, n_grid // 2) ) diff --git a/pyroomacoustics/room.py b/pyroomacoustics/room.py index 852c99d4..127dca54 100644 --- a/pyroomacoustics/room.py +++ b/pyroomacoustics/room.py @@ -686,17 +686,14 @@ def callback_mix(premix, snr=0, sir=0, ref_mic=0, n_src=None, n_tgt=None): from __future__ import division, print_function import math -import os -import sys import warnings import numpy as np import scipy.spatial as spatial -from scipy.interpolate import interp1d from . import beamforming as bf from . import libroom -from .acoustics import OctaveBandsFactory, rt60_eyring, rt60_sabine +from .acoustics import AntoniOctaveFilterBank, rt60_eyring, rt60_sabine from .beamforming import MicrophoneArray from .directivities import CardioidFamily, MeasuredDirectivity from .experimental import measure_rt60 @@ -921,11 +918,12 @@ def _var_init( self.max_order = max_order self.sigma2_awgn = sigma2_awgn - self.octave_bands = OctaveBandsFactory( + self.octave_bands = AntoniOctaveFilterBank( fs=self.fs, n_fft=constants.get("octave_bands_n_fft"), - keep_dc=constants.get("octave_bands_keep_dc"), + # keep_dc=constants.get("octave_bands_keep_dc"), base_frequency=constants.get("octave_bands_base_freq"), + slope=0, ) self.max_rand_disp = max_rand_disp @@ -1308,10 +1306,10 @@ def from_corners( materials = [Material(0.0, 0.0)] * n_walls # Resample material properties at octave bands - octave_bands = OctaveBandsFactory( + octave_bands = AntoniOctaveFilterBank( fs=fs, n_fft=constants.get("octave_bands_n_fft"), - keep_dc=constants.get("octave_bands_keep_dc"), + # keep_dc=constants.get("octave_bands_keep_dc"), base_frequency=constants.get("octave_bands_base_freq"), ) if not Material.all_flat(materials): @@ -2302,238 +2300,6 @@ def compute_rir(self): self.simulator_state["rir_done"] = True - def dft_scale_rir_calc( - self, - attenuations, - dist, - time, - bws, - N, - azi_m, - col_m, - azi_s, - col_s, - src_pos=0, - mic_pos=0, - ): - """ - Full DFT scale RIR construction. - - This function also takes into account the FIR's of the source and receiver retrieved from the SOFA file. - - - - Parameters - ---------- - attenuations: arr - Dampings for all the image sources Shape : ( No_of_octave_band x no_img_src) - dist : arr - distance of all the image source present in the room from this particular mic Shape : (no_img_src) - time : arr - Time of arrival of all the image source Shape : (no_img_src) - bws : - bandwidth of all the octave bands - N : - azi_m : arr - Azimuth angle of arrival of this particular mic for all image sources Shape : (no_img_src) - col_m : arr - Colatitude angle of arrival of this particular mic for all image sources Shape : (no_img_src) - azi_s : arr - Azimuth angle of departure of this particular source for all image sources Shape : (no_img_src) - col_s : arr - Colatitude angle of departure of this particular source for all image sources Shape : (no_img_src) - src_pos : int - The particular source we are calculating RIR - mic_pos : int - The particular mic we are calculating RIR - - Returns - ------- - rir : :py:class:`~numpy.ndarray` - Constructed RIR for this particlar src mic pair . - - The constructed RIR still lacks air absorption and distance absorption because in the old pyroom these calculation happens on the octave band level. - - - """ - - attenuations = attenuations / dist - alp = [] - window_length = 81 - - no_imag_src = attenuations.shape[1] - - fp_im = N - fir_length_octave_band = self.octave_bands.n_fft - - from .build_rir import ( - fast_convolution_3, - fast_convolution_4, - fast_window_sinc_interpolator, - ) - - rec_presence = True if (len(azi_m) > 0 and len(col_m) > 0) else False - source_presence = True if (len(azi_s) > 0 and len(col_s) > 0) else False - - final_fir_IS_len = ( - (self.mic_array.directivity[mic_pos].filter_len_ir if (rec_presence) else 1) - + ( - self.sources[src_pos].directivity.filter_len_ir - if (source_presence) - else 1 - ) - + window_length - + fir_length_octave_band - ) - 3 - - if rec_presence and source_presence: - resp_mic = self.mic_array.directivity[mic_pos].get_response( - azimuth=azi_m, colatitude=col_m, degrees=False - ) # Return response as an array of number of (img_sources * length of filters) - resp_src = self.sources[src_pos].directivity.get_response( - azimuth=azi_s, colatitude=col_s, degrees=False - ) - - if self.mic_array.directivity[mic_pos].filter_len_ir == 1: - resp_mic = np.array(resp_mic).reshape(-1, 1) - - else: - assert ( - self.fs == self.mic_array.directivity[mic_pos].fs - ), "Mic directivity: frequency of simulation should be same as frequency of interpolation" - - if self.sources[src_pos].directivity.filter_len_ir == 1: - resp_src = np.array(resp_src).reshape(-1, 1) - else: - assert ( - self.fs == self.sources[src_pos].directivity.fs - ), "Source directivity: frequency of simulation should be same as frequency of interpolation" - - else: - if source_presence: - assert ( - self.fs == self.sources[src_pos].directivity.fs - ), "Directivity source frequency of simulation should be same as frequency of interpolation" - - resp_src = self.sources[src_pos].directivity.get_response( - azimuth=azi_s, - colatitude=col_s, - degrees=False, - ) - - elif rec_presence: - assert ( - self.fs == self.mic_array.directivity[mic_pos].fs - ), "Directivity mic frequency of simulation should be same as frequency of interpolation" - - resp_mic = self.mic_array.directivity[mic_pos].get_response( - azimuth=azi_m, - colatitude=col_m, - degrees=False, - ) - - # else: - # txt = "No" - # final_fir_IS_len = (fir_length_octave_band + window_length) - 1 - - time_arrival_is = time # For min phase - - # Calculating fraction delay sinc filter - sample_frac = time_arrival_is * self.fs # Find the fractional sample number - - ir_diff = np.zeros(N + (final_fir_IS_len)) # 2050 #600 - - # Create arrays for fractional delay low pass filter, sum of {damping coeffiecients * octave band filter}, source response, receiver response. - - cpy_ir_len_1 = np.zeros((no_imag_src, final_fir_IS_len), dtype=np.complex_) - cpy_ir_len_2 = np.zeros((no_imag_src, final_fir_IS_len), dtype=np.complex_) - cpy_ir_len_3 = np.zeros((no_imag_src, final_fir_IS_len), dtype=np.complex_) - cpy_ir_len_4 = np.zeros((no_imag_src, final_fir_IS_len), dtype=np.complex_) - att_in_dft_scale = np.zeros( - (no_imag_src, fir_length_octave_band), dtype=np.complex_ - ) - - # Vectorized sinc filters - - vectorized_interpolated_sinc = np.zeros( - (no_imag_src, window_length), dtype=np.double - ) - vectorized_time_ip = np.array( - [int(math.floor(sample_frac[img_src])) for img_src in range(no_imag_src)] - ) - vectorized_time_fp = [ - sample_frac[img_src] - int(math.floor(sample_frac[img_src])) - for img_src in range(no_imag_src) - ] - vectorized_time_fp = np.array(vectorized_time_fp, dtype=np.double) - vectorized_interpolated_sinc = fast_window_sinc_interpolator( - vectorized_time_fp, window_length, vectorized_interpolated_sinc - ) - - for i in range(no_imag_src): # Loop through Image source - att_in_octave_band = attenuations[:, i] - att_in_dft_scale_ = att_in_dft_scale[i, :] - - # Interpolating attenuations given in the single octave band to a DFT scale. - - att_in_dft_scale_ = self.octave_bands.octave_band_dft_interpolation( - att_in_octave_band, - self.air_absorption, - dist[i], - att_in_dft_scale_, - bws, - self.min_phase, - ) - - # time_ip = int(math.floor(sample_frac[i])) # Calculating the integer sample - - # time_fp = sample_frac[i] - time_ip # Calculating the fractional sample - - # windowed_sinc_filter = fast_window_sinc_interpolater(time_fp) - - cpy_ir_len_1[i, : att_in_dft_scale_.shape[0]] = np.fft.ifft( - att_in_dft_scale_ - ) - cpy_ir_len_2[i, :window_length] = vectorized_interpolated_sinc[i, :] - - if source_presence and rec_presence: - cpy_ir_len_3[i, : resp_src[i, :].shape[0]] = resp_src[i, :] - - cpy_ir_len_4[i, : resp_mic[i, :].shape[0]] = resp_mic[i, :] - - out = fast_convolution_4( - cpy_ir_len_1[i, :], - cpy_ir_len_2[i, :], - cpy_ir_len_3[i, :], - cpy_ir_len_4[i, :], - final_fir_IS_len, - ) - - ir_diff[ - vectorized_time_ip[i] : (vectorized_time_ip[i] + final_fir_IS_len) - ] += np.real(out) - - else: - if source_presence: - resp = resp_src[i, :] - elif rec_presence: - resp = resp_mic[i, :] - - cpy_ir_len_3[i, : resp.shape[0]] = resp - - out = fast_convolution_3( - cpy_ir_len_1[i, :], - cpy_ir_len_2[i, :], - cpy_ir_len_3[i, :], - final_fir_IS_len, - ) - - ir_diff[ - vectorized_time_ip[i] : (vectorized_time_ip[i] + final_fir_IS_len) - ] += np.real(out) - - return ir_diff - def simulate( self, snr=None, diff --git a/pyroomacoustics/simulation/ism.py b/pyroomacoustics/simulation/ism.py index 02479179..11662e86 100644 --- a/pyroomacoustics/simulation/ism.py +++ b/pyroomacoustics/simulation/ism.py @@ -290,9 +290,7 @@ def compute_ism_rir( irs *= oct_band_amplitude.T else: - ir_att = interpolate_octave_bands( - octave_bands, oct_band_amplitude, min_phase=min_phase - ) + ir_att = octave_bands.synthesis(oct_band_amplitude.T, min_phase=min_phase) full_band_imp_resp.append(ir_att) irs = multi_convolve(*full_band_imp_resp) diff --git a/pyroomacoustics/tests/test_octave_bands.py b/pyroomacoustics/tests/test_octave_bands.py new file mode 100644 index 00000000..1fccff99 --- /dev/null +++ b/pyroomacoustics/tests/test_octave_bands.py @@ -0,0 +1,111 @@ +""" +This test verifies properties of octave band filters. +""" + +import numpy as np +import pytest + +import pyroomacoustics as pra + +_FS = 16000 +_NFFT = 4096 +_BASE_FREQ = 125.0 + +octave_band_objects = [ + pra.OctaveBandsFactory( + fs=_FS, + n_fft=_NFFT, + keep_dc=True, + base_frequency=_BASE_FREQ, + ), + pra.AntoniOctaveFilterBank( + fs=_FS, + n_fft=pra.constants.get("octave_bands_n_fft"), + base_frequency=_BASE_FREQ, + ), +] + + +@pytest.mark.parametrize("octave_bands", octave_band_objects) +def test_octave_bands_perfect_reconstruction(octave_bands): + fs = octave_bands.fs + + np.random.seed(0) + x = np.random.randn(fs) + + x_bands = octave_bands.analysis(x) + + x_rec = x_bands.sum(axis=-1) + + error = abs(x - x_rec).max() + print(error) + assert np.allclose(x, x_rec) + + +@pytest.mark.parametrize("octave_bands", octave_band_objects) +def test_octave_bands_single_band(octave_bands): + fs = octave_bands.fs + + np.random.seed(0) + x = np.random.randn(fs) + + x_bands = octave_bands.analysis(x) + + x_bands_single = [ + octave_bands.analysis(x, band=b) for b in range(octave_bands.n_bands) + ] + x_bands_single = np.stack(x_bands_single, axis=-1) + + assert np.allclose(x_bands, x_bands_single) + + +@pytest.mark.parametrize("octave_bands", octave_band_objects) +def test_octave_bands_multi_band(octave_bands): + fs = octave_bands.fs + + np.random.seed(0) + x = np.random.randn(fs) + + x_bands = octave_bands.analysis(x) + + x_bands_multi = octave_bands.analysis(x, band=list(range(octave_bands.n_bands))) + + assert np.allclose(x_bands, x_bands_multi) + + +def test_octave_bands_interpolation(): + octave_bands = pra.AntoniOctaveFilterBank(fs=_FS, n_fft=256) + coeffs = np.arange(octave_bands.n_bands)[::-1] + 1 + + interpolation_filter = octave_bands.synthesis( + coeffs, min_phase=False, filter_length=256 + ) + + # Compare the energy of the original coefficients to those after filtering. + expected_energy = np.sum( + (octave_bands.get_bw() / octave_bands.fs * 2.0) * coeffs**2 + ) + filter_energy = np.sum(octave_bands.energy(interpolation_filter)) + + assert abs(expected_energy - filter_energy) < 0.1 + + +def test_energy_preserving_filtering(): + fs = 16000 + + pra.constants.set("octave_bands_keep_dc", True) + + octave_bands = pra.AntoniOctaveFilterBank( + fs=fs, + n_fft=pra.constants.get("octave_bands_n_fft"), + base_frequency=pra.constants.get("octave_bands_base_freq"), + ) + + # Reads the file containing the Eigenmike's directivity measurements + eigenmike = pra.MeasuredDirectivityFile("EM32_Directivity", fs=fs) + ir = eigenmike.impulse_responses[0, 0] + + ir_energy = np.square(ir).sum() + ir_energy_bands = octave_bands.energy(ir) + + assert abs(ir_energy - ir_energy_bands.sum()) < 1e-3