# Source code for ecpi.process.bube.bube_with_files

"""Generic component implementation for BUBE
"""
import os.path as osp
import logging
from astropy.time import Time, TimeDelta
import numpy as np

from ecpi.common import add_path_current_module
from ecpi.process.bube.core.main_bube import bube
from ecpi.process.generic.component import GenericComponentProcessing
import ecpi.pipeline.io.inputs as ecio
from ecpi.process.bube.io.outputs import genfits_ecl_sdp_bube
from ecpi.common.io.events import EclairsCalEvtData
from ecpi.common.io.fits_tools import get_fits_files_with_extname

# Module-level logger, named after this module's import path.
s_logger = logging.getLogger(__name__)

# Location of the "io/bube_schema.json" file, built by add_path_current_module
# from this module's __file__ (presumably resolved relative to this module's
# directory -- confirm in ecpi.common). It is handed to the
# GenericComponentProcessing constructor by ProcessBubeWithFiles.
BUBE_FILE_SCHEME = add_path_current_module(__file__, osp.join("io", "bube_schema.json"))


class ProcessBubeWithFiles(GenericComponentProcessing):
    """High level handling of the BUBE module.

    Loads the event, attitude and orbit files found in the input working
    directory into the shared data flow, runs the ``bube`` core function on
    it and writes the requested output products.
    """

    def __init__(self):
        """**Constructor**

        Sets the component name and adds the BUBE-specific status
        descriptions to ``self.d_status``.
        """
        super().__init__(BUBE_FILE_SCHEME)
        self.name = "bube"
        self.error_range = 300
        dict_comp = {
            10: "error in ECL-EVT-CAL product generation",
            21: "error starting time",
            22: "error no ECL-EVT-SEC files found",
            23: "error no SVO-ATT-CNV files found",
            24: "something wrong happened during BUBE module running",
            25: "error in ECL-DET-UBC product generation",
            26: "starting times does not match between event and attitude files",
            27: "python exception",
            28: "malformed energy channels in parameters",
            29: "energy bounds outside instrument capabilities (ARF)",
            # One dedicated code per failure, so that a status number never
            # carries two meanings and every emitted status is registered.
            30: "error reading ECL-EVT-SEC files",
            31: "error reading SVO-ATT-CNV or SVO-ORB-CNV files",
            32: "error no SVO-ORB-CNV files found",
            33: "ear_occ_bub and bkg_cor_mod have a different number of elements",
            34: "error setting input data in the data flow",
        }
        self.d_status.update(dict_comp)
        self.t_start = None

    def _load_check_file_in(self):
        """Check that the input files exist and load them into the data flow.

        Looks for ECL-EVT-SEC, SVO-ATT-CNV and SVO-ORB-CNV FITS files in
        ``self.d_pars['working_in']``, reads the events into an
        ``EclairsCalEvtData`` object, reads the attitude and orbit files, and
        stores everything in ``self.data_flow``. Nothing is read when the
        data flow reports that it is already ready for this component.

        :return: True when the data flow holds the input data, False
            otherwise. On every failure detected here ``self.status`` is set
            to a non-zero code.
        """
        s_logger.debug(f'Running _load_check_file_in [{self.name}]')
        if not super()._load_check_file_in():
            return False
        if self.data_flow.is_ready(self.name):
            s_logger.info('Not necessary to load new data')
            return True

        # check if event, orbit & attitude files exist in input working dir.
        # TODO: change ECL-EVT-SEC once CALI was activated
        working_in = self.d_pars['working_in']
        evt_files = get_fits_files_with_extname('ECL-EVT-SEC', working_in)
        pvt_files = get_fits_files_with_extname('SVO-ORB-CNV', working_in)
        att_files = get_fits_files_with_extname('SVO-ATT-CNV', working_in)
        if not evt_files:
            self.status = 22
            return False
        if not att_files:
            self.status = 23
            return False
        if not pvt_files:
            # 21 means "error starting time"; missing orbit files get
            # their own code.
            self.status = 32
            return False

        # load event files into memory.
        events = EclairsCalEvtData()
        try:
            for file in evt_files:
                s_logger.info(f"Reading event file {file} in [{self.name}]")
                events.read(file, add_data=True)
        except IOError:
            s_logger.exception(f"Reading event file {file} in [{self.name}]")
            self.status = 30
            return False

        # load attitude and position files into memory
        try:
            t_att = ecio.read_files(att_files)
            t_pvt = ecio.read_files(pvt_files)
        except IOError:
            s_logger.exception("ERROR read ATT or ORB")
            # 29 means "energy bounds outside instrument capabilities".
            self.status = 31
            return False

        # set events, attitude and position into data_flow; a refusal is
        # recorded in the status so that later steps, which guard on
        # ``self.status != 0``, do not run on incomplete data.
        if not self.data_flow.set_aav_pvt(t_att, t_pvt, compute_eof=True):
            s_logger.error(f'Error setting data in attPvt object from {att_files}')
            self.status = 34
            return False
        if not self.data_flow.set_events(events):
            s_logger.error(f'Error setting data in Events object from {evt_files}')
            self.status = 34
            return False
        return True

    def _create_output_files(self):
        """Create the BUBE scientific files.

        Writes one product per entry of ``self.d_pars['out_files']`` into
        ``self.d_pars['working_out']``. A failing product does not stop the
        loop: the remaining products are still attempted.

        :return: True when every requested product was written, False when
            the component was already in error or at least one product
            failed (``self.status`` is then set).
        """
        s_logger.debug(f'Running _create_output_files [{self.name}]')
        if self.status != 0:
            return False
        response = True
        for model in self.d_pars['out_files']:
            bube_sdp_data = self.data_flow.get_sdp_data(model)
            if genfits_ecl_sdp_bube(bube_sdp_data, model, self.d_pars['working_out']):
                s_logger.info(f'Successfully created {model} product')
                continue
            # No exception is being handled here, so use error() rather than
            # exception(), which would log a meaningless empty traceback.
            s_logger.error(f'Creating {model} product!')
            # NOTE(review): assumes the model name contains the product name
            # (e.g. "ECL-EVT-CAL"); anything else is reported with the
            # ECL-DET-UBC generation code. Confirm against the values allowed
            # for 'out_files'.
            self.status = 10 if 'EVT-CAL' in str(model) else 25
            response = False
        return response

    def _process_component(self):
        """Run the BUBE module on the data flow.

        Calls ``bube`` with the data flow and the parameters, then stores the
        returned detector images in the data flow. The returned GTIs are
        stored as Earth GTIs only when the ``ear_occ_bub`` parameter is
        absent or falsy.

        :return: True on success, False when the component was already in
            error or ``bube`` raised ``ValueError``/``KeyError`` (status 27).
        """
        if self.status != 0:
            return False
        try:
            detectim, gtis, error_code, error_msg = bube(self.data_flow, self.d_pars)
            s_logger.debug(f"{error_code}: {error_msg} [bube]")
        except (ValueError, KeyError):
            s_logger.exception("BUBE exception")
            self.status = 27
            return False
        # NOTE(review): error_code is only logged and status 24 ("something
        # wrong happened during BUBE module running") is never set. Confirm
        # whether a non-zero error_code should abort here.

        # create output for next component
        self.data_flow.set_det_images(detectim)
        if "ear_occ_bub" not in self.d_pars or not self.d_pars["ear_occ_bub"]:
            self.data_flow.set_earth_gtis(gtis, exist_ok=True)
        s_logger.info(f"Returning {len(detectim)} detector images for image treatment")
        return True

    def _check_extra(self):
        """Check parameters specific to the BUBE module in parameter INI file.

        Verifies that every entry of ``energy_channels`` is a pair of ints
        ``[low, high]`` with ``low < high`` (status 28 otherwise) and that
        ``ear_occ_bub`` and ``bkg_cor_mod`` have the same number of elements
        (status 33 otherwise).

        :return: True when the parameters are accepted, False otherwise.
        """
        s_logger.debug('Running _check_extra [bube]')
        if self.status != 0:
            return False
        # TODO: recover this test once the time format was established
        # check_params if t_start posterior to MJDREF time origin.
        # t_mjdref = Time("1858-11-17T00:00:00.000")
        # t_start = Time(self.d_pars["t_start"])
        # if t_start.mjd - t_mjdref.mjd < 0:
        #     s_logger.error("Starting time previous to the MJDREF time origin!")
        #     self.status = 21

        # We only must check that in [[a,b],[c,d],..] we have a < b, c < d, ...
        for energy_range in self.d_pars['energy_channels']:
            try:
                low_chan, high_chan = energy_range
            except (TypeError, ValueError):
                # Not a two-element sequence: report it as malformed below
                # instead of letting the unpacking error escape.
                low_chan = high_chan = None
            if not (isinstance(low_chan, int) and isinstance(high_chan, int)
                    and low_chan < high_chan):
                s_logger.error("Malformed energy_channels bouds in input parameter file!")
                self.status = 28
                return False
            # if high_chan > self.mod_effect.chan_nb():
            #     s_logger.error("Energy_channel entry outside of actual charged ARF file!")
            #     self.status = 29
            #     return False
        if len(self.d_pars['ear_occ_bub']) != len(self.d_pars['bkg_cor_mod']):
            s_logger.error("The number of elements in *ear_occ_bub* and *bkg_cor_mod* differs!")
            self.status = 33
            return False
        return True