Source code for ism3d.xyhelper.maskmoment.momfuncs

import numpy as np
import radio_beam
from scipy import ndimage
from astropy.stats import mad_std
from astropy.io import fits
from astropy import units as u
from astropy.convolution import Gaussian1DKernel
from astropy import wcs
from astropy.convolution import convolve, convolve_fft
from astropy.table import Table
from spectral_cube import SpectralCube

import logging
logger = logging.getLogger(__name__)

[docs]def makenoise(cubearray, gainarray=None, rms=None, perpixel=False, edge=None, clip=0.2): """ Given a data cube and an optional gain cube, estimate the noise at each position, using the robust mad_std estimator in astropy. Parameters ---------- cubearray : SpectralCube or `~numpy.ndarray` The input data cube. No default. gainarray : SpectralCube or `~numpy.ndarray`, optional Gain of a point source at each position of the cube (typically 0 to 1). Can be 2-D or 3-D image, e.g. from pb.fits file from CASA. Default is to assume the rms is constant with position. rms : float, optional Global estimate of the rms noise, to be used instead of trying to estimate it from the data. It should have the units of the input cube. Default is to calculate the rms from the data. perpixel : boolean, optional Whether to calculate the rms per pixel instead of over whole image. Set to True if you know there is a sensitivity variation across the image but you don't have a gain cube. Only used if rms is unset. Default: False edge : int, optional Number of channels at left and right edges of each spectrum to use for rms estimation. Default is to use all channels. clip : float, optional Pixels below this value in the input gainarray are marked invalid. Default: 0.2 Returns ------- noisecube *or* noisearray : SpectralCube or `~numpy.ndarray` The noise estimate as a 3-D array with the same shape and units as the input cube. If the input cube was a SpectralCube, a SpectralCube is returned. """ if isinstance(cubearray, SpectralCube): hdr = cubearray.header spcube = True unit = cubearray.unit cubearray = cubearray.filled_data[:] else: spcube = False if perpixel == False: # Calculate one rms for whole cube doax = None else: # rms varies with position doax = 0 if gainarray is None: if rms is None: if edge is None: rms = mad_std(cubearray, axis=doax, ignore_nan=True) else: # take only edge channels for estimating rms slc = np.r_[0:edge, cubearray.shape[0]-edge:cubearray.shape[0]] rms = mad_std(cubearray[slc, :, :], axis=doax, ignore_nan=True) noisearray = np.zeros_like(cubearray) + rms else: if isinstance(gainarray, SpectralCube): #gainarray = gainarray.unmasked_data[:] gainarray = gainarray.unitless_filled_data[:] if clip is not None: gainarray[gainarray < clip] = np.nan if rms is None: imflat = cubearray * np.broadcast_to(gainarray, cubearray.shape) if edge is None: rms = mad_std(imflat, axis=doax, ignore_nan=True) else: # take only edge channels for estimating rms slc = np.r_[0:edge, cubearray.shape[0]-edge:cubearray.shape[0]] rms = mad_std(imflat[slc, :, :], axis=doax, ignore_nan=True) noisearray = np.broadcast_to(rms/gainarray, cubearray.shape).copy() # Added 21may2020 to match masks of noise and image cubes noisearray[np.isnan(cubearray)] = np.nan if perpixel == False: logger.debug('Found rms value of {:.4f}'.format(rms)) if spcube: if unit != '' and hasattr(noisearray, 'unit') == False: noisecube = SpectralCube( data=noisearray*unit, header=hdr, wcs=wcs.WCS(hdr)) else: noisecube = SpectralCube( data=noisearray, header=hdr, wcs=wcs.WCS(hdr)) return noisecube else: return noisearray
# def snr_calc(image_cube, noise_cube): # """ # Given a data cube and a noise cube, compute their ratio and its maximum # along the spectral dimension. # # Parameters # ---------- # image_cube : SpectralCube # The input data cube. # noise_cube : SpectralCube or `~numpy.ndarray` # Noise estimate at each position of the cube. Can be 2-D or 3-D image. # # Returns # ------- # snr_cube : SpectralCube # The input data cube normalized by the noise. # snr_peak : `~numpy.ndarray` # The peak signal-to-noise ratio as a 2-D array. # """ # header = image_cube.header # if isinstance(noise_cube, SpectralCube): # noise_cube = noise_cube.unitless_filled_data[:] # #snrarray = image_cube.unitless_filled_data[:] / noise_cube # snrarray = image_cube.filled_data[:] / noise_cube # snr_cube = SpectralCube(data=snrarray, header=header, wcs=wcs.WCS(header)) # snr_peak = np.nanmax(snrarray, axis=0) # return snr_cube, snr_peak
[docs]def prunemask(maskarray, minarea=0, minch=2, byregion=True): """ Apply area and velocity width criteria on mask regions. Parameters ---------- maskarray : `~numpy.ndarray` The 3-D mask array with 1s for valid pixels and 0s otherwise. minarea : int, optional Minimum velocity-integrated area in pixel units. Default: 0 minch : int, optional Minimum velocity width in channel units. Default: 2 byregion : boolean, optional Whether to enforce min velocity width on whole region, rather than by pixel Default: True Returns ------- maskarray : `~numpy.ndarray` A copy of the input maskarray with regions failing the tests removed. """ labeled_array_ch, num_features_ch = ndimage.label(maskarray) loc_objects = ndimage.find_objects(labeled_array_ch) for i in range(1, num_features_ch+1): loc = loc_objects[i-1] collapsereg = np.sum(maskarray[loc], axis=0) if np.count_nonzero(collapsereg) < minarea: maskarray[labeled_array_ch == i] = 0 if byregion: if np.count_nonzero(collapsereg >= minch) == 0: maskarray[labeled_array_ch == i] = 0 else: flag = collapsereg < minch maskarray[loc][:, flag] = 0 return maskarray
[docs]def maskguard(maskarray, niter=1, xyonly=False, vonly=False): """ Pad a mask by specified number of pixels in all three dimensions. Parameters ---------- maskarray : `~numpy.ndarray` The 3-D mask array with 1s for valid pixels and 0s otherwise. niter : int, optional Number of iterations for expanding mask by binary dilation. Default: 1 xyonly : boolean, optional Whether to expand only in the two sky coordinates Default: False vonly : boolean, optional Whether to expand only in the spectral coordinate Default: False (ignored if xyonly==True) Returns ------- maskarray : `~numpy.ndarray` A copy of the input maskarray after padding. """ s = ndimage.generate_binary_structure(3, 1) if xyonly: s[0, :] = False s[2, :] = False elif vonly: s[1] = s[0] maskarray = ndimage.binary_dilation( maskarray, structure=s, iterations=niter) return maskarray
[docs]def dilmsk(snrcube, header=None, snr_hi=4, snr_lo=2, minbeam=1, snr_hi_minch=1, snr_lo_minch=1, min_tot_ch=2, nguard=[0, 0], debug=False): """ Dilate a mask from one specified threshold to another. Parameters ---------- snrcube : SpectralCube or `~numpy.ndarray` The image cube normalized by the estimated RMS noise. If a numpy array is provided the header must also be provided. header : `astropy.io.fits.Header` The cube FITS header, required if a numpy array is given. snr_hi : float, optional The high significance threshold from which to begin the mask dilation. Default: 4 snr_lo : float, optional The low significance threshold at which to end the mask dilation. Default: 2 minbeam : float, optional Minimum velocity-integrated area of a mask region in units of the beam size. Default: 1 snr_hi_minch : int, optional High significance mask is required to span at least this many channels at all pixels. Default: 1 snr_lo_minch : int, optional Low significance mask is required to span at least this many channels at all pixels. Default: 1 min_tot_ch : int, optional Dilated mask regions are required to span at least this many channels. Default: 2 nguard : tuple of two ints, optional Expand the final mask by this nguard[0] pixels in sky directions and nguard[1] channels in velocity. Currently these values must be equal if both are non-zero. If nguard[0] = 0 then no expansion is done in sky coordinates. If nguard[1] = 0 then no expansion is done in velocity. Default: [0,0] debug : boolean, optional Output a bunch of FITS files to diagnose what's going on. Default: False Returns ------- dil_mask : `~numpy.ndarray` A binary mask array (0s and 1s) which can be applied to a cube """ if isinstance(snrcube, SpectralCube): hdr = snrcube.header elif header is None: raise NameError('A header must be provided to dilmsk procedure') else: snrcube = SpectralCube( data=snrcube, header=header, wcs=wcs.WCS(header)) hdr = header bmarea = (2*np.pi/(8*np.log(2)) * snrcube.beam.major.value * snrcube.beam.minor.value / (hdr['cdelt2'])**2) minarea = minbeam * bmarea logger.debug('Minimum area is {:.1f} pixels'.format(minarea)) # High significance mask thresh_mask = (snrcube._data > snr_hi) # Low significance mask edge_mask = (snrcube._data > snr_lo) if debug: hdr['datamin'] = 0 hdr['datamax'] = 1 fits.writeto('snr_hi_mask.fits.gz', thresh_mask.astype( np.uint8), hdr, overwrite=True) fits.writeto('snr_lo_mask.fits.gz', edge_mask.astype( np.uint8), hdr, overwrite=True) # Require snr_hi_minch channels at all pixels in high significance mask if snr_hi_minch > 1: thresh_mask = prunemask( thresh_mask, minch=snr_hi_minch, byregion=False) if debug: fits.writeto('snr_hi_mask_minch.fits.gz', thresh_mask.astype(np.uint8), hdr, overwrite=True) # Require snr_lo_minch channels at all pixels in low significance mask if snr_lo_minch > 1: edge_mask = prunemask(edge_mask, minch=snr_lo_minch, byregion=False) if debug: fits.writeto('snr_lo_mask_minch.fits.gz', edge_mask.astype(np.uint8), hdr, overwrite=True) # Find islands in low significance mask if snr_lo < snr_hi: s = ndimage.generate_binary_structure(3, 1) labeled_edge, num_edge = ndimage.label(edge_mask, structure=s) if debug: hdr['datamax'] = num_edge fits.writeto('labeled_edge.fits.gz', labeled_edge.astype( np.uint8), hdr, overwrite=True) logger.debug('Found {} objects with SNR above {}'.format(num_edge, snr_lo)) # Keep only islands which reach high significance threshold hieval = labeled_edge[thresh_mask] dil_mask = np.isin(labeled_edge, hieval) & edge_mask if debug: hdr['datamax'] = 1 fits.writeto('snr_mrg_mask.fits.gz', dil_mask.astype( np.uint8), hdr, overwrite=True) else: dil_mask = thresh_mask # Final pruning to enforce area and total vel width constraints if min_tot_ch > 1 or minarea > 0: dil_mask = prunemask(dil_mask, minarea=minarea, minch=min_tot_ch, byregion=True) if debug: fits.writeto('pruned_mask.fits.gz', dil_mask.astype( np.uint8), hdr, overwrite=True) # Expand by nguard pixels (padding) if sum(nguard) > 0: if nguard[0] == 0: dil_mask = maskguard(dil_mask, niter=nguard[1], vonly=True) elif nguard[1] == 0: dil_mask = maskguard(dil_mask, niter=nguard[0], xyonly=True) else: if nguard[1] != nguard[0]: logger.debug('Warning: setting nguard = [{},{}]'.format( nguard[0], nguard[0])) dil_mask = maskguard(dil_mask, niter=nguard[0]) if debug: fits.writeto('guardband_mask.fits.gz', dil_mask.astype( np.uint8), hdr, overwrite=True) return dil_mask
[docs]def smcube(snrcube, header=None, fwhm=None, vsm=None, vsm_type='gauss', edgech=None): """ Smooth an SNRcube to produce a higher signal-to-noise SNRcube. Parameters ---------- snrcube : SpectralCube or `~numpy.ndarray` The image cube normalized by the estimated RMS noise. If a numpy array is provided the header must also be provided. header : `astropy.io.fits.Header` The cube FITS header, required if a numpy array is given. fwhm : float or :class:`~astropy.units.Quantity`, optional Final spatial resolution to smooth to. If not astropy quantity, assumed to be given in arcsec. Default: 10 arcsec vsm : float or :class:`~astropy.units.Quantity`, optional Full width of the spectral smoothing kernel (or FWHM if gaussian). If given as astropy quantity, should be given in velocity units. If not given as astropy quantity, interpreted as number of channels. Default: No spectral smoothing is applied. vsm_type : string, optional What type of spectral smoothing to employ. Currently three options: (1) 'boxcar' - 1D boxcar smoothing, vsm rounded to integer # of chans. (2) 'gauss' - 1D gaussian smoothing, vsm is the convolving gaussian FWHM. (3) 'gaussfinal' - 1D gaussian smoothing, vsm is the gaussian FWHM after convolution, assuming FWHM before convolution is 1 channel. Default: 'gauss' edgech : int, optional Number of channels at left and right edges of each spectrum to use for rms estimation. Default is to use all channels. Returns ------- sm_snrcube : SpectralCube A cube is SNR units after smoothing to the desired resolution. """ if isinstance(snrcube, SpectralCube): hdr = snrcube.header elif header is None: raise NameError('A header must be provided to smcube procedure') else: snrcube = SpectralCube( data=snrcube, header=header, wcs=wcs.WCS(header)) hdr = header logger.debug(snrcube) # -- Spatial smoothing if fwhm is not None: # Requested final resolution if not hasattr(fwhm, 'unit'): fwhm = fwhm * u.arcsec sm_beam = radio_beam.Beam(major=fwhm, minor=fwhm, pa=0*u.deg) logger.debug('Convolving to {}'.format(sm_beam)) # From convolve_to method in spectral_cube pixscale = wcs.utils.proj_plane_pixel_area( snrcube.wcs.celestial)**0.5*u.deg if hasattr(snrcube, 'beam'): logger.debug('Existing {}'.format(snrcube.beam)) convolution_kernel = sm_beam.deconvolve( snrcube.beam).as_kernel(pixscale) else: logger.debug('Warning: no existing beam found in input to smcube') convolution_kernel = sm_beam.as_kernel(pixscale) sm_snrcube = snrcube.spatial_smooth(convolution_kernel, convolve_fft, fill_value=0.0, nan_treatment='fill', preserve_nan=True, parallel=False) else: sm_snrcube = snrcube # -- Spectral smoothing if vsm is not None: fwhm_factor = np.sqrt(8*np.log(2)) if hasattr(vsm, 'unit'): delta_v = abs(hdr['CDELT3']) * u.m/u.s vsm_ch = (vsm/delta_v).decompose().value else: vsm_ch = vsm if vsm_type == 'gauss': gaussian_width = vsm_ch / fwhm_factor kernel = Gaussian1DKernel(gaussian_width) logger.debug('Gaussian smoothing with stddev:', gaussian_width, 'channels') elif vsm_type == 'boxcar': box_width = round(vsm_ch) kernel = Box1DKernel(box_width) logger.debug('Boxcar smoothing with width:', box_width, 'channels') elif vsm_type == 'gaussfinal': if vsm_ch > 1: gaussian_width = (vsm_ch**2 - 1)**0.5 / fwhm_factor kernel = Gaussian1DKernel(gaussian_width) logger.debug('Gaussian smoothing with stddev:', gaussian_width, 'channels') else: logger.debug('ERROR: requested resolution of', vsm_ch, 'chans is less than 1') sm2_snrcube = sm_snrcube.spectral_smooth(kernel) sm_snrcube = sm2_snrcube # -- Renormalize by rms newrms = makenoise(sm_snrcube, edge=edgech) sm_snrcube = sm_snrcube / newrms return sm_snrcube
[docs]def writemom(imarray, filename='outfile', type='mom1', hdr=None): """ Write out a moment map as a FITS file. Parameters ---------- imarray : SpectralCube or `~numpy.ndarray` The 2D array with the moment map values filename : string The root of the file name type : string A label for the filename extension hdr : `astropy.io.fits.Header` The FITS header to write out """ if np.any(~np.isnan(imarray)): hdr['datamin'] = np.nanmin(imarray.value) hdr['datamax'] = np.nanmax(imarray.value) hdr['bunit'] = imarray.unit.to_string('fits') fits.writeto(filename+'.'+type+'.fits.gz', imarray.astype(np.float32), hdr, overwrite=True) logger.debug('Wrote {}'.format(filename+'.'+type+'.fits.gz')) else: logger.debug('Skipping {} because image is all NaN'.format(type)) return
[docs]def findflux(imcube, rmscube, mask=None, projmask=None): """ Calculate integrated spectrum and total integrated flux. Parameters ---------- imcube : SpectralCube The image cube over which to measure the flux. rmscube : SpectralCube A cube representing the noise estimate at each location in the image cube. Should have the same units as the image cube. mask : `~numpy.ndarray` A binary mask array (0s and 1s) to be applied before measuring the flux and uncertainty. This should NOT be a SpectralCube. projmask : `~numpy.ndarray` A second mask array within which to measure the flux and uncertainty. This is normally the 2-D projected mask. Returns ------- fluxtab : :class:`~astropy.table.Table` A 3-column table providing the velocity, flux, and uncertainty. """ hd = imcube.header CDELT1, CDELT2 = hd['CDELT1']*u.deg, hd['CDELT2']*u.deg # The beam area in pixels, used to convert to Jy and for correlated noise beamarea = ((imcube.beam.sr).to(u.deg**2)/abs(CDELT1*CDELT2)).value vels = imcube.spectral_axis.to(u.km/u.s) delv = abs(vels[1]-vels[0]) if mask is not None: immask = imcube.with_mask(mask > 0) errmask = rmscube.with_mask(mask > 0) allcubes = [immask, errmask] if projmask is not None: immask2 = imcube.with_mask(projmask > 0) errmask2 = rmscube.with_mask(projmask > 0) allcubes.extend([immask2, errmask2]) else: immask = imcube errmask = rmscube allcubes = [immask, errmask] e_spec = [] e_tot = [] f_spec = [] f_tot = [] for i, spcube in enumerate(allcubes): if i % 2 == 1: var_spec = np.nansum( spcube.unitless_filled_data[:]**2, axis=(1, 2)) var_tot = np.nansum(spcube.unitless_filled_data[:]**2) e_spec.append(np.sqrt(var_spec*beamarea)) e_tot.append(np.sqrt(var_tot*beamarea) * delv.value) else: f_spec.append( np.nansum(spcube.unitless_filled_data[:], axis=(1, 2))) f_tot.append( np.nansum(spcube.unitless_filled_data[:]) * delv.value) if imcube.unit == 'Jy / beam': scl = beamarea out_unit = u.Jy else: scl = 1 out_unit = imcube.unit * u.pix if len(f_spec) == 1: fluxtab = Table([vels, f_spec[0]/scl, e_spec[0]/scl], names=('Velocity', 'Flux', 'FluxErr')) else: fluxtab = Table([vels, f_spec[0]/scl, e_spec[0]/scl, f_spec[1]/scl, e_spec[1]/scl], names=('Velocity', 'Flux', 'FluxErr', 'Flux2d', 'Flux2dErr')) fluxtab.meta['totflux'] = "{:.2f} +/- {:.2f}".format( f_tot[0]/scl, e_tot[0]/scl)+' '+out_unit.to_string()+' km/s' fluxtab['Velocity'].description = 'Velocity, ' + \ hd['ctype3'] + ', ' + hd['specsys'] fluxtab['Velocity'].format = '.3f' fluxtab['Flux'].unit = out_unit fluxtab['Flux'].format = '.4f' fluxtab['Flux'].description = 'Flux after masking' fluxtab['FluxErr'].unit = out_unit fluxtab['FluxErr'].format = '.4f' fluxtab['FluxErr'].description = 'Formal uncertainty in masked flux' if len(f_tot) == 2: fluxtab.meta['tot2dflux'] = "{:.2f} +/- {:.2f}".format( f_tot[1]/scl, e_tot[1]/scl)+' '+out_unit.to_string()+' km/s' fluxtab['Flux2d'].unit = out_unit fluxtab['Flux2d'].format = '.4f' fluxtab['Flux2d'].description = 'Flux after masking in 2D' fluxtab['Flux2dErr'].unit = out_unit fluxtab['Flux2dErr'].format = '.4f' fluxtab['Flux2dErr'].description = 'Formal uncertainty in 2D masked flux' return fluxtab
[docs]def calc_moments(imcube, rmscube, mask=None): """ Calculate moments of a masked cube and their errors Parameters ---------- imcube : SpectralCube The image cube for which to calculate the moments and their errors. rmscube : SpectralCube A cube representing the noise estimate at each location in the image cube. Should have the same units as the image cube. mask : `~numpy.ndarray` A binary mask array (0s and 1s) to be applied before measuring the flux and uncertainty. This should NOT be a SpectralCube. Returns ------- altmom : `~numpy.ndarray` A stack of the three moment maps. These are generally redundant since they were previously calculated by SpectralCube. errmom : `~numpy.ndarray` A stack of the three uncertainty maps. """ if mask is not None: immask = imcube.with_mask(mask > 0) errmask = rmscube.with_mask(mask > 0) else: immask = imcube errmask = rmscube tbarry = immask.unitless_filled_data[:] nsearry = errmask.unitless_filled_data[:] vels = immask.spectral_axis.to(u.km/u.s) vel3d = np.expand_dims(vels, axis=(1, 2)) velarry = np.broadcast_to(vel3d, immask.shape) mom0 = np.nansum(tbarry, axis=0) mom0_var = np.nansum(nsearry**2, axis=0) mom0_err = np.sqrt(mom0_var) mom1 = np.nansum(tbarry * velarry, axis=0) / mom0 mom1_var = np.nansum(((velarry - mom1)/mom0 * nsearry)**2, axis=0) mom1_err = np.sqrt(mom1_var) mom2 = np.nansum(tbarry * (velarry-mom1)**2, axis=0) / mom0 mom2_var = np.nansum(((mom0 * (velarry-mom1)**2 - np.nansum(tbarry*(velarry - mom1)**2, axis=0)) / mom0**2 * nsearry)**2 + (2*np.nansum( tbarry*(velarry-mom1), axis=0)/mom0 * mom1_err)**2, axis=0) stdev = np.sqrt(mom2) sderr = np.sqrt(mom2_var)/(2*stdev) for x in [mom1, stdev, mom1_err, sderr]: x[x == np.inf] = np.nan x[x == -np.inf] = np.nan altmom = np.stack([mom0, mom1, stdev], axis=0) errmom = np.stack([mom0_err, mom1_err, sderr], axis=0) return altmom, errmom