Source code for ecpi.common.instru.model_effect

'''
Created on 25 mars 2019

@author: JM Colley APC/IN2P3/CNRS
'''

import logging
from astropy.io import fits
import numpy as np
import matplotlib.pyplot as plt
import scipy.stats as scs
import scipy.interpolate as sci

from ecpi.common import add_path_data_ref_eclairs
import ecpi.common.instru.array_convention as ac
import ecpi.common.num.array_square_cell as asc
from ecpi.common.num.extrapolate_image import extrapolate_edge


[docs]def get_idx_first_zero(p_array): """return idx of the first zero """ z_idx = np.where(p_array == 0.0)[0] if len(z_idx) == 0: return p_array.size return z_idx[0]
[docs]class ECLAIRsDetectorEffectDefault(object): def __init__(self): """**Constructor** Read ARF, IRF, RMF and return values/products or processing expected by ECPI .. warning:: 2015 version with old mask so **IRF isn't up-to-date**, ARF is in approximative percent format """ # declare logger self.logger = logging.getLogger(__name__) self.logger.info('creating an instance of ECLAIRsDetectorEffectDefault') # default value path_arf = add_path_data_ref_eclairs("instru/ecl_rsp_0008_arf.fits") path_irf = add_path_data_ref_eclairs("instru/ecl_irf_0008_10keV.fits") path_rmf = add_path_data_ref_eclairs("instru/ecl_rsp_0008_rmf.fits") # ARF f_arf = fits.open(path_arf) arf = f_arf[1].data f_arf.close() # XXX TODO : revoir la ded open_surface dans prochain ARF avec le bon masque open_surface = arf["SPECRESP"].max()*1.01 self._arf_percent = arf["SPECRESP"]/open_surface # boundary channel self._boundary_chan = np.concatenate((arf["ENERG_LO"], np.array([arf["ENERG_HI"][-1]]))) self._channel = (arf["ENERG_LO"] + arf["ENERG_HI"])/2 a_step_channel = arf["ENERG_HI"] - arf["ENERG_LO"] self._step_channel = arf["ENERG_HI"][0] - arf["ENERG_LO"][0] assert np.allclose(a_step_channel, self._step_channel, rtol=1e-3) self._update_array_index() # IRF f_irf = fits.open(path_irf) array_irf = ac.ijdet_to_yzegp_array(f_irf[0].data) self._irf = asc.ArraySquareCell(199, 199, 1) self._irf.set_array(array_irf) self._irf.set_origin_center() f_irf.close() # extrapolated IRF array_irf = extrapolate_edge(array_irf) irf_extra = asc.ArraySquareCell(201, 201, 1) irf_extra.set_array(array_irf) irf_extra.set_origin_center() y_g, z_g = irf_extra.pos_center_cells() self._irf_interpol = sci.interp2d(y_g, z_g, irf_extra.ar_val.transpose(), kind='cubic', fill_value=None) # RMF f_rmf = fits.open(path_rmf) # XXX TODO : test if same channel than ARF self.rmf_col = f_rmf[1].data f_rmf.close() def _update_array_index(self): self._a_idx = np.array(list(range(self.chan_nb())))
[docs] def reduce_channel(self, nb_chan): if (nb_chan > 0) and (nb_chan < self.chan_nb()): self._arf_percent = self._arf_percent[:nb_chan] self._boundary_chan = self._boundary_chan[:nb_chan+1] self._channel = self._channel[:nb_chan] self._update_array_index() else: self.logger.warning('Invalid value nb_chan, no change !')
[docs] def arf_percent(self): return self._arf_percent
@property def chan_step(self): return self._step_channel @property def chan_boundary(self): return self._boundary_chan @property def chan_center(self): return self._channel
[docs] def chan_nb(self): return self._channel.size
[docs] def get_irf_val_ecllos(self, y_pix, z_pix, channel=None, ngp=False, verbose=False): """Return IRF value at position with ECLLos referential in pixel unit .. warning:: problem with border with array 199x199 ? Need to extrapolate ? :param y_pix: y position in ECLLos referential in pixel unit :type y_pix: float :param z_pix:z position in ECLLos referential in pixel unit :type z_pix: float :param channel: Energy channel index in EIC convention. Default is None. :type channel: int :param ngp: flag to opt for the Nearest Grid Position algorithm. Default is False. :type ngp: bool :param verbose: flag to enable/disable printing of the IRF coeff value. Default is False. :type verbose: bool :return: value of IRF at (y_pix, z_pix) :rtype: float """ # NGP method if ngp: coef_irf = self._irf.value_at_pos_check([y_pix, z_pix]) else: # interpolation/extrapolation coef_irf = self._irf_interpol(y_pix, z_pix) if verbose: self.logger.info(f"coef irf {coef_irf} at y_pix={y_pix} z_pix={z_pix}.") return coef_irf
[docs] def get_irf_array_ijecpi(self): """Return raw IRF array from EIC :return: raw IRF array, ie IRF value for pixel center in array 199x199 :rtype: 2D numpy array 199x199 """ return self._irf.ar_val
[docs] def get_rmf_raw(self, channel): """ :return: rmf of channel as readed in EIC file :rtype: numpy array """ rmf = self.rmf_col[channel]["MATRIX"][:self.chan_nb()].astype(np.float64) return rmf
[docs] def get_rmf_max_channel_def(self, channel): """return channel where rmf becomes and stay to zero :return: index of first zero on distribution :rtype: int """ return get_idx_first_zero(self.get_rmf_raw(channel))
[docs] def get_rmf_norm(self, channel): """Normalize rmf distribution :return: rmf normalize :rtype: numpy array """ rmf = self.rmf_col[channel]["MATRIX"][:self.chan_nb()].astype(np.float64) return rmf/rmf.sum()
[docs] def get_list_rv_rmf(self): """return a list of generator object for each channel :return: list of generator object for each channel :rtype: generator object is an instance of scipy.stats.rv_discrete """ l_rmf =[] for idx in range(self.chan_nb()): l_rmf.append(scs.rv_discrete(values=(self._a_idx.copy(), self.get_rmf_norm(idx).copy()))) return l_rmf
[docs] def simu_rmf_channel(self, channel, nb_photon): """Compute RMF for only one channel :param channel: [0..nb channel[ :type channel: int :param nb_photon: number of incident photon in channel :type nb_photon: int :return: channel number where incident photon was detected :rtype: numpy array with size nb_photon """ rmf = scs.rv_discrete(values=(self._a_idx, self.get_rmf_norm(channel))) y_rdn = np.random.uniform(size=nb_photon) return (rmf.ppf(y_rdn)+0.5).astype(np.uint32)
[docs] def plot_irf(self, p_mes=""): # pragma: no cover plt.figure() plt.title(p_mes) #plt.imshow(np.flip(self.ar_val,0)) plt.imshow(self._irf) plt.colorbar() plt.show()