'''Instrument simulation module
Created on 29 nov. 2017
@author: colley JM
'''
import copy
import logging
import numpy as np
import matplotlib.pyplot as plt
import astropy.time as astrotime
import astropy.table as astt
from astropy.io import fits
from numba import njit
import ecpi.common.instru.model_geom as iX
import ecpi.common.io.events as xde
import ecpi.common.sky.background as bkg
from ecpi.common.sky.catalog import CatalogFovBasic
from ecpi.common.num.array_square_cell import ArraySquareCell, \
intersection_rect, surf_intersection_rect
from ecpi.common.mission.time import convert_mjd_in_svomref_seconds
from ecpi.common.instru.array_convention import yzegp_to_ijdet_array
[docs]class SimuInstruXBase(object):
"""Generic simulation of X instrument coded mask
"""
def __init__(self):
self.b_photon_noise = True
self.time_expose_s = 0 # in second
# declare logger
self.logger = logging.getLogger(__name__)
self.logger.info('creating an instance of SimuInstruXBase')
[docs] def get_det_pixel_surface(self):
return self.det_pixel_surface
[docs] def get_shadowgram(self):
return self.inst.detect.ar_val
[docs] def plot_shadowgram(self): # pragma: no cover
self.inst.detect.plot("shadowgram t")
[docs] def raz_nb_photon(self):
self.inst.detect.ar_val[:, :] = 0
[docs] def set_catalog(self, catalog_x: CatalogFovBasic):
self.cat_fov = catalog_x
[docs] def simu_catalog(self, time_expose, verbose=None):
pass
[docs] def get_cxb(self, cxb_eclairs_level=1, cxb_type='flat_nospectr',
internal_noise=False, noise_level=0.003):
"""
return a numpy array of detector with CXB count by pixel.
:param cxb_eclairs_level: intensity of CXB
:type cxb_eclairs_level: float [count/cm2/s]
:param cxb_type: type of CXB = 'flat_nospectr' or 'shapebased_nospectr'.
Default is 'flat_nospectr'.
:type cxb_type: string
:param internal_noise: flag to add internal noise
:type internal_noise: bool
:param noise_level: internal noise level in cts/sec/cm2/keV (default is 0.003)
:type noise_level: float
:return: cxb detector image counts
:rtype: array(float)
"""
if cxb_type == 'flat_nospectr':
cxb_rate = bkg.flat_cxb_nospectr(cxb_eclairs_level,
self.get_det_pixel_surface(),
self.inst.detect.ar_val.shape)
elif cxb_type == 'shapebased_nospectr':
cxb_rate = bkg.shapebased_cxb_nospectr(cxb_eclairs_level)
else:
self.logger.warning(f'cxb type {cxb_type} not understood.')
cxb_rate = bkg.flat_cxb_nospectr(cxb_eclairs_level,
self.get_det_pixel_surface(),
self.inst.detect.ar_val.shape)
if internal_noise:
cxb_rate += bkg.flat_internal_noise(noise_level,
self.get_det_pixel_surface(),
self.inst.detect.ar_val.shape)
nb_photon_theo = self.time_expose_s * cxb_rate
cpt_cxb = np.random.poisson(nb_photon_theo)
return cpt_cxb
[docs] def add_cxb(self, cxb_eclairs_level=1):
"""
:param cxb_eclairs_level: intensity of CXB
:type cxb_eclairs_level: float [count/cm2/s]
"""
self.inst.detect.ar_val += self.get_cxb(cxb_eclairs_level)
def _add_time_evts(self):
""" Count nb events, random uniform time in [s] for each and sort """
nb_photon = int(self.inst.get_total_photon())
self.ar_evts = np.empty((nb_photon,),
dtype=[('a', 'uint32'), ('b', 'B'),
('c', 'B'), ('d', 'uint16')])
max_tps = int(self.time_expose_s)
self.ar_evts['a'] = np.random.randint(0, max_tps, nb_photon, dtype='uint32')
self.ar_evts['d'] = 1
index = np.arange(0, self.inst.detect.ar_val.size)
evts = np.repeat(index, self.inst.detect.ar_val.round().astype(np.int).ravel())
(self.ar_evts['b'], self.ar_evts['c']) = divmod(evts, self.inst.detect.get_nb_cell_y())
[docs] def add_all_evt(self, apos):
"""
Add one on pixel associated to position
:param apos: array plan position of photon
:type apos: numpy array (n,2)
"""
self.inst.detect.add_hit(apos)
[docs] def get_snr_sources(self, shadowgram_tot=None):
"""compute the theoretical max snr of each source.
Takes into account the mask is partially coded.
SNRs are listed in the order of the sources catalog
:param shadowgram_tot: final shadowgram (default=from the simu object)
:type shadowgram_tot: array(float)
:return: list of the snr of each source
:rtype: list(float)
"""
if not shadowgram_tot:
shadowgram_tot = self.inst.detect.ar_val
snr = []
for src_idx in range(self.cat_fov.get_nb_element()):
src_cts = self.shadows_sources[src_idx].sum()
idx_l = int(self.intersect_sources[src_idx][0][0])
idx_b = int(self.intersect_sources[src_idx][0][1])
idx_r = int(self.intersect_sources[src_idx][1][0])
idx_u = int(self.intersect_sources[src_idx][1][1])
noise_cts = np.sqrt(shadowgram_tot[idx_l:idx_r + 1, idx_b:idx_u + 1].sum())
snr_src = src_cts / noise_cts
snr.append(snr_src)
return snr
[docs] def save_evts(self, filename,
t_start=astrotime.Time('2018-06-16T14:37:07', format='isot', scale='tt')):
self._add_time_evts()
oevt = xde.XEvtData()
oevt.set_evt(self.ar_evts)
oevt.sort_with_time()
oevt.tb_evts['TIME'] = oevt.tb_evts['TIME'] + convert_mjd_in_svomref_seconds(t_start.mjd)
oevt.write(filename, self.__class__.__name__)
del self.ar_evts
[docs] def save_cal_evts(self, filename, t_start=0, l1=False):
"""
t_start in s from mjdref
"""
self._add_time_evts()
oevt = xde.EclairsCalEvtData()
oevt.set_evt_from_std_array(self.ar_evts)
oevt.sort_with_time()
oevt.tb_evts['TIME'] = oevt.tb_evts['TIME'] + t_start
oevt.write_L1(filename, self.__class__.__name__)
del self.ar_evts
[docs] def save_stack_evt_image(self, name_fits):
hdu = fits.PrimaryHDU(yzegp_to_ijdet_array(self.inst.detect.ar_val))
hdu.writeto(name_fits, overwrite=True)
[docs] def save_mask(self, name_fits):
hdu = fits.PrimaryHDU(yzegp_to_ijdet_array(self.inst.mask.ar_val))
hdu.writeto(name_fits, overwrite=True)
[docs] def save_catal_log(self, filename):
"""save a fits log file of the simulation of each source
"""
catal_log = copy.copy(self.cat_fov)
catal_log._catalog = astt.join(catal_log.get_catalog_sky_pix(), catal_log.get_catalog())
catal_log._catalog['simulated_cts'] = np.sum(self.shadows_sources, axis=(1, 2))
catal_log._catalog['snr'] = self.get_snr_sources()
catal_log._catalog.add_row()
catal_log._catalog[-1]['sky_y'] = None
catal_log._catalog[-1]['sky_z'] = None
catal_log._catalog[-1]['elev'] = None
catal_log._catalog[-1]['dir'] = None
catal_log._catalog[-1]['name'] = 'cxb'
catal_log._catalog[-1]['intensity'] = 1
catal_log._catalog[-1]['simulated_cts'] = self.get_cxb(1).sum()
catal_log._catalog[-1]['snr'] = None
catal_log.save_catalog(filename)
[docs]class SimuECLAIRsMaskProjection(SimuInstruXBase):
"""Simulation with geometric method square intersection
"""
def __init__(self, catalog_x: CatalogFovBasic = None, p_nb_pixel=None):
"""**Constructor**
"""
self.inst = iX.InstruECLAIRs(p_nb_pix=p_nb_pixel)
self.set_catalog(catalog_x)
super().__init__()
self.det_tmp = ArraySquareCell() # for pydev auto-completion reference
self.det_tmp.zeros_like(self.inst.detect)
self.det_tmp.set_array(self.inst.detect.ar_val)
self.det_pixel_surface = self.inst.detect.size_cm ** 2
self.intersect_sources = []
self._projection_mask_numba = True
[docs] def get_det_pixel_surface(self):
return self.det_pixel_surface
[docs] def get_shadow_percent(self, elev, direc):
self.el_rad = np.deg2rad(elev)
dr_rad = np.deg2rad(direc)
self.inst.set_pos_source_fov(self.el_rad, dr_rad)
return self.get_proj_mask()
[docs] def get_shadow_surface(self, elev, direc):
a_surface_open = self.get_shadow_percent(elev, direc) * self.inst.detect.surf_cell
return a_surface_open
[docs] def get_shadow_apparent_surface(self, elev, direc):
"""Return apparent surface
"""
return self.get_shadow_surface(elev, direc) * np.sin(self.el_rad)
[docs] def simu_catalog(self, time_expose, verbose=False):
self.time_expose_s = time_expose
# save individual source shadowgram and the intersection coordinates pixels on the detector
self.shadows_sources = []
# loop on source and call ray_tracing_for_one_src()
r_ar_val = self.inst.detect.ar_val
for src in self.cat_fov.get_catalog():
# compute rectangle mask projection in detector
el_rad = np.deg2rad(src['elev'])
dr_rad = np.deg2rad(src['dir'])
self.inst.set_pos_source_fov(el_rad, dr_rad)
# compute nb photon with
# - time
# - surface
# - angle between source and detector
# - intensity of source
# - noise photon of source
#
# a_percent_surface = self.ointer.interpol(vec_trans[0], vec_trans[1])
a_percent_surface = self.get_proj_mask()
a_surface_open = a_percent_surface * (np.sin(el_rad) * self.inst.detect.surf_cell)
a_nb_photon_theo = src['intensity'] * a_surface_open * time_expose
# add photon noise
if self.b_photon_noise:
a_nb_photon = np.reshape(np.random.poisson(a_nb_photon_theo.ravel()),
r_ar_val.shape)
else:
a_nb_photon = a_nb_photon_theo
# save individual source shadowgram
self.shadows_sources.append(a_nb_photon)
# cumulate shadowgram of current source to
# detector array and their pattern on the detector
r_ar_val += a_nb_photon
if verbose is True:
self.logger.info(f'process simulation {src["name"]} with {a_nb_photon} photons')
self.inst.detect.ar_val = np.around(r_ar_val)
[docs] def get_proj_mask(self):
intersect_source = [[self.inst.p_nb_pix - 1, self.inst.p_nb_pix - 1], [0, 0]]
# loop on sub mask
self.det_tmp.set_val(0.0)
for mask in self.inst._l_mask:
self.inst.set_mask(mask)
# intersection sub-mask
if self.inst.get_surface_rect_proj() > 0:
self.mask_trans = ArraySquareCell()
self.mask_trans.zeros_like(self.inst.mask)
self.mask_trans.set_array(self.inst.mask.ar_val)
# add translation source
new_origin = self.mask_trans.to_origin_cm + self.inst.vec_trans_source
self.mask_trans.set_origin(new_origin)
if self._projection_mask_numba:
mask_trans = self.mask_trans.ar_val
det_tmp = self.det_tmp.ar_val
rect_proj = self.inst.rect_proj
max_p_mask = self.mask_trans.max_ix
max_p_det = self.inst.detect.max_ix
size_p_mask = self.mask_trans.size_cm
size_p_det = self.inst.detect.size_cm
trans_mask = self.mask_trans.to_origin_cm
trans_det = self.inst.detect.to_origin_cm
projection_mask_numba(mask_trans, det_tmp, rect_proj, max_p_mask,
max_p_det, size_p_mask, size_p_det,
trans_mask, trans_det)
else:
self.logger.info('intersection mask detector optimization')
self.get_proj_sub_mask()
# intersection on the detector pix of sub mask
intersect_det_submask = self.inst.detect.pos2idx_rect(self.inst.rect_proj, True)
intersect_source = [np.minimum(intersect_source[0], intersect_det_submask[0]),
np.maximum(intersect_source[1], intersect_det_submask[1])]
self.intersect_sources.append(intersect_source)
# to percent
self.det_tmp.ar_val /= self.det_tmp.surf_cell
# self.plot_proj_mask("just compute")
return self.det_tmp.ar_val
[docs] def get_proj_sub_mask(self):
# define index pixel mask in intersection
mask_idx = self.mask_trans.pos2idx_rect(self.inst.rect_proj, True)
# loop on pixel mask
for col_m in range(mask_idx[0, 1], mask_idx[1, 1] + 1):
for row_m in range(mask_idx[0, 0], mask_idx[1, 0] + 1):
if self.mask_trans.ar_val[row_m, col_m] == 0:
continue
# It's open mask pixel
# we can define position of this open pixel
rect_pos_pix_mask = self.mask_trans.pos_rect_cell([row_m, col_m])
# now define index pixel DPIX in intersection with this open pixel
dpix_idx = self.det_tmp.pos2idx_rect(rect_pos_pix_mask, True)
self.logger.debug(f'dpix: {dpix_idx}')
for col_d in range(dpix_idx[0, 1], dpix_idx[1, 1] + 1):
for row_d in range(dpix_idx[0, 0], dpix_idx[1, 0] + 1):
rect_pos_dpix = self.det_tmp.pos_rect_cell([row_d, col_d])
surf_inter = surf_intersection_rect(rect_pos_pix_mask, rect_pos_dpix)
self.det_tmp.ar_val[row_d, col_d] += surf_inter
[docs] def plot_proj_mask(self, p_title=""): # pragma: no cover
plt.figure()
plt.title(p_title)
plt.imshow(yzegp_to_ijdet_array(self.det_tmp.ar_val))
plt.colorbar()
@njit
def projection_mask_numba(mask_trans, det_tmp, rect_proj, max_p_mask,
max_p_det, size_p_mask, size_p_det,
trans_mask, trans_det): # pragma: no cover
"""
projection and intersection with numba library
"""
# declare variable
dpix_idx = np.empty((2, 2), dtype=np.int16)
rect_pos_pix_mask = np.empty((2, 2), dtype=np.float32)
rect_pos_dpix = np.empty((2, 2), dtype=np.float32)
# define index pixel mask in intersection
mask_idx = np.floor((rect_proj + trans_mask) / size_p_mask).astype(np.int16)
trange = [0, 1]
for i1 in trange:
for i2 in trange:
if mask_idx[i1, i2] < 0:
mask_idx[i1, i2] = 0
continue
if mask_idx[i1, i2] > max_p_mask:
mask_idx[i1, i2] = max_p_mask
# loop on pixel mask
for col_m in range(mask_idx[0, 1], mask_idx[1, 1] + 1):
for row_m in range(mask_idx[0, 0], mask_idx[1, 0] + 1):
if mask_trans[row_m, col_m] == 0:
continue
# It's open mask pixel
# we can define position of this open pixel
a_idx = np.array([[row_m, col_m]], dtype=np.float32)
rect_pos_pix_mask[0, :] = a_idx * size_p_mask - trans_mask
rect_pos_pix_mask[1, :] = rect_pos_pix_mask[0, :] + size_p_mask
# now define index pixel DPIX in intersection with this open pixel
dpix_idx = np.floor((rect_pos_pix_mask + trans_det) / size_p_det).astype(np.int16)
val = dpix_idx[0, 0]
if val < 0:
dpix_idx[0, 0] = 0
elif val > max_p_det:
dpix_idx[0, 0] = max_p_det
val = dpix_idx[1, 0]
if val < 0:
dpix_idx[1, 0] = 0
elif val > max_p_det:
dpix_idx[1, 0] = max_p_det
val = dpix_idx[0, 1]
if val < 0:
dpix_idx[0, 1] = 0
elif val > max_p_det:
dpix_idx[0, 1] = max_p_det
val = dpix_idx[1, 1]
if val < 0:
dpix_idx[1, 1] = 0
elif val > max_p_det:
dpix_idx[1, 1] = max_p_det
for col_d in range(dpix_idx[0, 1], dpix_idx[1, 1] + 1):
for row_d in range(dpix_idx[0, 0], dpix_idx[1, 0] + 1):
# # replace rect_pos_dpix = det_tmp.pos_rect_cell([row_d, col_d])
a_idx = np.array([[row_d, col_d]], dtype=np.float32)
rect_pos_dpix[0, :] = a_idx * size_p_det - trans_det
rect_pos_dpix[1, :] = rect_pos_dpix[0, :] + size_p_det
# add intersection surface
inter_rec = intersection_rect(rect_pos_pix_mask, rect_pos_dpix)
term = (inter_rec[1, 0] - inter_rec[0, 0]) * (inter_rec[1, 1] - inter_rec[0, 1])
det_tmp[row_d, col_d] += term