Source code for simu.lib.instru_x

'''Instrument simulation module of point source
.. note:: TODO need to be update
'''

import copy
import logging
import numpy as np
import matplotlib.pyplot as plt
import astropy.time as astrotime
import astropy.table as astt
from astropy.io import fits
from numba import njit

import ecpi.common.instru.model_geom as iX
import ecpi.common.io.events as xde
import ecpi.common.sky.background as bkg
from ecpi.common.sky.catalog import CatalogFovBasic
from ecpi.common.num.array_square_cell import ArraySquareCell, \
                                              intersection_rect, surf_intersection_rect
from ecpi.common.mission.time import convert_mjd_in_svomref_seconds
from ecpi.common.instru.array_convention import yzegp_to_ijdet_array


[docs]class SimuInstruXBase(object): """Generic simulation of X instrument coded mask """ def __init__(self): self.b_photon_noise = True self.time_expose_s = 0 # in second # declare logger self.logger = logging.getLogger(__name__) self.logger.info('creating an instance of SimuInstruXBase')
[docs] def get_det_pixel_surface(self): return self.det_pixel_surface
[docs] def get_shadowgram(self): return self.inst.detect.ar_val
[docs] def plot_shadowgram(self): # pragma: no cover self.inst.detect.plot("shadowgram t")
[docs] def raz_nb_photon(self): self.inst.detect.ar_val[:, :] = 0
[docs] def set_catalog(self, catalog_x: CatalogFovBasic): self.cat_fov = catalog_x
[docs] def simu_catalog(self, time_expose, verbose=None): pass
[docs] def get_cxb(self, cxb_eclairs_level=1, cxb_type='flat_nospectr', internal_noise=False, noise_level=0.003): """ return a numpy array of detector with CXB count by pixel. :param cxb_eclairs_level: intensity of CXB :type cxb_eclairs_level: float [count/cm2/s] :param cxb_type: type of CXB = 'flat_nospectr' or 'shapebased_nospectr'. Default is 'flat_nospectr'. :type cxb_type: string :param internal_noise: flag to add internal noise :type internal_noise: bool :param noise_level: internal noise level in cts/sec/cm2/keV (default is 0.003) :type noise_level: float :return: cxb detector image counts :rtype: array(float) """ if cxb_type == 'flat_nospectr': cxb_rate = bkg.flat_cxb_nospectr(cxb_eclairs_level, self.get_det_pixel_surface(), self.inst.detect.ar_val.shape) elif cxb_type == 'shapebased_nospectr': cxb_rate = bkg.shapebased_cxb_nospectr(cxb_eclairs_level) else: self.logger.warning(f'cxb type {cxb_type} not understood.') cxb_rate = bkg.flat_cxb_nospectr(cxb_eclairs_level, self.get_det_pixel_surface(), self.inst.detect.ar_val.shape) if internal_noise: cxb_rate += bkg.flat_internal_noise(noise_level, self.get_det_pixel_surface(), self.inst.detect.ar_val.shape) nb_photon_theo = self.time_expose_s * cxb_rate cpt_cxb = np.random.poisson(nb_photon_theo) return cpt_cxb
[docs] def add_cxb(self, cxb_eclairs_level=1): """ :param cxb_eclairs_level: intensity of CXB :type cxb_eclairs_level: float [count/cm2/s] """ self.inst.detect.ar_val += self.get_cxb(cxb_eclairs_level)
def _add_time_evts(self): """ Count nb events, random uniform time in [s] for each and sort """ nb_photon = int(self.inst.print_total_count()) self.ar_evts = np.empty((nb_photon,), dtype=[('a', 'uint32'), ('b', 'B'), ('c', 'B'), ('d', 'uint16')]) max_tps = int(self.time_expose_s) self.ar_evts['a'] = np.random.randint(0, max_tps, nb_photon, dtype='uint32') self.ar_evts['d'] = 1 index = np.arange(0, self.inst.detect.ar_val.size) evts = np.repeat(index, self.inst.detect.ar_val.round().astype(np.int).ravel()) (self.ar_evts['b'], self.ar_evts['c']) = divmod(evts, self.inst.detect.get_nb_cell_y())
[docs] def add_all_evt(self, apos): """ Add one on pixel associated to position :param apos: array plan position of photon :type apos: numpy array (n,2) """ self.inst.detect.add_hit(apos)
[docs] def get_snr_sources(self, shadowgram_tot=None): """compute the theoretical max snr of each source. Takes into account the mask is partially coded. SNRs are listed in the order of the sources catalog :param shadowgram_tot: final shadowgram (default=from the simu object) :type shadowgram_tot: array(float) :return: list of the snr of each source :rtype: list(float) """ if not shadowgram_tot: shadowgram_tot = self.inst.detect.ar_val snr = [] for src_idx in range(self.cat_fov.get_nb_element()): src_cts = self.shadows_sources[src_idx].sum() idx_l = int(self.intersect_sources[src_idx][0][0]) idx_b = int(self.intersect_sources[src_idx][0][1]) idx_r = int(self.intersect_sources[src_idx][1][0]) idx_u = int(self.intersect_sources[src_idx][1][1]) noise_cts = np.sqrt(shadowgram_tot[idx_l:idx_r + 1, idx_b:idx_u + 1].sum()) snr_src = src_cts / noise_cts snr.append(snr_src) return snr
[docs] def save_evts(self, filename, t_start=astrotime.Time('2018-06-16T14:37:07', format='isot', scale='tt')): self._add_time_evts() oevt = xde.XEvtData() oevt.set_evt(self.ar_evts) oevt.sort_with_time() oevt.tb_evts['TIME'] = oevt.tb_evts['TIME'] + convert_mjd_in_svomref_seconds(t_start.mjd) oevt.write(filename, self.__class__.__name__) del self.ar_evts
[docs] def save_cal_evts(self, filename, t_start=0, l1=False): """ t_start in s from mjdref """ self._add_time_evts() oevt = xde.EclairsCalEvtData() oevt.set_evt_from_std_array(self.ar_evts) oevt.sort_with_time() oevt.tb_evts['TIME'] = oevt.tb_evts['TIME'] + t_start oevt.write_L1(filename, self.__class__.__name__) del self.ar_evts
[docs] def save_stack_evt_image(self, name_fits): hdu = fits.PrimaryHDU(yzegp_to_ijdet_array(self.inst.detect.ar_val)) hdu.writeto(name_fits, overwrite=True)
[docs] def save_mask(self, name_fits): hdu = fits.PrimaryHDU(yzegp_to_ijdet_array(self.inst.mask.ar_val)) hdu.writeto(name_fits, overwrite=True)
[docs] def save_catal_log(self, filename): """save a fits log file of the simulation of each source """ catal_log = copy.copy(self.cat_fov) catal_log._catalog = astt.join(catal_log.get_catalog_sky_pix(), catal_log.get_catalog()) catal_log._catalog['simulated_cts'] = np.sum(self.shadows_sources, axis=(1, 2)) catal_log._catalog['snr'] = self.get_snr_sources() catal_log._catalog.add_row() catal_log._catalog[-1]['sky_y'] = None catal_log._catalog[-1]['sky_z'] = None catal_log._catalog[-1]['elev'] = None catal_log._catalog[-1]['dir'] = None catal_log._catalog[-1]['name'] = 'cxb' catal_log._catalog[-1]['intensity'] = 1 catal_log._catalog[-1]['simulated_cts'] = self.get_cxb(1).sum() catal_log._catalog[-1]['snr'] = None catal_log.save_catalog(filename)
[docs]class SimuECLAIRsMaskProjection(SimuInstruXBase): """Simulation with geometric method square intersection """ def __init__(self, catalog_x: CatalogFovBasic = None, p_nb_pixel=None): """**Constructor** """ self.inst = iX.InstruECLAIRs(p_nb_pix=p_nb_pixel) self.set_catalog(catalog_x) super().__init__() self.det_tmp = ArraySquareCell() # for pydev auto-completion reference self.det_tmp.zeros_like(self.inst.detect) self.det_tmp.set_array(self.inst.detect.ar_val) self.det_pixel_surface = self.inst.detect.size_cm ** 2 self.intersect_sources = [] self._projection_mask_numba = True
[docs] def get_det_pixel_surface(self): return self.det_pixel_surface
[docs] def get_shadow_percent(self, elev, direc): self.el_rad = np.deg2rad(elev) dr_rad = np.deg2rad(direc) self.inst.set_pos_source_fov(self.el_rad, dr_rad) return self.get_proj_mask()
[docs] def get_shadow_surface(self, elev, direc): a_surface_open = self.get_shadow_percent(elev, direc) * self.inst.detect.surf_cell return a_surface_open
[docs] def get_shadow_apparent_surface(self, elev, direc): """Return apparent surface """ return self.get_shadow_surface(elev, direc) * np.sin(self.el_rad)
[docs] def simu_catalog(self, time_expose, verbose=False): self.time_expose_s = time_expose # save individual source shadowgram and the intersection coordinates pixels on the detector self.shadows_sources = [] # loop on source and call ray_tracing_for_one_src() r_ar_val = self.inst.detect.ar_val for src in self.cat_fov.get_catalog(): # compute rectangle mask projection in detector el_rad = np.deg2rad(src['elev']) dr_rad = np.deg2rad(src['dir']) self.inst.set_pos_source_fov(el_rad, dr_rad) # compute nb photon with # - time # - surface # - angle between source and detector # - intensity of source # - noise photon of source # # a_percent_surface = self.ointer.interpol(vec_trans[0], vec_trans[1]) a_percent_surface = self.get_proj_mask() a_surface_open = a_percent_surface * (np.sin(el_rad) * self.inst.detect.surf_cell) a_nb_photon_theo = src['intensity'] * a_surface_open * time_expose # add photon noise if self.b_photon_noise: a_nb_photon = np.reshape(np.random.poisson(a_nb_photon_theo.ravel()), r_ar_val.shape) else: a_nb_photon = a_nb_photon_theo # save individual source shadowgram self.shadows_sources.append(a_nb_photon) # cumulate shadowgram of current source to # detector array and their pattern on the detector r_ar_val += a_nb_photon if verbose is True: self.logger.info(f'process simulation {src["name"]} with {a_nb_photon} photons') self.inst.detect.ar_val = np.around(r_ar_val)
[docs] def get_proj_mask(self): intersect_source = [[self.inst.p_nb_pix - 1, self.inst.p_nb_pix - 1], [0, 0]] # loop on sub mask self.det_tmp.set_val(0.0) for mask in self.inst._l_mask: self.inst.set_mask(mask) # intersection sub-mask if self.inst.get_surface_rect_proj() > 0: self.mask_trans = ArraySquareCell() self.mask_trans.zeros_like(self.inst.mask) self.mask_trans.set_array(self.inst.mask.ar_val) # add translation source new_origin = self.mask_trans.to_origin_cm + self.inst.vec_trans_source self.mask_trans.set_origin(new_origin) if self._projection_mask_numba: mask_trans = self.mask_trans.ar_val det_tmp = self.det_tmp.ar_val rect_proj = self.inst.rect_proj max_p_mask = self.mask_trans.max_ix max_p_det = self.inst.detect.max_ix size_p_mask = self.mask_trans.size_cm size_p_det = self.inst.detect.size_cm trans_mask = self.mask_trans.to_origin_cm trans_det = self.inst.detect.to_origin_cm projection_mask_numba(mask_trans, det_tmp, rect_proj, max_p_mask, max_p_det, size_p_mask, size_p_det, trans_mask, trans_det) else: self.logger.info('intersection mask detector optimization') self.get_proj_sub_mask() # intersection on the detector pix of sub mask intersect_det_submask = self.inst.detect.pos2idx_rect(self.inst.rect_proj, True) intersect_source = [np.minimum(intersect_source[0], intersect_det_submask[0]), np.maximum(intersect_source[1], intersect_det_submask[1])] self.intersect_sources.append(intersect_source) # to percent self.det_tmp.ar_val /= self.det_tmp.surf_cell # self.plot_proj_mask("just compute") return self.det_tmp.ar_val
[docs] def get_proj_sub_mask(self): # define index pixel mask in intersection mask_idx = self.mask_trans.pos2idx_rect(self.inst.rect_proj, True) # loop on pixel mask for col_m in range(mask_idx[0, 1], mask_idx[1, 1] + 1): for row_m in range(mask_idx[0, 0], mask_idx[1, 0] + 1): if self.mask_trans.ar_val[row_m, col_m] == 0: continue # It's open mask pixel # we can define position of this open pixel rect_pos_pix_mask = self.mask_trans.pos_rect_cell([row_m, col_m]) # now define index pixel DPIX in intersection with this open pixel dpix_idx = self.det_tmp.pos2idx_rect(rect_pos_pix_mask, True) self.logger.debug(f'dpix: {dpix_idx}') for col_d in range(dpix_idx[0, 1], dpix_idx[1, 1] + 1): for row_d in range(dpix_idx[0, 0], dpix_idx[1, 0] + 1): rect_pos_dpix = self.det_tmp.pos_rect_cell([row_d, col_d]) surf_inter = surf_intersection_rect(rect_pos_pix_mask, rect_pos_dpix) self.det_tmp.ar_val[row_d, col_d] += surf_inter
[docs] def plot_proj_mask(self, p_title=""): # pragma: no cover plt.figure() plt.title(p_title) plt.imshow(yzegp_to_ijdet_array(self.det_tmp.ar_val)) plt.colorbar()
[docs]@njit def projection_mask_numba(mask_trans, det_tmp, rect_proj, max_p_mask, max_p_det, size_p_mask, size_p_det, trans_mask, trans_det): # pragma: no cover """ projection and intersection with numba library """ # declare variable dpix_idx = np.empty((2, 2), dtype=np.int16) rect_pos_pix_mask = np.empty((2, 2), dtype=np.float32) rect_pos_dpix = np.empty((2, 2), dtype=np.float32) # define index pixel mask in intersection mask_idx = np.floor((rect_proj + trans_mask) / size_p_mask).astype(np.int16) trange = [0, 1] for i1 in trange: for i2 in trange: if mask_idx[i1, i2] < 0: mask_idx[i1, i2] = 0 continue if mask_idx[i1, i2] > max_p_mask: mask_idx[i1, i2] = max_p_mask # loop on pixel mask for col_m in range(mask_idx[0, 1], mask_idx[1, 1] + 1): for row_m in range(mask_idx[0, 0], mask_idx[1, 0] + 1): if mask_trans[row_m, col_m] == 0: continue # It's open mask pixel # we can define position of this open pixel a_idx = np.array([[row_m, col_m]], dtype=np.float32) rect_pos_pix_mask[0, :] = a_idx * size_p_mask - trans_mask rect_pos_pix_mask[1, :] = rect_pos_pix_mask[0, :] + size_p_mask # now define index pixel DPIX in intersection with this open pixel dpix_idx = np.floor((rect_pos_pix_mask + trans_det) / size_p_det).astype(np.int16) val = dpix_idx[0, 0] if val < 0: dpix_idx[0, 0] = 0 elif val > max_p_det: dpix_idx[0, 0] = max_p_det val = dpix_idx[1, 0] if val < 0: dpix_idx[1, 0] = 0 elif val > max_p_det: dpix_idx[1, 0] = max_p_det val = dpix_idx[0, 1] if val < 0: dpix_idx[0, 1] = 0 elif val > max_p_det: dpix_idx[0, 1] = max_p_det val = dpix_idx[1, 1] if val < 0: dpix_idx[1, 1] = 0 elif val > max_p_det: dpix_idx[1, 1] = max_p_det for col_d in range(dpix_idx[0, 1], dpix_idx[1, 1] + 1): for row_d in range(dpix_idx[0, 0], dpix_idx[1, 0] + 1): # # replace rect_pos_dpix = det_tmp.pos_rect_cell([row_d, col_d]) a_idx = np.array([[row_d, col_d]], dtype=np.float32) rect_pos_dpix[0, :] = a_idx * size_p_det - trans_det rect_pos_dpix[1, :] = rect_pos_dpix[0, :] + size_p_det # add intersection surface inter_rec = intersection_rect(rect_pos_pix_mask, rect_pos_dpix) term = (inter_rec[1, 0] - inter_rec[0, 0]) * (inter_rec[1, 1] - inter_rec[0, 1]) det_tmp[row_d, col_d] += term