Source code for mixmat

import healpy
import spherelib
import numpy as np
import os.path as op
import smica
from pipetools import *
from smicatools import *
import spherelib


[docs]def compute_covariance(names, bins, dset="full"): """ Compute covariance matrices """ # compute statistics from alm nmap = len(names) load_param("map2alm", globals(), ["lmax", "fsky", "unit_to", "ref_beam"]) gmask = healpy.read_map(glob_seg("map2alm", "gmask_apodized_2048.fits.gz")[0]) lstalm = list() lstibl = list() for m in range(nmap): lstalm.append(glob_seg("map2alm", "alm_masked_%s_%s.fits"%(dset,names[m]))[0]) lstibl.append(glob_seg("map2alm", "ibl*%s*"%(names[m].split("_")[0]))[0]) ibl = load_ibl(lstibl, lmax) lstats, lnmodes = compute_cov(lstalm, lmax, mask=gmask, ibl=ibl) save_products(get_data_fn("covmat_unbinned_%s.pkl"%dset), locals(), ['lstats', 'lnmodes']) # compute noise from jackknives hr = "yr" if len(glob_seg("map2alm", "alm_masked_yr1_%s.fits"%(names[m])))>1 else "hr" logger.info("compute noise from %s maps"%hr) lsthr1 = list() lsthr2 = list() for m in range(nmap): lsthr1.append(glob_seg("map2alm", "alm_masked_%s1_%s.fits"%(hr,names[m]))[0]) lsthr2.append(glob_seg("map2alm", "alm_masked_%s2_%s.fits"%(hr,names[m]))[0]) Nl = compute_noise_from_jk(lsthr1, lsthr2, lmax, fsky=np.mean(gmask**2), ibl=ibl, yr=hr=="yr") if dset!='full': Nl *= 2.0 np.savetxt(get_data_fn("noise_unbinned.txt"), Nl) plot_cov(lstats, N=Nl, filename=get_data_fn("cov_unbinned.png")) plot_ev_decomp(lstats, Nl, filename=get_data_fn("ev_unbinned.png")) # binning stats, nmodes = spherelib.bin_stat(lstats, bins, nmode=np.squeeze(lnmodes)) nmodes = np.squeeze(nmodes) lmean = np.squeeze(spherelib.get_lmean(bins)) Q = len(nmodes) #number of bins save_products(get_data_fn("covmat_binned_%s.pkl"%dset), locals(), ['stats', 'nmodes', 'bins', 'lmean']) N = np.zeros((nmap, Q)) for m in range(nmap): N[m,:] = spherelib.bin_cl(Nl[m,:], bins) plot_cov(stats, N=N, filename=get_data_fn("cov_binned.png"), lmean=lmean) plot_ev_decomp(stats, N, lmean=lmean, filename=get_data_fn("ev_binned.png")) return stats,nmodes,N
[docs]def main(): """ Fit the mixing matrix from 2nd order statistics """ # set input channels hook("config", globals()) # polar & calib & galactic dim if polar: names = ["030_E", "044_E", "070_E", "100_E","143_E","217_E","353_E", "030_B", "044_B", "070_B", "100_B","143_B","217_B","353_B"] elif calib and det: names = ["030_T","044_T", "070_T", "100_ds1_T", "100_ds2_T", "143_ds1_T","143_ds2_T", "143_5_T","143_6_T", "143_7_T", "217_1_T", "217_2_T", "217_3_T", "217_4_T", "217_ds1_T", "217_ds2_T", "353_1_T", "353_2_T", "353_7_T", "353_8_T", "353_ds1_T", "353_ds2_T", "545_1_T", "545_2_T", "545_4_T", "857_T"] else: names = ["030_T", "044_T", "070_T", "100_T","143_T","217_T","353_T", "545_T", "857_T"] nmap = len(names) # binning if calib: lmin_fit,lmax_fit,dset = get_range_from_parent() bins = get_bins(lmin_fit, lmax_fit) maxiter = 150 else: bins = get_bins(5, 50) if polar else get_bins(9, 150) bins = np.vstack(( np.array([2,4]), bins)) maxiter = 50 dset = 'full' lmin_fit = bins[0,0] lmax_fit = bins[-1,1] lmean = np.squeeze(spherelib.get_lmean(bins)) stats,nmodes,N = compute_covariance(names, bins, dset=dset) Q = len(nmodes) # build the model if not calib: #(of,oe) = logged_subprocess (['lspipe', 'calibdx11dr2@planck:plotc/data/*/calib-mean.txt']) #cfile = file(of).read().strip() cfile = data.get_calibfile() logger.info("using calibration from %s"%str(cfile)) else: cfile = None cmb_unit = 'uK_CMB' cmb = get_cmb(names, unit_to, bins, polar, calib=cfile, ref_beam=ref_beam, cmb_unit=cmb_unit, fixed=not(calib)) acmb = cmb.mixmat().copy() gal = get_gal(nmap, dim, Q, polar, polar2D=polar2D) noise_fix = True if not calib else False noise = get_noise(N, fixed=noise_fix) model = smica.Model(complist=[cmb, gal, noise]) # fit model to covariance model = fit_model_to_cov(model, stats, nmodes, maxiter=maxiter, noise_fix=noise_fix, logger=logger) final_mismatch = int(np.real(model.mismatch(stats, nmodes, exact=True))) em = model.error_model_full_fim(nmodes) save_products(get_data_fn("model_%s.pkl"%dset), locals(), ['model', 'em']) plot_mismatch(model, stats, nmodes, lmean, filename=get_data_fn("mismatch.png")) model.plot_power(get_data_fn("power.png"), xaxis=lmean) plot_relative_diff(model, stats, nmodes, names, lmean, filename=get_data_fn("relativediff.pdf")) plot_cmb_powspec(cmb, em.get_comp_by_name("cmb"), bins, polar=polar, cmb_unit=cmb_unit, ref_beam=ref_beam, filename=get_data_fn("cmb_powspec.png")) rg_fit = "%d-%d"%(lmin_fit, lmax_fit) if calib: plot_calib(acmb, cmb.mixmat(), em.get_comp_by_name('cmb').mixmat(), names, filename=get_data_fn("calib.png")) save_calib(get_data_fn("calib_%s_%s.txt"%(rg_fit,dset)),acmb, model, em) plot_noise(model, em, N, lmean, names, filename=[get_data_fn("noise_ratio.png"), get_data_fn("noise.png")]) set_output (get_input()) save_param(["bins", "cmb_unit", "names", "polar", "dim"]) globals().update(locals()) expose(["polar", "final_mismatch", "dim", "rg_fit"])
if __name__=="__builtin__": main()