首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >改进的scipy.signal.welch方法在平均前剔除某些光谱

改进的scipy.signal.welch方法在平均前剔除某些光谱
EN

Stack Overflow用户
提问于 2016-04-08 01:35:31
回答 1查看 4.6K关注 0票数 1

我试图将背景地震噪声的频谱特征从一个时间序列中分离出来,该序列包含来自几个瞬变事件(例如地震)的背景噪声和信号。

为此,我使用了scipy.signal.welch方法,它实际上将我的时间序列分割成较小的段,计算了每个段的傅里叶变换,然后对结果进行平均值,并返回"welch谱“。

我想要做的是修改scipy.signal.welch,使被瞬态信号污染的片段的光谱从平均过程中被拒绝,使用一些允许识别受污染光谱的准则(例如,谱的均方根振幅阈值,甚至是时间序列段)。

我有scipy.signal.welch方法的相关源代码,并将相关代码分离为"spectral_helper“和"_fft_helper”函数,但我无法确定光谱平均值在哪里发生,或者哪里是实现光谱拒绝操作的最佳位置。

我不太熟练,不足以完全理解和修改它,为我的目的,有人能帮助我找到最好的方法来做这件事吗?

下面是验证和格式化要计算的光谱类型的函数,并调用执行fft的函数:

代码语言:javascript
复制
def _spectral_helper(x, y, fs=1.0, window='hann', nperseg=256,
                noverlap=None, nfft=None, detrend='constant',
                return_onesided=True, scaling='spectrum', axis=-1,
                mode='psd'):
"""
Calculate various forms of windowed FFTs for PSD, CSD, etc.
This is a helper function that implements the commonality between the
psd, csd, and spectrogram functions. It is not designed to be called
externally. The windows are not averaged over; the result from each window
is returned.
Parameters
---------
x : array_like
    Array or sequence containing the data to be analyzed.
y : array_like
    Array or sequence containing the data to be analyzed. If this is
    the same object in memoery as x (i.e. _spectral_helper(x, x, ...)),
    the extra computations are spared.
fs : float, optional
    Sampling frequency of the time series. Defaults to 1.0.
window : str or tuple or array_like, optional
    Desired window to use. See `get_window` for a list of windows and
    required parameters. If `window` is array_like it will be used
    directly as the window and its length will be used for nperseg.
    Defaults to 'hann'.
nperseg : int, optional
    Length of each segment.  Defaults to 256.
noverlap : int, optional
    Number of points to overlap between segments. If None,
    ``noverlap = nperseg // 2``.  Defaults to None.
nfft : int, optional
    Length of the FFT used, if a zero padded FFT is desired.  If None,
    the FFT length is `nperseg`. Defaults to None.
detrend : str or function or False, optional
    Specifies how to detrend each segment. If `detrend` is a string,
    it is passed as the ``type`` argument to `detrend`.  If it is a
    function, it takes a segment and returns a detrended segment.
    If `detrend` is False, no detrending is done.  Defaults to 'constant'.
return_onesided : bool, optional
    If True, return a one-sided spectrum for real data. If False return
    a two-sided spectrum. Note that for complex data, a two-sided
    spectrum is always returned.
scaling : { 'density', 'spectrum' }, optional
    Selects between computing the cross spectral density ('density')
    where `Pxy` has units of V**2/Hz and computing the cross spectrum
    ('spectrum') where `Pxy` has units of V**2, if `x` and `y` are
    measured in V and fs is measured in Hz.  Defaults to 'density'
axis : int, optional
    Axis along which the periodogram is computed; the default is over
    the last axis (i.e. ``axis=-1``).
mode : str, optional
    Defines what kind of return values are expected. Options are ['psd',
    'complex', 'magnitude', 'angle', 'phase'].
Returns
-------
freqs : ndarray
    Array of sample frequencies.
t : ndarray
    Array of times corresponding to each data segment
result : ndarray
    Array of output data, contents dependant on *mode* kwarg.
References
----------
.. [1] Stack Overflow, "Rolling window for 1D arrays in Numpy?",
    http://stackoverflow.com/a/6811241
.. [2] Stack Overflow, "Using strides for an efficient moving average
    filter", http://stackoverflow.com/a/4947453
Notes
-----
Adapted from matplotlib.mlab
.. versionadded:: 0.16.0
"""
if mode not in ['psd', 'complex', 'magnitude', 'angle', 'phase']:
    raise ValueError("Unknown value for mode %s, must be one of: "
                     "'default', 'psd', 'complex', "
                     "'magnitude', 'angle', 'phase'" % mode)

