Source code for ecpi.process.pipeline.lib_ecpi_pipeline

"""
@author: BACON Philippe, APC/IN2P3/CNRS
"""

import os
import time
import configparser
import logging
from glob import glob
from collections import OrderedDict

from ecpi.common import add_path_current_module
from ecpi.common import add_path_process_eclairs
from ecpi.common import add_path_data_ref_eclairs
from ecpi.database.interact_sdb import receive_event_att_files
from ecpi.database.interact_sdb import send_product_ecpi
import ecpi.database.interact_caldb as caldb 
from ecpi.process.generic.composant import GenericComposantProcessing
from ecpi.process.bube.bube_with_files import ProcessBubeWithFiles
from ecpi.process.imag.imag_with_files import ProcessImagWithFiles
import ecpi.common.params.tools_check as tpars
import ecpi.common.instru.model_effect as me
from ecpi.database.caldb_tools import get_arf_files

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Base path used to locate the schema files below.
# NOTE(review): presumably the directory containing this module -- confirm
# against ecpi.common.add_path_current_module.
CRT_PATH = add_path_current_module(__file__, "")

# Schema file used to validate each section of the .ini parameter file,
# keyed by a short section identifier.
DICT_SCHEMA_PATHS = {
    section_key: f"{CRT_PATH}/io/{schema_name}"
    for section_key, schema_name in (
        ("gen", "general_schema.json"),
        ("gen_http", "general_http_schema.json"),
        ("sdb", "sdb_schema.json"),
        ("caldb", "caldb_schema.json"),
    )
}


