'''Instrument simulation module
Created on 29 nov. 2017
@author: colley JM
'''
import numpy as np
from astropy.io import fits
import astropy.time as astrotime
import copy
import matplotlib.pyplot as plt
import astropy.table as astt
from common.instru.model_geom import InstruXbase
import common.instru.model_geom as iX
from common.sky.catalog import CatalogFovBasic
import common.io.events as xde
import common.num.interpol_tools as imask
from common.num.array_square_cell import ArraySquareCell
from common import add_path_data_ref_eclairs
from common.mission.time import convert_mjd_in_svomref_seconds
import common.sky.background as bkg
from common.instru.array_convention import yzegp_to_ijdet_array
[docs]class SimuInstruXBase(object):
def __init__(self):
self.b_photon_noise = True
self.time_expose_s = 0 # in second
[docs] def get_det_pixel_surface(self):
return self.det_pixel_surface
[docs] def get_shadowgram(self):
return self.inst.detect.ar_val
[docs] def plot_shadowgram(self):
self.inst.detect.plot("shadowgram t")
[docs] def raz_nb_photon(self):
self.inst.detect.ar_val[:,:] = 0
[docs] def set_catalog(self, catalog_x : CatalogFovBasic):
self.cat_fov = catalog_x
[docs] def simu_catalog(self, time_expose, verbose=None):
pass
[docs] def get_cxb(self, cxb_eclairs_level=1, cxb_type='flat_nospectr', internal_noise=False, noise_level=0.003):
"""
return a numpy array of detector with CXB count by pixel.
method :
:param cxb_eclairs_level: intensity of CXB
:type cxb_eclairs_level: float [count/cm2/s]
:param cxb_type: type of CXB = 'flat_nospectr' or 'shapebased_nospectr' (default='flat_nospectr')
:type cxb_type: string
:param internal_noise: flag to add internal noise
:type internal_noise: bool
:param noise_level: internal noise level in cts/sec/cm2/keV (default is 0.003)
:type noise_level: float
:return: cxb detector image counts
:rtype: array(float)
"""
if cxb_type == 'flat_nospectr':
cxb_rate = bkg.flat_cxb_nospectr(cxb_eclairs_level, self.get_det_pixel_surface(), self.inst.detect.ar_val.shape)
elif cxb_type == 'shapebased_nospectr':
cxb_rate = bkg.shapebased_cxb_nospectr(cxb_eclairs_level)
else:
print('WARNING: cxb type not reconized, default to flat_nospectr')
cxb_rate = bkg.flat_cxb_nospectr(cxb_eclairs_level, self.get_det_pixel_surface(), self.inst.detect.ar_val.shape)
if internal_noise:
cxb_rate += bkg.flat_internal_noise(noise_level, self.get_det_pixel_surface(), self.inst.detect.ar_val.shape)
nb_photon_theo = self.time_expose_s * cxb_rate
cpt_cxb = np.random.poisson(nb_photon_theo)
return cpt_cxb
[docs] def add_cxb(self, cxb_eclairs_level=1):
"""
:param cxb_eclairs_level: intensity of CXB
:type cxb_eclairs_level: float [count/cm2/s]
"""
self.inst.detect.ar_val += self.get_cxb(cxb_eclairs_level)
def _add_time_evts(self):
""" Count nb events, random uniform time in [s] for each and sort """
nb_photon = int(self.inst.get_total_photon())
self.ar_evts = np.empty((nb_photon,), dtype=[('a', 'uint32'), ('b', 'B'),('c', 'B'), ('d', 'uint16')])
max_tps = int(self.time_expose_s)
# TODO text max int32 before
self.ar_evts['a'] = np.random.randint(0, max_tps, nb_photon, dtype='uint32')
self.ar_evts['d'] = 1
index = np.arange(0, self.inst.detect.ar_val.size)
evts = np.repeat(index, self.inst.detect.ar_val.round().astype(np.int).ravel())
(self.ar_evts['b'], self.ar_evts['c']) = divmod(evts, self.inst.detect.get_nb_cell_y())
[docs] def add_all_evt(self, apos):
"""
Add one on pixel associated to position
:param apos: array plan position of photon
:type apos: numpy array (n,2)
"""
self.inst.detect.add_hit(apos)
[docs] def get_snr_sources(self, shadowgram_tot=None):
"""compute the theoretical max snr of each source
the snr are listed in the order of the sources catalog
:param shadowgram_tot: final shadowgram (default=from the simu object)
:type shadowgram_tot: array(float)
:return: list of the snr of each source
:rtype: list(float)
"""
if not shadowgram_tot:
shadowgram_tot = self.inst.detect.ar_val
snr = []
for src_idx in range(self.cat_fov.get_nb_element()):
src_cts = self.shadows_sources[src_idx].sum()
idx_l = int(self.intersect_sources[src_idx][0][0])
idx_b = int(self.intersect_sources[src_idx][0][1])
idx_r = int(self.intersect_sources[src_idx][1][0])
idx_u = int(self.intersect_sources[src_idx][1][1])
noise_cts = np.sqrt(shadowgram_tot[idx_l:idx_r+1, idx_b:idx_u+1].sum())
snr_src = src_cts / noise_cts
snr.append(snr_src)
return snr
[docs] def save_evts(self, filename, t_start=astrotime.Time('2018-06-16T14:37:07', format='isot', scale='tt')):
self._add_time_evts()
oevt = xde.XEvtData()
oevt.set_evt(self.ar_evts)
oevt.sort_with_time()
oevt.tb_evts['time'] = oevt.tb_evts['time'] + convert_mjd_in_svomref_seconds(t_start.mjd)
oevt.write(filename, self.__class__.__name__)
del self.ar_evts
[docs] def save_cal_evts(self, filename, t_start=0, l1=False):
"""
t_start in s from mjdref
"""
self._add_time_evts()
oevt = xde.EclairsCalEvtData()
oevt.set_evt_from_std_array(self.ar_evts)
oevt.sort_with_time()
oevt.tb_evts['time'] = oevt.tb_evts['time'] + t_start
if l1:
oevt.write_L1(filename, self.__class__.__name__)
else:
oevt.write(filename, self.__class__.__name__)
del self.ar_evts
# output file
[docs] def save_stack_evt_image(self, name_fits):
hdu = fits.PrimaryHDU(yzegp_to_ijdet_array(self.inst.detect.ar_val))
hdu.writeto(name_fits, overwrite=True)
[docs] def save_mask(self, name_fits):
hdu = fits.PrimaryHDU(yzegp_to_ijdet_array(self.inst.mask.ar_val))
hdu.writeto(name_fits, overwrite=True)
[docs] def save_catal_log(self, filename):
"""save a fits log file of the simulation of each source
"""
catal_log = copy.copy(self.cat_fov)
catal_log._catalog = astt.join(catal_log.get_catalog_sky_pix(), catal_log.get_catalog())
catal_log._catalog['simulated_cts'] = np.sum(self.shadows_sources, axis=(1,2))
catal_log._catalog['snr'] = self.get_snr_sources()
catal_log._catalog.add_row()
catal_log._catalog[-1]['sky_y'] = None
catal_log._catalog[-1]['sky_z'] = None
catal_log._catalog[-1]['elev'] = None
catal_log._catalog[-1]['dir'] = None
catal_log._catalog[-1]['name'] = 'cxb'
catal_log._catalog[-1]['intensity'] = 1
catal_log._catalog[-1]['simulated_cts'] = self.get_cxb(1).sum()
catal_log._catalog[-1]['snr'] = None
catal_log.save_catalog(filename)
[docs]class SimuInstruXMaskInterpolation(SimuInstruXBase):
'''
prerequis : mask re-sampling in grid detector
'''
def __init__(self, instru_x : InstruXbase):
self.inst = instru_x
self.time_expose_s = 0 # in second
super().__init__()
[docs] def set_resampling_mask(self, a_mask_detec, verbose=False):
nx = self.inst.detect.get_nb_cell_x()
ny = self.inst.detect.get_nb_cell_y()
if verbose == True:
print (a_mask_detec)
self.ointer = imask.InterpolLin2DCenter(a_mask_detec, nx, ny)
[docs] def simu_catalog(self, time_expose, verbose=False):
self.time_expose_s = time_expose
# loop on source and call ray_tracing_for_one_src()
r_ar_val = self.inst.detect.ar_val
for src in self.cat_fov.get_catalog():
# compute rectangle mask projection in dectetor
el_rad = np.deg2rad(src['elev'])
dr_rad = np.deg2rad(src['dir'])
self.inst.set_pos_source_fov(el_rad, dr_rad)
# compute nb photon with
# - time
# - surface
# - angle between source and detector
# - intensity of source
# - noise photon of source
#
# convert vec_trans_source in pixel unit detector
vec_trans = self.inst.vec_trans_source/self.inst.detect.size_cm
if verbose == True:
print(vec_trans[0], vec_trans[1])
a_percent_surface = self.ointer.interpol(vec_trans[0], vec_trans[1])
plt.figure()
plt.title('percent_surface')
plt.imshow(yzegp_to_ijdet_array(a_percent_surface))
plt.colorbar()
a_surface_open = a_percent_surface*(np.sin(el_rad)*self.inst.detect.surf_cell)
a_nb_photon_theo = src['intensity']*a_surface_open*time_expose
# add photon noise
if self.b_photon_noise:
a_nb_photon = np.reshape(np.random.poisson(a_nb_photon_theo.ravel()) , r_ar_val.shape)
else:
a_nb_photon = a_nb_photon_theo
# cumulate shadowgram of current source to detecteor array
r_ar_val += a_nb_photon
if verbose == True:
print('process simulation %s, with %d photons' %(src['name'], a_nb_photon.sum()))
self.inst.detect.ar_val = np.around(r_ar_val)
[docs]class SimuInstruXRayTracing(SimuInstruXBase):
'''
simulation instru X with
* catalog FOV => stable pointing
* evt in pixel => no energy
* CXB model uniforme noise on detector
'''
def __init__(self, instru_x : InstruXbase, catalog_x : CatalogFovBasic = None):
'''
'''
self.inst = instru_x
if catalog_x : self.cat_fov = catalog_x
super().__init__()
self.det_pixel_surface = self.inst.detect.size_cm**2
[docs] def simu_catalog(self, time_expose, verbose=None):
self.time_expose_s = time_expose
# loop on source and call ray_tracing_for_one_src()
src_idx = 0
# init individual source shadowgram and the intersection coordinates pixels on the detector only for a raytrac object
if not isinstance(self, SimuECLAIRsRayTracing):
self.shadows_sources = np.zeros((len(self.cat_fov.get_catalog()), self.inst.p_nb_pix, self.inst.p_nb_pix))
self.intersect_sources = np.zeros((len(self.cat_fov.get_catalog()), 2, 2)) + [[self.inst.p_nb_pix-1, self.inst.p_nb_pix-1], [0,0]]
for src in self.cat_fov.get_catalog():
# compute rectangle mask projection in detector
el_rad = np.deg2rad(src['elev'])
dr_rad = np.deg2rad(src['dir'])
self.inst.set_pos_source_fov(el_rad, dr_rad)
surface = self.inst.get_surface_rect_proj()
if verbose == True:
print('surface %f'%surface)
if np.isclose(0, surface, 1e-7) :
# src out of detector
continue
# compute nb photon with
# - time
# - surface
# - angle between source and detector
# - intensity of source
# - noise photon of source
surface_open = surface*np.sin(el_rad)
nb_photon_theo = src['intensity']*surface_open*time_expose
# add photon noise
if self.b_photon_noise:
nb_photon = np.random.poisson(nb_photon_theo)
else:
nb_photon = nb_photon_theo
apos = self.inst.detect.random_pos_rect(int(round(nb_photon)), self.inst.rect_proj)
self.ray_tracing_for_one_src(apos, src_idx=src_idx)
self.intersect_sources[src_idx] = [np.minimum(self.intersect_sources[src_idx, 0], self.intersect_det_subsrc[0]), np.maximum(self.intersect_sources[src_idx, 1], self.intersect_det_subsrc[1])]
self.shadows_sources[src_idx] += self.shadow_src
src_idx += 1
if verbose == True:
print('process simulation %s, with %d photons' %(src['name'], nb_photon))
#self.inst.detect.plot()
[docs] def ray_tracing_for_one_src(self, apos, src_idx=0):
"""
result: select by ray tracing photon can be arrived on detector by hole mask
"""
apos_mask = apos + self.inst.vec_trans_source
idx_det = self.inst.detect.pos2idx_inf(apos)
idx_mask = self.inst.mask.pos2idx_inf(apos_mask)
det = self.inst.detect
mask = self.inst.mask
# apply mask on photon in field of view
if idx_mask.size :
aval_mask = mask.value_array(idx_mask)
#print (aval_mask.shape, aval_mask.sum(), aval_mask.sum()/aval_mask.shape[0], self.detect.ar_val.sum())
# ceci
# self.detect.ar_val[idx_det[:,0], idx_det[:,1]] += aval_mask
# ne donne pas le bo resultat, comme si on avait = a la plce de += ... ?
it = np.nditer([idx_det[:,0], idx_det[:,1], aval_mask], [], [['readonly'], ['readonly'],['readonly']])
self.shadow_src = np.zeros(det.ar_val.shape)
for il, ic, val in it:
det.ar_val[il,ic] += val
# save individual source shadowgram for this source and this mask
self.shadow_src[il, ic] += val
# intersection on the detector pix for this source and this mask
self.intersect_det_subsrc = [[idx_det[:,0].min(), idx_det[:,1].min()], [idx_det[:,0].max(), idx_det[:,1].max()]]
#print(self.detect.ar_val.sum(), self.detect.ar_val.max())
[docs]class SimuECLAIRsRayTracing(SimuInstruXRayTracing):
def __init__(self, catalog_x : CatalogFovBasic = None, show=None, p_nb_pixel=None):
eclairs = iX.InstruECLAIRs(show=show, p_nb_pix=p_nb_pixel)
self.inst = eclairs
if catalog_x : self.cat_fov = catalog_x
super().__init__(eclairs)
[docs] def simu_catalog(self, time_expose, verbose=None, show=None):
self.time_expose_s = time_expose
# init individual source shadowgram and the intersection coordinates pixels on the detector
self.shadows_sources = np.zeros((len(self.cat_fov.get_catalog()), self.inst.p_nb_pix, self.inst.p_nb_pix))
self.intersect_sources = np.zeros((len(self.cat_fov.get_catalog()), 2, 2)) + [[self.inst.p_nb_pix-1, self.inst.p_nb_pix-1], [0,0]]
for mask in self.inst.l_mask:
if verbose == True:
print("mask ",mask.name)
self.inst.set_mask(mask)
super().simu_catalog(time_expose, verbose=verbose)
if show == True:
self.inst.detect.plot()
[docs]class SimuECLAIRsInterpol(SimuInstruXMaskInterpolation):
def __init__(self, show=None, p_nb_pixel=None):
# approximative method, must have a correction
eclairs = iX.InstruECLAIRs(show=show, p_nb_pix=p_nb_pixel)
self.inst = eclairs
super().__init__(eclairs)
f_mask_detec = add_path_data_ref_eclairs('Rebinned_Final_mask_AGr.fits')
hdul = fits.open(f_mask_detec)
a_mask_detec = hdul[0].data.copy()
hdul.close()
self.set_resampling_mask(a_mask_detec)
# need correction before use it
raise
[docs]class SimuECLAIRsMaskProjection(SimuInstruXBase):
def __init__(self, catalog_x : CatalogFovBasic = None, p_nb_pixel=None, show=None):
"""
"""
self.inst = iX.InstruECLAIRs(p_nb_pix=p_nb_pixel, show=show)
self.set_catalog(catalog_x)
super().__init__()
self.det_tmp = ArraySquareCell() # for pydev auto-completion reference
self.det_tmp = copy.deepcopy(self.inst.detect)
self.det_pixel_surface = self.inst.detect.size_cm**2
self.intersect_sources = []
[docs] def get_det_pixel_surface(self):
return self.det_pixel_surface
[docs] def get_shadow_percent(self, elev, direc):
self.el_rad = np.deg2rad(elev)
dr_rad = np.deg2rad(direc)
self.inst.set_pos_source_fov(self.el_rad, dr_rad)
return self.get_proj_mask()
[docs] def get_shadow_surface(self, elev, direc):
a_surface_open = self.get_shadow_percent(elev, direc)*self.inst.detect.surf_cell
return a_surface_open
[docs] def get_shadow_apparent_surface(self, elev, direc):
"""
return apparent surface
"""
a_surface_apparent = self.get_shadow_surface(elev, direc)*np.sin(self.el_rad)
return a_surface_apparent
[docs] def simu_catalog(self, time_expose, verbose=False):
self.time_expose_s = time_expose
# save individual source shadowgram and the intersection coordinates pixels on the detector
self.shadows_sources = []
# loop on source and call ray_tracing_for_one_src()
r_ar_val = self.inst.detect.ar_val
for src in self.cat_fov.get_catalog():
# compute rectangle mask projection in detector
el_rad = np.deg2rad(src['elev'])
dr_rad = np.deg2rad(src['dir'])
self.inst.set_pos_source_fov(el_rad, dr_rad)
# compute nb photon with
# - time
# - surface
# - angle between source and detector
# - intensity of source
# - noise photon of source
#
#a_percent_surface = self.ointer.interpol(vec_trans[0], vec_trans[1])
a_percent_surface = self.get_proj_mask()
a_surface_open = a_percent_surface*(np.sin(el_rad)*self.inst.detect.surf_cell)
a_nb_photon_theo = src['intensity']*a_surface_open*time_expose
# add photon noise
if self.b_photon_noise:
a_nb_photon = np.reshape(np.random.poisson(a_nb_photon_theo.ravel()) , r_ar_val.shape)
else:
a_nb_photon = a_nb_photon_theo
# save individual source shadowgram
self.shadows_sources.append(a_nb_photon)
# cumulate shadowgram of current source to detector array and their pattern on the detector
r_ar_val += a_nb_photon
if verbose == True:
print('process simulation %s, with %d photons' %(src['name'], a_nb_photon.sum()))
self.inst.detect.ar_val = np.around(r_ar_val)
[docs] def get_proj_mask(self):
intersect_source = [[self.inst.p_nb_pix-1, self.inst.p_nb_pix-1], [0, 0]]
# loop on sub mask
self.det_tmp.set_val(0.0)
for mask in self.inst.l_mask:
#print("========================================")
self.inst.set_mask(mask)
# intersection sub-mask
if self.inst.get_surface_rect_proj() > 0:
self.mask_trans = ArraySquareCell()
self.mask_trans = copy.copy(self.inst.mask)
# add translation source
new_origin = self.mask_trans.to_origin_cm + self.inst.vec_trans_source
self.mask_trans.set_origin(new_origin)
self.get_proj_sub_mask()
# intersection on the detector pix of sub mask
intersect_det_submask = self.inst.detect.pos2idx_rect(self.inst.rect_proj, True)
intersect_source = [np.minimum(intersect_source[0], intersect_det_submask[0]), np.maximum(intersect_source[1], intersect_det_submask[1])]
self.intersect_sources.append(intersect_source)
# to percent
self.det_tmp.ar_val /= self.det_tmp.surf_cell
#self.plot_proj_mask("just compute")
return self.det_tmp.ar_val
[docs] def get_proj_sub_mask(self):
# define index pixel mask in intersection
#print(self.inst.rect_proj)
mask_idx = self.mask_trans.pos2idx_rect(self.inst.rect_proj, True)
# loop on pixel mask
for col_m in range(mask_idx[0,1], mask_idx[1,1]+1):
for row_m in range(mask_idx[0,0], mask_idx[1,0]+1):
if self.mask_trans.ar_val[row_m, col_m] == 0 : continue
# It's open mask pixel
# we can define position of this open pixel
rect_pos_pix_mask = self.mask_trans.pos_rect_cell([row_m, col_m])
# print("========================================")
# print('pixel mask')
# print(row_m, col_m)
# print(rect_pos_pix_mask)
# now define index pixel DPIX in intersection with this open pixel
dpix_idx = self.det_tmp.pos2idx_rect(rect_pos_pix_mask, True)
#print("DPIX: \n", dpix_idx)
for col_d in range(dpix_idx[0,1], dpix_idx[1,1]+1):
for row_d in range(dpix_idx[0,0], dpix_idx[1,0]+1):
rect_pos_dpix = self.det_tmp.pos_rect_cell([row_d, col_d])
# print("========================")
# print('pixel dpix')
# print(row_d, col_d)
#print(rect_pos_dpix)
surf_inter = self.det_tmp.surf_intersection_rect(rect_pos_pix_mask, rect_pos_dpix)
self.det_tmp.ar_val[row_d, col_d] += surf_inter
#break
#break
[docs] def plot_proj_mask(self,p_title=""):
plt.figure()
plt.title(p_title)
plt.imshow(yzegp_to_ijdet_array(self.det_tmp.ar_val))
plt.colorbar()