# Source code for ism3d.utils.io

#from .vis_utils import read_ms
#from .ms import read_ms0
from sys import getsizeof
import sys
import logging
import time
from .utils import *
import os
from astropy.io import fits
from astropy.table import Table
from astropy.table import Column
#from .vis_utils import write_ms

logger = logging.getLogger(__name__)


import warnings
import hickle as hkl

#from hickle.hickle import SerializedWarning   
#warnings.filterwarnings("ignore",category=SerializedWarning)  

#from memory_profiler import profile
#@profile
def read_data(inp_dct, save_data=False,
              fill_mask=False, fill_error=False,                  # for FITS/image
              polaverage=True, dataflag=False, saveflag=True):    # for MS/visibilities
    """
    Read FITS/image or MS/visibilities into a flat dictionary.

    Keys follow the pattern '<kind>@<filename>' (kind: data / header / error /
    mask / sample / psf / pbeam / type) plus 'pmodel@<tag>' / 'pheader@<tag>'.

    note:
        DATA column shape in nrecord x nchan x ncorr
        WEIGHT column shape in nrecord x ncorr (supposedly the "average" value
            of WEIGHT_SPECTRUM along the channel-axis)
        WEIGHT_SPECTRUM shape in nrecord x nchan x ncorr
        FLAGS shape in nrecord x nchan x ncorr
        (so WEIGHT_SPECTRUM is likely ~WEIGHT/NCHAN?) depending on the data
        model / calibration script; data@ms in complex64 (not complex128)

    For space-saving, we
        + set DATA-values=np.nan when flag=True (so there is no "flag"
          variable for flagging in dat_dct)
        + when polaverage=True, we only derive stokes-I if both XX/YY
          (or RR/LL) are good; if one of them is flagged, data-values are
          set to np.nan. This follows the principle in the tclean()/stokes
          parameter:
          https://casa.nrao.edu/casadocs-devel/stable/global-task-list/task_tclean/parameters
          http://casacore.github.io/casacore/StokesConverter_8h_source.html
        + weight doesn't include the channel-axis (assuming the channel-wise
          weight variation is negligible)
        XX=I+Q YY=I-Q ; RR=I+V LL=I-V => I=(XX+YY)/2 or (RR+LL)/2

    Parameters
    ----------
    inp_dct : dict
        nested input dictionary; each section may declare 'vis', 'image',
        'mask', 'pb', 'error', 'sample', 'psf', 'pmodel' entries
        (comma-separated file lists for the multi-file entries).
    save_data : bool
        if True, also dump the result to <outdir>/dat_dct.h5 via hickle.
    fill_mask, fill_error : bool
        for images, synthesize a unity mask / constant-std error map when
        none is provided.
    polaverage, dataflag, saveflag : bool
        passed through to read_ms() for visibility data.

    Returns
    -------
    dat_dct : dict
    """
    # NOTE(review): read_ms() is expected to be provided by
    # `from .utils import *` or the commented-out `.vis_utils` import at the
    # top of the file — confirm it is actually in scope.
    logger.info('read data (may take some time..)')
    start_time = time.time()
    dat_dct = {}

    for tag in inp_dct.keys():

        # --- visibility (MS) sections ------------------------------------
        if 'vis' in inp_dct[tag].keys():
            obj = inp_dct[tag]
            vis_list = obj['vis'].split(",")
            if 'pb' in obj:
                pb_list = obj['pb'].split(",")
            for ind in range(len(vis_list)):
                if ('data@'+vis_list[ind] not in dat_dct) and 'vis' in obj:
                    # read_ms() fills dat_dct in place
                    read_ms(vis_list[ind],
                            polaverage=polaverage, dataflag=dataflag, saveflag=saveflag,
                            dat_dct=dat_dct)
                if ('pbeam@'+vis_list[ind] not in dat_dct) and 'pb' in obj:
                    data, header = fits.getdata(pb_list[ind], header=True)
                    # in-place NaN -> 0 so the pb model is numerically safe
                    np.nan_to_num(data, copy=False, nan=0.0)
                    dat_dct['pbeam@'+vis_list[ind]] = data
                    dat_dct['header@'+vis_list[ind]] = header
                    logger.debug('reading pb model')
                    logger.debug('{:60} {:15}'.format(
                        pb_list[ind], str(dat_dct['pbeam@'+vis_list[ind]].shape)))

        # --- image (FITS) sections ----------------------------------------
        if 'image' in inp_dct[tag].keys():
            obj = inp_dct[tag]
            im_list = obj['image'].split(",")
            if 'mask' in obj:
                mk_list = obj['mask'].split(",")
            if 'pb' in obj:
                pb_list = obj['pb'].split(",")
            if 'error' in obj:
                em_list = obj['error'].split(",")
            if 'sample' in obj:
                sp_list = obj['sample'].split(",")
            if 'psf' in obj:
                # 'psf' may be a comma-separated string of file names, a
                # single tuple (wrapped into a one-element list), or a list
                if isinstance(obj['psf'], str):
                    pf_list = obj['psf'].split(",")
                elif isinstance(obj['psf'], tuple):
                    pf_list = [obj['psf']]
                else:
                    pf_list = obj['psf']

            if 'pmodel' in obj:
                data, hd = fits.getdata(obj['pmodel'], header=True, memmap=False)
                dat_dct['pmodel@'+tag] = data
                dat_dct['pheader@'+tag] = hd
                logger.debug('loading: '+obj['pmodel']+' to ')
                logger.debug('pmodel@'+tag)
                logger.debug(str(data.shape)+str(human_unit(getsizeof(data)*u.byte)))

            for ind in range(len(im_list)):

                if ('data@'+im_list[ind] not in dat_dct) and 'image' in obj:
                    data, hd = fits.getdata(im_list[ind], header=True, memmap=False)
                    dat_dct['data@'+im_list[ind]] = np.squeeze(data).copy()
                    dat_dct['header@'+im_list[ind]] = hd
                    dat_dct['type@'+im_list[ind]] = 'image'
                    logger.debug('loading: '+im_list[ind]+' to ')
                    logger.debug('data@'+im_list[ind]+' '+'header@'+im_list[ind])
                    logger.debug(str(data.shape)+str(human_unit(getsizeof(data)*u.byte)))

                if ('error@'+im_list[ind] not in dat_dct) and 'error' in obj:
                    data = fits.getdata(em_list[ind], memmap=False)
                    dat_dct['error@'+im_list[ind]] = data
                    logger.debug('loading: '+em_list[ind]+' to ')
                    logger.debug('error@'+im_list[ind])
                    logger.debug(str(data.shape)+str(human_unit(getsizeof(data)*u.byte)))

                if ('mask@'+im_list[ind] not in dat_dct) and 'mask' in obj:
                    data = fits.getdata(mk_list[ind], memmap=False)
                    dat_dct['mask@'+im_list[ind]] = data
                    logger.debug('loading: '+mk_list[ind]+' to ')
                    logger.debug('mask@'+im_list[ind])
                    logger.debug(str(data.shape)+str(human_unit(getsizeof(data)*u.byte)))

                if ('sample@'+im_list[ind] not in dat_dct) and 'sample' in obj:
                    # sp_index: 3 x n array (pixel index of sampling data points)
                    data = fits.getdata(sp_list[ind], memmap=False)
                    dat_dct['sample@'+im_list[ind]] = np.squeeze(data['sp_index']).copy()
                    logger.debug('loading: '+sp_list[ind]+' to ')
                    logger.debug('sample@'+im_list[ind])
                    logger.debug(str(data.shape)+str(human_unit(getsizeof(data)*u.byte)))

                if ('psf@'+im_list[ind] not in dat_dct) and 'psf' in obj:
                    if isinstance(pf_list[ind], str):
                        data = fits.getdata(pf_list[ind], memmap=False)
                        dat_dct['psf@'+im_list[ind]] = np.squeeze(data).copy()
                        logger.debug('loading: '+pf_list[ind]+' to ')
                        logger.debug('psf@'+im_list[ind])
                        logger.debug(str(data.shape)+str(human_unit(getsizeof(data)*u.byte)))
                    else:
                        # non-string entries are taken as pre-built psf specs
                        dat_dct['psf@'+im_list[ind]] = pf_list[ind]

                if ('pbeam@'+im_list[ind] not in dat_dct) and 'pb' in obj:
                    data = fits.getdata(pb_list[ind], header=False)
                    np.nan_to_num(data, copy=False, nan=0.0)
                    dat_dct['pbeam@'+im_list[ind]] = data
                    logger.debug('loading: '+pb_list[ind]+' to ')
                    logger.debug('pbeam@'+im_list[ind])
                    logger.debug(str(data.shape)+str(human_unit(getsizeof(data)*u.byte)))

                # fix: use a dedicated name instead of clobbering the outer
                # loop variable `tag` (the original reassigned `tag` here)
                dkey = 'data@'+im_list[ind]
                if fill_mask or fill_error:
                    if (dkey.replace('data@', 'mask@') not in dat_dct) and fill_mask:
                        data = dat_dct[dkey]
                        dat_dct[dkey.replace('data@', 'mask@')] = data*0.0+1.
                        logger.debug('fill '+dkey.replace('data@', 'mask@')+str(1.0))
                    if (dkey.replace('data@', 'error@') not in dat_dct) and fill_error:
                        data = dat_dct[dkey]
                        # flat error map at the global std of the data
                        dat_dct[dkey.replace('data@', 'error@')] = data*0.0+np.std(data)
                        logger.debug('fill '+dkey.replace('data@', 'error@')+str(np.std(data)))

    logger.info('-'*80)
    dat_size = human_unit(get_obj_size(dat_dct)*u.byte)
    logger.info("--- dat_dct size {:0.2f} ---".format(dat_size))
    logger.info("--- took {0:<8.5f} seconds ---".format(time.time()-start_time))

    if save_data:
        outdir = './'
        if 'general' in inp_dct:
            if 'outdir' in inp_dct['general']:
                outdir = inp_dct['general']['outdir']+'/'
        hkl.dump(dat_dct, outdir+'dat_dct.h5', mode='w')
        logger.info('--- save to: '+outdir+'dat_dct.h5')

    return dat_dct