# If x and y are the same object we can save ourselves some computation.
same_data = y is x

if not same_data and mode != 'psd':
    raise ValueError("x and y must be equal if mode is not 'psd'")

axis = int(axis)

# Ensure we have np.arrays, get outdtype
x = np.asarray(x)
if not same_data:
    y = np.asarray(y)
    outdtype = np.result_type(x,y,np.complex64)
else:
    outdtype = np.result_type(x,np.complex64)

if not same_data:
    # Check if we can broadcast the outer axes together
    xouter = list(x.shape)
    youter = list(y.shape)
    xouter.pop(axis)
    youter.pop(axis)
    try:
        outershape = np.broadcast(np.empty(xouter), np.empty(youter)).shape
    except ValueError:
        raise ValueError('x and y cannot be broadcast together.')

if same_data:
    if x.size == 0:
        return np.empty(x.shape), np.empty(x.shape), np.empty(x.shape)
else:
    if x.size == 0 or y.size == 0:
        outshape = outershape + (min([x.shape[axis], y.shape[axis]]),)
        emptyout = np.rollaxis(np.empty(outshape), -1, axis)
        return emptyout, emptyout, emptyout

if x.ndim > 1:
    if axis != -1:
        x = np.rollaxis(x, axis, len(x.shape))
        if not same_data and y.ndim > 1:
            y = np.rollaxis(y, axis, len(y.shape))

# Check if x and y are the same length, zero-pad if neccesary
if not same_data:
    if x.shape[-1] != y.shape[-1]:
        if x.shape[-1] < y.shape[-1]:
            pad_shape = list(x.shape)
            pad_shape[-1] = y.shape[-1] - x.shape[-1]
            x = np.concatenate((x, np.zeros(pad_shape)), -1)
        else:
            pad_shape = list(y.shape)
            pad_shape[-1] = x.shape[-1] - y.shape[-1]
            y = np.concatenate((y, np.zeros(pad_shape)), -1)

# X and Y are same length now, can test nperseg with either
if x.shape[-1] < nperseg:
    warnings.warn('nperseg = {0:d}, is greater than input length = {1:d}, '
                  'using nperseg = {1:d}'.format(nperseg, x.shape[-1]))
    nperseg = x.shape[-1]

nperseg = int(nperseg)
if nperseg < 1:
    raise ValueError('nperseg must be a positive integer')

if nfft is None:
    nfft = nperseg
elif nfft < nperseg:
    raise ValueError('nfft must be greater than or equal to nperseg.')
else:
    nfft = int(nfft)

if noverlap is None:
    noverlap = nperseg//2
elif noverlap >= nperseg:
    raise ValueError('noverlap must be less than nperseg.')
else:
    noverlap = int(noverlap)

# Handle detrending and window functions
if not detrend:
    def detrend_func(d):
        return d
elif not hasattr(detrend, '__call__'):
    def detrend_func(d):
        return signaltools.detrend(d, type=detrend, axis=-1)
elif axis != -1:
    # Wrap this function so that it receives a shape that it could
    # reasonably expect to receive.
    def detrend_func(d):
        d = np.rollaxis(d, -1, axis)
        d = detrend(d)
        return np.rollaxis(d, axis, len(d.shape))
else:
    detrend_func = detrend

if isinstance(window, string_types) or type(window) is tuple:
    win = get_window(window, nperseg)
else:
    win = np.asarray(window)
    if len(win.shape) != 1:
        raise ValueError('window must be 1-D')
    if win.shape[0] != nperseg:
        raise ValueError('window must have length of nperseg')

if np.result_type(win,np.complex64) != outdtype:
    win = win.astype(outdtype)

if mode == 'psd':
    if scaling == 'density':
        scale = 1.0 / (fs * (win*win).sum())
    elif scaling == 'spectrum':
        scale = 1.0 / win.sum()**2
    else:
        raise ValueError('Unknown scaling: %r' % scaling)
else:
    scale = 1

if return_onesided is True:
    if np.iscomplexobj(x):
        sides = 'twosided'
    else:
        sides = 'onesided'
        if not same_data:
            if np.iscomplexobj(y):
                sides = 'twosided'
else:
    sides = 'twosided'

if sides == 'twosided':
    num_freqs = nfft
