Source code for ecpi.process.server.manage_pipeline

"""
@author: Colley Jean-Marc, APC/IN2P3/CNRS
"""
import os
import time
import string 
import threading
import logging
import random
import copy as cp
import multiprocessing as mpc
from ecpi.common import add_path_process_eclairs
from ecpi.process.pipeline.lib_ecpi_pipeline import EclairsPipelineForHttpServer
from ecpi.logger_ecpi import ECPILogger, return_level

S_logger = logging.getLogger('server.manager')


[docs]class MessagingFSCnone(object): """ @author: Colley Jean-Marc, APC/IN2P3/CNRS """
[docs] def publish_server_start(self): pass
[docs] def publish_periodic_status(self): pass
[docs] def publish_ecpi_start(self, proc_id): pass
[docs] def publish_ecpi_end(self, proc_id, error_code, error_message): pass
[docs]def get_upload_path(): """ Getter for upload path """ return add_path_process_eclairs('')
[docs]def process_ecpi(id_process, f_pars, p_queue): """ Process ECPI for given ID. """ level = return_level(f_pars) o_log = ECPILogger(level, add_path_process_eclairs(id_process + '.log'), False) o_log.message_start() try: ecpi = EclairsPipelineForHttpServer(f_pars, id_process) ecpi.run() exit_code = ecpi.status exit_mes = ecpi.status_msg except: o_log.logger.exception('Got exception on main handler') exit_code = -1 exit_mes = 'very bad news, python exception !!!!' o_log.message_end() # send exit values to parents processus p_queue.put([exit_code, exit_mes])
[docs]def dummy_ecpi(id_process, f_pars, p_queue): """ Dummy ECPI for testing. """ print("je lance le pipeline ecpi ", id_process) print("avec le fichier de parametres ", f_pars, flush=True) time_dodo = random.randint(1, 10) time.sleep(time_dodo) exit_code = random.randint(0, 4) exit_mes = f"j'ai fait dodo durant {time_dodo}s." p_queue.put([exit_code, exit_mes])
[docs]class ManageProcessusEcpi(object): """ manage: * start processing * update status processing with thread timer or status request * nats message associated processing @author: Colley Jean-Marc, APC/IN2P3/CNRS """ instance = None def __new__(cls, *args, **kargs): """ used singleton design pattern, ie only one instance of class ManageProcessusEcpi """ if cls.instance is None: return object.__new__(cls, *args, **kargs) else: return cls.instance def __init__(self): if ManageProcessusEcpi.instance is None: ManageProcessusEcpi.instance = self self.lock = threading.RLock() # running processing dict : "name" : [process, queue] self.pipelines_running = {} # finished processing dict : "name" : [exit value, exit message] self.pipelines_finished = {} self.pipeline_last = None # by default messaging do nothing self.nats = MessagingFSCnone()
[docs] def init_nats(self, nats): self.nats = nats
[docs] def whoiam(self): # self description print(self)
def _create_info_proc(self, proc_name): # create info dict for process with # name proc_name info_proc = { "name" : proc_name, "start" : time.strftime("%c") } return info_proc
[docs] def update_status_pipeline(self): S_logger.debug('update_status_pipeline') with self.lock: #print('update status processing') d_ppline = cp.copy(self.pipelines_running) for name, info in d_ppline.items(): #print(info, flush=True) proc_ecpi = info[0] assert isinstance(proc_ecpi, mpc.Process) if not proc_ecpi.is_alive(): self.pipelines_running.pop(name) mes_queue = info[1].get() self.pipelines_finished[name] = mes_queue self.nats.publish_ecpi_end(name, mes_queue[0], mes_queue[1]) S_logger.info( (f"{name} is finished with status ", f"{mes_queue[0]} and mess {mes_queue[1]}") )
[docs] def is_valid_proc_name(self, proc_name): return proc_name in self.pipelines_running or \ proc_name in self.pipelines_finished
[docs] def get_exit_value(self, process_name): if process_name in self.pipelines_finished: return self.pipelines_finished[process_name][0] else: return None
[docs] def get_exit_message(self, process_name): if process_name in self.pipelines_finished: return self.pipelines_finished[process_name][1] else: return None
[docs] def nb_pipeline_running(self): """ Return number of running processes. """ return len(self.pipelines_running)
[docs] def nb_pipeline_run(self): return len(self.pipelines_finished)
def _get_id(self): msize = 6 mchars = string.ascii_letters + string.digits idp = 'ECL_' + ''.join(random.choice(mchars) for _ in range(msize)) return idp
[docs] def submit_ecpi(self, f_pars): """ Submit jobs to ECPI """ with self.lock: proc_name = self._get_id() mtarget = process_ecpi mname = proc_name p_queue = mpc.Queue() margs = (proc_name, f_pars, p_queue) pipe_proc = mpc.Process(target=mtarget, name=mname, args=margs) pipe_proc.start() self.nats.publish_ecpi_start(proc_name) S_logger.info(f"submit pipeline : {proc_name}") self.pipeline_last = pipe_proc.name self.pipelines_running[proc_name] = [pipe_proc, p_queue] return proc_name
[docs] def is_running(self, p_id): # return whther process ID is running return p_id in self.pipelines_running
[docs]def timer_update_status(manager): """ Update pipeline status. """ assert isinstance(manager, ManageProcessusEcpi) while True: time.sleep(1) manager.update_status_pipeline()
[docs]def timer_nats(manager): """ Set NATS messaging period to 60s. """ assert isinstance(manager, ManageProcessusEcpi) period_nats_message = 60 while True: time.sleep(period_nats_message) manager.nats.publish_periodic_status()
# ============================================ # Thread update status on NATS and processing # ============================================ S_manager = ManageProcessusEcpi() S_thread_update = threading.Thread(target=timer_update_status, args=(S_manager,)) #TODO: comment next line before generate documentation and uncomment after # S_thread_update.start() S_thread_nats = threading.Thread(target=timer_nats, args=(S_manager,)) #TODO: comment next line before generate documentation and uncomment after # S_thread_nats.start() # ============================================