def dct2npy(dct, outname='dct2npy'):
    """Serialize a Python dictionary to '<outname>.npy' (pickle-based)."""
    target = outname + '.npy'
    np.save(target, dct)
def npy2dct(npyname):
    """Load a dictionary previously saved with dct2npy()."""
    loaded = np.load(npyname, allow_pickle=True)
    return loaded.item()
def to_hdf5(value, outname='test.h5', checkname=False, **kwargs):
    """
    Write a Python object (typically a dictionary) into an HDF5 file.

    Slashes are not allowed in HDF5 object names, so when checkname=True and
    value is a dict, any "/" in keys is replaced with "|" to avoid confusion
    with HDF5 internal structures.

    note:
        https://support.hdfgroup.org/HDF5/Tutor/cmdtoolview.html
        h5ls -r test.h5
        h5dump -n 1 test.h5

    optional kwargs:
        compression='gzip'
            None, gzip, lzf (+ szip, if installed); gzip is preferred,
            lzf has high speed but a lower compression rate.
            examples: uvdata.h5 (none 2.5GB / lzf 2.2GB / gzip 2.0GB)

    Parameters
    ----------
    value : object
        object to dump (keys are sanitized only for dicts).
    outname : str
        output path; '.h5' is appended unless it already ends in .h5/.hdf5.
    checkname : bool
        if True, sanitize '/' in dict keys before dumping.
    """
    if outname.endswith('.h5') or outname.endswith('.hdf5'):
        outpath = outname
    else:
        outpath = outname+'.h5'
    # fix: the original dumped nothing when checkname=True and value was not
    # a dict (silent data loss); now the value is always written.
    if checkname and isinstance(value, dict):
        value_copy = value.copy()
        for key in list(value.keys()):
            if '/' in key:
                value_copy[key.replace('/', '|')] = value_copy.pop(key)
        hkl.dump(value_copy, outpath, mode='w', **kwargs)
    else:
        hkl.dump(value, outpath, mode='w', **kwargs)
    logger.debug('--- save to: '+outpath)
    return
