Source code for common.io.events

'''
Created on 7 févr. 2018

objects to manipulate events

@author: Colley Jean-Marc, APC/IN2P3/CNRS
'''


from copy import deepcopy
import astropy.table as at
import astropy.units as u
import numpy as np
import matplotlib.pyplot as plt
from astropy.io import fits

from common.io import fits_tools
import common.instru.array_convention as arr_conv

    
[docs]class XEvtData(object): """standard container for ECLAIRs' event or data file """ def __init__(self): """**constructor** : init with an empty, photons table """ self.nb_pix=80 #ATTENTION !! Ceci est temporaire et DOIT etre changer pour une forme plus robuste self.name_col = ('time', 'Y', 'Z', 'energy') self.meta_col = {'time': 'arrival time in s after mjdref keyword', 'Y' : "detector pixel 0-79", 'Z' : "detector pixel 0-79", 'energy' : 'photon energy in adu'} self.dtype_col = ('f8', 'u1', 'u1', 'i2') self.tb_evts = at.Table( names=self.name_col, meta=self.meta_col, dtype =self.dtype_col) self.tb_evts['time'].unit = u.s; self.tb_evts['energy'].unit = u.adu self.tb_evts['Y'].unit = u.pix self.tb_evts['Z'].unit = u.pix self.start_time = 0 @property def nb_row(self): return len(self.tb_evts)
[docs] def sort_with_time(self): """sort the event table according to the arrival time of each photon """ self.tb_evts.sort('time')
[docs] def write(self, filename, creator='unkowwn', t_start=0, t_stop=0): """save the event in a fits file :param filename: PATH/name of the file to save to :param start_time: events start time in s from MJDREF :param t_stop: events end time in s from MJDREF :param creator: program that generated the file :type filename: string :type start_time: float :type t_stop: float :type creator: string (default='unknown') """ if t_start==0: t_start = self.tb_evts['time'].min() if t_stop==0: t_stop = self.tb_evts['time'].max() evts = deepcopy(self.tb_evts) evts['Z'], evts['Y'] = arr_conv.yzegp_to_ijdet(evts['Y'], evts['Z']) evts['Y'].name = 'Jdet' evts['Z'].name = 'Idet' tbhdu = fits.table_to_hdu(evts) prihdr = fits.Header() prihdu = fits.PrimaryHDU(header=prihdr) fits_tools.common_keyword(tbhdu, t_start, t_stop, creator, 1.0, '') thdulist = fits.HDUList([prihdu, tbhdu]) thdulist[1].header['comment'] = "ECLAIRs events list" thdulist[1].name = 'events table' thdulist.writeto(filename, overwrite=True, checksum=True)
[docs] def read(self, filename, add_data=False): """read an event from a fits file and save it into the tb_evts attribute the events are sorted according to arrival time :param filename: PATH/name of the event file to read :param add_data: replace or add to precedent data. Default is False :type filename: string :type add_data: bool """ if add_data: evts_to_add = at.Table.read(filename, format='fits') evts_to_add['Jdet'], evts_to_add['Idet'] = arr_conv.ijdet_to_yzegp(evts_to_add['Idet'], evts_to_add['Jdet']) evts_to_add['Idet'].name = 'Z' evts_to_add['Jdet'].name = 'Y' self.tb_evts = at.vstack([self.tb_evts, evts_to_add], metadata_conflicts='silent') self.sort_with_time() else: self.tb_evts = at.Table.read(filename, format='fits') self.tb_evts['Jdet'], self.tb_evts['Idet'] = arr_conv.ijdet_to_yzegp(self.tb_evts['Idet'], self.tb_evts['Jdet']) self.tb_evts['Idet'].name = 'Z' self.tb_evts['Jdet'].name = 'Y' self.sort_with_time()
[docs] def set_evt(self, data_table): """set the data event table with the input array :param data_table: input data array to read from :type data_table: array """ self.tb_evts = at.Table(np.array(data_table), names=self.name_col, meta=self.meta_col, dtype=self.dtype_col)
[docs] def extract_image(self, t_1, t_2, photon_sum='energy', energy_band=[0,9999]): """extract one image from the data event table The image is build with photons between t_1 and t_2, and within 'energy_band'. :param t_1: start time to integrate the image :param t_2: end time for the integration :param photon_sum: column name to use to sum photons in the pixels image. Default='energy'. :param energy_band: energy band to sum the photon within. Default=[0,9999]. :type t_1: float in 'time' format :type t_2: float in "time" format :type photon_sum: string :type energy_band: [int, int] :return: the image integrated between t_1 and t_2, within 'energy_band' :rtype: float array """ one_image = np.zeros((self.nb_pix,self.nb_pix)) window_index = np.where(((t_1 <= self.tb_evts['time']) & (self.tb_evts['time'] < t_2)) & ((self.tb_evts[photon_sum]>=energy_band[0]) & (self.tb_evts[photon_sum]<=energy_band[1])))[0] try: for photon in np.nditer(self.tb_evts[window_index]): one_image[photon['Y'],photon['Z']] += 1 except: pass return one_image
[docs] def construct_images(self,t_window,t_step,method='sliding_window'): """construct a suite of images from the data event table images integrated during time = t_window with time step = t_step - if t_window == t_step: images are successives - if t_window == t_max - t_min: return one stacked array 2 construct methods : sliding_window or cumulative - sliding_window : images are independently computed - cumulative : images are added with the precedent one .. note:: may be some data not included from the end of the event file :param t_window: integration window of the images, in 'time' format. Must be < t_max - t_min. :type t_window: float :param t_step: step between two beginning time of windows in 'time' format :type t_step: float :param method: construction method: 'sliding_window' or 'cumulative'. Default='sliding_window' :type method: string :return: constructed images :rtype: float np.array """ t_min = self.tb_evts['time'].min() t_max = self.tb_evts['time'].max() if (t_window == t_max - t_min) : return self.extract_image(t_min,t_max+1) elif ((0 < t_window < t_max - t_min) and (method == 'sliding_window')) : arr_images=np.empty((0,self.nb_pix,self.nb_pix)) t_1 = t_min t_2 = t_window + t_min while (t_2 <= t_max) : one_image = self.extract_image(t_1,t_2) arr_images = np.append(arr_images,[one_image],axis=0) t_1 += t_step t_2 += t_step return arr_images elif ((0 < t_window < t_max - t_min) and (method == 'cumulative')): arr_images=np.empty((0,self.nb_pix,self.nb_pix)) t_1 = t_min t_2 = t_window + t_min one_image = self.extract_image(t_1,t_2) arr_images = np.append(arr_images,[one_image],axis=0) t_2 += t_step while (t_2 <= t_max) : one_image = self.extract_image(t_1,t_2) arr_images = np.append(arr_images,[one_image],axis=0) t_2 += t_step return arr_images else: raise ValueError("The time window {} must have a shorter duration than the duration of the event in question {}".format(t_window,t_max))
[docs] def save_images(self,t_window,t_step,images_path,method='sliding_window'): """construct and save a suite of images from the data event table 2 construct methods : sliding_window or cumulative - sliding_window : images are independently computed - cumulative : integration time of images increase of t_step each step (t_window == size of the initial time window) images integrated during time = t_window with time step = t_step - if t_window == t_step: images are successives - if t_window == t_max: return one stacked array :param t_window: integration window of the images, in 'time' format. Must be < t_max - t_min. :type t_window: float :param t_step: step between two beginning time of windows in 'time' format :type t_step: float :param images_path: path to the directory to save images into :type images_path: string (PATH) :param method: construction method: 'sliding_window' or 'cumulative'. Default='sliding_window' :type method: string """ arr_images = self.construct_images(t_window,t_step,method) time_unit = 's' if arr_images.ndim == 2 : plt.figure() #plt.title('shadowgram_%02d' % (i) ) plt.title('shadowgram_00') plt.imshow(arr_conv.yzegp_to_ijdet_array(arr_images)) cbar = plt.colorbar() cbar.set_label('photons number') #ATTENTION au format d'affichage des temps plt.text(-10, 68, ' t_window={:.0f}{}\n'.format(t_window, time_unit), bbox={'facecolor': 'white', 'pad': 5}) plt.savefig(images_path+'/img00.png') plt.close() else: z_min = arr_images.min() z_max = arr_images.max() if method == 'cumulative': for i in range(len(arr_images)): plt.figure() #plt.title('shadowgram_%02d' % (i) ) plt.title('shadowgram_{:02d}'.format(i) ) plt.imshow(arr_conv.yzegp_to_ijdet_array(arr_images[i]), vmin=z_min, vmax=z_max) cbar = plt.colorbar() cbar.set_label('photons number') #ATTENTION au format d'affichage des temps plt.text(-10, 68, ' t_window={:.0f}{}'.format(t_window+t_step*i, time_unit), bbox={'facecolor': 'white', 'pad': 5}) plt.savefig(images_path+'/img{:02d}.png'.format(i)) plt.close() else: for i in range(len(arr_images)): plt.figure() #plt.title('shadowgram_%02d' % (i) ) plt.title('shadowgram_{:02d}'.format(i) ) plt.imshow(arr_conv.yzegp_to_ijdet_array(arr_images[i]), vmin=z_min, vmax=z_max) cbar = plt.colorbar() cbar.set_label('photons number') #ATTENTION au format d'affichage des temps plt.text(-10, 68, ' t_window={0:.0f}{2}\n time={1:.0f}{2}'.format(t_window,t_step*i, time_unit), bbox={'facecolor': 'white', 'pad': 5}) plt.savefig(images_path+'/img{:02d}.png'.format(i)) plt.close()
[docs]class EclairsCalEvtData(XEvtData): """standard container for ECLAIRs' calibrated event data """ def __init__(self, nb_line=None): """**constructor** : init with an empty counts table """ self.nb_pix=80 #ATTENTION !! Ceci est temporaire et DOIT etre changer pour une forme plus robuste self.name_col = ('time', 'Y', 'Z', 'PHA', 'PI', 'FLAG') self.meta_col = {'time': 'Universal Time in s from MJDREF', 'Y' : "detector pixel 0-79", 'Z' : "detector pixel 0-79", 'PHA' : 'Pules Height Amplitude energy channel (0-1023)', 'PI' : 'Pulse height Invariant energy channel (0-1023)', 'FLAG' : 'Flag pixel status'} # warning! i1 is interpreted as a bool by cfits self.dtype_col = ('f8', 'u1', 'u1', 'u2', 'u2', 'b') dt_numpy = ('f8, u1, u1, u2, u2, b') if nb_line: empty_array = np.zeros(nb_line, dtype= dt_numpy) self.tb_evts = at.Table(data=empty_array, names=self.name_col, meta=self.meta_col, dtype =self.dtype_col) else: self.tb_evts = at.Table( names=self.name_col, meta=self.meta_col, dtype =self.dtype_col) self.tb_evts['time'].unit = u.s; self.tb_evts['PHA'].unit = u.adu self.tb_evts['PI'].unit = u.adu self.tb_evts['Y'].unit = u.pix self.tb_evts['Z'].unit = u.pix
[docs] def add_evt(self, eced): assert isinstance(eced, EclairsCalEvtData) self.tb_evts = at.vstack([self.tb_evts, eced.tb_evts])
[docs] def set_evt_from_std_array(self, data_table): """set the data event table with the input array (time, y, z, pha) :param data_table: input data table [time, y, z, pha] :type data_table: array """ self.tb_evts = at.Table(np.array(data_table), names=self.name_col[0:4], meta=self.meta_col, dtype=self.dtype_col[0:4]) self.tb_evts['time'].unit = u.s; self.tb_evts['PHA'].unit = u.adu self.tb_evts['Y'].unit = u.pix self.tb_evts['Z'].unit = u.pix flag_column = at.Column([0], name='FLAG', meta={'FLAG' : 'Flag pixel status'}, dtype='i4') pulse_invariant_column = self.tb_evts.columns['PHA'].copy() pulse_invariant_column.name = 'PI' pulse_invariant_column.meta = {'PI' : 'Pulse height Invariant energy channel (0-1023)'} self.tb_evts['FLAG'] = flag_column self.tb_evts['PI'] = pulse_invariant_column
[docs] def extract_image(self,t_1,t_2, energy_range=[0, 1023]): """extract one image from the data event table extract image by summing the PHA of photons in the pixels of the image :param t_1: start time to integrate the image :param t_2: end time for the integration :param energy_range: energy band to sum the photon within. Default=[0,1023]. :type t_1: float in 'time' format :type t_2: float in "time" format :type energy_range: [int, int] :return: the image integrated between t_1 and t_2, and within the energy range :rtype: float array """ return super().extract_image(t_1, t_2, 'PI', energy_range)
[docs] def extract_images(self, t_1, t_2, energy_ranges): """extract images according to the time and energy ranges in input :param t_1: start time to integrate the image :type t_1: float in 'time' format :param t_2: end time for the integration :type t_2: float in "time" format :param energy_ranges: different energy ranges (in PI) to extract the images [[low_energy, high_energy],[]] :type energy_ranges: [[int, int]] :return: extracted images between t_1 and t_2, and within the energy ranges :rtype: list(array) """ images = [] for energy_range in energy_ranges: images.append(self.extract_image(t_1, t_2, energy_range)) return images
def _save_from_IJdet(self, evts, add_data=False): """save it into the tb_evts attribute :param evts: events in (Y,Z) :param add_data: replace or add to precedent data. Default is False :type filename: astropy.table.Table :type add_data: bool """ if (not add_data): self.tb_evts = evts self.tb_evts['Jdet'], self.tb_evts['Idet'] = arr_conv.ijdet_to_yzegp(self.tb_evts['Idet'], self.tb_evts['Jdet']) self.tb_evts['Idet'].name = 'Z' self.tb_evts['Jdet'].name = 'Y' else: evts['Jdet'], evts['Idet'] = arr_conv.ijdet_to_yzegp(evts['Idet'], evts['Jdet']) evts['Idet'].name = 'Z' evts['Jdet'].name = 'Y' self.tb_evts = at.vstack([self.tb_evts, evts], metadata_conflicts='silent') self.sort_with_time()
[docs] def read(self, filename, add_data=False): """read an event from a fits file (in IJdet convention) and save it into the tb_evts attribute :param filename: PATH/name of the event file to read :param add_data: replace or add to precedent data. Default is False :type filename: string :type add_data: bool """ tb_evts = at.Table.read(filename, format='fits') self._save_from_IJdet(tb_evts, add_data)
[docs] def write(self, filename, creator, t_start=0, t_stop=0, proc_id="01"): """save the event in a ECLAIRs calibrated event fits file Common keywords are added to the header. :param filename: PATH/name of the file to save to :param start_time: events start time in s from MJDREF :param t_stop: events end time in s from MJDREF :param creator: program that generated the file :type filename: string :type start_time: float :type t_stop: float :type creator: string (default='unknown') """ #if start_time.mjd != 0: # time set to MJD # time en ms -> en jour #self.tb_evts['time'] = self.tb_evts['time']/1000/3600/24 + start_time.mjd # construction of the fits file if self.nb_row == 0: print("WARNING: no event !!") else: if t_start==0: t_start = self.tb_evts['time'].min() if t_stop==0: t_stop = self.tb_evts['time'].max() evts = deepcopy(self.tb_evts) evts['Z'], evts['Y'] = arr_conv.yzegp_to_ijdet(evts['Y'], evts['Z']) evts['Y'].name = 'Jdet' evts['Z'].name = 'Idet' del evts.meta['Y'] del evts.meta['Z'] evts.meta['Jdet'] = 'column number on J detector axis' evts.meta['Idet'] = 'column number on I detector axis' tbhdu = fits.table_to_hdu(evts) prihdr = fits.Header() prihdu = fits.PrimaryHDU(header=prihdr) fits_tools.common_keyword(tbhdu, t_start, t_stop,creator, 1.0, proc_id) thdulist = fits.HDUList([prihdu, tbhdu]) thdulist[1].header['comment'] = "ECLAIRs calibrated events list" thdulist[1].name = 'EVTCAL' thdulist.writeto(filename, overwrite=True, checksum=True)
[docs] def write_L1(self, filename, creator, t_start=0, t_stop=0, proc_id="01"): """save the event in a ECLAIRs single event energy coded fits file Common keywords are added to the header. :param filename: PATH/name of the file to save to :param start_time: events start time in s from MJDREF :param t_stop: events end time in s from MJDREF :param creator: program that generated the file :type filename: string :type start_time: float :type t_stop: float :type creator: string (default='unknown') """ if self.nb_row == 0: print("WARNING: no event !!") else: if t_start==0: t_start = self.tb_evts['time'].min() if t_stop==0: t_stop = self.tb_evts['time'].max() evts = deepcopy(self.tb_evts) evts['Z'], evts['Y'] = arr_conv.yzegp_to_ijdet(evts['Y'], evts['Z']) evts['Y'].name = 'Jdet' evts['Z'].name = 'Idet' del evts.meta['Y'] del evts.meta['Z'] del evts.meta['PI'] del evts.meta['FLAG'] evts.meta['Jdet'] = 'column number on J detector axis' evts.meta['Idet'] = 'column number on I detector axis' evts.meta['time'] = 'time of the event (from PPS and clock)' evts.remove_columns(['PI', 'FLAG']) ebd = at.Column(np.zeros(self.nb_row), name='EBD', meta={'EBD' : 'energy band (0 to 3, -1 for SEU, 4 for SEO'}, dtype='u1') evts.add_column(ebd) tbhdu = fits.table_to_hdu(evts) prihdr = fits.Header() prihdu = fits.PrimaryHDU(header=prihdr) fits_tools.common_keyword(tbhdu, t_start, t_stop,creator, 2.0, proc_id) thdulist = fits.HDUList([prihdu, tbhdu]) thdulist[1].header['comment'] = "ECLAIRs single events list, with energy coded" thdulist[1].name = 'ECL-EVT-SEC' thdulist.writeto(filename, overwrite=True, checksum=True)