"""Treatment of detector effects
"""
import logging
import os.path as osp
import os
from astropy.io import fits
import numpy as np
import matplotlib.pyplot as plt
import scipy.stats as scs
import scipy.interpolate as sci
import ecpi.common.instru.array_convention as ac
import ecpi.common.num.array_square_cell as asc
from ecpi.common.num.extrapolate_image import extrapolate_edge
from ecpi.common import add_path_data_ref_eclairs, add_path_caldb_eclairs
[docs]def get_idx_first_zero(p_array):
"""return idx of the first zero
"""
z_idx = np.where(p_array == 0.0)[0]
if len(z_idx) == 0:
return p_array.size
return z_idx[0]
[docs]class ECLAIRsDetectorEffect(object):
"""
Implementation of detector effects (ARF, IRF, RMF)
"""
def __init__(self):
"""**Constructor**
Read ARF, IRF, RMF and return values/products or processing expected by ECPI
.. warning:: 2015 version with old mask so **IRF isn't up-to-date**, ARF is in
approximative percent format
"""
# declare logger
self.logger = logging.getLogger(__name__)
# protections.
self.defined_arf = False
self.defined_irf = False
self.defined_rmf = False
self.defined_efficiency = False
self.defined_nonuniformity = False
self.defined_globalefficiency = False
self._arf_percent = None
self._boundary_chan = None
self._channel = None
self._channel_center = None
self._step_channel = 0
self.rmf_col = None
def __repr__(self):
return f"""
EclairsDetectorEffect object
============================
ARF defined: {self.defined_arf}
IRF defined: {self.defined_irf}
RMF defined: {self.defined_rmf}
Efficiency defined: {self.defined_efficiency}
Non uniformity defined: {self.defined_nonuniformity}
Global efficiency defined: {self.defined_globalefficiency}
"""
[docs] def set_arf(self, path_arf):
"""Set ARF related attributes from file path
:param path_arf: path to ARF file
:type path_arf: str
"""
self.defined_arf = True
# ARF
self.path_arf = path_arf
f_arf = fits.open(path_arf)
arf = f_arf[1].data
f_arf.close()
# TODO: JMC revoir la definition open_surface dans prochain ARF avec le bon masque
max_open_surface = arf["SPECRESP"].max()
self._arf_percent = arf["SPECRESP"] / (max_open_surface * 1.01)
# boundary channel
self._boundary_chan = np.concatenate((arf["ENERG_LO"], np.array([arf["ENERG_HI"][-1]])))
self._channel_center = (arf["ENERG_LO"] + arf["ENERG_HI"]) / 2
a_step_channel = arf["ENERG_HI"] - arf["ENERG_LO"]
self._step_channel = arf["ENERG_HI"][0] - arf["ENERG_LO"][0]
assert np.allclose(a_step_channel, self._step_channel, rtol=1e-3)
# self._update_array_index()
[docs] def set_rmf(self, path_rmf):
"""Set RMF related attributes from file path
:param path_rmf: path to RMF file
:type path_rmf: str
"""
self.defined_rmf = True
# RMF
f_rmf = fits.open(path_rmf)
self.rmf_col = f_rmf[1].data
ebounds = f_rmf[2].data
f_rmf.close()
# Tests consistency with ARF
rmf_boundary_chan = np.concatenate((ebounds["E_MIN"], np.array([ebounds["E_MAX"][-1]])))
assert np.array_equal(rmf_boundary_chan, self._boundary_chan)
self._channel = ebounds["CHANNEL"]
[docs] def set_irf(self, path_irf):
"""Set IRF related attributes from file path
:param path_irf: path to IRF file
:type path_irf: str
"""
self.defined_irf = True
# IRF
f_irf = fits.open(path_irf)
array_irf = ac.ijdet_to_yzegp_array(f_irf[0].data)
self._irf = asc.ArraySquareCell(199, 199, 1)
self._irf.set_array(array_irf)
self._irf.set_origin_center()
f_irf.close()
# extrapolated IRF
array_irf = extrapolate_edge(array_irf)
irf_extra = asc.ArraySquareCell(201, 201, 1)
irf_extra.set_array(array_irf)
irf_extra.set_origin_center()
y_g, z_g = irf_extra.pos_center_cells()
self._irf_interpol = sci.interp2d(y_g, z_g, irf_extra.ar_val.transpose(),
kind='cubic', fill_value=None)
[docs] def set_efficiency(self, path_efficiency):
"""Set detector efficiency from file path.
:param path_efficiency: path to detector efficiency file
:type path_efficiency: str
"""
self.defined_efficiency = True
self.path_efficiency = path_efficiency
f_eff = fits.open(path_efficiency)
self.efficiency = f_eff[1].data
f_eff.close()
[docs] def compute_globalefficiency(self):
"""Compute global efficiency defined as the elementwise
product between efficiency and non uniformity matrices.
"""
assert self.defined_efficiency
assert self.defined_nonuniformity
self.defined_globalefficiency = True
EG = np.multiply(self.efficiency, self.non_uniformity)
assert not 0 in EG
self.global_efficiency = EG
def _update_array_index(self):
assert self.defined_arf
self._a_idx = np.array(list(range(self.chan_nb())))
[docs] def reduce_channel(self, nb_chan):
assert self.defined_arf
if (nb_chan > 0) and (nb_chan < self.chan_nb()):
self._arf_percent = self._arf_percent[:nb_chan]
self._boundary_chan = self._boundary_chan[:nb_chan + 1]
self._channel = self._channel[:nb_chan]
self._channel_center = self._channel_center[:nb_chan]
self._update_array_index()
else:
self.logger.warning(f'Invalid value nb_chan ({nb_chan}), no change !')
[docs] def arf_percent(self):
assert self.defined_arf
return self._arf_percent
@property
def chan_step(self):
assert self.defined_arf
return self._step_channel
@property
def chan_boundary(self):
assert self.defined_arf
return self._boundary_chan
@property
def chan_center(self):
assert self.defined_arf
return self._channel_center
[docs] def chan_nb(self):
assert self.defined_arf
return self._channel.size
[docs] def get_energymin_val(self, chan):
"""Return energy value in a given channel index.
:param chan: lower channel index
:type chan: int
:return: energy value
:rtype: float
"""
assert self._channel[0] <= chan <= self._channel[-1]
return self._boundary_chan[chan - self._channel[0]]
# self._channel[chan] - (self._step_channel / 2.)
[docs] def get_energymax_val(self, chan):
"""Return energy value in a given channel index.
:param chan: upper channel index
:type chan: int
:return: energy value
:rtype: float
"""
assert self._channel[0] <= chan <= self._channel[-1]
return self._boundary_chan[chan - self._channel[0] + 1]
# return self._channel[chan] + (self._step_channel / 2.)
[docs] def get_irf_val_ecllos(self, y_pix, z_pix, channel=None, ngp=False):
"""Return IRF value at position with ECLLos referential in pixel unit
.. warning:: problem with border with array 199x199 ? Need to extrapolate ?
:param y_pix: y position in ECLLos referential in pixel unit
:type y_pix: float
:param z_pix:z position in ECLLos referential in pixel unit
:type z_pix: float
:param channel: Energy channel index in EIC convention. Default is None.
:type channel: int
:param ngp: flag to opt for the Nearest Grid Position algorithm. Default is False.
:type ngp: bool
:param verbose: flag to enable/disable printing of the IRF coeff value. Default is False.
:type verbose: bool
:return: value of IRF at (y_pix, z_pix)
:rtype: float
"""
assert self.defined_irf
# NGP method
if ngp:
coef_irf = self._irf.value_at_pos_check([y_pix, z_pix])
else: # interpolation/extrapolation
coef_irf = self._irf_interpol(y_pix, z_pix)
self.logger.info(f"coef irf {coef_irf} at y_pix={y_pix} z_pix={z_pix}.")
return coef_irf
[docs] def get_irf_array_ijecpi(self):
"""Return raw IRF array from EIC
:return: raw IRF array, ie IRF value for pixel center in array 199x199
:rtype: 2D numpy array 199x199
"""
assert self.defined_irf
return self._irf.ar_val
[docs] def get_rmf_raw(self, idx_chan):
"""
:return: rmf of channel as readed in EIC file
:rtype: numpy array
"""
assert self.defined_rmf
rmf = self.rmf_col[idx_chan]["MATRIX"][:self.chan_nb()].astype(np.float64)
return rmf
[docs] def get_rmf_max_channel_def(self, channel):
"""return channel where rmf becomes and stay to zero
:return: index of first zero on distribution
:rtype: int
"""
assert self.defined_rmf
return get_idx_first_zero(self.get_rmf_raw(channel))
[docs] def get_rmf_norm(self, idx_chan):
"""Normalize rmf distribution
:return: rmf normalize
:rtype: numpy array
"""
assert self.defined_arf
assert self.defined_rmf
rmf = self.rmf_col[idx_chan]["MATRIX"][:self.chan_nb()].astype(np.float64)
return rmf / rmf.sum()
[docs] def get_efficiency(self):
assert self.defined_efficiency
return self.efficiency
[docs] def get_globalefficiency(self):
if not self.defined_globalefficiency:
self.compute_globalefficiency()
return self.global_efficiency
[docs] def get_list_rv_rmf(self):
"""return a list of generator object for each channel
:return: list of generator object for each channel
:rtype: generator object is an instance of scipy.stats.rv_discrete
"""
assert self.defined_arf
assert self.defined_rmf
l_rmf = []
for idx in range(self.chan_nb()):
l_rmf.append(scs.rv_discrete(values=(self._a_idx.copy(), \
self.get_rmf_norm(idx).copy())))
return l_rmf
[docs] def simu_rmf_channel(self, channel, nb_photon):
"""Compute RMF for only one channel
:param channel: [0..nb channel[
:type channel: int
:param nb_photon: number of incident photon in channel
:type nb_photon: int
:return: channel number where incident photon was detected
:rtype: numpy array with size nb_photon
"""
assert self.defined_arf
assert self.defined_rmf
rmf = scs.rv_discrete(values=(self._a_idx, self.get_rmf_norm(channel)))
y_rdn = np.random.uniform(size=nb_photon)
return (rmf.ppf(y_rdn) + 0.5).astype(np.uint32)
[docs] def plot_irf(self, p_mes=""): # pragma: no cover
assert self.defined_irf
plt.figure()
plt.title(p_mes)
# plt.imshow(np.flip(self.ar_val,0))
plt.imshow(self._irf)
plt.colorbar()
plt.show()
[docs]class ECLAIRsDetectorEffectDefault(ECLAIRsDetectorEffect):
"""
Default class of ECLAIRsDetectorEffect
"""
def __init__(self):
"""
Init with default values
"""
super().__init__()
self.path_irf = add_path_caldb_eclairs("ECL-RSP-IRF_0008_10keV.fits")
self.path_arf = add_path_caldb_eclairs("ECL-RSP-ARF_2021-10-12_13:13:00.fits")
self.path_rmf = add_path_caldb_eclairs("ECL-RSP-RMF_2021-10-13_10:36:00.fits")
self.path_efficiency = add_path_caldb_eclairs("efficiency_matrix.fits")
self.path_nonuniformity = add_path_caldb_eclairs("non_uniformity_matrix.fits")
# We check caldb dir points to the default one
assert self.path_arf.find("extra") > 5
self.set_arf(self.path_arf)
self.set_irf(self.path_irf)
self.set_rmf(self.path_rmf)
self.set_efficiency(self.path_efficiency)
self.set_nonuniformity(self.path_nonuniformity)