Source code for ecpi.common.instru.model_effect
'''
Created on 25 mars 2019
@author: JM Colley APC/IN2P3/CNRS
'''
import logging
from astropy.io import fits
import numpy as np
import matplotlib.pyplot as plt
import scipy.stats as scs
import scipy.interpolate as sci
import ecpi.common.instru.array_convention as ac
import ecpi.common.num.array_square_cell as asc
from ecpi.common.num.extrapolate_image import extrapolate_edge
from ecpi.common import add_path_data_ref_eclairs
[docs]def get_idx_first_zero(p_array):
"""return idx of the first zero
"""
z_idx = np.where(p_array == 0.0)[0]
if len(z_idx) == 0:
return p_array.size
return z_idx[0]
[docs]class ECLAIRsDetectorEffect(object):
'''
Implementation of detector effects (ARF, IRF, RMF)
'''
def __init__(self):
"""**Constructor**
Read ARF, IRF, RMF and return values/products or processing expected by ECPI
.. warning:: 2015 version with old mask so **IRF isn't up-to-date**, ARF is in
approximative percent format
"""
# declare logger
self.logger = logging.getLogger(__name__)
self.logger.info('creating an instance of ECLAIRsDetectorEffect')
# protections.
self.defined_arf = False
self.defined_irf = False
self.defined_rmf = False
def __repr__(self):
return f"""
EclairsDetectorEffect object
============================
ARF defined: {self.defined_arf}
IRF defined: {self.defined_irf}
RMF defined: {self.defined_rmf}
"""
[docs] def set_arf(self, path_arf):
"""Set ARF related attributes from file path
:param path_arf: path to ARF file
:type path_arf: str
"""
self.defined_arf = True
# ARF
self.path_arf = path_arf
f_arf = fits.open(path_arf)
arf = f_arf[1].data
f_arf.close()
#TODO: JMC revoir la definition open_surface dans prochain ARF avec le bon masque
self.open_surface = arf["SPECRESP"].max() * 1.01
self._arf_percent = arf["SPECRESP"] / self.open_surface
# boundary channel
self._boundary_chan = np.concatenate((arf["ENERG_LO"], np.array([arf["ENERG_HI"][-1]])))
self._channel = (arf["ENERG_LO"] + arf["ENERG_HI"]) / 2
a_step_channel = arf["ENERG_HI"] - arf["ENERG_LO"]
self._step_channel = arf["ENERG_HI"][0] - arf["ENERG_LO"][0]
assert np.allclose(a_step_channel, self._step_channel, rtol=1e-3)
self._update_array_index()
[docs] def set_irf(self, path_irf):
"""Set IRF related attributes from file path
:param path_irf: path to IRF file
:type path_irf: str
"""
self.defined_irf = True
# IRF
f_irf = fits.open(path_irf)
array_irf = ac.ijdet_to_yzegp_array(f_irf[0].data)
self._irf = asc.ArraySquareCell(199, 199, 1)
self._irf.set_array(array_irf)
self._irf.set_origin_center()
f_irf.close()
# extrapolated IRF
array_irf = extrapolate_edge(array_irf)
irf_extra = asc.ArraySquareCell(201, 201, 1)
irf_extra.set_array(array_irf)
irf_extra.set_origin_center()
y_g, z_g = irf_extra.pos_center_cells()
self._irf_interpol = sci.interp2d(y_g, z_g, irf_extra.ar_val.transpose(),
kind='cubic', fill_value=None)
[docs] def set_rmf(self, path_rmf):
"""Set RMF related attributes from file path
:param path_rmf: path to RMF file
:type path_rmf: str
"""
self.defined_rmf = True
# RMF
f_rmf = fits.open(path_rmf)
#TODO: test if same channel than ARF
self.rmf_col = f_rmf[1].data
f_rmf.close()
def _update_array_index(self):
assert self.defined_arf
self._a_idx = np.array(list(range(self.chan_nb())))
[docs] def reduce_channel(self, nb_chan):
assert self.defined_arf
if (nb_chan > 0) and (nb_chan < self.chan_nb()):
self._arf_percent = self._arf_percent[:nb_chan]
self._boundary_chan = self._boundary_chan[:nb_chan + 1]
self._channel = self._channel[:nb_chan]
self._update_array_index()
else:
self.logger.warning(f'Invalid value nb_chan ({nb_chan}), no change !')
[docs] def arf_percent(self):
assert self.defined_arf
return self._arf_percent
@property
def chan_step(self):
assert self.defined_arf
return self._step_channel
@property
def chan_boundary(self):
assert self.defined_arf
return self._boundary_chan
@property
def chan_center(self):
assert self.defined_arf
return self._channel
[docs] def chan_nb(self):
assert self.defined_arf
return self._channel.size
[docs] def get_energymin_val(self, chan):
"""Return energy value in a given channel index.
:param chan: lower channel index
:type chan: int
:return: energy value
:rtype: float
"""
return self._channel[chan-1] - (self._step_channel/2.)
[docs] def get_energymax_val(self, chan):
"""Return energy value in a given channel index.
:param chan: upper channel index
:type chan: int
:return: energy value
:rtype: float
"""
return self._channel[chan] - (self._step_channel/2.)
[docs] def get_irf_val_ecllos(self, y_pix, z_pix, channel=None, ngp=False):
"""Return IRF value at position with ECLLos referential in pixel unit
.. warning:: problem with border with array 199x199 ? Need to extrapolate ?
:param y_pix: y position in ECLLos referential in pixel unit
:type y_pix: float
:param z_pix:z position in ECLLos referential in pixel unit
:type z_pix: float
:param channel: Energy channel index in EIC convention. Default is None.
:type channel: int
:param ngp: flag to opt for the Nearest Grid Position algorithm. Default is False.
:type ngp: bool
:param verbose: flag to enable/disable printing of the IRF coeff value. Default is False.
:type verbose: bool
:return: value of IRF at (y_pix, z_pix)
:rtype: float
"""
assert self.defined_irf
# NGP method
if ngp:
coef_irf = self._irf.value_at_pos_check([y_pix, z_pix])
else: # interpolation/extrapolation
coef_irf = self._irf_interpol(y_pix, z_pix)
self.logger.info(f"coef irf {coef_irf} at y_pix={y_pix} z_pix={z_pix}.")
return coef_irf
[docs] def get_irf_array_ijecpi(self):
"""Return raw IRF array from EIC
:return: raw IRF array, ie IRF value for pixel center in array 199x199
:rtype: 2D numpy array 199x199
"""
assert self.defined_irf
return self._irf.ar_val
[docs] def get_rmf_raw(self, channel):
"""
:return: rmf of channel as readed in EIC file
:rtype: numpy array
"""
assert self.defined_rmf
rmf = self.rmf_col[channel]["MATRIX"][:self.chan_nb()].astype(np.float64)
return rmf
[docs] def get_rmf_max_channel_def(self, channel):
"""return channel where rmf becomes and stay to zero
:return: index of first zero on distribution
:rtype: int
"""
assert self.defined_rmf
return get_idx_first_zero(self.get_rmf_raw(channel))
[docs] def get_rmf_norm(self, channel):
"""Normalize rmf distribution
:return: rmf normalize
:rtype: numpy array
"""
assert self.defined_arf
assert self.defined_rmf
rmf = self.rmf_col[channel]["MATRIX"][:self.chan_nb()].astype(np.float64)
return rmf / rmf.sum()
[docs] def get_list_rv_rmf(self):
"""return a list of generator object for each channel
:return: list of generator object for each channel
:rtype: generator object is an instance of scipy.stats.rv_discrete
"""
assert self.defined_arf
assert self.defined_rmf
l_rmf = []
for idx in range(self.chan_nb()):
l_rmf.append(scs.rv_discrete(values=(self._a_idx.copy(), \
self.get_rmf_norm(idx).copy())))
return l_rmf
[docs] def simu_rmf_channel(self, channel, nb_photon):
"""Compute RMF for only one channel
:param channel: [0..nb channel[
:type channel: int
:param nb_photon: number of incident photon in channel
:type nb_photon: int
:return: channel number where incident photon was detected
:rtype: numpy array with size nb_photon
"""
assert self.defined_arf
assert self.defined_rmf
rmf = scs.rv_discrete(values=(self._a_idx, self.get_rmf_norm(channel)))
y_rdn = np.random.uniform(size=nb_photon)
return (rmf.ppf(y_rdn) + 0.5).astype(np.uint32)
[docs] def plot_irf(self, p_mes=""): # pragma: no cover
assert self.defined_irf
plt.figure()
plt.title(p_mes)
# plt.imshow(np.flip(self.ar_val,0))
plt.imshow(self._irf)
plt.colorbar()
plt.show()
[docs]class ECLAIRsDetectorEffectDefault(ECLAIRsDetectorEffect):
'''
Default class of ECLAIRsDetectorEffect
'''
def __init__(self):
'''
Init with default values
'''
super().__init__()
path_arf = add_path_data_ref_eclairs("instru/ECL-RSP-ENE-0T01.fits")
path_irf = add_path_data_ref_eclairs("instru/ecl_irf_0008_10keV.fits")
path_rmf = add_path_data_ref_eclairs("instru/ecl_rsp_0008_rmf.fits")
self.set_arf(path_arf)
self.set_irf(path_irf)
self.set_rmf(path_rmf)