class EclairsPipelineLocal(object):
    """Main work flow processing for ECLAIRs pipeline.

    The pipeline reads a .ini parameter file, validates its [general]
    section, then instantiates and runs each known component whose section
    is present in the file, feeding the output of one component to the next.

    The outcome is reported through ``self.status`` (0 means OK) and
    ``self.status_msg``.
    """

    def __init__(self, cfg_path):
        """**constructor**

        :param cfg_path: path to the pipeline configuration .ini file.
        :type cfg_path: str
        """
        # Status codes owned by the pipeline itself; component errors are
        # copied in through _set_status_with_comp().
        self.d_status = {
            0: "OK",
            # GENERAL
            1: "[general] parameter validation error",
            2: "[general] input working directory does not exist",
            3: "[general] output working directory does not exist",
            4: "[general] caldb directory does not exist",
            5: "file parameters must content [general] section",
            6: "convert error : configparser to dictionary",
            7: "Calibration directory is ambiguious, ie 2 files defined same effect."
        }
        self._set_status(0)
        self.cfg = configparser.ConfigParser(
            interpolation=configparser.ExtendedInterpolation()
        )
        self.cfg.read(cfg_path)
        self.cfg_path = cfg_path
        # Validated parameters, one dictionary per section name.
        self.d_pars = {}
        self.mod_effect = me.ECLAIRsDetectorEffectDefault()
        # define ECPI components & associated classes
        # (insertion order is the execution order)
        self.d_components = OrderedDict({
            # 'dpco': ProcessDpcoWithFiles,
            # 'cali': ProcessBubeWithFiles,
            'bube': ProcessBubeWithFiles,
            'imag': ProcessImagWithFiles
        })
        self.l_components = self.d_components.keys()

    def _format_section_parameter_list(self, section='missing section name'):
        """Return a printable listing of the parameters of a section.

        :param section: name of the section to format.
        :type section: str
        :return: header line followed by the sorted ``key = value`` lines;
            only the header when the section has not been loaded in
            ``self.d_pars``.
        :rtype: str
        """
        header = f"\n#### [{section.upper()}] section parameters:\n[{section}]"
        try:
            pairs = sorted(self.d_pars[section].items())
        except (KeyError, AttributeError, TypeError):
            # Best effort: section not loaded (or not a dictionary), so only
            # the header can be reported.
            return header
        body = "".join(f"\n{key} = {value}" for key, value in pairs)
        return header + body + "\n####"

    def _check_section_general(self):
        """Check parameters in general section of .ini parameter file.

        Validates the section against the general schema, then checks that
        the input, output and caldb directories exist. On success the three
        paths are stored in ``self.working_in``, ``self.working_out`` and
        ``self.caldb_dir``.

        :return: True if the section is valid, else False (status is set).
        :rtype: bool
        """
        if not self.cfg.has_section('general'):
            self._set_status(5)
            logger.error(f"\n\t#### {self.status_msg} not in {self.cfg.sections()}")
            return False
        ret_parser = tpars.cfgparser_to_dict4json(self.cfg, 'general')
        if ret_parser is None:
            self._set_status(6)
            logger.error(self.status_msg)
            return False
        self.d_pars['general'] = ret_parser
        if not tpars.check_add_default_file(DICT_SCHEMA_PATHS['gen'],
                                            self.d_pars['general']):
            self._set_status(1)
            logger.error(self.status_msg)
            return False
        # check_params if in/out working directories exist.
        working_in = self.d_pars['general']['working_in']
        working_out = self.d_pars['general']['working_out']
        caldb_dir = self.d_pars['general']['caldb_dir']
        if not os.path.exists(working_in):
            self._set_status(2)
            logger.error(self.status_msg)
            return False
        if not os.path.exists(working_out):
            self._set_status(3)
            logger.error(self.status_msg)
            return False
        if caldb_dir == "@local":
            # "@local" is a placeholder replaced by the reference data path.
            caldb_dir = add_path_data_ref_eclairs("instru")
            self.d_pars['general']['caldb_dir'] = caldb_dir
        if not os.path.exists(caldb_dir):
            self._set_status(4)
            logger.error(f"directory '{caldb_dir}' doesn't exist.")
            return False
        self.working_in = working_in
        self.working_out = working_out
        self.caldb_dir = caldb_dir
        return True

    def _init_model_effect(self):
        """Give the ARF file found in ``self.caldb_dir`` to the detector model.

        :return: True if exactly one ARF file was found, else False
            (status 7 is set).
        :rtype: bool
        """
        l_arf_file = get_arf_files(self.caldb_dir)
        if len(l_arf_file) != 1:
            self._set_status(7)
            logger.error(self.status_msg)
            return False
        self.mod_effect.set_arf(l_arf_file[0])
        return True

    def _before_processing(self):
        """Check parameters from general section of .ini parameter file,
        check the parameters of every component, then initialise the
        detector effect model.

        :return: True if every check succeeded, else False.
        :rtype: bool
        """
        logger.warning("\n\t#### Checking parameters list of section [GENERAL]")
        if not self._check_section_general():
            return False
        logger.warning(
            "\n\t#### Parameters list of section [GENERAL] OK."
            f"{self._format_section_parameter_list('general')}"
        )
        if not self._ecpi_processing(only_check=True):
            return False
        # init model effect detector with caldb_dir
        self.caldb_dir = self.d_pars["general"]["caldb_dir"]
        return self._init_model_effect()

    def _after_processing(self):
        """Hook called by :meth:`run` once all components have run.

        Nothing to do in local mode.

        :return: True, so that :meth:`run` reports success.
        :rtype: bool
        """
        return True

    def _ecpi_processing(self, only_check=False):
        """Run the ECLAIRs GP pipeline.

        Checks what are the mentioned sections in the parameter file and
        call them successively. The specific parameters needed to process a
        single section are loaded each time this section is called.

        :param only_check: if True, only load and check the parameters of
            each component without running it.
        :type only_check: bool
        :return: True on success, False if the pipeline status is already in
            error or if a component failed (status is copied from it).
        :rtype: bool
        """
        if self.status != 0:
            return False
        data_out = None
        for cpst in self.l_components:
            if cpst not in self.cfg.sections():
                continue
            # Output of the previous component is the input of this one.
            data_in = data_out
            component = self.d_components[cpst]()
            assert isinstance(component, GenericComposantProcessing)
            component.set_model_effect_detector(self.mod_effect)
            if only_check:
                logger.info(f"\n\t#### Checking parameters list of [{cpst.upper()}] module")
            if not component.set_cfg_params(self.cfg_path, cpst):
                self._set_status_with_comp(component)
                logger.error(self.status_msg)
                return False
            if not component.check_params():
                # error check_params params
                self._set_status_with_comp(component)
                logger.error(self.status_msg)
                return False
            if only_check:
                logger.info(f"\n\t#### Parameters list of [{cpst.upper()}] module OK.")
                continue
            string_composant = "\n###########################################################"
            string_composant += f"\n\tStarting module [{cpst.upper()}]"
            string_composant += "\n##########################################################"
            logger.warning(string_composant)
            logger.warning(f"{component}")
            # add common parameters
            component.complete_params_with_dict(self.d_pars['general'])
            if not component.run(data_in):
                # error processing
                self._set_status_with_comp(component)
                logger.error(self.status_msg)
                return False
            data_out = component.get_data_out()
        return True

    def _set_status(self, e_int):
        """Set status with error produced by features of EclairsPipeline class

        :param e_int: status code, must be a key of ``self.d_status``.
        :type e_int: int
        """
        self.status = e_int
        self.status_msg = self.d_status[e_int]

    def _set_status_with_comp(self, composant):
        """Set status with error produced by component called

        :param composant: component whose status code and message are copied.
        """
        self.status = composant.status_int()
        self.status_msg = composant.status_msg()

    #
    # User interface
    #

    def run(self):
        """Check the parameters, run every component, then post-process.

        :return: False if the checks or a component failed, else the value
            returned by :meth:`_after_processing`.
        """
        if not self._before_processing():
            return False
        if not self._ecpi_processing():
            return False
        return self._after_processing()
