[med-svn] [python-mne] 64/376: adding routines to compute simple time frequency analysis with morlet wavelet

Yaroslav Halchenko debian at onerussian.com
Fri Nov 27 17:22:09 UTC 2015


This is an automated email from the git hooks/post-receive script.

yoh pushed a commit to annotated tag v0.1
in repository python-mne.

commit 55b831b6b741cb1da16d983e7798e31d3e0680b2
Author: Alexandre Gramfort <alexandre.gramfort at inria.fr>
Date:   Thu Feb 3 13:52:40 2011 -0500

    adding routines to compute simple time frequency analysis with morlet wavelet
---
 examples/plot_time_frequency.py |  85 +++++++++++++++++
 mne/__init__.py                 |   1 +
 mne/tfr.py                      | 204 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 290 insertions(+)

diff --git a/examples/plot_time_frequency.py b/examples/plot_time_frequency.py
new file mode 100644
index 0000000..b53eb4a
--- /dev/null
+++ b/examples/plot_time_frequency.py
@@ -0,0 +1,85 @@
+"""
+=========================================================
+Time frequency : Induced power and inter-trial phase-lock
+=========================================================
+
+This script shows how to compute induced power and inter-trial
+phase-lock for a list of epochs read in a raw file given
+a list of events.
+
+"""
+# Authors: Alexandre Gramfort <gramfort at nmr.mgh.harvard.edu>
+#
+# License: BSD (3-clause)
+
+print __doc__
+
+import os
+import numpy as np
+
+import mne
+from mne import fiff
+from mne import time_frequency
+
+###############################################################################
+# Set parameters
+raw_fname = os.environ['MNE_SAMPLE_DATASET_PATH']
+raw_fname += '/MEG/sample/sample_audvis_raw.fif'
+event_fname = os.environ['MNE_SAMPLE_DATASET_PATH']
+event_fname += '/MEG/sample/sample_audvis_raw-eve.fif'
+event_id = 1
+tmin = -0.2
+tmax = 0.5
+
+# Setup for reading the raw data
+raw = fiff.setup_read_raw(raw_fname)
+events = mne.read_events(event_fname)
+
+include = []
+exclude = raw['info']['bads'] + ['MEG 2443', 'EEG 053'] # bads + 2 more
+
+# picks MEG gradiometers
+picks = fiff.pick_types(raw['info'], meg='grad', eeg=False,
+                                stim=False, include=include, exclude=exclude)
+
+picks = [picks[97]]
+data, times, channel_names = mne.read_epochs(raw, events, event_id,
+                                tmin, tmax, picks=picks, baseline=(None, 0))
+epochs = np.array([d['epoch'] for d in data]) # as 3D matrix
+evoked_data = np.mean(epochs, axis=0) # compute evoked fields
+
+frequencies = np.arange(4, 30, 3) # define frequencies of interest
+Fs = raw['info']['sfreq'] # sampling in Hz
+power, phase_lock = time_frequency(epochs, Fs=Fs, frequencies=frequencies,
+                                   n_cycles=2)
+
+###############################################################################
+# View time-frequency plots
+import pylab as pl
+pl.clf()
+pl.subplots_adjust(0.1, 0.08, 0.98, 0.94, 0.2, 0.43)
+pl.subplot(3, 1, 1)
+pl.plot(times, evoked_data.T)
+pl.title('Evoked response (%s)' % raw['info']['ch_names'][picks[0]])
+pl.xlabel('time (s)')
+pl.ylabel('T / m')
+pl.xlim(times[0], times[-1])
+
+pl.subplot(3, 1, 2)
+pl.imshow(20*np.log10(power[0]), extent=[times[0], times[-1],
+                                      frequencies[0], frequencies[-1]],
+          aspect='auto', origin='lower')
+pl.xlabel('Time (s)')
+pl.ylabel('Frequency (Hz)')
+pl.title('Induced power (%s)' % raw['info']['ch_names'][picks[0]])
+pl.colorbar()
+
+pl.subplot(3, 1, 3)
+pl.imshow(phase_lock[0], extent=[times[0], times[-1],
+                              frequencies[0], frequencies[-1]],
+          aspect='auto', origin='lower')
+pl.xlabel('Time (s)')
+pl.ylabel('PLF')
+pl.title('Phase-lock (%s)' % raw['info']['ch_names'][picks[0]])
+pl.colorbar()
+pl.show()
diff --git a/mne/__init__.py b/mne/__init__.py
index 95ae1fb..f6e87f4 100644
--- a/mne/__init__.py
+++ b/mne/__init__.py
@@ -7,4 +7,5 @@ from .stc import read_stc, write_stc
 from .bem_surfaces import read_bem_surfaces
 from .inverse import read_inverse_operator, compute_inverse
 from .epochs import read_epochs
