Source code for simu.lib.eclairs_channel

'''
Created on 7 mars 2019

@author: Colley JM
'''

from numba import jit
import numpy as np
from builtins import issubclass
import matplotlib.pyplot as plt

import simu.lib.sources as mdl_src
from common.instru.model_effect import ECLAIRsDetectorEffectDefault
from common.mission.attitude import AttitudeECLAIRs
from common.io.events import EclairsCalEvtData
from scipy.stats import binom
from common.instru.array_convention import yzegp_to_ijdet_array

S_verbose = 0

@jit(nopython=True)
def apply_result_rmf_with_numba(chan_shadows, dim1, dim2, l_new_channel):    
    new_a_shadows = np.zeros_like(chan_shadows)
    a_axis0 = range(dim1)
    a_axis1 = range(dim2)
    nb_chan = chan_shadows.shape[0]     
    for idx in range(nb_chan):
        if idx%2 == 0:
            print("RMF canal ", idx)        
        det_chan = l_new_channel[idx]
        idx_new_chan = 0
        for li in a_axis0:
            for lj in a_axis1:
                nb_photon = chan_shadows[idx, li, lj]                                   
                for lc in range(nb_photon):
                    #print(det_chan[idx_new_chan+lc], li, lj)
                    new_a_shadows[det_chan[idx_new_chan+lc], li, lj] += 1
                idx_new_chan += nb_photon
    return new_a_shadows



