[med-svn] [python-mne] 05/353: ENH: overlap-add filtering for long signals

Yaroslav Halchenko debian at onerussian.com
Fri Nov 27 17:24:23 UTC 2015


This is an automated email from the git hooks/post-receive script.

yoh pushed a commit to tag 0.4
in repository python-mne.

commit 4deab5f59f95ba0b0ed9984bc04ff62511004e61
Author: Martin Luessi <mluessi at nmr.mgh.harvard.edu>
Date:   Wed Oct 26 10:33:59 2011 -0400

    ENH: overlap-add filtering for long signals
---
 mne/filter.py            | 69 +++++++++++++++++++++++++++---------------------
 mne/tests/test_filter.py | 14 +++++-----
 2 files changed, 47 insertions(+), 36 deletions(-)

diff --git a/mne/filter.py b/mne/filter.py
index c37e18d..d91c5c8 100644
--- a/mne/filter.py
+++ b/mne/filter.py
@@ -9,7 +9,7 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True):
     Filters the signal x using a filter with the impulse response h.
     If zero_phase==True, the amplitude response is scaled and the filter is 
     applied in forward and backward direction, resulting in a zero-phase
-    filter. 
+    filter.
 
     Parameters
     ----------
@@ -30,13 +30,13 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True):
     """
 
     N_h = len(h)
- 
+
     # Extend the signal by mirroring the edges to reduce transient filter
     # response
     N_edge = min(N_h, len(x))
 
     x_ext = np.r_[2*x[0]-x[N_edge-1:0:-1], x, 2*x[-1]-x[-2:-N_edge-1:-1]]
-    
+
     N_x = len(x_ext)
 
     # Determine FFT length to use
@@ -44,9 +44,9 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True):
         if N_x > N_h:
             N_tot = 2*N_x if zero_phase else N_x
 
-            N = 2**np.arange(np.ceil(np.log2(N_h)), 
+            N = 2**np.arange(np.ceil(np.log2(N_h)),
                              np.floor(np.log2(N_tot)))
-            cost = np.ceil(N_tot / (N - N_h + 1)) * N  * (np.log2(N) + 1)                
+            cost = np.ceil(N_tot / (N - N_h + 1)) * N  * (np.log2(N) + 1)
             N_fft = N[np.argmin(cost)]
         else:
             # Use only a single block
@@ -55,14 +55,14 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True):
     if N_fft <= 0:
         raise ValueError('N_fft is too short, has to be at least len(h)')
 
-    # Filter in frequency domain    
+    # Filter in frequency domain
     h_fft = fft(np.r_[h, np.zeros(N_fft - N_h)])
 
     if zero_phase:
         # We will apply the filter in forward and backward direction: Scale 
         # frequency response of the filter so that the shape of the amplitude
         # response stays the same when it is applied twice
-        
+
         # be careful not to divide by too small numbers
         idx = np.where(np.abs(h_fft) > 1e-6)
         h_fft[idx] = h_fft[idx] / np.sqrt(np.abs(h_fft[idx]))
@@ -73,7 +73,7 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True):
     N_seg = N_fft - N_h + 1
 
     # Number of segements (including fractional segments)
-    num_segments = np.ceil(N_x / float(N_seg))
+    num_segments = int(np.ceil(N_x / float(N_seg)))
 
     filter_input = x_ext
 
@@ -87,10 +87,10 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True):
         for seg_ind in range(num_segments):
             seg = filter_input[seg_ind*N_seg:(seg_ind+1)*N_seg]
             seg_fft = fft(np.r_[seg, np.zeros(N_fft - len(seg))])
-    
-            if seg_ind*N_seg+N_fft < N_x:                               
+
+            if seg_ind*N_seg+N_fft < N_x:
                 x_filtered[seg_ind*N_seg:seg_ind*N_seg+N_fft] \
-                    += np.real(ifft(h_fft * seg_fft))            
+                    += np.real(ifft(h_fft * seg_fft))
             else:
                 # Last segment
                 x_filtered[seg_ind*N_seg:] \
@@ -103,18 +103,18 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True):
         # flip signal back
         x_filtered = np.flipud(x_filtered)
 
-    return x_filtered           
+    return x_filtered
+
 
-    
 def _filter(x, Fs, freq, gain):
     """ Filter signal using gain control points in the frequency domain.
 
-    The filter impulse response is constructed from a Hamming window (window 
-    used in "firwin2" function) to avoid ripples in the frequency reponse 
-    (windowing is a smoothing in frequency domain). 
+    The filter impulse response is constructed from a Hamming window (window
+    used in "firwin2" function) to avoid ripples in the frequency reponse
+    (windowing is a smoothing in frequency domain).
 
