# Source code for ecpi.pipeline.lib.ecpi_pipeline

"""ECLAIRs GP pipeline
"""

import os.path as osp
import os
import shutil
import time
import configparser
import logging
from glob import glob
from collections import OrderedDict

from ecpi.common import get_path_pipeline
from ecpi.common import add_path_process_eclairs
from ecpi.common import get_caldb_dir
from ecpi.database.interact_sdb import receive_event_att_files
import ecpi.database.interact_sdb as sdb
import ecpi.database.interact_caldb as caldb 
from ecpi.process.generic.component import GenericComponentProcessing
from ecpi.process.dpco.dpco_with_files import ProcessDpcoWithFiles
from ecpi.process.cali.cali_with_files import ProcessCaliWithFiles
from ecpi.process.bube.bube_with_files import ProcessBubeWithFiles
from ecpi.process.imag.imag_with_files import ProcessImagWithFiles
from ecpi.process.mosa.mosa_with_files import ProcessMosaWithFiles
from ecpi.process.spex.spex_with_files import ProcessSpexWithFiles
import ecpi.common.params.tools_params as tpars
import ecpi.common.params.tools_check as tcheck
import ecpi.common.instru.model_effect as me
from ecpi.database.caldb_tools import get_arf_files
from ecpi.pipeline.io.data_flow import DataFlow

# Module-level logger shared by every pipeline class in this file.
s_logger = logging.getLogger(__name__)

# Directory holding the JSON parameter schemas shipped with the pipeline.
CRT_PATH = osp.join(get_path_pipeline(), 'io')

# JSON schema files used to validate each section of the .ini parameter file:
# "gen" for the local [general] section, "gen_http" for HTTP-server mode,
# "sdb" and "caldb" for the corresponding download sections.
DICT_SCHEMA_PATHS = {
    "gen": osp.join(CRT_PATH, 'general_schema.json'),
    "gen_http": osp.join(CRT_PATH, 'general_http_schema.json'),
    "sdb": osp.join(CRT_PATH, 'sdb_schema.json'),
    "caldb": osp.join(CRT_PATH, 'caldb_schema.json')
}


