"""Implementation of the generic module components run by ECPI.
"""
import os.path
import logging
import json
import configparser
import abc
import ecpi.common.params.tools_check as tc
import ecpi.common.params.tools_params as tpars
import ecpi.common.instru.model_effect as me
from ecpi.pipeline.io.data_flow import DataFlow
# Module-level logger, named after this module.
s_logger = logging.getLogger(__name__)
class GenericComponentProcessing(abc.ABC):
    """Mother class for all ECPI modules.

    A component owns a parameter dictionary (``d_pars``) that is checked
    against the JSON schema loaded by the constructor, and is executed
    through :meth:`run`. Derived classes must implement
    :meth:`_check_extra`, :meth:`_process_component` and
    :meth:`_create_output_files`.

    Errors are reported through the integer ``status`` attribute, whose
    meaning is given by the ``d_status`` dictionary.
    """

    def __init__(self, file_schema):
        """**constructor**

        :param file_schema: path to the .json scheme
        :type file_schema: str
        :raises OSError: if the schema file cannot be opened
        :raises json.JSONDecodeError: if the schema file is not valid JSON
        """
        self.component_name = ""
        self.d_pars = {}
        self.sections = None
        self.data_flow = None
        self.mod_effect = None
        # Offset added to ``status`` by status_int().
        self.error_range = 0
        # Current status code; a key of d_status (0 means "OK").
        self.status = 0
        self.d_status = {
            0: "OK",
            1: "parameter validation error",
            2: "input working isn't present in the parameter dict",
            3: "input working directory does not exist",
            4: "convert error : configparser to dictionary",
            5: "ECLAIRs model effect is not valid",
        }
        # JSON is UTF-8 by specification: do not depend on the locale encoding.
        with open(file_schema, "r", encoding="utf-8") as schema_file:
            self.json_schema = json.load(schema_file)

    def __str__(self):
        """Return the parameters formatted as a configuration section.

        Parameters are listed in sorted key order, one ``key = value`` per
        line, with single quotes of the value representation replaced by
        double quotes.
        """
        name = self.component_name
        lines = [f"\n#### [{name.upper()}] module parameters:\n[{name}]"]
        for key, value in sorted(self.d_pars.items()):
            # str() rather than value.__str__() so that values which are
            # themselves classes can be rendered too.
            rendered = str(value).replace("'", '"')
            lines.append(f"{key} = {rendered}")
        lines.append("####")
        return "\n".join(lines)

    def _check_add_default_dict(self):
        """Check values and add optional default values for absent parameters.

        Delegates to ``tc.check_add_default_dict`` with the JSON schema and
        ``self.d_pars``, then logs every parameter at debug level.

        :return: the status returned by ``tc.check_add_default_dict``
        """
        # TODO : recover the previous definition. We log the dictionary for
        # debug purposes
        # return tc.check_add_default_dict(self.json_schema, self.d_pars)
        check_status = tc.check_add_default_dict(self.json_schema, self.d_pars)
        for key, value in self.d_pars.items():
            s_logger.debug("Key : %s with value %s", key, value)
        return check_status

    def _load_check_file_in(self):
        """Check that the input working directory is declared and exists.

        Sets ``self.status`` to 2 when the ``working_in`` parameter is
        missing and to 3 when the path it designates does not exist.

        #TODO: using parameter dir_in for read input files

        :return: True if ``working_in`` is defined and exists, else False
        :rtype: bool
        """
        # Debug trace goes through the logger instead of a bare print().
        s_logger.debug("values of working_in/out %s", self.d_pars)
        # must be defined in derived class
        # initialize self.d_data_in
        if "working_in" not in self.d_pars:
            self.status = 2
            return False
        if not os.path.exists(self.d_pars["working_in"]):
            self.status = 3
            return False
        return True

    def _tbd_check_unknown_pars(self):
        """Return False if d_pars contains a parameter absent from its JSON scheme.

        Not implemented yet (currently does nothing and returns None).

        #TODO: meant to catch typos in input parameter names, which are
        otherwise silently masked because the misspelled parameter is
        redefined with its default value.
        """
        pass

    # NOTE(review): the abstract methods below return ``NotImplemented``
    # rather than raising ``NotImplementedError``; kept as-is because
    # subclasses may call ``super()`` and rely on it.
    @abc.abstractmethod
    def _create_output_files(self):
        """Create output files

        Its return value is returned as-is by :meth:`run`.
        """
        return NotImplemented

    @abc.abstractmethod
    def _process_component(self):
        """Run component

        A falsy return value makes :meth:`run` log an error and return False.
        """
        return NotImplemented

    @abc.abstractmethod
    def _check_extra(self):
        """Check more complex rules

        Its return value is returned as-is by :meth:`check_params`.
        """
        return NotImplemented

    #
    # User interface
    #

    def set_model_effect_detector(self, mod_dpix):
        """Set the detector effect model used by the component.

        Sets ``self.status`` to 5 when the object has the wrong type.

        :param mod_dpix: detector effect model
        :type mod_dpix: me.ECLAIRsDetectorEffect
        :return: True if the model was accepted, else False
        :rtype: bool
        """
        if not isinstance(mod_dpix, me.ECLAIRsDetectorEffect):
            self.status = 5
            return False
        self.mod_effect = mod_dpix
        return True

    def set_dict_params(self, dict_pars):
        """Set dictionary parameters

        The dictionary is stored by reference, not copied.

        :param dict_pars: parameters of the component
        :type dict_pars: dict
        """
        self.d_pars = dict_pars

    def set_cfg_params(self, cfg_pars, section):
        """Set parameters of component with dict or file with configparser

        ``cfg_pars`` may be a dictionary, the path of an existing
        configuration file, or a string holding the configuration content
        itself. Sets ``self.status`` to 4 when the conversion of the section
        to a typed dictionary fails.

        :param cfg_pars: parameters containers dict or file
        :type cfg_pars: dict or string
        :param section: name of ecpi section
        :type section: str
        :return: True on success, else False
        :rtype: bool
        """
        cfg = configparser.ConfigParser(
            interpolation=configparser.ExtendedInterpolation()
        )
        # isinstance() so that dict subclasses (e.g. OrderedDict) are accepted.
        if isinstance(cfg_pars, dict):
            cfg.read_dict(cfg_pars)
        elif os.path.isfile(cfg_pars):
            cfg.read(cfg_pars)
        elif isinstance(cfg_pars, str):
            # cfg_pars is a string that does not contain the path to
            # a config filename => then it is the content of a configuration
            cfg.read_string(cfg_pars)
        ret_parser = tpars.cfgparser_to_dicttyped(cfg, section)
        if ret_parser is None:
            self.status = 4
            return False
        self.d_pars = ret_parser
        # NOTE(review): this stores the bound method, not the list of section
        # names; kept unchanged because callers may invoke ``self.sections()``.
        self.sections = cfg.sections
        self.component_name = section
        return True

    def complete_params_with_dict(self, new_dict_pars):
        """Merge ``new_dict_pars`` into ``self.d_pars``.

        NOTE(review): implemented with ``dict.update``, so a key already
        present in ``self.d_pars`` is overwritten by the new value. The
        previous docstring said parameters were added only "if not present";
        confirm which behaviour is intended.

        :param new_dict_pars: parameters to merge
        :type new_dict_pars: dict
        """
        self.d_pars.update(new_dict_pars)

    def check_params(self):
        """Check parameters of component

        Parameters of component are in self.d_pars attribute. Sets
        ``self.status`` to 1 when the schema check fails; otherwise the
        result of :meth:`_check_extra` is returned.
        """
        if not self._check_add_default_dict():
            self.status = 1
            return False
        return self._check_extra()

    def run(self, data_in_obj=None):
        """Run/processing with parameters and d_data_in

        The input check :meth:`_load_check_file_in` is only called when the
        data flow reports that it is not ready for this component.

        :param data_in_obj: data flow object; must be a ``DataFlow`` instance
            (the default ``None`` is rejected by the assertion)
        :type data_in_obj: DataFlow
        :return: False on error, else the result of
            :meth:`_create_output_files`
        """
        assert isinstance(data_in_obj, DataFlow)
        assert self.component_name != ""
        s_logger.info(
            "Running main run() process of component [%s]", self.component_name
        )
        self.data_flow = data_in_obj
        if (not data_in_obj.is_ready(self.component_name)
                and not self._load_check_file_in()):
            s_logger.error(
                "Error checking data_flow content in %s", self.component_name
            )
            return False
        if not self._process_component():
            s_logger.error("Error processing %s", self.component_name)
            return False
        return self._create_output_files()

    def status_int(self):
        """Return status number

        :return: ``self.status`` shifted by ``self.error_range``
        :rtype: int
        """
        return self.status + self.error_range

    def status_msg(self):
        """Return status message

        A status code without an entry in ``d_status`` yields a generic
        "unknown status code" message instead of raising ``KeyError``.

        :return: message prefixed with the component name in brackets
        :rtype: str
        """
        msg_err = self.d_status.get(
            self.status, f"unknown status code {self.status}"
        )
        return f"[{self.component_name}]: {msg_err}"