[med-svn] [python-mne] 235/376: ENH : basic EOG + ECG artifacts detectors

Yaroslav Halchenko debian at onerussian.com
Fri Nov 27 17:22:54 UTC 2015


This is an automated email from the git hooks/post-receive script.

yoh pushed a commit to annotated tag v0.1
in repository python-mne.

commit 8565be6846d8f9a0b0cfd3e03c8702035d286d6f
Author: Alexandre Gramfort <alexandre.gramfort at inria.fr>
Date:   Thu May 5 16:08:05 2011 -0400

    ENH : basic EOG + ECG artifacts detectors
---
 examples/plot_find_ecg_artifacts.py     |  47 +++++++++
 examples/plot_find_eog_artifacts.py     |  45 +++++++++
 mne/__init__.py                         |   1 +
 mne/artifacts/__init__.py               |   2 +
 mne/artifacts/ecg.py                    | 113 ++++++++++++++++++++++
 mne/artifacts/eog.py                    |  66 +++++++++++++
 mne/artifacts/peak_finder.py            | 164 ++++++++++++++++++++++++++++++++
 mne/artifacts/tests/test_peak_finder.py |   8 ++
 8 files changed, 446 insertions(+)

diff --git a/examples/plot_find_ecg_artifacts.py b/examples/plot_find_ecg_artifacts.py
new file mode 100644
index 0000000..bbcc76c
--- /dev/null
+++ b/examples/plot_find_ecg_artifacts.py
@@ -0,0 +1,47 @@
+"""
+==================
+Find ECG artifacts
+==================
+
+Locate QRS component of ECG.
+
+"""
+# Authors: Alexandre Gramfort <gramfort at nmr.mgh.harvard.edu>
+#
+# License: BSD (3-clause)
+
+print __doc__
+
+import numpy as np
+import pylab as pl
+import mne
+from mne import fiff
+from mne.datasets import sample
+data_path = sample.data_path('.')
+
+###############################################################################
+# Set parameters
+raw_fname = data_path + '/MEG/sample/sample_audvis_raw.fif'
+
+# Setup for reading the raw data
+raw = fiff.Raw(raw_fname)
+
+event_id = 999
+ecg_events = mne.artifacts.find_ecg_events(raw, event_id)
+
+# Read epochs
+picks = fiff.pick_types(raw.info, meg=False, eeg=False, stim=False, eog=False,
+                        include=['MEG 1531'])
+tmin, tmax = -0.1, 0.1
+epochs = mne.Epochs(raw, ecg_events, event_id, tmin, tmax, picks=picks,
+                    proj=False)
+data = epochs.get_data()
+
+print "Number of detected EOG artifacts : %d" % len(data)
+
+###############################################################################
+# Plot EOG artifacts
+pl.plot(1e3 * epochs.times, np.squeeze(data).T)
+pl.xlabel('Times (ms)')
+pl.ylabel('ECG')
+pl.show()
\ No newline at end of file
diff --git a/examples/plot_find_eog_artifacts.py b/examples/plot_find_eog_artifacts.py
new file mode 100644
index 0000000..22ef4d1
--- /dev/null
+++ b/examples/plot_find_eog_artifacts.py
@@ -0,0 +1,45 @@
+"""
+==================
+Find EOG artifacts
+==================
+
+Locate peaks of EOG to spot blinks and general EOG artifacts.
+
+"""
+# Authors: Alexandre Gramfort <gramfort at nmr.mgh.harvard.edu>
+#
+# License: BSD (3-clause)
+
+print __doc__
+
+import numpy as np
+import pylab as pl
+import mne
+from mne import fiff
+from mne.datasets import sample
+data_path = sample.data_path('.')
+
+###############################################################################
+# Set parameters
+raw_fname = data_path + '/MEG/sample/sample_audvis_filt-0-40_raw.fif'
+
+# Setup for reading the raw data
+raw = fiff.Raw(raw_fname)
+
+event_id = 998
+eog_events = mne.artifacts.find_eog_events(raw, event_id)
+
+# Read epochs
+picks = fiff.pick_types(raw.info, meg=False, eeg=False, stim=False, eog=True)
+tmin, tmax = -0.2, 0.2
+epochs = mne.Epochs(raw, eog_events, event_id, tmin, tmax, picks=picks)
+data = epochs.get_data()
+
+print "Number of detected EOG artifacts : %d" % len(data)
+
+###############################################################################
+# Plot EOG artifacts
+pl.plot(1e3 * epochs.times, np.squeeze(data).T)
+pl.xlabel('Times (ms)')
+pl.ylabel('EOG (muV)')
+pl.show()
\ No newline at end of file
diff --git a/mne/__init__.py b/mne/__init__.py
index e81addc..64c4a42 100755
--- a/mne/__init__.py
+++ b/mne/__init__.py
@@ -11,3 +11,4 @@ from .epochs import Epochs
 from .label import label_time_courses, read_label
 from .misc import parse_config, read_reject_parameters
 import fiff
