Source code for ecpi.common.instru.model_effect

"""Treatment of detector effects
"""

import logging
import os.path as osp
import os
from astropy.io import fits
import numpy as np
import matplotlib.pyplot as plt
import scipy.stats as scs
import scipy.interpolate as sci
import ecpi.common.instru.array_convention as ac
import ecpi.common.num.array_square_cell as asc
from ecpi.common.num.extrapolate_image import extrapolate_edge
from ecpi.common import add_path_data_ref_eclairs, add_path_caldb_eclairs


[docs]def get_idx_first_zero(p_array): """return idx of the first zero """ z_idx = np.where(p_array == 0.0)[0] if len(z_idx) == 0: return p_array.size return z_idx[0]
[docs]class ECLAIRsDetectorEffect(object): """ Implementation of detector effects (ARF, IRF, RMF) """ def __init__(self): """**Constructor** Read ARF, IRF, RMF and return values/products or processing expected by ECPI .. warning:: 2015 version with old mask so **IRF isn't up-to-date**, ARF is in approximative percent format """ # declare logger self.logger = logging.getLogger(__name__) # protections. self.defined_arf = False self.defined_irf = False self.defined_rmf = False self.defined_efficiency = False self.defined_nonuniformity = False self.defined_globalefficiency = False self._arf_percent = None self._boundary_chan = None self._channel = None self._channel_center = None self._step_channel = 0 self.rmf_col = None def __repr__(self): return f""" EclairsDetectorEffect object ============================ ARF defined: {self.defined_arf} IRF defined: {self.defined_irf} RMF defined: {self.defined_rmf} Efficiency defined: {self.defined_efficiency} Non uniformity defined: {self.defined_nonuniformity} Global efficiency defined: {self.defined_globalefficiency} """
[docs] def set_arf(self, path_arf): """Set ARF related attributes from file path :param path_arf: path to ARF file :type path_arf: str """ self.defined_arf = True # ARF self.path_arf = path_arf f_arf = fits.open(path_arf) arf = f_arf[1].data f_arf.close() # TODO: JMC revoir la definition open_surface dans prochain ARF avec le bon masque max_open_surface = arf["SPECRESP"].max() self._arf_percent = arf["SPECRESP"] / (max_open_surface * 1.01) # boundary channel self._boundary_chan = np.concatenate((arf["ENERG_LO"], np.array([arf["ENERG_HI"][-1]]))) self._channel_center = (arf["ENERG_LO"] + arf["ENERG_HI"]) / 2 a_step_channel = arf["ENERG_HI"] - arf["ENERG_LO"] self._step_channel = arf["ENERG_HI"][0] - arf["ENERG_LO"][0] assert np.allclose(a_step_channel, self._step_channel, rtol=1e-3)
# self._update_array_index()
[docs] def set_rmf(self, path_rmf): """Set RMF related attributes from file path :param path_rmf: path to RMF file :type path_rmf: str """ self.defined_rmf = True # RMF f_rmf = fits.open(path_rmf) self.rmf_col = f_rmf[1].data ebounds = f_rmf[2].data f_rmf.close() # Tests consistency with ARF rmf_boundary_chan = np.concatenate((ebounds["E_MIN"], np.array([ebounds["E_MAX"][-1]]))) assert np.array_equal(rmf_boundary_chan, self._boundary_chan) self._channel = ebounds["CHANNEL"]
[docs] def set_irf(self, path_irf): """Set IRF related attributes from file path :param path_irf: path to IRF file :type path_irf: str """ self.defined_irf = True # IRF f_irf = fits.open(path_irf) array_irf = ac.ijdet_to_yzegp_array(f_irf[0].data) self._irf = asc.ArraySquareCell(199, 199, 1) self._irf.set_array(array_irf) self._irf.set_origin_center() f_irf.close() # extrapolated IRF array_irf = extrapolate_edge(array_irf) irf_extra = asc.ArraySquareCell(201, 201, 1) irf_extra.set_array(array_irf) irf_extra.set_origin_center() y_g, z_g = irf_extra.pos_center_cells() self._irf_interpol = sci.interp2d(y_g, z_g, irf_extra.ar_val.transpose(), kind='cubic', fill_value=None)
[docs] def set_efficiency(self, path_efficiency): """Set detector efficiency from file path. :param path_efficiency: path to detector efficiency file :type path_efficiency: str """ self.defined_efficiency = True self.path_efficiency = path_efficiency f_eff = fits.open(path_efficiency) self.efficiency = f_eff[1].data f_eff.close()
[docs] def set_nonuniformity(self, path_nonuniformity): """Set detector non uniformity from file path. :param path_nonuniformity: path to detector efficiency file :type path_nonuniformity: str """ self.defined_nonuniformity = True self.path_nonuniformity = path_nonuniformity f_nunif = fits.open(path_nonuniformity) self.non_uniformity = f_nunif[1].data f_nunif.close()
[docs] def compute_globalefficiency(self): """Compute global efficiency defined as the elementwise product between efficiency and non uniformity matrices. """ assert self.defined_efficiency assert self.defined_nonuniformity self.defined_globalefficiency = True EG = np.multiply(self.efficiency, self.non_uniformity) assert not 0 in EG self.global_efficiency = EG
def _update_array_index(self): assert self.defined_arf self._a_idx = np.array(list(range(self.chan_nb())))
[docs] def reduce_channel(self, nb_chan): assert self.defined_arf if (nb_chan > 0) and (nb_chan < self.chan_nb()): self._arf_percent = self._arf_percent[:nb_chan] self._boundary_chan = self._boundary_chan[:nb_chan + 1] self._channel = self._channel[:nb_chan] self._channel_center = self._channel_center[:nb_chan] self._update_array_index() else: self.logger.warning(f'Invalid value nb_chan ({nb_chan}), no change !')
[docs] def arf_percent(self): assert self.defined_arf return self._arf_percent
@property def chan_step(self): assert self.defined_arf return self._step_channel @property def chan_boundary(self): assert self.defined_arf return self._boundary_chan @property def chan_center(self): assert self.defined_arf return self._channel_center
[docs] def chan_nb(self): assert self.defined_arf return self._channel.size
[docs] def get_energymin_val(self, chan): """Return energy value in a given channel index. :param chan: lower channel index :type chan: int :return: energy value :rtype: float """ assert self._channel[0] <= chan <= self._channel[-1] return self._boundary_chan[chan - self._channel[0]]
# self._channel[chan] - (self._step_channel / 2.)
[docs] def get_energymax_val(self, chan): """Return energy value in a given channel index. :param chan: upper channel index :type chan: int :return: energy value :rtype: float """ assert self._channel[0] <= chan <= self._channel[-1] return self._boundary_chan[chan - self._channel[0] + 1]
# return self._channel[chan] + (self._step_channel / 2.)
[docs] def get_irf_val_ecllos(self, y_pix, z_pix, channel=None, ngp=False): """Return IRF value at position with ECLLos referential in pixel unit .. warning:: problem with border with array 199x199 ? Need to extrapolate ? :param y_pix: y position in ECLLos referential in pixel unit :type y_pix: float :param z_pix:z position in ECLLos referential in pixel unit :type z_pix: float :param channel: Energy channel index in EIC convention. Default is None. :type channel: int :param ngp: flag to opt for the Nearest Grid Position algorithm. Default is False. :type ngp: bool :param verbose: flag to enable/disable printing of the IRF coeff value. Default is False. :type verbose: bool :return: value of IRF at (y_pix, z_pix) :rtype: float """ assert self.defined_irf # NGP method if ngp: coef_irf = self._irf.value_at_pos_check([y_pix, z_pix]) else: # interpolation/extrapolation coef_irf = self._irf_interpol(y_pix, z_pix) self.logger.info(f"coef irf {coef_irf} at y_pix={y_pix} z_pix={z_pix}.") return coef_irf
[docs] def get_irf_array_ijecpi(self): """Return raw IRF array from EIC :return: raw IRF array, ie IRF value for pixel center in array 199x199 :rtype: 2D numpy array 199x199 """ assert self.defined_irf return self._irf.ar_val
[docs] def get_rmf_raw(self, idx_chan): """ :return: rmf of channel as readed in EIC file :rtype: numpy array """ assert self.defined_rmf rmf = self.rmf_col[idx_chan]["MATRIX"][:self.chan_nb()].astype(np.float64) return rmf
[docs] def get_rmf_max_channel_def(self, channel): """return channel where rmf becomes and stay to zero :return: index of first zero on distribution :rtype: int """ assert self.defined_rmf return get_idx_first_zero(self.get_rmf_raw(channel))
[docs] def get_rmf_norm(self, idx_chan): """Normalize rmf distribution :return: rmf normalize :rtype: numpy array """ assert self.defined_arf assert self.defined_rmf rmf = self.rmf_col[idx_chan]["MATRIX"][:self.chan_nb()].astype(np.float64) return rmf / rmf.sum()
[docs] def get_efficiency(self): assert self.defined_efficiency return self.efficiency
[docs] def get_nonuniformity(self): assert self.defined_nonuniformity return self.non_uniformity
[docs] def get_globalefficiency(self): if not self.defined_globalefficiency: self.compute_globalefficiency() return self.global_efficiency
[docs] def get_list_rv_rmf(self): """return a list of generator object for each channel :return: list of generator object for each channel :rtype: generator object is an instance of scipy.stats.rv_discrete """ assert self.defined_arf assert self.defined_rmf l_rmf = [] for idx in range(self.chan_nb()): l_rmf.append(scs.rv_discrete(values=(self._a_idx.copy(), \ self.get_rmf_norm(idx).copy()))) return l_rmf
[docs] def simu_rmf_channel(self, channel, nb_photon): """Compute RMF for only one channel :param channel: [0..nb channel[ :type channel: int :param nb_photon: number of incident photon in channel :type nb_photon: int :return: channel number where incident photon was detected :rtype: numpy array with size nb_photon """ assert self.defined_arf assert self.defined_rmf rmf = scs.rv_discrete(values=(self._a_idx, self.get_rmf_norm(channel))) y_rdn = np.random.uniform(size=nb_photon) return (rmf.ppf(y_rdn) + 0.5).astype(np.uint32)
[docs] def plot_irf(self, p_mes=""): # pragma: no cover assert self.defined_irf plt.figure() plt.title(p_mes) # plt.imshow(np.flip(self.ar_val,0)) plt.imshow(self._irf) plt.colorbar() plt.show()
[docs]class ECLAIRsDetectorEffectDefault(ECLAIRsDetectorEffect): """ Default class of ECLAIRsDetectorEffect """ def __init__(self): """ Init with default values """ super().__init__() self.path_irf = add_path_caldb_eclairs("ECL-RSP-IRF_0008_10keV.fits") self.path_arf = add_path_caldb_eclairs("ECL-RSP-ARF_2021-10-12_13:13:00.fits") self.path_rmf = add_path_caldb_eclairs("ECL-RSP-RMF_2021-10-13_10:36:00.fits") self.path_efficiency = add_path_caldb_eclairs("efficiency_matrix.fits") self.path_nonuniformity = add_path_caldb_eclairs("non_uniformity_matrix.fits") # We check caldb dir points to the default one assert self.path_arf.find("extra") > 5 self.set_arf(self.path_arf) self.set_irf(self.path_irf) self.set_rmf(self.path_rmf) self.set_efficiency(self.path_efficiency) self.set_nonuniformity(self.path_nonuniformity)