elif sides == 'onesided':
    if nfft % 2:
        num_freqs = (nfft + 1)//2
    else:
        num_freqs = nfft//2 + 1

# Perform the windowed FFTs
result = _fft_helper(x, win, detrend_func, nperseg, noverlap, nfft)
result = result[..., :num_freqs]
freqs = fftpack.fftfreq(nfft, 1/fs)[:num_freqs]

if not same_data:
    # All the same operations on the y data
    result_y = _fft_helper(y, win, detrend_func, nperseg, noverlap, nfft)
    result_y = result_y[..., :num_freqs]
    result = np.conjugate(result) * result_y
elif mode == 'psd':
    result = np.conjugate(result) * result
elif mode == 'magnitude':
    result = np.absolute(result)
elif mode == 'angle' or mode == 'phase':
    result = np.angle(result)
elif mode == 'complex':
    pass

result *= scale
if sides == 'onesided':
    if nfft % 2:
        result[...,1:] *= 2
    else:
        # Last point is unpaired Nyquist freq point, don't double
        result[...,1:-1] *= 2

t = np.arange(nperseg/2, x.shape[-1] - nperseg/2 + 1, nperseg - noverlap)/float(fs)

if sides != 'twosided' and not nfft % 2:
    # get the last value correctly, it is negative otherwise
    freqs[-1] *= -1

# we unwrap the phase here to handle the onesided vs. twosided case
if mode == 'phase':
    result = np.unwrap(result, axis=-1)

result = result.astype(outdtype)

# All imaginary parts are zero anyways
if same_data and mode != 'complex':
    result = result.real

# Output is going to have new last axis for window index
if axis != -1:
    # Specify as positive axis index
    if axis < 0:
        axis = len(result.shape)-1-axis

    # Roll frequency axis back to axis where the data came from
    result = np.rollaxis(result, -1, axis)
else:
    # Make sure window/time index is last axis
    result = np.rollaxis(result, -1, -2)

return freqs, t, result

下面是执行实际fft的函数:

代码语言:javascript
复制
def _fft_helper(x, win, detrend_func, nperseg, noverlap, nfft):
"""
Calculate windowed FFT, for internal use by scipy.signal._spectral_helper
This is a helper function that does the main FFT calculation for
_spectral helper. All input valdiation is performed there, and the data
axis is assumed to be the last axis of x. It is not designed to be called
externally. The windows are not averaged over; the result from each window
is returned.
Returns
-------
result : ndarray
    Array of FFT data
References
----------
.. [1] Stack Overflow, "Repeat NumPy array without replicating data?",
    http://stackoverflow.com/a/5568169
Notes
-----
Adapted from matplotlib.mlab
.. versionadded:: 0.16.0
"""
# Created strided array of data segments
if nperseg == 1 and noverlap == 0:
    result = x[..., np.newaxis]
else:
    step = nperseg - noverlap
    shape = x.shape[:-1]+((x.shape[-1]-noverlap)//step, nperseg)
    strides = x.strides[:-1]+(step*x.strides[-1], x.strides[-1])
    result = np.lib.stride_tricks.as_strided(x, shape=shape,
                                             strides=strides)

# Detrend each data segment individually
result = detrend_func(result)

# Apply window by multiplication
result = win * result

# Perform the fft. Acts on last axis by default. Zero-pads automatically
result = fftpack.fft(result, n=nfft)
np.savetxt('result.csv', np.absolute(result), delimiter=',')


return result
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-04-08 02:11:08

我有scipy.signal.welch方法的相关来源,我已经将相关的代码分离到_spectral_helper_fft_helper函数,但是我无法确定谱的平均发生在哪里,或者哪里是实现光谱拒绝操作的最佳位置。

_spectral_helper_fft_helper中无法找到平均值的原因可能是因为这不是在哪里完成的,正如两个函数中包含的函数文档所指出的那样:

不对窗口进行平均处理;返回来自每个窗口的结果。

相反,对函数_spectral_helper中的csd (从welch调用)中的结果执行平均值,使用:

代码语言:javascript
复制
# Average over windows.
if len(Pxy.shape) >= 2 and Pxy.size > 0:
    if Pxy.shape[-1] > 1:
        Pxy = Pxy.mean(axis=-1)
    else:
        Pxy = np.reshape(Pxy, Pxy.shape[:-1])

请注意,与修改welch不同,您可以使用spectrogram,它不进行平均,因此在进行平均值之前,您可以选择拒绝光谱。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/36490060

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档