'''
Created on 7 mars 2019
@author: Colley JM
'''
from numba import jit
import numpy as np
from builtins import issubclass
import matplotlib.pyplot as plt
import simu.lib.sources as mdl_src
from common.instru.model_effect import ECLAIRsDetectorEffectDefault
from common.mission.attitude import AttitudeECLAIRs
from common.io.events import EclairsCalEvtData
from scipy.stats import binom
from common.instru.array_convention import yzegp_to_ijdet_array
S_verbose = 0
@jit(nopython=True)
def apply_result_rmf_with_numba(chan_shadows, dim1, dim2, l_new_channel):
new_a_shadows = np.zeros_like(chan_shadows)
a_axis0 = range(dim1)
a_axis1 = range(dim2)
nb_chan = chan_shadows.shape[0]
for idx in range(nb_chan):
if idx%2 == 0:
print("RMF canal ", idx)
det_chan = l_new_channel[idx]
idx_new_chan = 0
for li in a_axis0:
for lj in a_axis1:
nb_photon = chan_shadows[idx, li, lj]
for lc in range(nb_photon):
#print(det_chan[idx_new_chan+lc], li, lj)
new_a_shadows[det_chan[idx_new_chan+lc], li, lj] += 1
idx_new_chan += nb_photon
return new_a_shadows
[docs]class SimuEclairsEnergyChannel(object):
#
# INIT
#
def __init__(self, sim_effect=None):
# list of model of src to simulate
self.d_src = {}
self.l_src_selected = []
if sim_effect is None:
self.sim_effect = ECLAIRsDetectorEffectDefault()
else:
self.sim_effect = sim_effect
assert(isinstance(self.sim_effect, ECLAIRsDetectorEffectDefault))
# use same simu effect for source model class
mdl_src.ModelSrcInterface.set_sim_dpix(sim_effect)
self.det_shape = mdl_src.ModelSrcInterface.sim_geom.inst.detect.ar_val.shape
self.chan_shadows = np.zeros((self.sim_effect.chan_nb(), self.det_shape[0], self.det_shape[1]), dtype=np.float64)
[docs] def add_src_point_with_catalog_swift(self, att_eclairs, use_irf=True, verbose=False):
"""
reset all source l_ponc_src and init with swift catalog
:param att_eclairs: ECLAIRs attitude
:type att_eclairs: common.mission.attitude.AttitudeECLAIRs
:param verbose: print outputs from sources build (default=None)
:type verbose: bool
"""
assert isinstance(att_eclairs, AttitudeECLAIRs)
self.attitute = att_eclairs
swift = mdl_src.ListModelPointSrcFromCatalog(self.sim_effect, use_irf)
self.d_src.update(swift.init_with_cat_swift_2012(self.attitute, verbose))
self.cat = swift.cat_fov
[docs] def add_src_point_with_catalog_fov(self, cat_fov, use_irf=True, verbose=False):
"""
reset all source l_ponc_src and init with cat fov
cat_fov is an user catalaog can by used for debug, analysis and test ..
cat_fov must have intensity array shape
:param cat_fov: sidx_chanources catalog with energy
:type cat_fov: CatalogFovBasic
:param verbose: print outputs sources build (default=None)
:type verbose: bool
"""
user_cat = mdl_src.ListModelPointSrcFromCatalog(self.sim_effect, use_irf)
self.d_src.update(user_cat.init_with_cat_fov(cat_fov))
[docs] def add_src(self, model_src_obj):
if not issubclass( type(model_src_obj), mdl_src.ModelSrcInterface):
print(f"ERROR: {type(model_src_obj)} is not a sub class of ModelSrcInterface")
return False
self.d_src[model_src_obj.name] = model_src_obj
return True
[docs] def add_src_cxb(self, level=1):
"""
"""
return self.add_src(mdl_src.ModelCxbShapeBasedEnergy())
[docs] def add_src_cxb_flat(self, level=1):
"""
"""
return self.add_src(mdl_src.ModelCxbFlat(level))
[docs] def add_src_astroparticule(self):
return self.add_src(mdl_src.ModelInternalNoise())
[docs] def set_context_simu(self, duration, t_start=0):
"""set the context parameters for the simulation
:param duration: simulation time in s
:type duration: float
:param t_start: start time of the simulated observation in s from mjdref
:type t_start: float
"""
self.time_expose_s = duration
mdl_src.ModelSrcInterface.duration = duration
self.t_start = t_start
#
# SIMULATION source and instrument effects
#
[docs] def simu_src(self, idx_chan=None, l_src=None, verbose=False):
"""simulation of sources
:param idx_chan: channel index to be simulated. Default=None for all channel
:type idx_chan: int >0
:param l_src: list of sources to be simulated. Default=None for all sources already initiated
:type l_src: list(string)
:param verbose: for print outputs. Default=False
:type verbose: bool
"""
if l_src:
self.l_src_selected = l_src
else:
self.l_src_selected = list(self.d_src.keys())
if idx_chan:
self._simu_src_one_channel(idx_chan, verbose)
else:
self._simu_src_all_channel(verbose)
def _simu_src_one_channel(self, idx_chan, verbose=False):
""" simulation of all sources added by method add_src_xxx() or init_xxx
"""
mdl_src.ModelSrcInterface.set_idx_channel(idx_chan)
self.chan_shadows[idx_chan] = 0
simu_shadow = self.chan_shadows[idx_chan]
if S_verbose > 0:
print("selected:", self.l_src_selected)
for idx, m_src in enumerate(self.l_src_selected):
mdl = self.d_src[m_src]
if verbose:
print("simulation src: ", mdl.name)
shadow_src = mdl.get_model()
simu_shadow += shadow_src
if S_verbose > 0:
self.plot_shadow(shadow_src, m_src.name)
def _simu_src_all_channel(self, verbose=False):
nb_chan = self.sim_effect.chan_nb()
if verbose:
print(f"simulation of all {nb_chan} channel")
for idx in range(nb_chan):
if verbose:
print('simu channel ', idx)
self._simu_src_one_channel(idx, verbose)
def _simu_arf_mean(self):
"""
this method introduces real (ie not integer) number of photon
can be replace by binomial random processing
"""
arf_percent = self.sim_effect.arf_percent()
# use broadcast numpy feature
if S_verbose > 0:
print(arf_percent)
self.chan_shadows *= arf_percent[:, np.newaxis, np.newaxis]
[docs] def simu_arf(self):
"""
apply arf with binomial process to conserve integer value
"""
#print(self.chan_shadows.dtype)
arf_percent = self.sim_effect.arf_percent()
new_shwadows = np.empty_like(self.chan_shadows, dtype=np.uint16)
# TO DO : use numpy broadcast possible here ?
for idx in range(self.sim_effect.chan_nb()):
new_shwadows[idx] = binom.rvs(self.chan_shadows[idx], arf_percent[idx])
self.chan_shadows = new_shwadows
[docs] def simu_rmf(self):
self._create_shadows_int()
new_a_shadows = np.zeros_like(self.chan_shadows_int)
for idx in range(self.sim_effect.chan_nb()):
if idx%(self.sim_effect.chan_nb()/10) == 0:
print("RMF canal ", idx)
self.shadow_int_temp = self.chan_shadows_int[idx]
nb_count = self.shadow_int_temp.sum()
det_chan = self.sim_effect.simu_rmf_channel(idx, nb_count)
a_axis0 = np.arange(self.det_shape[0])
a_axis1 = np.arange(self.det_shape[1])
idx_new_chan = 0
for li in np.nditer(a_axis0):
for lj in np.nditer(a_axis1):
nb_photon = self.chan_shadows_int[idx, li, lj]
for lc in np.arange(nb_photon):
#print(det_chan[idx_new_chan+lc], li, lj)
new_a_shadows[det_chan[idx_new_chan+lc], li, lj] += 1
idx_new_chan += nb_photon
self.chan_shadows = new_a_shadows.astype(np.float64)
[docs] def simu_rmf_2(self):
new_a_shadows = np.zeros_like(self.chan_shadows)
a_axis0 = range(self.det_shape[0])
a_axis1 = range(self.det_shape[1])
for idx in range(self.sim_effect.chan_nb()):
if idx%2 == 0:
print("RMF canal ", idx)
det_chan = self.sim_effect.simu_rmf_channel(idx, self.chan_shadows[idx].sum())
idx_new_chan = 0
for li in a_axis0:
for lj in a_axis1:
nb_photon = self.chan_shadows[idx, li, lj]
for lc in range(nb_photon):
#print(det_chan[idx_new_chan+lc], li, lj)
new_a_shadows[det_chan[idx_new_chan+lc], li, lj] += 1
idx_new_chan += nb_photon
self.chan_shadows = new_a_shadows
[docs] def simu_rmf_jit(self):
copy_array = self.chan_shadows.copy()
a_rmf = self.sim_effect.get_list_rv_rmf()
nb_chan = self.chan_shadows.shape[0]
l_nchan = []
for idx in range(nb_chan):
nb_photon = self.chan_shadows[idx].sum()
print(f"{nb_photon} photons in channel {idx}, random new channel follow rmf ...")
det_chan = a_rmf[idx].rvs(size=nb_photon)
det_chan = (det_chan + 0.5).astype(np.int16)
l_nchan.append(det_chan)
print("start rmf processing")
ret_array = apply_result_rmf_with_numba(copy_array, self.det_shape[0], self.det_shape[1], l_nchan)
self.chan_shadows = ret_array
[docs] def simu_global_noise_poisson(self, verbose=False):
if verbose:
for idx in range(self.sim_effect.chan_nb()):
if verbose:
print(f"nb photons in channel {idx}:", self.chan_shadows[idx].sum())
self.chan_shadows[idx] = np.random.poisson(self.chan_shadows[idx])
if verbose:
print(f"nb photons with noise in channel {idx}:", self.chan_shadows[idx].sum())
else:
self.chan_shadows = np.random.poisson(self.chan_shadows).astype(np.uint16)
#print(self.chan_shadows.dtype)
[docs] def print_nb_photon(self):
for idx in range(self.sim_effect.chan_nb()):
print(f"nb photons in channel {idx}:", self.chan_shadows[idx].sum())
#
# GETTER
#
[docs] def get_shadow_channel(self, idx):
return self.chan_shadows[idx]
[docs] def get_list_src(self):
return list(self.d_src.keys())
#
# EVENEMENT CREATION
#
def _create_shadows_int(self):
"""
"""
if np.issubdtype(self.chan_shadows.dtype, np.integer):
# No int conversion needed
return
else:
if S_verbose > 0:
print(f"must convert {self.chan_shadows.dtype} to int ")
self.chan_shadows_int = np.around(self.chan_shadows).astype(np.int64)
s1 = self.chan_shadows.sum()
print("before round ", self.chan_shadows.sum())
self.chan_shadows = (self.chan_shadows+0.5).astype(np.uint16)
print("after round ", self.chan_shadows.sum())
s2 = self.chan_shadows.sum()
print('relative diff', (s2-s1)*100/s1)
#print(f"new type {self.chan_shadows.dtype}")
def _create_evts_chan(self, idx_chan, verbose=False):
"""
"""
self.shadow_int_temp = self.chan_shadows[idx_chan]
#print(self.chan_shadows.dtype)
nb_count = self.shadow_int_temp.sum()
if verbose:
print(f"nb_count in channel {idx_chan}:",nb_count)
evts = EclairsCalEvtData(nb_count)
evts.tb_evts['time'] = np.random.random_sample(nb_count)*self.time_expose_s + self.t_start
evts.tb_evts['PHA'] = idx_chan
evts.tb_evts['PI'] = idx_chan
index = np.arange(0, self.shadow_int_temp.size)
a_evt = np.repeat(index, self.shadow_int_temp.ravel())
(evts.tb_evts['Y'], evts.tb_evts['Z']) = np.divmod(a_evt, self.shadow_int_temp.shape[0])
return evts
[docs] def create_evts(self, verbose=False):
"""
loop on energy channel create events
"""
self._evts = EclairsCalEvtData()
self._create_shadows_int()
for idx_chan in range(self.sim_effect.chan_nb()):
evts_chan = self._create_evts_chan(idx_chan, verbose)
self._evts.add_evt(evts_chan)
del evts_chan
self._evts.sort_with_time()
#
# PLOT
#
[docs] def plot_channel(self, idx, p_mes=""):
plt.figure()
plt.title(p_mes)
plt.imshow(yzegp_to_ijdet_array(self.chan_shadows[idx]))
plt.colorbar()
[docs] def plot_shadow(self, shadow, p_mes=""):
plt.figure()
plt.title(p_mes)
plt.imshow(yzegp_to_ijdet_array(shadow))
plt.colorbar()