"""Sources simulation module
"""
import logging
import numpy as np
import os.path as osp
from astropy.table import Row
from scipy.integrate import quad
import ecpi.common.sky.catalog as cat
from ecpi.common import add_path_data_ref_eclairs
from ecpi.common.io.fits_tools import read_fits
from ecpi.common.sky.cat_builder import swift_catalog_to_eclairs_catalog
from ecpi.common.instru.model_effect import ECLAIRsDetectorEffect
from ecpi.common.instru.solid_angle import SolidAngleComputationWithEarth
from ecpi.common.instru.solid_angle import SolidAngleSingleton
from ecpi.simu.lib.context import GlobalContextSimulator
import abc
s_logger = logging.getLogger(__name__)
[docs]class AbstractModelSrc(abc.ABC):
"""
Base object for the creation of the shadowgram model of different sources
"""
def __init__(self):
"""**Constructor**
.. warning :: _snr attribute is unused so far
"""
self._snr = -1
self._name = "Unknown"
self._name_model = "Not defined"
self._memo_id_pos = -1
self.context = GlobalContextSimulator()
[docs] def get_noisy_model(self, obs_time=None):
"""Return the shadowgram model with poisson noise.
:param obs_time: observation time in seconds
:type obs_time: float
"""
return np.random.poisson(self.get_model(obs_time))
[docs] def update(self):
"""return True if the model is already update with position or not yet defined
"""
# for design pattern observer
# overload only src is depended of satellite position and call this before
if self.context._id_pos < 0:
s_logger.warning("no position satellite defined")
return True
if self._memo_id_pos == self.context._id_pos:
return True
else:
self._memo_id_pos = self.context._id_pos
return False
@property
def name(self):
"""Get name.
"""
return self._name
@name.setter
def name(self, name):
"""Set name
"""
self._name = name
[docs] @abc.abstractmethod
def get_model(self, obs_time=None):
"""return detector image of model of the source
:param obs_time: observation time in s
:type obs_time: float
:return: detector image of model of the source
:rtype: array(80x80) of float
"""
return NotImplemented
[docs]class ModelFlat(AbstractModelSrc):
"""Generic class for flat model
"""
def __init__(self, level):
"""**constructor**
:param level: intensity detected
:type level: float [ph/cm2/s]
"""
super().__init__()
self.pix_area = self.context._sim_geom.get_det_pixel_surface()
shape = self.context._sim_geom.inst.detect.get_shape()
self.detect_ones = np.ones(shape, dtype=np.float64)
self.noise_cm2_s = level
self._name_model = "Flat"
[docs] def get_model(self, obs_time=None):
"""return the shadowgram model
:param obs_time: observation time in s
:type obs_time: float
:return: noise detector image in ph/pix
:rtype: array(float)
"""
if not obs_time:
obs_time = self.context.duration
noise_pixel = self.noise_cm2_s * self.pix_area * obs_time
return self.detect_ones * noise_pixel
[docs]class ModelInternalNoise(ModelFlat):
"""
Model for a flat internal noise, impact on detector of particles in the terrestrial environment
The internal noise is independent from wavelength
"""
def __init__(self, noise_level=0.003):
"""**constructor**
:param noise_level: internal noise level in ph/s/cm2/keV (default is 0.003)
:type noise_level: float
"""
super().__init__(noise_level)
self._name = "Internal noise"
self.interne_noise = noise_level
[docs] def get_model(self, obs_time=None):
"""return the shadowgram model
:param obs_time: observation time in s
:type obs_time: float
:return: noise detector image in ph/pix
:rtype: array(float)
"""
if not obs_time:
obs_time = self.context.duration
emin, emax = self.context.energy_range
size_band = emax - emin
self.noise_cm2_s = size_band * self.interne_noise
return super().get_model(obs_time)
[docs]class ModelCxbFlat(ModelFlat):
"""model for a flat CXB without spectrum
"""
def __init__(self, cxb_level=1):
"""**constructor**
:param cxb_level: intensity of CXB
:type cxb_level: float [ph/cm2/s]
"""
super().__init__(cxb_level)
self._name = "CXB"
self._name_model = "Flat"
[docs]class ModelCxbFlatBasedEnergy(AbstractModelSrc):
"""Model for a geometrically flat CXB with energy dependence
as specified with the Moretti spectrum.
"""
def __init__(self,
solidangle_filename=\
add_path_data_ref_eclairs(osp.join('instru', 'pixels_solid_angle_withcostheta.fits'))):
"""**constructor**
:param solidangle_filename: PATH/filename of the solid angle image
:type solidangle_filename: string
"""
super().__init__()
norm = 0.109
gamma_1 = 1.4
gamma_2 = 2.88
e_b = 29
self.solidangle_filename = solidangle_filename
shape_based_solid_angle = read_fits(solidangle_filename, ugts_standard=1)
mean_solid_angle = np.mean(shape_based_solid_angle)
solid_angle = np.ones(shape=shape_based_solid_angle.shape) * mean_solid_angle
self._name = "flat CXB Moretti spectrum"
self.s_angle_pixel = solid_angle * self.context._sim_geom.get_det_pixel_surface()
self.func = lambda e: norm / ((e / e_b) ** gamma_1 + (e / e_b) ** gamma_2)
self.solid_angle = solid_angle
[docs] def get_model(self, obs_time=None):
"""return the shadowgram model
:param obs_time: observation time in s
:type obs_time: float
:return: cxb detector image in ph/pix
:rtype: array(float)
.. warning :: a factor 0.8 is added to this model to
make the count rate equal to the CXB shape case. TBI.
"""
# ## initial function get_model as implemented by CC/PB
val = self.solid_angle[0, 0]
assert (self.solid_angle == val).all()
if not obs_time:
obs_time = self.context.duration
# flux = integral between e_min and e_max of eq. 4 of https://arxiv.org/pdf/0811.1444.pdf
emin, emax = self.context.energy_range
flux, _ = quad(self.func, emin, emax)
ret = (obs_time * flux) * self.s_angle_pixel
s_logger.info(f'ADDED 0.8 FACTOR in get_model cxb flat !!! \
[eband=[{emin, emax}] keV]')
ret *= 0.8
return ret
[docs]class ModelCxbShapeBased(AbstractModelSrc):
"""Model for a CXB without spectrum based on the pixel solid angle shape
..warning :: dead code ?
"""
def __init__(self, cxb_level=3035,
solidangle_filename=add_path_data_ref_eclairs(osp.join('instru', 'pixels_solid_angle.fits'))):
"""**constructor**
:param cxb_level: intensity of CXB
:type cxb_level: float [ph/cm2/s]
:param solidangle_filename: PATH/filename of the solid angle image
:type solidangle_filename: string
"""
super().__init__()
self.cxb_level = cxb_level
self.solidangle_filename = solidangle_filename
solid_angle = read_fits(solidangle_filename, ugts_standard=1)
self.rate = cxb_level * solid_angle / solid_angle.sum()
self._name = "CXB"
self._name_model = "Shape"
[docs] def get_model(self, obs_time=None):
"""return the shadowgram model
:param obs_time: observation time in s
:type obs_time: float
:return: cxb detector image in ph/pix
:rtype: array(float)
"""
if not obs_time:
obs_time = self.context.duration
ret = self.rate * obs_time
return ret
[docs]class ModelCxbShapeBasedEnergy(AbstractModelSrc):
"""Model for an energy dependent CXB based on the pixel solid angle shape
"""
def __init__(self,
solidangle_filename=\
add_path_data_ref_eclairs(osp.join('instru', 'pixels_solid_angle_withcostheta.fits'))):
"""**constructor**
:param solidangle_filename: PATH/filename of the solid angle image
:type solidangle_filename: string
"""
super().__init__()
norm = 0.109
gamma_1 = 1.4
gamma_2 = 2.88
e_b = 29 # keV
self.mask_aperture = self.context._sim_geom.inst.mask_aperture
self.solidangle_filename = solidangle_filename
self.solid_angle = read_fits(solidangle_filename, ugts_standard=1)
self._name = "ModelCxbShapeBasedEnergy_NoEarth"
self.func = lambda e: norm / ((e / e_b) ** gamma_1 + (e / e_b) ** gamma_2)
det_pix_surf = self.context._sim_geom.inst.det_pix_size ** 2
self.s_angle_pix = 0.4 * det_pix_surf * self.solid_angle * (1. / self.mask_aperture)
[docs] def get_model(self, obs_time=None):
"""return the shadowgram model
..note: RMF must be handled here at some point !
..note: doc to Moretti's model:
flux = integral between e_min and e_max of eq. 4
of https://arxiv.org/pdf/0811.1444.pdf
:param obs_time: observation time in s
:type obs_time: float
:return: cxb detector image in ph/pix
:rtype: array(float)
"""
if not obs_time:
obs_time = self.context.duration
emin, emax = self.context.energy_range
flux, _ = quad(self.func, emin, emax)
# middle point integration for comparisons with AlG.
# flux = self.func((emin+emax)*0.5) * (emax-emin)
ret = (flux * obs_time) * self.s_angle_pix
return ret
class _ModelMeanCXBShapeBasedEnergyWithEarth(AbstractModelSrc):
"""Model for an energy dependent CXB in the presence of Earth in FOV.
"""
def __init__(self):
"""**constructor**
:param use_numba: whether or not to use pre-compiled function
when computing solid angle. Default is True.
:type use_numba: bool
..note: sum of solid angle with real mask accounts for cos theta
"""
super().__init__()
self._name = 'shape_moretti_earth'
self.mode = None
self.norm = 0.109
self.gamma_1 = 1.4
self.gamma_2 = 2.88
self.e_b = 29 # keV
def get_model(self, obs_time=None):
"""Return the shadowgram model.
:param obs_time: observation time in s
:type obs_time: float
:return: shadowgram
:rtype: 2D array(float) with shape 80x80
"""
s_logger.info(f'Running get_model CXB with Earth [mode={self.mode}]')
if not obs_time:
obs_time = self.context.duration
emin, emax = self.context.energy_range
flux, _ = quad(self.func, emin, emax)
ret = (flux * obs_time) * self.s_angle_pix
return ret
[docs]class ModelMeanCXBShapeBasedEnergyWithEarthCalculator(_ModelMeanCXBShapeBasedEnergyWithEarth):
"""Model for an energy dependent CXB in the presence of Earth in FOV.
CXB model is computed with mean shape.
"""
def __init__(self, limb=None, vec_earth=None, use_numba=True):
"""**constructor**
:param limb: limb angle in degree
:type limb: float
:param vec_earth: earth direction unit vector
:type vec_earth: list/array of size 3
:param use_numba: whether or not to use pre-compiled function
when computing solid angle. Default is True.
:type use_numba: bool
..note: sum of solid angle with real mask accounts for cos theta
"""
super().__init__()
self._name = 'shape_moretti_earth_calculator'
self.mode = "calculator"
if (limb is not None) and (vec_earth is not None):
self.limb = limb
self.vec_earth = vec_earth
else:
self.limb = self.context.earth_limb
self.vec_earth = self.context.earth_pos_fov_unit
inst_ecl = self.context._sim_geom.inst
det_pix_surf = inst_ecl.det_pix_size ** 2
sacwe = SolidAngleComputationWithEarth(inst_ecl, use_numba=use_numba)
self.solid_angle_with_earth = sacwe.run_calculator(self.limb, self.vec_earth)
self.func = lambda e: self.norm / ((e / self.e_b) ** self.gamma_1 + \
(e / self.e_b) ** self.gamma_2)
self.s_angle_pix = 0.4 * det_pix_surf * self.solid_angle_with_earth * \
(1. / inst_ecl.mask_aperture)
[docs]class ModelMeanCXBShapeBasedEnergyWithEarthSimulator(_ModelMeanCXBShapeBasedEnergyWithEarth):
"""Model for an energy dependent CXB in the presence of Earth in FOV.
CXB model is computed with a ray-tracing method.
"""
def __init__(self):
super().__init__()
self._name = 'shape_moretti_earth_simulator'
self.mode = "simulator"
raise NotImplementedError
[docs]class ModelEarthMoretti(AbstractModelSrc):
"""Model for an energy dependent CXB in the presence of Earth in FOV.
CXB model is computed with mean shape.
"""
def __init__(self, limb=None, vec_earth=None):
"""**constructor**
:param limb: limb angle in degree
:type limb: float
:param vec_earth: earth direction unit vector
:type vec_earth: list/array of size 3
:param use_numba: whether or not to use pre-compiled function
when computing solid angle. Default is True.
:type use_numba: bool
..note: sum of solid angle with real mask accounts for cos theta
"""
super().__init__()
self._name = 'ModelEarthMoretti'
self.mode = "calculator"
self.context.add_observer(self)
self.norm = 0.109
self.gamma_1 = 1.4
self.gamma_2 = 2.88
self.e_b = 29 # keV
if (limb is not None) and (vec_earth is not None):
self.limb = limb
self.vec_earth = vec_earth
else:
self.limb = self.context.earth_limb
self.vec_earth = self.context.earth_pos_fov_unit
instru_ecl = self.context._sim_geom.inst
self.sacwe = SolidAngleSingleton(instru_ecl)
# prcomputing CXB flux with model Moretti for all channels
self.moretti = np.zeros(self.context._mdl_effect.chan_nb(), dtype=np.float32)
func = lambda e: self.norm / ((e / self.e_b) ** self.gamma_1 + \
(e / self.e_b) ** self.gamma_2)
mdl_effect = self.context._mdl_effect
# pre-computing CXB Moretti flux in each band
for chan in range(self.context._mdl_effect.chan_nb()):
e_min = mdl_effect.chan_boundary[chan]
e_max = mdl_effect.chan_boundary[chan + 1]
flux, _ = quad(func, e_min, e_max)
self.moretti[chan] = flux
self.s_angle_pix_no_earth = None
self.update()
[docs] def update(self):
"""**constructor**
:param limb: limb angle in degree
:type limb: float
:param vec_earth: earth direction unit vector
:type vec_earth: list/array of size 3
:param use_numba: whether or not to use pre-compiled function
when computing solid angle. Default is True.
:type use_numba: bool
..note: sum of solid angle with real mask accounts for cos theta
"""
if super().update():
s_logger.info("ModelEarthMoretti already update")
return
instru_ecl = self.context._sim_geom.inst
det_pix_surf = instru_ecl.det_pix_size ** 2
s_logger.info("update ModelEarthMoretti")
solid_angle_with_earth = self.sacwe.run(self.context._earth_fov)
self.s_angle_pix = 0.4 * det_pix_surf * solid_angle_with_earth * \
(1. / instru_ecl.mask_aperture)
[docs] def get_model(self, obs_time=None):
"""Return the shadowgram model.
:param obs_time: observation time in s
:type obs_time: float
:return: shadowgram
:rtype: 2D array(float) with shape 80x80
"""
if not obs_time:
obs_time = self.context.duration
return (self.moretti[self.context.idx_chan] * obs_time) * self.s_angle_pix
[docs]class ModelPointSrcIRF(AbstractModelSrc):
"""model for point source
"""
S_cpt = 0
def __init__(self, info_src):
"""**Constructor**
info_src must have at least the same number of intensities as idx_chan
:param info_src: single source information {'elev','dir','intensity','name'}.
Elev and dir in deg.
:type info_src: astropy.table.Row or dict
"""
super().__init__()
self.info_src = info_src
if (ModelPointSrcIRF.S_cpt == 0) and \
isinstance(info_src, Row):
s_logger.debug(f'source info: {self.info_src}')
s_logger.debug(f'source info dtype: {self.info_src.dtype}')
ModelPointSrcIRF.S_cpt += 1
self.simu = self.context._sim_geom
self._name = self.info_src["name"]
self._name_model = "with IRF"
self._shadow_cm2 = None
self._last_shadow = None
self.coef_hors_axis = self.set_coef_hors_axis()
[docs] def set_coef_hors_axis(self):
"""Return coefficient associated to cos theta or sin elev off axis effect.
:return: coefficient (units ?)
:rtype: array(float)
"""
pix_y, pix_z = self.simu.inst.elevdir_to_skypix(np.array(self.info_src['elev']),
np.array(self.info_src['dir']))
return self.context._mdl_effect.get_irf_val_ecllos(pix_y, pix_z)[0]
[docs] def get_model(self, obs_time=None):
"""return the shadowgram model
intensity given in ph/cm2/s in the current channel.
:param obs_time: observation time in s
:type obs_time: float
:return: cxb detector image in ph/pix
:rtype: array(float)
"""
if not obs_time:
obs_time = self.context.duration
if self._shadow_cm2 is None:
self._shadow_cm2 = self.simu.get_shadow_surface(self.info_src['elev'],
self.info_src['dir']).copy()
try:
intensity = self.info_src['intensity'][self.context.idx_chan]
# except ValueError:
except (ValueError, TypeError, IndexError):
intensity = self.info_src['intensity']
self._last_shadow = intensity * obs_time * self._shadow_cm2 * self.coef_hors_axis
return self._last_shadow.copy()
[docs]class ModelPointSrcSinElev(ModelPointSrcIRF):
"""Model for point source.
Elevation angle is in degree.
"""
def __init__(self, info_src):
"""**Constructor**
:param info_src: single source information {'elev','dir','intensity','name'}.
Elev and dir in deg.
:type info_src: astropy.table.Row or dict
"""
self.sin_elev = np.sin(np.deg2rad(info_src['elev']))
super().__init__(info_src)
self._name_model = "with sin(elev)"
[docs] def set_coef_hors_axis(self):
"""This function must be renamed as 'get_coef_hors_axis'.
Verbose argument is unused.
"""
return self.sin_elev
[docs]class ListModelPointSrcFromCatalog(object):
"""Manage list of point source
"""
def __init__(self, o_dpix, use_irf=True):
"""**Constructor**
o_dpix used to define dpix channel
"""
assert isinstance(o_dpix, ECLAIRsDetectorEffect)
self._dpix = o_dpix
self.use_irf = use_irf
self.context = GlobalContextSimulator()
# declare logger
s_logger.debug('creating an instance of ListModelPointSrcFromCatalog')
[docs] def set_use_irf(self, use_irf):
assert use_irf in [True, False]
self.use_irf = use_irf
def _get_model_point_src(self, info_src):
if self.use_irf:
mdl = ModelPointSrcIRF(info_src)
else:
mdl = ModelPointSrcSinElev(info_src)
return mdl
[docs] def init_with_cat_swift_2012(self):
"""return a list of ShadowModelPointSrcSinElev from the SWIFT/BAT catalog
attitude used to define fov
"""
path_cat = add_path_data_ref_eclairs(osp.join("cat", "BAT_70m_catalog_20nov2012.fits"))
cat_bat = swift_catalog_to_eclairs_catalog(path_cat, self._dpix)
assert isinstance(cat_bat, cat.CatalogAstroWithEnergySpecSampling)
return self.init_with_cat_astro(cat_bat)
[docs] def init_with_cat_astro(self, astro_cat):
"""
return a list of ShadowModelPointSrcSinElev from a catalog
attitude used to define fov
:param astro_cat: table with intensoty column
"""
# select only sources in fov with CatalogFovEclairs
self.cat_fov = cat.CatalogFovEclairs()
self.cat_fov.set_ptg_instru(self.context.eclairs_attitude)
self.cat_fov.from_astro_catalog_with_spectrum(astro_cat)
d_src = {}
s_logger.info(f'Number of sources in FOV: {self.cat_fov.get_nb_element()}')
for isrc in range(self.cat_fov.get_nb_element()):
info_src = self.cat_fov._catalog[isrc]
mdl = self._get_model_point_src(info_src)
d_src[mdl.name] = mdl
s_logger.debug(f'adding {info_src["name"]} source')
s_logger.info(f"Selected {len(d_src)} sources in FOV")
return d_src
[docs] def init_with_custom_cat(self, path_cat):
"""return a list of ShadowModelPointSrcSinElev from a custom catalog.
:param path_cat: path to custom source catalog.
:type path_cat: str
:return: sources infos
:rtype: dict
"""
cat_bat = swift_catalog_to_eclairs_catalog(path_cat, self._dpix)
assert isinstance(cat_bat, cat.CatalogAstroWithEnergySpecSampling)
# select only sources in fov with CatalogFovEclairs
self.cat_fov = cat.CatalogFovEclairs()
self.cat_fov.set_ptg_instru(self.context.eclairs_attitude)
self.cat_fov.from_astro_catalog_with_spectrum(cat_bat)
d_src = {}
s_logger.info(f'Number of sources in FOV: {self.cat_fov.get_nb_element()}')
for isrc in range(self.cat_fov.get_nb_element()):
info_src = self.cat_fov._catalog[isrc]
mdl = self._get_model_point_src(info_src)
d_src[mdl.name] = mdl
s_logger.debug(f'adding {info_src["name"]} source')
s_logger.info(f"Selected {len(d_src)} sources in FOV with custom catalog")
return d_src
[docs] def init_with_cat_fov(self, cat_fov):
"""Initialize catalog with sources in FOV.
:param cat_fov: source catalog.
:type cat_fov: CatalogFovEclairs object
:return: sources infos
:rtype: dict
"""
assert isinstance(cat_fov, cat.CatalogFovEclairs)
d_src = {}
for isrc in range(cat_fov.get_nb_element()):
info_src = cat_fov._catalog[isrc]
s_logger.debug(f'adding {info_src["name"]} source')
mdl = self._get_model_point_src(info_src)
d_src[mdl.name] = mdl
s_logger.info(f"Selected {len(d_src)} sources in FOV with user catalog")
return d_src