Source code for ecpi.common.instru.model_effect
'''
Created on 25 mars 2019
@author: JM Colley APC/IN2P3/CNRS
'''
import logging
from astropy.io import fits
import numpy as np
import matplotlib.pyplot as plt
import scipy.stats as scs
import scipy.interpolate as sci
from ecpi.common import add_path_data_ref_eclairs
import ecpi.common.instru.array_convention as ac
import ecpi.common.num.array_square_cell as asc
from ecpi.common.num.extrapolate_image import extrapolate_edge
[docs]def get_idx_first_zero(p_array):
"""return idx of the first zero
"""
z_idx = np.where(p_array == 0.0)[0]
if len(z_idx) == 0:
return p_array.size
return z_idx[0]
[docs]class ECLAIRsDetectorEffectDefault(object):
def __init__(self):
"""**Constructor**
Read ARF, IRF, RMF and return values/products or processing expected by ECPI
.. warning:: 2015 version with old mask so **IRF isn't up-to-date**, ARF is in approximative percent format
"""
# declare logger
self.logger = logging.getLogger(__name__)
self.logger.info('creating an instance of ECLAIRsDetectorEffectDefault')
# default value
path_arf = add_path_data_ref_eclairs("instru/ecl_rsp_0008_arf.fits")
path_irf = add_path_data_ref_eclairs("instru/ecl_irf_0008_10keV.fits")
path_rmf = add_path_data_ref_eclairs("instru/ecl_rsp_0008_rmf.fits")
# ARF
f_arf = fits.open(path_arf)
arf = f_arf[1].data
f_arf.close()
# XXX TODO : revoir la ded open_surface dans prochain ARF avec le bon masque
open_surface = arf["SPECRESP"].max()*1.01
self._arf_percent = arf["SPECRESP"]/open_surface
# boundary channel
self._boundary_chan = np.concatenate((arf["ENERG_LO"], np.array([arf["ENERG_HI"][-1]])))
self._channel = (arf["ENERG_LO"] + arf["ENERG_HI"])/2
a_step_channel = arf["ENERG_HI"] - arf["ENERG_LO"]
self._step_channel = arf["ENERG_HI"][0] - arf["ENERG_LO"][0]
assert np.allclose(a_step_channel, self._step_channel, rtol=1e-3)
self._update_array_index()
# IRF
f_irf = fits.open(path_irf)
array_irf = ac.ijdet_to_yzegp_array(f_irf[0].data)
self._irf = asc.ArraySquareCell(199, 199, 1)
self._irf.set_array(array_irf)
self._irf.set_origin_center()
f_irf.close()
# extrapolated IRF
array_irf = extrapolate_edge(array_irf)
irf_extra = asc.ArraySquareCell(201, 201, 1)
irf_extra.set_array(array_irf)
irf_extra.set_origin_center()
y_g, z_g = irf_extra.pos_center_cells()
self._irf_interpol = sci.interp2d(y_g, z_g, irf_extra.ar_val.transpose(),
kind='cubic', fill_value=None)
# RMF
f_rmf = fits.open(path_rmf)
# XXX TODO : test if same channel than ARF
self.rmf_col = f_rmf[1].data
f_rmf.close()
def _update_array_index(self):
self._a_idx = np.array(list(range(self.chan_nb())))
[docs] def reduce_channel(self, nb_chan):
if (nb_chan > 0) and (nb_chan < self.chan_nb()):
self._arf_percent = self._arf_percent[:nb_chan]
self._boundary_chan = self._boundary_chan[:nb_chan+1]
self._channel = self._channel[:nb_chan]
self._update_array_index()
else:
self.logger.warning('Invalid value nb_chan, no change !')
[docs] def arf_percent(self):
return self._arf_percent
@property
def chan_step(self):
return self._step_channel
@property
def chan_boundary(self):
return self._boundary_chan
@property
def chan_center(self):
return self._channel
[docs] def chan_nb(self):
return self._channel.size
[docs] def get_irf_val_ecllos(self, y_pix, z_pix, channel=None, ngp=False, verbose=False):
"""Return IRF value at position with ECLLos referential in pixel unit
.. warning:: problem with border with array 199x199 ? Need to extrapolate ?
:param y_pix: y position in ECLLos referential in pixel unit
:type y_pix: float
:param z_pix:z position in ECLLos referential in pixel unit
:type z_pix: float
:param channel: Energy channel index in EIC convention. Default is None.
:type channel: int
:param ngp: flag to opt for the Nearest Grid Position algorithm. Default is False.
:type ngp: bool
:param verbose: flag to enable/disable printing of the IRF coeff value. Default is False.
:type verbose: bool
:return: value of IRF at (y_pix, z_pix)
:rtype: float
"""
# NGP method
if ngp:
coef_irf = self._irf.value_at_pos_check([y_pix, z_pix])
else: # interpolation/extrapolation
coef_irf = self._irf_interpol(y_pix, z_pix)
if verbose:
self.logger.info(f"coef irf {coef_irf} at y_pix={y_pix} z_pix={z_pix}.")
return coef_irf
[docs] def get_irf_array_ijecpi(self):
"""Return raw IRF array from EIC
:return: raw IRF array, ie IRF value for pixel center in array 199x199
:rtype: 2D numpy array 199x199
"""
return self._irf.ar_val
[docs] def get_rmf_raw(self, channel):
"""
:return: rmf of channel as readed in EIC file
:rtype: numpy array
"""
rmf = self.rmf_col[channel]["MATRIX"][:self.chan_nb()].astype(np.float64)
return rmf
[docs] def get_rmf_max_channel_def(self, channel):
"""return channel where rmf becomes and stay to zero
:return: index of first zero on distribution
:rtype: int
"""
return get_idx_first_zero(self.get_rmf_raw(channel))
[docs] def get_rmf_norm(self, channel):
"""Normalize rmf distribution
:return: rmf normalize
:rtype: numpy array
"""
rmf = self.rmf_col[channel]["MATRIX"][:self.chan_nb()].astype(np.float64)
return rmf/rmf.sum()
[docs] def get_list_rv_rmf(self):
"""return a list of generator object for each channel
:return: list of generator object for each channel
:rtype: generator object is an instance of scipy.stats.rv_discrete
"""
l_rmf =[]
for idx in range(self.chan_nb()):
l_rmf.append(scs.rv_discrete(values=(self._a_idx.copy(), self.get_rmf_norm(idx).copy())))
return l_rmf
[docs] def simu_rmf_channel(self, channel, nb_photon):
"""Compute RMF for only one channel
:param channel: [0..nb channel[
:type channel: int
:param nb_photon: number of incident photon in channel
:type nb_photon: int
:return: channel number where incident photon was detected
:rtype: numpy array with size nb_photon
"""
rmf = scs.rv_discrete(values=(self._a_idx, self.get_rmf_norm(channel)))
y_rdn = np.random.uniform(size=nb_photon)
return (rmf.ppf(y_rdn)+0.5).astype(np.uint32)
[docs] def plot_irf(self, p_mes=""): # pragma: no cover
plt.figure()
plt.title(p_mes)
#plt.imshow(np.flip(self.ar_val,0))
plt.imshow(self._irf)
plt.colorbar()
plt.show()