bioacoustic tools#
# file: bioacoustic_tools.py
#
# bioacoustic tools
#
import numpy as np
import matplotlib.pyplot as plt
import scipy.signal as signal
#
Time series FIR filter#
Use oberlap and add method to filter a long time series.
Hilbert filter#
Use FIR to generate an ‘analytical’ of complex valued time series by means of Hilbert transform
Matched filter#
Use FIR filter to match filter time series
# Overlap-add FIR filter, (c) Joachim Thiemann 2016
# https://github.com/jthiem/overlapadd/blob/master/olafilt.py
#
# modified by WMXZ to work on multi-channel data
#
def fft_filt(b, x, zi=None, nh=0):
"""
Filter a one-dimensional array with an FIR filter
Filter a data sequence, `x`, using a FIR filter given in `b`.
Filtering uses the overlap-add method converting both `x` and `b`
into frequency domain first. The FFT size is determined as the
next higher power of 2 of twice the length of `b`.
Parameters
----------
b : one-dimensional numpy array
The impulse response of the filter
x : one-dimensional numpy array
Signal to be filtered
zi : one-dimensional numpy array, optional
Initial condition of the filter, but in reality just the
runout of the previous computation. If `zi` is None or not
given, then zero initial state is assumed.
nh : time shift of result (group delay)
Returns
-------
y : array
The output of the digital filter.
zf : array, optional
If `zi` is None, this is not returned, otherwise, `zf` holds the
final filter delay values.
"""
if x.ndim==1: x=x.reshape(-1,1)
if b.ndim==1: b=b.reshape(-1,1)
L_I,N_I = b.shape
L_sig,N_sig = x.shape
N_chan=max(N_sig,N_I)
# Find power of 2 larger that 2*L_I (from abarnert on Stackoverflow)
L_F = 2<<(L_I-1).bit_length()
L_S = L_F - L_I + 1
if L_sig<L_F:
offsets = range(1)
else:
offsets = range(0, L_sig, L_S)
# handle complex or real input
if np.iscomplexobj(b) or np.iscomplexobj(x):
fft_func = np.fft.fft
ifft_func = np.fft.ifft
res = np.zeros((L_sig+L_F,N_chan), dtype=np.complex128)
else:
fft_func = np.fft.rfft
ifft_func = np.fft.irfft
res = np.zeros((L_sig+L_F,N_chan))
FDir = fft_func(b, n=L_F,axis=0)
# overlap and add
for n in offsets:
u1=fft_func(x[n:n+L_S,:], n=L_F,axis=0)
u2=u1*FDir
res[n:n+L_F,:] += ifft_func(u2,axis=0)
if zi is not None:
res[:zi.shape[0],:] = res[:zi.shape[0],:] + zi
zi=res[L_sig:,:]
return res[nh:nh+L_sig,:],zi
else:
return res[nh:nh+L_sig,:]
def hilbert_filt(ss,nh):
'''
filter timeseries using hilbert response (complex valued )
ss is data to be filtered
nh is number if (positive) coefficients of hilbert response
'''
hk=np.pi*np.arange(-nh,nh+1)
hk[nh]=1 # to avoid division by 0; will be corrected below
#
hh= 0*hk+1j*(1-np.cos(hk))/hk
hh[nh]=1+1j*0 # correct previous adjustment
#
zz=fft_filt(hh,ss,nh=nh)
return zz
def matched_filt(xx,ss):
'''
matched filter
xx is signal for replica
ss is data to be filtered
'''
nd=np.shape(xx)[0]
rep=np.flipud(xx)/np.sqrt((nd/2*np.sum(xx**2)))
#
zz=fft_filt(rep,ss,nh=len(rep)//2)
return zz
#
Some utilities#
quadInt
stack
cosd,sind
#------------------------------------------------------------------
def quadInt(uu,imx,nd1):
'''
three point quardatic interpolation
to find location and value of interpolated maxima
uu : time series
imx : index of maxima
nd1 : reference sample (to be subtracted from nd1)
'''
nc=len(imx)
uo=uu[imx,range(nc)]
um=uu[imx-1,range(nc)]
up=uu[imx+1,range(nc)]
b=(up+um-2*uo)/2;
xo=(um-up)/(4*b);
yo=uo-b*xo**2;
ux=np.zeros((nc,2))
ux[:,0]=xo+imx-nd1
ux[:,1]=yo
return ux
def stack(x): '''stack time series'''; return x+np.ones((x.shape[0],1))*range(x.shape[1])
def cosd(x): '''cosine of degrees'''; return np.cos(x*np.pi/180)
def sind(x): '''Sine of degrees'''; return np.sin(x*np.pi/180)
Array setup#
array_init
array_setup
array_delay
array_timeseries
signal_direction
signal_directionFromGeometry
simulate_multiPath
def array_init(ro,type):
''' contruct volimetric array of hydrophones
ro : radius of bounding sphere
type : configuration ("tetahedron","octahedron","cube")'''
#
dz=np.sqrt(0.5)
if type=='tetrahedron':
ho=np.array([[1,0,-dz],[-1,0,-dz],[0,-1,dz],[0,1,dz]])
elif type=='octahedron':
ang=np.array([0,120,240,60,180,300])*np.pi/180
ho=np.transpose([np.cos(ang),np.sin(ang),0*ang])
ho[:3,2]=-dz
ho[3:,2]=+dz
elif type=='cube':
ho=np.array([[1,0,-dz],[0,1,-dz],[-1,0,-dz],[0,-1,-dz],
[1,0, dz],[0,1, dz],[-1,0, dz],[0,-1, dz]])
ho *=ro
return ho
#
def array_setup(ho,iar=-1):
''' generate vector of hydrophone pairs
ho : hydrophone vectors
iar : selected sub array (-1: all pairs)'''
nh=ho.shape[0]
hsel=np.zeros((nh*(nh-1)//2,2),dtype=int)
#
kk=0
for ii in range(nh):
for jj in range(ii+1,nh):
hsel[kk,:]=[jj,ii]
kk +=1
#
D=ho[hsel[:,0],:]-ho[hsel[:,1],:]
L=np.sqrt(np.sum(D**2,1))
Lc=(L*1e+6).astype(int)
Lunique=np.unique(Lc)
if (iar>=0) & (iar<len(Lunique)):
nl=np.where(Lunique[iar]==Lc)[0]
D=D[nl,:]
L=L[nl]
hsel=hsel[nl,:]
#
DI=np.linalg.pinv(D)
return DI,D,L,hsel
def array_delay(S,ho,sv=1500,fs=None):
'''
estimate delays using direction vector
S direction vector
ho hydrophone locations
fs sampling frequency
'''
DC=np.sum(ho*S,1) # ho are locations relative to array center
DT=DC/sv # ms from array center
if fs==None:
return DT # ms
else:
return DT*fs # samples
def array_timeseries(yy,DS,noise):
'''
delay signal according to array geometry
(use fractional delay with sinc function)
yy : time series
DS : delay vector [samples]
noise : noise variance
'''
N=int(5*np.max(np.abs(DS)))
kk=np.arange(-N,N).reshape(-1,1)
if 1:
win=np.sinc(kk+DS)
ss=fft_filt(win,yy)
else:
ss=np.zeros((len(yy),len(DS)))
for ii in range(len(DS)):
win=np.sinc(kk+DS[ii])
ss[:,ii]=fft_filt(win,yy)[:,0]
nn=np.random.normal(scale=noise, size=ss.shape)
ss +=nn
return ss
def signal_direction(az,el):
'''
estimate direction using arrival angles
az : azimuth angle
el : elevation angle
'''
C=np.array([np.cos(az)*np.cos(el),np.sin(az)*np.cos(el), np.sin(el)])
return C
def signal_directionFromGeometry(az,dx,dy):
'''
estimate direction using distance, depth geometry
dx : horizontal distance
dy : vertical separation
az : azimuth angle
'''
el=np.arctan2(dy,dx)
C=signal_direction(az,el)
return C,el
def signal_path(dx,dy,az,ho,sv):
'''
estimate delays using distance, depth geometry
dx : horizontal distance
dy : vertical separation
az : azimuth angle
ho : hydrophone locations
'''
rr=np.sqrt(dx**2 + dy**2)
C,el=signal_directionFromGeometry(az,dx,dy)
#
DT=array_delay(C,ho,sv)
return DT,el,rr
def D_path(hd,sd): ''' direct path'''; return hd-sd
def S_path(hd,sd): ''' surface reflected path'''; return hd+sd
def B_path(hd,sd,bd): ''' bottom reflected path'''; return hd-(2*bd-sd)
def simulate_multiPath(hd,sd,dx,bd,az,ho,sv,type):
'''
Simulate multi-path
hd : hydrophone depth
sd : source depth
dx : horizontal distance (hydrophone-source)
az : azimuth angle
ho : hydrophone locations
type: type of arrival {'D','S','B'}
'''
if type=='D':
return signal_path(dx,D_path(hd,sd),az,ho,sv)
elif type=='S':
return signal_path(dx,S_path(hd,sd),az,ho,sv)
elif type=='B':
return signal_path(dx,B_path(hd,sd,bd),az,ho,sv)
Generate tonal (Frequency Modulated) signals#
fs sampling frequency [Hz]
ax amplitude
fx FM modulation function
Linear FM#
f1 start frequency [Hz]
f2 stop frequency [Hz]
t2 duration [s]
fs sampling frequency [Hz]
Sinusoidal FM#
fc center frequeny [Hz]
fm frequeny modulation [Hz]
fa periods of sinusoid [Hz]
t2 duration [s]
fs sampling frequency [Hz]
def lfm(f1,f2,t2,fs):
'''LFM from f1 Hz to f2 Hz over t2 s, with fs sampling frequency'''
return f1+(f2-f1)/t2*np.arange(0,t2,1/fs)
def sinfm(fc,fm,fa,t2,fs):
'''Sinusoid with fc Hz center, fm Hz modulation frequency'''
return fc+fm*np.sin(2*np.pi*fa*np.arange(0, t2, 1/fs))
def tonal(fs,ax,fx):
'''
generate tonal from frequency
e.g. generate 5 s LFM from 1 to 10 kHz and 96 kHz sampling
xx=tonal(96000,1,lfm(1000,10000,5,96000))
e.g. generate 2 s sinusoidal fm: 3 oscillations/s from 1 to 3 kHz and 96 kHz sampling
xx=tonal(96000,1,sinfm(2000,1000,3,2,96000))
'''
phase=2*np.pi*np.cumsum(fx)/fs
return ax*np.sin(phase)
Noise generation#
def pinkNoise(f,fo):
''' ping noise (1/f)'''
return 1/ np.where(f==0,np.inf,np.sqrt(f))
def brownNoise(f,fo):
''' brown noise (1/f**2)'''
return 1/ np.where(f==0,np.inf,f)
def shipNoise(f,fo):
''' ship noise'''
return 1/ np.maximum(fo,f)
def genNoise(N,fs,fo=0,method=None):
'''
generate colored noise
N : number of samples
fs : sampling frequency
fo : lower cut-off frequency
method : noise color method
pinkNoise(f)
brownNoise(f)
shipNoise(f,fo)
'''
N2 = N//2+1
f = np.arange(0,N2)*fs/N
A2 = method(f,fo)
A2 /= np.sqrt(np.sum(A2**2))
n2 = np.random.random(N2)
p2 = 2*np.pi*n2
d2 = A2*np.exp(1j*p2)
x = np.fft.irfft(d2)
return x
def estimate_TDOA_0(zz,DI,hsel):
'''use time differences of temporal maxima'''
imx=np.argmax(zz,axis=0)
ux=quadInt(zz,imx,0)
#
uu = ux[hsel[:,0],0]-ux[hsel[:,1],0]
#
vv = -uu @ DI.T
return vv,ux
def direction_TDOA_0(zz,DI,hsel):
'''use time differences of temporal maxima'''
vv,ux=estimate_TDOA_0(zz,DI,hsel)
#
azx=180/np.pi*np.arctan2(vv[1],vv[0])
elx=180/np.pi*np.arctan2(vv[2],np.sqrt(vv[0]**2+vv[1]**2))
return azx,elx,ux
def estimate_phaseShift(ss,hsel,DI,nh=None):
''' estimate phase-based direction components'''
# use phase shifts
if nh==None:
nh=10
zz=hilbert_filt(ss,nh)
uu = -np.log(zz[:,hsel[:,0]]*np.conjugate(zz[:,hsel[:,1]])).imag
vv = -uu @ DI.T
return vv,uu
def dirextion_phaseShift(ss,hsel,DI,nh=None):
''' estimate phase-based direction angles'''
# use phase shifts
vv,uu=estimate_phaseShift(ss,hsel,DI,nh)
#
azx=180/np.pi*np.arctan2(vv[:,1],vv[:,0])
elx=180/np.pi*np.arctan2(vv[:,2],np.sqrt(vv[:,0]**2+vv[:,1]**2))
return azx,elx
def estimate_intensity(xx,DI,hsel,fs,nw,nfft):
''' estimated intensity based direction components '''
f,t,Q=signal.stft(xx,fs=fs,nperseg=nw,noverlap=nw//2,nfft=nfft,axis=0)
Q=Q.swapaxes(1,2) #swap from (nfr,nchan,nsamp) to (nfr,nsamp,nchan)
C = -np.imag(Q[:,:,hsel[:,0]]*np.conjugate(Q[:,:,hsel[:,1]]))
I = -C@DI.T
return f,t,I
#
def direction_intensity(xx,DI,hsel,fs,nw,nfft):
''' estimated intensity based direction angles '''
f,t,I=estimate_intensity(xx,DI,hsel,fs,nw,nfft)
Azx=180/np.pi*np.arctan2(I[:,:,1],I[:,:,0])
Elx=180/np.pi*np.arctan2(I[:,:,2],np.sqrt(I[:,:,0]**2+I[:,:,1]**2))
return Azx,Elx,f,t,I
def xcorr_fft(xx,hsel,method=None):
''' FFT-based cross-correlation '''
nx=xx.shape[0]
uu=np.fft.rfft(xx,axis=0)
vv=uu[:,hsel[:,0]]*np.conjugate(uu[:,hsel[:,1]])
#
if method=='PHAT': vv /= np.abs(vv)
#
yy=np.fft.fftshift(np.fft.irfft(vv,axis=0),axes=0)
amx= yy[1:-1,:,:].max(axis=0)
jmx= yy[1:-1,:,:].argmax(axis=0)+1
#
ux=quadInt(yy,jmx,nx//2)
return amx,jmx,ux
#
def estimate_TDOA(ss,DI,hsel,nd=256):
'''estimate time delay-based direction components
process all data in small chunks'''
offsets=np.arange(0,ss.shape[0]-nd,nd//2,dtype=int)
imx=np.zeros((len(offsets),hsel.shape[0]),dtype=int)
amx=np.zeros((len(offsets),hsel.shape[0]))
ux =np.zeros((len(offsets),hsel.shape[0],2))
#
for ii,jj in zip(offsets,range(len(offsets))):
xx=ss[ii:ii+nd,:]
amx[jj,:],imx[jj,:],ux[jj,:,:]=xcorr_fft(xx,hsel)
#
# project delays to 3-d coordinates
V=-ux[:,:,0]@DI.T
return V,ux,offsets
#
def direction_TDOA(ss,DI,hsel,nd=256):
''' estimate time delay-based direction angles using cross-correlation'''
V,ux,offsets=estimate_TDOA(ss,DI,hsel,nd)
azx=180/np.pi*np.arctan2(V[:,1],V[:,0])
elx=180/np.pi*np.arctan2(V[:,2],np.sqrt(V[:,0]**2+V[:,1]**2))
return azx,elx,offsets
def estimate_TDOA_fast(ss,DI,hsel,nd=256):
''' estimate directional direction components using temporal maxima'''
# use time differences of temporal maxima
# process all data in small chunks
nx,nch=ss.shape
offsets=np.arange(0,nx-nd,nd//2,dtype=int)
ux =np.zeros((len(offsets),nch,2))
#
for ii,jj in zip(offsets,range(len(offsets))):
xx=ss[ii:ii+nd,:]
jmx=np.argmax(xx[1:-1],axis=0)+1
ux[jj,:,:]=quadInt(xx,jmx,0)
#
uu=ux[:,hsel[:,0],0]-ux[:,hsel[:,1],0]
# project delays to 3-d coordinates
vv=-uu @ DI.T
#
return vv,uu,offsets
def direction_TDOA_fast(ss,DI,hsel,nd=256):
''' estimate direction using temporal maxima'''
V,ux,offsets=estimate_TDOA_fast(ss,DI,hsel,nd)
azx=180/np.pi*np.arctan2(V[:,1],V[:,0])
elx=180/np.pi*np.arctan2(V[:,2],np.sqrt(V[:,0]**2+V[:,1]**2))
return azx,elx,offsets
def quadInt1(C,infft,dn):
''' simple quadratic estimate interpolated maxima
limit location of maxima to infft +- (dn-1)'''
#
uu=C[infft//2-dn:infft//2+dn,:,:]
imx=np.argmax(uu[1:-1,:,:],axis=0)+1
vv=np.zeros((uu.shape[1],uu.shape[2]))
for ii in range(uu.shape[1]):
dd=uu[:,ii,:]
ux=quadInt(dd,imx[ii,:],dn)
vv[ii,:]=ux[:,0]
return vv
def estimate_xcorr_stft(ss,fs,nw,nover,nfft,DI,hsel,nh):
''' extimate cross-correlation using stfs-based spectrogram'''
#
f,t,Q= signal.stft(ss,axis=0,fs=fs,nperseg=nw,noverlap=nover,nfft=nfft)
C = Q[:,hsel[:,0],:]*np.conjugate(Q[:,hsel[:,1],:])
#
# cross-correlation estimate
infft=nfft
C = np.fft.irfft(C,axis=0,n=infft).swapaxes(1,2)
C = np.fft.fftshift(C,axes=0)
#find cross-correlation peak
uu=quadInt1(C,infft,nh)
vv=-uu @ DI.T
return vv,uu,t,f
def direction_xcorr_stft(ss,fs,nw,nover,nfft,DI,hsel,nh):
''' estimate direction using stfs-based spectrogram'''
vv,uu,t,f = estimate_xcorr_stft(ss,fs,nw,nover,nfft,DI,hsel,nh)
azx=180/np.pi*np.arctan2(vv[:,1],vv[:,0])
elx=180/np.pi*np.arctan2(vv[:,2],np.sqrt(vv[:,0]**2+vv[:,1]**2))
return azx,elx,t,f
def directionFinding(xx,DI,hsel,fs,nw=None,nfft=None,nh=None, method=None):
"""
estimates the direction of a sound source of array recordings
parameters
xx: multi-dimensional time series
DI: hydrophone array mapping function
fs: sampling frequency
nw: size of processing window, defautls to 256
nfft: fft size (if needed) defaults to window size
nh: number of positive taps (if needed, e.g hilbert transform)
method: processing method
'TDOA_0' : uses local maxima of whole tim series
'TDOA_fast' : uses time series maxima (hilbert transform)
'TDOA' : uses fft based cross-correlation
'Phase' : uses local phase differences
'STFT' : uses spectrogram
'XCORR' : uses spectrogram to cross correlate (no overlap-add)
returns:
Azx: Azimuth
Elx: Elevation
t: time axis
f: frequency axis (if applicable)
#
"""
if nw==None:
nw=256
if nfft==None:
nfft=nw
if nh==None:
nh=10
if method==None:
method='TDOA_fast'
#
if method=='TDOA_0':
azx,elx,ux=direction_TDOA_0(xx,DI,hsel)
t=np.mean(ux[:,0]/fs)
f=[]
return azx,elx,t,f,ux
if method=='TDOA_fast':
azx,elx,offsets=direction_TDOA_fast(xx,DI,hsel,nd=nw)
t=offsets/fs
f=[]
return azx,elx,t,f
if method=='TDOA':
azx,elx,offsets=direction_TDOA(xx,DI,hsel,nd=nw,method='PHAT')
t=offsets/fs
f=[]
return azx,elx,t,f
if method=='Phase':
azx,elx=dirextion_phaseShift(xx,hsel,DI,nh=nh)
t=np.arange(xx.shape[0])/fs
f=[]
return azx,elx,t,f
if method=='STFT':
Azx,Elx,f,t,I=direction_intensity(xx,DI,hsel,fs,nw,nfft)
return Azx,Elx,t,f,I
if method=='XCORR':
azx,elx,t,f=direction_xcorr_stft(xx,fs,nw,nw//2,nfft,DI,hsel,nh)
return azx,elx,t,f
def beamForm(yy,DS):
'''delay signal according to array geometry
(use fractional delay with sinc function)'''
N=int(5*np.max(np.abs(DS)))
kk=np.arange(-N,N,1);
ss=np.zeros((len(yy),len(DS)))
# delay
for ii in range(len(DS)):
ss[:,ii]=fft_filt(np.sinc(kk-DS[ii]),yy[:,ii])[:,0]
# average
zz=np.mean(ss,1)
return zz