"""Class to manage events
"""
import logging
import shutil
import os
import os.path as osp
from copy import deepcopy
import astropy.table as at
import astropy.units as u
from astropy.io import fits
import numpy as np
import matplotlib.pyplot as plt
from ecpi.common.io import fits_tools
import ecpi.common.instru.array_convention as arr_conv
#pylint:disable=W0102
[docs]class XEvtData(object):
"""standard container for ECLAIRs' event or data file
"""
def __init__(self):
"""**constructor** : init with an empty, photons table
"""
# declare logger
self.logger = logging.getLogger(__name__)
self.nb_pix = 80 # ATTENTION !! Ceci est temporaire et DOIT etre changer pour une forme plus robuste
self.name_col = ('TIME', 'Y', 'Z', 'energy')
self.meta_col = {'TIME': 'arrival time in s after mjdref keyword',
'Y': "detector pixel 0-79",
'Z': "detector pixel 0-79",
'energy': 'photon energy in adu'}
self.dtype_col = ('f8', 'u1', 'u1', 'i2')
self.tb_evts = at.Table(names=self.name_col,
meta=self.meta_col,
dtype=self.dtype_col)
self.tb_evts['TIME'].unit = u.s
self.tb_evts['energy'].unit = u.adu
self.tb_evts['Y'].unit = u.pix
self.tb_evts['Z'].unit = u.pix
self.start_time = 0
@property
def nb_row(self):
return len(self.tb_evts)
[docs] def sort_with_time(self):
"""sort the event table according to the arrival time of each photon
"""
self.tb_evts.sort('TIME')
[docs] def write(self, filename, creator='unkowwn', t_start=0, t_stop=0):
"""save the event in a fits file
:param filename: PATH/name of the file to save to
:param start_time: events start time in s from MJDREF
:param t_stop: events end time in s from MJDREF
:param creator: program that generated the file
:type filename: string
:type start_time: float
:type t_stop: float
:type creator: string (default='unknown')
"""
if t_start == 0:
t_start = self.tb_evts['TIME'].min()
if t_stop == 0:
t_stop = self.tb_evts['TIME'].max()
evts = deepcopy(self.tb_evts)
evts['Z'], evts['Y'] = arr_conv.yzegp_to_ijdet(evts['Y'], evts['Z'])
evts['Y'].name = 'JDET'
evts['Z'].name = 'IDET'
tbhdu = fits.table_to_hdu(evts)
prihdr = fits.Header()
prihdu = fits.PrimaryHDU(header=prihdr)
fits_tools.common_keyword(prihdu, t_start, t_stop, creator, 1.0, '')
prihdu.header['CARD'] = ("ECL-EVT", "Product type")
fits_tools.common_keyword(tbhdu, t_start, t_stop, creator, 1.0, '')
thdulist = fits.HDUList([prihdu, tbhdu])
thdulist[1].header['comment'] = "ECLAIRs events list"
thdulist[1].name = 'events table'
thdulist.writeto(filename, overwrite=True, checksum=True)
[docs] def read(self, filename, add_data=False):
"""read an event from a fits file and save it into the tb_evts attribute
the events are sorted according to arrival time
:param filename: PATH/name of the event file to read
:param add_data: replace or add to precedent data. Default is False
:type filename: string
:type add_data: bool
"""
if add_data:
evts_to_add = at.Table.read(filename, format='fits')
evts_to_add['JDET'], evts_to_add['IDET'] = \
arr_conv.ijdet_to_yzegp(evts_to_add['IDET'], evts_to_add['JDET'])
evts_to_add['IDET'].name = 'Z'
evts_to_add['JDET'].name = 'Y'
self.tb_evts = at.vstack([self.tb_evts, evts_to_add], metadata_conflicts='silent')
self.sort_with_time()
else:
self.tb_evts = at.Table.read(filename, format='fits')
self.tb_evts['JDET'], self.tb_evts['IDET'] = \
arr_conv.ijdet_to_yzegp(self.tb_evts['IDET'], self.tb_evts['JDET'])
self.tb_evts['IDET'].name = 'Z'
self.tb_evts['JDET'].name = 'Y'
self.sort_with_time()
[docs] def set_evt(self, data_table):
"""set the data event table with the input array
:param data_table: input data array to read from
:type data_table: array
"""
self.tb_evts = at.Table(np.array(data_table), names=self.name_col,
meta=self.meta_col, dtype=self.dtype_col)
[docs] def construct_images(self, t_window, t_step, method='sliding_window'):
"""construct a suite of images from the data event table
images integrated during time = t_window with time step = t_step
- if t_window == t_step: images are successives
- if t_window == t_max - t_min: return one stacked array
2 construct methods : sliding_window or cumulative
- sliding_window : images are independently computed
- cumulative : images are added with the precedent one
.. note:: may be some data not included from the end of the event file
:param t_window: integration window of the images, in 'time' format. Must be < t_max - t_min
:type t_window: float
:param t_step: step between two beginning time of windows in 'time' format
:type t_step: float
:param method: construction method: 'sliding_window' or 'cumulative'.
Default='sliding_window'
:type method: string
:return: constructed images
:rtype: float np.array
"""
t_min = self.tb_evts['TIME'].min()
t_max = self.tb_evts['TIME'].max()
if (t_window == t_max - t_min):
return self.extract_image(t_min, t_max + 1)
elif ((0 < t_window < t_max - t_min) and (method == 'sliding_window')):
arr_images = np.empty((0, self.nb_pix, self.nb_pix))
t_1 = t_min
t_2 = t_window + t_min
while (t_2 <= t_max):
one_image = self.extract_image(t_1, t_2)
arr_images = np.append(arr_images, [one_image], axis=0)
t_1 += t_step
t_2 += t_step
return arr_images
elif ((0 < t_window < t_max - t_min) and (method == 'cumulative')):
arr_images = np.empty((0, self.nb_pix, self.nb_pix))
t_1 = t_min
t_2 = t_window + t_min
one_image = self.extract_image(t_1, t_2)
arr_images = np.append(arr_images, [one_image], axis=0)
t_2 += t_step
while (t_2 <= t_max):
one_image = self.extract_image(t_1, t_2)
arr_images = np.append(arr_images, [one_image], axis=0)
t_2 += t_step
return arr_images
else:
msg = f"The time window {t_window} must have a shorter duration than"
raise ValueError(f"{msg} the duration of the event in question {t_max}")
[docs] def create_movie(self, name_file, erase_images=True):
self.sort_with_time()
images_path = osp.join(osp.dirname(name_file), 'tmp')
self.logger.debug(images_path)
try:
os.mkdir(images_path)
except:
# TODO: osp.join : How to modify this command
# os.system('rm -rf '+images_path+'/*')
shutil.rmtree(images_path)
os.makedirs(images_path)
self.save_images(images_path, title=os.path.basename(name_file))
# os.system('ffmpeg -r 3 -i ' + images_path + '/img%02d.png -vcodec mpeg4 -y ' + name_file + '.mp4')
# TODO: osp.join : to check if this works...!!!
# cmd = f"ffmpeg -r {1/self.movie_step} -i {osp.join(images_path, 'img%05d.png')}"
cmd = f"ffmpeg -r {1/self.movie_step} -i {images_path}/img%05d.png"
os.system(f"{cmd} -vcodec mpeg4 -y -b 1000000 {name_file}.mp4")
if erase_images:
shutil.rmtree(images_path)
[docs] def save_images(self, images_path, t_window=1.0, t_step=0.5, method='sliding_window', title=""):
"""construct and save a suite of images from the data event table
2 construct methods : sliding_window or cumulative
- sliding_window : images are independently computed
- cumulative : integration time of images increase of t_step
each step (t_window == size of the initial time window)
images integrated during time = t_window with time step = t_step
- if t_window == t_step: images are successives
- if t_window == t_max: return one stacked array
:param t_window: integration window of the images, in 'time' format.
Must be < t_max - t_min.
:type t_window: float
:param t_step: step between two beginning time of windows in 'time' format
:type t_step: float
:param images_path: path to the directory to save images into
:type images_path: string (PATH)
:param method: construction method: 'sliding_window' or 'cumulative'.
Default='sliding_window'
:type method: string
"""
self.movie_step = t_step
arr_images = self.construct_images(t_window, t_step, method)
time_unit = 's'
if arr_images.ndim == 2:
plt.figure()
# plt.title('shadowgram_%02d' % (i) )
plt.title('shadowgram_00')
plt.imshow(arr_conv.yzegp_to_ijdet_array(arr_images))
cbar = plt.colorbar()
cbar.set_label('photons number')
# ATTENTION au format d'affichage des temps
plt.text(-10, 68, ' t_window={:.0f}{}\n'.format(t_window, time_unit), \
bbox={'facecolor': 'white', 'pad': 5})
plt.savefig(osp.join(images_path, 'img00.png'))
plt.close()
else:
z_min = arr_images.min()
z_max = arr_images.max()
if method == 'cumulative':
for i in range(len(arr_images)):
plt.figure()
# plt.title('shadowgram_%02d' % (i) )
plt.title(title)
plt.imshow(arr_conv.yzegp_to_ijdet_array(arr_images[i]), vmin=z_min, vmax=z_max)
cbar = plt.colorbar()
cbar.set_label('photons number')
# ATTENTION au format d'affichage des temps
# plt.text(-10, 68, ' t_window={:.0f}{}'.format(t_window + t_step * i, time_unit), bbox={'facecolor': 'white', 'pad': 5})
# TODO: osp.join check the following line
plt.savefig(images_path + '/img{:05d}.png'.format(i))
plt.close()
else:
for i in range(len(arr_images)):
plt.figure()
# plt.title('shadowgram_%02d' % (i) )
plt.title(title)
plt.imshow(arr_conv.yzegp_to_ijdet_array(arr_images[i]), vmin=z_min, vmax=z_max)
cbar = plt.colorbar()
cbar.set_label(f'photons number integrated on {t_window}s')
# ATTENTION au format d'affichage des temps
# plt.text(-10, 68, ' t_window={0:.0f}{2}\n time={1:.0f}{2}'.format(t_window, t_step * i, time_unit), bbox={'facecolor': 'white', 'pad': 5})
# TODO: osp.join check the following line
plt.savefig(images_path + '/img{:05d}.png'.format(i))
plt.close()
[docs]class EclairsCalEvtData(XEvtData):
"""Standard container for ECLAIRs' calibrated event data
"""
def __init__(self, nb_line=None):
"""**constructor** : init with an empty counts table
"""
self.d_gti = {}
self.nb_pix = 80 # ATTENTION !! Ceci est temporaire et DOIT etre change pour une forme plus robuste
self.name_col = ('TIME', 'Y', 'Z', 'PHA', 'FLAG')
self.meta_col = {
'TIME': 'Universal Time in s from MJDREF',
'Y': "detector pixel 0-79",
'Z': "detector pixel 0-79",
'PHA': 'Pulse Height Amplitude energy channel (0-1023)',
# 'PI' : 'Pulse height Invariant energy channel (0-1023)',
'FLAG': 'Flag pixel status'
}
# warning! i1 is interpreted as a bool by cfits
self.dtype_col = ('f8', 'u1', 'u1', 'u2', 'b')
dt_numpy = ('f8, u1, u1, u2, b')
if nb_line:
empty_array = np.zeros(nb_line, dtype=dt_numpy)
self.tb_evts = at.Table(data=empty_array, names=self.name_col,
meta=self.meta_col,
dtype=self.dtype_col)
else:
self.tb_evts = at.Table(names=self.name_col,
meta=self.meta_col,
dtype=self.dtype_col)
self.tb_evts['TIME'].unit = u.s;
self.tb_evts['PHA'].unit = u.adu
self.tb_evts['Y'].unit = u.pix
self.tb_evts['Z'].unit = u.pix
# declare logger
self.logger = logging.getLogger(__name__)
[docs] def add_evt(self, eced):
assert isinstance(eced, EclairsCalEvtData)
self.tb_evts = at.vstack([self.tb_evts, eced.tb_evts])
[docs] def set_evt_from_std_array(self, data_table):
"""set the data event table with the input array (time, y, z, pha)
:param data_table: input data table [time, y, z, pha]
:type data_table: array
"""
self.tb_evts = at.Table(np.array(data_table), names=self.name_col[0:4],
meta=self.meta_col, dtype=self.dtype_col[0:4])
self.tb_evts['TIME'].unit = u.s;
self.tb_evts['PHA'].unit = u.adu
self.tb_evts['Y'].unit = u.pix
self.tb_evts['Z'].unit = u.pix
flag_column = at.Column([0], name='FLAG', meta={'FLAG': 'Flag pixel status'}, dtype='i4')
self.tb_evts['FLAG'] = flag_column
def _save_from_IJdet(self, evts, add_data=False):
"""save it into the tb_evts attribute
:param evts: events in (Y,Z)
:param add_data: replace or add to precedent data. Default is False
:type filename: astropy.table.Table
:type add_data: bool
"""
if (not add_data):
self.tb_evts = evts
self.tb_evts['JDET'], self.tb_evts['IDET'] = \
arr_conv.ijdet_to_yzegp(self.tb_evts['IDET'], self.tb_evts['JDET'])
self.tb_evts['IDET'].name = 'Z'
self.tb_evts['JDET'].name = 'Y'
else:
evts['JDET'], evts['IDET'] = arr_conv.ijdet_to_yzegp(evts['IDET'], evts['JDET'])
evts['IDET'].name = 'Z'
evts['JDET'].name = 'Y'
self.tb_evts = at.vstack([self.tb_evts, evts], metadata_conflicts='silent')
self.sort_with_time()
[docs] def read(self, filename, add_data=False):
"""
read an event from a fits file (in IJdet convention) and save it into the tb_evts attribute
:param filename: PATH/name of the event file to read
:param add_data: replace or add to precedent data. Default is False
:type filename: string
:type add_data: bool
"""
tb_evts = at.Table.read(filename, format='fits')
self._save_from_IJdet(tb_evts, add_data)
[docs] def write_L1(self, filename, creator="ECPI/APC", t_start=0, t_stop=0, proc_id="01"):
"""save the event in a ECLAIRs single event energy coded fits file
Common keywords are added to the header.
:param filename: PATH/name of the file to save to
:param start_time: events start time in s from MJDREF
:param t_stop: events end time in s from MJDREF
:param creator: program that generated the file
:type filename: string
:type start_time: float
:type t_stop: float
:type creator: string (default='unknown')
"""
if self.nb_row == 0:
# TODO: COVERAGE
self.logger.warning('no event !')
else:
if t_start == 0:
t_start = self.tb_evts['TIME'].min()
if t_stop == 0:
t_stop = self.tb_evts['TIME'].max()
evts = deepcopy(self.tb_evts)
evts['Z'], evts['Y'] = arr_conv.yzegp_to_ijdet(evts['Y'], evts['Z'])
evts['Y'].name = 'JDET'
evts['Z'].name = 'IDET'
del evts.meta['Y']
del evts.meta['Z']
evts['PHA'] = np.uint16(evts['PHA'])
del evts.meta['FLAG']
evts.meta['JDET'] = 'column number on J detector axis'
evts.meta['IDET'] = 'column number on I detector axis'
evts.meta['TIME'] = 'time of the event (from PPS and clock)'
evts.remove_columns(['FLAG'])
ebd = at.Column(np.zeros(self.nb_row), name='EBD',
meta={'EBD': 'energy band (0 to 3, -1 for SEU, 4 for SEO'},
dtype='u1')
evts.add_column(ebd)
tbhdu = fits.table_to_hdu(evts)
prihdr = fits.Header()
prihdu = fits.PrimaryHDU(header=prihdr)
fits_tools.common_keyword(prihdu, t_start, t_stop, creator, 1.0, proc_id)
prihdu.header['CARD'] = ("ECL-EVT-SEC", "Product type")
fits_tools.common_keyword(tbhdu, t_start, t_stop, creator, 2.0, proc_id)
thdulist = fits.HDUList([prihdu, tbhdu])
thdulist[1].header['comment'] = "ECLAIRs single events list, with energy coded"
thdulist[1].name = 'ECL-EVT-SEC'
thdulist.writeto(filename, overwrite=True, checksum=True)