Source code for ecpi.process.generic.component

"""Implementation of generic module components ran by ECPI.
"""

import os.path
import logging
import json
import configparser
import abc
import ecpi.common.params.tools_check as tc
import ecpi.common.params.tools_params as tpars
import ecpi.common.instru.model_effect as me


from ecpi.pipeline.io.data_flow import DataFlow

# Module-level logger, named after this module's import path.
s_logger = logging.getLogger(__name__)


class GenericComponentProcessing(abc.ABC):
    """Mother class for all ECPI modules.

    A component is configured through :meth:`set_dict_params` or
    :meth:`set_cfg_params`, validated with :meth:`check_params` and executed
    with :meth:`run`. Failures are reported through boolean return values;
    the reason is stored in ``self.status`` and can be read back with
    :meth:`status_int` and :meth:`status_msg`.

    Derived classes must implement :meth:`_create_output_files`,
    :meth:`_process_component` and :meth:`_check_extra`.
    """

    def __init__(self, file_schema):
        """**constructor**

        :param file_schema: path to the .json scheme
        :type file_schema: str
        :raises OSError: if the schema file cannot be opened
        :raises json.JSONDecodeError: if the schema file is not valid JSON
        """
        self.component_name = ""
        self.d_pars = {}
        self.sections = None
        self.data_flow = None
        self.mod_effect = None
        # Offset added to ``status`` by status_int().
        self.error_range = 0
        self.status = 0
        # Human readable message for each status code set by this class.
        self.d_status = {
            0: "OK",
            1: "parameter validation error",
            2: "input working isn't present in the parameter dict",
            3: "input working directory does not exist",
            4: "convert error : configparser to dictionary",
            5: "ECLAIRs model effect is not valid",
        }
        # JSON is UTF-8 by specification: do not rely on the locale encoding.
        with open(file_schema, "r", encoding="utf-8") as fichier:
            self.json_schema = json.load(fichier)

    def __str__(self):
        """Return the parameters formatted as a configuration-like section.

        Parameters are listed sorted by key; single quotes in the string
        form of each value are replaced by double quotes.
        """
        name = self.component_name
        lines = [f"\n#### [{name.upper()}] module parameters:", f"[{name}]"]
        for key, value in sorted(self.d_pars.items()):
            text = str(value).replace("'", '"')
            lines.append(f"{key} = {text}")
        lines.append("####")
        return "\n".join(lines)

    def _check_add_default_dict(self):
        """Check values and add optional default values for absent parameters.

        Delegates to ``tc.check_add_default_dict`` with the JSON schema and
        ``self.d_pars``, then logs every parameter at debug level.

        :return: the value returned by ``tc.check_add_default_dict``
        """
        # TODO : recover the previous definition. We print the dictionary for
        # debug purposes
        # return tc.check_add_default_dict(self.json_schema, self.d_pars)
        check_status = tc.check_add_default_dict(self.json_schema, self.d_pars)
        for key, value in self.d_pars.items():
            s_logger.debug("Key : %s with value %s", key, value)
        return check_status

    def _load_check_file_in(self):
        """Check that the input working directory is declared and exists.

        Sets ``self.status`` to 2 when the ``working_in`` parameter is
        missing and to 3 when the path it designates does not exist.

        #TODO: using parameter dir_in for read input files

        :return: True if ``working_in`` is defined and exists, else False
        :rtype: bool
        """
        # Debug trace goes through the module logger instead of stdout.
        s_logger.debug("values of working_in/out %s", self.d_pars)
        # must be defined in derived class
        # initialize self.d_data_in
        if "working_in" not in self.d_pars:
            self.status = 2
            return False
        if not os.path.exists(self.d_pars["working_in"]):
            self.status = 3
            return False
        return True

    def _tbd_check_unknown_pars(self):
        """Return False if d_pars contains a parameter absent from its JSON scheme.

        Not implemented yet: currently does nothing and returns None.

        #TODO: implement, so that a typo in an input parameter name is not
        silently hidden by the parameter being redefined with its default
        value.
        """
        pass

    @abc.abstractmethod
    def _create_output_files(self):
        """Create output files.

        Its return value is returned as is by :meth:`run`.
        """
        return NotImplemented

    @abc.abstractmethod
    def _process_component(self):
        """Run component.

        A falsy return value makes :meth:`run` log an error and return False.
        """
        return NotImplemented

    @abc.abstractmethod
    def _check_extra(self):
        """Check more complex rules.

        Its return value is returned as is by :meth:`check_params`.
        """
        return NotImplemented

    #
    # User interface
    #

    def set_model_effect_detector(self, mod_dpix):
        """Set the detector model effect used by the component.

        :param mod_dpix: detector effect model
        :type mod_dpix: me.ECLAIRsDetectorEffect
        :return: True if accepted; False (and status 5) if ``mod_dpix`` is
            not an instance of ``me.ECLAIRsDetectorEffect``
        :rtype: bool
        """
        if not isinstance(mod_dpix, me.ECLAIRsDetectorEffect):
            self.status = 5
            return False
        self.mod_effect = mod_dpix
        return True

    def set_dict_params(self, dict_pars):
        """Set dictionary parameters.

        The dictionary is stored by reference, not copied.

        :param dict_pars: parameters of the component
        :type dict_pars: dict
        """
        self.d_pars = dict_pars

    def set_cfg_params(self, cfg_pars, section):
        """Set parameters of component with dict or file with configparser.

        :param cfg_pars: parameters container: a dict, the path of an
            existing configuration file, or the content of a configuration
        :type cfg_pars: dict or string
        :param section: name of ecpi section
        :type section: str
        :return: True on success; False (and status 4) if
            ``tpars.cfgparser_to_dicttyped`` returns None
        :rtype: bool
        """
        cfg = configparser.ConfigParser(
            interpolation=configparser.ExtendedInterpolation()
        )
        if isinstance(cfg_pars, dict):
            cfg.read_dict(cfg_pars)
        elif os.path.isfile(cfg_pars):
            cfg.read(cfg_pars)
        elif isinstance(cfg_pars, str):
            # cfg_pars is a string that does not contain the path to
            # a config filename => then it is the content of a configuration
            cfg.read_string(cfg_pars)
        ret_parser = tpars.cfgparser_to_dicttyped(cfg, section)
        if ret_parser is None:
            self.status = 4
            return False
        self.d_pars = ret_parser
        # NOTE(review): this stores the bound method ``cfg.sections``, not the
        # list it returns; kept as is because callers may invoke it - confirm.
        self.sections = cfg.sections
        self.component_name = section
        return True

    def complete_params_with_dict(self, new_dict_pars):
        """Merge ``new_dict_pars`` into ``self.d_pars``.

        Uses ``dict.update``: keys already present in ``self.d_pars`` are
        overwritten by the values of ``new_dict_pars``.

        NOTE(review): the previous docstring said parameters were added only
        "if not present"; the overwriting behaviour is kept - confirm intent.

        :param new_dict_pars: parameters to merge
        :type new_dict_pars: dict
        """
        self.d_pars.update(new_dict_pars)

    def check_params(self):
        """Check parameters of component.

        Parameters of component are in self.d_pars attribute. Status is set
        to 1 when the schema check fails.

        :return: result of :meth:`_check_extra` if the schema check passes,
            else False
        """
        if self._check_add_default_dict():
            return self._check_extra()
        self.status = 1
        return False

    def run(self, data_in_obj=None):
        """Run/processing with parameters and d_data_in.

        :param data_in_obj: data flow object shared between components
        :type data_in_obj: DataFlow
        :return: False if the input check or the processing fails, else the
            value returned by :meth:`_create_output_files`
        :raises AssertionError: if ``data_in_obj`` is not a ``DataFlow`` or
            if the component name has not been set
        """
        assert isinstance(data_in_obj, DataFlow)
        assert self.component_name != ""
        s_logger.info(
            "Running main run() process of component [%s]", self.component_name
        )
        self.data_flow = data_in_obj
        # The input directory is only checked when the data flow is not ready.
        if (not data_in_obj.is_ready(self.component_name)
                and not self._load_check_file_in()):
            s_logger.error(
                "Error checking data_flow content in %s", self.component_name
            )
            return False
        if not self._process_component():
            s_logger.error("Error processing %s", self.component_name)
            return False
        return self._create_output_files()

    def status_int(self):
        """Return status number, i.e. ``status`` offset by ``error_range``."""
        return self.status + self.error_range

    def status_msg(self):
        """Return status message prefixed by the component name.

        A status code missing from ``self.d_status`` gives an
        "unknown status" message instead of raising ``KeyError``.
        """
        msg_err = self.d_status.get(self.status, f"unknown status {self.status}")
        return f"[{self.component_name}]: {msg_err}"