Source code for ecpi.simu.lib.sources

'''Sources simulation module

Created on 14 févr. 2019

author: catalano camille , APC/IN2P3/CNRS
'''

import logging
import numpy as np
from astropy.table import Row
from scipy.integrate import quad

import ecpi.common.sky.catalog as cat
from ecpi.common import add_path_data_ref_eclairs
from ecpi.common.io.fits_tools import read_fits
from ecpi.common.sky.cat_builder import swift_catalog_to_eclairs_catalog
from ecpi.common.instru.model_effect import ECLAIRsDetectorEffect
from ecpi.common.instru.solid_angle import SolidAngleComputationWithEarth
from ecpi.common.instru.solid_angle import SolidAngleWithEarthOrNot
from ecpi.simu.lib.context import GlobalContextSimulator


[docs]class ModelSrcInterface(object): """ Base object for the creation of the shadowgram model of different sources """ def __init__(self): """**Constructor** .. warning :: _snr attribute is unused so far """ self._snr = -1 self._name = "Unknown" self._name_model = "Not defined" self._memo_id_pos = -1 self.context = GlobalContextSimulator() self.logger = logging.getLogger(__name__)
[docs] def get_noisy_model(self, obs_time=None): """Return the shadowgram model with poisson noise. :param obs_time: observation time in seconds :type obs_time: float """ return np.random.poisson(self.get_model(obs_time))
[docs] def update(self): """return True if the model is already update with position or not yet defined """ # for design pattern observer # overload only src is depended of satellite position and call this before if self.context._id_pos < 0: self.logger.warning("no position satellite defined") return True if self._memo_id_pos == self.context._id_pos: return True else: self._memo_id_pos = self.context._id_pos return False
@property def name(self): """Get name. """ return self._name @name.setter def name(self, name): """Set name """ self._name = name
[docs]class ModelFlat(ModelSrcInterface): """Generic class for flat model """ def __init__(self, level): """**constructor** :param level: intensity detected :type level: float [ph/cm2/s] """ super().__init__() self.pix_area = self.context._sim_geom.get_det_pixel_surface() shape = self.context._sim_geom.inst.detect.get_shape() self.detect_ones = np.ones(shape, dtype=np.float64) self.noise_cm2_s = level self._name_model = "Flat"
[docs] def get_model(self, obs_time=None): """return the shadowgram model :param obs_time: observation time in s :type obs_time: float :return: noise detector image in ph/pix :rtype: array(float) """ if not obs_time: obs_time = self.context.duration noise_pixel = self.noise_cm2_s * self.pix_area * obs_time return self.detect_ones * noise_pixel
[docs]class ModelInternalNoise(ModelFlat): """ Model for a flat internal noise, impact on detector of particles in the terrestrial environment The internal noise is independent from wavelength """ def __init__(self, noise_level=0.003): """**constructor** :param noise_level: internal noise level in ph/s/cm2/keV (default is 0.003) :type noise_level: float """ super().__init__(noise_level) self._name = "Internal noise" self.interne_noise = noise_level
[docs] def get_model(self, obs_time=None): """return the shadowgram model :param obs_time: observation time in s :type obs_time: float :return: noise detector image in ph/pix :rtype: array(float) """ if not obs_time: obs_time = self.context.duration emin, emax = self.context.energy_range size_band = emax - emin self.noise_cm2_s = size_band * self.interne_noise return super().get_model(obs_time)
[docs]class ModelCxbFlat(ModelFlat): """model for a flat CXB without spectrum """ def __init__(self, cxb_level=1): """**constructor** :param cxb_level: intensity of CXB :type cxb_level: float [ph/cm2/s] """ super().__init__(cxb_level) self._name = "CXB" self._name_model = "Flat"
[docs]class ModelCxbFlatBasedEnergy(ModelSrcInterface): """Model for a geometrically flat CXB with energy dependence as specified with the Moretti spectrum. """ def __init__(self, solidangle_filename=\ add_path_data_ref_eclairs('instru/pixels_solid_angle_withcostheta.fits')): """**constructor** :param solidangle_filename: PATH/filename of the solid angle image :type solidangle_filename: string """ super().__init__() norm = 0.109 gamma_1 = 1.4 gamma_2 = 2.88 e_b = 29 self.solidangle_filename = solidangle_filename shape_based_solid_angle = read_fits(solidangle_filename, ugts_standard=1) mean_solid_angle = np.mean(shape_based_solid_angle) solid_angle = np.ones(shape=shape_based_solid_angle.shape) * mean_solid_angle self._name = "flat CXB Moretti spectrum" self.s_angle_pixel = solid_angle * self.context._sim_geom.get_det_pixel_surface() self.func = lambda e: norm / ((e / e_b) ** gamma_1 + (e / e_b) ** gamma_2) self.solid_angle = solid_angle
[docs] def get_model(self, obs_time=None): """return the shadowgram model :param obs_time: observation time in s :type obs_time: float :return: cxb detector image in ph/pix :rtype: array(float) .. warning :: a factor 0.8 is added to this model to make the count rate equal to the CXB shape case. TBI. """ # ## initial function get_model as implemented by CC/PB val = self.solid_angle[0, 0] assert (self.solid_angle == val).all() if not obs_time: obs_time = self.context.duration # flux = integral between e_min and e_max of eq. 4 of https://arxiv.org/pdf/0811.1444.pdf emin, emax = self.context.energy_range flux, _ = quad(self.func, emin, emax) ret = (obs_time * flux) * self.s_angle_pixel self.logger.info(f'ADDED 0.8 FACTOR in get_model cxb flat !!! \ [eband=[{emin, emax}] keV]') ret *= 0.8 return ret
[docs]class ModelCxbShapeBased(ModelSrcInterface): """Model for a CXB without spectrum based on the pixel solid angle shape ..warning :: dead code ? """ def __init__(self, cxb_level=3035, solidangle_filename=add_path_data_ref_eclairs('instru/pixels_solid_angle.fits')): """**constructor** :param cxb_level: intensity of CXB :type cxb_level: float [ph/cm2/s] :param solidangle_filename: PATH/filename of the solid angle image :type solidangle_filename: string """ super().__init__() self.cxb_level = cxb_level self.solidangle_filename = solidangle_filename solid_angle = read_fits(solidangle_filename, ugts_standard=1) self.rate = cxb_level * solid_angle / solid_angle.sum() self._name = "CXB" self._name_model = "Shape"
[docs] def get_model(self, obs_time=None): """return the shadowgram model :param obs_time: observation time in s :type obs_time: float :return: cxb detector image in ph/pix :rtype: array(float) """ if not obs_time: obs_time = self.context.duration ret = self.rate * obs_time return ret
[docs]class ModelCxbShapeBasedEnergy(ModelSrcInterface): """Model for an energy dependent CXB based on the pixel solid angle shape """ def __init__(self, solidangle_filename=\ add_path_data_ref_eclairs('instru/pixels_solid_angle_withcostheta.fits')): """**constructor** :param solidangle_filename: PATH/filename of the solid angle image :type solidangle_filename: string """ super().__init__() norm = 0.109 gamma_1 = 1.4 gamma_2 = 2.88 e_b = 29 # keV self.mask_aperture = self.context._sim_geom.inst.mask_aperture self.solidangle_filename = solidangle_filename self.solid_angle = read_fits(solidangle_filename, ugts_standard=1) self._name = "energy CXB shape" self.func = lambda e: norm / ((e / e_b) ** gamma_1 + (e / e_b) ** gamma_2) det_pix_surf = self.context._sim_geom.inst.det_pix_size ** 2 self.s_angle_pix = 0.4 * det_pix_surf * self.solid_angle * (1. / self.mask_aperture)
[docs] def get_model(self, obs_time=None): """return the shadowgram model ..note: RMF must be handled here at some point ! ..note: doc to Moretti's model: flux = integral between e_min and e_max of eq. 4 of https://arxiv.org/pdf/0811.1444.pdf :param obs_time: observation time in s :type obs_time: float :return: cxb detector image in ph/pix :rtype: array(float) """ if not obs_time: obs_time = self.context.duration emin, emax = self.context.energy_range flux, _ = quad(self.func, emin, emax) # middle point integration for comparisons with AlG. # flux = self.func((emin+emax)*0.5) * (emax-emin) ret = (flux * obs_time) * self.s_angle_pix return ret
class _ModelMeanCXBShapeBasedEnergyWithEarth(ModelSrcInterface): """Model for an energy dependent CXB in the presence of Earth in FOV. """ def __init__(self): """**constructor** :param use_numba: whether or not to use pre-compiled function when computing solid angle. Default is True. :type use_numba: bool ..note: sum of solid angle with real mask accounts for cos theta """ super().__init__() self._name = 'shape_moretti_earth' self.mode = None self.norm = 0.109 self.gamma_1 = 1.4 self.gamma_2 = 2.88 self.e_b = 29 # keV def get_model(self, obs_time=None): """Return the shadowgram model. :param obs_time: observation time in s :type obs_time: float :return: shadowgram :rtype: 2D array(float) with shape 80x80 """ logging.info(f'Running get_model CXB with Earth [mode={self.mode}]') if not obs_time: obs_time = self.context.duration emin, emax = self.context.energy_range flux, _ = quad(self.func, emin, emax) ret = (flux * obs_time) * self.s_angle_pix return ret
[docs]class ModelMeanCXBShapeBasedEnergyWithEarthCalculator(_ModelMeanCXBShapeBasedEnergyWithEarth): """Model for an energy dependent CXB in the presence of Earth in FOV. CXB model is computed with mean shape. """ def __init__(self, limb=None, vec_earth=None, use_numba=True): """**constructor** :param limb: limb angle in degree :type limb: float :param vec_earth: earth direction unit vector :type vec_earth: list/array of size 3 :param use_numba: whether or not to use pre-compiled function when computing solid angle. Default is True. :type use_numba: bool ..note: sum of solid angle with real mask accounts for cos theta """ super().__init__() self._name = 'shape_moretti_earth_calculator' self.mode = "calculator" if (limb is not None) and (vec_earth is not None): self.limb = limb self.vec_earth = vec_earth else: self.limb = self.context.earth_limb self.vec_earth = self.context.earth_pos_fov_unit inst_ecl = self.context._sim_geom.inst det_pix_surf = inst_ecl.det_pix_size ** 2 sacwe = SolidAngleComputationWithEarth(inst_ecl, use_numba=use_numba) self.solid_angle_with_earth = sacwe.run_calculator(self.limb, self.vec_earth) self.func = lambda e: self.norm / ((e / self.e_b) ** self.gamma_1 + \ (e / self.e_b) ** self.gamma_2) self.s_angle_pix = 0.4 * det_pix_surf * self.solid_angle_with_earth *\ (1. / inst_ecl.mask_aperture)
[docs]class ModelMeanCXBShapeBasedEnergyWithEarthSimulator(_ModelMeanCXBShapeBasedEnergyWithEarth): """Model for an energy dependent CXB in the presence of Earth in FOV. CXB model is computed with a ray-tracing method. """ def __init__(self): super().__init__() self._name = 'shape_moretti_earth_simulator' self.mode = "simulator" raise NotImplementedError
[docs]class ModelMeanCXBMorettiSmart(ModelSrcInterface): '''model CXB Moretti with and without earth ''' def __init__(self): super().__init__() self._name = 'shape_moretti_smart' self.mdl_earth = ModelEarthMoretti() self.mdl_no_earth = ModelCxbShapeBasedEnergy() self.mdl_used = self.mdl_earth self.context.add_observer(self)
[docs] def update(self): if self.context.is_in_fov(): self.mdl_used = self.mdl_earth print("WITH Earth") else: self.mdl_used = self.mdl_no_earth print("NO Earth")
[docs] def get_model(self, obs_time=None): return self.mdl_used.get_model(obs_time)
[docs]class ModelEarthMoretti(ModelSrcInterface): """Model for an energy dependent CXB in the presence of Earth in FOV. CXB model is computed with mean shape. """ def __init__(self, limb=None, vec_earth=None): """**constructor** :param limb: limb angle in degree :type limb: float :param vec_earth: earth direction unit vector :type vec_earth: list/array of size 3 :param use_numba: whether or not to use pre-compiled function when computing solid angle. Default is True. :type use_numba: bool ..note: sum of solid angle with real mask accounts for cos theta """ super().__init__() self._name = 'shape_moretti_earth_calculator' self.mode = "calculator" self.context.add_observer(self) self.norm = 0.109 self.gamma_1 = 1.4 self.gamma_2 = 2.88 self.e_b = 29 # keV if (limb is not None) and (vec_earth is not None): self.limb = limb self.vec_earth = vec_earth else: self.limb = self.context.earth_limb self.vec_earth = self.context.earth_pos_fov_unit instru_ecl = self.context._sim_geom.inst self.sacwe = SolidAngleWithEarthOrNot(instru_ecl) # prcomputing CXB flux with model Moretti for all channels self.moretti = np.zeros(self.context._mdl_effect.chan_nb(), dtype=np.float32) func = lambda e: self.norm / ((e / self.e_b) ** self.gamma_1 +\ (e / self.e_b) ** self.gamma_2) mdl_effect = self.context._mdl_effect for chan in range(self.context._mdl_effect.chan_nb()): e_min = mdl_effect.chan_boundary[chan] e_max = mdl_effect.chan_boundary[chan + 1] flux, _ = quad(func, e_min, e_max) self.moretti[chan] = flux self.s_angle_pix_no_earth = None self.update(limb, vec_earth) # to be removed ? # def _update_solid_angle(self): # instru_ecl = self.context._sim_geom.inst # det_pix_surf = instru_ecl.det_pix_size ** 2 # solid_angle_with_earth = self.sacwe.run_calculator(self.limb, self.vec_earth) # self.s_angle_pix = 0.4 * det_pix_surf * solid_angle_with_earth * (1. / instru_ecl.mask_aperture)
[docs] def update(self, limb=None, vec_earth=None, use_numba=True): """**constructor** :param limb: limb angle in degree :type limb: float :param vec_earth: earth direction unit vector :type vec_earth: list/array of size 3 :param use_numba: whether or not to use pre-compiled function when computing solid angle. Default is True. :type use_numba: bool ..note: sum of solid angle with real mask accounts for cos theta """ if super().update(): self.logger.debug("Already update") return self.logger.debug("update now") if (limb is not None) and (vec_earth is not None): self.limb = limb self.vec_earth = vec_earth else: self.limb = self.context.earth_limb self.vec_earth = self.context.earth_pos_fov_unit instru_ecl = self.context._sim_geom.inst det_pix_surf = instru_ecl.det_pix_size ** 2 solid_angle_with_earth = self.sacwe.run_calculator(self.limb, self.vec_earth) self.s_angle_pix = 0.4 * det_pix_surf * solid_angle_with_earth *\ (1. / instru_ecl.mask_aperture)
[docs] def get_model(self, obs_time=None): """Return the shadowgram model. :param obs_time: observation time in s :type obs_time: float :return: shadowgram :rtype: 2D array(float) with shape 80x80 """ logging.info(f'Running get_model CXB with Earth [mode={self.mode}]') if not obs_time: obs_time = self.context.duration ret = (self.moretti[self.context.idx_chan] * obs_time) * self.s_angle_pix return ret
[docs]class ModelPointSrcIRF(ModelSrcInterface): """model for point source """ S_cpt = 0 def __init__(self, info_src): """**Constructor** info_src must have at least the same number of intensities as idx_chan :param info_src: single source information {'elev','dir','intensity','name'}. Elev and dir in deg. :type info_src: astropy.table.Row or dict """ super().__init__() self.info_src = info_src if (ModelPointSrcIRF.S_cpt == 0) and \ isinstance(info_src, Row): self.logger.debug(f'source info: {self.info_src}') self.logger.debug(f'source info dtype: {self.info_src.dtype}') ModelPointSrcIRF.S_cpt += 1 self.simu = self.context._sim_geom self._name = self.info_src["name"] self._name_model = "with IRF" self._shadow_cm2 = None self._last_shadow = None self.coef_hors_axis = self.set_coef_hors_axis()
[docs] def set_coef_hors_axis(self): """Return coefficient associated to cos theta or sin elev off axis effect. :return: coefficient (units ?) :rtype: array(float) """ pix_y, pix_z = self.simu.inst.elevdir_to_skypix(np.array(self.info_src['elev']), np.array(self.info_src['dir'])) return self.context._mdl_effect.get_irf_val_ecllos(pix_y, pix_z)[0]
[docs] def get_model(self, obs_time=None): """return the shadowgram model intensity given in ph/cm2/s in the current channel. :param obs_time: observation time in s :type obs_time: float :return: cxb detector image in ph/pix :rtype: array(float) """ if not obs_time: obs_time = self.context.duration if self._shadow_cm2 is None: self._shadow_cm2 = self.simu.get_shadow_surface(self.info_src['elev'], self.info_src['dir']).copy() try: intensity = self.info_src['intensity'][self.context.idx_chan] # except ValueError: except (ValueError, TypeError, IndexError): intensity = self.info_src['intensity'] self._last_shadow = intensity * obs_time * self._shadow_cm2 * self.coef_hors_axis return self._last_shadow.copy()
[docs]class ModelPointSrcSinElev(ModelPointSrcIRF): """Model for point source. Elevation angle is in degree. """ def __init__(self, info_src): """**Constructor** :param info_src: single source information {'elev','dir','intensity','name'}. Elev and dir in deg. :type info_src: astropy.table.Row or dict """ self.sin_elev = np.sin(np.deg2rad(info_src['elev'])) super().__init__(info_src) self._name_model = "with sin(elev)"
[docs] def set_coef_hors_axis(self): """This function must be renamed as 'get_coef_hors_axis'. Verbose argument is unused. """ return self.sin_elev
[docs]class ListModelPointSrcFromCatalog(object): """Manage list of point source """ def __init__(self, o_dpix, use_irf=True): """**Constructor** o_dpix used to define dpix channel """ assert isinstance(o_dpix, ECLAIRsDetectorEffect) self._dpix = o_dpix self.use_irf = use_irf self.context = GlobalContextSimulator() # declare logger self.logger = logging.getLogger(__name__) self.logger.debug('creating an instance of ListModelPointSrcFromCatalog')
[docs] def set_use_irf(self, use_irf): assert use_irf in [True, False] self.use_irf = use_irf
def _get_model_point_src(self, info_src): if self.use_irf: mdl = ModelPointSrcIRF(info_src) else: mdl = ModelPointSrcSinElev(info_src) return mdl
[docs] def init_with_cat_swift_2012(self): """return a list of ShadowModelPointSrcSinElev from the SWIFT/BAT catalog attitude used to define fov """ path_cat = add_path_data_ref_eclairs("cat/BAT_70m_catalog_20nov2012.fits") cat_bat = swift_catalog_to_eclairs_catalog(path_cat, self._dpix) assert isinstance(cat_bat, cat.CatalogAstroWithEnergySpecSampling) return self.init_with_cat_astro(cat_bat)
[docs] def init_with_cat_astro(self, astro_cat): """ return a list of ShadowModelPointSrcSinElev from a catalog attitude used to define fov """ # select only sources in fov with CatalogFovEclairs self.cat_fov = cat.CatalogFovEclairs() self.cat_fov.set_ptg_instru(self.context.eclairs_attitude) self.cat_fov.from_astro_catalog_with_spectrum(astro_cat) d_src = {} print(f'Number of sources in FOV: {self.cat_fov.get_nb_element()}') self.logger.info(f'Number of sources in FOV: {self.cat_fov.get_nb_element()}') for isrc in range(self.cat_fov.get_nb_element()): info_src = self.cat_fov._catalog[isrc] mdl = self._get_model_point_src(info_src) d_src[mdl.name] = mdl self.logger.debug(f'adding {info_src["name"]} source') self.logger.info(f"Selected {len(d_src)} sources in FOV with SWIFT/BAT catalog") return d_src
[docs] def init_with_custom_cat(self, path_cat): """return a list of ShadowModelPointSrcSinElev from a custom catalog. :param path_cat: path to custom source catalog. :type path_cat: str :return: sources infos :rtype: dict """ cat_bat = swift_catalog_to_eclairs_catalog(path_cat, self._dpix) assert isinstance(cat_bat, cat.CatalogAstroWithEnergySpecSampling) # select only sources in fov with CatalogFovEclairs self.cat_fov = cat.CatalogFovEclairs() self.cat_fov.set_ptg_instru(self.context.eclairs_attitude) self.cat_fov.from_astro_catalog_with_spectrum(cat_bat) d_src = {} print(f'Number of sources in FOV: {self.cat_fov.get_nb_element()}') self.logger.info(f'Number of sources in FOV: {self.cat_fov.get_nb_element()}') for isrc in range(self.cat_fov.get_nb_element()): info_src = self.cat_fov._catalog[isrc] mdl = self._get_model_point_src(info_src) d_src[mdl.name] = mdl self.logger.debug(f'adding {info_src["name"]} source') self.logger.info(f"Selected {len(d_src)} sources in FOV with custom catalog") return d_src
[docs] def init_with_cat_fov(self, cat_fov): """Initialize catalog with sources in FOV. :param cat_fov: source catalog. :type cat_fov: CatalogFovEclairs object :return: sources infos :rtype: dict """ assert isinstance(cat_fov, cat.CatalogFovEclairs) d_src = {} for isrc in range(cat_fov.get_nb_element()): info_src = cat_fov._catalog[isrc] self.logger.debug(f'adding {info_src["name"]} source') mdl = self._get_model_point_src(info_src) d_src[mdl.name] = mdl self.logger.info(f"Selected {len(d_src)} sources in FOV with user catalog") return d_src