bioacoustic tools#

# file: bioacoustic_tools.py
#
# bioacoustic tools
#
import numpy as np
import matplotlib.pyplot as plt

import scipy.signal as signal
#

Time series FIR filter#

Use oberlap and add method to filter a long time series.

Hilbert filter#

Use FIR to generate an ‘analytical’ of complex valued time series by means of Hilbert transform

Matched filter#

Use FIR filter to match filter time series

# Overlap-add FIR filter, (c) Joachim Thiemann 2016
# https://github.com/jthiem/overlapadd/blob/master/olafilt.py
#
# modified by WMXZ to work on multi-channel data
#
def fft_filt(b, x, zi=None, nh=0):
    """
    Filter a one-dimensional array with an FIR filter

    Filter a data sequence, `x`, using a FIR filter given in `b`.
    Filtering uses the overlap-add method converting both `x` and `b`
    into frequency domain first.  The FFT size is determined as the
    next higher power of 2 of twice the length of `b`.

    Parameters
    ----------
    b : one-dimensional numpy array
        The impulse response of the filter
    x : one-dimensional numpy array
        Signal to be filtered
    zi : one-dimensional numpy array, optional
        Initial condition of the filter, but in reality just the
        runout of the previous computation.  If `zi` is None or not
        given, then zero initial state is assumed.
    nh : time shift of result (group delay)

    Returns
    -------
    y : array
        The output of the digital filter.
    zf : array, optional
        If `zi` is None, this is not returned, otherwise, `zf` holds the
        final filter delay values.
    """

    if x.ndim==1: x=x.reshape(-1,1)
    if b.ndim==1: b=b.reshape(-1,1)

    L_I,N_I = b.shape
    L_sig,N_sig = x.shape

    N_chan=max(N_sig,N_I)

    # Find power of 2 larger that 2*L_I (from abarnert on Stackoverflow)
    L_F = 2<<(L_I-1).bit_length()
    L_S = L_F - L_I + 1
    if L_sig<L_F:
        offsets = range(1)
    else:
        offsets = range(0, L_sig, L_S)

    # handle complex or real input
    if np.iscomplexobj(b) or np.iscomplexobj(x):
        fft_func = np.fft.fft
        ifft_func = np.fft.ifft
        res = np.zeros((L_sig+L_F,N_chan), dtype=np.complex128)
    else:
        fft_func = np.fft.rfft
        ifft_func = np.fft.irfft
        res = np.zeros((L_sig+L_F,N_chan))

    FDir = fft_func(b, n=L_F,axis=0)

    # overlap and add
    for n in offsets:
        u1=fft_func(x[n:n+L_S,:], n=L_F,axis=0)
        u2=u1*FDir
        res[n:n+L_F,:] += ifft_func(u2,axis=0)

    if zi is not None:
        res[:zi.shape[0],:] = res[:zi.shape[0],:] + zi
        zi=res[L_sig:,:]
        return res[nh:nh+L_sig,:],zi 
    else:
        return res[nh:nh+L_sig,:]

def hilbert_filt(ss,nh):
    '''
    filter timeseries using hilbert response (complex valued )
    ss is data to be filtered
    nh is number if (positive) coefficients of hilbert response
    '''
    hk=np.pi*np.arange(-nh,nh+1)
    hk[nh]=1        # to avoid division by 0; will be corrected below
    #
    hh= 0*hk+1j*(1-np.cos(hk))/hk
    hh[nh]=1+1j*0   # correct previous adjustment
    #
    zz=fft_filt(hh,ss,nh=nh)
    return zz

