"""
@author: BACON Philippe, APC/IN2P3/CNRS
"""
import configparser
import logging
import os
import shutil
import time
from collections import OrderedDict
from glob import glob

from ecpi.common import add_path_current_module
from ecpi.common import add_path_process_eclairs
from ecpi.common import add_path_data_ref_eclairs
from ecpi.database.interact_sdb import receive_event_att_files
import ecpi.database.interact_sdb as sdb
import ecpi.database.interact_caldb as caldb
from ecpi.process.generic.composant import GenericComposantProcessing
from ecpi.process.dpco.dpco_with_files import ProcessDpcoWithFiles
from ecpi.process.bube.bube_with_files import ProcessBubeWithFiles
from ecpi.process.imag.imag_with_files import ProcessImagWithFiles
import ecpi.common.params.tools_check as tpars
import ecpi.common.instru.model_effect as me
from ecpi.database.caldb_tools import get_arf_files
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Value returned by add_path_current_module for this file; every schema path
# below is built from it.
CRT_PATH = add_path_current_module(__file__, "")

# JSON schema file used to validate each kind of configuration section.
DICT_SCHEMA_PATHS = dict(
    gen=f"{CRT_PATH}/io/general_schema.json",
    gen_http=f"{CRT_PATH}/io/general_http_schema.json",
    sdb=f"{CRT_PATH}/io/sdb_schema.json",
    caldb=f"{CRT_PATH}/io/caldb_schema.json",
)
class EclairsPipelineLocal(object):
    """Main work flow processing for ECLAIRs pipeline.

    The pipeline validates the [general] section of its configuration,
    checks the parameters of every configured component, then runs the
    components one after the other, the output data of a component being
    given as input to the next one.

    The outcome of the last operation is kept in ``status`` (integer code,
    0 means OK) and ``status_msg`` (associated message).
    """

    def __init__(self, cfg_path):
        """**constructor**

        :param cfg_path: path to the pipeline configuration .ini file, or a
            dictionary loaded with ``ConfigParser.read_dict``.
        :type cfg_path: str or dict
        """
        self.d_status = {
            0: "OK",
            # GENERAL
            1: "[general] parameter validation error",
            2: "[general] input working directory does not exist",
            3: "[general] output working directory does not exist",
            4: "[general] caldb directory does not exist",
            5: "file parameters must content [general] section",
            6: "convert error : configparser to dictionary",
            7: "Calibration directory is ambiguious, ie 2 files defined same effect.",
            8: "No file expected in calibration directory."
        }
        self._set_status(0)
        self.cfg = configparser.ConfigParser(
            interpolation=configparser.ExtendedInterpolation()
        )
        # isinstance (rather than an exact type test) so that dict
        # subclasses such as OrderedDict are also loaded as dictionaries.
        if isinstance(cfg_path, dict):
            self.cfg.read_dict(cfg_path)
        else:
            self.cfg.read(cfg_path)
        self.cfg_path = cfg_path
        self.d_pars = {}
        self.mod_effect = me.ECLAIRsDetectorEffectDefault()
        # define ECPI components & associated classes, in running order
        self.d_components = OrderedDict({
            'dpco': ProcessDpcoWithFiles,
            'bube': ProcessBubeWithFiles,
            'imag': ProcessImagWithFiles
        })
        self.l_components = self.d_components.keys()

    def _format_section_parameter_list(self, section='missing section name'):
        """Return a printable list of the parameters of a section.

        The returned string always starts with a header naming the section.
        The sorted ``key = value`` lines and the closing ``####`` are added
        only when the section is available in ``d_pars``.

        :param section: name of the section to format.
        :type section: str
        :return: formatted text.
        :rtype: str
        """
        s = f"\n#### [{section.upper()}] section parameters:\n[{section}]"
        try:
            for k, v in sorted(self.d_pars[section].items()):
                s += f"\n{k} = {v}"
            s += "\n####"
        except (KeyError, AttributeError, TypeError):
            # Formatting is only used for logging: an unknown or malformed
            # section must not stop the pipeline, keep what was built so far.
            pass
        return s

    def _check_section_general(self):
        """Check parameters in general section of .ini parameter file.

        Validate the section against the 'gen' schema, then check that the
        input, output and caldb directories exist. The value "@local" for
        caldb_dir is replaced by the result of
        ``add_path_data_ref_eclairs("instru")``.

        On success the attributes ``working_in``, ``working_out`` and
        ``caldb_dir`` are set. On failure the status is set accordingly.

        :return: True if the section is valid.
        :rtype: bool
        """
        if not self.cfg.has_section('general'):
            self._set_status(5)
            logger.error(f"\n\t#### {self.status_msg} not in {self.cfg.sections()}")
            return False
        ret_parser = tpars.cfgparser_to_dict4json(self.cfg, 'general')
        if ret_parser is None:
            self._set_status(6)
            logger.error(self.status_msg)
            return False
        self.d_pars['general'] = ret_parser
        if not tpars.check_add_default_file(DICT_SCHEMA_PATHS['gen'],
                                            self.d_pars['general']):
            self._set_status(1)
            logger.error(self.status_msg)
            return False
        # check if in/out working directories exist.
        working_in = self.d_pars['general']['working_in']
        working_out = self.d_pars['general']['working_out']
        caldb_dir = self.d_pars['general']['caldb_dir']
        if not os.path.exists(working_in):
            self._set_status(2)
            logger.error(self.status_msg)
            return False
        if not os.path.exists(working_out):
            self._set_status(3)
            logger.error(self.status_msg)
            return False
        if caldb_dir == "@local":
            caldb_dir = add_path_data_ref_eclairs("instru")
            self.d_pars['general']['caldb_dir'] = caldb_dir
        if not os.path.exists(caldb_dir):
            self._set_status(4)
            logger.error(f"directory '{caldb_dir}' doesn't exist.")
            return False
        self.working_in = working_in
        self.working_out = working_out
        self.caldb_dir = caldb_dir
        return True

    def _init_model_effect(self):
        """Initialize the detector effect model with the ARF file of caldb_dir.

        Exactly one ARF file must be returned by ``get_arf_files``: status 8
        is set when there is none, status 7 when there are several.

        :return: True if the ARF file has been given to the model.
        :rtype: bool
        """
        l_arf_file = get_arf_files(self.caldb_dir)
        if len(l_arf_file) == 0:
            self._set_status(8)
            logger.error(self.status_msg)
            return False
        if len(l_arf_file) > 1:
            self._set_status(7)
            logger.error(self.status_msg)
            return False
        self.mod_effect.set_arf(l_arf_file[0])
        return True

    def _before_processing(self):
        """Check all parameters and prepare the processing.

        Check the [general] section, then the parameters of each configured
        component (without running them), and finally initialize the
        detector effect model.

        :return: True if the pipeline is ready to run.
        :rtype: bool
        """
        logger.warning("\n\t#### Checking parameters list of section [GENERAL]")
        if not self._check_section_general():
            return False
        logger.warning("\n\t#### Parameters list of section [GENERAL] OK."
                       f"{self._format_section_parameter_list('general')}")
        if not self._ecpi_processing(only_check=True):
            return False
        # init model effect detector with caldb_dir
        self.caldb_dir = self.d_pars["general"]["caldb_dir"]
        return self._init_model_effect()

    def _after_processing(self):
        """Hook called by :meth:`run` once the processing is over.

        Nothing to do here, subclasses redefine it.

        :return: always True.
        :rtype: bool
        """
        return True

    def _ecpi_processing(self, only_check=False):
        """Run the ECLAIRs GP pipeline.

        Loop on the known components, in order, and handle those having a
        section in the configuration. For each of them the parameters are
        loaded and checked; unless only_check is set, the component is then
        run with the output data of the previous component as input.

        On component failure, the status of the pipeline is set with the
        status of the component.

        :param only_check: if True, only check the parameters of the
            components, do not run them.
        :type only_check: bool
        :return: True if every configured component succeeded, False
            otherwise (including when the status was already in error).
        :rtype: bool
        """
        if self.status != 0:
            # already in error: explicit False instead of an implicit None
            return False
        data_out = None
        for cpst in self.l_components:
            if not self.cfg.has_section(cpst):
                continue
            data_in = data_out
            component = self.d_components[cpst]()
            assert isinstance(component, GenericComposantProcessing)
            component.set_model_effect_detector(self.mod_effect)
            if only_check:
                logger.info(f"\n\t#### Checking parameters list of [{cpst.upper()}] module")
            if not component.set_cfg_params(self.cfg_path, cpst):
                self._set_status_with_comp(component)
                logger.error(self.status_msg)
                return False
            if not component.check_params():
                self._set_status_with_comp(component)
                logger.error(self.status_msg)
                return False
            if only_check:
                logger.info(f"\n\t#### Parameters list of [{cpst.upper()}] module OK.")
                continue
            string_composant = "\n###########################################################"
            string_composant += f"\n\tStarting module [{cpst.upper()}]"
            string_composant += "\n##########################################################"
            logger.warning(string_composant)
            logger.warning(f"{component}")
            # add common parameters
            component.complete_params_with_dict(self.d_pars['general'])
            if not component.run(data_in):
                # error processing
                self._set_status_with_comp(component)
                logger.error(self.status_msg)
                return False
            data_out = component.get_data_out()
        return True

    def _set_status(self, e_int):
        """Set status with error produced by features of EclairsPipeline class.

        :param e_int: status code, must be a key of ``d_status``.
        :type e_int: int
        """
        self.status = e_int
        self.status_msg = self.d_status[e_int]

    def _set_status_with_comp(self, composant):
        """Set status with error produced by component called.

        :param composant: component providing ``status_int()`` and
            ``status_msg()``.
        """
        self.status = composant.status_int()
        self.status_msg = composant.status_msg()

    #
    # User interface
    #

    def run(self):
        """Run pipeline, always do _after_processing (cleaning mainly).

        When the preparation or the processing fails, _after_processing is
        still called but the status of the first error is restored
        afterwards, so that it is the one reported.

        :return: True if preparation, processing and _after_processing
            all succeeded.
        :rtype: bool
        """
        if self._before_processing() and self._ecpi_processing():
            return self._after_processing()
        # must do _after_processing and keep the first error
        err_proc, mes_err = self.status, self.status_msg
        self._after_processing()
        self.status, self.status_msg = err_proc, mes_err
        return False