[docs]class SimuEclairsEnergyChannel(object): # # INIT # def __init__(self, sim_effect=None): # list of model of src to simulate self.d_src = {} self.l_src_selected = [] if sim_effect is None: self.sim_effect = ECLAIRsDetectorEffectDefault() else: self.sim_effect = sim_effect assert(isinstance(self.sim_effect, ECLAIRsDetectorEffectDefault)) # use same simu effect for source model class mdl_src.ModelSrcInterface.set_sim_dpix(sim_effect) self.det_shape = mdl_src.ModelSrcInterface.sim_geom.inst.detect.ar_val.shape self.chan_shadows = np.zeros((self.sim_effect.chan_nb(), self.det_shape[0], self.det_shape[1]), dtype=np.float64)
[docs] def add_src_point_with_catalog_swift(self, att_eclairs, use_irf=True, verbose=False): """ reset all source l_ponc_src and init with swift catalog :param att_eclairs: ECLAIRs attitude :type att_eclairs: common.mission.attitude.AttitudeECLAIRs :param verbose: print outputs from sources build (default=None) :type verbose: bool """ assert isinstance(att_eclairs, AttitudeECLAIRs) self.attitute = att_eclairs swift = mdl_src.ListModelPointSrcFromCatalog(self.sim_effect, use_irf) self.d_src.update(swift.init_with_cat_swift_2012(self.attitute, verbose)) self.cat = swift.cat_fov
[docs] def add_src_point_with_catalog_fov(self, cat_fov, use_irf=True, verbose=False): """ reset all source l_ponc_src and init with cat fov cat_fov is an user catalaog can by used for debug, analysis and test .. cat_fov must have intensity array shape :param cat_fov: sidx_chanources catalog with energy :type cat_fov: CatalogFovBasic :param verbose: print outputs sources build (default=None) :type verbose: bool """ user_cat = mdl_src.ListModelPointSrcFromCatalog(self.sim_effect, use_irf) self.d_src.update(user_cat.init_with_cat_fov(cat_fov))
[docs] def add_src(self, model_src_obj): if not issubclass( type(model_src_obj), mdl_src.ModelSrcInterface): print(f"ERROR: {type(model_src_obj)} is not a sub class of ModelSrcInterface") return False self.d_src[model_src_obj.name] = model_src_obj return True
[docs] def add_src_cxb(self, level=1): """ """ return self.add_src(mdl_src.ModelCxbShapeBasedEnergy())
[docs] def add_src_cxb_flat(self, level=1): """ """ return self.add_src(mdl_src.ModelCxbFlat(level))
[docs] def add_src_astroparticule(self): return self.add_src(mdl_src.ModelInternalNoise())
[docs] def set_context_simu(self, duration, t_start=0): """set the context parameters for the simulation :param duration: simulation time in s :type duration: float :param t_start: start time of the simulated observation in s from mjdref :type t_start: float """ self.time_expose_s = duration mdl_src.ModelSrcInterface.duration = duration self.t_start = t_start
# # SIMULATION source and instrument effects #
[docs] def simu_src(self, idx_chan=None, l_src=None, verbose=False): """simulation of sources :param idx_chan: channel index to be simulated. Default=None for all channel :type idx_chan: int >0 :param l_src: list of sources to be simulated. Default=None for all sources already initiated :type l_src: list(string) :param verbose: for print outputs. Default=False :type verbose: bool """ if l_src: self.l_src_selected = l_src else: self.l_src_selected = list(self.d_src.keys()) if idx_chan: self._simu_src_one_channel(idx_chan, verbose) else: self._simu_src_all_channel(verbose)
def _simu_src_one_channel(self, idx_chan, verbose=False): """ simulation of all sources added by method add_src_xxx() or init_xxx """ mdl_src.ModelSrcInterface.set_idx_channel(idx_chan) self.chan_shadows[idx_chan] = 0 simu_shadow = self.chan_shadows[idx_chan] if S_verbose > 0: print("selected:", self.l_src_selected) for idx, m_src in enumerate(self.l_src_selected): mdl = self.d_src[m_src] if verbose: print("simulation src: ", mdl.name) shadow_src = mdl.get_model() simu_shadow += shadow_src if S_verbose > 0: self.plot_shadow(shadow_src, m_src.name) def _simu_src_all_channel(self, verbose=False): nb_chan = self.sim_effect.chan_nb() if verbose: print(f"simulation of all {nb_chan} channel") for idx in range(nb_chan): if verbose: print('simu channel ', idx) self._simu_src_one_channel(idx, verbose) def _simu_arf_mean(self): """ this method introduces real (ie not integer) number of photon can be replace by binomial random processing """ arf_percent = self.sim_effect.arf_percent() # use broadcast numpy feature if S_verbose > 0: print(arf_percent) self.chan_shadows *= arf_percent[:, np.newaxis, np.newaxis]
[docs] def simu_arf(self): """ apply arf with binomial process to conserve integer value """ #print(self.chan_shadows.dtype) arf_percent = self.sim_effect.arf_percent() new_shwadows = np.empty_like(self.chan_shadows, dtype=np.uint16) # TO DO : use numpy broadcast possible here ? for idx in range(self.sim_effect.chan_nb()): new_shwadows[idx] = binom.rvs(self.chan_shadows[idx], arf_percent[idx]) self.chan_shadows = new_shwadows
[docs] def simu_rmf(self): self._create_shadows_int() new_a_shadows = np.zeros_like(self.chan_shadows_int) for idx in range(self.sim_effect.chan_nb()): if idx%(self.sim_effect.chan_nb()/10) == 0: print("RMF canal ", idx) self.shadow_int_temp = self.chan_shadows_int[idx] nb_count = self.shadow_int_temp.sum() det_chan = self.sim_effect.simu_rmf_channel(idx, nb_count) a_axis0 = np.arange(self.det_shape[0]) a_axis1 = np.arange(self.det_shape[1]) idx_new_chan = 0 for li in np.nditer(a_axis0): for lj in np.nditer(a_axis1): nb_photon = self.chan_shadows_int[idx, li, lj] for lc in np.arange(nb_photon): #print(det_chan[idx_new_chan+lc], li, lj) new_a_shadows[det_chan[idx_new_chan+lc], li, lj] += 1 idx_new_chan += nb_photon self.chan_shadows = new_a_shadows.astype(np.float64)
[docs] def simu_rmf_2(self): new_a_shadows = np.zeros_like(self.chan_shadows) a_axis0 = range(self.det_shape[0]) a_axis1 = range(self.det_shape[1]) for idx in range(self.sim_effect.chan_nb()): if idx%2 == 0: print("RMF canal ", idx) det_chan = self.sim_effect.simu_rmf_channel(idx, self.chan_shadows[idx].sum()) idx_new_chan = 0 for li in a_axis0: for lj in a_axis1: nb_photon = self.chan_shadows[idx, li, lj] for lc in range(nb_photon): #print(det_chan[idx_new_chan+lc], li, lj) new_a_shadows[det_chan[idx_new_chan+lc], li, lj] += 1 idx_new_chan += nb_photon self.chan_shadows = new_a_shadows
[docs] def simu_rmf_jit(self): copy_array = self.chan_shadows.copy() a_rmf = self.sim_effect.get_list_rv_rmf() nb_chan = self.chan_shadows.shape[0] l_nchan = [] for idx in range(nb_chan): nb_photon = self.chan_shadows[idx].sum() print(f"{nb_photon} photons in channel {idx}, random new channel follow rmf ...") det_chan = a_rmf[idx].rvs(size=nb_photon) det_chan = (det_chan + 0.5).astype(np.int16) l_nchan.append(det_chan) print("start rmf processing") ret_array = apply_result_rmf_with_numba(copy_array, self.det_shape[0], self.det_shape[1], l_nchan) self.chan_shadows = ret_array
[docs] def simu_global_noise_poisson(self, verbose=False): if verbose: for idx in range(self.sim_effect.chan_nb()): if verbose: print(f"nb photons in channel {idx}:", self.chan_shadows[idx].sum()) self.chan_shadows[idx] = np.random.poisson(self.chan_shadows[idx]) if verbose: print(f"nb photons with noise in channel {idx}:", self.chan_shadows[idx].sum()) else: self.chan_shadows = np.random.poisson(self.chan_shadows).astype(np.uint16)
#print(self.chan_shadows.dtype)
[docs] def print_nb_photon(self): for idx in range(self.sim_effect.chan_nb()): print(f"nb photons in channel {idx}:", self.chan_shadows[idx].sum())
# # GETTER #
[docs] def get_shadow_channel(self, idx): return self.chan_shadows[idx]
[docs] def get_list_src(self): return list(self.d_src.keys())
# # EVENEMENT CREATION # def _create_shadows_int(self): """ """ if np.issubdtype(self.chan_shadows.dtype, np.integer): # No int conversion needed return else: if S_verbose > 0: print(f"must convert {self.chan_shadows.dtype} to int ") self.chan_shadows_int = np.around(self.chan_shadows).astype(np.int64) s1 = self.chan_shadows.sum() print("before round ", self.chan_shadows.sum()) self.chan_shadows = (self.chan_shadows+0.5).astype(np.uint16) print("after round ", self.chan_shadows.sum()) s2 = self.chan_shadows.sum() print('relative diff', (s2-s1)*100/s1) #print(f"new type {self.chan_shadows.dtype}") def _create_evts_chan(self, idx_chan, verbose=False): """ """ self.shadow_int_temp = self.chan_shadows[idx_chan] #print(self.chan_shadows.dtype) nb_count = self.shadow_int_temp.sum() if verbose: print(f"nb_count in channel {idx_chan}:",nb_count) evts = EclairsCalEvtData(nb_count) evts.tb_evts['time'] = np.random.random_sample(nb_count)*self.time_expose_s + self.t_start evts.tb_evts['PHA'] = idx_chan evts.tb_evts['PI'] = idx_chan index = np.arange(0, self.shadow_int_temp.size) a_evt = np.repeat(index, self.shadow_int_temp.ravel()) (evts.tb_evts['Y'], evts.tb_evts['Z']) = np.divmod(a_evt, self.shadow_int_temp.shape[0]) return evts
[docs] def create_evts(self, verbose=False): """ loop on energy channel create events """ self._evts = EclairsCalEvtData() self._create_shadows_int() for idx_chan in range(self.sim_effect.chan_nb()): evts_chan = self._create_evts_chan(idx_chan, verbose) self._evts.add_evt(evts_chan) del evts_chan self._evts.sort_with_time()
# # PLOT #
[docs] def plot_channel(self, idx, p_mes=""): plt.figure() plt.title(p_mes) plt.imshow(yzegp_to_ijdet_array(self.chan_shadows[idx])) plt.colorbar()
[docs] def plot_shadow(self, shadow, p_mes=""): plt.figure() plt.title(p_mes) plt.imshow(yzegp_to_ijdet_array(shadow)) plt.colorbar()