'''
Created on 7 mars 2019
@author: Colley JM
'''
from builtins import issubclass
import logging
import numpy as np
import matplotlib.pyplot as plt
import astropy.time as astrotime
from astropy.io import fits
from scipy.stats import binom
import ecpi.simu.lib.sources as mdl_src
from ecpi.common.instru.model_effect import ECLAIRsDetectorEffect, \
ECLAIRsDetectorEffectDefault
from ecpi.common.mission.attitude import AttitudeECLAIRs
from ecpi.common.io.events import EclairsCalEvtData
from ecpi.common.instru.array_convention import yzegp_to_ijdet_array
from ecpi.common.mission.time import convert_svomref_seconds_in_mjd
logger = logging.getLogger(__name__)
[docs]class SimuEclairsEnergyChannel(object):
"""Main class to simalute ECLAIRs instrument with energy chanel
"""
def __init__(self, sim_effect=None):
"""**Constructor**
:param sim_effect: ECLAIRs detector default.
:type sim_effect: ECLAIRsDetectorEffect
"""
# list of model of src to simulate
self.d_src = {}
self.l_src_selected = []
if sim_effect is None:
self.sim_effect = ECLAIRsDetectorEffectDefault()
else:
self.sim_effect = sim_effect
assert isinstance(self.sim_effect, ECLAIRsDetectorEffect)
# use same simu effect for source model class
dummy_model_src = mdl_src.ModelSrcInterface() # to init geom and effect
mdl_src.ModelSrcInterface.set_sim_dpix(self.sim_effect)
self.det_shape = mdl_src.ModelSrcInterface.static_sim_geom.inst.detect.ar_val.shape
self._range_chan = range(0, self.sim_effect.chan_nb())
self.chan_shadows = np.zeros((self.sim_effect.chan_nb(),
self.det_shape[0], self.det_shape[1]), dtype=np.float32)
[docs] def raz_chan_shadows(self):
"""raz all channel with float64 type
"""
self.chan_shadows = np.zeros_like(self.chan_shadows, dtype=np.float64)
[docs] def set_range_channel(self, start_chan, end_chan):
"""Set energy range [start_chan, end_chan] of simulation
:param start_chan:
:type start_chan: integer
:param end_chan:
:type end_chan: integer
:return: True if check start_chan, end_chan are ok
:rtype: Boolean
"""
self._range_chan = range(1, 1)
if start_chan > end_chan:
logger.error(f"start_chan > end_chan !, start_chan={start_chan}, end_chan={end_chan}")
return False
if start_chan < 0:
logger.error(f"start_chan < 0 !, start_chan={start_chan}")
return False
max_chan = self.sim_effect.chan_nb() - 1
if end_chan > max_chan:
logger.error(f"end_chan > max channel !, end_chan={end_chan}, max channel={max_chan}")
return False
self._range_chan = range(start_chan, end_chan + 1)
return True
[docs] def set_chan_shadows(self, chan_shadows_):
"""Setter for chan_shadows class attribute.
:param chan_shadows_: shadowgrams per energy channel
:type chan_shadows_: 3D array (channel, shadowgram)
"""
self.chan_shadows = chan_shadows_
[docs] def set_src_point_with_catalog_swift(self, att_eclairs, use_irf):
"""
reset all source l_ponc_src and init with swift catalog
:param att_eclairs: ECLAIRs attitude
:type att_eclairs: common.mission.attitude.AttitudeECLAIRs
:param use_irf: whether or not to use IRF.
:type use_irf: bool
"""
assert isinstance(att_eclairs, AttitudeECLAIRs)
self.attitute = att_eclairs
swift = mdl_src.ListModelPointSrcFromCatalog(self.sim_effect, use_irf)
self.d_src.update(swift.init_with_cat_swift_2012(self.attitute))
self.cat = swift.cat_fov
[docs] def set_src_point_with_catalog_fov(self, cat_fov, use_irf=False):
"""
reset all source l_ponc_src and init with cat fov
cat_fov is an user catalog can by used for debug, analysis and test ..
cat_fov must have intensity array shape
:param cat_fov: sidx_chanources catalog with energy
:type cat_fov: CatalogFovBasic
:param use_irf: whether or not to use IRF.
:type use_irf: bool
"""
user_cat = mdl_src.ListModelPointSrcFromCatalog(self.sim_effect, use_irf)
self.d_src.update(user_cat.init_with_cat_fov(cat_fov))
[docs] def add_src(self, model_src_obj):
"""
add a source to the list
:param model_src_obj: initialized source object
:type model_src_obj: mdl_src.ModelSrcInterface
:return: status of add
:rtype: boolean
"""
if not issubclass(type(model_src_obj), mdl_src.ModelSrcInterface):
logger.error(f'{type(model_src_obj)} is not a subclass of ModelSrcInterface')
return False
self.d_src[model_src_obj.name] = model_src_obj
return True
[docs] def add_src_cxb(self):
"""Add energy dependent CXB source based on the pixel solid angle shape.
:return: status of add
:rtype: boolean"""
return self.add_src(mdl_src.ModelCxbShapeBasedEnergy())
[docs] def add_src_cxb_lvl(self, level=1):
"""Add CXB source based on the pixel solid angle shape.
:param level: offset level for the CXB.
:type level: float [ph/cm2/s]
:return: status of add
:rtype: boolean
"""
return self.add_src(mdl_src.ModelCxbShapeBased(level))
[docs] def add_src_cxb_flat(self, level=1):
"""Add flat CXB source.
:param level: offset level for the CXB.
:type level: float [ph/cm2/s]
:return: status of add
:rtype: boolean
"""
return self.add_src(mdl_src.ModelCxbFlat(level))
[docs] def add_src_cxb_flat_moretti_spectrum(self):
"""Add flat CXB source with Moretti Spectrum
:param level: offset level for the CXB.
:type level: float [ph/cm2/s]
:return: status of add
:rtype: boolean
"""
return self.add_src(mdl_src.ModelCxbFlatBasedEnergy())
[docs] def add_src_astroparticule(self):
"""Add flat internal noise from particles on the detector.
:return: status of add
:rtype: boolean
"""
return self.add_src(mdl_src.ModelInternalNoise())
[docs] def set_context_simu(self, duration, t_start=0):
"""set the context parameters for the simulation
:param duration: simulation time in s
:type duration: float
:param t_start: start time of the simulated observation in s from mjdref
:type t_start: float
"""
self.time_expose_s = duration
mdl_src.ModelSrcInterface.static_duration = duration
self.t_start = t_start
#
# SIMULATION source and instrument effects
#
[docs] def simu_src(self, idx_chan=None, l_src=None):
"""simulation of sources
:param idx_chan: channel index to be simulated. Default=None for all channel
:type idx_chan: int >0
:param l_src: list of sources to be simulated. Default=None for all initiated sources
:type l_src: list(string)
"""
if l_src:
self.l_src_selected = l_src
else:
self.l_src_selected = list(self.d_src.keys())
logger.info(f"simulation of {len(self.l_src_selected)}\
sources during {self.time_expose_s}s")
if idx_chan:
self._simu_src_one_channel(idx_chan)
else:
self._simu_src_all_channel()
def _simu_src_one_channel(self, idx_chan, verbose=False):
"""Simulation of all sources added by method add_src_xxx() or init_xxx
for a given energy channel.
Sonarqube underlines an unused variable: to be ignored.
:param idx_chan: channel index to be simulated.
:type idx_chan: int >0
"""
mdl_src.ModelSrcInterface.set_idx_channel(idx_chan)
self.chan_shadows[idx_chan] = 0
simu_shadow = self.chan_shadows[idx_chan]
for _, m_src in enumerate(self.l_src_selected):
mdl = self.d_src[m_src]
shadow_src = mdl.get_model()
simu_shadow += shadow_src
if verbose:
plot_shadow(shadow_src, mdl.name)
def _simu_src_all_channel(self):
"""Simulation of all sources added by method add_src_xxx() or init_xxx
over all the energy channels.
"""
logger.info(f"simulation of all {self._range_chan} channel(s)")
for idx in self._range_chan:
logger.debug(f'simulation channel index: {idx}')
self._simu_src_one_channel(idx)
[docs] def simu_arf_mean(self):
"""
This method introduces real (ie not integer) number of photon
can be replaced by binomial random processing.
Add poisson noise at end
"""
# ## original implementation
arf_percent = self.sim_effect.arf_percent()
# use broadcast numpy feature
# logger.debug(arf_percent)
self.chan_shadows *= arf_percent[:, np.newaxis, np.newaxis]
logger.debug('Use ARF in percent in simu_arf_mean')
[docs] def simu_arf_binomial(self):
"""
apply arf with binomial process to conserve integer value
"""
arf_percent = self.sim_effect.arf_percent()
new_shwadows = np.empty_like(self.chan_shadows, dtype=np.uint16)
for idx in self._range_chan:
new_shwadows[idx] = binom.rvs(self.chan_shadows[idx], arf_percent[idx])
self.chan_shadows = new_shwadows
[docs] def simu_rmf(self):
"""RMF random simulation
input: attribut array shadowgram is incident photon by channel
ouput: attribut array shadowgram becomes detected photon by channel
"""
self._create_shadows_int()
new_a_shadows = np.zeros_like(self.chan_shadows)
for idx in self._range_chan:
if idx % (self.sim_effect.chan_nb() / 10) == 0:
logger.info(f'rmf canal: {idx}')
self.shadow_int_temp = self.chan_shadows[idx]
nb_count = self.shadow_int_temp.sum()
det_chan = self.sim_effect.simu_rmf_channel(idx, nb_count)
a_axis0 = np.arange(self.det_shape[0])
a_axis1 = np.arange(self.det_shape[1])
idx_new_chan = 0
# incident channel to detected channel
for li in np.nditer(a_axis0):
for lj in np.nditer(a_axis1):
nb_photon = self.chan_shadows[idx, li, lj]
for lc in np.arange(nb_photon):
logger.debug(f'{det_chan[idx_new_chan+lc]}, {li}, {lj}')
new_a_shadows[det_chan[idx_new_chan + lc], li, lj] += 1
idx_new_chan += nb_photon
self.chan_shadows = new_a_shadows.astype(np.float64)
[docs] def simu_global_noise_poisson(self):
"""Simulation of random Poisson noise for each shadowgram (ie. each energy channel)
"""
self.chan_shadows = np.random.poisson(self.chan_shadows)
logger.debug(f'type of chan_shadows: {self.chan_shadows.dtype}')
[docs] def print_nb_photon(self):
for idx in self._range_chan:
print(f"number of photons in channel {idx}:", self.chan_shadows[idx].sum())
#
# GETTER
#
[docs] def get_shadow_channel(self, idx):
return self.chan_shadows[idx]
[docs] def get_list_src(self):
return list(self.d_src.keys())
#
# EVENEMENT CREATION
#
def _create_shadows_int(self):
if np.issubdtype(self.chan_shadows.dtype, np.integer):
# no integer casting is needed
return
else:
logger.warning(f"must convert {self.chan_shadows.dtype} to int ")
s1 = self.chan_shadows.sum()
logger.debug(f"before round {self.chan_shadows.sum()}")
self.chan_shadows = (self.chan_shadows + 0.5).astype(np.uint16)
logger.debug(f"after round {self.chan_shadows.sum()}")
s2 = self.chan_shadows.sum()
logger.debug(f"relative difference: {(s2-s1)*100/s1}")
def _create_evts_chan(self, idx_chan):
"""Create event for a single energy channel.
:param idx_chan: channel index to be simulated.
:type idx_chan: int >0
"""
self.shadow_int_temp = self.chan_shadows[idx_chan]
nb_count = self.shadow_int_temp.sum()
logger.info(f"number of counts in channel {idx_chan}: {nb_count}")
evts = EclairsCalEvtData(nb_count)
evts.tb_evts['TIME'] = np.random.random_sample(nb_count) * self.time_expose_s + self.t_start
evts.tb_evts['PHA'] = idx_chan
#TODO: manage PI like evts.tb_evts['PI'] = idx_chan
index = np.arange(0, self.shadow_int_temp.size)
a_evt = np.repeat(index, self.shadow_int_temp.ravel())
(evts.tb_evts['Y'], evts.tb_evts['Z']) = np.divmod(a_evt, self.shadow_int_temp.shape[0])
evts.tb_evts['Y'] = np.uint8(evts.tb_evts['Y'])
evts.tb_evts['Z'] = np.uint8(evts.tb_evts['Z'])
return evts
[docs] def create_evts(self):
"""Create events for every energy channel.
"""
self._evts = EclairsCalEvtData()
self._create_shadows_int()
for idx_chan in self._range_chan:
evts_chan = self._create_evts_chan(idx_chan)
# TODO: optimization : add_evt() can be very slow with huge simulation => create array with good size and fill it !
self._evts.add_evt(evts_chan)
del evts_chan
self._evts.sort_with_time()
[docs] def write_events(self, working_directory, observation):
"""Write ECL-EVT-SEC .fits files.
:param working_dir: PATH/directory where to save files
:type working_dir: str
:param observation: observation parameters
:type observation: EclairsObservation
"""
t_start = astrotime.Time(convert_svomref_seconds_in_mjd(observation.start_time),
format='mjd', scale='tt')
t_start = t_start.isot[:-4].replace('-', '').replace(':', '')
name = working_directory + f'/in/ECL-EVT-SEC-{t_start}.fits'
self._evts.write_L1(name, 'ECPI simulator')
[docs] def write_shadowgrams(self, working_directory, simulation_id, limit_shadowgrams=50):
"""Write shadowgrams in .fits files.
:param working_dir: PATH/directory where to save files
:type working_dir: str
:param simulation_id: source ID
:type simulation_id: int
"""
chan_num = self._range_chan[0]
if len(self._range_chan) > limit_shadowgrams:
logger.error(f'cannot create more than {limit_shadowgrams} !')
logger.info('simulator creates as many shadowgrams as energy channels.')
for shadowgram in self.chan_shadows[self._range_chan]:
hdu = fits.PrimaryHDU(yzegp_to_ijdet_array(shadowgram))
name_file = f'/in_shadow/shadowgram_{simulation_id}_{chan_num}.fits'
hdu.writeto(working_directory + name_file)
chan_num += 1
[docs] def plot_channel(self, idx, p_mes=""): # pragma: no cover
plt.figure()
plt.title(p_mes)
plt.imshow(yzegp_to_ijdet_array(self.chan_shadows[idx]))
plt.colorbar()
[docs]def plot_shadow(shadow, p_mes=""): # pragma: no cover
"""Simple plot shadowgram
"""
plt.figure()
plt.title(p_mes)
plt.imshow(yzegp_to_ijdet_array(shadow))
plt.colorbar()