Source code for ecpi.simu.lib.sources

"""Sources simulation module
"""
import abc
import logging
import os.path as osp
import os

import numpy as np
from astropy.table import Row
from scipy.integrate import quad

import ecpi.common.sky.catalog as cat
from ecpi.common import add_path_data_ref_eclairs, add_path_caldb_eclairs
from ecpi.common.io.fits_tools import read_fits
from ecpi.common.sky.cat_builder import swift_catalog_to_eclairs_catalog
from ecpi.common.instru.model_effect import ECLAIRsDetectorEffect
from ecpi.common.instru.solid_angle import SolidAngleComputationWithEarth
from ecpi.common.instru.solid_angle import SolidAngleSingleton
from ecpi.simu.lib.context import GlobalContextSimulator

s_logger = logging.getLogger(__name__)


[docs]class AbstractModelSrc(abc.ABC): """ Base object for the creation of the shadowgram model of different sources """ def __init__(self): """**Constructor** .. warning :: _snr attribute is unused so far """ self._snr = -1 self._name = "Unknown" self._name_model = "Not defined" self._memo_id_pos = -1 self.context = GlobalContextSimulator()
[docs] def get_noisy_model(self, obs_time=None): """Return the shadowgram model with poisson noise. :param obs_time: observation time in seconds :type obs_time: float """ return np.random.poisson(self.get_model(obs_time))
[docs] def update(self): """return True if the model is already update with position or not yet defined """ # for design pattern observer # overload only src is depended of satellite position and call this before if self.context._id_pos < 0: s_logger.info("no position satellite defined") return True if self._memo_id_pos == self.context._id_pos: s_logger.info("memo_id_pos == context._id_pos") return True else: self._memo_id_pos = self.context._id_pos return False
@property def name(self): """Get name. """ return self._name @name.setter def name(self, name): """Set name """ self._name = name
[docs] @abc.abstractmethod def get_model(self, obs_time=None): """return detector image of model of the source :param obs_time: observation time in s :type obs_time: float :return: detector image of model of the source :rtype: array(80x80) of float """ return NotImplemented
[docs]class ModelFlat(AbstractModelSrc): """Generic class for flat model """ def __init__(self, level): """**constructor** :param level: intensity detected :type level: float [ph/cm2/s] """ super().__init__() self.pix_area = self.context._sim_geom.get_det_pixel_surface() shape = self.context._sim_geom.inst.detect.get_shape() self.detect_ones = np.ones(shape, dtype=np.float64) self.noise_cm2_s = level self._name_model = "Flat"
[docs] def get_model(self, obs_time=None): """return the shadowgram model :param obs_time: observation time in s :type obs_time: float :return: noise detector image in ph/pix :rtype: array(float) """ if not obs_time: obs_time = self.context.duration noise_pixel = self.noise_cm2_s * self.pix_area * obs_time return self.detect_ones * noise_pixel
[docs]class ModelInternalNoise(ModelFlat): """ Model for a flat internal noise, impact on detector of particles in the terrestrial environment The internal noise is independent from wavelength """ def __init__(self, noise_level=0.003): """**constructor** :param noise_level: internal noise level in ph/s/cm2/keV (default is 0.003) :type noise_level: float """ super().__init__(noise_level) self._name = "Internal noise" self.interne_noise = noise_level
[docs] def get_model(self, obs_time=None): """return the shadowgram model :param obs_time: observation time in s :type obs_time: float :return: noise detector image in ph/pix :rtype: array(float) """ if not obs_time: obs_time = self.context.duration emin, emax = self.context.energy_range size_band = emax - emin self.noise_cm2_s = size_band * self.interne_noise return super().get_model(obs_time)
[docs]class ModelCxbFlat(ModelFlat): """model for a flat CXB without spectrum """ def __init__(self, cxb_level=1): """**constructor** :param cxb_level: intensity of CXB :type cxb_level: float [ph/cm2/s] """ super().__init__(cxb_level) self._name = "CXB" self._name_model = "Flat"
[docs]class ModelCxbFlatBasedEnergy(AbstractModelSrc): """Model for a geometrically flat CXB with energy dependence as specified with the Moretti spectrum. """ # TODO: refactoring remove file from constructor def __init__(self, solidangle_filename="__to_remove__"): """**constructor** :param solidangle_filename: PATH/filename of the solid angle image :type solidangle_filename: string """ if solidangle_filename == "__to_remove__": solidangle_filename = add_path_data_ref_eclairs(osp.join("instru", "pixels_solid_angle_withcostheta.fits")) super().__init__() norm = 0.109 gamma_1 = 1.4 gamma_2 = 2.88 e_b = 29 self.solidangle_filename = solidangle_filename shape_based_solid_angle = read_fits(solidangle_filename, ugts_standard=True) mean_solid_angle = np.mean(shape_based_solid_angle) solid_angle = np.ones(shape=shape_based_solid_angle.shape) * mean_solid_angle self._name = "flat CXB Moretti spectrum" self.s_angle_pixel = solid_angle * self.context._sim_geom.get_det_pixel_surface() self.func = lambda e: norm / ((e / e_b) ** gamma_1 + (e / e_b) ** gamma_2) self.solid_angle = solid_angle
[docs] def get_model(self, obs_time=None): """return the shadowgram model :param obs_time: observation time in s :type obs_time: float :return: cxb detector image in ph/pix :rtype: array(float) .. warning :: a factor 0.8 is added to this model to make the count rate equal to the CXB shape case. TBI. """ # ## initial function get_model as implemented by CC/PB val = self.solid_angle[0, 0] assert np.array_equal(self.solid_angle, val) if not obs_time: obs_time = self.context.duration # flux = integral between e_min and e_max of eq. 4 of https://arxiv.org/pdf/0811.1444.pdf emin, emax = self.context.energy_range flux, _ = quad(self.func, emin, emax) ret = (obs_time * flux) * self.s_angle_pixel s_logger.info(f'ADDED 0.8 FACTOR in get_model cxb flat !!! \ [eband=[{emin, emax}] keV]') ret *= 0.8 return ret
[docs]class ModelCxbShapeBased(AbstractModelSrc): """Model for a CXB without spectrum based on the pixel solid angle shape ..warning :: dead code ? """ def __init__(self, cxb_level=3035, solidangle_filename="__to_remove__"): """**constructor** :param cxb_level: intensity of CXB :type cxb_level: float [ph/cm2/s] :param solidangle_filename: PATH/filename of the solid angle image :type solidangle_filename: string """ if solidangle_filename == "__to_remove__": solidangle_filename = add_path_data_ref_eclairs(osp.join("instru", "pixels_solid_angle.fits")) super().__init__() self.cxb_level = cxb_level self.solidangle_filename = solidangle_filename solid_angle = read_fits(solidangle_filename, ugts_standard=True) self.rate = cxb_level * solid_angle / solid_angle.sum() self._name = "CXB" self._name_model = "Shape"
[docs] def get_model(self, obs_time=None): """return the shadowgram model :param obs_time: observation time in s :type obs_time: float :return: cxb detector image in ph/pix :rtype: array(float) """ if not obs_time: obs_time = self.context.duration ret = self.rate * obs_time return ret
[docs]class ModelCxbShapeBasedEnergy(AbstractModelSrc): """Model for an energy dependent CXB based on the pixel solid angle shape """ def __init__(self, solidangle_filename="__to_remove__"): """**constructor** :param solidangle_filename: PATH/filename of the solid angle image :type solidangle_filename: string """ if solidangle_filename == "__to_remove__": solidangle_filename = add_path_data_ref_eclairs(osp.join("instru", "pixels_solid_angle.fits")) super().__init__() norm = 0.109 gamma_1 = 1.4 gamma_2 = 2.88 e_b = 29 # keV self.mask_aperture = self.context._sim_geom.inst.mask_aperture self.solidangle_filename = solidangle_filename self.solid_angle = read_fits(solidangle_filename, ugts_standard=True) self._name = "ModelCxbShapeBasedEnergy_NoEarth" self.func = lambda e: norm / ((e / e_b) ** gamma_1 + (e / e_b) ** gamma_2) det_pix_surf = self.context._sim_geom.inst.det_pix_size ** 2 self.s_angle_pix = 0.4 * det_pix_surf * self.solid_angle * (1. / self.mask_aperture)
[docs] def get_model(self, obs_time=None): """return the shadowgram model ..note: RMF must be handled here at some point ! ..note: doc to Moretti's model: flux = integral between e_min and e_max of eq. 4 of https://arxiv.org/pdf/0811.1444.pdf :param obs_time: observation time in s :type obs_time: float :return: cxb detector image in ph/pix :rtype: array(float) """ if not obs_time: obs_time = self.context.duration emin, emax = self.context.energy_range flux, _ = quad(self.func, emin, emax) # middle point integration for comparisons with AlG. # flux = self.func((emin+emax)*0.5) * (emax-emin) ret = (flux * obs_time) * self.s_angle_pix return ret
class _ModelMeanCXBShapeBasedEnergyWithEarth(AbstractModelSrc): """Model for an energy dependent CXB in the presence of Earth in FOV. """ def __init__(self): """**constructor** :param use_numba: whether or not to use pre-compiled function when computing solid angle. Default is True. :type use_numba: bool ..note: sum of solid angle with real mask accounts for cos theta """ super().__init__() self._name = 'shape_moretti_earth' self.mode = None self.norm = 0.109 self.gamma_1 = 1.4 self.gamma_2 = 2.88 self.e_b = 29 # keV def get_model(self, obs_time=None): """Return the shadowgram model. :param obs_time: observation time in s :type obs_time: float :return: shadowgram :rtype: 2D array(float) with shape 80x80 """ s_logger.info(f'Running get_model CXB with Earth [mode={self.mode}]') if not obs_time: obs_time = self.context.duration emin, emax = self.context.energy_range flux, _ = quad(self.func, emin, emax) ret = (flux * obs_time) * self.s_angle_pix return ret
[docs]class ModelMeanCXBShapeBasedEnergyWithEarthCalculator(_ModelMeanCXBShapeBasedEnergyWithEarth): """Model for an energy dependent CXB in the presence of Earth in FOV. CXB model is computed with mean shape. """ def __init__(self, limb=None, vec_earth=None, use_numba=True): """**constructor** :param limb: limb angle in degree :type limb: float :param vec_earth: earth direction unit vector :type vec_earth: list/array of size 3 :param use_numba: whether or not to use pre-compiled function when computing solid angle. Default is True. :type use_numba: bool ..note: sum of solid angle with real mask accounts for cos theta """ super().__init__() self._name = 'shape_moretti_earth_calculator' self.mode = "calculator" if (limb is not None) and (vec_earth is not None): self.limb = limb self.vec_earth = vec_earth else: self.limb = self.context.earth_limb self.vec_earth = self.context.earth_pos_fov_unit inst_ecl = self.context._sim_geom.inst det_pix_surf = inst_ecl.det_pix_size ** 2 sacwe = SolidAngleComputationWithEarth(inst_ecl, use_numba=use_numba) self.solid_angle_with_earth = sacwe.run_calculator(self.limb, self.vec_earth) self.func = lambda e: self.norm / ((e / self.e_b) ** self.gamma_1 + \ (e / self.e_b) ** self.gamma_2) self.s_angle_pix = 0.4 * det_pix_surf * self.solid_angle_with_earth * \ (1. / inst_ecl.mask_aperture)
[docs]class ModelMeanCXBShapeBasedEnergyWithEarthSimulator(_ModelMeanCXBShapeBasedEnergyWithEarth): """Model for an energy dependent CXB in the presence of Earth in FOV. CXB model is computed with a ray-tracing method. """ def __init__(self): super().__init__() self._name = 'shape_moretti_earth_simulator' self.mode = "simulator" raise NotImplementedError
[docs]class ModelEarthMoretti(AbstractModelSrc): """Model for an energy dependent CXB in the presence of Earth in FOV. CXB model is computed with mean shape. """ def __init__(self, limb=None, vec_earth=None): """**constructor** :param limb: limb angle in degree :type limb: float :param vec_earth: earth direction unit vector :type vec_earth: list/array of size 3 :param use_numba: whether or not to use pre-compiled function when computing solid angle. Default is True. :type use_numba: bool ..note: sum of solid angle with real mask accounts for cos theta """ super().__init__() self._name = 'ModelEarthMoretti' self.mode = "calculator" self.context.add_observer(self) self.norm = 0.109 self.gamma_1 = 1.4 self.gamma_2 = 2.88 self.e_b = 29 # keV if (limb is not None) and (vec_earth is not None): self.limb = limb self.vec_earth = vec_earth else: self.limb = self.context.earth_limb self.vec_earth = self.context.earth_pos_fov_unit instru_ecl = self.context._sim_geom.inst self.sacwe = SolidAngleSingleton(instru_ecl) # prcomputing CXB flux with model Moretti for all channels self.moretti = np.zeros(self.context._mdl_effect.chan_nb(), dtype=np.float32) func = lambda e: self.norm / ((e / self.e_b) ** self.gamma_1 + \ (e / self.e_b) ** self.gamma_2) mdl_effect = self.context._mdl_effect # pre-computing CXB Moretti flux in each band for chan in range(self.context._mdl_effect.chan_nb()): e_min = mdl_effect.chan_boundary[chan] e_max = mdl_effect.chan_boundary[chan + 1] flux, _ = quad(func, e_min, e_max) self.moretti[chan] = flux self.s_angle_pix_no_earth = None self.update()
[docs] def update(self): """**constructor** :param limb: limb angle in degree :type limb: float :param vec_earth: earth direction unit vector :type vec_earth: list/array of size 3 :param use_numba: whether or not to use pre-compiled function when computing solid angle. Default is True. :type use_numba: bool ..note: sum of solid angle with real mask accounts for cos theta """ if super().update(): s_logger.info("ModelEarthMoretti already update") return instru_ecl = self.context._sim_geom.inst det_pix_surf = instru_ecl.det_pix_size ** 2 s_logger.debug("update ModelEarthMoretti") solid_angle_with_earth = self.sacwe.run(self.context._earth_fov) self.s_angle_pix = 0.4 * det_pix_surf * solid_angle_with_earth * \ (1. / instru_ecl.mask_aperture)
[docs] def get_model(self, obs_time=None): """Return the shadowgram model. :param obs_time: observation time in s :type obs_time: float :return: shadowgram :rtype: 2D array(float) with shape 80x80 """ if not obs_time: obs_time = self.context.duration return (self.moretti[self.context.idx_chan] * obs_time) * self.s_angle_pix
[docs]class ModelPointSrcIRF(AbstractModelSrc): """model for point source """ S_cpt = 0 def __init__(self, info_src): """**Constructor** info_src must have at least the same number of intensities as idx_chan :param info_src: single source information {'elev','dir','intensity','name'}. Elev and dir in deg. :type info_src: astropy.table.Row or dict """ super().__init__() self.info_src = info_src if (ModelPointSrcIRF.S_cpt == 0) and \ isinstance(info_src, Row): s_logger.debug(f'source info: {self.info_src}') s_logger.debug(f'source info dtype: {self.info_src.dtype}') ModelPointSrcIRF.S_cpt += 1 self.simu = self.context._sim_geom self._name = self.info_src["name"] self._name_model = "with IRF" self._shadow_cm2 = None self._last_shadow = None self.coef_hors_axis = self.set_coef_hors_axis()
[docs] def set_coef_hors_axis(self): """Return coefficient associated to cos theta or sin elev off axis effect. :return: coefficient (units ?) :rtype: array(float) """ pix_y, pix_z = self.simu.inst.elevdir_to_skypix(np.array(self.info_src['elev']), np.array(self.info_src['dir'])) return self.context._mdl_effect.get_irf_val_ecllos(pix_y, pix_z)[0]
[docs] def get_model(self, obs_time=None): """return the shadowgram model intensity given in ph/cm2/s in the current channel. :param obs_time: observation time in s :type obs_time: float :return: cxb detector image in ph/pix :rtype: array(float) """ if not obs_time: obs_time = self.context.duration if self._shadow_cm2 is None: self._shadow_cm2 = self.simu.get_shadow_surface(self.info_src['elev'], self.info_src['dir']).copy() try: intensity = self.info_src['intensity'][self.context.idx_chan] # except ValueError: except (ValueError, TypeError, IndexError): s_logger.exception('Recovering intensity') intensity = self.info_src['intensity'] self._last_shadow = intensity * obs_time * self._shadow_cm2 * self.coef_hors_axis return self._last_shadow.copy()
[docs]class ModelPointSrcSinElev(ModelPointSrcIRF): """Model for point source. Elevation angle is in degree. """ def __init__(self, info_src): """**Constructor** :param info_src: single source information {'elev','dir','intensity','name'}. Elev and dir in deg. :type info_src: astropy.table.Row or dict """ self.sin_elev = np.sin(np.deg2rad(info_src['elev'])) super().__init__(info_src) self._name_model = "with sin(elev)"
[docs] def set_coef_hors_axis(self): """This function must be renamed as 'get_coef_hors_axis'. Verbose argument is unused. """ return self.sin_elev
[docs]class ListModelPointSrcFromCatalog(object): """Manage list of point source """ def __init__(self, use_irf=True): """**Constructor** o_dpix used to define dpix channel """ self.use_irf = use_irf self.context = GlobalContextSimulator() self._dpix = self.context.mdl_effect # declare logger s_logger.debug('creating an instance of ListModelPointSrcFromCatalog')
[docs] def set_use_irf(self, use_irf): assert use_irf in [True, False] self.use_irf = use_irf
def _get_model_point_src(self, info_src): if self.use_irf: mdl = ModelPointSrcIRF(info_src) else: mdl = ModelPointSrcSinElev(info_src) return mdl
[docs] def init_with_cat_swift_2012(self): """return a list of ShadowModelPointSrcSinElev from the SWIFT/BAT catalog attitude used to define fov """ path_cat = add_path_data_ref_eclairs(osp.join("cat", "BAT_70m_catalog_20nov2012.fits")) cat_bat = swift_catalog_to_eclairs_catalog(path_cat, self._dpix) assert isinstance(cat_bat, cat.CatalogAstroWithEnergySpecSampling) return self.init_with_cat_astro(cat_bat)
[docs] def init_with_cat_astro(self, astro_cat): """ return a list of ShadowModelPointSrcSinElev from a catalog attitude used to define fov :param astro_cat: table with intensoty column """ # select only sources in fov with CatalogFovEclairs self.cat_fov = cat.CatalogFovEclairs() self.cat_fov.set_ptg_instru(self.context.eclairs_attitude) self.cat_fov.from_astro_catalog_with_spectrum(astro_cat) d_src = {} s_logger.info(f'Number of sources in FOV: {self.cat_fov.get_nb_element()}') for isrc in range(self.cat_fov.get_nb_element()): info_src = self.cat_fov._catalog[isrc] mdl = self._get_model_point_src(info_src) d_src[mdl.name] = mdl s_logger.debug(f'adding {info_src["name"]} source') s_logger.info(f"Selected {len(d_src)} sources in FOV") return d_src
[docs] def init_with_custom_cat(self, path_cat): """return a list of ShadowModelPointSrcSinElev from a custom catalog. :param path_cat: path to custom source catalog. :type path_cat: str :return: sources infos :rtype: dict """ cat_bat = swift_catalog_to_eclairs_catalog(path_cat, self._dpix) assert isinstance(cat_bat, cat.CatalogAstroWithEnergySpecSampling) # select only sources in fov with CatalogFovEclairs self.cat_fov = cat.CatalogFovEclairs() self.cat_fov.set_ptg_instru(self.context.eclairs_attitude) self.cat_fov.from_astro_catalog_with_spectrum(cat_bat) d_src = {} s_logger.info(f'Number of sources in FOV: {self.cat_fov.get_nb_element()}') for isrc in range(self.cat_fov.get_nb_element()): info_src = self.cat_fov._catalog[isrc] mdl = self._get_model_point_src(info_src) d_src[mdl.name] = mdl s_logger.debug(f'adding {info_src["name"]} source') s_logger.info(f"Selected {len(d_src)} sources in FOV with custom catalog") return d_src
[docs] def init_with_cat_fov(self, cat_fov): """Initialize catalog with sources in FOV. :param cat_fov: source catalog. :type cat_fov: CatalogFovEclairs object :return: sources infos :rtype: dict """ assert isinstance(cat_fov, cat.CatalogFovEclairs) d_src = {} for isrc in range(cat_fov.get_nb_element()): info_src = cat_fov._catalog[isrc] s_logger.debug(f'adding {info_src["name"]} source') mdl = self._get_model_point_src(info_src) d_src[mdl.name] = mdl s_logger.info(f"Selected {len(d_src)} sources in FOV with user catalog") return d_src