"""ECLAIRs GP pipeline
"""
import os.path as osp
import os
import shutil
import time
import configparser
import logging
from glob import glob
from collections import OrderedDict
from ecpi.common import get_path_pipeline
from ecpi.common import add_path_process_eclairs
from ecpi.common import get_caldb_dir
import ecpi.database.interact_sdb as sdb
import ecpi.database.interact_caldb as caldb
from ecpi.process.generic.component import GenericComponentProcessing
from ecpi.process.dpco.dpco_with_files import ProcessDpcoWithFiles
from ecpi.process.cali.cali_with_files import ProcessCaliWithFiles
from ecpi.process.bube.bube_with_files import ProcessBubeWithFiles
from ecpi.process.imag.imag_with_files import ProcessImagWithFiles
from ecpi.process.mosa.mosa_with_files import ProcessMosaWithFiles
from ecpi.process.spex.spex_with_files import ProcessSpexWithFiles
import ecpi.common.params.tools_params as tpars
import ecpi.common.params.tools_check as tcheck
import ecpi.common.instru.model_effect as me
from ecpi.database.caldb_tools import get_arf_files
from ecpi.pipeline.io.data_flow import DataFlow
s_logger = logging.getLogger(__name__)
CRT_PATH = osp.join(get_path_pipeline(), 'io')
DICT_SCHEMA_PATHS = {
"gen": osp.join(CRT_PATH, 'general_schema.json'),
"gen_http": osp.join(CRT_PATH, 'general_http_schema.json'),
"sdb": osp.join(CRT_PATH, 'sdb_schema.json'),
"caldb": osp.join(CRT_PATH, 'caldb_schema.json')
}
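# each schema validates the parameters of the matching .ini section via
# tcheck.check_add_default_file (see the _check_* methods below)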
class EclairsPipelineLocal(object):
"""Main work flow processing for ECLAIRs pipeline
"""
def __init__(self, cfg_path):
"""**constructor**
:param cfg_path: path to the pipeline configuration .ini file.
:type cfg_path: str
"""
self.d_status = {
0: "OK",
# GENERAL
1: "[general] parameter validation error",
2: "[general] input working directory does not exist",
3: "[general] output working directory does not exist",
4: "[general] caldb directory does not exist",
5: "file parameters must content [general] section",
6: "convert error : configparser to dictionary",
7: "Calibration directory is ambiguous, ie 2 files defined same effect.",
8: "No file expected in calibration directory."
}
self._set_status(0)
self.cfg = configparser.ConfigParser(
interpolation=configparser.ExtendedInterpolation()
)
        if isinstance(cfg_path, dict):
self.cfg.read_dict(cfg_path)
else:
self.cfg.read(cfg_path)
self.cfg_path = cfg_path
self.d_pars = {}
# self.mod_effect = me.ECLAIRsDetectorEffectDefault()
self.data_flow = None
        # define ECPI components & associated classes (dict order fixes the run order)
self.d_components = OrderedDict({
'dpco': ProcessDpcoWithFiles,
'cali': ProcessCaliWithFiles,
'bube': ProcessBubeWithFiles,
'imag': ProcessImagWithFiles,
'mosa': ProcessMosaWithFiles,
'spex': ProcessSpexWithFiles
})
self.l_components = self.d_components.keys()
def _format_section_parameter_list(self, section='missing section name'):
s = f"\n#### [{section.upper()}] section parameters:\n[{section}]"
try:
for k, v in sorted(list(self.d_pars[section].items())):
s += f"\n{k} = {v}"
s += "\n####"
        except KeyError:
            pass
return s
def _check_section_general(self):
"""Check parameters in general section of .ini parameter file.
More precisely check_params if work dirs exist.
"""
b_all_ok = True
if not self.cfg.has_section('general'):
self._set_status(5)
s_logger.error(f"\n\t#### {self.status_msg} not in {self.cfg.sections()}")
b_all_ok = False
else:
ret_parser = tpars.cfgparser_to_dicttyped(self.cfg, 'general')
if ret_parser is None:
self._set_status(6)
s_logger.error(self.status_msg)
b_all_ok = False
else:
self.d_pars['general'] = ret_parser
if not tcheck.check_add_default_file(DICT_SCHEMA_PATHS['gen'], \
self.d_pars['general']):
self._set_status(1)
s_logger.error(self.status_msg)
b_all_ok = False
if not b_all_ok:
return False
        # check that the in/out working directories exist
working_in = self.d_pars['general']['working_in']
working_out = self.d_pars['general']['working_out']
caldb_dir = self.d_pars['general']['caldb_dir']
if not os.path.exists(working_in):
self._set_status(2)
s_logger.error(self.status_msg)
return False
if not os.path.exists(working_out):
self._set_status(3)
s_logger.error(self.status_msg)
return False
if caldb_dir == "@local":
caldb_dir = get_caldb_dir()
self.d_pars['general']['caldb_dir'] = caldb_dir
if not os.path.exists(caldb_dir):
self._set_status(4)
s_logger.error(f"directory '{caldb_dir}' doesn't exist.")
return False
self.working_in = working_in
self.working_out = working_out
self.caldb_dir = caldb_dir
return True
def _init_model_effect(self):
l_arf_file = get_arf_files(self.caldb_dir)
if len(l_arf_file) == 0:
self._set_status(8)
s_logger.error(self.status_msg)
return False
if len(l_arf_file) > 1:
self._set_status(7)
s_logger.error(self.status_msg)
return False
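        # note: relies on self.mod_effect, whose initialisation
        # (me.ECLAIRsDetectorEffectDefault) is currently commented out in __init__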
self.mod_effect.set_arf(l_arf_file[0])
return True
def _before_processing(self):
"""Check parameters from general section of .ini parameter file.
If successful then _ecpi_processing the pipeline one module after the other.
"""
s_logger.warning(f"\n\t#### Checking parameters list of section [GENERAL]")
if not self._check_section_general():
return False
s_logger.warning(f"\n\t#### Parameters list of section [GENERAL] OK.\
{self._format_section_parameter_list('general')}")
if not self._ecpi_processing(only_check=True):
return False
# init model effect detector with caldb_dir
self.caldb_dir = self.d_pars["general"]["caldb_dir"]
# return self._init_model_effect()
return True
def _after_processing(self):
return True
def _ecpi_processing(self, only_check=False):
"""Run the ECLAIRs GP pipeline.
Checks what are the mentioned sections in the parameter file
and call them successively. The specific parameters needed to _ecpi_processing
a single section are loaded each time this section is called.
:param only_check: flag for component check_params.
:type only_check: bool
"""
if self.status != 0:
return
if self.data_flow is None:
self.data_flow = DataFlow(only_check=only_check)
else:
self.data_flow.load_detector_effect_from_dir(self.caldb_dir)
for cpst in self.l_components:
if cpst in self.cfg.sections():
component = self.d_components[cpst]()
assert isinstance(component, GenericComponentProcessing)
if only_check:
s_logger.info(f"\t#### Checking parameters list of [{cpst.upper()}] module")
else:
s_logger.info(f"\t#### Loading detector effect into data_flow")
component.set_model_effect_detector(self.data_flow.detector_effect)
string_component = "\n########################################################"
string_component += f"\n\tStarting module [{cpst.upper()}]"
string_component += "\n#######################################################"
s_logger.info(string_component)
if not component.set_cfg_params(self.cfg_path, cpst):
self._set_status_with_comp(component)
s_logger.error(self.status_msg)
return False
if not component.check_params():
                    # parameter check failed
self._set_status_with_comp(component)
s_logger.error(self.status_msg)
return False
if only_check:
s_logger.info(f"Parameters list of [{cpst.upper()}] module OK.")
continue
                # log all parameters, including default values
s_logger.info(component)
# add common parameters
component.complete_params_with_dict(self.d_pars['general'])
if not component.run(self.data_flow):
# error processing
self._set_status_with_comp(component)
s_logger.error(self.status_msg)
return False
return True
def _set_status(self, e_int):
"""Set status with error produced by features of EclairsPipeline class
"""
self.status = e_int
self.status_msg = self.d_status[e_int]
def _set_status_with_comp(self, component):
"""Set status with error produced by component called
"""
self.status = component.status_int()
self.status_msg = component.status_msg()
#
# User interface
#
    def run(self):
"""run pipeline, always do after_processing (cleaning mainly)
"""
if self._before_processing():
            s_logger.info('Before-processing checks passed; running the main loop.')
proc_ok = self._ecpi_processing()
if not proc_ok:
                s_logger.info('ECPI processing failed.')
                # _after_processing must always run,
                # but the first error (from processing) is kept
err_proc, mes_err = self.status, self.status_msg
self._after_processing()
self.status, self.status_msg = err_proc, mes_err
return False
else:
                s_logger.info('ECPI processing succeeded.')
return self._after_processing()
else:
            s_logger.info('Before-processing checks failed; skipping the main loop.')
            # _after_processing must always run,
            # but the first error (from _before_processing) is kept
err_proc, mes_err = self.status, self.status_msg
self._after_processing()
self.status, self.status_msg = err_proc, mes_err
return False
class EclairsPipelineSdbCaldb(EclairsPipelineLocal):
""" ECLAIRs pipeline in SDB CALDB mode
"""
def __init__(self, cfg_path):
"""**constructor**
:param cfg_path: path to the pipeline configuration file.
:type cfg_path: str
"""
super().__init__(cfg_path)
sdb_caldb_status = {
# SDB
31: "[sdb] parameter validation error",
32: "[sdb] download error",
33: "[sdb] upload error",
# CALDB
65: "[caldb] parameter validation error",
66: "[caldb] download error"
}
self.d_status.update(sdb_caldb_status)
def _create_workspace(self):
"""Create workspace where event & attitude files are
stored in /in and ECPI products are stored in /out.
"""
pass
def _before_processing(self):
"""Check parameters from general, sdb and caldb sections of .ini parameter file.
If successful then _ecpi_processing the pipeline one module after the other.
"""
s_logger.info(f"\n\t#### Checking parameters list of section [GENERAL]")
if not self._check_section_general():
return False
s_logger.warning(f"\n\t#### Parameters list of section [GENERAL] OK.\
{self._format_section_parameter_list('general')}")
s_logger.info("Parameters [general] ok")
self._create_workspace()
if not self._check_pars_sdb():
return False
s_logger.warning(f"\n\t#### Parameters list of section [SDB] OK.\
{self._format_section_parameter_list('sdb')}")
s_logger.info("Parameters [sdb] OK")
if not self._check_pars_caldb():
return False
s_logger.warning(f"\n\t#### Parameters list of section [CALDB] OK.\
{self._format_section_parameter_list('caldb')}")
s_logger.info("Parameters [caldb] OK")
if not self._ecpi_processing(only_check=True):
return False
s_logger.info("Parameters ecpi component OK")
if not (self._get_files_from_sdb() and self._get_files_from_caldb()):
return False
# else:
# b_ret = self._init_model_effect()
return True
def _after_processing(self):
"""Post scientific products to SDB
"""
return self._post_files_to_sdb()
def _check_pars_sdb(self):
"""Check parameters in the sdb section of the .ini file.
"""
ret_parser = tpars.cfgparser_to_dicttyped(self.cfg, 'sdb')
if ret_parser is None:
            self._set_status(6)
            s_logger.error(self.status_msg)
            return False
self.d_pars['sdb'] = ret_parser
if not tcheck.check_add_default_file(DICT_SCHEMA_PATHS['sdb'], self.d_pars['sdb']):
self._set_status(31)
s_logger.error(self.status_msg)
return False
self.event_files = self.d_pars['sdb']['event_files']
self.attitude_files = self.d_pars['sdb']['attitude_files']
if isinstance(self.event_files, str):
self.event_files = [self.event_files]
if isinstance(self.attitude_files, str):
self.attitude_files = [self.attitude_files]
return True
def _get_files_from_sdb(self): # pragma: no cover
"""Download event & attitude files from SDB.
"""
# def download_list(l_name, type_obj=""):
# """
#
# :param l_name:
# :param type_obj:
# """
# for file in l_name:
# if os.path.isfile(osp.join(self.working_in, file)):
# s_logger.info(f"{file} file already present in input directory. Skipping.")
# continue
# s_logger.info(f"Requesting for {file} file in SDB...")
# status = sdb.download_name(file, self.working_in, type_obj)
# if not status:
# s_logger.error(f"File: '{file}' could not be downloaded from SDB.")
# self._set_status(32)
# s_logger.error(self.status_msg)
# return False
# return True
#
# if self.status != 0:
# return False
# if 'event_files' in self.d_pars['sdb']:
# ret = download_list(self.d_pars['sdb']['event_files'])
# if 'attitude_files' in self.d_pars['sdb']:
# ret = ret and download_list(self.d_pars['sdb']['attitude_files'])
# if 'l1_files' in self.d_pars['sdb']:
# ret = ret and download_list(self.d_pars['sdb']['l1_files'])
# if 'l2_files' in self.d_pars['sdb']:
# ret = ret and download_list(self.d_pars['sdb']['l2_files'])
# if ret:
# s_logger.info(f"All download are OK.")
# return ret
if self.status != 0:
return False
return sdb.receive_qla1_files_with_sdb(self.working_in)
def _check_pars_caldb(self):
"""Check parameters in the caldb section of the .ini file.
"""
ret_parser = tpars.cfgparser_to_dicttyped(self.cfg, 'caldb')
if ret_parser is None:
self._set_status(6)
s_logger.error(self.status_msg)
return False
self.d_pars['caldb'] = ret_parser
if not tcheck.check_add_default_file(DICT_SCHEMA_PATHS['caldb'], self.d_pars['caldb']):
self._set_status(65)
s_logger.error(self.status_msg)
return False
self.arf_file = self.d_pars['caldb']['arf_file']
return True
def _get_files_from_caldb(self): # pragma: no cover
"""Download arf for DC-1
"""
if self.status != 0:
return False
arf_filename = self.d_pars['caldb']['arf_file']
arf_path = osp.join(self.caldb_dir, arf_filename)
if os.path.exists(arf_path):
s_logger.info(f"{arf_filename} file already present in caldb_dir directory. Skipping.")
return True
# if not caldb.download_file_from_caldb(self.caldb_dir, arf_filename):
m_caldb = caldb.CaldbIo(use_tokens=True)
if not m_caldb.save_caldb_last_fits_files(self.caldb_dir):
s_logger.error(f"{arf_filename} file could not be downloaded from CALDB.")
return False
s_logger.info(f"{arf_filename} file successfully downloaded from CALDB.")
return True
def _get_files_from_caldb_gen(self): # pragma: no cover
"""Download instrument related files from CALDB.
"""
if self.status != 0:
return False
        arf_file = [fname for fname in glob(osp.join(self.caldb_dir, "*.fits"))
                    if osp.basename(fname).startswith("ECL-RSP-ARF")]
        if arf_file:
            arf_filename = osp.basename(arf_file[0])
            s_logger.info(f"{arf_filename} file already present in caldb_dir directory. Skipping.")
else:
s_logger.info(f"Requesting for ARF file in CALDB...")
status = caldb.receive_arf_file_from_caldb(self.caldb_dir)
if not status:
s_logger.error("ARF file could not be downloaded from CALDB.")
self._set_status(1)
s_logger.info("ARF file successfully downloaded from SDB.")
return True
def _post_files_to_sdb(self): # pragma: no cover
"""Upload ECLAIRs GP pipeline scientific products to SDB.
"""
for file in glob(osp.join(self.working_out, "*.fits")):
# status = sdb.send_product_ecpi(file)
status = sdb.send_product_ecpi_to_sdb(file)
            if not status:
                s_logger.warning(f"{file} file could not be uploaded to SDB.")
                self._set_status(33)
                s_logger.error(self.status_msg)
                return False
s_logger.info(f"{file} file successfully uploaded to SDB.")
return True
class EclairsPipelineForHttpServer(EclairsPipelineSdbCaldb):
"""ECLAIRs pipeline in HTTP server mode
"""
def __init__(self, cfg_path, id_proc=None):
"""**constructor**
:param cfg_path: path to the pipeline configuration file.
:type cfg_path: str
"""
super().__init__(cfg_path)
self.caldb_dir = add_path_process_eclairs("caldb_dir")
if os.path.exists(self.caldb_dir):
shutil.rmtree(self.caldb_dir)
os.makedirs(self.caldb_dir)
if id_proc is None:
creation_time = time.strftime("%Y%m%dT%H%M%S")
self.id_proc = f"ECL_{creation_time}"
else:
self.id_proc = id_proc
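        # the processing id (e.g. "ECL_20240101T120000") names the workspace
        # directory (self.dirname); _create_workspace creates in/ and out/ under it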
self.dirname = add_path_process_eclairs(self.id_proc)
def _check_section_general(self):
"""Check parameters in general section of .ini parameter file.
More precisely check_params if work dirs exist.
"""
        # TODO: check whether [general] exists, otherwise create it empty in d_pars
if not self.cfg.has_section('general'):
s_logger.warning(f"\n\t#### Section [general] not in sections list "\
"{self.cfg.sections()}\n\t\tsection [general] added"\
" with default parameters")
self.cfg.add_section('general')
ret_parser = tpars.cfgparser_to_dicttyped(self.cfg, 'general')
if ret_parser is None:
            self._set_status(6)
            s_logger.error(self.status_msg)
            return False
self.d_pars['general'] = ret_parser
if not tcheck.check_add_default_file(DICT_SCHEMA_PATHS['gen_http'], \
self.d_pars['general']):
self._set_status(1)
s_logger.error(self.status_msg)
return False
return True
def _after_processing(self):
super()._post_files_to_sdb()
self._clear_workspace()
return self.status == 0
def _create_workspace(self):
"""Create workspace where event & attitude files are
stored in /in and ECPI products are stored in /out.
"""
# os.makedirs(self.dirname)
os.makedirs(osp.join(self.dirname, "in"))
os.makedirs(osp.join(self.dirname, "out"))
self.working_in = osp.join(self.dirname, 'in')
self.working_out = osp.join(self.dirname, 'out')
s_logger.info(f"Create working_in: {self.working_in}")
self.d_pars['general']["working_in"] = self.working_in
self.d_pars['general']["working_out"] = self.working_out
self.d_pars['general']["caldb_dir"] = self.caldb_dir
def _clear_workspace(self): # pragma: no cover
"""Clean workspace:
"""
shutil.rmtree(self.dirname)
s_logger.info("Cleaned input and output directories.")