'''
Created on 7 févr. 2018
objects to manipulate events
@author: Colley Jean-Marc, APC/IN2P3/CNRS
'''
from copy import deepcopy
import astropy.table as at
import astropy.units as u
import numpy as np
import matplotlib.pyplot as plt
from astropy.io import fits
from common.io import fits_tools
import common.instru.array_convention as arr_conv
[docs]class XEvtData(object):
"""standard container for ECLAIRs' event or data file
"""
def __init__(self):
"""**constructor** : init with an empty, photons table
"""
self.nb_pix=80 #ATTENTION !! Ceci est temporaire et DOIT etre changer pour une forme plus robuste
self.name_col = ('time', 'Y', 'Z', 'energy')
self.meta_col = {'time': 'arrival time in s after mjdref keyword',
'Y' : "detector pixel 0-79",
'Z' : "detector pixel 0-79",
'energy' : 'photon energy in adu'}
self.dtype_col = ('f8', 'u1', 'u1', 'i2')
self.tb_evts = at.Table( names=self.name_col,
meta=self.meta_col,
dtype =self.dtype_col)
self.tb_evts['time'].unit = u.s;
self.tb_evts['energy'].unit = u.adu
self.tb_evts['Y'].unit = u.pix
self.tb_evts['Z'].unit = u.pix
self.start_time = 0
@property
def nb_row(self):
return len(self.tb_evts)
[docs] def sort_with_time(self):
"""sort the event table according to the arrival time of each photon
"""
self.tb_evts.sort('time')
[docs] def write(self, filename, creator='unkowwn', t_start=0, t_stop=0):
"""save the event in a fits file
:param filename: PATH/name of the file to save to
:param start_time: events start time in s from MJDREF
:param t_stop: events end time in s from MJDREF
:param creator: program that generated the file
:type filename: string
:type start_time: float
:type t_stop: float
:type creator: string (default='unknown')
"""
if t_start==0:
t_start = self.tb_evts['time'].min()
if t_stop==0:
t_stop = self.tb_evts['time'].max()
evts = deepcopy(self.tb_evts)
evts['Z'], evts['Y'] = arr_conv.yzegp_to_ijdet(evts['Y'], evts['Z'])
evts['Y'].name = 'Jdet'
evts['Z'].name = 'Idet'
tbhdu = fits.table_to_hdu(evts)
prihdr = fits.Header()
prihdu = fits.PrimaryHDU(header=prihdr)
fits_tools.common_keyword(tbhdu, t_start, t_stop, creator, 1.0, '')
thdulist = fits.HDUList([prihdu, tbhdu])
thdulist[1].header['comment'] = "ECLAIRs events list"
thdulist[1].name = 'events table'
thdulist.writeto(filename, overwrite=True, checksum=True)
[docs] def read(self, filename, add_data=False):
"""read an event from a fits file and save it into the tb_evts attribute
the events are sorted according to arrival time
:param filename: PATH/name of the event file to read
:param add_data: replace or add to precedent data. Default is False
:type filename: string
:type add_data: bool
"""
if add_data:
evts_to_add = at.Table.read(filename, format='fits')
evts_to_add['Jdet'], evts_to_add['Idet'] = arr_conv.ijdet_to_yzegp(evts_to_add['Idet'], evts_to_add['Jdet'])
evts_to_add['Idet'].name = 'Z'
evts_to_add['Jdet'].name = 'Y'
self.tb_evts = at.vstack([self.tb_evts, evts_to_add], metadata_conflicts='silent')
self.sort_with_time()
else:
self.tb_evts = at.Table.read(filename, format='fits')
self.tb_evts['Jdet'], self.tb_evts['Idet'] = arr_conv.ijdet_to_yzegp(self.tb_evts['Idet'], self.tb_evts['Jdet'])
self.tb_evts['Idet'].name = 'Z'
self.tb_evts['Jdet'].name = 'Y'
self.sort_with_time()
[docs] def set_evt(self, data_table):
"""set the data event table with the input array
:param data_table: input data array to read from
:type data_table: array
"""
self.tb_evts = at.Table(np.array(data_table), names=self.name_col,
meta=self.meta_col, dtype=self.dtype_col)
[docs] def construct_images(self,t_window,t_step,method='sliding_window'):
"""construct a suite of images from the data event table
images integrated during time = t_window with time step = t_step
- if t_window == t_step: images are successives
- if t_window == t_max - t_min: return one stacked array
2 construct methods : sliding_window or cumulative
- sliding_window : images are independently computed
- cumulative : images are added with the precedent one
.. note:: may be some data not included from the end of the event file
:param t_window: integration window of the images, in 'time' format. Must be < t_max - t_min.
:type t_window: float
:param t_step: step between two beginning time of windows in 'time' format
:type t_step: float
:param method: construction method: 'sliding_window' or 'cumulative'. Default='sliding_window'
:type method: string
:return: constructed images
:rtype: float np.array
"""
t_min = self.tb_evts['time'].min()
t_max = self.tb_evts['time'].max()
if (t_window == t_max - t_min) :
return self.extract_image(t_min,t_max+1)
elif ((0 < t_window < t_max - t_min) and (method == 'sliding_window')) :
arr_images=np.empty((0,self.nb_pix,self.nb_pix))
t_1 = t_min
t_2 = t_window + t_min
while (t_2 <= t_max) :
one_image = self.extract_image(t_1,t_2)
arr_images = np.append(arr_images,[one_image],axis=0)
t_1 += t_step
t_2 += t_step
return arr_images
elif ((0 < t_window < t_max - t_min) and (method == 'cumulative')):
arr_images=np.empty((0,self.nb_pix,self.nb_pix))
t_1 = t_min
t_2 = t_window + t_min
one_image = self.extract_image(t_1,t_2)
arr_images = np.append(arr_images,[one_image],axis=0)
t_2 += t_step
while (t_2 <= t_max) :
one_image = self.extract_image(t_1,t_2)
arr_images = np.append(arr_images,[one_image],axis=0)
t_2 += t_step
return arr_images
else:
raise ValueError("The time window {} must have a shorter duration than the duration of the event in question {}".format(t_window,t_max))
[docs] def save_images(self,t_window,t_step,images_path,method='sliding_window'):
"""construct and save a suite of images from the data event table
2 construct methods : sliding_window or cumulative
- sliding_window : images are independently computed
- cumulative : integration time of images increase of t_step each step (t_window == size of the initial time window)
images integrated during time = t_window with time step = t_step
- if t_window == t_step: images are successives
- if t_window == t_max: return one stacked array
:param t_window: integration window of the images, in 'time' format. Must be < t_max - t_min.
:type t_window: float
:param t_step: step between two beginning time of windows in 'time' format
:type t_step: float
:param images_path: path to the directory to save images into
:type images_path: string (PATH)
:param method: construction method: 'sliding_window' or 'cumulative'. Default='sliding_window'
:type method: string
"""
arr_images = self.construct_images(t_window,t_step,method)
time_unit = 's'
if arr_images.ndim == 2 :
plt.figure()
#plt.title('shadowgram_%02d' % (i) )
plt.title('shadowgram_00')
plt.imshow(arr_conv.yzegp_to_ijdet_array(arr_images))
cbar = plt.colorbar()
cbar.set_label('photons number')
#ATTENTION au format d'affichage des temps
plt.text(-10, 68, ' t_window={:.0f}{}\n'.format(t_window, time_unit), bbox={'facecolor': 'white', 'pad': 5})
plt.savefig(images_path+'/img00.png')
plt.close()
else:
z_min = arr_images.min()
z_max = arr_images.max()
if method == 'cumulative':
for i in range(len(arr_images)):
plt.figure()
#plt.title('shadowgram_%02d' % (i) )
plt.title('shadowgram_{:02d}'.format(i) )
plt.imshow(arr_conv.yzegp_to_ijdet_array(arr_images[i]), vmin=z_min, vmax=z_max)
cbar = plt.colorbar()
cbar.set_label('photons number')
#ATTENTION au format d'affichage des temps
plt.text(-10, 68, ' t_window={:.0f}{}'.format(t_window+t_step*i, time_unit), bbox={'facecolor': 'white', 'pad': 5})
plt.savefig(images_path+'/img{:02d}.png'.format(i))
plt.close()
else:
for i in range(len(arr_images)):
plt.figure()
#plt.title('shadowgram_%02d' % (i) )
plt.title('shadowgram_{:02d}'.format(i) )
plt.imshow(arr_conv.yzegp_to_ijdet_array(arr_images[i]), vmin=z_min, vmax=z_max)
cbar = plt.colorbar()
cbar.set_label('photons number')
#ATTENTION au format d'affichage des temps
plt.text(-10, 68, ' t_window={0:.0f}{2}\n time={1:.0f}{2}'.format(t_window,t_step*i, time_unit), bbox={'facecolor': 'white', 'pad': 5})
plt.savefig(images_path+'/img{:02d}.png'.format(i))
plt.close()
[docs]class EclairsCalEvtData(XEvtData):
"""standard container for ECLAIRs' calibrated event data
"""
def __init__(self, nb_line=None):
"""**constructor** : init with an empty counts table
"""
self.nb_pix=80 #ATTENTION !! Ceci est temporaire et DOIT etre changer pour une forme plus robuste
self.name_col = ('time', 'Y', 'Z', 'PHA', 'PI', 'FLAG')
self.meta_col = {'time': 'Universal Time in s from MJDREF',
'Y' : "detector pixel 0-79",
'Z' : "detector pixel 0-79",
'PHA' : 'Pules Height Amplitude energy channel (0-1023)',
'PI' : 'Pulse height Invariant energy channel (0-1023)',
'FLAG' : 'Flag pixel status'}
# warning! i1 is interpreted as a bool by cfits
self.dtype_col = ('f8', 'u1', 'u1', 'u2', 'u2', 'b')
dt_numpy = ('f8, u1, u1, u2, u2, b')
if nb_line:
empty_array = np.zeros(nb_line, dtype= dt_numpy)
self.tb_evts = at.Table(data=empty_array, names=self.name_col,
meta=self.meta_col,
dtype =self.dtype_col)
else:
self.tb_evts = at.Table( names=self.name_col,
meta=self.meta_col,
dtype =self.dtype_col)
self.tb_evts['time'].unit = u.s;
self.tb_evts['PHA'].unit = u.adu
self.tb_evts['PI'].unit = u.adu
self.tb_evts['Y'].unit = u.pix
self.tb_evts['Z'].unit = u.pix
[docs] def add_evt(self, eced):
assert isinstance(eced, EclairsCalEvtData)
self.tb_evts = at.vstack([self.tb_evts, eced.tb_evts])
[docs] def set_evt_from_std_array(self, data_table):
"""set the data event table with the input array (time, y, z, pha)
:param data_table: input data table [time, y, z, pha]
:type data_table: array
"""
self.tb_evts = at.Table(np.array(data_table), names=self.name_col[0:4],
meta=self.meta_col, dtype=self.dtype_col[0:4])
self.tb_evts['time'].unit = u.s;
self.tb_evts['PHA'].unit = u.adu
self.tb_evts['Y'].unit = u.pix
self.tb_evts['Z'].unit = u.pix
flag_column = at.Column([0], name='FLAG', meta={'FLAG' : 'Flag pixel status'}, dtype='i4')
pulse_invariant_column = self.tb_evts.columns['PHA'].copy()
pulse_invariant_column.name = 'PI'
pulse_invariant_column.meta = {'PI' : 'Pulse height Invariant energy channel (0-1023)'}
self.tb_evts['FLAG'] = flag_column
self.tb_evts['PI'] = pulse_invariant_column
def _save_from_IJdet(self, evts, add_data=False):
"""save it into the tb_evts attribute
:param evts: events in (Y,Z)
:param add_data: replace or add to precedent data. Default is False
:type filename: astropy.table.Table
:type add_data: bool
"""
if (not add_data):
self.tb_evts = evts
self.tb_evts['Jdet'], self.tb_evts['Idet'] = arr_conv.ijdet_to_yzegp(self.tb_evts['Idet'], self.tb_evts['Jdet'])
self.tb_evts['Idet'].name = 'Z'
self.tb_evts['Jdet'].name = 'Y'
else:
evts['Jdet'], evts['Idet'] = arr_conv.ijdet_to_yzegp(evts['Idet'], evts['Jdet'])
evts['Idet'].name = 'Z'
evts['Jdet'].name = 'Y'
self.tb_evts = at.vstack([self.tb_evts, evts], metadata_conflicts='silent')
self.sort_with_time()
[docs] def read(self, filename, add_data=False):
"""read an event from a fits file (in IJdet convention) and save it into the tb_evts attribute
:param filename: PATH/name of the event file to read
:param add_data: replace or add to precedent data. Default is False
:type filename: string
:type add_data: bool
"""
tb_evts = at.Table.read(filename, format='fits')
self._save_from_IJdet(tb_evts, add_data)
[docs] def write(self, filename, creator, t_start=0, t_stop=0, proc_id="01"):
"""save the event in a ECLAIRs calibrated event fits file
Common keywords are added to the header.
:param filename: PATH/name of the file to save to
:param start_time: events start time in s from MJDREF
:param t_stop: events end time in s from MJDREF
:param creator: program that generated the file
:type filename: string
:type start_time: float
:type t_stop: float
:type creator: string (default='unknown')
"""
#if start_time.mjd != 0:
# time set to MJD
# time en ms -> en jour
#self.tb_evts['time'] = self.tb_evts['time']/1000/3600/24 + start_time.mjd
# construction of the fits file
if self.nb_row == 0:
print("WARNING: no event !!")
else:
if t_start==0:
t_start = self.tb_evts['time'].min()
if t_stop==0:
t_stop = self.tb_evts['time'].max()
evts = deepcopy(self.tb_evts)
evts['Z'], evts['Y'] = arr_conv.yzegp_to_ijdet(evts['Y'], evts['Z'])
evts['Y'].name = 'Jdet'
evts['Z'].name = 'Idet'
del evts.meta['Y']
del evts.meta['Z']
evts.meta['Jdet'] = 'column number on J detector axis'
evts.meta['Idet'] = 'column number on I detector axis'
tbhdu = fits.table_to_hdu(evts)
prihdr = fits.Header()
prihdu = fits.PrimaryHDU(header=prihdr)
fits_tools.common_keyword(tbhdu, t_start, t_stop,creator, 1.0, proc_id)
thdulist = fits.HDUList([prihdu, tbhdu])
thdulist[1].header['comment'] = "ECLAIRs calibrated events list"
thdulist[1].name = 'EVTCAL'
thdulist.writeto(filename, overwrite=True, checksum=True)
[docs] def write_L1(self, filename, creator, t_start=0, t_stop=0, proc_id="01"):
"""save the event in a ECLAIRs single event energy coded fits file
Common keywords are added to the header.
:param filename: PATH/name of the file to save to
:param start_time: events start time in s from MJDREF
:param t_stop: events end time in s from MJDREF
:param creator: program that generated the file
:type filename: string
:type start_time: float
:type t_stop: float
:type creator: string (default='unknown')
"""
if self.nb_row == 0:
print("WARNING: no event !!")
else:
if t_start==0:
t_start = self.tb_evts['time'].min()
if t_stop==0:
t_stop = self.tb_evts['time'].max()
evts = deepcopy(self.tb_evts)
evts['Z'], evts['Y'] = arr_conv.yzegp_to_ijdet(evts['Y'], evts['Z'])
evts['Y'].name = 'Jdet'
evts['Z'].name = 'Idet'
del evts.meta['Y']
del evts.meta['Z']
del evts.meta['PI']
del evts.meta['FLAG']
evts.meta['Jdet'] = 'column number on J detector axis'
evts.meta['Idet'] = 'column number on I detector axis'
evts.meta['time'] = 'time of the event (from PPS and clock)'
evts.remove_columns(['PI', 'FLAG'])
ebd = at.Column(np.zeros(self.nb_row), name='EBD', meta={'EBD' : 'energy band (0 to 3, -1 for SEU, 4 for SEO'}, dtype='u1')
evts.add_column(ebd)
tbhdu = fits.table_to_hdu(evts)
prihdr = fits.Header()
prihdu = fits.PrimaryHDU(header=prihdr)
fits_tools.common_keyword(tbhdu, t_start, t_stop,creator, 2.0, proc_id)
thdulist = fits.HDUList([prihdu, tbhdu])
thdulist[1].header['comment'] = "ECLAIRs single events list, with energy coded"
thdulist[1].name = 'ECL-EVT-SEC'
thdulist.writeto(filename, overwrite=True, checksum=True)