"""Sources simulation module
"""
import abc
import logging
import os.path as osp
import os
import numpy as np
from astropy.table import Row
from scipy.integrate import quad
import ecpi.common.sky.catalog as cat
from ecpi.common import add_path_data_ref_eclairs, add_path_caldb_eclairs
from ecpi.common.io.fits_tools import read_fits
from ecpi.common.sky.cat_builder import swift_catalog_to_eclairs_catalog
from ecpi.common.instru.model_effect import ECLAIRsDetectorEffect
from ecpi.common.instru.solid_angle import SolidAngleComputationWithEarth
from ecpi.common.instru.solid_angle import SolidAngleSingleton
from ecpi.simu.lib.context import GlobalContextSimulator
s_logger = logging.getLogger(__name__)
[docs]class AbstractModelSrc(abc.ABC):
"""
Base object for the creation of the shadowgram model of different sources
"""
def __init__(self):
"""**Constructor**
.. warning :: _snr attribute is unused so far
"""
self._snr = -1
self._name = "Unknown"
self._name_model = "Not defined"
self._memo_id_pos = -1
self.context = GlobalContextSimulator()
[docs] def get_noisy_model(self, obs_time=None):
"""Return the shadowgram model with poisson noise.
:param obs_time: observation time in seconds
:type obs_time: float
"""
return np.random.poisson(self.get_model(obs_time))
[docs] def update(self):
"""return True if the model is already update with position or not yet defined
"""
# for design pattern observer
# overload only src is depended of satellite position and call this before
if self.context._id_pos < 0:
s_logger.info("no position satellite defined")
return True
if self._memo_id_pos == self.context._id_pos:
s_logger.info("memo_id_pos == context._id_pos")
return True
else:
self._memo_id_pos = self.context._id_pos
return False
@property
def name(self):
"""Get name.
"""
return self._name
@name.setter
def name(self, name):
"""Set name
"""
self._name = name
[docs] @abc.abstractmethod
def get_model(self, obs_time=None):
"""return detector image of model of the source
:param obs_time: observation time in s
:type obs_time: float
:return: detector image of model of the source
:rtype: array(80x80) of float
"""
return NotImplemented
[docs]class ModelFlat(AbstractModelSrc):
"""Generic class for flat model
"""
def __init__(self, level):
"""**constructor**
:param level: intensity detected
:type level: float [ph/cm2/s]
"""
super().__init__()
self.pix_area = self.context._sim_geom.get_det_pixel_surface()
shape = self.context._sim_geom.inst.detect.get_shape()
self.detect_ones = np.ones(shape, dtype=np.float64)
self.noise_cm2_s = level
self._name_model = "Flat"
[docs] def get_model(self, obs_time=None):
"""return the shadowgram model
:param obs_time: observation time in s
:type obs_time: float
:return: noise detector image in ph/pix
:rtype: array(float)
"""
if not obs_time:
obs_time = self.context.duration
noise_pixel = self.noise_cm2_s * self.pix_area * obs_time
return self.detect_ones * noise_pixel
[docs]class ModelInternalNoise(ModelFlat):
"""
Model for a flat internal noise, impact on detector of particles in the terrestrial environment
The internal noise is independent from wavelength
"""
def __init__(self, noise_level=0.003):
"""**constructor**
:param noise_level: internal noise level in ph/s/cm2/keV (default is 0.003)
:type noise_level: float
"""
super().__init__(noise_level)
self._name = "Internal noise"
self.interne_noise = noise_level
[docs] def get_model(self, obs_time=None):
"""return the shadowgram model
:param obs_time: observation time in s
:type obs_time: float
:return: noise detector image in ph/pix
:rtype: array(float)
"""
if not obs_time:
obs_time = self.context.duration
emin, emax = self.context.energy_range
size_band = emax - emin
self.noise_cm2_s = size_band * self.interne_noise
return super().get_model(obs_time)
[docs]class ModelCxbFlat(ModelFlat):
"""model for a flat CXB without spectrum
"""
def __init__(self, cxb_level=1):
"""**constructor**
:param cxb_level: intensity of CXB
:type cxb_level: float [ph/cm2/s]
"""
super().__init__(cxb_level)
self._name = "CXB"
self._name_model = "Flat"
[docs]class ModelCxbFlatBasedEnergy(AbstractModelSrc):
"""Model for a geometrically flat CXB with energy dependence
as specified with the Moretti spectrum.
"""
# TODO: refactoring remove file from constructor
def __init__(self, solidangle_filename="__to_remove__"):
"""**constructor**
:param solidangle_filename: PATH/filename of the solid angle image
:type solidangle_filename: string
"""
if solidangle_filename == "__to_remove__":
solidangle_filename = add_path_data_ref_eclairs(osp.join("instru", "pixels_solid_angle_withcostheta.fits"))
super().__init__()
norm = 0.109
gamma_1 = 1.4
gamma_2 = 2.88
e_b = 29
self.solidangle_filename = solidangle_filename
shape_based_solid_angle = read_fits(solidangle_filename, ugts_standard=True)
mean_solid_angle = np.mean(shape_based_solid_angle)
solid_angle = np.ones(shape=shape_based_solid_angle.shape) * mean_solid_angle
self._name = "flat CXB Moretti spectrum"
self.s_angle_pixel = solid_angle * self.context._sim_geom.get_det_pixel_surface()
self.func = lambda e: norm / ((e / e_b) ** gamma_1 + (e / e_b) ** gamma_2)
self.solid_angle = solid_angle
[docs] def get_model(self, obs_time=None):
"""return the shadowgram model
:param obs_time: observation time in s
:type obs_time: float
:return: cxb detector image in ph/pix
:rtype: array(float)
.. warning :: a factor 0.8 is added to this model to
make the count rate equal to the CXB shape case. TBI.
"""
# ## initial function get_model as implemented by CC/PB
val = self.solid_angle[0, 0]
assert np.array_equal(self.solid_angle, val)
if not obs_time:
obs_time = self.context.duration
# flux = integral between e_min and e_max of eq. 4 of https://arxiv.org/pdf/0811.1444.pdf
emin, emax = self.context.energy_range
flux, _ = quad(self.func, emin, emax)
ret = (obs_time * flux) * self.s_angle_pixel
s_logger.info(f'ADDED 0.8 FACTOR in get_model cxb flat !!! \
[eband=[{emin, emax}] keV]')
ret *= 0.8
return ret
[docs]class ModelCxbShapeBased(AbstractModelSrc):
"""Model for a CXB without spectrum based on the pixel solid angle shape
..warning :: dead code ?
"""
def __init__(self, cxb_level=3035, solidangle_filename="__to_remove__"):
"""**constructor**
:param cxb_level: intensity of CXB
:type cxb_level: float [ph/cm2/s]
:param solidangle_filename: PATH/filename of the solid angle image
:type solidangle_filename: string
"""
if solidangle_filename == "__to_remove__":
solidangle_filename = add_path_data_ref_eclairs(osp.join("instru", "pixels_solid_angle.fits"))
super().__init__()
self.cxb_level = cxb_level
self.solidangle_filename = solidangle_filename
solid_angle = read_fits(solidangle_filename, ugts_standard=True)
self.rate = cxb_level * solid_angle / solid_angle.sum()
self._name = "CXB"
self._name_model = "Shape"
[docs] def get_model(self, obs_time=None):
"""return the shadowgram model
:param obs_time: observation time in s
:type obs_time: float
:return: cxb detector image in ph/pix
:rtype: array(float)
"""
if not obs_time:
obs_time = self.context.duration
ret = self.rate * obs_time
return ret
[docs]class ModelCxbShapeBasedEnergy(AbstractModelSrc):
"""Model for an energy dependent CXB based on the pixel solid angle shape
"""
def __init__(self, solidangle_filename="__to_remove__"):
"""**constructor**
:param solidangle_filename: PATH/filename of the solid angle image
:type solidangle_filename: string
"""
if solidangle_filename == "__to_remove__":
solidangle_filename = add_path_data_ref_eclairs(osp.join("instru", "pixels_solid_angle.fits"))
super().__init__()
norm = 0.109
gamma_1 = 1.4
gamma_2 = 2.88
e_b = 29 # keV
self.mask_aperture = self.context._sim_geom.inst.mask_aperture
self.solidangle_filename = solidangle_filename
self.solid_angle = read_fits(solidangle_filename, ugts_standard=True)
self._name = "ModelCxbShapeBasedEnergy_NoEarth"
self.func = lambda e: norm / ((e / e_b) ** gamma_1 + (e / e_b) ** gamma_2)
det_pix_surf = self.context._sim_geom.inst.det_pix_size ** 2
self.s_angle_pix = 0.4 * det_pix_surf * self.solid_angle * (1. / self.mask_aperture)
[docs] def get_model(self, obs_time=None):
"""return the shadowgram model
..note: RMF must be handled here at some point !
..note: doc to Moretti's model:
flux = integral between e_min and e_max of eq. 4
of https://arxiv.org/pdf/0811.1444.pdf
:param obs_time: observation time in s
:type obs_time: float
:return: cxb detector image in ph/pix
:rtype: array(float)
"""
if not obs_time:
obs_time = self.context.duration
emin, emax = self.context.energy_range
flux, _ = quad(self.func, emin, emax)
# middle point integration for comparisons with AlG.
# flux = self.func((emin+emax)*0.5) * (emax-emin)
ret = (flux * obs_time) * self.s_angle_pix
return ret
class _ModelMeanCXBShapeBasedEnergyWithEarth(AbstractModelSrc):
"""Model for an energy dependent CXB in the presence of Earth in FOV.
"""
def __init__(self):
"""**constructor**
:param use_numba: whether or not to use pre-compiled function
when computing solid angle. Default is True.
:type use_numba: bool
..note: sum of solid angle with real mask accounts for cos theta
"""
super().__init__()
self._name = 'shape_moretti_earth'
self.mode = None
self.norm = 0.109
self.gamma_1 = 1.4
self.gamma_2 = 2.88
self.e_b = 29 # keV
def get_model(self, obs_time=None):
"""Return the shadowgram model.
:param obs_time: observation time in s
:type obs_time: float
:return: shadowgram
:rtype: 2D array(float) with shape 80x80
"""
s_logger.info(f'Running get_model CXB with Earth [mode={self.mode}]')
if not obs_time:
obs_time = self.context.duration
emin, emax = self.context.energy_range
flux, _ = quad(self.func, emin, emax)
ret = (flux * obs_time) * self.s_angle_pix
return ret
[docs]class ModelMeanCXBShapeBasedEnergyWithEarthCalculator(_ModelMeanCXBShapeBasedEnergyWithEarth):
"""Model for an energy dependent CXB in the presence of Earth in FOV.
CXB model is computed with mean shape.
"""
def __init__(self, limb=None, vec_earth=None, use_numba=True):
"""**constructor**
:param limb: limb angle in degree
:type limb: float
:param vec_earth: earth direction unit vector
:type vec_earth: list/array of size 3
:param use_numba: whether or not to use pre-compiled function
when computing solid angle. Default is True.
:type use_numba: bool
..note: sum of solid angle with real mask accounts for cos theta
"""
super().__init__()
self._name = 'shape_moretti_earth_calculator'
self.mode = "calculator"
if (limb is not None) and (vec_earth is not None):
self.limb = limb
self.vec_earth = vec_earth
else:
self.limb = self.context.earth_limb
self.vec_earth = self.context.earth_pos_fov_unit
inst_ecl = self.context._sim_geom.inst
det_pix_surf = inst_ecl.det_pix_size ** 2
sacwe = SolidAngleComputationWithEarth(inst_ecl, use_numba=use_numba)
self.solid_angle_with_earth = sacwe.run_calculator(self.limb, self.vec_earth)
self.func = lambda e: self.norm / ((e / self.e_b) ** self.gamma_1 + \
(e / self.e_b) ** self.gamma_2)
self.s_angle_pix = 0.4 * det_pix_surf * self.solid_angle_with_earth * \
(1. / inst_ecl.mask_aperture)
[docs]class ModelMeanCXBShapeBasedEnergyWithEarthSimulator(_ModelMeanCXBShapeBasedEnergyWithEarth):
"""Model for an energy dependent CXB in the presence of Earth in FOV.
CXB model is computed with a ray-tracing method.
"""
def __init__(self):
super().__init__()
self._name = 'shape_moretti_earth_simulator'
self.mode = "simulator"
raise NotImplementedError
[docs]class ModelEarthMoretti(AbstractModelSrc):
"""Model for an energy dependent CXB in the presence of Earth in FOV.
CXB model is computed with mean shape.
"""
def __init__(self, limb=None, vec_earth=None):
"""**constructor**
:param limb: limb angle in degree
:type limb: float
:param vec_earth: earth direction unit vector
:type vec_earth: list/array of size 3
:param use_numba: whether or not to use pre-compiled function
when computing solid angle. Default is True.
:type use_numba: bool
..note: sum of solid angle with real mask accounts for cos theta
"""
super().__init__()
self._name = 'ModelEarthMoretti'
self.mode = "calculator"
self.context.add_observer(self)
self.norm = 0.109
self.gamma_1 = 1.4
self.gamma_2 = 2.88
self.e_b = 29 # keV
if (limb is not None) and (vec_earth is not None):
self.limb = limb
self.vec_earth = vec_earth
else:
self.limb = self.context.earth_limb
self.vec_earth = self.context.earth_pos_fov_unit
instru_ecl = self.context._sim_geom.inst
self.sacwe = SolidAngleSingleton(instru_ecl)
# prcomputing CXB flux with model Moretti for all channels
self.moretti = np.zeros(self.context._mdl_effect.chan_nb(), dtype=np.float32)
func = lambda e: self.norm / ((e / self.e_b) ** self.gamma_1 + \
(e / self.e_b) ** self.gamma_2)
mdl_effect = self.context._mdl_effect
# pre-computing CXB Moretti flux in each band
for chan in range(self.context._mdl_effect.chan_nb()):
e_min = mdl_effect.chan_boundary[chan]
e_max = mdl_effect.chan_boundary[chan + 1]
flux, _ = quad(func, e_min, e_max)
self.moretti[chan] = flux
self.s_angle_pix_no_earth = None
self.update()
[docs] def update(self):
"""**constructor**
:param limb: limb angle in degree
:type limb: float
:param vec_earth: earth direction unit vector
:type vec_earth: list/array of size 3
:param use_numba: whether or not to use pre-compiled function
when computing solid angle. Default is True.
:type use_numba: bool
..note: sum of solid angle with real mask accounts for cos theta
"""
if super().update():
s_logger.info("ModelEarthMoretti already update")
return
instru_ecl = self.context._sim_geom.inst
det_pix_surf = instru_ecl.det_pix_size ** 2
s_logger.debug("update ModelEarthMoretti")
solid_angle_with_earth = self.sacwe.run(self.context._earth_fov)
self.s_angle_pix = 0.4 * det_pix_surf * solid_angle_with_earth * \
(1. / instru_ecl.mask_aperture)
[docs] def get_model(self, obs_time=None):
"""Return the shadowgram model.
:param obs_time: observation time in s
:type obs_time: float
:return: shadowgram
:rtype: 2D array(float) with shape 80x80
"""
if not obs_time:
obs_time = self.context.duration
return (self.moretti[self.context.idx_chan] * obs_time) * self.s_angle_pix
[docs]class ModelPointSrcIRF(AbstractModelSrc):
"""model for point source
"""
S_cpt = 0
def __init__(self, info_src):
"""**Constructor**
info_src must have at least the same number of intensities as idx_chan
:param info_src: single source information {'elev','dir','intensity','name'}.
Elev and dir in deg.
:type info_src: astropy.table.Row or dict
"""
super().__init__()
self.info_src = info_src
if (ModelPointSrcIRF.S_cpt == 0) and \
isinstance(info_src, Row):
s_logger.debug(f'source info: {self.info_src}')
s_logger.debug(f'source info dtype: {self.info_src.dtype}')
ModelPointSrcIRF.S_cpt += 1
self.simu = self.context._sim_geom
self._name = self.info_src["name"]
self._name_model = "with IRF"
self._shadow_cm2 = None
self._last_shadow = None
self.coef_hors_axis = self.set_coef_hors_axis()
[docs] def set_coef_hors_axis(self):
"""Return coefficient associated to cos theta or sin elev off axis effect.
:return: coefficient (units ?)
:rtype: array(float)
"""
pix_y, pix_z = self.simu.inst.elevdir_to_skypix(np.array(self.info_src['elev']),
np.array(self.info_src['dir']))
return self.context._mdl_effect.get_irf_val_ecllos(pix_y, pix_z)[0]
[docs] def get_model(self, obs_time=None):
"""return the shadowgram model
intensity given in ph/cm2/s in the current channel.
:param obs_time: observation time in s
:type obs_time: float
:return: cxb detector image in ph/pix
:rtype: array(float)
"""
if not obs_time:
obs_time = self.context.duration
if self._shadow_cm2 is None:
self._shadow_cm2 = self.simu.get_shadow_surface(self.info_src['elev'],
self.info_src['dir']).copy()
try:
intensity = self.info_src['intensity'][self.context.idx_chan]
# except ValueError:
except (ValueError, TypeError, IndexError):
s_logger.exception('Recovering intensity')
intensity = self.info_src['intensity']
self._last_shadow = intensity * obs_time * self._shadow_cm2 * self.coef_hors_axis
return self._last_shadow.copy()
[docs]class ModelPointSrcSinElev(ModelPointSrcIRF):
"""Model for point source.
Elevation angle is in degree.
"""
def __init__(self, info_src):
"""**Constructor**
:param info_src: single source information {'elev','dir','intensity','name'}.
Elev and dir in deg.
:type info_src: astropy.table.Row or dict
"""
self.sin_elev = np.sin(np.deg2rad(info_src['elev']))
super().__init__(info_src)
self._name_model = "with sin(elev)"
[docs] def set_coef_hors_axis(self):
"""This function must be renamed as 'get_coef_hors_axis'.
Verbose argument is unused.
"""
return self.sin_elev
[docs]class ListModelPointSrcFromCatalog(object):
"""Manage list of point source
"""
def __init__(self, use_irf=True):
"""**Constructor**
o_dpix used to define dpix channel
"""
self.use_irf = use_irf
self.context = GlobalContextSimulator()
self._dpix = self.context.mdl_effect
# declare logger
s_logger.debug('creating an instance of ListModelPointSrcFromCatalog')
[docs] def set_use_irf(self, use_irf):
assert use_irf in [True, False]
self.use_irf = use_irf
def _get_model_point_src(self, info_src):
if self.use_irf:
mdl = ModelPointSrcIRF(info_src)
else:
mdl = ModelPointSrcSinElev(info_src)
return mdl
[docs] def init_with_cat_swift_2012(self):
"""return a list of ShadowModelPointSrcSinElev from the SWIFT/BAT catalog
attitude used to define fov
"""
path_cat = add_path_data_ref_eclairs(osp.join("cat", "BAT_70m_catalog_20nov2012.fits"))
cat_bat = swift_catalog_to_eclairs_catalog(path_cat, self._dpix)
assert isinstance(cat_bat, cat.CatalogAstroWithEnergySpecSampling)
return self.init_with_cat_astro(cat_bat)
[docs] def init_with_cat_astro(self, astro_cat):
"""
return a list of ShadowModelPointSrcSinElev from a catalog
attitude used to define fov
:param astro_cat: table with intensoty column
"""
# select only sources in fov with CatalogFovEclairs
self.cat_fov = cat.CatalogFovEclairs()
self.cat_fov.set_ptg_instru(self.context.eclairs_attitude)
self.cat_fov.from_astro_catalog_with_spectrum(astro_cat)
d_src = {}
s_logger.info(f'Number of sources in FOV: {self.cat_fov.get_nb_element()}')
for isrc in range(self.cat_fov.get_nb_element()):
info_src = self.cat_fov._catalog[isrc]
mdl = self._get_model_point_src(info_src)
d_src[mdl.name] = mdl
s_logger.debug(f'adding {info_src["name"]} source')
s_logger.info(f"Selected {len(d_src)} sources in FOV")
return d_src
[docs] def init_with_custom_cat(self, path_cat):
"""return a list of ShadowModelPointSrcSinElev from a custom catalog.
:param path_cat: path to custom source catalog.
:type path_cat: str
:return: sources infos
:rtype: dict
"""
cat_bat = swift_catalog_to_eclairs_catalog(path_cat, self._dpix)
assert isinstance(cat_bat, cat.CatalogAstroWithEnergySpecSampling)
# select only sources in fov with CatalogFovEclairs
self.cat_fov = cat.CatalogFovEclairs()
self.cat_fov.set_ptg_instru(self.context.eclairs_attitude)
self.cat_fov.from_astro_catalog_with_spectrum(cat_bat)
d_src = {}
s_logger.info(f'Number of sources in FOV: {self.cat_fov.get_nb_element()}')
for isrc in range(self.cat_fov.get_nb_element()):
info_src = self.cat_fov._catalog[isrc]
mdl = self._get_model_point_src(info_src)
d_src[mdl.name] = mdl
s_logger.debug(f'adding {info_src["name"]} source')
s_logger.info(f"Selected {len(d_src)} sources in FOV with custom catalog")
return d_src
[docs] def init_with_cat_fov(self, cat_fov):
"""Initialize catalog with sources in FOV.
:param cat_fov: source catalog.
:type cat_fov: CatalogFovEclairs object
:return: sources infos
:rtype: dict
"""
assert isinstance(cat_fov, cat.CatalogFovEclairs)
d_src = {}
for isrc in range(cat_fov.get_nb_element()):
info_src = cat_fov._catalog[isrc]
s_logger.debug(f'adding {info_src["name"]} source')
mdl = self._get_model_point_src(info_src)
d_src[mdl.name] = mdl
s_logger.info(f"Selected {len(d_src)} sources in FOV with user catalog")
return d_src