[med-svn] [python-mne] 08/353: ENH: added filter_length parameter, improved test, cleaned up code

Yaroslav Halchenko debian at onerussian.com
Fri Nov 27 17:24:24 UTC 2015


This is an automated email from the git hooks/post-receive script.

yoh pushed a commit to tag 0.4
in repository python-mne.

commit 67922cd5b1074d161ff8714a0c0645f65956f6a4
Author: Martin Luessi <mluessi at nmr.mgh.harvard.edu>
Date:   Thu Nov 10 15:18:09 2011 -0500

    ENH: added filter_length parameter, improved test, cleaned up code
---
 mne/filter.py            | 109 ++++++++++++++++++++++++++---------------------
 mne/tests/test_filter.py |  33 ++++++++++----
 2 files changed, 86 insertions(+), 56 deletions(-)

diff --git a/mne/filter.py b/mne/filter.py
index d91c5c8..1b50fe0 100644
--- a/mne/filter.py
+++ b/mne/filter.py
@@ -3,11 +3,11 @@ from scipy import signal
 from scipy.fftpack import fft, ifft
 
 
-def _overlap_add_filter(x, h, N_fft=None, zero_phase=True):
+def _overlap_add_filter(x, h, n_fft=None, zero_phase=True):
     """ Filter using overlap-add FFTs.
 
     Filters the signal x using a filter with the impulse response h.
-    If zero_phase==True, the amplitude response is scaled and the filter is 
+    If zero_phase==True, the amplitude response is scaled and the filter is
     applied in forward and backward direction, resulting in a zero-phase
     filter.
 
@@ -17,9 +17,9 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True):
         Signal to filter
     h : 1d array
         Filter impule response (FIR filter coefficients)
-    N_fft : int
+    n_fft : int
         Length of the FFT. If None, the best size is determined automatically.
-    zero_phase : boolean
+    zero_phase : bool
         If True: the filter is applied in forward and backward direction,
         resulting in a zero-phase filter
 
@@ -28,38 +28,38 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True):
     xf : 1d array
         x filtered
     """
-
-    N_h = len(h)
+    n_h = len(h)
 
     # Extend the signal by mirroring the edges to reduce transient filter
     # response
-    N_edge = min(N_h, len(x))
+    n_edge = min(n_h, len(x))
 
-    x_ext = np.r_[2*x[0]-x[N_edge-1:0:-1], x, 2*x[-1]-x[-2:-N_edge-1:-1]]
+    x_ext = np.r_[2 * x[0] - x[n_edge - 1:0:-1],\
+                  x, 2 * x[-1] - x[-2:-n_edge - 1:-1]]
 
-    N_x = len(x_ext)
+    n_x = len(x_ext)
 
     # Determine FFT length to use
-    if N_fft == None:
-        if N_x > N_h:
-            N_tot = 2*N_x if zero_phase else N_x
-
-            N = 2**np.arange(np.ceil(np.log2(N_h)),
-                             np.floor(np.log2(N_tot)))
-            cost = np.ceil(N_tot / (N - N_h + 1)) * N  * (np.log2(N) + 1)
-            N_fft = N[np.argmin(cost)]
+    if n_fft == None:
+        if n_x > n_h:
+            n_tot = 2 * n_x if zero_phase else n_x
+
+            N = 2 ** np.arange(np.ceil(np.log2(n_h)),
+                               np.floor(np.log2(n_tot)))
+            cost = np.ceil(n_tot / (N - n_h + 1)) * N * (np.log2(N) + 1)
+            n_fft = N[np.argmin(cost)]
         else:
             # Use only a single block
-            N_fft = 2**np.ceil(np.log2(N_x + N_h - 1))
+            n_fft = 2 ** np.ceil(np.log2(n_x + n_h - 1))
 
-    if N_fft <= 0:
+    if n_fft <= 0:
         raise ValueError('N_fft is too short, has to be at least len(h)')
 
     # Filter in frequency domain
-    h_fft = fft(np.r_[h, np.zeros(N_fft - N_h)])
+    h_fft = fft(np.r_[h, np.zeros(n_fft - n_h)])
 
     if zero_phase:
-        # We will apply the filter in forward and backward direction: Scale 
+        # We will apply the filter in forward and backward direction: Scale
         # frequency response of the filter so that the shape of the amplitude
         # response stays the same when it is applied twice
 
@@ -70,10 +70,10 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True):
     x_filtered = np.zeros_like(x_ext)
 
     # Segment length for signal x
-    N_seg = N_fft - N_h + 1
+    n_seg = n_fft - n_h + 1
 
     # Number of segements (including fractional segments)
-    num_segments = int(np.ceil(N_x / float(N_seg)))
+    n_segments = int(np.ceil(n_x / float(n_seg)))
 
     filter_input = x_ext
 
