Source code for ecpi.process.server.manage_pipeline

"""
@author: Colley Jean-Marc, APC/IN2P3/CNRS
"""
import copy as cp
import datetime
import logging
import multiprocessing as mpc
import os
import queue
import random
import string
import threading
import time

from ecpi.common import add_path_process_eclairs
from ecpi.process.pipeline.lib_ecpi_pipeline import EclairsPipelineForHttpServer
from ecpi.process.pipeline.qla import EclairsQlaLevel1
from ecpi.logger_ecpi import ECPILogger, return_level

S_logger = logging.getLogger('werkzeug.process')


class MessagingFSCnone:
    """
    No-op messaging object: every ``publish_*`` method accepts its
    arguments, does nothing and returns None.

    @author: Colley Jean-Marc, APC/IN2P3/CNRS
    """

    def publish_server_start(self):
        """Do nothing."""

    def publish_periodic_status(self):
        """Do nothing."""

    def publish_ecpi_start(self, proc_id):
        """Do nothing; *proc_id* is ignored."""

    def publish_ecpi_end(self, proc_id, error_code, error_message):
        """Do nothing; all arguments are ignored."""
def get_upload_path():
    """
    Getter for upload path.

    :return: the value of ``add_path_process_eclairs('')``
    """
    upload_path = add_path_process_eclairs('')
    return upload_path
def process_ecpi(id_process, f_pars, p_queue):
    """
    Process ECPI for given ID and report its exit status on a queue.

    The log is written to ``add_path_process_eclairs(id_process + '.log')``
    with the level returned by ``return_level(f_pars)``.

    ``[exit_code, exit_message]`` is always put on *p_queue*, even when the
    logger set-up or the pipeline itself fails; in that case the code is -1.

    :param id_process: process identifier, also the base name of the log file
    :param f_pars: parameters given to return_level() and to
        EclairsPipelineForHttpServer
    :param p_queue: queue receiving the list [exit_code, exit_message]
    """
    # Failure values by default: they are only replaced once the pipeline
    # has run and exposed its own status.
    exit_code = -1
    exit_mes = 'very bad news, python exception !!!!'
    try:
        level = return_level(f_pars)
        o_log = ECPILogger(level,
                           add_path_process_eclairs(id_process + '.log'),
                           False)
        o_log.message_start()
        try:
            ecpi = EclairsPipelineForHttpServer(f_pars, id_process)
            ecpi.run()
            # read both attributes before assigning, so a failure here
            # leaves the default failure values untouched
            exit_code, exit_mes = ecpi.status, ecpi.status_msg
        except Exception:
            o_log.logger.exception('Got exception on main handler')
        o_log.message_end()
    finally:
        # send exit values to the parent process in every case
        p_queue.put([exit_code, exit_mes])
def process_qla_level1(id_process, params, p_queue):
    """
    Launch QLA level 1 pipeline and report its exit status on a queue.

    The log is written at 'debug' level to
    ``add_path_process_eclairs(id_process + '.log')``.

    ``[exit_code, exit_message]`` is always put on *p_queue*, even when the
    logger set-up or the pipeline itself fails; in that case the code is -1.

    :param id_process: process identifier, also the base name of the log file
    :param params: parameters given to EclairsQlaLevel1
    :param p_queue: queue receiving the list [exit_code, exit_message]
    """
    # Failure values by default: they are only replaced once the pipeline
    # has run and exposed its own status.
    exit_code = -1
    exit_mes = 'very bad news, python exception !!!!'
    try:
        o_log = ECPILogger('debug',
                           add_path_process_eclairs(id_process + '.log'),
                           False)
        o_log.message_start()
        try:
            qla = EclairsQlaLevel1(params, id_process)
            qla.run()
            # read both attributes before assigning, so a failure here
            # leaves the default failure values untouched
            exit_code, exit_mes = qla.status, qla.status_msg
        except Exception:
            o_log.logger.exception('Got exception on main handler')
        o_log.message_end()
    finally:
        # send exit values to the parent process in every case
        p_queue.put([exit_code, exit_mes])
