'''
Created on 7 mars 2019
@author: Colley JM
'''
from builtins import issubclass
import logging
import numpy as np
import matplotlib.pyplot as plt
import astropy.time as astrotime
from astropy.io import fits
from scipy.stats import binom
import ecpi.simu.lib.sources as mdl_src
from ecpi.common.instru.model_effect import ECLAIRsDetectorEffect
from ecpi.common.mission.attitude import AttitudeECLAIRs
from ecpi.common.io.events import EclairsCalEvtData
from ecpi.common.instru.array_convention import yzegp_to_ijdet_array
from ecpi.common.mission.time import convert_svomref_seconds_in_mjd
from ecpi.simu.lib.context import GlobalContextSimulator
logger = logging.getLogger(__name__)
AVAILABLE_CXB_MODES = ["flat", "flat_moretti",
"shape", "shape_moretti",
"shape_moretti_earth_ref",
"shape_moretti_earth",
'shape_moretti_smart']
[docs]class SimuEclairsEnergyChannel(object):
"""ECLAIRs simulation with energy chanel during stable pointing
User Guide:
* defined first the context of simulation with context attribut :
** t_start, duration
** if necessary attitude for catalog swift and earth
** if necessary sat pos for earth
* add some sky sources
** with methods like add_srcxxxx()
** from catalog with method set_src_point_with_catalog_xxx()
* set energy band of simulation with
* simulate sky source in photon with simu_src()
* add some detector effect
** simu_arf_xxx
** simu_rmf (to check !!)
* add poisson noise
* result : image detector by channel in count
"""
def __init__(self, mdl_effect=None):
"""**Constructor**
:param mdl_effect: ECLAIRs detector default.
:type mdl_effect: ECLAIRsDetectorEffect
"""
# list of model of src to simulate
self.d_src = {}
self.l_src_selected = []
self.context = GlobalContextSimulator()
self.context.reset()
if mdl_effect is not None:
assert isinstance(mdl_effect, ECLAIRsDetectorEffect)
self.context._mdl_effect = mdl_effect
self.mdl_effect = self.context._mdl_effect
self.det_shape = self.context._sim_geom.inst.detect.ar_val.shape
self._range_chan = range(0, self.mdl_effect.chan_nb())
self.chan_shadows = np.zeros((self.mdl_effect.chan_nb(),
self.det_shape[0], self.det_shape[1]), dtype=np.float32)
[docs] def raz_chan_shadows(self):
"""raz all channel with float64 type
"""
self.chan_shadows = np.zeros_like(self.chan_shadows, dtype=np.float64)
[docs] def set_range_channel(self, start_chan, end_chan):
"""Set energy range [start_chan, end_chan] of simulation
:param start_chan:
:type start_chan: integer
:param end_chan:
:type end_chan: integer
:return: True if check start_chan, end_chan are ok
:rtype: Boolean
"""
self._range_chan = range(1, 1)
if start_chan > end_chan:
logger.error(f"start_chan > end_chan !, start_chan={start_chan}, end_chan={end_chan}")
return False
if start_chan < 0:
logger.error(f"start_chan < 0 !, start_chan={start_chan}")
return False
max_chan = self.mdl_effect.chan_nb() - 1
if end_chan > max_chan:
logger.error(f"end_chan > max channel !, end_chan={end_chan}, max channel={max_chan}")
return False
self._range_chan = range(start_chan, end_chan + 1)
return True
[docs] def set_chan_shadows(self, chan_shadows_):
"""Setter for chan_shadows class attribute.
:param chan_shadows_: shadowgrams per energy channel
:type chan_shadows_: 3D array (channel, shadowgram)
"""
self.chan_shadows = chan_shadows_
[docs] def add_src_point_with_catalog_swift(self, use_irf):
"""
reset all source l_ponc_src and init with swift catalog
:param use_irf: whether or not to use IRF.
:type use_irf: bool
"""
swift = mdl_src.ListModelPointSrcFromCatalog(self.mdl_effect, use_irf)
self.d_src.update(swift.init_with_cat_swift_2012(self.attitute))
self.cat = swift.cat_fov
[docs] def set_src_point_with_custom_catalog(self, path_cat, use_irf):
"""
reset all source l_ponc_src and init with swift catalog
:param path_cat: path to custom source catalog.
:type path_cat: str
:param use_irf: whether or not to use IRF.
:type use_irf: bool
"""
swift = mdl_src.ListModelPointSrcFromCatalog(self.mdl_effect, use_irf)
self.d_src.update(swift.init_with_custom_cat(path_cat))
[docs] def add_src_point_with_catalog_fov(self, cat_fov, use_irf=False):
"""
add all sources from cat_fov
cat_fov is an user catalog can by used for debug, analysis and test ..
cat_fov must have intensity array shape
:param cat_fov: sources catalog with energy
:type cat_fov: CatalogFovBasic
:param use_irf: whether or not to use IRF.
:type use_irf: bool
"""
user_cat = mdl_src.ListModelPointSrcFromCatalog(self.mdl_effect, use_irf)
self.d_src.update(user_cat.init_with_cat_fov(cat_fov))
[docs] def add_src_point_with_catalog_astro(self, cat_astro, use_irf=False):
"""
add all sources in cat_astro that are in FOV
cat_astro is an user catalog can by used for debug, analysis and test ..
cat_fov must have intensity array shape
:param cat_astro: sidx_chanources catalog with energy
:type cat_astro: CatalogAstroXXXX
:param use_irf: whether or not to use IRF.
:type use_irf: bool
"""
user_cat = mdl_src.ListModelPointSrcFromCatalog(self.mdl_effect, use_irf)
self.d_src.update(user_cat.init_with_cat_astro(cat_astro))
[docs] def add_src(self, model_src_obj):
"""
add a source to the list
:param model_src_obj: initialized source object
:type model_src_obj: mdl_src.ModelSrcInterface
:return: status of add
:rtype: boolean
"""
if not issubclass(type(model_src_obj), mdl_src.ModelSrcInterface):
logger.error(f'{type(model_src_obj)} is not a subclass of ModelSrcInterface')
return False
self.d_src[model_src_obj.name] = model_src_obj
return True
[docs] def del_src(self, name_src):
if name_src in self.d_src:
self.d_src.pop(name_src)
else:
logger.error(f"{name_src} isn't in source dictionary")
[docs] def del_src_all(self):
self.d_src.clear()
[docs] def add_src_cxb(self, cxb_mode="shape_moretti"):
"""
Add CXB source depending on model name cxb_mode.
.. note:: to set properly the levels in 'flat' and 'shape' modes
with specific level, do
flat = ModelFlat(my_level)
add_src(flat)
:param cxb_mode: CXB model to be added. Default is "no_cxb".
Possible choices are: flat", "flat_moretti",
"shape", "shape_moretti", "shape_moretti_earth".
:type cxb_mode: str
:return: status of add
:rtype: boolean
"""
assert cxb_mode in AVAILABLE_CXB_MODES
cxb_src = None
if cxb_mode is "flat":
cxb_src = self.add_src(mdl_src.ModelCxbFlat())
if cxb_mode is "flat_moretti":
cxb_src = self.add_src(mdl_src.ModelCxbFlatBasedEnergy())
if cxb_mode is "shape":
cxb_src = self.add_src(mdl_src.ModelCxbShapeBased())
if cxb_mode is "shape_moretti_smart":
return self.add_src(mdl_src.ModelMeanCXBMorettiSmart())
if cxb_mode is "shape_moretti":
return self.add_src(mdl_src.ModelCxbShapeBasedEnergy())
if cxb_mode is "shape_moretti_earth_ref":
return self.add_src(
mdl_src.ModelMeanCXBShapeBasedEnergyWithEarthCalculator())
if cxb_mode is "shape_moretti_earth":
return self.add_src(mdl_src.ModelEarthMoretti())
else:
logger.error(f'No model for "{cxb_mode}".')
return None
[docs] def add_src_astroparticule(self):
"""Add flat internal noise from particles on the detector.
:return: status of add
:rtype: boolean
"""
return self.add_src(mdl_src.ModelInternalNoise())
#
# SIMULATION source and instrument effects
#
[docs] def simu_src(self, idx_chan=None, l_src=None):
"""simulation of sources
:param idx_chan: channel index to be simulated. Default=None for all channel
:type idx_chan: int >0
:param l_src: list of sources to be simulated. Default=None for all initiated sources
:type l_src: list(string)
"""
if l_src:
self.l_src_selected = l_src
else:
self.l_src_selected = list(self.d_src.keys())
logger.info(f"simulation of {len(self.l_src_selected)}\
sources during {self.context.duration}s")
if idx_chan:
self._simu_src_one_channel(idx_chan)
else:
self._simu_src_all_channel()
def _simu_src_one_channel(self, idx_chan, verbose=False):
"""Simulation of all sources added by method add_src_xxx() or init_xxx
for a given energy channel.
.. note:: SQ underlines an unused variable: to be ignored.
:param idx_chan: channel index to be simulated.
:type idx_chan: int >0
"""
self.context.set_idx_chan(idx_chan)
self.chan_shadows[idx_chan] = 0
simu_shadow = self.chan_shadows[idx_chan]
for _, m_src in enumerate(self.l_src_selected):
mdl = self.d_src[m_src]
shadow_src = mdl.get_model()
simu_shadow += shadow_src
if verbose:
plot_shadow(shadow_src, mdl.name)
def _simu_src_all_channel(self):
"""Simulation of all sources added by method add_src_xxx() or init_xxx
over all the energy channels.
"""
logger.info(f"simulation of all {self._range_chan} channel(s)")
for idx in self._range_chan:
self._simu_src_one_channel(idx)
[docs] def simu_arf_mean(self):
"""
This method introduces real (ie not integer) number of photon
can be replaced by binomial random processing.
Add poisson noise at end
"""
# ## original implementation
arf_percent = self.mdl_effect.arf_percent()
# use broadcast numpy feature
# logger.debug(arf_percent)
self.chan_shadows *= arf_percent[:, np.newaxis, np.newaxis]
logger.debug('Use ARF in percent in simu_arf_mean')
[docs] def simu_arf_binomial(self):
"""
apply arf with binomial process to conserve integer value
"""
arf_percent = self.mdl_effect.arf_percent()
new_shwadows = np.empty_like(self.chan_shadows, dtype=np.uint16)
for idx in self._range_chan:
new_shwadows[idx] = binom.rvs(self.chan_shadows[idx], arf_percent[idx])
self.chan_shadows = new_shwadows
[docs] def simu_rmf(self):
"""RMF random simulation
input: attribut array shadowgram is incident photon by channel
ouput: attribut array shadowgram becomes detected photon by channel
"""
self._create_shadows_int()
new_a_shadows = np.zeros_like(self.chan_shadows)
for idx in self._range_chan:
if idx % (self.mdl_effect.chan_nb() / 10) == 0:
logger.info(f'rmf canal: {idx}')
self.shadow_int_temp = self.chan_shadows[idx]
nb_count = self.shadow_int_temp.sum()
det_chan = self.mdl_effect.simu_rmf_channel(idx, nb_count)
a_axis0 = np.arange(self.det_shape[0])
a_axis1 = np.arange(self.det_shape[1])
idx_new_chan = 0
# incident channel to detected channel
for li in np.nditer(a_axis0):
for lj in np.nditer(a_axis1):
nb_photon = self.chan_shadows[idx, li, lj]
for lc in np.arange(nb_photon):
logger.debug(f'{det_chan[idx_new_chan+lc]}, {li}, {lj}')
new_a_shadows[det_chan[idx_new_chan + lc], li, lj] += 1
idx_new_chan += nb_photon
self.chan_shadows = new_a_shadows.astype(np.float64)
[docs] def simu_global_noise_poisson(self):
"""Simulation of random Poisson noise for each shadowgram (ie. each energy channel)
"""
self.chan_shadows = np.random.poisson(self.chan_shadows)
logger.debug(f'type of chan_shadows: {self.chan_shadows.dtype}')
[docs] def print_nb_photon(self):
for idx in self._range_chan:
print(f"number of photons in channel {idx}:", self.chan_shadows[idx].sum())
#
# GETTER
#
[docs] def get_shadow_channel(self, idx):
return self.chan_shadows[idx]
[docs] def get_list_src(self):
return list(self.d_src.keys())
#
# EVENEMENT CREATION
#
def _create_shadows_int(self):
if np.issubdtype(self.chan_shadows.dtype, np.integer):
# no integer casting is needed
return
else:
logger.warning(f"must convert {self.chan_shadows.dtype} to int ")
s1 = self.chan_shadows.sum()
logger.debug(f"before round {self.chan_shadows.sum()}")
self.chan_shadows = (self.chan_shadows + 0.5).astype(np.uint16)
logger.debug(f"after round {self.chan_shadows.sum()}")
s2 = self.chan_shadows.sum()
logger.debug(f"relative difference: {(s2-s1)*100/s1}")
def _create_evts_chan(self, idx_chan):
"""Create event for a single energy channel.
:param idx_chan: channel index to be simulated.
:type idx_chan: int >0
"""
self.shadow_int_temp = self.chan_shadows[idx_chan]
nb_count = self.shadow_int_temp.sum()
logger.info(f"number of counts in channel {idx_chan}: {nb_count}")
evts = EclairsCalEvtData(nb_count)
rd_nb_count = np.random.random_sample(nb_count)
evts.tb_evts['TIME'] = rd_nb_count * self.context.duration + self.context.t_start
evts.tb_evts['PHA'] = idx_chan
# TODO: manage PI like evts.tb_evts['PI'] = idx_chan
index = np.arange(0, self.shadow_int_temp.size)
a_evt = np.repeat(index, self.shadow_int_temp.ravel())
(evts.tb_evts['Y'], evts.tb_evts['Z']) = np.divmod(a_evt, self.shadow_int_temp.shape[0])
evts.tb_evts['Y'] = np.uint8(evts.tb_evts['Y'])
evts.tb_evts['Z'] = np.uint8(evts.tb_evts['Z'])
return evts
[docs] def create_evts(self):
"""Create events for every energy channel.
"""
self._evts = EclairsCalEvtData()
self._create_shadows_int()
for idx_chan in self._range_chan:
evts_chan = self._create_evts_chan(idx_chan)
# TODO: optimization : add_evt() can be very slow with huge simulation
# => create array with good size and fill it !
self._evts.add_evt(evts_chan)
del evts_chan
self._evts.sort_with_time()
[docs] def write_events(self, working_directory, observation):
"""Write ECL-EVT-SEC .fits files.
:param working_dir: PATH/directory where to save files
:type working_dir: str
:param observation: observation parameters
:type observation: EclairsObservation
"""
t_start = astrotime.Time(convert_svomref_seconds_in_mjd(observation.start_time),
format='mjd', scale='tt')
t_start = t_start.isot[:-4].replace('-', '').replace(':', '')
name = working_directory + f'/in/ECL-EVT-SEC-{t_start}.fits'
self._evts.write_L1(name, 'ECPI simulator')
[docs] def write_shadowgrams(self, working_directory, simulation_id, limit_shadowgrams=50):
"""Write shadowgrams in .fits files.
:param working_dir: PATH/directory where to save files
:type working_dir: str
:param simulation_id: source ID
:type simulation_id: int
"""
chan_num = self._range_chan[0]
if len(self._range_chan) > limit_shadowgrams:
logger.error(f'cannot create more than {limit_shadowgrams} !')
logger.info('simulator creates as many shadowgrams as energy channels.')
for shadowgram in self.chan_shadows[self._range_chan]:
hdu = fits.PrimaryHDU(yzegp_to_ijdet_array(shadowgram))
name_file = f'/in_shadow/shadowgram_{simulation_id}_{chan_num}.fits'
hdu.writeto(working_directory + name_file)
chan_num += 1
[docs] def plot_channel(self, idx, p_mes=""): # pragma: no cover
plt.figure()
plt.title(p_mes)
plt.imshow(yzegp_to_ijdet_array(self.chan_shadows[idx]))
plt.colorbar()
[docs]def plot_shadow(shadow, p_mes=""): # pragma: no cover
"""Simple plot shadowgram
"""
plt.figure()
plt.title(p_mes)
plt.imshow(yzegp_to_ijdet_array(shadow))
plt.colorbar()