Source code for ecpi.simu.lib.instru_x

'''Instrument simulation module of point source
.. note:: TODO need to be update
'''

import copy
import logging
import numpy as np
import matplotlib.pyplot as plt
import astropy.time as astrotime
import astropy.table as astt
from astropy.io import fits
from numba import njit

import ecpi.common.instru.model_geom as iX
import ecpi.common.io.events as xde
from ecpi.common.sky.catalog import CatalogFovBasic
from ecpi.common.num.array_square_cell import ArraySquareCell, \
                                              intersection_rect, surf_intersection_rect
from ecpi.common.mission.time import convert_mjd_in_svomref_seconds
from ecpi.common.instru.array_convention import yzegp_to_ijdet_array


[docs]class SimuInstruXBase(object): """Generic simulation of X instrument coded mask """ def __init__(self): self.b_photon_noise = True self.time_expose_s = 0 # in second # declare logger self.logger = logging.getLogger(__name__) self.logger.info('creating an instance of SimuInstruXBase')
[docs] def get_det_pixel_surface(self): return self.det_pixel_surface
[docs] def get_shadowgram(self): return self.inst.detect.ar_val
[docs] def plot_shadowgram(self): # pragma: no cover self.inst.detect.plot("shadowgram t")
[docs] def raz_nb_photon(self): self.inst.detect.ar_val[:, :] = 0
[docs] def set_catalog(self, catalog_x: CatalogFovBasic): self.cat_fov = catalog_x
[docs] def simu_catalog(self, time_expose, verbose=None): pass
[docs] def get_cxb(self, cxb_eclairs_level=1, cxb_type='flat_nospectr', internal_noise=False, noise_level=0.003): """ OBSOLETE METHOD """ raise
[docs] def add_cxb(self, cxb_eclairs_level=1): """ :param cxb_eclairs_level: intensity of CXB :type cxb_eclairs_level: float [count/cm2/s] """ self.inst.detect.ar_val += self.get_cxb(cxb_eclairs_level)
def _add_time_evts(self): """ Count nb events, random uniform time in [s] for each and sort """ nb_photon = int(self.inst.print_total_count()) self.ar_evts = np.empty((nb_photon,), dtype=[('a', 'uint32'), ('b', 'B'), ('c', 'B'), ('d', 'uint16')]) max_tps = int(self.time_expose_s) self.ar_evts['a'] = np.random.randint(0, max_tps, nb_photon, dtype='uint32') self.ar_evts['d'] = 1 index = np.arange(0, self.inst.detect.ar_val.size) evts = np.repeat(index, self.inst.detect.ar_val.round().astype(np.int).ravel()) (self.ar_evts['b'], self.ar_evts['c']) = divmod(evts, self.inst.detect.get_nb_cell_y())
[docs] def add_all_evt(self, apos): """ Add one on pixel associated to position :param apos: array plan position of photon :type apos: numpy array (n,2) """ self.inst.detect.add_hit(apos)
[docs] def get_snr_sources(self, shadowgram_tot=None): """compute the theoretical max snr of each source. Takes into account the mask is partially coded. SNRs are listed in the order of the sources catalog :param shadowgram_tot: final shadowgram (default=from the simu object) :type shadowgram_tot: array(float) :return: list of the snr of each source :rtype: list(float) """ if not shadowgram_tot: shadowgram_tot = self.inst.detect.ar_val snr = [] for src_idx in range(self.cat_fov.get_nb_element()): src_cts = self.shadows_sources[src_idx].sum() idx_l = int(self.intersect_sources[src_idx][0][0]) idx_b = int(self.intersect_sources[src_idx][0][1]) idx_r = int(self.intersect_sources[src_idx][1][0]) idx_u = int(self.intersect_sources[src_idx][1][1]) noise_cts = np.sqrt(shadowgram_tot[idx_l:idx_r + 1, idx_b:idx_u + 1].sum()) snr_src = src_cts / noise_cts snr.append(snr_src) return snr
[docs] def save_evts(self, filename, t_start=astrotime.Time('2018-06-16T14:37:07', format='isot', scale='tt')): self._add_time_evts() oevt = xde.XEvtData() oevt.set_evt(self.ar_evts) oevt.sort_with_time() oevt.tb_evts['TIME'] = oevt.tb_evts['TIME'] + convert_mjd_in_svomref_seconds(t_start.mjd) oevt.write(filename, self.__class__.__name__) del self.ar_evts
[docs] def save_cal_evts(self, filename, t_start=0, l1=False): """ t_start in s from mjdref """ self._add_time_evts() oevt = xde.EclairsCalEvtData() oevt.set_evt_from_std_array(self.ar_evts) oevt.sort_with_time() oevt.tb_evts['TIME'] = oevt.tb_evts['TIME'] + t_start oevt.write_L1(filename, self.__class__.__name__) del self.ar_evts
[docs] def save_stack_evt_image(self, name_fits): hdu = fits.PrimaryHDU(yzegp_to_ijdet_array(self.inst.detect.ar_val)) hdu.writeto(name_fits, overwrite=True)
[docs] def save_mask(self, name_fits): hdu = fits.PrimaryHDU(yzegp_to_ijdet_array(self.inst.mask.ar_val)) hdu.writeto(name_fits, overwrite=True)
[docs] def save_catal_log(self, filename): """save a fits log file of the simulation of each source """ catal_log = copy.copy(self.cat_fov) catal_log._catalog = astt.join(catal_log.get_catalog_sky_pix(), catal_log.get_catalog()) catal_log._catalog['simulated_cts'] = np.sum(self.shadows_sources, axis=(1, 2)) catal_log._catalog['snr'] = self.get_snr_sources() catal_log._catalog.add_row() catal_log._catalog[-1]['sky_y'] = None catal_log._catalog[-1]['sky_z'] = None catal_log._catalog[-1]['elev'] = None catal_log._catalog[-1]['dir'] = None catal_log._catalog[-1]['name'] = 'cxb' catal_log._catalog[-1]['intensity'] = 1 catal_log._catalog[-1]['simulated_cts'] = self.get_cxb(1).sum() catal_log._catalog[-1]['snr'] = None catal_log.save_catalog(filename)
[docs]class SimuECLAIRsMaskProjection(SimuInstruXBase): """Simulation with geometric method square intersection """ def __init__(self, catalog_x: CatalogFovBasic = None, p_nb_pixel=None): """**Constructor** """ self.inst = iX.InstruECLAIRs(p_nb_pix=p_nb_pixel) self.set_catalog(catalog_x) super().__init__() self.det_tmp = ArraySquareCell() # for pydev auto-completion reference self.det_tmp.zeros_like(self.inst.detect) self.det_tmp.set_array(self.inst.detect.ar_val) self.det_pixel_surface = self.inst.detect.size_cm ** 2 self.intersect_sources = [] self._projection_mask_numba = True
[docs] def get_det_pixel_surface(self): return self.det_pixel_surface
[docs] def get_shadow_percent(self, elev, direc): self.el_rad = np.deg2rad(elev) dr_rad = np.deg2rad(direc) self.inst.set_pos_source_fov(self.el_rad, dr_rad) return self.get_proj_mask()
[docs] def get_shadow_surface(self, elev, direc): a_surface_open = self.get_shadow_percent(elev, direc) * self.inst.detect.surf_cell return a_surface_open
[docs] def get_shadow_apparent_surface(self, elev, direc): """Return apparent surface """ return self.get_shadow_surface(elev, direc) * np.sin(self.el_rad)
[docs] def simu_catalog(self, time_expose, verbose=False): self.time_expose_s = time_expose # save individual source shadowgram and the intersection coordinates pixels on the detector self.shadows_sources = [] # loop on source and call ray_tracing_for_one_src() r_ar_val = self.inst.detect.ar_val for src in self.cat_fov.get_catalog(): # compute rectangle mask projection in detector el_rad = np.deg2rad(src['elev']) dr_rad = np.deg2rad(src['dir']) self.inst.set_pos_source_fov(el_rad, dr_rad) # compute nb photon with # - time # - surface # - angle between source and detector # - intensity of source # - noise photon of source # # a_percent_surface = self.ointer.interpol(vec_trans[0], vec_trans[1]) a_percent_surface = self.get_proj_mask() a_surface_open = a_percent_surface * (np.sin(el_rad) * self.inst.detect.surf_cell) a_nb_photon_theo = src['intensity'] * a_surface_open * time_expose # add photon noise if self.b_photon_noise: a_nb_photon = np.reshape(np.random.poisson(a_nb_photon_theo.ravel()), r_ar_val.shape) else: a_nb_photon = a_nb_photon_theo # save individual source shadowgram self.shadows_sources.append(a_nb_photon) # cumulate shadowgram of current source to # detector array and their pattern on the detector r_ar_val += a_nb_photon if verbose is True: self.logger.info(f'process simulation {src["name"]} with {a_nb_photon} photons') self.inst.detect.ar_val = np.around(r_ar_val)
[docs] def get_proj_mask(self): intersect_source = [[self.inst.p_nb_pix - 1, self.inst.p_nb_pix - 1], [0, 0]] # loop on sub mask self.det_tmp.set_val(0.0) for mask in self.inst._l_mask: self.inst.set_mask(mask) # intersection sub-mask if self.inst.get_surface_rect_proj() > 0: self.mask_trans = ArraySquareCell() self.mask_trans.zeros_like(self.inst.mask) self.mask_trans.set_array(self.inst.mask.ar_val) # add translation source new_origin = self.mask_trans.to_origin_cm + self.inst.vec_trans_source self.mask_trans.set_to_origin(new_origin) if self._projection_mask_numba: mask_trans = self.mask_trans.ar_val det_tmp = self.det_tmp.ar_val rect_proj = self.inst.rect_proj max_p_mask = self.mask_trans.max_ix max_p_det = self.inst.detect.max_ix size_p_mask = self.mask_trans.size_cm size_p_det = self.inst.detect.size_cm trans_mask = self.mask_trans.to_origin_cm trans_det = self.inst.detect.to_origin_cm projection_mask_numba(mask_trans, det_tmp, rect_proj, max_p_mask, max_p_det, size_p_mask, size_p_det, trans_mask, trans_det) else: self.logger.info('intersection mask detector optimization') self.get_proj_sub_mask() # intersection on the detector pix of sub mask intersect_det_submask = self.inst.detect.pos2idx_rect(self.inst.rect_proj, True) intersect_source = [np.minimum(intersect_source[0], intersect_det_submask[0]), np.maximum(intersect_source[1], intersect_det_submask[1])] self.intersect_sources.append(intersect_source) # to percent self.det_tmp.ar_val /= self.det_tmp.surf_cell # self.plot_proj_mask("just compute") return self.det_tmp.ar_val
[docs] def get_proj_sub_mask(self): # define index pixel mask in intersection mask_idx = self.mask_trans.pos2idx_rect(self.inst.rect_proj, True) # loop on pixel mask for col_m in range(mask_idx[0, 1], mask_idx[1, 1] + 1): for row_m in range(mask_idx[0, 0], mask_idx[1, 0] + 1): if self.mask_trans.ar_val[row_m, col_m] == 0: continue # It's open mask pixel # we can define position of this open pixel rect_pos_pix_mask = self.mask_trans.pos_rect_cell([row_m, col_m]) # now define index pixel DPIX in intersection with this open pixel dpix_idx = self.det_tmp.pos2idx_rect(rect_pos_pix_mask, True) self.logger.debug(f'dpix: {dpix_idx}') for col_d in range(dpix_idx[0, 1], dpix_idx[1, 1] + 1): for row_d in range(dpix_idx[0, 0], dpix_idx[1, 0] + 1): rect_pos_dpix = self.det_tmp.pos_rect_cell([row_d, col_d]) surf_inter = surf_intersection_rect(rect_pos_pix_mask, rect_pos_dpix) self.det_tmp.ar_val[row_d, col_d] += surf_inter
[docs] def plot_proj_mask(self, p_title=""): # pragma: no cover plt.figure() plt.title(p_title) plt.imshow(yzegp_to_ijdet_array(self.det_tmp.ar_val)) plt.colorbar()
[docs]@njit def projection_mask_numba(sub_mask, det_surf_out, rect_proj, max_p_mask, max_p_det, size_p_mask, size_p_det, trans_mask, trans_det): # pragma: no cover """ Projection of sub-mask on detector with numba library in ECL_Los. Return array detector with surface (in cm^2) illuminated by the source .. note:: numba version of method get_proj_sub_mask() as function Algo: * rect_proj is rectangle of intersection between sub-mask and detector in ECL_Los: RI * Find index array of mask in RI: mask_idx * loop on pixel mask PM in mask_idx * if PM isn't open pass to next pixel mask * Find index array of detector in PM * loop on pixel detector PD in PM * compute surface intersection between PM and PD * add it to det_surf_out[PD] .. note:: Index to physical position and inverse usually is performed by class :py:class:`.ArraySquareCell` but in numba function we can't used python class, so we rewrite it .. :param sub_mask: just sub-mask value. Open pixel is 1 :type sub_mask: array 2D :param det_surf_out: output of the function, array detector surface illuminated in cm^2 :type det_surf_out: array 2D :param rect_proj: rectangle of intersection between sub-mask and detector in ECL_Los :type rect_proj: [[x_lb, y_lb], [x_ru, y_ru]] lb: left bottom, ru: right upper in cm :param max_p_mask: max index pixel mask :type max_p_mask: int :param max_p_det: max index pixel detector :type max_p_det: int :param size_p_mask: size mask pixel in cm :type size_p_mask: float :param size_p_det: size detector pixel in cm :type size_p_det: float :param trans_mask: vector between the origin of the index of mask towards the origin ECL_Los :type trans_mask: vector 2D :param trans_det: vector between the origin of the index of detector towards the origin ECL_Los :type trans_det: vector 2D """ # declare variable dpix_idx = np.empty((2, 2), dtype=np.int16) rect_pos_pix_mask = np.empty((2, 2), dtype=np.float32) rect_pos_dpix = np.empty((2, 2), dtype=np.float32) # define index pixel mask in intersection # # reduce index range of mask to the intersection between detector # mask_idx : [[ipx_lb, ipy_lb],[ipx_ru, ipy_ru]] # mask_idx = np.floor((rect_proj + trans_mask) / size_p_mask).astype(np.int16) trange = [0, 1] # edge management, must be in interval [O, max_p_mask] for i1 in trange: for i2 in trange: if mask_idx[i1, i2] < 0: mask_idx[i1, i2] = 0 continue if mask_idx[i1, i2] > max_p_mask: mask_idx[i1, i2] = max_p_mask # loop on pixel mask for col_m in range(mask_idx[0, 1], mask_idx[1, 1] + 1): for row_m in range(mask_idx[0, 0], mask_idx[1, 0] + 1): if sub_mask[row_m, col_m] == 0: continue # It's open mask pixel # we can define position of this open pixel # compute position of pixel in ECL_los : rect_pos_pix_mask a_idx = np.array([[row_m, col_m]], dtype=np.float32) rect_pos_pix_mask[0, :] = a_idx * size_p_mask - trans_mask rect_pos_pix_mask[1, :] = rect_pos_pix_mask[0, :] + size_p_mask # now define index pixel DPIX in intersection with this mask open pixel dpix_idx = np.floor((rect_pos_pix_mask + trans_det) / size_p_det).astype(np.int16) # edge management, must be in interval [O, max_p_det] # without loop more efficient val = dpix_idx[0, 0] if val < 0: dpix_idx[0, 0] = 0 elif val > max_p_det: dpix_idx[0, 0] = max_p_det val = dpix_idx[1, 0] if val < 0: dpix_idx[1, 0] = 0 elif val > max_p_det: dpix_idx[1, 0] = max_p_det val = dpix_idx[0, 1] if val < 0: dpix_idx[0, 1] = 0 elif val > max_p_det: dpix_idx[0, 1] = max_p_det val = dpix_idx[1, 1] if val < 0: dpix_idx[1, 1] = 0 elif val > max_p_det: dpix_idx[1, 1] = max_p_det # now loop on pixel detector for col_d in range(dpix_idx[0, 1], dpix_idx[1, 1] + 1): for row_d in range(dpix_idx[0, 0], dpix_idx[1, 0] + 1): # compute position of pixel in ECL_los : rect_pos_dpix a_idx = np.array([[row_d, col_d]], dtype=np.float32) rect_pos_dpix[0, :] = a_idx * size_p_det - trans_det rect_pos_dpix[1, :] = rect_pos_dpix[0, :] + size_p_det # compute rectangle intersection inter_rec = intersection_rect(rect_pos_pix_mask, rect_pos_dpix) term = (inter_rec[1, 0] - inter_rec[0, 0]) * (inter_rec[1, 1] - inter_rec[0, 1]) # add intersection surface in cm^2 det_surf_out[row_d, col_d] += term