"""
@author: Colley Jean-Marc, APC/IN2P3/CNRS
"""
import os
import time
import string
import threading
import logging
import random
import datetime
import copy as cp
import multiprocessing as mpc
from ecpi.common import add_path_process_eclairs
from ecpi.process.pipeline.lib_ecpi_pipeline import EclairsPipelineForHttpServer
from ecpi.process.pipeline.qla import EclairsQlaLevel1
from ecpi.logger_ecpi import ECPILogger, return_level
S_logger = logging.getLogger('werkzeug.process')
[docs]class MessagingFSCnone(object):
"""
@author: Colley Jean-Marc, APC/IN2P3/CNRS
"""
[docs] def publish_server_start(self):
pass
[docs] def publish_periodic_status(self):
pass
[docs] def publish_ecpi_start(self, proc_id):
pass
[docs] def publish_ecpi_end(self, proc_id, error_code, error_message):
pass
[docs]def get_upload_path():
"""
Getter for upload path
"""
return add_path_process_eclairs('')
[docs]def process_ecpi(id_process, f_pars, p_queue):
"""
Process ECPI for given ID.
"""
level = return_level(f_pars)
o_log = ECPILogger(level, add_path_process_eclairs(id_process + '.log'), False)
o_log.message_start()
try:
ecpi = EclairsPipelineForHttpServer(f_pars, id_process)
ecpi.run()
exit_code = ecpi.status
exit_mes = ecpi.status_msg
except:
o_log.logger.exception('Got exception on main handler')
exit_code = -1
exit_mes = 'very bad news, python exception !!!!'
o_log.message_end()
# send exit values to parents processus
p_queue.put([exit_code, exit_mes])
[docs]def process_qla_level1(id_process, params, p_queue):
'''launch QLA level 1 pipeline
:param id_process:
:param params:
:param p_queue:
'''
o_log = ECPILogger('debug', add_path_process_eclairs(id_process + '.log'), False)
o_log.message_start()
try:
qla = EclairsQlaLevel1(params, id_process)
qla.run()
exit_code = qla.status
exit_mes = qla.status_msg
except:
o_log.logger.exception('Got exception on main handler')
exit_code = -1
exit_mes = 'very bad news, python exception !!!!'
o_log.message_end()
# send exit values to parents processus
p_queue.put([exit_code, exit_mes])
[docs]def dummy_ecpi(id_process, f_pars, p_queue):
"""
Dummy ECPI for testing.
"""
print("je lance le pipeline ecpi ", id_process)
print("avec le fichier de parametres ", f_pars, flush=True)
time_dodo = random.randint(1, 10)
time.sleep(time_dodo)
exit_code = random.randint(0, 4)
exit_mes = f"j'ai fait dodo durant {time_dodo}s."
p_queue.put([exit_code, exit_mes])
[docs]class ManageProcessusEcpi(object):
"""
manage:
* start processing
* update status processing with thread timer or status request
* nats message associated processing
#TODO: manage process finished keep 100 last ? before i month ?
@author: Colley Jean-Marc, APC/IN2P3/CNRS
"""
instance = None
def __new__(cls, *args, **kargs):
"""submit_process
used singleton design pattern, ie only one instance of class ManageProcessusEcpi
"""
if cls.instance is None:
return object.__new__(cls, *args, **kargs)
else:
return cls.instance
def __init__(self):
if ManageProcessusEcpi.instance is None:
ManageProcessusEcpi.instance = self
self.lock = threading.RLock()
# running processing dict : "name" : [process, queue]
self.pipelines_running = {}
# finished processing dict : "name" : [exit value, exit message]
self.pipelines_finished = {}
self.pipeline_last = None
# by default messaging do nothing
self.nats = MessagingFSCnone()
[docs] def init_nats(self, nats):
self.nats = nats
[docs] def whoiam(self):
# self description
print(self)
[docs] def update_status_pipeline(self):
S_logger.debug('update_status_pipeline')
with self.lock:
# print('update status processing')
d_ppline = cp.copy(self.pipelines_running)
for name, info in d_ppline.items():
#print(info, flush=True)
proc_ecpi = info['process']
assert isinstance(proc_ecpi, mpc.Process)
if not proc_ecpi.is_alive():
# process finished
self.pipelines_running.pop(name)
mes_queue = info['queue'].get()
new_info = info.copy()
new_info["queue"] = mes_queue
new_info.pop('process')
#print('add ', name, new_info)
self.pipelines_finished[name] = new_info
self.nats.publish_ecpi_end(name, mes_queue[0], mes_queue[1])
mes_end = f"{name} is finished with status "
mes_end += f"{mes_queue[0]} and mess '{mes_queue[1]}'"
S_logger.info(mes_end)
[docs] def is_valid_proc_name(self, proc_name):
with self.lock:
valid = proc_name in self.pipelines_running or \
proc_name in self.pipelines_finished
return valid
[docs] def get_exit_value(self, process_name):
if process_name in self.pipelines_finished:
return self.pipelines_finished[process_name]['queue'][0]
else:
return None
[docs] def get_exit_message(self, process_name):
if process_name in self.pipelines_finished:
return self.pipelines_finished[process_name]['queue'][1]
else:
return None
[docs] def nb_pipeline_running(self):
"""
Return number of running processes.
"""
with self.lock:
nb_run = len(self.pipelines_running)
return nb_run
[docs] def nb_pipeline_done(self):
with self.lock:
nb_done = len(self.pipelines_finished)
return nb_done
def _get_id(self):
msize = 6
mchars = string.ascii_letters + string.digits
idp = 'ECL_' + ''.join(random.choice(mchars) for _ in range(msize))
return idp
[docs] def submit_process(self, p_func, p_params):
"""
Submit p_func(p_params) in processpro
"""
with self.lock:
proc_name = self._get_id()
mtarget = p_func
mname = proc_name
p_queue = mpc.Queue()
margs = (proc_name, p_params, p_queue)
pipe_proc = mpc.Process(target=mtarget, name=mname, args=margs)
pipe_proc.start()
self.nats.publish_ecpi_start(proc_name)
S_logger.info(f"submit pipeline : {proc_name}")
self.pipeline_last = pipe_proc.name
date_start = datetime.datetime.now().replace(microsecond=0).isoformat()
self.pipelines_running[proc_name] = {
'process':pipe_proc,
'queue':p_queue,
'params': p_params,
'date_start': date_start
}
return proc_name
[docs] def submit_ecpi(self, f_pars):
"""
Submit process ECPI
"""
return self.submit_process(process_ecpi, f_pars)
[docs] def is_running(self, p_id):
# return whther process ID is running
with self.lock:
running = p_id in self.pipelines_running
return running
[docs] def get_all_name_proc(self):
with self.lock:
all_names = list(self.pipelines_finished.keys()) +\
list(self.pipelines_running.keys())
return all_names
[docs]def timer_update_status(manager):
"""
Update pipeline status.
"""
assert isinstance(manager, ManageProcessusEcpi)
while True:
time.sleep(1)
manager.update_status_pipeline()
[docs]def timer_nats(manager):
"""
Set NATS messaging period to 60s.
"""
assert isinstance(manager, ManageProcessusEcpi)
period_nats_message = 5*60
while True:
time.sleep(period_nats_message)
manager.nats.publish_periodic_status()
# ============================================
# Thread update status on NATS and processing
# ============================================
S_manager = ManageProcessusEcpi()
S_thread_update = threading.Thread(target=timer_update_status, args=(S_manager,))
# TODO: comment next line before generate documentation and uncomment after
#S_thread_update.start()
S_thread_nats = threading.Thread(target=timer_nats, args=(S_manager,))
# TODO: comment next line before generate documentation and uncomment after
#S_thread_nats.start()
# ============================================