Source code for planckenv

import matplotlib
matplotlib.use('Agg')
import numpy as np
from pipelet.environment import *
import os
import healpy
import spherelib
from matplotlib import pyplot as plt
import planckdata as data
import spherelib.astro as astro

[docs]class PlanckEnvironment(Environment): """ Planck environment. The Planck environment class provides additionnal facilities to the segment, related to pipeline output management (either data or plots). """
[docs] def plot_map_in_pipe(self, Imap, what, name, mask=None, min=None, max=None, norm='hist'): """ Make a mollview of the map in pipeline """ if mask is not None: Imap[mask!=1] = healpy.UNSEEN fn = self.get_data_fn("%s_%s.png"%(what, name)) healpy.mollview(Imap, title="%s @ %s"%(what, name), norm=norm, min=min, max=max) plt.savefig(fn)
[docs] def write_alm(self, alms, fields, name): """ Write a list of TEB alms in 3 files """ for alm, field in zip(alms, fields): healpy.write_alm(self.get_data_fn("%s_%s.fits"%(name,field)), alm)
[docs] def write_cat(self, cat, what, name): """ Write catalog in txt and skop format """ if what[0:3]=="det": phi = astro.glon2phi(cat[:,1]) theta = astro.glat2theta(cat[:,2]) skopcat= np.zeros((np.shape(phi)[0], 4)) skopcat[:,0] = theta skopcat[:,1] = phi skopcat[:,2] = 1e-3 skopcat[:,3] = 1 np.savetxt(self.get_data_fn("cat_%s_%s.skop"%(what,name)), skopcat) np.savetxt(self.get_data_fn("cat_%s_%s.txt"%(what,name)), cat) else: phi = cat[:,1] theta = cat[:,0] skopcat= np.zeros((np.shape(phi)[0], 4)) skopcat[:,0] = theta skopcat[:,1] = phi skopcat[:,2] = 1e-3 skopcat[:,3] = cat[:,-1] np.savetxt(self.get_data_fn("cat_%s_%s.skop"%(what,name)), skopcat) cat_fn = self.get_data_fn("cat_%s_%s.txt"%(what,name)) np.savetxt(cat_fn, cat) strcat = file(cat_fn, 'r').read() strcat = "#theta \t phi \t peak_flux \t theta_err \t phi_err \t peak_flux_err \t chi2 \n"+strcat file(cat_fn, 'w').write(strcat)
[docs] def plot_ps_subtraction(self, Imap, name, fit_cat, idx_bad, idx_good, beam, dset='full'): """ Make some nice plots of the subtraction """ omap = healpy.read_map(data.get_mapfile(name, dset)) #original map omask = healpy.read_map(data.get_psmaskfile(name)) #ps mask we are supposed to use omask = healpy.ud_grade(omask, healpy.get_nside(omap)) omask[omask!=1]=0 offmap = omap*omask #apply original mask offmap[offmap==0] = healpy.UNSEEN Imap[Imap==0] = healpy.UNSEEN nTplot = 50 nplot = nTplot/5 ig = np.arange(0,nTplot+1) # plot 5*5 subtracted source (good chi2) ib = np.arange(0,nTplot+1) # plot 5*5 masked source (bad chi2) reso = (beam*1.5/30.0) # adapt plot resolution to channel for npi in range(5): plt.figure() for p in range(nplot): try: i = idx_good[int(ig[(npi*nplot)+p])] lat = astro.theta2glat(fit_cat[i,0]) lon = astro.phi2glon(fit_cat[i,1]) healpy.gnomview(Imap, rot=[lon,lat],sub=(6,nplot,nplot+p+1),cbar=False, notext=True,title="", reso=reso) healpy.gnomview(omap, rot=[lon,lat],sub=(6,nplot,(p+1)),cbar=False, notext=True,title="", reso=reso) healpy.gnomview(offmap, rot=[lon,lat],sub=(6,nplot,2*nplot+(p+1)),cbar=False, notext=True,title="", reso=reso) except: pass for p in range(nplot): try: i = idx_bad[int(ib[(npi*nplot)+p])] lat = astro.theta2glat(fit_cat[i,0]) lon = astro.phi2glon(fit_cat[i,1]) healpy.gnomview(Imap, rot=[lon,lat], sub=(6,nplot,4*nplot+p+1),cbar=False, notext=True,title="", reso=reso) healpy.gnomview(omap, rot=[lon,lat], sub=(6,nplot,3*nplot+p+1),cbar=False, notext=True, title="", reso=reso) healpy.gnomview(offmap, rot=[lon,lat], sub=(6,nplot,5*nplot+p+1),cbar=False, notext=True,title="", reso=reso) except: pass plt.savefig(self.get_data_fn("subtraction_%s_%d.png"%(name, npi))) plt.clf() Imap[Imap==healpy.UNSEEN] = 0
[docs] def write_model_in_txt(self, model, bins, em=None): """ Save SMICA model in APN txt files, and make a tarball """ dim = model.dim Q = model.nbin np.savetxt(self.get_data_fn("A.txt"), model.mixmat()) np.savetxt(self.get_data_fn("N.txt"), model.noise()) np.savetxt(self.get_data_fn("P.txt"), model.powspec().reshape((dim)**2, Q)) if em is not None: np.savetxt(self.get_data_fn("A_error.txt"), em.mixmat()) np.savetxt(self.get_data_fn("N_error.txt"), em.noise()) np.savetxt(self.get_data_fn("P_error.txt"), em.powspec().reshape((dim)**2, Q)) np.savetxt(self.get_data_fn("bins.txt"), bins) np.savetxt(self.get_data_fn("lmean.txt"), np.squeeze(spherelib.get_lmean(bins))) self.logged_subprocess(["tar", "-czf", self.get_data_fn("model.tar.gz"), "%s*.txt"%(self.get_data_fn(""))], except_on_failure=False)
[docs] def write_stats_in_txt(self, stats, nmodes, suffix=""): """ Reshape 3D numpy array to 2D and save it in txt files. """ nmap,nmap,Q = stats.shape np.savetxt(self.get_data_fn("hR%s.txt"%suffix), stats.reshape(nmap*nmap, Q)) np.savetxt(self.get_data_fn("w%s.txt"%suffix), nmodes)
[docs] def get_range_from_parent(self): """ Parse binning from task input. """ strbin = self.get_input("bins") dset = self.get_input("map2alm_calib")[0] lststr = strbin.split("_") lmin = int(lststr[0]) lmax = int(lststr[-1]) return lmin,lmax,dset
[docs] def get_lstibl(self, names): """ Return list of beam correction function files """ lst = [self.glob_seg("map2alm", "ibl*%s*"%(name.split("_")[0]))[0] for name in names] return lst
[docs] def load_products(self, filename, glo, param_name='*'): """ Update a namespace by unpickling requested object from the file. >>> load_products(get_data_fn("test.pkl", seg="second"), globals(),['i']) >>> print i 0 """ try: f = file(filename) new_dict = pickle.load(f) f.close() except IOError: self.logger.warning('No such file: %s'%filename) except pickle.UnpicklingError: self.logger.warning('Failed to unpickle from file: %s'%filename) f.close() if param_name == '*': param_name = new_dict.keys() nd = dict() for k in param_name: try: nd[k] = new_dict[k] except KeyError: self.logger.warning( 'Fail to load object %s from file %s'%(k,filename) ) glo.update(nd)