def from_hdf5(h5name):
    """
    Read a Python object back from an HDF5 file.

    Note: keys are returned exactly as stored; any "|" placeholders written
    in place of "/" are NOT converted back here.
    """
    return hkl.load(h5name)
def dct2hdf(dct, outname='dct2hdf'):
    """
    Write a Python dictionary into an HDF5 file.

    Slashes are not allowed in HDF5 object names, so any "/" in keys is
    replaced with "|" to avoid confusion with HDF5 internal structures.

    Parameters
    ----------
    dct : dict
        dictionary to dump (a shallow copy is sanitized; dct is untouched).
    outname : str
        output path; '.h5' is appended unless it already ends in .h5/.hdf5.
    """
    dct0 = dct.copy()
    for key in list(dct0.keys()):
        if '/' in key:
            dct0[key.replace('/', '|')] = dct0.pop(key)
    # fix: removed a dead `outpath = outname` assignment that was
    # immediately overwritten by the branch below.
    if outname.endswith('.h5') or outname.endswith('.hdf5'):
        outpath = outname
    else:
        outpath = outname+'.h5'
    hkl.dump(dct0, outpath, mode='w')  # ,compression='gzip'
    logger.info('--- save to: '+outpath)
    return
def hdf2dct(hdf):
    """
    Read a Python dictionary from an HDF5 file.

    Slashes are not allowed in HDF5 object names, so keys containing "|"
    are rewritten with "/" to recover the original file-path-like keys.
    """
    dct = hkl.load(hdf)
    for key in list(dct.keys()):
        if '|' in key:
            dct[key.replace('|', '/')] = dct.pop(key)
    return dct