class EclairsPipelineSdbCaldb(EclairsPipelineLocal):
    """ECLAIRs pipeline in SDB CALDB mode.

    Same work flow as the local pipeline, with in addition: validation of
    the [sdb] and [caldb] sections, download of the input files before the
    processing and upload of the .fits products afterwards.
    """

    def __init__(self, cfg_path):
        """**constructor**

        :param cfg_path: path to the pipeline configuration file.
        :type cfg_path: str
        """
        super().__init__(cfg_path)
        sdb_caldb_status = {
            # SDB
            31: "[sdb] parameter validation error",
            32: "[sdb] download error",
            33: "[sdb] upload error",
            # CALDB
            65: "[caldb] parameter validation error",
            66: "[caldb] download error"
        }
        self.d_status.update(sdb_caldb_status)

    def _create_workspace(self):
        """Create workspace where event & attitude files are stored in /in
        and ECPI products are stored in /out.

        Nothing to do in this mode.
        """
        pass

    def _before_processing(self):
        """Check parameters from general, sdb and caldb sections of .ini
        parameter file, check the parameters of every component, download
        the input files and initialise the detector effect model.

        :return: True if every step succeeded, else False.
        :rtype: bool
        """
        logger.info("\n\t#### Checking parameters list of section [GENERAL]")
        if not self._check_section_general():
            return False
        logger.warning(
            "\n\t#### Parameters list of section [GENERAL] OK."
            f"{self._format_section_parameter_list('general')}"
        )
        logger.info("Parameters [general] ok")
        self._create_workspace()
        if not self._check_pars_sdb():
            return False
        logger.warning(
            "\n\t#### Parameters list of section [SDB] OK."
            f"{self._format_section_parameter_list('sdb')}"
        )
        logger.info("Parameters [sdb] OK")
        if not self._check_pars_caldb():
            return False
        logger.warning(
            "\n\t#### Parameters list of section [CALDB] OK."
            f"{self._format_section_parameter_list('caldb')}"
        )
        logger.info("Parameters [caldb] OK")
        if not self._ecpi_processing(only_check=True):
            return False
        logger.info("Parameters ecpi compoment OK")
        # CALDB download is only attempted when the SDB download succeeded.
        if not (self._get_files_from_sdb() and self._get_files_from_caldb()):
            return False
        return self._init_model_effect()

    def _after_processing(self):
        """Post scientific products to SDB.

        :return: True if the upload succeeded, else False.
        :rtype: bool
        """
        return self._post_files_to_sdb()

    def _check_pars_sdb(self):
        """Check parameters in the sdb section of the .ini file.

        On success ``self.event_files`` and ``self.attitude_files`` are set
        as lists of file names.

        :return: True if the section is valid, else False (status is set).
        :rtype: bool
        """
        ret_parser = tpars.cfgparser_to_dict4json(self.cfg, 'sdb')
        if ret_parser is None:
            # Go through _set_status so that status_msg matches the code.
            self._set_status(6)
            logger.error(self.status_msg)
            return False
        self.d_pars['sdb'] = ret_parser
        if not tpars.check_add_default_file(DICT_SCHEMA_PATHS['sdb'],
                                            self.d_pars['sdb']):
            self._set_status(31)
            logger.error(self.status_msg)
            return False
        self.event_files = self.d_pars['sdb']['event_files']
        self.attitude_files = self.d_pars['sdb']['attitude_files']
        # A single file name is accepted and wrapped into a list.
        if isinstance(self.event_files, str):
            self.event_files = [self.event_files]
        if isinstance(self.attitude_files, str):
            self.attitude_files = [self.attitude_files]
        return True

    def _get_files_from_sdb(self):  # pragma: no cover
        """Download event & attitude files from SDB.

        Files already present in ``self.working_in`` are not requested.

        :return: True if every file is available, else False
            (status 32 is set on download failure).
        :rtype: bool
        """
        if self.status != 0:
            return False
        for fname in self.event_files + self.attitude_files:
            if os.path.isfile(self.working_in + "/" + fname):
                logger.info(f"{fname} file already present in input directory. Skipping.")
                continue
            logger.info(f"Requesting for {fname} file in SDB...")
            if not receive_event_att_files(fname, self.working_in):
                logger.error(f"{fname} file could not be downloaded from SDB.")
                self._set_status(32)
                logger.error(self.status_msg)
                return False
            logger.info(f"{fname} file successfully downloaded from SDB.")
        return True

    def _check_pars_caldb(self):
        """Check parameters in the caldb section of the .ini file.

        On success ``self.arf_file`` is set.

        :return: True if the section is valid, else False (status is set).
        :rtype: bool
        """
        ret_parser = tpars.cfgparser_to_dict4json(self.cfg, 'caldb')
        if ret_parser is None:
            # Go through _set_status so that status_msg matches the code.
            self._set_status(6)
            logger.error(self.status_msg)
            return False
        self.d_pars['caldb'] = ret_parser
        if not tpars.check_add_default_file(DICT_SCHEMA_PATHS['caldb'],
                                            self.d_pars['caldb']):
            self._set_status(65)
            logger.error(self.status_msg)
            return False
        self.arf_file = self.d_pars['caldb']['arf_file']
        return True

    def _get_files_from_caldb(self):  # pragma: no cover
        """Download the ARF file named in the [caldb] section into
        ``self.caldb_dir``, unless it is already there.

        :return: True if the file is available, else False
            (status 66 is set on download failure).
        :rtype: bool
        """
        if self.status != 0:
            return False
        arf_filename = self.d_pars['caldb']['arf_file']
        arf_path = self.caldb_dir + "/" + arf_filename
        if os.path.exists(arf_path):
            logger.info(f"{arf_filename} file already present in caldb_dir directory. Skipping.")
            return True
        if not caldb.download_file_from_caldb(self.caldb_dir, arf_filename):
            logger.error(f"{arf_filename} file could not be downloaded from CALDB.")
            self._set_status(66)
            logger.error(self.status_msg)
            return False
        logger.info(f"{arf_filename} file successfully downloaded from CALDB.")
        return True

    def _get_files_from_caldb_gen(self):  # pragma: no cover
        """Download instrument related files from CALDB.

        The ARF file is requested only when no .fits file whose name starts
        with "ECL-RSP-ENE" is present in ``self.caldb_dir``.

        :return: True if an ARF file is available, else False
            (status 66 is set on download failure).
        :rtype: bool
        """
        if self.status != 0:
            return False
        l_arf_name = [
            os.path.basename(fpath)
            for fpath in glob(self.caldb_dir + "/*.fits")
            if os.path.basename(fpath).startswith("ECL-RSP-ENE")
        ]
        if l_arf_name:
            logger.info(f"{l_arf_name[0]} file already present in caldb_dir directory. Skipping.")
            return True
        logger.info("Requesting for ARF file in CALDB...")
        if not caldb.receive_arf_file_from_caldb(self.caldb_dir):
            logger.error("ARF file could not be downloaded from CALDB.")
            self._set_status(66)
            logger.error(self.status_msg)
            return False
        logger.info("ARF file successfully downloaded from CALDB.")
        return True

    def _post_files_to_sdb(self):  # pragma: no cover
        """Upload ECLAIRs GP pipeline scientific products to SDB.

        Every .fits file of ``self.working_out`` is sent; the upload stops
        at the first failure.

        :return: True if every file was uploaded, else False
            (status 33 is set on upload failure).
        :rtype: bool
        """
        if self.status != 0:
            return False
        for fits_path in glob(self.working_out + "/*.fits"):
            if not send_product_ecpi(fits_path):
                logger.warning(f"{fits_path} file could not be uploaded to SDB.")
                self._set_status(33)
                logger.error(self.status_msg)
                return False
            logger.info(f"{fits_path} file successfully uploaded to SDB.")
        return True