def dummy_ecpi(id_process, f_pars, p_queue):
    """
    Dummy ECPI for testing.

    Prints its arguments, sleeps a random whole number of seconds between
    1 and 10, then puts ``[exit_code, exit_message]`` on *p_queue* where
    exit_code is a random integer between 0 and 4.

    :param id_process: process identifier (only printed)
    :param f_pars: parameters (only printed)
    :param p_queue: queue receiving the list [exit_code, exit_message]
    """
    print("je lance le pipeline ecpi ", id_process)
    print("avec le fichier de parametres ", f_pars, flush=True)
    nap_s = random.randint(1, 10)
    time.sleep(nap_s)
    status = random.randint(0, 4)
    p_queue.put([status, f"j'ai fait dodo durant {nap_s}s."])
class ManageProcessusEcpi(object):
    """
    Manage the pipeline processes launched by the server.

    manage:
      * start processing
      * update status processing with thread timer or status request
      * nats message associated processing

    #TODO: manage process finished keep 100 last ? before 1 month ?

    @author: Colley Jean-Marc, APC/IN2P3/CNRS
    """

    # the single instance of the class, set by the first __init__
    instance = None

    # Maximum time (s) spent waiting for the exit status of a process that
    # is no longer alive, so a child that died without reporting cannot
    # block the manager forever.
    QUEUE_GET_TIMEOUT_S = 2.0

    def __new__(cls, *args, **kargs):
        """
        Singleton design pattern, ie only one instance of class
        ManageProcessusEcpi: return the existing instance when there is one.
        """
        if cls.instance is None:
            # object.__new__ accepts no extra argument
            return super().__new__(cls)
        return cls.instance

    def __init__(self):
        """
        Initialise the state of the manager, the first time only.

        __init__ is called again each time the singleton is requested, so
        the state must not be reset when the instance already exists.
        """
        if ManageProcessusEcpi.instance is not None:
            return
        ManageProcessusEcpi.instance = self
        self.lock = threading.RLock()
        # running processing dict : "name" : {'process', 'queue',
        # 'params', 'date_start'}
        self.pipelines_running = {}
        # finished processing dict : same entries without 'process' and
        # with 'queue' replaced by [exit value, exit message]
        self.pipelines_finished = {}
        # name of the last submitted process
        self.pipeline_last = None
        # by default messaging do nothing
        self.nats = MessagingFSCnone()

    def init_nats(self, nats):
        """
        Replace the messaging object.

        :param nats: object providing the publish_* methods used here
        """
        self.nats = nats

    def whoiam(self):
        """Self description: print the instance."""
        print(self)

    def update_status_pipeline(self):
        """
        Move every process that is no longer alive from the running dict
        to the finished dict, publish its end and log it.

        The exit status is read from the queue of the process. When nothing
        can be read within QUEUE_GET_TIMEOUT_S seconds, the process is
        recorded with exit value -1 and a message giving its exit code.
        """
        S_logger.debug('update_status_pipeline')
        with self.lock:
            # iterate on a snapshot: entries are removed during the loop
            for name, info in list(self.pipelines_running.items()):
                proc_ecpi = info['process']
                assert isinstance(proc_ecpi, mpc.Process)
                if proc_ecpi.is_alive():
                    continue
                # process finished
                self.pipelines_running.pop(name)
                try:
                    mes_queue = info['queue'].get(
                        timeout=self.QUEUE_GET_TIMEOUT_S)
                except queue.Empty:
                    # the process ended without sending its exit values
                    mes_queue = [
                        -1,
                        "process ended without reporting a status "
                        f"(exitcode={proc_ecpi.exitcode})"
                    ]
                    S_logger.error(f"{name} did not report its exit status")
                new_info = info.copy()
                new_info["queue"] = mes_queue
                new_info.pop('process')
                self.pipelines_finished[name] = new_info
                self.nats.publish_ecpi_end(name, mes_queue[0], mes_queue[1])
                mes_end = f"{name} is finished with status "
                mes_end += f"{mes_queue[0]} and mess '{mes_queue[1]}'"
                S_logger.info(mes_end)

    def is_valid_proc_name(self, proc_name):
        """
        :param proc_name: process name
        :return: True when the name is a running or a finished process
        """
        with self.lock:
            valid = proc_name in self.pipelines_running or \
                    proc_name in self.pipelines_finished
        return valid

    def get_exit_value(self, process_name):
        """
        :param process_name: process name
        :return: exit value of a finished process, else None
        """
        with self.lock:
            info = self.pipelines_finished.get(process_name)
        if info is None:
            return None
        return info['queue'][0]

    def get_exit_message(self, process_name):
        """
        :param process_name: process name
        :return: exit message of a finished process, else None
        """
        with self.lock:
            info = self.pipelines_finished.get(process_name)
        if info is None:
            return None
        return info['queue'][1]

    def nb_pipeline_running(self):
        """
        Return number of running processes.
        """
        with self.lock:
            nb_run = len(self.pipelines_running)
        return nb_run

    def nb_pipeline_done(self):
        """
        Return number of finished processes.
        """
        with self.lock:
            nb_done = len(self.pipelines_finished)
        return nb_done

    def _get_id(self):
        """
        Return a random identifier: 'ECL_' followed by 6 ASCII letters
        or digits.
        """
        msize = 6
        mchars = string.ascii_letters + string.digits
        idp = 'ECL_' + ''.join(random.choice(mchars) for _ in range(msize))
        return idp

    def submit_process(self, p_func, p_params):
        """
        Submit p_func(proc_name, p_params, queue) in a new process.

        :param p_func: function run in the process; it receives the process
            name, p_params and the queue where it must put
            [exit value, exit message]
        :param p_params: parameters given to p_func
        :return: name of the process
        """
        with self.lock:
            proc_name = self._get_id()
            # identifiers are random: draw again on a name already used
            while self.is_valid_proc_name(proc_name):
                proc_name = self._get_id()
            p_queue = mpc.Queue()
            margs = (proc_name, p_params, p_queue)
            pipe_proc = mpc.Process(target=p_func, name=proc_name, args=margs)
            pipe_proc.start()
            self.nats.publish_ecpi_start(proc_name)
            S_logger.info(f"submit pipeline : {proc_name}")
            self.pipeline_last = pipe_proc.name
            date_start = datetime.datetime.now().replace(microsecond=0).isoformat()
            self.pipelines_running[proc_name] = {
                'process': pipe_proc,
                'queue': p_queue,
                'params': p_params,
                'date_start': date_start
            }
        return proc_name

    def submit_ecpi(self, f_pars):
        """
        Submit process ECPI

        :param f_pars: parameters given to process_ecpi
        :return: name of the process
        """
        return self.submit_process(process_ecpi, f_pars)

    def is_running(self, p_id):
        """
        :param p_id: process name
        :return: whether process ID is running
        """
        with self.lock:
            running = p_id in self.pipelines_running
        return running

    def get_all_name_proc(self):
        """
        :return: list of the finished process names followed by the
            running ones
        """
        with self.lock:
            all_names = list(self.pipelines_finished.keys()) +\
                        list(self.pipelines_running.keys())
        return all_names