+from .tfr import time_frequency
 import fiff
diff --git a/mne/tfr.py b/mne/tfr.py
new file mode 100644
index 0000000..0fce274
--- /dev/null
+++ b/mne/tfr.py
@@ -0,0 +1,204 @@
+"""A module which implements the continuous wavelet transform
+with complex Morlet wavelets.
+
+Author : Alexandre Gramfort, gramfort at nmr.mgh.harvard.edu (2011)
+License : BSD 3-clause
+
+inspired by Matlab code from Sheraz Khan & Brainstorm & SPM
+"""
+
+from math import sqrt
+import numpy as np
+from scipy import linalg
+from scipy.fftpack import fftn, ifftn
+
+
+def morlet(Fs, freqs, n_cycles=7, sigma=None):
+    """Compute Wavelets for the given frequency range
+
+    Parameters
+    ----------
+    Fs : float
+        Sampling Frequency
+
+    freqs : array
+        frequency range of interest (1 x Frequencies)
+
+    n_cycles : float
+        Number of oscillations if wavelet
+
+    sigma : float, (optional)
+        It controls the width of the wavelet ie its temporal
+        resolution. If sigma is None the temporal resolution
+        is adapted with the frequency like for all wavelet transform.
+        The higher the frequency the shorter is the wavelet.
+        If sigma is fixed the temporal resolution is fixed
+        like for the short time Fourier transform and the number
+        of oscillations increases with the frequency.
+
+    Returns
+    -------
+    Ws : list of array
+        Wavelets time series
+    """
+    Ws = list()
+    for f in freqs:
+        # fixed or scale-dependent window
+        if sigma is None:
+            sigma_t = n_cycles / (2.0 * np.pi * f)
+        else:
+            sigma_t = n_cycles / (2.0 * np.pi * sigma)
+        # this scaling factor is proportional to (Tallon-Baudry 98):
+        # (sigma_t*sqrt(pi))^(-1/2);
+        t = np.arange(0, 5*sigma_t, 1.0 / Fs)
+        t = np.r_[-t[::-1], t[1:]]
+        W = np.exp(2.0 * 1j * np.pi * f *t)
+        W *= np.exp(-t**2 / (2.0 * sigma_t**2))
+        W /= sqrt(0.5) * linalg.norm(W.ravel())
+        Ws.append(W)
+    return Ws
+
+
+def _centered(arr, newsize):
+    # Return the center newsize portion of the array.
+    newsize = np.asarray(newsize)
+    currsize = np.array(arr.shape)
+    startind = (currsize - newsize) / 2
+    endind = startind + newsize
+    myslice = [slice(startind[k], endind[k]) for k in range(len(endind))]
+    return arr[tuple(myslice)]
+
+
+def _cwt_morlet_fft(x, Fs, freqs, mode="same", Ws=None):
+    """Compute cwt with fft based convolutions
+    """
+    x = np.asarray(x)
+    freqs = np.asarray(freqs)
+
+    # Precompute wavelets for given frequency range to save time
+    n_samples = x.size
+    n_freqs = freqs.size
+
+    if Ws is None:
+        Ws = morlet(Fs, freqs)
+
+    Ws_max_size = max(W.size for W in Ws)
+    size = n_samples + Ws_max_size - 1
+    # Always use 2**n-sized FFT
+    fsize = 2**np.ceil(np.log2(size))
+    fft_x = fftn(x, [fsize])
+
+    if mode == "full":
+        tfr = np.zeros((n_freqs, fsize), dtype=np.complex128)
+    elif mode == "same" or mode == "valid":
+        tfr = np.zeros((n_freqs, n_samples), dtype=np.complex128)
+
+    for i, W in enumerate(Ws):
+        ret = ifftn(fft_x * fftn(W, [fsize]))[:n_samples + W.size - 1]
+        if mode == "valid":
+            sz = abs(W.size - n_samples) + 1
+            offset = (n_samples - sz) / 2
+            tfr[i, offset:(offset + sz)] = _centered(ret, sz)
+        else:
+            tfr[i] = _centered(ret, n_samples)
+    return tfr
+
+
+def _cwt_morlet_convolve(x, Fs, freqs, mode='same', Ws=None):
+    """Compute time freq decomposition with temporal convolutions
+    """
+    x = np.asarray(x)
+    freqs = np.asarray(freqs)
+
+    if Ws is None:
+        Ws = morlet(Fs, freqs)
+
+    n_samples = x.size
+    # Compute convolutions
+    tfr = np.zeros((freqs.size, len(x)), dtype=np.complex128)
+    for i, W in enumerate(Ws):
+        ret = np.convolve(x, W, mode=mode)
+        if mode == "valid":
+            sz = abs(W.size - n_samples) + 1
+            offset = (n_samples - sz) / 2
+            tfr[i, offset:(offset + sz)] = ret
+        else:
+            tfr[i] = ret
+    return tfr
+
+
+def cwt_morlet(x, Fs, freqs, use_fft=True, n_cycles=7.0):
+    """Compute time freq decomposition with Morlet wavelets
+
+    Parameters
+    ----------
+    x : array
+        signal
+
+    Fs : float
+        sampling Frequency
+
+    freqs : array
+        Array of frequencies of interest
+
+    Returns
+    -------
+    tfr : 2D array
+        Time Frequency Decomposition (Frequencies x Timepoints)
+    """
+    mode = 'same'
+    # mode = "valid"
+
+    # Precompute wavelets for given frequency range to save time
+    Ws = morlet(Fs, freqs, n_cycles=n_cycles)
+
+    if use_fft:
+        return _cwt_morlet_fft(x, Fs, freqs, mode, Ws)
+    else:
+        return _cwt_morlet_convolve(x, Fs, freqs, mode, Ws)
+
+
+def time_frequency(epochs, Fs, frequencies, use_fft=True, n_cycles=25):
+    """Compute time induced power and inter-trial phase-locking factor
+
+    The time frequency decomposition is done with Morlet wavelets
+
+    Parameters
+    ----------
+    epochs : array
+        3D array of shape [n_epochs, n_channels, n_times]
+
+    Fs : float
+        sampling Frequency
+
+    frequencies : array
+        Array of frequencies of interest
+
+    use_fft : bool
+        Compute transform with fft based convolutions or temporal
+        convolutions.
+
+    n_cycles : int
+        The number of cycles in the wavelet
+
+    Returns
+    -------
+    power : 2D array
+        Induced power (Channels x Frequencies x Timepoints)
+    phase_lock : 2D array
+        Phase locking factor in [0, 1] (Channels x Frequencies x Timepoints)
+    """
+    n_frequencies = len(frequencies)
+    n_epochs, n_channels, n_times = epochs.shape
+    psd = np.zeros((n_channels, n_frequencies, n_times)) # PSD
+    plf = np.zeros((n_channels, n_frequencies, n_times)) # phase lock
+    for c in range(n_channels):
+        for e in range(n_epochs):
+            tfr = cwt_morlet(epochs[e, c, :].ravel(), Fs, frequencies,
+                                      use_fft=use_fft, n_cycles=n_cycles)
+            psd[c,:,:] += np.abs(tfr)
+            plf[c,:,:] += tfr / psd[c,:,:]
+
+    psd /= n_epochs
+    plf = np.abs(plf) / n_epochs
+    return psd, plf

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/python-mne.git



More information about the debian-med-commit mailing list