Source code for ecpi.common.io.events

"""Class to manage events
"""

import logging
import shutil
import os
import os.path as osp
from copy import deepcopy
import astropy.table as at
import astropy.units as u
from astropy.io import fits
import numpy as np
import matplotlib.pyplot as plt
from ecpi.common.io import fits_tools
import ecpi.common.instru.array_convention as arr_conv

#pylint:disable=W0102

[docs]class XEvtData(object): """standard container for ECLAIRs' event or data file """ def __init__(self): """**constructor** : init with an empty, photons table """ # declare logger self.logger = logging.getLogger(__name__) self.nb_pix = 80 # ATTENTION !! Ceci est temporaire et DOIT etre changer pour une forme plus robuste self.name_col = ('TIME', 'Y', 'Z', 'energy') self.meta_col = {'TIME': 'arrival time in s after mjdref keyword', 'Y': "detector pixel 0-79", 'Z': "detector pixel 0-79", 'energy': 'photon energy in adu'} self.dtype_col = ('f8', 'u1', 'u1', 'i2') self.tb_evts = at.Table(names=self.name_col, meta=self.meta_col, dtype=self.dtype_col) self.tb_evts['TIME'].unit = u.s self.tb_evts['energy'].unit = u.adu self.tb_evts['Y'].unit = u.pix self.tb_evts['Z'].unit = u.pix self.start_time = 0 @property def nb_row(self): return len(self.tb_evts)
[docs] def sort_with_time(self): """sort the event table according to the arrival time of each photon """ self.tb_evts.sort('TIME')
[docs] def write(self, filename, creator='unkowwn', t_start=0, t_stop=0): """save the event in a fits file :param filename: PATH/name of the file to save to :param start_time: events start time in s from MJDREF :param t_stop: events end time in s from MJDREF :param creator: program that generated the file :type filename: string :type start_time: float :type t_stop: float :type creator: string (default='unknown') """ if t_start == 0: t_start = self.tb_evts['TIME'].min() if t_stop == 0: t_stop = self.tb_evts['TIME'].max() evts = deepcopy(self.tb_evts) evts['Z'], evts['Y'] = arr_conv.yzegp_to_ijdet(evts['Y'], evts['Z']) evts['Y'].name = 'JDET' evts['Z'].name = 'IDET' tbhdu = fits.table_to_hdu(evts) prihdr = fits.Header() prihdu = fits.PrimaryHDU(header=prihdr) fits_tools.common_keyword(prihdu, t_start, t_stop, creator, 1.0, '') prihdu.header['CARD'] = ("ECL-EVT", "Product type") fits_tools.common_keyword(tbhdu, t_start, t_stop, creator, 1.0, '') thdulist = fits.HDUList([prihdu, tbhdu]) thdulist[1].header['comment'] = "ECLAIRs events list" thdulist[1].name = 'events table' thdulist.writeto(filename, overwrite=True, checksum=True)
[docs] def read(self, filename, add_data=False): """read an event from a fits file and save it into the tb_evts attribute the events are sorted according to arrival time :param filename: PATH/name of the event file to read :param add_data: replace or add to precedent data. Default is False :type filename: string :type add_data: bool """ if add_data: evts_to_add = at.Table.read(filename, format='fits') evts_to_add['JDET'], evts_to_add['IDET'] = \ arr_conv.ijdet_to_yzegp(evts_to_add['IDET'], evts_to_add['JDET']) evts_to_add['IDET'].name = 'Z' evts_to_add['JDET'].name = 'Y' self.tb_evts = at.vstack([self.tb_evts, evts_to_add], metadata_conflicts='silent') self.sort_with_time() else: self.tb_evts = at.Table.read(filename, format='fits') self.tb_evts['JDET'], self.tb_evts['IDET'] = \ arr_conv.ijdet_to_yzegp(self.tb_evts['IDET'], self.tb_evts['JDET']) self.tb_evts['IDET'].name = 'Z' self.tb_evts['JDET'].name = 'Y' self.sort_with_time()
[docs] def set_evt(self, data_table): """set the data event table with the input array :param data_table: input data array to read from :type data_table: array """ self.tb_evts = at.Table(np.array(data_table), names=self.name_col, meta=self.meta_col, dtype=self.dtype_col)
[docs] def extract_image(self, t_1, t_2, energy_band=[0, 1023], photon_sum='energy'): """extract one image from the data event table The image is build with photons between t_1 and t_2, and within 'energy_band'. :param t_1: start time to integrate the image :param t_2: end time for the integration :param photon_sum: column name to use to sum photons in the pixels image. Default='energy'. :param energy_band: energy band to sum the photon within. Default is [0, 1023]. :type t_1: float in 'time' format :type t_2: float in "time" format :type photon_sum: string :type energy_band: [int, int] :return: the image integrated between t_1 and t_2, within 'energy_band' :rtype: float array """ img = np.zeros((self.nb_pix, self.nb_pix)) window_index = np.where(((t_1 <= self.tb_evts['TIME']) & (self.tb_evts['TIME'] < t_2)) & \ ((energy_band[0] <= self.tb_evts[photon_sum]) & \ (self.tb_evts[photon_sum] <= energy_band[1])))[0] if len(window_index) == 0: self.logger.debug('No events found that satisfies time and energy conditions') self.logger.debug(f'Time interval: [{t_1}, {t_2}] s') self.logger.debug(f'Energy band channel indices: {energy_band}') return img for photon in np.nditer(self.tb_evts[window_index]): img[photon['Y'], photon['Z']] += 1 return img
[docs] def construct_images(self, t_window, t_step, method='sliding_window'): """construct a suite of images from the data event table images integrated during time = t_window with time step = t_step - if t_window == t_step: images are successives - if t_window == t_max - t_min: return one stacked array 2 construct methods : sliding_window or cumulative - sliding_window : images are independently computed - cumulative : images are added with the precedent one .. note:: may be some data not included from the end of the event file :param t_window: integration window of the images, in 'time' format. Must be < t_max - t_min :type t_window: float :param t_step: step between two beginning time of windows in 'time' format :type t_step: float :param method: construction method: 'sliding_window' or 'cumulative'. Default='sliding_window' :type method: string :return: constructed images :rtype: float np.array """ t_min = self.tb_evts['TIME'].min() t_max = self.tb_evts['TIME'].max() if (t_window == t_max - t_min): return self.extract_image(t_min, t_max + 1) elif ((0 < t_window < t_max - t_min) and (method == 'sliding_window')): arr_images = np.empty((0, self.nb_pix, self.nb_pix)) t_1 = t_min t_2 = t_window + t_min while (t_2 <= t_max): one_image = self.extract_image(t_1, t_2) arr_images = np.append(arr_images, [one_image], axis=0) t_1 += t_step t_2 += t_step return arr_images elif ((0 < t_window < t_max - t_min) and (method == 'cumulative')): arr_images = np.empty((0, self.nb_pix, self.nb_pix)) t_1 = t_min t_2 = t_window + t_min one_image = self.extract_image(t_1, t_2) arr_images = np.append(arr_images, [one_image], axis=0) t_2 += t_step while (t_2 <= t_max): one_image = self.extract_image(t_1, t_2) arr_images = np.append(arr_images, [one_image], axis=0) t_2 += t_step return arr_images else: msg = f"The time window {t_window} must have a shorter duration than" raise ValueError(f"{msg} the duration of the event in question {t_max}")
[docs] def create_movie(self, name_file, erase_images=True): self.sort_with_time() images_path = osp.join(osp.dirname(name_file), 'tmp') self.logger.debug(images_path) try: os.mkdir(images_path) except: # TODO: osp.join : How to modify this command # os.system('rm -rf '+images_path+'/*') shutil.rmtree(images_path) os.makedirs(images_path) self.save_images(images_path, title=os.path.basename(name_file)) # os.system('ffmpeg -r 3 -i ' + images_path + '/img%02d.png -vcodec mpeg4 -y ' + name_file + '.mp4') # TODO: osp.join : to check if this works...!!! # cmd = f"ffmpeg -r {1/self.movie_step} -i {osp.join(images_path, 'img%05d.png')}" cmd = f"ffmpeg -r {1/self.movie_step} -i {images_path}/img%05d.png" os.system(f"{cmd} -vcodec mpeg4 -y -b 1000000 {name_file}.mp4") if erase_images: shutil.rmtree(images_path)
[docs] def save_images(self, images_path, t_window=1.0, t_step=0.5, method='sliding_window', title=""): """construct and save a suite of images from the data event table 2 construct methods : sliding_window or cumulative - sliding_window : images are independently computed - cumulative : integration time of images increase of t_step each step (t_window == size of the initial time window) images integrated during time = t_window with time step = t_step - if t_window == t_step: images are successives - if t_window == t_max: return one stacked array :param t_window: integration window of the images, in 'time' format. Must be < t_max - t_min. :type t_window: float :param t_step: step between two beginning time of windows in 'time' format :type t_step: float :param images_path: path to the directory to save images into :type images_path: string (PATH) :param method: construction method: 'sliding_window' or 'cumulative'. Default='sliding_window' :type method: string """ self.movie_step = t_step arr_images = self.construct_images(t_window, t_step, method) time_unit = 's' if arr_images.ndim == 2: plt.figure() # plt.title('shadowgram_%02d' % (i) ) plt.title('shadowgram_00') plt.imshow(arr_conv.yzegp_to_ijdet_array(arr_images)) cbar = plt.colorbar() cbar.set_label('photons number') # ATTENTION au format d'affichage des temps plt.text(-10, 68, ' t_window={:.0f}{}\n'.format(t_window, time_unit), \ bbox={'facecolor': 'white', 'pad': 5}) plt.savefig(osp.join(images_path, 'img00.png')) plt.close() else: z_min = arr_images.min() z_max = arr_images.max() if method == 'cumulative': for i in range(len(arr_images)): plt.figure() # plt.title('shadowgram_%02d' % (i) ) plt.title(title) plt.imshow(arr_conv.yzegp_to_ijdet_array(arr_images[i]), vmin=z_min, vmax=z_max) cbar = plt.colorbar() cbar.set_label('photons number') # ATTENTION au format d'affichage des temps # plt.text(-10, 68, ' t_window={:.0f}{}'.format(t_window + t_step * i, time_unit), bbox={'facecolor': 'white', 'pad': 5}) # TODO: osp.join check the following line plt.savefig(images_path + '/img{:05d}.png'.format(i)) plt.close() else: for i in range(len(arr_images)): plt.figure() # plt.title('shadowgram_%02d' % (i) ) plt.title(title) plt.imshow(arr_conv.yzegp_to_ijdet_array(arr_images[i]), vmin=z_min, vmax=z_max) cbar = plt.colorbar() cbar.set_label(f'photons number integrated on {t_window}s') # ATTENTION au format d'affichage des temps # plt.text(-10, 68, ' t_window={0:.0f}{2}\n time={1:.0f}{2}'.format(t_window, t_step * i, time_unit), bbox={'facecolor': 'white', 'pad': 5}) # TODO: osp.join check the following line plt.savefig(images_path + '/img{:05d}.png'.format(i)) plt.close()
[docs]class EclairsCalEvtData(XEvtData): """Standard container for ECLAIRs' calibrated event data """ def __init__(self, nb_line=None): """**constructor** : init with an empty counts table """ self.d_gti = {} self.nb_pix = 80 # ATTENTION !! Ceci est temporaire et DOIT etre change pour une forme plus robuste self.name_col = ('TIME', 'Y', 'Z', 'PHA', 'FLAG') self.meta_col = { 'TIME': 'Universal Time in s from MJDREF', 'Y': "detector pixel 0-79", 'Z': "detector pixel 0-79", 'PHA': 'Pulse Height Amplitude energy channel (0-1023)', # 'PI' : 'Pulse height Invariant energy channel (0-1023)', 'FLAG': 'Flag pixel status' } # warning! i1 is interpreted as a bool by cfits self.dtype_col = ('f8', 'u1', 'u1', 'u2', 'b') dt_numpy = ('f8, u1, u1, u2, b') if nb_line: empty_array = np.zeros(nb_line, dtype=dt_numpy) self.tb_evts = at.Table(data=empty_array, names=self.name_col, meta=self.meta_col, dtype=self.dtype_col) else: self.tb_evts = at.Table(names=self.name_col, meta=self.meta_col, dtype=self.dtype_col) self.tb_evts['TIME'].unit = u.s; self.tb_evts['PHA'].unit = u.adu self.tb_evts['Y'].unit = u.pix self.tb_evts['Z'].unit = u.pix # declare logger self.logger = logging.getLogger(__name__)
[docs] def add_evt(self, eced): assert isinstance(eced, EclairsCalEvtData) self.tb_evts = at.vstack([self.tb_evts, eced.tb_evts])
[docs] def set_evt_from_std_array(self, data_table): """set the data event table with the input array (time, y, z, pha) :param data_table: input data table [time, y, z, pha] :type data_table: array """ self.tb_evts = at.Table(np.array(data_table), names=self.name_col[0:4], meta=self.meta_col, dtype=self.dtype_col[0:4]) self.tb_evts['TIME'].unit = u.s; self.tb_evts['PHA'].unit = u.adu self.tb_evts['Y'].unit = u.pix self.tb_evts['Z'].unit = u.pix flag_column = at.Column([0], name='FLAG', meta={'FLAG': 'Flag pixel status'}, dtype='i4') self.tb_evts['FLAG'] = flag_column
[docs] def extract_image(self, t_1, t_2, energy_range=[0, 1023], val='PHA'): """extract one image from the data event table extract image by summing the PHA of photons in the pixels of the image :param t_1: start time to integrate the image :param t_2: end time for the integration :param energy_range: energy band to sum the photon within. Default is [0, 1023]. :type t_1: float in 'time' format :type t_2: float in "time" format :type energy_range: [int, int] :return: the image integrated between t_1 and t_2, and within the energy range :rtype: float array """ return super().extract_image(t_1, t_2, energy_band=energy_range, photon_sum=val)
[docs] def extract_images(self, t_1, t_2, energy_ranges, val='PHA'): """extract images according to the time and energy ranges in input :param t_1: start time to integrate the image :type t_1: float in 'time' format :param t_2: end time for the integration :type t_2: float in "time" format :param energy_ranges: different energy ranges (in PI) to extract the images [[low_energy, high_energy],[]] :type energy_ranges: [[int, int]] :return: extracted images between t_1 and t_2, and within the energy ranges :rtype: list(array) """ images = [] for energy_range in energy_ranges: extracted_img = self.extract_image(t_1, t_2, energy_range, val=val) if np.any(extracted_img < 0): msg = 'Negative value in extracted image' self.logger.error(f'{msg} ({t_1}, {t_2}, {energy_range})') images.append(extracted_img) return images
def _save_from_IJdet(self, evts, add_data=False): """save it into the tb_evts attribute :param evts: events in (Y,Z) :param add_data: replace or add to precedent data. Default is False :type filename: astropy.table.Table :type add_data: bool """ if (not add_data): self.tb_evts = evts self.tb_evts['JDET'], self.tb_evts['IDET'] = \ arr_conv.ijdet_to_yzegp(self.tb_evts['IDET'], self.tb_evts['JDET']) self.tb_evts['IDET'].name = 'Z' self.tb_evts['JDET'].name = 'Y' else: evts['JDET'], evts['IDET'] = arr_conv.ijdet_to_yzegp(evts['IDET'], evts['JDET']) evts['IDET'].name = 'Z' evts['JDET'].name = 'Y' self.tb_evts = at.vstack([self.tb_evts, evts], metadata_conflicts='silent') self.sort_with_time()
[docs] def read(self, filename, add_data=False): """ read an event from a fits file (in IJdet convention) and save it into the tb_evts attribute :param filename: PATH/name of the event file to read :param add_data: replace or add to precedent data. Default is False :type filename: string :type add_data: bool """ tb_evts = at.Table.read(filename, format='fits') self._save_from_IJdet(tb_evts, add_data)
[docs] def write_L1(self, filename, creator="ECPI/APC", t_start=0, t_stop=0, proc_id="01"): """save the event in a ECLAIRs single event energy coded fits file Common keywords are added to the header. :param filename: PATH/name of the file to save to :param start_time: events start time in s from MJDREF :param t_stop: events end time in s from MJDREF :param creator: program that generated the file :type filename: string :type start_time: float :type t_stop: float :type creator: string (default='unknown') """ if self.nb_row == 0: # TODO: COVERAGE self.logger.warning('no event !') else: if t_start == 0: t_start = self.tb_evts['TIME'].min() if t_stop == 0: t_stop = self.tb_evts['TIME'].max() evts = deepcopy(self.tb_evts) evts['Z'], evts['Y'] = arr_conv.yzegp_to_ijdet(evts['Y'], evts['Z']) evts['Y'].name = 'JDET' evts['Z'].name = 'IDET' del evts.meta['Y'] del evts.meta['Z'] evts['PHA'] = np.uint16(evts['PHA']) del evts.meta['FLAG'] evts.meta['JDET'] = 'column number on J detector axis' evts.meta['IDET'] = 'column number on I detector axis' evts.meta['TIME'] = 'time of the event (from PPS and clock)' evts.remove_columns(['FLAG']) ebd = at.Column(np.zeros(self.nb_row), name='EBD', meta={'EBD': 'energy band (0 to 3, -1 for SEU, 4 for SEO'}, dtype='u1') evts.add_column(ebd) tbhdu = fits.table_to_hdu(evts) prihdr = fits.Header() prihdu = fits.PrimaryHDU(header=prihdr) fits_tools.common_keyword(prihdu, t_start, t_stop, creator, 1.0, proc_id) prihdu.header['CARD'] = ("ECL-EVT-SEC", "Product type") fits_tools.common_keyword(tbhdu, t_start, t_stop, creator, 2.0, proc_id) thdulist = fits.HDUList([prihdu, tbhdu]) thdulist[1].header['comment'] = "ECLAIRs single events list, with energy coded" thdulist[1].name = 'ECL-EVT-SEC' thdulist.writeto(filename, overwrite=True, checksum=True)