"""
High-level class to simulate the sources of the sky seen by
the ECLAIRs instrument during **a stable pointing** period.
.. warning::
**SimuEclairsEnergyChannel is not a simulation script**. It is a library allowing
to create different types of simulation according to your needs.
SimuEclairsEnergyChannel features:
----------------------------------
The library simulate the number of photon/count in ECLAIRs detector in
each energy channel, so the data structure of simulation is a 3D array
nb_channelx80x80. This array is stored in :py:attr:`.chan_shadows` attribut.
Library can take in account:
* Energy channel
* all by default
* only one with :py:meth:`.simu_src` with parameter **idx_chan**
* a set [min, max] with :py:meth:`.set_range_channel`
* Sky sources model
* punctual source from
* catalog (SWIFT, user in equatorial coordinate, user in FOV coordinate)
* define by user in FOV
* include projection effect (with IRF or only "cos theta" term)
.. note::
All method for punctual sources begin by **add_src_point_xxx()**
* Cosmic X Background (CXB) with or without earth in field of view
* simulation for a selection of sources is possible with parameter **l_src** of the
method :py:meth:`.simu_src`
.. note::
If you want create a new type of sky source implement it as a derivation
of :py:class:`.AbstractModelSrc` and add to simulation with
:py:meth:`.add_src` method.
* Noise
* only Poisson noise with :py:meth:`.simu_noise_poisson`
* Detector effects
* ARF, with quantum efficiency formulation
* after Poisson noise with :py:meth:`.simu_arf_binomial`
* before Poisson noise with :py:meth:`.simu_arf_mean`
* Attitude and position of satellite
* to select source in field of view from astronomical catalog
* to take into account the occultation of the CXB by the earth
Outputs available:
* detector image on each energy channel in count/photon with
:py:attr:`.chan_shadows` attribut
* file events with :py:meth:`.write_events`
* file detector image with :py:meth:`.write_shadowgrams`
* plot detector image by energy channel :py:func:`.plot_shadow`
Context of simulation:
----------------------
Position, attitude, time, data calibration detector, geometry instrument, ...
all the information common and necessary for simulation models is called
the **context**, stored in :py:attr:`.context` attribut. It's in fact a
global instance of :py:class:`.ContextSimulator` class, share with design
pattern singleton.
.. warning::
If a model change the context, it is modified for all other models.
"""
from builtins import issubclass
import os.path as osp
import logging
import numpy as np
import matplotlib.pyplot as plt
import astropy.time as astrotime
from astropy.io import fits
from scipy.stats import binom
import ecpi.simu.lib.sources as mdl_src
from ecpi.common.instru.model_effect import ECLAIRsDetectorEffect
from ecpi.common.io.events import EclairsCalEvtData
from ecpi.common.instru.array_convention import yzegp_to_ijdet_array
from ecpi.common.mission.time import convert_svomref_seconds_in_mjd
from ecpi.simu.lib.context import GlobalContextSimulator
s_logger = logging.getLogger(__name__)
AVAILABLE_CXB_MODES = ["flat", "flat_moretti",
"shape", "shape_moretti",
"shape_moretti_earth_ref",
"shape_moretti_earth",
'shape_moretti_smart']
[docs]class SimuEclairsEnergyChannel(object):
"""ECLAIRs simulation by energy channel
"""
def __init__(self, mdl_effect=None):
"""
:param mdl_effect: ECLAIRs detector default.
:type mdl_effect: ECLAIRsDetectorEffect
"""
# list of model of src to simulate
self.d_src = {}
self.l_src_selected = []
self.context = GlobalContextSimulator()
self.context.reset_observer()
if mdl_effect is not None:
assert isinstance(mdl_effect, ECLAIRsDetectorEffect)
self.context._mdl_effect = mdl_effect
else:
self.context.reset_detec_effect()
self.mdl_effect = self.context._mdl_effect
self.det_shape = self.context._sim_geom.inst.detect.ar_val.shape
self._range_chan = range(self.mdl_effect._channel[0], self.mdl_effect._channel[-1])
self._range_idx_chan = range(0, self.mdl_effect.chan_nb())
self._evts = None
self.chan_shadows = np.zeros((self.mdl_effect.chan_nb(),
self.det_shape[0], self.det_shape[1]), dtype=np.float32)
s_logger.info("Create SimuEclairsEnergyChannel object")
[docs] def reset_chan_shadows(self):
"""Reset all image detector at zero
"""
self.chan_shadows = np.zeros_like(self.chan_shadows, dtype=np.float64)
[docs] def set_range_channel(self, start_chan, end_chan):
"""Set energy range [start_chan, end_chan] of simulation
:param integer start_chan: first channel in range
:param integer end_chan: last channel in range
:return: True if start_chan, end_chan are acceptable
:rtype: bool
"""
self._range_chan = range(1, 1)
if start_chan > end_chan:
s_logger.error(f"start_chan > end_chan !, start_chan={start_chan}, end_chan={end_chan}")
return False
offset = self.mdl_effect._channel[0]
if start_chan < offset:
s_logger.error(f"start_chan < 0 !, start_chan={start_chan}")
return False
max_chan = self.mdl_effect.chan_nb() + offset - 1
if end_chan > max_chan:
s_logger.error(f"end_chan > max channel !, end_chan={end_chan}, max channel={max_chan}")
return False
self._range_chan = range(start_chan, end_chan + 1)
self._range_idx_chan = range(start_chan - offset, end_chan - offset)
return True
[docs] def set_chan_shadows(self, chan_shadows):
"""Set all image detector by chan_shadows
:param chan_shadows: image detector by channel
:type chan_shadows: array 3D (nb channel, image detector)
"""
# TODO: add test on dimension chan_shadows_, compatible with self.chan_shadows ?
self.chan_shadows = chan_shadows
[docs] def add_src_point_with_catalog_swift(self, use_irf):
"""
Add all sources in catalog SWIFT that are in current
field of view of ECLAIRs.
If you not used IRF a simple model of geometric effect (cos theta)
is apply.
:param use_irf: use file calibration IRF for geometric effect
:type use_irf: bool
"""
swift = mdl_src.ListModelPointSrcFromCatalog(use_irf)
self.d_src.update(swift.init_with_cat_swift_2012())
[docs] def add_src_point_with_custom_catalog(self, path_cat, use_irf):
"""
Add all sources from FITS catalog that are in current
field of view of ECLAIRs.
If you not used IRF a simple model of geometric effect (cos theta)
is apply.
:param path_cat: path to custom source catalog
:param use_irf: use file calibration IRF for geometric effect
:type use_irf: bool
"""
catalog = mdl_src.ListModelPointSrcFromCatalog(use_irf)
self.d_src.update(catalog.init_with_custom_cat(path_cat))
[docs] def add_src_point_with_catalog_fov(self, cat_fov, use_irf=False):
"""
Add all sources in catalog cat_astro in field of view (direction,
elevation) angle that are in current field of view to the simulation.
If you not used IRFa simple model of geometric effect (cos theta)
is apply.
:param cat_fov: sources catalog with energy
:type cat_fov: subclass of :py:class:`.CatalogFovBasic`
:param use_irf: use file calibration IRF for geometric effect
:type use_irf: bool
"""
user_cat = mdl_src.ListModelPointSrcFromCatalog(use_irf)
self.d_src.update(user_cat.init_with_cat_fov(cat_fov))
[docs] def add_src_point_with_catalog_astro(self, cat_astro, use_irf=False):
"""
Add all sources in catalog cat_astro in J2000 that are in current
field of view of ECLAIRs.
If you not used IRF a simple model of geometric effect (cos theta)
is apply.
:param cat_astro: catalog in equatorial coordinate J2000
:type cat_astro: subclass of :py:class:`.CatalogAstroWithEnergySpecSampling`
:param use_irf: use file calibration IRF for geometric effect
:type use_irf: bool
"""
user_cat = mdl_src.ListModelPointSrcFromCatalog(use_irf)
self.d_src.update(user_cat.init_with_cat_astro(cat_astro))
[docs] def add_src(self, src_obj):
"""
Add a sky source to the simulation
:param src_obj: object
:type src_obj: subclass of :py:class:`.AbstractModelSrc`
:return: status of add
:rtype: boolean
"""
if not issubclass(type(src_obj), mdl_src.AbstractModelSrc):
s_logger.error(f'{type(src_obj)} is not a subclass of ModelSrcInterface')
return False
self.d_src[src_obj.name] = src_obj
return True
[docs] def del_src(self, name_src):
"""
Delete the source with name name_src from list of sky source.
:param name_src: name in current sky source list
:type name_src: string
"""
if name_src in self.d_src:
self.d_src.pop(name_src)
else:
s_logger.error(f"{name_src} isn't in source dictionary")
[docs] def del_src_all(self):
"""
Delete all sky source
"""
self.d_src.clear()
[docs] def add_src_cxb(self, cxb_mode="shape_moretti"):
"""
Add CXB source with keyword available in
:py:data:`AVAILABLE_CXB_MODES` list
:param cxb_mode: keyword defining the model of CXB
:type cxb_mode: string
:return: status of add
:rtype: boolean
"""
if cxb_mode not in AVAILABLE_CXB_MODES:
s_logger.error(f'No model for "{cxb_mode}".')
return False
ret_add = False
if cxb_mode == "flat":
ret_add = self.add_src(mdl_src.ModelCxbFlat())
if cxb_mode == "flat_moretti":
ret_add = self.add_src(mdl_src.ModelCxbFlatBasedEnergy())
if cxb_mode == "shape":
ret_add = self.add_src(mdl_src.ModelCxbShapeBased())
if cxb_mode == "shape_moretti":
ret_add = self.add_src(mdl_src.ModelCxbShapeBasedEnergy())
if cxb_mode == "shape_moretti_earth_ref":
ret_add = self.add_src(
mdl_src.ModelMeanCXBShapeBasedEnergyWithEarthCalculator())
if cxb_mode == "shape_moretti_earth" or cxb_mode == "shape_moretti_smart":
ret_add = self.add_src(mdl_src.ModelEarthMoretti())
if not ret_add:
s_logger.error(f'No model for "{cxb_mode}".')
return ret_add
[docs] def add_src_astroparticule(self):
"""
Add to the simulation a model of astroparticules flux
on the detector.
.. note::
Current model is defined by :py:class:`.ModelInternalNoise`
:return: status of add
:rtype: boolean
"""
return self.add_src(mdl_src.ModelInternalNoise())
#
# SIMULATION source and instrument effects
#
[docs] def simu_src(self, idx_chan=None, l_src=None):
"""Simulation of sky sources on image detector
:param int idx_chan: index channel where apply simulation
:param string l_src: restriction of the simulation to the source list
"""
if l_src:
self.l_src_selected = l_src
else:
self.l_src_selected = list(self.d_src.keys())
s_logger.debug(f"simulation of {len(self.l_src_selected)}\
sources during {self.context.duration}s")
if idx_chan:
self._simu_src_one_channel(idx_chan)
else:
self._simu_src_all_channel()
def _simu_src_one_channel(self, chan_num):
"""Simulation of all sources added by method add_src_xxx() or init_xxx
for a given energy channel.
.. note:: SQ underlines an unused variable: to be ignored.
:param idx_chan: channel index to be simulated.
:type idx_chan: int >0
"""
self.context.set_idx_chan(chan_num)
self.chan_shadows[self.context.idx_chan] = 0
simu_shadow = self.chan_shadows[self.context.idx_chan]
for _, m_src in enumerate(self.l_src_selected):
mdl = self.d_src[m_src]
shadow_src = mdl.get_model()
simu_shadow += shadow_src
def _simu_src_all_channel(self):
"""Simulation of all sources added by method add_src_xxx() or init_xxx
over all the energy channels.
"""
s_logger.debug(f"simulation of all {self._range_chan} channel(s)")
for idx in self._range_chan:
self._simu_src_one_channel(idx)
[docs] def simu_arf_mean(self):
"""
Apply ARF on images detector by multiplication with quantum efficiency
"""
# ## original implementation
arf_percent = self.mdl_effect.arf_percent()
# use broadcast numpy feature
# s_logger.debug(arf_percent)
self.chan_shadows *= arf_percent[:, np.newaxis, np.newaxis]
s_logger.debug('Use ARF in percent in simu_arf_mean')
[docs] def simu_arf_binomial(self):
"""
Apply ARF on images detector with binomial processus where mean is
quantum efficiency
.. note::
Use it only when images detector have integer value,
like after :py:meth:`.simu_noise_poisson`
"""
arf_percent = self.mdl_effect.arf_percent()
new_shadows = np.empty_like(self.chan_shadows, dtype=np.uint16)
for idx in self._range_idx_chan:
new_shadows[idx] = binom.rvs(self.chan_shadows[idx], arf_percent[idx])
self.chan_shadows = new_shadows
[docs] def simu_noise_poisson(self):
"""Simulation of random Poisson noise for each shadowgram (ie. each energy channel)
"""
self.chan_shadows = np.random.poisson(self.chan_shadows)
s_logger.debug(f'type of chan_shadows: {self.chan_shadows.dtype}')
[docs] def print_total_count(self):
"""
Print total number of photon/count for each channel
"""
for idx in self._range_chan:
s_logger.info(f"number of photon/count in channel {idx}:", self.chan_shadows[idx].sum())
#
# GETTER
#
[docs] def get_shadow_by_channel(self, num_chan): # @DontTrace
"""Return image detector of channel num_chan
:param int num_chan: channel number
:return: image detector of channel with number num_chan
:rtype: array 2D
"""
idx_chan = num_chan - self.mdl_effect._channel[0]
return self.chan_shadows[idx_chan]
[docs] def get_shadow_by_idx(self, idx): # @DontTrace
"""Return image detector of channel idx
:param int idx: number channel
:return: image detector of channel idx
:rtype: array 2D
"""
return self.chan_shadows[idx]
[docs] def get_list_src(self):
"""List sky source
:return: list with name of sky source in current simulation
"""
return list(self.d_src.keys())
#
# EVENT CREATION
#
def _create_shadows_int(self):
if np.issubdtype(self.chan_shadows.dtype, np.integer):
# no integer casting is needed
return
else:
s_logger.warning(f"must convert {self.chan_shadows.dtype} to int ")
s1 = self.chan_shadows.sum()
s_logger.debug(f"before round {self.chan_shadows.sum()}")
self.chan_shadows = (self.chan_shadows + 0.5).astype(np.uint16)
s_logger.debug(f"after round {self.chan_shadows.sum()}")
s2 = self.chan_shadows.sum()
s_logger.debug(f"relative difference: {(s2-s1)*100/s1}")
def _create_evts_chan(self, chan_num):
"""Create event for a single energy channel.
:param idx_chan: channel index to be simulated.
:type idx_chan: int >0
"""
idx_chan = chan_num - self.mdl_effect._channel[0]
self.shadow_int_temp = self.chan_shadows[idx_chan]
nb_count = self.shadow_int_temp.sum()
s_logger.info(f"number of counts in channel {idx_chan}: {nb_count}")
evts = EclairsCalEvtData(nb_count)
rd_nb_count = np.random.random_sample(nb_count)
evts.tb_evts['TIME'] = rd_nb_count * self.context.duration + self.context.t_start
evts.tb_evts['PHA'] = idx_chan
# TODO: manage PI like evts.tb_evts['PI'] = idx_chan
index = np.arange(0, self.shadow_int_temp.size)
a_evt = np.repeat(index, self.shadow_int_temp.ravel())
(evts.tb_evts['Y'], evts.tb_evts['Z']) = np.divmod(a_evt, self.shadow_int_temp.shape[0])
evts.tb_evts['Y'] = np.uint8(evts.tb_evts['Y'])
evts.tb_evts['Z'] = np.uint8(evts.tb_evts['Z'])
return evts
[docs] def create_evts(self):
"""
From each image detector create an instance of
:py:class:`.EclairsCalEvtData` to store events.
For each events a time of detection is defined.
"""
self._evts = EclairsCalEvtData()
self._create_shadows_int()
for idx_chan in self._range_chan:
evts_chan = self._create_evts_chan(idx_chan)
# TODO: optimization : add_evt() can be very slow with huge simulation
# => create array with good size and fill it !
self._evts.add_evt(evts_chan)
del evts_chan
self._evts.sort_with_time()
[docs] def write_events(self, working_directory, observation):
"""Write event FITS file
.. note::
#TODO: remove observation parameters and use context
:param working_directory: directory where file is written
:type working_directory: string
:param observation: observation parameters
:type observation: EclairsObservation
"""
t_start = astrotime.Time(convert_svomref_seconds_in_mjd(observation.start_time),
format='mjd', scale='tt')
t_start = t_start.isot[:-4].replace('-', '').replace(':', '')
name = osp.join(working_directory, 'in', f'ECL-EVT-SEC-{t_start}.fits')
self._evts.write_L1(name, 'ECPI simulator')
[docs] def write_shadowgrams(self, working_directory, simulation_id, limit_shadowgrams=50):
"""Write image detector FITS file
:param working_directory: directory where file is written
:type working_directory: string
:param simulation_id: simulation identification
:type simulation_id: int
"""
chan_num = self._range_chan[0]
if len(self._range_chan) > limit_shadowgrams:
s_logger.error(f'cannot create more than {limit_shadowgrams} !')
s_logger.info('simulator creates as many shadowgrams as energy channels.')
for shadowgram in self.chan_shadows[self._range_chan]:
hdu = fits.PrimaryHDU(yzegp_to_ijdet_array(shadowgram))
name_file = f'shadowgram_{simulation_id}_{chan_num}.fits'
hdu.writeto(osp.join(working_directory, 'in_shadow', name_file))
chan_num += 1
[docs] def plot_channel(self, idx, p_mes=""): # pragma: no cover
"""Create a figure of image detector for channel idx
:param int idx: number channel
:param string p_mes: Title of the figure
"""
plt.figure()
plt.title(p_mes)
plt.imshow(yzegp_to_ijdet_array(self.chan_shadows[idx]))
plt.colorbar()