class EclairsPipelineSdbCaldb(EclairsPipelineLocal):
    """ECLAIRs pipeline in SDB CALDB mode.

    Compared to the local pipeline, the [sdb] and [caldb] sections of the
    configuration are also validated, the input files are downloaded from
    SDB, the ARF file from CALDB, and the FITS products of the output
    working directory are uploaded to SDB after the processing.
    """

    def __init__(self, cfg_path):
        """**constructor**

        :param cfg_path: path to the pipeline configuration file.
        :type cfg_path: str
        """
        super().__init__(cfg_path)
        sdb_caldb_status = {
            # SDB
            31: "[sdb] parameter validation error",
            32: "[sdb] download error",
            33: "[sdb] upload error",
            # CALDB
            65: "[caldb] parameter validation error",
            66: "[caldb] download error"
        }
        self.d_status.update(sdb_caldb_status)

    def _create_workspace(self):
        """Hook called once the [general] section is validated.

        Nothing to do here: working directories come from the configuration.
        """

    def _before_processing(self):
        """Check all parameters and fetch the input files.

        Check the general, sdb and caldb sections, then the parameters of
        each configured component (without running them), download the
        files from SDB and CALDB and finally initialize the detector
        effect model.

        :return: True if the pipeline is ready to run.
        :rtype: bool
        """
        logger.info("\n\t#### Checking parameters list of section [GENERAL]")
        if not self._check_section_general():
            return False
        logger.warning("\n\t#### Parameters list of section [GENERAL] OK."
                       f"{self._format_section_parameter_list('general')}")
        logger.info("Parameters [general] ok")
        self._create_workspace()
        if not self._check_pars_sdb():
            return False
        logger.warning("\n\t#### Parameters list of section [SDB] OK."
                       f"{self._format_section_parameter_list('sdb')}")
        logger.info("Parameters [sdb] OK")
        if not self._check_pars_caldb():
            return False
        logger.warning("\n\t#### Parameters list of section [CALDB] OK."
                       f"{self._format_section_parameter_list('caldb')}")
        logger.info("Parameters [caldb] OK")
        if not self._ecpi_processing(only_check=True):
            return False
        logger.info("Parameters ecpi compoment OK")
        if not (self._get_files_from_sdb() and self._get_files_from_caldb()):
            return False
        return self._init_model_effect()

    def _after_processing(self):
        """Post scientific products to SDB.

        :return: result of :meth:`_post_files_to_sdb`.
        :rtype: bool
        """
        return self._post_files_to_sdb()

    def _check_pars_sdb(self):
        """Check parameters in the sdb section of the .ini file.

        On success, ``event_files`` and ``attitude_files`` attributes are
        set as lists (a single name given as a string is wrapped in a list).

        :return: True if the section is valid.
        :rtype: bool
        """
        ret_parser = tpars.cfgparser_to_dict4json(self.cfg, 'sdb')
        if ret_parser is None:
            self._set_status(6)
            logger.error(self.status_msg)
            return False
        self.d_pars['sdb'] = ret_parser
        if not tpars.check_add_default_file(DICT_SCHEMA_PATHS['sdb'], self.d_pars['sdb']):
            self._set_status(31)
            logger.error(self.status_msg)
            return False
        self.event_files = self.d_pars['sdb']['event_files']
        self.attitude_files = self.d_pars['sdb']['attitude_files']
        if isinstance(self.event_files, str):
            self.event_files = [self.event_files]
        if isinstance(self.attitude_files, str):
            self.attitude_files = [self.attitude_files]
        return True

    def _download_list_from_sdb(self, l_name, type_obj=""):  # pragma: no cover
        """Download from SDB the files missing in the input working directory.

        :param l_name: name(s) of the files to download.
        :type l_name: str or list of str
        :param type_obj: passed as is to ``sdb.download_name``.
        :type type_obj: str
        :return: False as soon as one download fails (status 32 is then
            set), True otherwise.
        :rtype: bool
        """
        if isinstance(l_name, str):
            # a single name must not be iterated character by character
            l_name = [l_name]
        for file in l_name:
            if os.path.isfile(os.path.join(self.working_in, file)):
                logger.info(f"{file} file already present in input directory. Skipping.")
                continue
            logger.info(f"Requesting for {file} file in SDB...")
            if not sdb.download_name(file, self.working_in, type_obj):
                logger.error(f"File: '{file}' could not be downloaded from SDB.")
                self._set_status(32)
                logger.error(self.status_msg)
                return False
        return True

    def _get_files_from_sdb(self):  # pragma: no cover
        """Download event & attitude files from SDB.

        The parameters event_files, attitude_files, l1_files and l2_files of
        the sdb section are handled, in this order, when they are defined.
        The first failure stops the downloads.

        :return: True if all downloads succeeded.
        :rtype: bool
        """
        if self.status != 0:
            return False
        d_sdb = self.d_pars['sdb']
        for key in ('event_files', 'attitude_files', 'l1_files', 'l2_files'):
            if key in d_sdb and not self._download_list_from_sdb(d_sdb[key]):
                return False
        logger.info("All download are OK.")
        return True

    def _check_pars_caldb(self):
        """Check parameters in the caldb section of the .ini file.

        On success, the ``arf_file`` attribute is set.

        :return: True if the section is valid.
        :rtype: bool
        """
        ret_parser = tpars.cfgparser_to_dict4json(self.cfg, 'caldb')
        if ret_parser is None:
            self._set_status(6)
            logger.error(self.status_msg)
            return False
        self.d_pars['caldb'] = ret_parser
        if not tpars.check_add_default_file(DICT_SCHEMA_PATHS['caldb'], self.d_pars['caldb']):
            self._set_status(65)
            logger.error(self.status_msg)
            return False
        self.arf_file = self.d_pars['caldb']['arf_file']
        return True

    def _get_files_from_caldb(self):  # pragma: no cover
        """Download arf for DC-1.

        The ARF file named in the caldb section is downloaded into
        caldb_dir unless it is already there.

        :return: True if the file is available in caldb_dir; on download
            failure status 66 is set and False is returned.
        :rtype: bool
        """
        if self.status != 0:
            return False
        arf_filename = self.d_pars['caldb']['arf_file']
        arf_path = os.path.join(self.caldb_dir, arf_filename)
        if os.path.exists(arf_path):
            logger.info(f"{arf_filename} file already present in caldb_dir directory. Skipping.")
            return True
        if not caldb.download_file_from_caldb(self.caldb_dir, arf_filename):
            logger.error(f"{arf_filename} file could not be downloaded from CALDB.")
            # record the failure, otherwise the pipeline fails with status OK
            self._set_status(66)
            logger.error(self.status_msg)
            return False
        logger.info(f"{arf_filename} file successfully downloaded from CALDB.")
        return True

    def _get_files_from_caldb_gen(self):  # pragma: no cover
        """Download instrument related files from CALDB.

        Nothing is downloaded when caldb_dir already contains a .fits file
        whose name starts with "ECL-RSP-ENE".

        :return: True if an ARF file is available in caldb_dir; on download
            failure status 66 is set and False is returned.
        :rtype: bool
        """
        if self.status != 0:
            return False
        arf_file = [fname for fname in glob(self.caldb_dir + "/*.fits")
                    if os.path.basename(fname).startswith("ECL-RSP-ENE")]
        if arf_file:
            arf_filename = os.path.basename(arf_file[0])
            logger.info(f"{arf_filename} file already present in caldb_dir directory. Skipping.")
            return True
        logger.info("Requesting for ARF file in CALDB...")
        if not caldb.receive_arf_file_from_caldb(self.caldb_dir):
            logger.error("ARF file could not be downloaded from CALDB.")
            self._set_status(66)
            logger.error(self.status_msg)
            return False
        logger.info("ARF file successfully downloaded from CALDB.")
        return True

    def _post_files_to_sdb(self):  # pragma: no cover
        """Upload ECLAIRs GP pipeline scientific products to SDB.

        Every .fits file of the output working directory is sent. The first
        failure stops the uploads and sets status 33.

        :return: False if an upload failed, True otherwise.
        :rtype: bool
        """
        # working_out is not defined when the pipeline failed before the
        # [general] section was validated: nothing can be uploaded then.
        working_out = getattr(self, "working_out", None)
        if working_out is None:
            logger.warning("Output working directory not defined, nothing to upload to SDB.")
            return True
        for file in glob(working_out + "/*.fits"):
            if not sdb.send_product_ecpi(file):
                logger.warning(f"{file} file could not be uploaded to SDB.")
                self._set_status(33)
                logger.error(self.status_msg)
                return False
            logger.info(f"{file} file successfully uploaded to SDB.")
        return True