@@ -84,20 +84,20 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True):
             filter_input = np.flipud(x_filtered)
             x_filtered = np.zeros_like(x_ext)
 
-        for seg_ind in range(num_segments):
-            seg = filter_input[seg_ind*N_seg:(seg_ind+1)*N_seg]
-            seg_fft = fft(np.r_[seg, np.zeros(N_fft - len(seg))])
+        for seg_idx in range(n_segments):
+            seg = filter_input[seg_idx * n_seg:(seg_idx + 1) * n_seg]
+            seg_fft = fft(np.r_[seg, np.zeros(n_fft - len(seg))])
 
-            if seg_ind*N_seg+N_fft < N_x:
-                x_filtered[seg_ind*N_seg:seg_ind*N_seg+N_fft] \
+            if seg_idx * n_seg + n_fft < n_x:
+                x_filtered[seg_idx * n_seg:seg_idx * n_seg + n_fft]\
                     += np.real(ifft(h_fft * seg_fft))
             else:
                 # Last segment
-                x_filtered[seg_ind*N_seg:] \
-                    += np.real(ifft(h_fft * seg_fft))[:N_x-seg_ind*N_seg]
+                x_filtered[seg_idx * n_seg:] \
+                    += np.real(ifft(h_fft * seg_fft))[:n_x - seg_idx * n_seg]
 
     # Remove mirrored edges that we added
-    x_filtered = x_filtered[N_edge-1:-N_edge+1]
+    x_filtered = x_filtered[n_edge - 1:-n_edge + 1]
 
     if zero_phase:
         # flip signal back
@@ -106,16 +106,12 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True):
     return x_filtered
 
 
-def _filter(x, Fs, freq, gain):
+def _filter(x, Fs, freq, gain, filter_length=None):
     """ Filter signal using gain control points in the frequency domain.
 
     The filter impulse response is constructed from a Hamming window (window
     used in "firwin2" function) to avoid ripples in the frequency reponse
-    (windowing is a smoothing in frequency domain).
-
-    If the signal is shorter than 1 minute, it is filtered directly using
-    FFTs (zero-phase). If the signal is longer, zero-phase overlap-add
-    filtering is used.
+    (windowing is a smoothing in frequency domain). The filter is zero-phase.
 
     Parameters
     ----------
@@ -127,6 +123,10 @@ def _filter(x, Fs, freq, gain):
         Frequency sampling points in Hz
     gain : 1d array
         Filter gain at frequency sampling points
+    filter_length : int (default: None)
+        Length of the filter to use. If None or "len(x) < filter_length", the
+        filter length used is len(x). Otherwise, overlap-add filtering with a
+        filter of the specified length is used (faster for long signals).
 
     Returns
     -------
@@ -138,7 +138,7 @@ def _filter(x, Fs, freq, gain):
     # normalize frequencies
     freq = [f / (Fs / 2) for f in freq]
 
-    if len(x) < 60*Fs:
+    if filter_length == None or len(x) <= filter_length:
         # Use direct FFT filtering for short signals
 
         Norig = len(x)
@@ -159,8 +159,8 @@ def _filter(x, Fs, freq, gain):
         xf = xf[:Norig]
         x = x[:Norig]
     else:
-        # Use overlap-add filter
-        N = int(10 * Fs)
+        # Use overlap-add filter with a fixed length
+        N = filter_length
 
         if (gain[-1] == 0.0 and N % 2 == 1) \
             or (gain[-1] == 1.0 and N % 2 != 1):
@@ -173,7 +173,7 @@ def _filter(x, Fs, freq, gain):
     return xf
 
 
-def band_pass_filter(x, Fs, Fp1, Fp2):
+def band_pass_filter(x, Fs, Fp1, Fp2, filter_length=None):
     """Bandpass filter for the signal x.
 
     Applies a zero-phase bandpass filter to the signal x.
@@ -188,6 +188,10 @@ def band_pass_filter(x, Fs, Fp1, Fp2):
         low cut-off frequency
     Fp2 : float
         high cut-off frequency
+    filter_length : int (default: None)
+        Length of the filter to use. If None or "len(x) < filter_length", the
+        filter length used is len(x). Otherwise, overlap-add filtering with a
+        filter of the specified length is used (faster for long signals).
 
     Returns
     -------