class EclairsPipelineLocal(object):
    """Main work flow processing for ECLAIRs pipeline.

    Reads a .ini (or dict) configuration, validates the [general] section
    against its JSON schema, then runs each configured ECPI component
    (dpco, cali, bube, imag, mosa, spex) in that fixed order.
    """

    def __init__(self, cfg_path):
        """**constructor**

        :param cfg_path: path to the pipeline configuration .ini file,
            or an already-parsed configuration given as a dict.
        :type cfg_path: str or dict
        """
        # Status codes produced by this class; subclasses extend this map.
        self.d_status = {
            0: "OK",
            # GENERAL
            1: "[general] parameter validation error",
            2: "[general] input working directory does not exist",
            3: "[general] output working directory does not exist",
            4: "[general] caldb directory does not exist",
            5: "file parameters must content [general] section",
            6: "convert error : configparser to dictionary",
            7: "Calibration directory is ambiguous, ie 2 files defined same effect.",
            8: "No file expected in calibration directory."
        }
        self._set_status(0)
        self.cfg = configparser.ConfigParser(
            interpolation=configparser.ExtendedInterpolation()
        )
        # Accept either a pre-built dict of sections or a path to a .ini file.
        if isinstance(cfg_path, dict):
            self.cfg.read_dict(cfg_path)
        else:
            self.cfg.read(cfg_path)
        self.cfg_path = cfg_path
        self.d_pars = {}
        # self.mod_effect = me.ECLAIRsDetectorEffectDefault()
        self.data_flow = None
        # Define ECPI components & associated classes.
        # The insertion order is the execution order of the pipeline.
        self.d_components = OrderedDict({
            'dpco': ProcessDpcoWithFiles,
            'cali': ProcessCaliWithFiles,
            'bube': ProcessBubeWithFiles,
            'imag': ProcessImagWithFiles,
            'mosa': ProcessMosaWithFiles,
            'spex': ProcessSpexWithFiles
        })
        self.l_components = self.d_components.keys()

    def _format_section_parameter_list(self, section='missing section name'):
        """Return a printable "key = value" listing of one parameter section.

        :param section: name of the section in self.d_pars.
        :type section: str
        :return: multi-line listing; header only if the section is unknown.
        :rtype: str
        """
        s = f"\n#### [{section.upper()}] section parameters:\n[{section}]"
        try:
            for k, v in sorted(list(self.d_pars[section].items())):
                s += f"\n{k} = {v}"
            s += "\n####"
        except KeyError:
            # Section not parsed (yet): return the header alone.
            pass
        return s

    def _check_section_general(self):
        """Check parameters in general section of .ini parameter file.
        More precisely check_params if work dirs exist.

        :return: True if the [general] section is present, valid against
            the schema and its directories exist; False otherwise (the
            pipeline status is set accordingly).
        :rtype: bool
        """
        b_all_ok = True
        if not self.cfg.has_section('general'):
            self._set_status(5)
            s_logger.error(f"\n\t#### {self.status_msg} not in {self.cfg.sections()}")
            b_all_ok = False
        else:
            ret_parser = tpars.cfgparser_to_dicttyped(self.cfg, 'general')
            if ret_parser is None:
                self._set_status(6)
                s_logger.error(self.status_msg)
                b_all_ok = False
            else:
                self.d_pars['general'] = ret_parser
                if not tcheck.check_add_default_file(DICT_SCHEMA_PATHS['gen'],
                                                     self.d_pars['general']):
                    self._set_status(1)
                    s_logger.error(self.status_msg)
                    b_all_ok = False
        if not b_all_ok:
            return False
        # check_params if in/out working directories exist.
        working_in = self.d_pars['general']['working_in']
        working_out = self.d_pars['general']['working_out']
        caldb_dir = self.d_pars['general']['caldb_dir']
        if not os.path.exists(working_in):
            self._set_status(2)
            s_logger.error(self.status_msg)
            return False
        if not os.path.exists(working_out):
            self._set_status(3)
            s_logger.error(self.status_msg)
            return False
        if caldb_dir == "@local":
            # "@local" is a placeholder resolved to the default CALDB location.
            caldb_dir = get_caldb_dir()
            self.d_pars['general']['caldb_dir'] = caldb_dir
        if not os.path.exists(caldb_dir):
            self._set_status(4)
            s_logger.error(f"directory '{caldb_dir}' doesn't exist.")
            return False
        self.working_in = working_in
        self.working_out = working_out
        self.caldb_dir = caldb_dir
        return True

    def _init_model_effect(self):
        """Load the unique ARF file from the CALDB directory into the
        detector effect model.

        :return: True when exactly one ARF file is found, False otherwise.
        :rtype: bool
        """
        l_arf_file = get_arf_files(self.caldb_dir)
        if len(l_arf_file) == 0:
            self._set_status(8)
            s_logger.error(self.status_msg)
            return False
        if len(l_arf_file) > 1:
            self._set_status(7)
            s_logger.error(self.status_msg)
            return False
        # NOTE(review): self.mod_effect is commented out in __init__, so this
        # line would raise AttributeError if this (currently unused) method
        # were re-enabled — confirm before calling it again.
        self.mod_effect.set_arf(l_arf_file[0])
        return True

    def _before_processing(self):
        """Check parameters from general section of .ini parameter file.
        If successful then _ecpi_processing the pipeline one module after
        the other.

        :return: True when every parameter check passes.
        :rtype: bool
        """
        s_logger.warning("\n\t#### Checking parameters list of section [GENERAL]")
        if not self._check_section_general():
            return False
        s_logger.warning("\n\t#### Parameters list of section [GENERAL] OK."
                         + self._format_section_parameter_list('general'))
        if not self._ecpi_processing(only_check=True):
            return False
        # init model effect detector with caldb_dir
        self.caldb_dir = self.d_pars["general"]["caldb_dir"]
        # return self._init_model_effect()
        return True

    def _after_processing(self):
        """Hook executed after processing; no-op in local mode."""
        return True

    def _ecpi_processing(self, only_check=False):
        """Run the ECLAIRs GP pipeline.

        Checks what are the mentioned sections in the parameter file and
        call them successively. The specific parameters needed to
        _ecpi_processing a single section are loaded each time this section
        is called.

        :param only_check: flag for component check_params.
        :type only_check: bool
        :return: True on success, False on the first component error.
        :rtype: bool
        """
        if self.status != 0:
            # BUGFIX: was a bare "return" (None); made the failure explicit.
            return False
        if self.data_flow is None:
            self.data_flow = DataFlow(only_check=only_check)
        else:
            self.data_flow.load_detector_effect_from_dir(self.caldb_dir)
        for cpst in self.l_components:
            # Only components with a matching section in the .ini file run.
            if cpst not in self.cfg.sections():
                continue
            component = self.d_components[cpst]()
            assert isinstance(component, GenericComponentProcessing)
            if only_check:
                s_logger.info(f"\t#### Checking parameters list of [{cpst.upper()}] module")
            else:
                s_logger.info("\t#### Loading detector effect into data_flow")
                component.set_model_effect_detector(self.data_flow.detector_effect)
                string_component = "\n########################################################"
                string_component += f"\n\tStarting module [{cpst.upper()}]"
                string_component += "\n#######################################################"
                s_logger.info(string_component)
            if not component.set_cfg_params(self.cfg_path, cpst):
                self._set_status_with_comp(component)
                s_logger.error(self.status_msg)
                return False
            if not component.check_params():
                # error check_params params
                self._set_status_with_comp(component)
                s_logger.error(self.status_msg)
                return False
            if only_check:
                s_logger.info(f"Parameters list of [{cpst.upper()}] module OK.")
                continue
            # print all parameters with default value
            s_logger.info(component)
            # add common parameters
            component.complete_params_with_dict(self.d_pars['general'])
            if not component.run(self.data_flow):
                # error processing
                self._set_status_with_comp(component)
                s_logger.error(self.status_msg)
                return False
        return True

    def _set_status(self, e_int):
        """Set status with error produced by features of EclairsPipeline class.

        :param e_int: key in self.d_status.
        :type e_int: int
        """
        self.status = e_int
        self.status_msg = self.d_status[e_int]

    def _set_status_with_comp(self, component):
        """Set status with error produced by component called.

        :param component: the failed component providing status_int()/status_msg().
        """
        self.status = component.status_int()
        self.status_msg = component.status_msg()

    #
    # User interface
    #
    def run(self):
        """run pipeline, always do after_processing (cleaning mainly)

        :return: True when checks, processing and post-processing all succeed.
        :rtype: bool
        """
        if not self._before_processing():
            s_logger.info('Running the main loop and before processing is not ok!')
            # must do _after_processing and keep error _before_processing first
            err_proc, mes_err = self.status, self.status_msg
            self._after_processing()
            self.status, self.status_msg = err_proc, mes_err
            return False
        s_logger.info('Running the main loop and before processing is ok!')
        if not self._ecpi_processing():
            s_logger.info('Running the main loop and ecpi processing is not ok!')
            # must do _after_processing and keep error processing first
            err_proc, mes_err = self.status, self.status_msg
            self._after_processing()
            self.status, self.status_msg = err_proc, mes_err
            return False
        s_logger.info('Running the main loop and ecpi processing is ok!')
        return self._after_processing()