def fits2dct(fits):
    """
    Read a FITS table into a non-nested dictionary.

    The FITS table must be the "1-row" version produced by dct2fits().

    References
    ----------
    about FITS byteorder: https://github.com/astropy/astropy/issues/4069

    note:
        we decided to save dct to HDF5 by default for a couple of reasons:
        FITS is always stored in big-endian byte order, which causes trouble
        with casacore/galario; I/O performance is worse; and the "table"
        approach doesn't support nesting / the data type may change during
        the dct2fits -> fits2dct round trip.

    NOTE(review): the parameter name `fits` shadows the module-level
    `astropy.io.fits` import inside this function; callers may pass by
    keyword, so the name is kept for backward compatibility.
    """
    t = Table()
    tb = t.read(fits)
    sys_byteorder = ('>', '<')[sys.byteorder == 'little']
    dct = {}
    for col in tb.colnames:
        dct[col] = tb[col][0]
        # fix: narrowed the original bare `except: pass` (which also caught
        # KeyboardInterrupt/SystemExit); non-array cells simply have no
        # .dtype and are left as-is.
        try:
            dorder = dct[col].dtype.byteorder
            if dorder not in ('=', sys_byteorder):
                # convert big-endian FITS payload to the native byte order
                dct[col] = dct[col].byteswap().newbyteorder(sys_byteorder)
        except Exception:
            pass
    return dct