def timer_update_status(manager):
    """
    Update pipeline status, forever, once per second.

    Never returns: each iteration sleeps one second and then calls
    ``manager.update_status_pipeline()``.

    :param manager: instance of ManageProcessusEcpi
    """
    assert isinstance(manager, ManageProcessusEcpi)
    poll_period_s = 1
    refresh = manager.update_status_pipeline
    while True:
        time.sleep(poll_period_s)
        refresh()
def timer_nats(manager, period_s=5 * 60):
    """
    Publish the periodic status on the messaging object, forever.

    Never returns: each iteration sleeps *period_s* seconds and then calls
    ``manager.nats.publish_periodic_status()``.

    :param manager: instance of ManageProcessusEcpi
    :param period_s: time in seconds between two messages, 300 s (5 min)
        by default
    """
    assert isinstance(manager, ManageProcessusEcpi)
    while True:
        time.sleep(period_s)
        # read manager.nats at each iteration: the messaging object can be
        # replaced after this loop has started
        manager.nats.publish_periodic_status()
# ============================================
# Thread update status on NATS and processing
# ============================================

# single manager shared by the two timer threads
S_manager = ManageProcessusEcpi()

# thread running timer_update_status(S_manager); created but not started here
S_thread_update = threading.Thread(
    target=timer_update_status,
    args=(S_manager,)
)
# TODO: comment next line before generate documentation and uncomment after
#S_thread_update.start()

# thread running timer_nats(S_manager); created but not started here
S_thread_nats = threading.Thread(
    target=timer_nats,
    args=(S_manager,)
)
# TODO: comment next line before generate documentation and uncomment after
#S_thread_nats.start()

# ============================================