Source code for simu.lib.InstruX

'''Instrument simulation module

Created on 29 nov. 2017

@author: colley JM
'''

import numpy as np
from astropy.io import fits
import astropy.time as astrotime
import copy
import matplotlib.pyplot as plt
import astropy.table as astt

from common.instru.model_geom import InstruXbase
import common.instru.model_geom as iX
from common.sky.catalog import CatalogFovBasic
import common.io.events as xde
import common.num.interpol_tools as imask
from common.num.array_square_cell import ArraySquareCell 
from common import add_path_data_ref_eclairs
from common.mission.time import convert_mjd_in_svomref_seconds
import common.sky.background as bkg
from common.instru.array_convention import yzegp_to_ijdet_array



[docs]class SimuInstruXBase(object): def __init__(self): self.b_photon_noise = True self.time_expose_s = 0 # in second
[docs] def get_det_pixel_surface(self): return self.det_pixel_surface
[docs] def get_shadowgram(self): return self.inst.detect.ar_val
[docs] def plot_shadowgram(self): self.inst.detect.plot("shadowgram t")
[docs] def raz_nb_photon(self): self.inst.detect.ar_val[:,:] = 0
[docs] def set_catalog(self, catalog_x : CatalogFovBasic): self.cat_fov = catalog_x
[docs] def simu_catalog(self, time_expose, verbose=None): pass
[docs] def get_cxb(self, cxb_eclairs_level=1, cxb_type='flat_nospectr', internal_noise=False, noise_level=0.003): """ return a numpy array of detector with CXB count by pixel. method : :param cxb_eclairs_level: intensity of CXB :type cxb_eclairs_level: float [count/cm2/s] :param cxb_type: type of CXB = 'flat_nospectr' or 'shapebased_nospectr' (default='flat_nospectr') :type cxb_type: string :param internal_noise: flag to add internal noise :type internal_noise: bool :param noise_level: internal noise level in cts/sec/cm2/keV (default is 0.003) :type noise_level: float :return: cxb detector image counts :rtype: array(float) """ if cxb_type == 'flat_nospectr': cxb_rate = bkg.flat_cxb_nospectr(cxb_eclairs_level, self.get_det_pixel_surface(), self.inst.detect.ar_val.shape) elif cxb_type == 'shapebased_nospectr': cxb_rate = bkg.shapebased_cxb_nospectr(cxb_eclairs_level) else: print('WARNING: cxb type not reconized, default to flat_nospectr') cxb_rate = bkg.flat_cxb_nospectr(cxb_eclairs_level, self.get_det_pixel_surface(), self.inst.detect.ar_val.shape) if internal_noise: cxb_rate += bkg.flat_internal_noise(noise_level, self.get_det_pixel_surface(), self.inst.detect.ar_val.shape) nb_photon_theo = self.time_expose_s * cxb_rate cpt_cxb = np.random.poisson(nb_photon_theo) return cpt_cxb
[docs] def add_cxb(self, cxb_eclairs_level=1): """ :param cxb_eclairs_level: intensity of CXB :type cxb_eclairs_level: float [count/cm2/s] """ self.inst.detect.ar_val += self.get_cxb(cxb_eclairs_level)
def _add_time_evts(self): """ Count nb events, random uniform time in [s] for each and sort """ nb_photon = int(self.inst.get_total_photon()) self.ar_evts = np.empty((nb_photon,), dtype=[('a', 'uint32'), ('b', 'B'),('c', 'B'), ('d', 'uint16')]) max_tps = int(self.time_expose_s) # TODO text max int32 before self.ar_evts['a'] = np.random.randint(0, max_tps, nb_photon, dtype='uint32') self.ar_evts['d'] = 1 index = np.arange(0, self.inst.detect.ar_val.size) evts = np.repeat(index, self.inst.detect.ar_val.round().astype(np.int).ravel()) (self.ar_evts['b'], self.ar_evts['c']) = divmod(evts, self.inst.detect.get_nb_cell_y())
[docs] def add_all_evt(self, apos): """ Add one on pixel associated to position :param apos: array plan position of photon :type apos: numpy array (n,2) """ self.inst.detect.add_hit(apos)
[docs] def get_snr_sources(self, shadowgram_tot=None): """compute the theoretical max snr of each source the snr are listed in the order of the sources catalog :param shadowgram_tot: final shadowgram (default=from the simu object) :type shadowgram_tot: array(float) :return: list of the snr of each source :rtype: list(float) """ if not shadowgram_tot: shadowgram_tot = self.inst.detect.ar_val snr = [] for src_idx in range(self.cat_fov.get_nb_element()): src_cts = self.shadows_sources[src_idx].sum() idx_l = int(self.intersect_sources[src_idx][0][0]) idx_b = int(self.intersect_sources[src_idx][0][1]) idx_r = int(self.intersect_sources[src_idx][1][0]) idx_u = int(self.intersect_sources[src_idx][1][1]) noise_cts = np.sqrt(shadowgram_tot[idx_l:idx_r+1, idx_b:idx_u+1].sum()) snr_src = src_cts / noise_cts snr.append(snr_src) return snr
[docs] def save_evts(self, filename, t_start=astrotime.Time('2018-06-16T14:37:07', format='isot', scale='tt')): self._add_time_evts() oevt = xde.XEvtData() oevt.set_evt(self.ar_evts) oevt.sort_with_time() oevt.tb_evts['time'] = oevt.tb_evts['time'] + convert_mjd_in_svomref_seconds(t_start.mjd) oevt.write(filename, self.__class__.__name__) del self.ar_evts
[docs] def save_cal_evts(self, filename, t_start=0, l1=False): """ t_start in s from mjdref """ self._add_time_evts() oevt = xde.EclairsCalEvtData() oevt.set_evt_from_std_array(self.ar_evts) oevt.sort_with_time() oevt.tb_evts['time'] = oevt.tb_evts['time'] + t_start if l1: oevt.write_L1(filename, self.__class__.__name__) else: oevt.write(filename, self.__class__.__name__) del self.ar_evts
# output file
[docs] def save_stack_evt_image(self, name_fits): hdu = fits.PrimaryHDU(yzegp_to_ijdet_array(self.inst.detect.ar_val)) hdu.writeto(name_fits, overwrite=True)
[docs] def save_mask(self, name_fits): hdu = fits.PrimaryHDU(yzegp_to_ijdet_array(self.inst.mask.ar_val)) hdu.writeto(name_fits, overwrite=True)
[docs] def save_catal_log(self, filename): """save a fits log file of the simulation of each source """ catal_log = copy.copy(self.cat_fov) catal_log._catalog = astt.join(catal_log.get_catalog_sky_pix(), catal_log.get_catalog()) catal_log._catalog['simulated_cts'] = np.sum(self.shadows_sources, axis=(1,2)) catal_log._catalog['snr'] = self.get_snr_sources() catal_log._catalog.add_row() catal_log._catalog[-1]['sky_y'] = None catal_log._catalog[-1]['sky_z'] = None catal_log._catalog[-1]['elev'] = None catal_log._catalog[-1]['dir'] = None catal_log._catalog[-1]['name'] = 'cxb' catal_log._catalog[-1]['intensity'] = 1 catal_log._catalog[-1]['simulated_cts'] = self.get_cxb(1).sum() catal_log._catalog[-1]['snr'] = None catal_log.save_catalog(filename)
[docs]class SimuInstruXMaskInterpolation(SimuInstruXBase): ''' prerequis : mask re-sampling in grid detector ''' def __init__(self, instru_x : InstruXbase): self.inst = instru_x self.time_expose_s = 0 # in second super().__init__()
[docs] def set_resampling_mask(self, a_mask_detec, verbose=False): nx = self.inst.detect.get_nb_cell_x() ny = self.inst.detect.get_nb_cell_y() if verbose == True: print (a_mask_detec) self.ointer = imask.InterpolLin2DCenter(a_mask_detec, nx, ny)
[docs] def simu_catalog(self, time_expose, verbose=False): self.time_expose_s = time_expose # loop on source and call ray_tracing_for_one_src() r_ar_val = self.inst.detect.ar_val for src in self.cat_fov.get_catalog(): # compute rectangle mask projection in dectetor el_rad = np.deg2rad(src['elev']) dr_rad = np.deg2rad(src['dir']) self.inst.set_pos_source_fov(el_rad, dr_rad) # compute nb photon with # - time # - surface # - angle between source and detector # - intensity of source # - noise photon of source # # convert vec_trans_source in pixel unit detector vec_trans = self.inst.vec_trans_source/self.inst.detect.size_cm if verbose == True: print(vec_trans[0], vec_trans[1]) a_percent_surface = self.ointer.interpol(vec_trans[0], vec_trans[1]) plt.figure() plt.title('percent_surface') plt.imshow(yzegp_to_ijdet_array(a_percent_surface)) plt.colorbar() a_surface_open = a_percent_surface*(np.sin(el_rad)*self.inst.detect.surf_cell) a_nb_photon_theo = src['intensity']*a_surface_open*time_expose # add photon noise if self.b_photon_noise: a_nb_photon = np.reshape(np.random.poisson(a_nb_photon_theo.ravel()) , r_ar_val.shape) else: a_nb_photon = a_nb_photon_theo # cumulate shadowgram of current source to detecteor array r_ar_val += a_nb_photon if verbose == True: print('process simulation %s, with %d photons' %(src['name'], a_nb_photon.sum())) self.inst.detect.ar_val = np.around(r_ar_val)
[docs]class SimuInstruXRayTracing(SimuInstruXBase): ''' simulation instru X with * catalog FOV => stable pointing * evt in pixel => no energy * CXB model uniforme noise on detector ''' def __init__(self, instru_x : InstruXbase, catalog_x : CatalogFovBasic = None): ''' ''' self.inst = instru_x if catalog_x : self.cat_fov = catalog_x super().__init__() self.det_pixel_surface = self.inst.detect.size_cm**2
[docs] def simu_catalog(self, time_expose, verbose=None): self.time_expose_s = time_expose # loop on source and call ray_tracing_for_one_src() src_idx = 0 # init individual source shadowgram and the intersection coordinates pixels on the detector only for a raytrac object if not isinstance(self, SimuECLAIRsRayTracing): self.shadows_sources = np.zeros((len(self.cat_fov.get_catalog()), self.inst.p_nb_pix, self.inst.p_nb_pix)) self.intersect_sources = np.zeros((len(self.cat_fov.get_catalog()), 2, 2)) + [[self.inst.p_nb_pix-1, self.inst.p_nb_pix-1], [0,0]] for src in self.cat_fov.get_catalog(): # compute rectangle mask projection in detector el_rad = np.deg2rad(src['elev']) dr_rad = np.deg2rad(src['dir']) self.inst.set_pos_source_fov(el_rad, dr_rad) surface = self.inst.get_surface_rect_proj() if verbose == True: print('surface %f'%surface) if np.isclose(0, surface, 1e-7) : # src out of detector continue # compute nb photon with # - time # - surface # - angle between source and detector # - intensity of source # - noise photon of source surface_open = surface*np.sin(el_rad) nb_photon_theo = src['intensity']*surface_open*time_expose # add photon noise if self.b_photon_noise: nb_photon = np.random.poisson(nb_photon_theo) else: nb_photon = nb_photon_theo apos = self.inst.detect.random_pos_rect(int(round(nb_photon)), self.inst.rect_proj) self.ray_tracing_for_one_src(apos, src_idx=src_idx) self.intersect_sources[src_idx] = [np.minimum(self.intersect_sources[src_idx, 0], self.intersect_det_subsrc[0]), np.maximum(self.intersect_sources[src_idx, 1], self.intersect_det_subsrc[1])] self.shadows_sources[src_idx] += self.shadow_src src_idx += 1 if verbose == True: print('process simulation %s, with %d photons' %(src['name'], nb_photon))
#self.inst.detect.plot()
[docs] def ray_tracing_for_one_src(self, apos, src_idx=0): """ result: select by ray tracing photon can be arrived on detector by hole mask """ apos_mask = apos + self.inst.vec_trans_source idx_det = self.inst.detect.pos2idx_inf(apos) idx_mask = self.inst.mask.pos2idx_inf(apos_mask) det = self.inst.detect mask = self.inst.mask # apply mask on photon in field of view if idx_mask.size : aval_mask = mask.value_array(idx_mask) #print (aval_mask.shape, aval_mask.sum(), aval_mask.sum()/aval_mask.shape[0], self.detect.ar_val.sum()) # ceci # self.detect.ar_val[idx_det[:,0], idx_det[:,1]] += aval_mask # ne donne pas le bo resultat, comme si on avait = a la plce de += ... ? it = np.nditer([idx_det[:,0], idx_det[:,1], aval_mask], [], [['readonly'], ['readonly'],['readonly']]) self.shadow_src = np.zeros(det.ar_val.shape) for il, ic, val in it: det.ar_val[il,ic] += val # save individual source shadowgram for this source and this mask self.shadow_src[il, ic] += val # intersection on the detector pix for this source and this mask self.intersect_det_subsrc = [[idx_det[:,0].min(), idx_det[:,1].min()], [idx_det[:,0].max(), idx_det[:,1].max()]]
#print(self.detect.ar_val.sum(), self.detect.ar_val.max())
[docs]class SimuECLAIRsRayTracing(SimuInstruXRayTracing): def __init__(self, catalog_x : CatalogFovBasic = None, show=None, p_nb_pixel=None): eclairs = iX.InstruECLAIRs(show=show, p_nb_pix=p_nb_pixel) self.inst = eclairs if catalog_x : self.cat_fov = catalog_x super().__init__(eclairs)
[docs] def simu_catalog(self, time_expose, verbose=None, show=None): self.time_expose_s = time_expose # init individual source shadowgram and the intersection coordinates pixels on the detector self.shadows_sources = np.zeros((len(self.cat_fov.get_catalog()), self.inst.p_nb_pix, self.inst.p_nb_pix)) self.intersect_sources = np.zeros((len(self.cat_fov.get_catalog()), 2, 2)) + [[self.inst.p_nb_pix-1, self.inst.p_nb_pix-1], [0,0]] for mask in self.inst.l_mask: if verbose == True: print("mask ",mask.name) self.inst.set_mask(mask) super().simu_catalog(time_expose, verbose=verbose) if show == True: self.inst.detect.plot()
[docs]class SimuECLAIRsInterpol(SimuInstruXMaskInterpolation): def __init__(self, show=None, p_nb_pixel=None): # approximative method, must have a correction eclairs = iX.InstruECLAIRs(show=show, p_nb_pix=p_nb_pixel) self.inst = eclairs super().__init__(eclairs) f_mask_detec = add_path_data_ref_eclairs('Rebinned_Final_mask_AGr.fits') hdul = fits.open(f_mask_detec) a_mask_detec = hdul[0].data.copy() hdul.close() self.set_resampling_mask(a_mask_detec) # need correction before use it raise
[docs]class SimuECLAIRsMaskProjection(SimuInstruXBase): def __init__(self, catalog_x : CatalogFovBasic = None, p_nb_pixel=None, show=None): """ """ self.inst = iX.InstruECLAIRs(p_nb_pix=p_nb_pixel, show=show) self.set_catalog(catalog_x) super().__init__() self.det_tmp = ArraySquareCell() # for pydev auto-completion reference self.det_tmp = copy.deepcopy(self.inst.detect) self.det_pixel_surface = self.inst.detect.size_cm**2 self.intersect_sources = []
[docs] def get_det_pixel_surface(self): return self.det_pixel_surface
[docs] def get_shadow_percent(self, elev, direc): self.el_rad = np.deg2rad(elev) dr_rad = np.deg2rad(direc) self.inst.set_pos_source_fov(self.el_rad, dr_rad) return self.get_proj_mask()
[docs] def get_shadow_surface(self, elev, direc): a_surface_open = self.get_shadow_percent(elev, direc)*self.inst.detect.surf_cell return a_surface_open
[docs] def get_shadow_apparent_surface(self, elev, direc): """ return apparent surface """ a_surface_apparent = self.get_shadow_surface(elev, direc)*np.sin(self.el_rad) return a_surface_apparent
[docs] def simu_catalog(self, time_expose, verbose=False): self.time_expose_s = time_expose # save individual source shadowgram and the intersection coordinates pixels on the detector self.shadows_sources = [] # loop on source and call ray_tracing_for_one_src() r_ar_val = self.inst.detect.ar_val for src in self.cat_fov.get_catalog(): # compute rectangle mask projection in detector el_rad = np.deg2rad(src['elev']) dr_rad = np.deg2rad(src['dir']) self.inst.set_pos_source_fov(el_rad, dr_rad) # compute nb photon with # - time # - surface # - angle between source and detector # - intensity of source # - noise photon of source # #a_percent_surface = self.ointer.interpol(vec_trans[0], vec_trans[1]) a_percent_surface = self.get_proj_mask() a_surface_open = a_percent_surface*(np.sin(el_rad)*self.inst.detect.surf_cell) a_nb_photon_theo = src['intensity']*a_surface_open*time_expose # add photon noise if self.b_photon_noise: a_nb_photon = np.reshape(np.random.poisson(a_nb_photon_theo.ravel()) , r_ar_val.shape) else: a_nb_photon = a_nb_photon_theo # save individual source shadowgram self.shadows_sources.append(a_nb_photon) # cumulate shadowgram of current source to detector array and their pattern on the detector r_ar_val += a_nb_photon if verbose == True: print('process simulation %s, with %d photons' %(src['name'], a_nb_photon.sum())) self.inst.detect.ar_val = np.around(r_ar_val)
[docs] def get_proj_mask(self): intersect_source = [[self.inst.p_nb_pix-1, self.inst.p_nb_pix-1], [0, 0]] # loop on sub mask self.det_tmp.set_val(0.0) for mask in self.inst.l_mask: #print("========================================") self.inst.set_mask(mask) # intersection sub-mask if self.inst.get_surface_rect_proj() > 0: self.mask_trans = ArraySquareCell() self.mask_trans = copy.copy(self.inst.mask) # add translation source new_origin = self.mask_trans.to_origin_cm + self.inst.vec_trans_source self.mask_trans.set_origin(new_origin) self.get_proj_sub_mask() # intersection on the detector pix of sub mask intersect_det_submask = self.inst.detect.pos2idx_rect(self.inst.rect_proj, True) intersect_source = [np.minimum(intersect_source[0], intersect_det_submask[0]), np.maximum(intersect_source[1], intersect_det_submask[1])] self.intersect_sources.append(intersect_source) # to percent self.det_tmp.ar_val /= self.det_tmp.surf_cell #self.plot_proj_mask("just compute") return self.det_tmp.ar_val
[docs] def get_proj_sub_mask(self): # define index pixel mask in intersection #print(self.inst.rect_proj) mask_idx = self.mask_trans.pos2idx_rect(self.inst.rect_proj, True) # loop on pixel mask for col_m in range(mask_idx[0,1], mask_idx[1,1]+1): for row_m in range(mask_idx[0,0], mask_idx[1,0]+1): if self.mask_trans.ar_val[row_m, col_m] == 0 : continue # It's open mask pixel # we can define position of this open pixel rect_pos_pix_mask = self.mask_trans.pos_rect_cell([row_m, col_m]) # print("========================================") # print('pixel mask') # print(row_m, col_m) # print(rect_pos_pix_mask) # now define index pixel DPIX in intersection with this open pixel dpix_idx = self.det_tmp.pos2idx_rect(rect_pos_pix_mask, True) #print("DPIX: \n", dpix_idx) for col_d in range(dpix_idx[0,1], dpix_idx[1,1]+1): for row_d in range(dpix_idx[0,0], dpix_idx[1,0]+1): rect_pos_dpix = self.det_tmp.pos_rect_cell([row_d, col_d]) # print("========================") # print('pixel dpix') # print(row_d, col_d) #print(rect_pos_dpix) surf_inter = self.det_tmp.surf_intersection_rect(rect_pos_pix_mask, rect_pos_dpix) self.det_tmp.ar_val[row_d, col_d] += surf_inter
#break #break
[docs] def plot_proj_mask(self,p_title=""): plt.figure() plt.title(p_title) plt.imshow(yzegp_to_ijdet_array(self.det_tmp.ar_val)) plt.colorbar()