Source code for ecpi.server.manage_pipeline

"""Management of processing submit on HTTP ECPI server
"""
import os
import time
import string 
import threading
import logging
import random
import datetime
import copy as cp
import multiprocessing as mpc

from ecpi.common import add_path_process_eclairs
from ecpi.pipeline.lib.ecpi_pipeline import EclairsPipelineForHttpServer
from ecpi.pipeline.qla import EclairsQlaLevel1
from ecpi.logger_ecpi import ECPILogger, return_level

S_logger = logging.getLogger('werkzeug.process')


[docs]class MessagingFSCnone(object): """ Interface for messaging handle """
[docs] def publish_server_start(self): pass
[docs] def publish_periodic_status(self): pass
[docs] def publish_ecpi_start(self, proc_id): pass
[docs] def publish_ecpi_end(self, proc_id, error_code, error_message): pass
[docs]def get_upload_path(): """ Getter for upload path """ return add_path_process_eclairs('')
[docs]def process_ecpi(id_process, f_pars, p_queue): """ Process ECPI for given ID. """ level = return_level(f_pars) o_log = ECPILogger(level, add_path_process_eclairs(id_process + '.log'), False) o_log.message_start() try: ecpi = EclairsPipelineForHttpServer(f_pars, id_process) ecpi.run() exit_code = ecpi.status exit_mes = ecpi.status_msg except: o_log.logger.exception('Got exception on main handler') exit_code = -1 exit_mes = 'very bad news, python exception !!!!' o_log.message_end() # send exit values to parents processus p_queue.put([exit_code, exit_mes])
[docs]def process_qla_level1(id_process, params, p_queue): """launch QLA level 1 pipeline :param id_process: :param params: :param p_queue: """ o_log = ECPILogger('debug', add_path_process_eclairs(id_process + '.log'), False) o_log.message_start() try: qla = EclairsQlaLevel1(params, id_process) qla.run() exit_code = qla.status exit_mes = qla.status_msg except: o_log.logger.exception('Got exception on main handler') exit_code = -1 exit_mes = 'very bad news, python exception !!!!' o_log.message_end() # send exit values to parents processus p_queue.put([exit_code, exit_mes])
[docs]def dummy_ecpi(id_process, f_pars, p_queue): """ Dummy ECPI for testing. """ # print("je lance le pipeline ecpi ", id_process) # print("avec le fichier de parametres ", f_pars, flush=True) time_dodo = random.randint(1, 10) time.sleep(time_dodo) exit_code = random.randint(0, 4) exit_mes = f"j'ai fait dodo durant {time_dodo}s." p_queue.put([exit_code, exit_mes])
[docs]class ManageProcessusEcpi(object): """ manage: * start processing * update status processing with thread timer or status request * nats message associated processing #TODO: manage process finished keep 100 last ? before i month ? """ instance = None def __new__(cls, *args, **kargs): """submit_process used singleton design pattern, ie only one instance of class ManageProcessusEcpi """ if cls.instance is None: return object.__new__(cls, *args, **kargs) else: return cls.instance def __init__(self): if ManageProcessusEcpi.instance is None: ManageProcessusEcpi.instance = self self.lock = threading.RLock() # running processing dict : "name" : [process, queue] self.pipelines_running = {} # finished processing dict : "name" : [exit value, exit message] self.pipelines_finished = {} self.pipeline_last = None # by default messaging do nothing self.nats = MessagingFSCnone()
[docs] def init_nats(self, nats): self.nats = nats
[docs] def whoiam(self): # self description S_logger.debug(self)
[docs] def update_status_pipeline(self): S_logger.debug('update_status_pipeline') with self.lock: # print('update status processing') d_ppline = cp.copy(self.pipelines_running) for name, info in d_ppline.items(): #print(info, flush=True) proc_ecpi = info['process'] assert isinstance(proc_ecpi, mpc.Process) if not proc_ecpi.is_alive(): # process finished self.pipelines_running.pop(name) mes_queue = info['queue'].get() new_info = info.copy() new_info["queue"] = mes_queue new_info.pop('process') #print('add ', name, new_info) self.pipelines_finished[name] = new_info self.nats.publish_ecpi_end(name, mes_queue[0], mes_queue[1]) mes_end = f"{name} is finished with status " mes_end += f"{mes_queue[0]} and mess '{mes_queue[1]}'" S_logger.info(mes_end)
[docs] def is_valid_proc_name(self, proc_name): with self.lock: valid = proc_name in self.pipelines_running or \ proc_name in self.pipelines_finished return valid
[docs] def get_exit_value(self, process_name): if process_name in self.pipelines_finished: return self.pipelines_finished[process_name]['queue'][0] else: return None
[docs] def get_exit_message(self, process_name): if process_name in self.pipelines_finished: return self.pipelines_finished[process_name]['queue'][1] else: return None
[docs] def nb_pipeline_running(self): """ Return number of running processes. """ with self.lock: nb_run = len(self.pipelines_running) return nb_run
[docs] def nb_pipeline_done(self): with self.lock: nb_done = len(self.pipelines_finished) return nb_done
def _get_id(self): msize = 6 mchars = string.ascii_letters + string.digits idp = 'ECL_' + ''.join(random.choice(mchars) for _ in range(msize)) return idp
[docs] def submit_process(self, p_func, p_params): """ Submit p_func(p_params) in processpro """ with self.lock: proc_name = self._get_id() mtarget = p_func mname = proc_name p_queue = mpc.Queue() margs = (proc_name, p_params, p_queue) pipe_proc = mpc.Process(target=mtarget, name=mname, args=margs) pipe_proc.start() self.nats.publish_ecpi_start(proc_name) S_logger.info(f"submit pipeline : {proc_name}") self.pipeline_last = pipe_proc.name date_start = datetime.datetime.now().replace(microsecond=0).isoformat() self.pipelines_running[proc_name] = { 'process':pipe_proc, 'queue':p_queue, 'params': p_params, 'date_start': date_start } return proc_name
[docs] def submit_ecpi(self, f_pars): """ Submit process ECPI """ return self.submit_process(process_ecpi, f_pars)
[docs] def is_running(self, p_id): # return whther process ID is running with self.lock: running = p_id in self.pipelines_running return running
[docs] def get_all_name_proc(self): with self.lock: all_names = list(self.pipelines_finished.keys()) +\ list(self.pipelines_running.keys()) return all_names
[docs]def timer_update_status(manager): """ Update pipeline status. """ assert isinstance(manager, ManageProcessusEcpi) while True: time.sleep(1) manager.update_status_pipeline()
[docs]def timer_nats(manager): """ Set NATS messaging period to 60s. """ assert isinstance(manager, ManageProcessusEcpi) period_nats_message = 5 * 60 while True: time.sleep(period_nats_message) manager.nats.publish_periodic_status()
# ============================================ # Thread update status on NATS and processing # ============================================ S_manager = ManageProcessusEcpi() S_thread_update = threading.Thread(target=timer_update_status, args=(S_manager,)) # TODO: comment next line before generate documentation and uncomment after #S_thread_update.start() S_thread_nats = threading.Thread(target=timer_nats, args=(S_manager,)) # TODO: comment next line before generate documentation and uncomment after #S_thread_nats.start() # ============================================