class EclairsPipelineSdbCaldb(EclairsPipelineLocal):
    """ECLAIRs pipeline in SDB CALDB mode.

    Same processing as :class:`EclairsPipelineLocal`, but input files are
    downloaded from the SDB/CALDB services before processing and the
    scientific products are uploaded back to the SDB afterwards.
    """

    def __init__(self, cfg_path):
        """**constructor**

        :param cfg_path: path to the pipeline configuration file.
        :type cfg_path: str
        """
        super().__init__(cfg_path)
        # Extra status codes for the SDB/CALDB interactions.
        sdb_caldb_status = {
            # SDB
            31: "[sdb] parameter validation error",
            32: "[sdb] download error",
            33: "[sdb] upload error",
            # CALDB
            65: "[caldb] parameter validation error",
            66: "[caldb] download error"
        }
        self.d_status.update(sdb_caldb_status)

    def _create_workspace(self):
        """Create workspace where event & attitude files are stored in /in
        and ECPI products are stored in /out.

        No-op here; overridden in the HTTP-server pipeline.
        """
        pass

    def _before_processing(self):
        """Check parameters from general, sdb and caldb sections of .ini
        parameter file. If successful then _ecpi_processing the pipeline
        one module after the other.

        :return: True when every check and every download succeeds.
        :rtype: bool
        """
        s_logger.info("\n\t#### Checking parameters list of section [GENERAL]")
        if not self._check_section_general():
            return False
        s_logger.warning("\n\t#### Parameters list of section [GENERAL] OK."
                         + self._format_section_parameter_list('general'))
        s_logger.info("Parameters [general] ok")
        self._create_workspace()
        if not self._check_pars_sdb():
            return False
        s_logger.warning("\n\t#### Parameters list of section [SDB] OK."
                         + self._format_section_parameter_list('sdb'))
        s_logger.info("Parameters [sdb] OK")
        if not self._check_pars_caldb():
            return False
        s_logger.warning("\n\t#### Parameters list of section [CALDB] OK."
                         + self._format_section_parameter_list('caldb'))
        s_logger.info("Parameters [caldb] OK")
        if not self._ecpi_processing(only_check=True):
            return False
        s_logger.info("Parameters ecpi component OK")
        if not (self._get_files_from_sdb() and self._get_files_from_caldb()):
            return False
        # else:
        #     b_ret = self._init_model_effect()
        return True

    def _after_processing(self):
        """Post scientific products to SDB."""
        return self._post_files_to_sdb()

    def _check_pars_sdb(self):
        """Check parameters in the sdb section of the .ini file.

        :return: True when the [sdb] section is valid against its schema.
        :rtype: bool
        """
        ret_parser = tpars.cfgparser_to_dicttyped(self.cfg, 'sdb')
        if ret_parser is None:
            self._set_status(6)
            return False
        self.d_pars['sdb'] = ret_parser
        if not tcheck.check_add_default_file(DICT_SCHEMA_PATHS['sdb'], self.d_pars['sdb']):
            self._set_status(31)
            s_logger.error(self.status_msg)
            return False
        self.event_files = self.d_pars['sdb']['event_files']
        self.attitude_files = self.d_pars['sdb']['attitude_files']
        # Normalize single file names into one-element lists.
        if isinstance(self.event_files, str):
            self.event_files = [self.event_files]
        if isinstance(self.attitude_files, str):
            self.attitude_files = [self.attitude_files]
        return True

    def _get_files_from_sdb(self):  # pragma: no cover
        """Download event & attitude files from SDB.

        :return: True when the download succeeds.
        :rtype: bool
        """
        if self.status != 0:
            return False
        return sdb.receive_qla1_files_with_sdb(self.working_in)

    def _check_pars_caldb(self):
        """Check parameters in the caldb section of the .ini file.

        :return: True when the [caldb] section is valid against its schema.
        :rtype: bool
        """
        ret_parser = tpars.cfgparser_to_dicttyped(self.cfg, 'caldb')
        if ret_parser is None:
            self._set_status(6)
            s_logger.error(self.status_msg)
            return False
        self.d_pars['caldb'] = ret_parser
        if not tcheck.check_add_default_file(DICT_SCHEMA_PATHS['caldb'], self.d_pars['caldb']):
            self._set_status(65)
            s_logger.error(self.status_msg)
            return False
        self.arf_file = self.d_pars['caldb']['arf_file']
        return True

    def _get_files_from_caldb(self):  # pragma: no cover
        """Download arf for DC-1.

        :return: True when the ARF file is already present or was downloaded.
        :rtype: bool
        """
        if self.status != 0:
            return False
        arf_filename = self.d_pars['caldb']['arf_file']
        arf_path = osp.join(self.caldb_dir, arf_filename)
        if os.path.exists(arf_path):
            s_logger.info(f"{arf_filename} file already present in caldb_dir directory. Skipping.")
            return True
        m_caldb = caldb.CaldbIo(use_tokens=True)
        if not m_caldb.save_caldb_last_fits_files(self.caldb_dir):
            s_logger.error(f"{arf_filename} file could not be downloaded from CALDB.")
            return False
        s_logger.info(f"{arf_filename} file successfully downloaded from CALDB.")
        return True

    def _get_files_from_caldb_gen(self):  # pragma: no cover
        """Download instrument related files from CALDB.

        :return: True when an ARF file is present or was downloaded.
        :rtype: bool
        """
        if self.status != 0:
            return False
        arf_file = [fname for fname in glob(osp.join(self.caldb_dir, "*.fits"))
                    if fname.split(osp.sep)[-1].startswith("ECL-RSP-ARF")]
        if arf_file != []:
            arf_filename = arf_file[0].split(osp.sep)[-1]
            s_logger.info(f"{arf_filename} file already present in caldb_dir directory. Skipping.")
        else:
            s_logger.info("Requesting for ARF file in CALDB...")
            status = caldb.receive_arf_file_from_caldb(self.caldb_dir)
            if not status:
                s_logger.error("ARF file could not be downloaded from CALDB.")
                self._set_status(1)
                # BUGFIX: previously fell through, logged a (mislabelled)
                # success message and returned True despite the failure.
                return False
            s_logger.info("ARF file successfully downloaded from CALDB.")
        return True

    def _post_files_to_sdb(self):  # pragma: no cover
        """Upload ECLAIRs GP pipeline scientific products to SDB.

        :return: True when every product was uploaded; False on the first
            upload failure (status 33 is then set).
        :rtype: bool
        """
        for file in glob(osp.join(self.working_out, "*.fits")):
            status = sdb.send_product_ecpi_to_sdb(file)
            if not status:
                s_logger.warning(f"{file} file could not be uploaded to SDB.")
                self._set_status(33)
                s_logger.error(self.status_msg)
                # BUGFIX: report the failure to the caller instead of
                # unconditionally returning True after a failed upload.
                return False
            s_logger.info(f"{file} file successfully uploaded to SDB.")
        return True