class EclairsPipelineForHttpServer(EclairsPipelineSdbCaldb):
    """ECLAIRs pipeline in HTTP server mode.

    Same work flow as the SDB CALDB pipeline, but the working directories
    are created by the pipeline itself and cleaned after the processing.
    """

    def __init__(self, cfg_path, id_proc=None):
        """**constructor**

        :param cfg_path: path to the pipeline configuration file.
        :type cfg_path: str
        :param id_proc: name of the workspace directory. When None, a name
            of the form ``ECL_<YYYYmmddTHHMMSS>`` is built from the local time.
        :type id_proc: str or None
        """
        super().__init__(cfg_path)
        self.caldb_dir = add_path_process_eclairs("caldb_dir")
        if not os.path.exists(self.caldb_dir):
            os.makedirs(self.caldb_dir)
        else:
            self._empty_caldb_dir()
        if id_proc is None:
            creation_time = time.strftime("%Y%m%dT%H%M%S")
            id_proc = f"ECL_{creation_time}"
        self.dirname = add_path_process_eclairs(id_proc)

    def _empty_caldb_dir(self):
        """Remove the content of ``self.caldb_dir``, keeping the directory.

        Replaces a former ``rm -rf <caldb_dir>/*`` shell call: hidden entries
        are kept (the shell ``*`` pattern did not match them) and removal
        errors are not fatal.
        """
        import shutil

        try:
            with os.scandir(self.caldb_dir) as entries:
                for entry in entries:
                    if entry.name.startswith("."):
                        continue
                    if entry.is_dir(follow_symlinks=False):
                        shutil.rmtree(entry.path, ignore_errors=True)
                    else:
                        # Regular files and symbolic links.
                        os.remove(entry.path)
        except OSError as err:
            logger.warning(f"Could not fully clean directory '{self.caldb_dir}': {err}")

    def _check_section_general(self):
        """Check parameters in general section of .ini parameter file.

        A missing [general] section is not an error in this mode: it is
        added empty and validated against the HTTP general schema.

        :return: True if the section is valid, else False (status is set).
        :rtype: bool
        """
        if not self.cfg.has_section('general'):
            logger.warning(
                f"\n\t#### Section [general] not in sections list {self.cfg.sections()}"
                "\n\t\tsection [general] added with default parameters"
            )
            self.cfg.add_section('general')
        ret_parser = tpars.cfgparser_to_dict4json(self.cfg, 'general')
        if ret_parser is None:
            # Go through _set_status so that status_msg matches the code.
            self._set_status(6)
            logger.error(self.status_msg)
            return False
        self.d_pars['general'] = ret_parser
        if not tpars.check_add_default_file(DICT_SCHEMA_PATHS['gen_http'],
                                            self.d_pars['general']):
            self._set_status(1)
            logger.error(self.status_msg)
            return False
        return True

    def _after_processing(self):
        """Upload the products to SDB, then clean the workspace.

        The workspace is cleaned whatever the result of the upload.

        :return: True if the pipeline status is still OK, else False.
        :rtype: bool
        """
        self._post_files_to_sdb()
        self._clear_workspace()
        return self.status == 0

    def _create_workspace(self):
        """Create workspace where event & attitude files are stored in /in
        and ECPI products are stored in /out.

        The [general] parameters ``working_in``, ``working_out`` and
        ``caldb_dir`` are overwritten with the paths managed by this class.
        """
        os.makedirs(self.dirname)
        os.makedirs(self.dirname + "/in")
        os.makedirs(self.dirname + "/out")
        self.working_in = self.dirname + '/in'
        self.working_out = self.dirname + '/out'
        logger.info(f"Create working_in: {self.working_in}")
        self.d_pars['general']["working_in"] = self.working_in
        self.d_pars['general']["working_out"] = self.working_out
        self.d_pars['general']["caldb_dir"] = self.caldb_dir

    def _clear_workspace(self):  # pragma: no cover
        """Clean workspace: remove all .fits files but does not remove
        .log file.
        """
        list_files_in = glob(self.working_in + "/*.fits")
        list_files_out = glob(self.d_pars["general"]["working_out"] + "/*.fits")
        for fits_path in list_files_in + list_files_out:
            os.remove(fits_path)
        logger.info("Cleaned input and output directories.")