class EclairsPipelineForHttpServer(EclairsPipelineSdbCaldb):
    """ECLAIRs pipeline in HTTP server mode.

    The pipeline works in its own workspace, a directory named after the
    processing identifier with /in and /out sub-directories, which is
    removed after the processing.
    """

    def __init__(self, cfg_path, id_proc=None):
        """**constructor**

        The caldb directory is created if missing, otherwise its content
        is removed.

        :param cfg_path: path to the pipeline configuration file.
        :type cfg_path: str
        :param id_proc: identifier of the processing, used to name the
            workspace. When None, "ECL_" followed by the local time
            (format %Y%m%dT%H%M%S) is used.
        :type id_proc: str
        """
        super().__init__(cfg_path)
        self.caldb_dir = add_path_process_eclairs("caldb_dir")
        if not os.path.exists(self.caldb_dir):
            os.makedirs(self.caldb_dir)
        else:
            self._purge_directory(self.caldb_dir)
        if id_proc is None:
            creation_time = time.strftime("%Y%m%dT%H%M%S")
            self.id_proc = f"ECL_{creation_time}"
        else:
            self.id_proc = id_proc
        self.dirname = add_path_process_eclairs(self.id_proc)

    @staticmethod
    def _purge_directory(dir_path):
        """Remove, best effort, the non-hidden entries of a directory.

        Done without any shell, so that no quoting issue can arise from
        the path. Entries whose name starts with a dot are kept, as a
        shell ``*`` pattern would do. A failure is logged, never raised.

        :param dir_path: directory to empty, kept itself.
        :type dir_path: str
        """
        try:
            names = os.listdir(dir_path)
        except OSError as err:
            logger.warning(f"Could not list '{dir_path}': {err}")
            return
        for name in names:
            if name.startswith("."):
                continue
            path = os.path.join(dir_path, name)
            try:
                if os.path.isdir(path) and not os.path.islink(path):
                    shutil.rmtree(path)
                else:
                    os.remove(path)
            except OSError as err:
                logger.warning(f"Could not remove '{path}': {err}")

    def _check_section_general(self):
        """Check parameters in general section of .ini parameter file.

        Here a missing [general] section is not an error: an empty one is
        added to the configuration. The section is validated against the
        'gen_http' schema; the existence of the working directories is
        not checked.

        :return: True if the section is valid.
        :rtype: bool
        """
        if not self.cfg.has_section('general'):
            logger.warning(f"\n\t#### Section [general] not in sections list {self.cfg.sections()}"
                           "\n\t\tsection [general] added with default parameters")
            self.cfg.add_section('general')
        ret_parser = tpars.cfgparser_to_dict4json(self.cfg, 'general')
        if ret_parser is None:
            self._set_status(6)
            logger.error(self.status_msg)
            return False
        self.d_pars['general'] = ret_parser
        if not tpars.check_add_default_file(DICT_SCHEMA_PATHS['gen_http'],
                                            self.d_pars['general']):
            self._set_status(1)
            logger.error(self.status_msg)
            return False
        return True

    def _after_processing(self):
        """Upload the products to SDB, then remove the workspace.

        :return: True if the status is OK (0) once both are done.
        :rtype: bool
        """
        self._post_files_to_sdb()
        self._clear_workspace()
        return self.status == 0

    def _create_workspace(self):
        """Create workspace where event & attitude files are
        stored in /in and ECPI products are stored in /out.

        The working_in, working_out and caldb_dir parameters of the
        general section are set with the directories of the workspace.
        """
        self.working_in = self.dirname + '/in'
        self.working_out = self.dirname + '/out'
        # exist_ok: a workspace left by a previous run with the same
        # identifier must not crash the pipeline.
        os.makedirs(self.working_in, exist_ok=True)
        os.makedirs(self.working_out, exist_ok=True)
        logger.info(f"Create working_in: {self.working_in}")
        self.d_pars['general']["working_in"] = self.working_in
        self.d_pars['general']["working_out"] = self.working_out
        self.d_pars['general']["caldb_dir"] = self.caldb_dir

    def _clear_workspace(self):  # pragma: no cover
        """Clean workspace: remove the workspace directory and its content.

        Best effort: a missing directory or a removal error is ignored.
        """
        shutil.rmtree(self.dirname, ignore_errors=True)
        logger.info("Cleaned input and output directories.")