class EclairsPipelineForHttpServer(EclairsPipelineSdbCaldb):
    """ECLAIRs pipeline in HTTP server mode.

    Creates a dedicated per-processing workspace (removed after processing)
    and tolerates a missing [general] section by adding one filled with the
    HTTP-schema default parameters.
    """

    def __init__(self, cfg_path, id_proc=None):
        """**constructor**

        :param cfg_path: path to the pipeline configuration file.
        :type cfg_path: str
        :param id_proc: processing identifier; when None an identifier is
            generated from the current local time ("ECL_YYYYmmddTHHMMSS").
        :type id_proc: str or None
        """
        super().__init__(cfg_path)
        # Start from a clean CALDB directory for each server instance.
        self.caldb_dir = add_path_process_eclairs("caldb_dir")
        if os.path.exists(self.caldb_dir):
            shutil.rmtree(self.caldb_dir)
        os.makedirs(self.caldb_dir)
        if id_proc is None:
            creation_time = time.strftime("%Y%m%dT%H%M%S")
            self.id_proc = f"ECL_{creation_time}"
        else:
            self.id_proc = id_proc
        self.dirname = add_path_process_eclairs(self.id_proc)

    def _check_section_general(self):
        """Check parameters in general section of .ini parameter file.
        More precisely check_params if work dirs exist.

        Unlike the parent class, a missing [general] section is not an
        error: it is created empty and filled with the schema defaults.

        :return: True when the section validates against the HTTP schema.
        :rtype: bool
        """
        # TODO: check that [general] exists, otherwise create it empty in d_pars
        if not self.cfg.has_section('general'):
            # BUGFIX: the second fragment was missing the f prefix, so the
            # literal text "{self.cfg.sections()}" was logged instead of
            # the actual section list.
            s_logger.warning(f"\n\t#### Section [general] not in sections list "
                             f"{self.cfg.sections()}\n\t\tsection [general] added"
                             " with default parameters")
            self.cfg.add_section('general')
        ret_parser = tpars.cfgparser_to_dicttyped(self.cfg, 'general')
        if ret_parser is None:
            self._set_status(6)
            return False
        self.d_pars['general'] = ret_parser
        if not tcheck.check_add_default_file(DICT_SCHEMA_PATHS['gen_http'],
                                             self.d_pars['general']):
            self._set_status(1)
            s_logger.error(self.status_msg)
            return False
        return True

    def _after_processing(self):
        """Upload products to SDB then always clean the workspace.

        :return: True when the whole pipeline finished with status 0.
        :rtype: bool
        """
        super()._post_files_to_sdb()
        self._clear_workspace()
        return self.status == 0

    def _create_workspace(self):
        """Create workspace where event & attitude files are stored in /in
        and ECPI products are stored in /out.
        """
        self.working_in = osp.join(self.dirname, 'in')
        self.working_out = osp.join(self.dirname, 'out')
        # makedirs creates self.dirname implicitly along with each subdir.
        os.makedirs(self.working_in)
        os.makedirs(self.working_out)
        s_logger.info(f"Create working_in: {self.working_in}")
        self.d_pars['general']["working_in"] = self.working_in
        self.d_pars['general']["working_out"] = self.working_out
        self.d_pars['general']["caldb_dir"] = self.caldb_dir

    def _clear_workspace(self):  # pragma: no cover
        """Clean workspace: remove the per-processing directory tree."""
        shutil.rmtree(self.dirname)
        s_logger.info("Cleaned input and output directories.")