+import artifacts
diff --git a/mne/artifacts/__init__.py b/mne/artifacts/__init__.py
new file mode 100644
index 0000000..19a3252
--- /dev/null
+++ b/mne/artifacts/__init__.py
@@ -0,0 +1,2 @@
+from .eog import find_eog_events
+from .ecg import find_ecg_events
diff --git a/mne/artifacts/ecg.py b/mne/artifacts/ecg.py
new file mode 100644
index 0000000..ede3990
--- /dev/null
+++ b/mne/artifacts/ecg.py
@@ -0,0 +1,113 @@
+import numpy as np
+
+from .. import fiff
+from ..filter import band_pass_filter
+
+
+def qrs_detector(sfreq, ecg, thresh_value=0.6, levels=2.5, n_thresh=3):
+    """Detect QRS component in ECG channels.
+
+    QRS is the main wave on the heart beat.
+
+    Parameters
+    ----------
+    sfreq : float
+        Sampling rate
+    ecg : array
+        ECG signal
+    thresh_value: float
+        qrs detection threshold
+    levels: float
+        number of std from mean to include for detection
+    n_thresh: int
+        max number of crossings
+
+    Returns
+    -------
+    events : array
+        Indices of ECG peaks
+    """
+    win_size = round((60.0 * sfreq) / 120.0)
+
+    filtecg = band_pass_filter(ecg, sfreq, 5, 35)
+    n_points = len(filtecg)
+
+    absecg = np.abs(filtecg)
+    init = int(sfreq)
+
+    maxpt = np.empty(3)
+    maxpt[0] = np.max(absecg[:init])
+    maxpt[1] = np.max(absecg[init:init * 2])
+    maxpt[2] = np.max(absecg[init * 2:init * 3])
+
+    init_max = np.mean(maxpt)
+
+    thresh1 = init_max * thresh_value
+
+    numcross = []
+    time = []
+    rms = []
+    i = 0
+    while i < (n_points - win_size):
+        window = absecg[i:i + win_size]
+        if window[0] > thresh1:
+            maxTime = np.argmax(window)
+            time.append(i + maxTime)
+            numcross.append(np.sum(np.diff(window > thresh1) == 1))
+            rms.append(np.sqrt(np.mean(window ** 2)))
+            i += win_size
+        else:
+            i += 1
+
+    time = np.array(time)
+    rms_mean = np.mean(rms)
+    rms_std = np.std(rms)
+    rms_thresh = rms_mean + (rms_std * levels)
+    b = np.where(rms < rms_thresh)[0]
+    a = np.array(numcross)[b]
+    clean_events = time[b[a < n_thresh]]
+
+    return clean_events
+
+
+def find_ecg_events(raw, event_id=999):
+    """Find ECG peaks
+
+    Parameters
+    ----------
+    raw : instance of Raw
+        The raw data
+    event_id : int
+        The index to assign to found events
+
+    Returns
+    -------
+    ecg_events : array
+        Events
+    """
+    info = raw.info
+
+    # Geting ECG Channel
+    ch_ECG = fiff.pick_types(info, meg=False, eeg=False, stim=False,
+                                 eog=False, ecg=True, emg=False)
+
+    if len(ch_ECG) == 0:
+        # closest to the heart normally, In future we can search for it.
+        ch_ECG = fiff.pick_channels(raw.ch_names, include='MEG 1531')
+        print 'Using channel index %d to identify heart beats' % ch_ECG
+    else:
+        print 'ECG channel index for this subject is: %s' % ch_ECG
+
+    assert len(ch_ECG) == 1
+    ecg, times = raw[ch_ECG, :]
+
+    # detecting QRS and generating event file
+    ecg_events = qrs_detector(info['sfreq'], ecg.ravel())
+    n_events = len(ecg_events)
+    average_pulse = 60.0 * (times[-1] - times[0]) / n_events
+    print ("Number of ECG events detected : %d (average pulse %d / min.)"
+                                           % (n_events, average_pulse))
+
+    ecg_events = np.c_[ecg_events + raw.first_samp, np.zeros(n_events),
+                       event_id * np.ones(n_events)]
+    return ecg_events
diff --git a/mne/artifacts/eog.py b/mne/artifacts/eog.py
new file mode 100644
index 0000000..afc0808
--- /dev/null
+++ b/mne/artifacts/eog.py
@@ -0,0 +1,66 @@
+import numpy as np
+
+from .peak_finder import peak_finder
+from .. import fiff
+from ..filter import band_pass_filter
+
+
+def find_eog_events(raw, event_id=998):
+    """Locate EOG artifacts
+
+    Parameters
+    ----------
+
+
+    Returns
+    -------
+    """
+    info = raw.info
+
+    # Geting EOG Channel
+    ch_EOG = fiff.pick_types(info, meg=False, eeg=False, stim=False,
+                                                eog=True, ecg=False, emg=False)
+
+    if len(ch_EOG) == 0:
+        print 'No EOG channels found'
+        print 'Trying with EEG 061 and EEG 062'
+        ch_EOG = fiff.pick_channels(raw.ch_names,
+                                        include=['EEG 061', 'EEG 062'])
+        if len(ch_EOG) != 2:
+            raise ValueError('EEG 61 or EEG 62 channel not found !!')
+
+    print 'EOG channel index for this subject is: %s' % ch_EOG
+
+    sampRate = info['sfreq']
+
+    eog, _ = raw[ch_EOG, :]
+
+    print ('Filtering the data to remove DC offset to help distinguish '
+           'blinks from saccades')
+
+    # filtering to remove dc offset so that we know which is blink and saccades
+    filteog = np.array([band_pass_filter(x, sampRate, 2, 45) for x in eog])
+    temp = np.sqrt(np.sum(filteog ** 2, axis=1))
+
+    indexmax = np.argmax(temp)
+
+    # easy to detect peaks with this filtering.
+    filteog = band_pass_filter(eog[indexmax], sampRate, 1, 10)
+
+    # detecting eog blinks and generating event file
+
+    print 'Now detecting blinks and generating corresponding event file'
+
+    temp = filteog - np.mean(filteog)
+    if np.abs(np.max(temp)) > np.abs(np.min(temp)):
+        eog_events, _ = peak_finder(filteog, extrema=1)
+    else:
+        eog_events, _ = peak_finder(filteog, extrema=-1)
+
+    print 'Saving event file'
+    n_events = len(eog_events)
+    print "Number of EOG events detected : %d" % n_events
+    eog_events = np.c_[eog_events + raw.first_samp, np.zeros(n_events),
+                       event_id * np.ones(n_events)]
+
+    return eog_events
\ No newline at end of file
diff --git a/mne/artifacts/peak_finder.py b/mne/artifacts/peak_finder.py
new file mode 100644
index 0000000..85186c7
--- /dev/null
+++ b/mne/artifacts/peak_finder.py
@@ -0,0 +1,164 @@
+import numpy as np
+from math import ceil
+
+
+def peak_finder(x0, thresh=None, extrema=1):
+    """Noise tolerant fast peak finding algorithm
+
+    Parameters
+    ----------
+    x0: 1d array
+        A real vector from the maxima will be found (required)
+    thresh: float
+        The amount above surrounding data for a peak to be
+        identified (default = (max(x0)-min(x0))/4). Larger values mean
+        the algorithm is more selective in finding peaks.
+    extrema: {-1, 1}
+        1 if maxima are desired, -1 if minima are desired
+        (default = maxima, 1)
+
+    Returns
+    -------
+    peak_loc: array
+        The indicies of the identified peaks in x0
+    peak_mag: array
+        The magnitude of the identified peaks
+
+    Note
+    ----
+    If repeated values are found the first is identified as the peak.
+    Conversion from initial Matlab code from:
+    Nathanael C. Yoder (ncyoder at purdue.edu)
+
+    Exemple
+    -------
+    t = 0:.0001:10;
+    x = 12*sin(10*2*pi*t)-3*sin(.1*2*pi*t)+randn(1,numel(t));
+    x(1250:1255) = max(x);
+    peak_finder(x)
+    """
+
+    x0 = np.asanyarray(x0)
+
+    if x0.ndim >= 2:
+        raise ValueError('The input data must be a 1D vector')
+
+    s = x0.size
+
+    if thresh is None:
+        thresh = (np.max(x0) - np.min(x0)) / 4
+
+    assert extrema in [-1, 1]
+
+    if extrema == -1:
+        x0 = extrema * x0  # Make it so we are finding maxima regardless
+
+    dx0 = np.diff(x0)  # Find derivative
+    # This is so we find the first of repeated values
+    dx0[dx0 == 0] = -np.finfo(float).eps
+    # Find where the derivative changes sign
+    ind = np.where(dx0[:-1:] * dx0[1::] < 0)[0] + 1
+
+    # Include endpoints in potential peaks and valleys
+    x = np.concatenate((x0[:1], x0[ind], x0[-1:]))
+    ind = np.concatenate(([0], ind, [s - 1]))
+
+    #  x only has the peaks, valleys, and endpoints
+    length = x.size
+    min_mag = np.min(x)
+
+    if length > 2:  # Function with peaks and valleys
+
+        # Set initial parameters for loop
+        temp_mag = min_mag
+        found_peak = False
+        left_min = min_mag
+
+        # Deal with first point a little differently since tacked it on
+        # Calculate the sign of the derivative since we taked the first point
+        # on it does not neccessarily alternate like the rest.
+        signDx = np.sign(np.diff(x[:3]))
+        if signDx[0] <= 0:  # The first point is larger or equal to the second
+            ii = -1
+            if signDx[0] == signDx[1]:  # Want alternating signs
+                x = np.concatenate((x[:1], x[2:]))
+                ind = np.concatenate((ind[:1], ind[2:]))
+                length -= 1
+
+        else:  # First point is smaller than the second
+            ii = 0
+            if signDx[0] == signDx[1]:  # Want alternating signs
+                x = x[1:]
+                ind = ind[1:]
+                length -= 1
+
+        # Preallocate max number of maxima
+        maxPeaks = ceil(length / 2.0)
+        peak_loc = np.zeros(maxPeaks, dtype=np.int)
+        peak_mag = np.zeros(maxPeaks)
+        c_ind = 0
+        # Loop through extrema which should be peaks and then valleys
+        while ii < (length - 1):
+            ii += 1  # This is a peak
+            # Reset peak finding if we had a peak and the next peak is bigger
+            # than the last or the left min was small enough to reset.
+            if found_peak and ((x[ii] > peak_mag[-1])
+                              or (left_min < peak_mag[-1] - thresh)):
+                temp_mag = min_mag
+                found_peak = False
+
+            # Make sure we don't iterate past the length of our vector
+            if ii == length - 1:
+                break  # We assign the last point differently out of the loop
+
+            # Found new peak that was lager than temp mag and threshold larger
+            # than the minimum to its left.
+            if (x[ii] > temp_mag) and (x[ii] > left_min + thresh):
+                temp_loc = ii
+                temp_mag = x[ii]
+
+            ii += 1  # Move onto the valley
+            # Come down at least thresh from peak
+            if not found_peak and (temp_mag > (thresh + x[ii])):
+                found_peak = True  # We have found a peak
+                left_min = x[ii]
+                peak_loc[c_ind] = temp_loc  # Add peak to index
+                peak_mag[c_ind] = temp_mag
+                c_ind += 1
+            elif x[ii] < left_min:  # New left minima
+                left_min = x[ii]
+
+        # Check end point
+        if (x[-1] > temp_mag) and (x[-1] > (left_min + thresh)):
+            peak_loc[c_ind] = length - 1
+            peak_mag[c_ind] = x[-1]
+            c_ind += 1
+        elif not found_peak and temp_mag > min_mag:
+            # Check if we still need to add the last point
+            peak_loc[c_ind] = temp_loc
+            peak_mag[c_ind] = temp_mag
+            c_ind += 1
+
+        # Create output
+        peak_inds = ind[peak_loc[:c_ind]]
+        peak_mags = peak_mag[:c_ind]
+    else:  # This is a monotone function where an endpoint is the only peak
+        x_ind = np.argmax(x)
+        peak_mags = x[x_ind]
+        if peak_mags > (min_mag + thresh):
+            peak_inds = ind[x_ind]
+        else:
+            peak_mags = []
+            peak_inds = []
+
+    # Change sign of data if was finding minima
+    if extrema < 0:
+        peak_mags *= -1.0
+        x0 = -x0
+
+    # Plot if no output desired
+    if len(peak_inds) == 0:
+        print 'No significant peaks found'
+
+    return peak_inds, peak_mags
+
diff --git a/mne/artifacts/tests/test_peak_finder.py b/mne/artifacts/tests/test_peak_finder.py
new file mode 100644
index 0000000..522f7fc
--- /dev/null
+++ b/mne/artifacts/tests/test_peak_finder.py
@@ -0,0 +1,8 @@
+import numpy as np
+from ..peak_finder import peak_finder
+
+def test_peak_finder():
+    """Test the peak detection method"""
+    x = [0, 2, 5, 0, 6, -1]
+    peak_inds, peak_mags = peak_finder(x)
+    print peak_inds

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/python-mne.git



More information about the debian-med-commit mailing list