@@ -210,7 +214,7 @@ def band_pass_filter(x, Fs, Fp1, Fp2):
     Fs1 = Fp1 - 0.5 in Hz
     Fs2 = Fp2 + 0.5 in Hz
     """
-    Fs  = float(Fs)
+    Fs = float(Fs)
     Fp1 = float(Fp1)
     Fp2 = float(Fp2)
 
@@ -218,12 +222,13 @@ def band_pass_filter(x, Fs, Fp1, Fp2):
     Fs1 = Fp1 - 0.5
     Fs2 = Fp2 + 0.5
 
-    xf = _filter(x, Fs, [0, Fs1, Fp1, Fp2, Fs2, Fs/2], [0, 0, 1, 1, 0, 0])
+    xf = _filter(x, Fs, [0, Fs1, Fp1, Fp2, Fs2, Fs / 2], [0, 0, 1, 1, 0, 0],
+                 filter_length)
 
     return xf
 
 
-def low_pass_filter(x, Fs, Fp):
+def low_pass_filter(x, Fs, Fp, filter_length=None):
     """Lowpass filter for the signal x.
 
     Applies a zero-phase lowpass filter to the signal x.
@@ -236,6 +241,10 @@ def low_pass_filter(x, Fs, Fp):
         sampling rate
     Fp : float
         cut-off frequency
+    filter_length : int (default: None)
+        Length of the filter to use. If None or "len(x) < filter_length", the
+        filter length used is len(x). Otherwise, overlap-add filtering with a
+        filter of the specified length is used (faster for long signals).
 
     Returns
     -------
@@ -260,12 +269,12 @@ def low_pass_filter(x, Fs, Fp):
 
     Fstop = Fp + 0.5
 
-    xf = _filter(x, Fs, [0, Fp, Fstop, Fs/2], [1, 1, 0, 0])
+    xf = _filter(x, Fs, [0, Fp, Fstop, Fs / 2], [1, 1, 0, 0], filter_length)
 
     return xf
 
 
-def high_pass_filter(x, Fs, Fp):
+def high_pass_filter(x, Fs, Fp, filter_length=None):
     """Highpass filter for the signal x.
 
     Applies a zero-phase highpass filter to the signal x.
@@ -278,6 +287,10 @@ def high_pass_filter(x, Fs, Fp):
         sampling rate
     Fp : float
         cut-off frequency
+    filter_length : int (default: None)
+        Length of the filter to use. If None or "len(x) < filter_length", the
+        filter length used is len(x). Otherwise, overlap-add filtering with a
+        filter of the specified length is used (faster for long signals).
 
     Returns
     -------
@@ -303,6 +316,6 @@ def high_pass_filter(x, Fs, Fp):
 
     Fstop = Fp - 0.5
 
-    xf = _filter(x, Fs, [0, Fstop, Fp, Fs/2], [0, 0, 1, 1])
+    xf = _filter(x, Fs, [0, Fstop, Fp, Fs / 2], [0, 0, 1, 1], filter_length)
 
-    return xf
+    return xf
\ No newline at end of file
diff --git a/mne/tests/test_filter.py b/mne/tests/test_filter.py
index c071bba..83d5bab 100644
--- a/mne/tests/test_filter.py
+++ b/mne/tests/test_filter.py
@@ -3,12 +3,29 @@ from numpy.testing import assert_array_almost_equal
 
 from ..filter import band_pass_filter, high_pass_filter, low_pass_filter
 
+
 def test_filters():
-    Fs = 250
-    # Test short and long signals (direct FFT and overlap-add FFT filtering)
-    for sig_len_secs in [10, 90]:
-        a = np.random.randn(sig_len_secs * Fs)
-        bp = band_pass_filter(a, Fs, 4, 8)
-        lp = low_pass_filter(a, Fs, 8)
-        hp = high_pass_filter(lp, Fs, 4)
-        assert_array_almost_equal(hp, bp, 2)
+    Fs = 500
+    sig_len_secs = 60
+
+    # Filtering of short signals (filter length = len(a))
+    a = np.random.randn(sig_len_secs * Fs)
+    bp = band_pass_filter(a, Fs, 4, 8)
+    lp = low_pass_filter(a, Fs, 8)
+    hp = high_pass_filter(lp, Fs, 4)
+    assert_array_almost_equal(hp, bp, 2)
+
+    # Overlap-add filtering with a fixed filter length
+    filter_length = 8192
+    bp_oa = band_pass_filter(a, Fs, 4, 8, filter_length)
+    lp_oa = low_pass_filter(a, Fs, 8, filter_length)
+    hp_oa = high_pass_filter(lp_oa, Fs, 4, filter_length)
+    assert_array_almost_equal(hp_oa, bp_oa, 2)
+
+    # The two methods should give the same result
+    # As filtering for short signals uses a circular convolution (FFT) and
+    # the overlap-add filter implements a linear convolution, the signal
+    # boundary will be slightly different and we ignore it
+    n_edge_ignore = 1000
+    assert_array_almost_equal(hp[n_edge_ignore:-n_edge_ignore],
+                              hp_oa[n_edge_ignore:-n_edge_ignore], 2)
\ No newline at end of file

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/python-mne.git



More information about the debian-med-commit mailing list