-    If the signal is shorter than 10 seconds, it is filtered directly using
-    FFTs (zero-phase). If the signal is longer, zero-phase overlap-add 
+    If the signal is shorter than 1 minute, it is filtered directly using
+    FFTs (zero-phase). If the signal is longer, zero-phase overlap-add
     filtering is used.
 
     Parameters
@@ -138,13 +138,16 @@ def _filter(x, Fs, freq, gain):
     # normalize frequencies
     freq = [f / (Fs / 2) for f in freq]
 
-    if len(x) < 10*Fs:
+    if len(x) < 60*Fs:
         # Use direct FFT filtering for short signals
 
-        # Make x EVEN
         Norig = len(x)
-        if Norig % 2 == 1:
-            x = np.r_[x, 1]
+
+        if (gain[-1] == 0.0 and Norig % 2 == 1) \
+            or (gain[-1] == 1.0 and Norig % 2 != 1):
+            # Gain at Nyquist freq: 1: make x EVEN, 0: make x ODD
+            x = np.r_[x, x[-1]]
+
         N = len(x)
 
         H = signal.firwin2(N, freq, gain)
@@ -158,6 +161,12 @@ def _filter(x, Fs, freq, gain):
     else:
         # Use overlap-add filter
         N = int(10 * Fs)
+
+        if (gain[-1] == 0.0 and N % 2 == 1) \
+            or (gain[-1] == 1.0 and N % 2 != 1):
+            # Gain at Nyquist freq: 1: make N EVEN, 0: make N ODD
+            N += 1
+
         H = signal.firwin2(N, freq, gain)
         xf = _overlap_add_filter(x, H, zero_phase=True)
 
@@ -167,8 +176,8 @@ def _filter(x, Fs, freq, gain):
 def band_pass_filter(x, Fs, Fp1, Fp2):
     """Bandpass filter for the signal x.
 
-    Applies a zero-phase bandpass filter to the signal x.    
-    
+    Applies a zero-phase bandpass filter to the signal x.
+
     Parameters
     ----------
     x : 1d array
@@ -217,7 +226,7 @@ def band_pass_filter(x, Fs, Fp1, Fp2):
 def low_pass_filter(x, Fs, Fp):
     """Lowpass filter for the signal x.
 
-    Applies a zero-phase lowpass filter to the signal x.    
+    Applies a zero-phase lowpass filter to the signal x.
 
     Parameters
     ----------
@@ -248,11 +257,11 @@ def low_pass_filter(x, Fs, Fp):
     """
     Fs = float(Fs)
     Fp = float(Fp)
-    
+
     Fstop = Fp + 0.5
-    
+
     xf = _filter(x, Fs, [0, Fp, Fstop, Fs/2], [1, 1, 0, 0])
-    
+
     return xf
 
 
@@ -288,10 +297,10 @@ def high_pass_filter(x, Fs, Fp):
           Fp-0.5  Fp
 
     """
-    
+
     Fs = float(Fs)
     Fp = float(Fp)
-    
+
     Fstop = Fp - 0.5
 
     xf = _filter(x, Fs, [0, Fstop, Fp, Fs/2], [0, 0, 1, 1])
diff --git a/mne/tests/test_filter.py b/mne/tests/test_filter.py
index 98f5ff1..c071bba 100644
--- a/mne/tests/test_filter.py
+++ b/mne/tests/test_filter.py
@@ -4,9 +4,11 @@ from numpy.testing import assert_array_almost_equal
 from ..filter import band_pass_filter, high_pass_filter, low_pass_filter
 
 def test_filters():
-    a = np.random.randn(1000)
-    Fs = 1000
-    bp = band_pass_filter(a, Fs, 4, 8)
-    lp = low_pass_filter(a, Fs, 8)
-    hp = high_pass_filter(lp, Fs, 4)
-    assert_array_almost_equal(hp, bp, 2)
+    Fs = 250
+    # Test short and long signals (direct FFT and overlap-add FFT filtering)
+    for sig_len_secs in [10, 90]:
+        a = np.random.randn(sig_len_secs * Fs)
+        bp = band_pass_filter(a, Fs, 4, 8)
+        lp = low_pass_filter(a, Fs, 8)
+        hp = high_pass_filter(lp, Fs, 4)
+        assert_array_almost_equal(hp, bp, 2)

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/python-mne.git



More information about the debian-med-commit mailing list