def matched_filt(xx,ss):
    
    '''
    matched filter
    xx is signal for replica
    ss is data to be filtered
    '''
    nd=np.shape(xx)[0]
    rep=np.flipud(xx)/np.sqrt((nd/2*np.sum(xx**2)))
    #
    zz=fft_filt(rep,ss,nh=len(rep)//2)
    return zz
#

Some utilities#

  • quadInt

  • stack

  • cosd,sind

#------------------------------------------------------------------
def quadInt(uu,imx,nd1):
    '''    
    three point quardatic interpolation 
    to find location and value of interpolated maxima
    uu  : time series
    imx : index of maxima
    nd1 : reference sample (to be subtracted from nd1)
    '''
    nc=len(imx)
    uo=uu[imx,range(nc)]
    um=uu[imx-1,range(nc)]
    up=uu[imx+1,range(nc)]

    b=(up+um-2*uo)/2;

    xo=(um-up)/(4*b);
    yo=uo-b*xo**2;
    
    ux=np.zeros((nc,2))
    ux[:,0]=xo+imx-nd1
    ux[:,1]=yo
    return ux


def stack(x): '''stack time series''';  return x+np.ones((x.shape[0],1))*range(x.shape[1])

def cosd(x): '''cosine of degrees'''; return np.cos(x*np.pi/180)
def sind(x): '''Sine of degrees'''; return np.sin(x*np.pi/180)

Array setup#

  • array_init

  • array_setup

  • array_delay

  • array_timeseries

  • signal_direction

  • signal_directionFromGeometry

  • simulate_multiPath

def array_init(ro,type):
    ''' contruct volimetric array of hydrophones
        ro   : radius of bounding sphere
        type : configuration ("tetahedron","octahedron","cube")'''
    #
    dz=np.sqrt(0.5)
    if type=='tetrahedron':
        ho=np.array([[1,0,-dz],[-1,0,-dz],[0,-1,dz],[0,1,dz]])
    elif type=='octahedron':
        ang=np.array([0,120,240,60,180,300])*np.pi/180
        ho=np.transpose([np.cos(ang),np.sin(ang),0*ang])
        ho[:3,2]=-dz
        ho[3:,2]=+dz
    elif type=='cube':
        ho=np.array([[1,0,-dz],[0,1,-dz],[-1,0,-dz],[0,-1,-dz],
                     [1,0, dz],[0,1, dz],[-1,0, dz],[0,-1, dz]])
    ho *=ro
    return ho
#
def array_setup(ho,iar=-1):
    ''' generate vector of hydrophone pairs
        ho  : hydrophone vectors
        iar : selected sub array (-1: all pairs)'''
    nh=ho.shape[0]
    hsel=np.zeros((nh*(nh-1)//2,2),dtype=int)
    #
    kk=0
    for ii in range(nh): 
        for jj in range(ii+1,nh): 
            hsel[kk,:]=[jj,ii]
            kk +=1
    #
    D=ho[hsel[:,0],:]-ho[hsel[:,1],:]
    L=np.sqrt(np.sum(D**2,1))
    Lc=(L*1e+6).astype(int)
    Lunique=np.unique(Lc)
    if (iar>=0) & (iar<len(Lunique)):
        nl=np.where(Lunique[iar]==Lc)[0]
        D=D[nl,:]
        L=L[nl]
        hsel=hsel[nl,:]
    #
    DI=np.linalg.pinv(D)
    return DI,D,L,hsel

def array_delay(S,ho,sv=1500,fs=None):
    '''
    estimate delays using direction vector
    S direction vector
    ho hydrophone locations
    fs sampling frequency
    '''
    DC=np.sum(ho*S,1)   # ho are locations relative to array center
    DT=DC/sv            # ms from array center
    if fs==None: 
        return DT       # ms
    else:
        return DT*fs    # samples

def array_timeseries(yy,DS,noise):
    '''
    delay signal according to array geometry 
    (use fractional delay with sinc function)
    yy      : time series
    DS      : delay vector [samples]
    noise   : noise variance
    '''
    N=int(5*np.max(np.abs(DS)))
    kk=np.arange(-N,N).reshape(-1,1)
    if 1:
        win=np.sinc(kk+DS)
        ss=fft_filt(win,yy)
    else:
        ss=np.zeros((len(yy),len(DS)))
        for ii in range(len(DS)):
            win=np.sinc(kk+DS[ii])
            ss[:,ii]=fft_filt(win,yy)[:,0]

    nn=np.random.normal(scale=noise, size=ss.shape)
    ss +=nn
    return ss

def signal_direction(az,el):
    '''
    estimate direction using arrival angles
    az  : azimuth angle
    el  : elevation angle
    '''
    C=np.array([np.cos(az)*np.cos(el),np.sin(az)*np.cos(el), np.sin(el)])
    return C

def signal_directionFromGeometry(az,dx,dy):
    '''
    estimate direction using distance, depth geometry
    dx  : horizontal distance 
    dy  : vertical separation 
    az  : azimuth angle
    '''
    el=np.arctan2(dy,dx)
    C=signal_direction(az,el)
    return C,el

def signal_path(dx,dy,az,ho,sv):
    '''
    estimate delays using distance, depth geometry
    dx  : horizontal distance 
    dy  : vertical separation 
    az  : azimuth angle
    ho  : hydrophone locations
    '''
    rr=np.sqrt(dx**2 + dy**2)
    C,el=signal_directionFromGeometry(az,dx,dy)
    #
    DT=array_delay(C,ho,sv)
    return DT,el,rr

def D_path(hd,sd): ''' direct path'''; return hd-sd
def S_path(hd,sd): ''' surface reflected path'''; return hd+sd
def B_path(hd,sd,bd): ''' bottom reflected path'''; return hd-(2*bd-sd)

def simulate_multiPath(hd,sd,dx,bd,az,ho,sv,type):
    '''
    Simulate multi-path
    hd  : hydrophone depth
    sd  : source depth
    dx  : horizontal distance (hydrophone-source)
    az  : azimuth angle
    ho  : hydrophone locations
    type: type of arrival {'D','S','B'}
    '''
    if type=='D':
        return signal_path(dx,D_path(hd,sd),az,ho,sv)
    elif type=='S':
        return signal_path(dx,S_path(hd,sd),az,ho,sv)
    elif type=='B':
        return signal_path(dx,B_path(hd,sd,bd),az,ho,sv)

Generate tonal (Frequency Modulated) signals#

  • fs sampling frequency [Hz]

  • ax amplitude

  • fx FM modulation function

Linear FM#

  • f1 start frequency [Hz]

  • f2 stop frequency [Hz]

  • t2 duration [s]

  • fs sampling frequency [Hz]

Sinusoidal FM#

  • fc center frequeny [Hz]

  • fm frequeny modulation [Hz]

  • fa periods of sinusoid [Hz]

  • t2 duration [s]

  • fs sampling frequency [Hz]

def lfm(f1,f2,t2,fs):
    '''LFM from f1 Hz to f2 Hz over t2 s, with fs sampling frequency'''
    return f1+(f2-f1)/t2*np.arange(0,t2,1/fs)

def sinfm(fc,fm,fa,t2,fs):  
    '''Sinusoid with fc Hz center, fm Hz modulation frequency''' 
    return fc+fm*np.sin(2*np.pi*fa*np.arange(0, t2, 1/fs))

def tonal(fs,ax,fx):
    '''
    generate tonal from frequency
    e.g. generate 5 s LFM from 1 to 10 kHz and 96 kHz sampling
      xx=tonal(96000,1,lfm(1000,10000,5,96000))
    e.g. generate 2 s sinusoidal fm: 3 oscillations/s from 1 to 3 kHz and 96 kHz sampling 
      xx=tonal(96000,1,sinfm(2000,1000,3,2,96000))
    '''
    phase=2*np.pi*np.cumsum(fx)/fs
    return ax*np.sin(phase)

Noise generation#

def pinkNoise(f,fo): 
    ''' ping noise (1/f)'''
    return 1/ np.where(f==0,np.inf,np.sqrt(f))

def brownNoise(f,fo): 
    ''' brown noise (1/f**2)'''
    return 1/ np.where(f==0,np.inf,f)

def shipNoise(f,fo):
    ''' ship noise''' 
    return 1/ np.maximum(fo,f)

def genNoise(N,fs,fo=0,method=None):
    ''' 
    generate colored noise
    N       : number of samples
    fs      : sampling frequency
    fo      : lower cut-off frequency
    method  : noise color method 
                pinkNoise(f)
                brownNoise(f)
                shipNoise(f,fo)
    '''
    N2 = N//2+1
    f = np.arange(0,N2)*fs/N
    A2 = method(f,fo)
    A2 /= np.sqrt(np.sum(A2**2))
    n2 = np.random.random(N2)
    p2 = 2*np.pi*n2
    d2 = A2*np.exp(1j*p2)
    x = np.fft.irfft(d2)
    return x
def estimate_TDOA_0(zz,DI,hsel):
    '''use time differences of temporal maxima'''
    imx=np.argmax(zz,axis=0)
    ux=quadInt(zz,imx,0)
    #
    uu = ux[hsel[:,0],0]-ux[hsel[:,1],0]
    #
    vv = -uu @ DI.T
    return vv,ux

def direction_TDOA_0(zz,DI,hsel):
    '''use time differences of temporal maxima'''
    vv,ux=estimate_TDOA_0(zz,DI,hsel)
    #
    azx=180/np.pi*np.arctan2(vv[1],vv[0])
    elx=180/np.pi*np.arctan2(vv[2],np.sqrt(vv[0]**2+vv[1]**2))
    return azx,elx,ux
def estimate_phaseShift(ss,hsel,DI,nh=None):
    ''' estimate phase-based direction components'''
    # use phase shifts
    if nh==None:
        nh=10
    zz=hilbert_filt(ss,nh)
    
    uu = -np.log(zz[:,hsel[:,0]]*np.conjugate(zz[:,hsel[:,1]])).imag  
    vv = -uu @ DI.T
    return vv,uu
    

def dirextion_phaseShift(ss,hsel,DI,nh=None):
    ''' estimate phase-based direction angles'''
    # use phase shifts
    vv,uu=estimate_phaseShift(ss,hsel,DI,nh)
    #
    azx=180/np.pi*np.arctan2(vv[:,1],vv[:,0])
    elx=180/np.pi*np.arctan2(vv[:,2],np.sqrt(vv[:,0]**2+vv[:,1]**2))
    return azx,elx
def estimate_intensity(xx,DI,hsel,fs,nw,nfft):
    ''' estimated intensity based direction components '''
    f,t,Q=signal.stft(xx,fs=fs,nperseg=nw,noverlap=nw//2,nfft=nfft,axis=0)
    Q=Q.swapaxes(1,2)   #swap from (nfr,nchan,nsamp) to (nfr,nsamp,nchan)
    C = -np.imag(Q[:,:,hsel[:,0]]*np.conjugate(Q[:,:,hsel[:,1]]))
    I = -C@DI.T
    return f,t,I
#

def direction_intensity(xx,DI,hsel,fs,nw,nfft):
    ''' estimated intensity based direction angles '''
    f,t,I=estimate_intensity(xx,DI,hsel,fs,nw,nfft)
    Azx=180/np.pi*np.arctan2(I[:,:,1],I[:,:,0])
    Elx=180/np.pi*np.arctan2(I[:,:,2],np.sqrt(I[:,:,0]**2+I[:,:,1]**2))
    return Azx,Elx,f,t,I
def xcorr_fft(xx,hsel,method=None):
    ''' FFT-based cross-correlation '''
    nx=xx.shape[0]
    uu=np.fft.rfft(xx,axis=0)
    vv=uu[:,hsel[:,0]]*np.conjugate(uu[:,hsel[:,1]])
    #
    if method=='PHAT': vv /= np.abs(vv)
    #
    yy=np.fft.fftshift(np.fft.irfft(vv,axis=0),axes=0)
    amx= yy[1:-1,:,:].max(axis=0)
    jmx= yy[1:-1,:,:].argmax(axis=0)+1
    #
    ux=quadInt(yy,jmx,nx//2)
    return amx,jmx,ux
#
def estimate_TDOA(ss,DI,hsel,nd=256):
    '''estimate time delay-based direction components
    process all data in small chunks'''
    offsets=np.arange(0,ss.shape[0]-nd,nd//2,dtype=int)
    imx=np.zeros((len(offsets),hsel.shape[0]),dtype=int)
    amx=np.zeros((len(offsets),hsel.shape[0]))
    ux =np.zeros((len(offsets),hsel.shape[0],2))
    #
    for ii,jj in zip(offsets,range(len(offsets))):
        xx=ss[ii:ii+nd,:]
        amx[jj,:],imx[jj,:],ux[jj,:,:]=xcorr_fft(xx,hsel)
    #
    # project delays to 3-d coordinates
    V=-ux[:,:,0]@DI.T
    return V,ux,offsets
#
def direction_TDOA(ss,DI,hsel,nd=256):
    ''' estimate time delay-based direction angles using cross-correlation'''
    V,ux,offsets=estimate_TDOA(ss,DI,hsel,nd)
    azx=180/np.pi*np.arctan2(V[:,1],V[:,0])
    elx=180/np.pi*np.arctan2(V[:,2],np.sqrt(V[:,0]**2+V[:,1]**2))
    return azx,elx,offsets    
def estimate_TDOA_fast(ss,DI,hsel,nd=256):
    ''' estimate directional direction components using temporal maxima'''
    # use time differences of temporal maxima
    # process all data in small chunks
    nx,nch=ss.shape
    offsets=np.arange(0,nx-nd,nd//2,dtype=int)
    ux =np.zeros((len(offsets),nch,2))
    #
    for ii,jj in zip(offsets,range(len(offsets))):
        xx=ss[ii:ii+nd,:]
        jmx=np.argmax(xx[1:-1],axis=0)+1
        ux[jj,:,:]=quadInt(xx,jmx,0)
    #
    uu=ux[:,hsel[:,0],0]-ux[:,hsel[:,1],0]
    # project delays to 3-d coordinates
    vv=-uu @ DI.T
    #
    return vv,uu,offsets

def direction_TDOA_fast(ss,DI,hsel,nd=256):
    ''' estimate direction using temporal maxima'''
    V,ux,offsets=estimate_TDOA_fast(ss,DI,hsel,nd)
    azx=180/np.pi*np.arctan2(V[:,1],V[:,0])
    elx=180/np.pi*np.arctan2(V[:,2],np.sqrt(V[:,0]**2+V[:,1]**2))
    return azx,elx,offsets
def quadInt1(C,infft,dn):
    ''' simple quadratic estimate interpolated maxima
     limit location of maxima to infft +- (dn-1)'''
    #
    uu=C[infft//2-dn:infft//2+dn,:,:]
    imx=np.argmax(uu[1:-1,:,:],axis=0)+1
    vv=np.zeros((uu.shape[1],uu.shape[2]))
    for ii in range(uu.shape[1]):
        dd=uu[:,ii,:]
        ux=quadInt(dd,imx[ii,:],dn)
        vv[ii,:]=ux[:,0]
    return vv

def estimate_xcorr_stft(ss,fs,nw,nover,nfft,DI,hsel,nh):
    ''' extimate cross-correlation using stfs-based spectrogram'''
    #
    f,t,Q= signal.stft(ss,axis=0,fs=fs,nperseg=nw,noverlap=nover,nfft=nfft)
    C = Q[:,hsel[:,0],:]*np.conjugate(Q[:,hsel[:,1],:])
    #
    # cross-correlation estimate
    infft=nfft
    C = np.fft.irfft(C,axis=0,n=infft).swapaxes(1,2)
    C = np.fft.fftshift(C,axes=0)

    #find cross-correlation peak
    uu=quadInt1(C,infft,nh)
    vv=-uu @ DI.T
    return vv,uu,t,f

def direction_xcorr_stft(ss,fs,nw,nover,nfft,DI,hsel,nh):
    ''' estimate direction using stfs-based spectrogram'''
    vv,uu,t,f = estimate_xcorr_stft(ss,fs,nw,nover,nfft,DI,hsel,nh)
    azx=180/np.pi*np.arctan2(vv[:,1],vv[:,0])
    elx=180/np.pi*np.arctan2(vv[:,2],np.sqrt(vv[:,0]**2+vv[:,1]**2))
    return azx,elx,t,f
def directionFinding(xx,DI,hsel,fs,nw=None,nfft=None,nh=None, method=None):
    """
        estimates the direction of a sound source of array recordings
        parameters
            xx: multi-dimensional time series
            DI: hydrophone array mapping function 
            fs: sampling frequency
            nw: size of processing window, defautls to 256
            nfft: fft size (if needed) defaults to window size
            nh: number of positive taps (if needed, e.g hilbert transform)
            method: processing method
                'TDOA_0'    : uses local maxima of whole tim series
                'TDOA_fast' : uses time series maxima (hilbert transform)
                'TDOA'      : uses fft based cross-correlation
                'Phase'     : uses local phase differences
                'STFT'      : uses spectrogram
                'XCORR'     : uses spectrogram to cross correlate (no overlap-add)

         returns:
            Azx:   Azimuth
            Elx:   Elevation
            t:     time axis
            f:     frequency axis (if applicable)
        #
    """
    if nw==None:
        nw=256
    if nfft==None:
        nfft=nw
    if nh==None:
        nh=10
    if method==None:
        method='TDOA_fast'
    #
    if method=='TDOA_0':
        azx,elx,ux=direction_TDOA_0(xx,DI,hsel)
        t=np.mean(ux[:,0]/fs)
        f=[]
        return azx,elx,t,f,ux
    if method=='TDOA_fast':
        azx,elx,offsets=direction_TDOA_fast(xx,DI,hsel,nd=nw)
        t=offsets/fs
        f=[]
        return azx,elx,t,f
    if method=='TDOA':
        azx,elx,offsets=direction_TDOA(xx,DI,hsel,nd=nw,method='PHAT')
        t=offsets/fs
        f=[]
        return azx,elx,t,f
    if method=='Phase':
        azx,elx=dirextion_phaseShift(xx,hsel,DI,nh=nh)
        t=np.arange(xx.shape[0])/fs
        f=[]
        return azx,elx,t,f
    if method=='STFT':
        Azx,Elx,f,t,I=direction_intensity(xx,DI,hsel,fs,nw,nfft)
        return Azx,Elx,t,f,I
    if method=='XCORR':
        azx,elx,t,f=direction_xcorr_stft(xx,fs,nw,nw//2,nfft,DI,hsel,nh)
        return azx,elx,t,f
def beamForm(yy,DS):
    '''delay signal according to array geometry 
    (use fractional delay with sinc function)'''
    N=int(5*np.max(np.abs(DS)))
    kk=np.arange(-N,N,1);
    ss=np.zeros((len(yy),len(DS)))
    # delay
    for ii in range(len(DS)):
        ss[:,ii]=fft_filt(np.sinc(kk-DS[ii]),yy[:,ii])[:,0]
    # average
    zz=np.mean(ss,1)
    return zz

Array setup#