"""
@author: Colley Jean-Marc, APC/IN2P3/CNRS
"""
import os
import time
import string
import threading
import logging
import random
import copy as cp
import multiprocessing as mpc
from ecpi.common import add_path_process_eclairs
from ecpi.process.pipeline.lib_ecpi_pipeline import EclairsPipelineForHttpServer
from ecpi.logger_ecpi import ECPILogger, return_level
S_logger = logging.getLogger('server.manager')
[docs]class MessagingFSCnone(object):
"""
@author: Colley Jean-Marc, APC/IN2P3/CNRS
"""
[docs] def publish_server_start(self):
pass
[docs] def publish_periodic_status(self):
pass
[docs] def publish_ecpi_start(self, proc_id):
pass
[docs] def publish_ecpi_end(self, proc_id, error_code, error_message):
pass
[docs]def get_upload_path():
"""
Getter for upload path
"""
return add_path_process_eclairs('')
[docs]def process_ecpi(id_process, f_pars, p_queue):
"""
Process ECPI for given ID.
"""
level = return_level(f_pars)
o_log = ECPILogger(level, add_path_process_eclairs(id_process + '.log'), False)
o_log.message_start()
try:
ecpi = EclairsPipelineForHttpServer(f_pars, id_process)
ecpi.run()
exit_code = ecpi.status
exit_mes = ecpi.status_msg
except:
o_log.logger.exception('Got exception on main handler')
exit_code = -1
exit_mes = 'very bad news, python exception !!!!'
o_log.message_end()
# send exit values to parents processus
p_queue.put([exit_code, exit_mes])
[docs]def dummy_ecpi(id_process, f_pars, p_queue):
"""
Dummy ECPI for testing.
"""
print("je lance le pipeline ecpi ", id_process)
print("avec le fichier de parametres ", f_pars, flush=True)
time_dodo = random.randint(1, 10)
time.sleep(time_dodo)
exit_code = random.randint(0, 4)
exit_mes = f"j'ai fait dodo durant {time_dodo}s."
p_queue.put([exit_code, exit_mes])
[docs]class ManageProcessusEcpi(object):
"""
manage:
* start processing
* update status processing with thread timer or status request
* nats message associated processing
@author: Colley Jean-Marc, APC/IN2P3/CNRS
"""
instance = None
def __new__(cls, *args, **kargs):
"""
used singleton design pattern, ie only one instance of class ManageProcessusEcpi
"""
if cls.instance is None:
return object.__new__(cls, *args, **kargs)
else:
return cls.instance
def __init__(self):
if ManageProcessusEcpi.instance is None:
ManageProcessusEcpi.instance = self
self.lock = threading.RLock()
# running processing dict : "name" : [process, queue]
self.pipelines_running = {}
# finished processing dict : "name" : [exit value, exit message]
self.pipelines_finished = {}
self.pipeline_last = None
# by default messaging do nothing
self.nats = MessagingFSCnone()
[docs] def init_nats(self, nats):
self.nats = nats
[docs] def whoiam(self):
# self description
print(self)
def _create_info_proc(self, proc_name):
# create info dict for process with
# name proc_name
info_proc = {
"name" : proc_name,
"start" : time.strftime("%c")
}
return info_proc
[docs] def update_status_pipeline(self):
S_logger.debug('update_status_pipeline')
with self.lock:
#print('update status processing')
d_ppline = cp.copy(self.pipelines_running)
for name, info in d_ppline.items():
#print(info, flush=True)
proc_ecpi = info[0]
assert isinstance(proc_ecpi, mpc.Process)
if not proc_ecpi.is_alive():
self.pipelines_running.pop(name)
mes_queue = info[1].get()
self.pipelines_finished[name] = mes_queue
self.nats.publish_ecpi_end(name, mes_queue[0], mes_queue[1])
S_logger.info(
(f"{name} is finished with status ",
f"{mes_queue[0]} and mess {mes_queue[1]}")
)
[docs] def is_valid_proc_name(self, proc_name):
return proc_name in self.pipelines_running or \
proc_name in self.pipelines_finished
[docs] def get_exit_value(self, process_name):
if process_name in self.pipelines_finished:
return self.pipelines_finished[process_name][0]
else:
return None
[docs] def get_exit_message(self, process_name):
if process_name in self.pipelines_finished:
return self.pipelines_finished[process_name][1]
else:
return None
[docs] def nb_pipeline_running(self):
"""
Return number of running processes.
"""
return len(self.pipelines_running)
[docs] def nb_pipeline_run(self):
return len(self.pipelines_finished)
def _get_id(self):
msize = 6
mchars = string.ascii_letters + string.digits
idp = 'ECL_' + ''.join(random.choice(mchars) for _ in range(msize))
return idp
[docs] def submit_ecpi(self, f_pars):
"""
Submit jobs to ECPI
"""
with self.lock:
proc_name = self._get_id()
mtarget = process_ecpi
mname = proc_name
p_queue = mpc.Queue()
margs = (proc_name, f_pars, p_queue)
pipe_proc = mpc.Process(target=mtarget, name=mname, args=margs)
pipe_proc.start()
self.nats.publish_ecpi_start(proc_name)
S_logger.info(f"submit pipeline : {proc_name}")
self.pipeline_last = pipe_proc.name
self.pipelines_running[proc_name] = [pipe_proc, p_queue]
return proc_name
[docs] def is_running(self, p_id):
# return whther process ID is running
return p_id in self.pipelines_running
[docs]def timer_update_status(manager):
"""
Update pipeline status.
"""
assert isinstance(manager, ManageProcessusEcpi)
while True:
time.sleep(1)
manager.update_status_pipeline()
[docs]def timer_nats(manager):
"""
Set NATS messaging period to 60s.
"""
assert isinstance(manager, ManageProcessusEcpi)
period_nats_message = 60
while True:
time.sleep(period_nats_message)
manager.nats.publish_periodic_status()
# ============================================
# Thread update status on NATS and processing
# ============================================
S_manager = ManageProcessusEcpi()
S_thread_update = threading.Thread(target=timer_update_status, args=(S_manager,))
#TODO: comment next line before generate documentation and uncomment after
# S_thread_update.start()
S_thread_nats = threading.Thread(target=timer_nats, args=(S_manager,))
#TODO: comment next line before generate documentation and uncomment after
# S_thread_nats.start()
# ============================================