def dct2fits(dct, outname='dct2fits'):
    """
    Save a non-nested dictionary into a FITS binary table.

    note:
        not every Python object can be dumped into a FITS column; e.g. a
        dictionary can be added as a column of an astropy Table, but the
        Table can't then be saved into FITS.

    example:
        gmake.dct2fits(dat_dct, save_npy=True)
        for npy: np.save(outname.replace('.fits','.npy'), dct)

    note:
        we decided to save dct to HDF5 by default for a couple of reasons:
        FITS is always stored in big-endian byte order, which causes trouble
        with casacore/galario; I/O performance is worse; and the "table"
        approach doesn't support nesting / the data type may change during
        the dct2fits -> fits2dct round trip.
    """
    table = Table()
    for key in dct:
        # each value is wrapped into a one-element list, so the saved FITS
        # table will "technically" have one row.
        table.add_column(Column(name=key, data=[dct[key]]))
    full_name = outname if outname.endswith('.fits') else outname + '.fits'
    table.write(full_name, overwrite=True)
    return
def export_model(models, outdir='./', includedata=False,
                 outname_exclude=None, outname_replace=None):
    """
    Export a model set into FITS images and/or Measurement Sets.

    Column layout for the exported MS:
        data     -> data@outms           (when includedata=False)
        model    -> corrected@outms      (when includedata=True)

    Parameters
    ----------
    models : dict
        model dictionary keyed as '<kind>@<name>'; entries 'type@<name>'
        select which names are exported ('vis' or 'image').
    outdir : str
        output directory (created if missing).
    includedata : bool
        if False, write the model into the DATA column of the copied MS;
        if True, write it into CORRECTED instead.
    outname_exclude : list of str, optional
        substrings removed from the original data name when building the
        output basename.
    outname_replace : list of (old, new), optional
        substring replacements applied to the output basename.

    NOTE(review): write_ms() is expected from the commented-out
    `.vis_utils` import or `from .utils import *` — confirm it is in scope.
    NOTE(review): paths are interpolated into os.system() shell strings;
    names containing spaces or shell metacharacters will break / are unsafe.
    """
    logger.info('export the model set: {0:^50}'.format(outdir)+' (may take some time..)')
    start_time = time.time()

    for key in list(models.keys()):
        if 'type@' not in key:
            continue

        # ---------------- visibility (MS) models --------------------------
        # fix: original tested models[key.replace('type@','type@')] — a
        # no-op replace; simplified to models[key].
        if models[key] == 'vis':
            basename = key.replace('type@', '')
            # name string replacement
            if outname_replace is not None:
                for ostring, rstring in outname_replace:
                    basename = basename.replace(ostring, rstring)
            # name string exclusion
            if outname_exclude is not None:
                for ostring in outname_exclude:
                    basename = basename.replace(ostring, '')
            basename = os.path.basename(basename)
            logger.debug(" ")
            logger.debug('-->'+'data_'+basename)
            logger.debug(" ")

            if not os.path.exists(outdir):
                os.makedirs(outdir)

            versions = ['imodel', 'imod2d', 'imod3d', 'pbeam']
            hd = models[key.replace('type@', 'header@')]
            for version in versions:
                if key.replace('type@', version+'@') in models.keys():
                    if models[key.replace('type@', version+'@')] is not None:
                        logger.debug(key.replace('type@', version+'@'))
                        tmp = (models[key.replace('type@', version+'@')]).copy()
                        if tmp.ndim == 2:
                            # promote a 2D plane to the 4D FITS cube convention
                            tmp = tmp[np.newaxis, np.newaxis, :, :]
                        fitsname = outdir+'/'+version+'_'+basename+'.fits'
                        logger.debug("write reference model image: ")
                        logger.debug("    "+key.replace('type@', version+'@')+' to '+fitsname)
                        fits.writeto(fitsname, tmp,
                                     models[key.replace('type@', 'header@')],
                                     overwrite=True)

            for prof in list(models.keys()):
                if 'imod3d_prof@' in prof and key.replace('type@', '') in prof:
                    outname = prof.replace(key.replace('type@', ''), '')
                    outname = outname.replace('imod3d_prof@', 'imodrp_').replace('@', '_')
                    fitsname = outdir+'/'+outname+basename+'.fits'
                    logger.debug("write reference model profile: ")
                    logger.debug("    "+prof+' to '+fitsname)
                    dct2fits(models[prof], outname=fitsname)

            # copy the MS container, then overwrite the requested column
            oldms = key.replace('type@', '')
            newms = outdir+'/model_'+basename.replace('.ms', '.ms')
            logger.debug("copy ms container: ")
            logger.debug("    "+oldms+' to '+newms)
            os.system("rm -rf "+newms)
            os.system('cp -rf '+oldms+' '+newms)
            logger.debug("write ms column: ")
            if includedata == False:
                logger.debug("    "+key.replace('type@', 'model@')+' to '+'data@'+newms)
                write_ms(newms, models[key.replace('type@', 'model@')],
                         datacolumn='data')
            else:
                logger.debug("    "+key.replace('type@', 'model@')+' to '+'corrected@'+newms)
                write_ms(newms, models[key.replace('type@', 'model@')],
                         datacolumn='corrected')

            # symlink the original data next to the exported model
            oldms_path = os.path.abspath(oldms)
            newms = outdir+'/data_'+basename
            logger.debug("create symlink:")
            logger.debug("    "+oldms_path+' to '+newms)
            os.system('rm -rf '+newms)
            os.system('ln -s '+oldms_path+' '+newms)

        # ---------------- image (FITS) models ------------------------------
        # fix: this branch used key.replace('data@', ...) although keys are
        # prefixed with 'type@', so every replace was a no-op: basenames
        # kept the 'type@' prefix and the per-version lookups never matched.
        # All replacements now use the 'type@' prefix.
        if models[key] == 'image':
            basename = key.replace('type@', '')
            # name string replacement
            if outname_replace is not None:
                for ostring, rstring in outname_replace:
                    basename = basename.replace(ostring, rstring)
            # name string exclusion
            if outname_exclude is not None:
                for ostring in outname_exclude:
                    basename = basename.replace(ostring, '')
            basename = os.path.basename(basename)
            logger.debug('-->data_'+basename)

            if not os.path.exists(outdir):
                os.makedirs(outdir)

            versions = ['data', 'imodel', 'cmodel', 'error', 'mask', 'kernel',
                        'psf', 'residual',
                        'imod2d', 'imod3d', 'cmod2d', 'cmod3d', 'model']
            hd = models[key.replace('type@', 'header@')]
            for version in versions:
                if version == 'residual' and key.replace('type@', 'cmodel@') in models.keys():
                    if models[key.replace('type@', 'cmodel@')] is not None:
                        # residual = data - convolved model
                        fits.writeto(outdir+'/'+version+'_'+basename,
                                     models[key.replace('type@', 'data@')]
                                     - models[key.replace('type@', 'cmodel@')],
                                     models[key.replace('type@', 'header@')],
                                     overwrite=True)
                if key.replace('type@', version+'@') in models.keys():
                    if models[key.replace('type@', version+'@')] is not None:
                        tmp = (models[key.replace('type@', version+'@')]).copy()
                        if tmp.ndim == 2:
                            tmp = tmp[np.newaxis, np.newaxis, :, :]
                        fits.writeto(outdir+'/'+version+'_'+basename, tmp,
                                     models[key.replace('type@', 'header@')],
                                     overwrite=True)

            for prof in list(models.keys()):
                if 'imod3d_prof@' in prof and key.replace('type@', '') in prof:
                    outname = prof.replace(key.replace('type@', ''), '')
                    outname = outname.replace('imod3d_prof@', 'imodel_').replace('@', '_')
                    dct2fits(models[prof],
                             outname=outdir+'/'+outname+basename.replace('.fits', '.rp'))

    logger.info('-'*80)
    logger.info("--- took {0:<8.5f} seconds ---".format(time.time()-start_time))