"""
@author: BACON Philippe, APC/IN2P3/CNRS
"""
import logging
from astropy.time import Time, TimeDelta
import numpy as np
from ecpi.common import add_path_current_module
from ecpi.process.bube.core.main_bube import bube
from ecpi.process.generic.composant import GenericComposantProcessing
import ecpi.process.bube.io.outputs as buio
from ecpi.process.dpco.io.outputs import save_cal_events
from ecpi.common.mission.attitude import AttitudeECLAIRs
from ecpi.common.io.events import EclairsCalEvtData
from ecpi.common.mission.observation import EclairsObservation
from ecpi.common.io.fits_tools import get_fits_files_with_extname
from ecpi.common.mission.time import convert_mjd_in_svomref_seconds_ref
from ecpi.common.mission.time import from_isot_to_mjd
from ecpi.common.mission.orbit import EclairsOrbit
# Module-level logger, named after this module.
s_logger = logging.getLogger(__name__)
# Location of the BUBE parameter schema ("io/bube_schema.json"), built by
# add_path_current_module from this file's path (presumably resolved
# relative to this module's directory -- confirm against the helper).
# It is handed to the base-class constructor in ProcessBubeWithFiles.__init__.
BUBE_FILE_SCHEME = add_path_current_module(__file__, "io/bube_schema.json")
class ProcessBubeWithFiles(GenericComposantProcessing):
    """High level handling of the BUBE module.

    The class loads the input FITS files found in the ``working_in``
    directory, runs the core ``bube`` function and writes the requested
    output products in ``working_out``. Every step reports failures by
    setting ``self.status`` to a non-zero code and returning ``False``.
    """

    def __init__(self):
        """**Constructor**

        Initialise the generic component with the BUBE schema file and
        register the BUBE-specific status messages.
        """
        super().__init__(BUBE_FILE_SCHEME)
        self.name = "bube"
        self.error_range = 300
        dict_comp = {
            10: "error in ECL-EVT-CAL product generation",
            20: "error energy ranges",
            21: "error starting time",
            22: "error no ECL-EVT-SEC files found",
            23: "error no SVO-ATT-CNV files found",
            24: "something wrong happened during BUBE module running",
            25: "error in ECL-DET-UBC product generation",
            26: "starting times does not match between event and attitude files",
            27: "python exception",
        }
        self.d_status.update(dict_comp)
        # Observation time bounds; filled by _load_check_file_in() or, when
        # still None, by _before_process() from the event times.
        self.t_start = None
        self.t_stop = None

    def _load_check_file_in(self):
        """Check for ECL-EVT-SEC, SVO-ATT-CNV and SVO-ORB-CNV files, load
        them in memory and create the input dictionary of the BUBE module.

        On success ``self.d_data_in`` holds the keys:

        * ``"evt"``: ``EclairsCalEvtData`` object with all event files read
        * ``"att"``: table read from the first SVO-ATT-CNV file
        * ``"orb"``: table read from the first SVO-ORB-CNV file
        * ``"ptgori_mean"``: pointing and orientation computed from the
          first quaternion of the attitude table

        :return: True on success, False otherwise (``self.status`` is set)
        """
        s_logger.debug('Running _load_check_file_in [bube]')
        if not super()._load_check_file_in():
            return False
        # Look for event, attitude and orbit files in the input working dir.
        working_in = self.d_pars['working_in']
        event_files = get_fits_files_with_extname('ECL-EVT-SEC', working_in)
        self.attitude_files = get_fits_files_with_extname('SVO-ATT-CNV', working_in)
        self.orbit_files = get_fits_files_with_extname('SVO-ORB-CNV', working_in)
        # Only the first missing kind of file is reported.
        if not event_files:
            self.status = 22
        if self.status == 0 and not self.attitude_files:
            self.status = 23
        if self.status == 0 and not self.orbit_files:
            # NOTE(review): 21 is registered as "error starting time"; there
            # is no dedicated code for missing SVO-ORB-CNV files -- confirm.
            self.status = 21
        if self.status > 0:
            return False
        # Load the content of all event files in memory.
        events = EclairsCalEvtData()
        evt_file = None
        try:
            for evt_file in event_files:
                events.read(evt_file, add_data=True)
        except Exception:
            # "except Exception" rather than a bare "except" so that
            # KeyboardInterrupt / SystemExit are not swallowed.
            s_logger.exception(f"ERROR read file '{evt_file}'")
            # NOTE(review): code 30 is not in the table registered by
            # __init__; presumably defined by the base class -- confirm.
            self.status = 30
            return False
        try:
            t_att = buio.read_table_svo_att_cnv(self.attitude_files[0])
            t_orb = buio.read_table_svo_orb_cnv(self.orbit_files[0])
        except Exception:
            s_logger.exception("ERROR read ATT or ORB")
            # NOTE(review): code 29 is not in the table registered by
            # __init__; presumably defined by the base class -- confirm.
            self.status = 29
            return False
        # TODO: must define with GTI
        # Define the initial pointing from the first SVOM quaternion.
        att = AttitudeECLAIRs()
        quater = t_att['QPARAM'][0]
        att.set_attitude_svom_quater(quater)
        # Compute pointing and orientation, stored together in one array.
        ptg = att.ptg_instru(deg=True)[0]
        ori = np.array([att.ori_instru(deg=True)])
        ptgori = np.concatenate((ptg, ori))
        # TODO : remove this switch over time conventions
        # that depend on "mode" parameter
        mode = self.d_pars['mode']
        if mode == 'static':
            t_start = Time(self.d_pars['t_start']).mjd
            # t_exposure is a duration in seconds; the explicit format
            # matches the 'dynamic' branch and converts it to days.
            exposure_days = TimeDelta(self.d_pars['t_exposure'], format='sec').jd
            t_stop = t_start + exposure_days
            # set time reference SVOM
            t_ref = Time("2017-01-01T00:00:00.000").mjd
            self.t_start = convert_mjd_in_svomref_seconds_ref(t_start, t_ref)
            self.t_stop = convert_mjd_in_svomref_seconds_ref(t_stop, t_ref)
        elif mode == 'dynamic':
            # comply with DC2 time conversion conventions
            t_start = Time(self.d_pars['t_start'])
            t_stop = (t_start + TimeDelta(self.d_pars['t_exposure'], format='sec')).isot
            # NOTE(review): t_start is passed as a Time object while t_stop
            # is an ISOT string -- confirm from_isot_to_mjd accepts both.
            self.t_start = from_isot_to_mjd(t_start)
            self.t_stop = from_isot_to_mjd(t_stop)
        # For any other mode the time bounds are left untouched here and
        # _before_process() falls back on the event times.
        self.d_data_in = {
            'evt': events,
            'att': t_att,
            'orb': t_orb,
            'ptgori_mean': ptgori,
        }
        return True

    def _create_output_files(self):
        """Create BUBE scientific files.

        Call _create_output_file_ECL_DET_UBC and/or
        _create_output_file_ECL_EVT_CAL or None functions depending
        on the ``out_files`` parameter.

        :return: True when every requested product was written, False when
            the status was already in error or a product generation failed
        """
        s_logger.debug('Running _create_output_files [bube]')
        if self.status != 0:
            return False
        product_files = self.d_pars['out_files']
        # Propagate the failure of a writer instead of always returning True.
        if 'ECL-DET-UBC' in product_files:
            if not self._create_output_file_ECL_DET_UBC():
                return False
        if 'ECL-EVT-CAL' in product_files:
            if not self._create_output_file_ECL_EVT_CAL():
                return False
        return True

    def _create_output_file_ECL_DET_UBC(self):
        """Create ECL-DET-UBC files.

        :return: True on success; False if the status is already in error
            or if saving raised ValueError/KeyError (status set to 25)
        """
        if self.status != 0:
            return False
        det_image = self.d_data_out['detector_images']
        observation = self.d_data_in['pvtatt']
        tstart = observation.start_time
        tstop = observation.end_time
        dest_dir = self.d_pars['working_out']
        attitude = observation.attitude
        eded = observation.dpix
        try:
            buio.save_ecl_det_ubc(det_image, tstart, tstop,
                                  dest_dir, attitude, eded,
                                  creator='ProcessBubeWithFiles', proc_id="01")
        except (ValueError, KeyError):
            s_logger.exception("BUBE exception")
            self.status = 25
            return False
        return True

    def _create_output_file_ECL_EVT_CAL(self):
        """Create ECL-EVT-CAL files.

        .. warning:: this function has to be removed after DC1.

        :return: True on success; False if the status is already in error
            or if saving raised ValueError/KeyError (status set to 10)
        """
        if self.status != 0:
            return False
        tstart = self.d_data_in['pvtatt'].start_time
        workdir_in = self.d_pars['working_in']
        # The output working directory is used both as work dir and as
        # destination dir of save_cal_events.
        workdir_out = self.d_pars['working_out']
        dest_dir = workdir_out
        try:
            save_cal_events(workdir_in, workdir_out, tstart, dest_dir)
        except (ValueError, KeyError):
            s_logger.exception("BUBE exception")
            self.status = 10
            return False
        return True

    def _before_process(self):
        """Rebuild ``self.d_data_in`` into the dictionary passed to bube().

        Only to use the old definition of d_data_in: the dictionary made by
        _load_check_file_in() is replaced by one holding the keys
        ``energy_ranges``, ``bkg_correction_mode``, ``mode``,
        ``list_events``, ``pvtatt`` and ``orbit``.

        :return: True
        """
        evt = self.d_data_in["evt"]
        assert isinstance(evt, EclairsCalEvtData)
        if self.t_start is None:
            # No time bounds defined yet: use first and last event times.
            self.t_start = evt.tb_evts["TIME"][0]
            self.t_stop = evt.tb_evts["TIME"][-1]
        # load attitude and orbit files content in memory.
        working_in = self.d_pars['working_in']
        self.attitude_files = get_fits_files_with_extname('SVO-ATT-CNV', working_in)
        self.orbit_files = get_fits_files_with_extname('SVO-ORB-CNV', working_in)
        d_attitude = buio.read_attitude_files(self.attitude_files)
        d_orbit = buio.read_orbit_files(self.orbit_files)
        ecl_orb = EclairsOrbit(
            orbit_filename=self.orbit_files[0],
            attitude_filename=self.attitude_files[0])
        ecl_obs = EclairsObservation(self.t_start, self.t_stop,
                                     self.d_data_in["ptgori_mean"],
                                     d_orbit['POSITION'],
                                     d_attitude['ANGLE_VEL'],
                                     self.mod_effect)
        # presumably computes the good time intervals -- see EclairsObservation
        ecl_obs.gti_compute()
        self.d_data_in = {
            'energy_ranges': self.d_pars['energy_channels'],
            'bkg_correction_mode': self.d_pars['bkg_correction_mode'],
            'mode': self.d_pars['mode'],
            'list_events': [evt],
            'pvtatt': ecl_obs,
            'orbit': ecl_orb,
        }
        return True

    def _process_composant(self):
        """Run BUBE module.

        On success ``self.d_data_out`` holds the keys
        ``eclairs_observation`` and ``detector_images``.

        :return: True on success; False if the status is already in error,
            if bube() raised ValueError/KeyError (status 27) or if it
            reported "NOK" (status 24)
        """
        s_logger.debug('Running _process_composant [bube]')
        # Check the status before touching d_data_in: after a failed
        # previous step the expected keys may be missing.
        if self.status != 0:
            return False
        self._before_process()
        # call core bube with data_in
        try:
            detectim, eclo, error_code, error_msg = bube(self.d_data_in)
            s_logger.debug(f"{error_code}: {error_msg} [bube]")
        except (ValueError, KeyError):
            s_logger.exception("BUBE exception")
            self.status = 27
            return False
        if error_code == "NOK":
            # TODO: reuse error_code of module
            self.status = 24
            return False
        # create output for next composant
        self.d_data_out = {
            'eclairs_observation': eclo,
            'detector_images': detectim,
        }
        return True

    def _check_extra(self):
        """Check parameters specific to the BUBE module in parameter INI file.

        * ``t_start`` must not be before the MJD origin (status 21)
        * each item of ``energy_channels`` must be a list of two ints
          ``[low, high]`` with ``0 <= low <= high < number of channels``
          (status 20)

        :return: True when all checks pass, False otherwise
        """
        s_logger.debug('Running _check_extra [bube]')
        if self.status != 0:
            return False
        # check_params if t_start posterior to MJDREF time origin.
        t_mjdref = Time("1858-11-17T00:00:00.000")
        t_start = Time(self.d_pars["t_start"])
        if t_start.mjd - t_mjdref.mjd < 0:
            # The energy checks below still run and may replace this code
            # by 20, as before.
            self.status = 21
        # check_params energy bands.
        energy_ranges = self.d_pars['energy_channels']
        idx_max = self.mod_effect.chan_nb()
        for energy_range in energy_ranges:
            is_int_pair = (isinstance(energy_range, list)
                           and len(energy_range) == 2
                           and isinstance(energy_range[0], int)
                           and isinstance(energy_range[1], int))
            if not is_int_pair:
                self.status = 20
                return False
            low, high = energy_range
            # channels must be >= 0, below the max channel, and ordered
            if not (0 <= low <= high < idx_max):
                self.status = 20
                return False
        # False when the starting-time check failed above.
        return self.status == 0