Source code for process.server.ManagePipelineECLAIRs
'''
Created on 27 June 2018
@author: Colley Jean-Marc, APC/IN2P3/CNRS
'''
from multiprocessing import Process
import threading
import process.server.ecpi_server as ppecl
import process.server.ecpi_dc0 as imag_dc0
import copy as cp
import database.interface_MdbSim as dbsvom
import os
from common import get_root_eclairs
class ManagePipelineECLAIRs(object):
    """Singleton manager that launches and tracks ECLAIRs pipeline processes.

    Every submitted pipeline runs in its own ``multiprocessing.Process``.
    Processes that are still alive are kept in ``pipelines_running``; once a
    process is found to have stopped it is moved to ``pipelines_run``.
    ``pipeline_last`` references the most recently submitted process (it is
    an empty list until the first submission).
    """

    # The single shared instance, registered by the first __init__ call.
    instance = None
    # Maximum number of pipelines allowed to run at the same time.
    MAX_PIPELINES_RUNNING = 10

    def __new__(cls, *args, **kargs):
        """
        Singleton design pattern, i.e. only one instance of class
        ManagePipelineECLAIRs: return the registered instance when there is
        one, otherwise allocate a new object.
        """
        if cls.instance is None:
            # Do not forward *args/**kargs: object.__new__ raises TypeError
            # when it receives extra arguments and __new__ is overridden.
            return object.__new__(cls)
        return cls.instance

    def __init__(self):
        """Initialise the state once, when no instance is registered yet."""
        if ManagePipelineECLAIRs.instance is None:
            ManagePipelineECLAIRs.instance = self
            db_dir = os.path.join(get_root_eclairs(), '../DB_dir')
            self.db = dbsvom.DataBaseSVOMMdbSim('ECLAIRs', db_dir)
            # Re-entrant lock: public methods call each other while holding it.
            self.lock = threading.RLock()
            self.pipelines_running = []
            self.pipelines_run = []
            self.pipeline_last = []

    def whoiam(self):
        """Print this object (debug helper)."""
        print(self)

    def _update_status_pipeline(self):
        """Move every process that is no longer alive from
        ``pipelines_running`` to ``pipelines_run``.
        """
        # 'with' guarantees the lock is released even if is_alive() raises.
        with self.lock:
            # Iterate over a snapshot because the list is modified in the loop.
            for ppline in list(self.pipelines_running):
                if not ppline.is_alive():
                    self.pipelines_running.remove(ppline)
                    self.pipelines_run.append(ppline)

    def nb_pipeline_running(self):
        """Return the number of pipelines still alive, after a status refresh."""
        self._update_status_pipeline()
        return len(self.pipelines_running)

    def nb_pipeline_run(self):
        """Return the number of finished pipelines, after a status refresh."""
        self._update_status_pipeline()
        return len(self.pipelines_run)

    def submit_sop(self, snr, t0, t1, s_time_start):
        """Start a source-detection pipeline in a new process.

        The process target is ``ppecl.pipeline_source_detection`` and it is
        called with ``(t0, t1, snr, s_time_start, proc_id, self.db)``.

        Returns the identifier obtained from ``self.db.get_id()`` (also used
        as the process name), or the string "NOK" when
        ``MAX_PIPELINES_RUNNING`` pipelines are already running.

        TODO: revisit the case of several simultaneous submissions, depending
        on the needs that come out of the use cases.
        """
        # 'with' releases the lock even when get_id() or start() raises;
        # otherwise every later call from another thread would block forever.
        with self.lock:
            if self.nb_pipeline_running() >= self.MAX_PIPELINES_RUNNING:
                # default mode BUSY, can't process the request
                return "NOK"
            proc_id = self.db.get_id()
            margs = (t0, t1, snr, s_time_start, proc_id, self.db)
            pipe_proc = Process(target=ppecl.pipeline_source_detection,
                                name=proc_id, args=margs)
            pipe_proc.start()
            print("submit pipeline %s"%proc_id)
            self.pipelines_running.append(pipe_proc)
            self.pipeline_last = pipe_proc
            return proc_id

    def is_running(self, p_id):
        """Return True when a running pipeline process is named ``p_id``."""
        with self.lock:
            # Refresh the running list first so stopped processes are ignored.
            self.nb_pipeline_running()
            return any(ppline.name == p_id
                       for ppline in self.pipelines_running)
class ManagePipelineECLAIRsDC0(ManagePipelineECLAIRs):
    """Pipeline manager variant for DC0 processing.

    Uses a ``DataBaseSVOMMdbDC0`` database and launches
    ``imag_dc0.pipeline_dc0`` processes. The singleton ``__new__`` is
    inherited from ManagePipelineECLAIRs, and the instance is registered on
    ManagePipelineECLAIRs itself.
    """

    # Maximum number of DC0 pipelines allowed to run at the same time.
    MAX_PIPELINES_RUNNING = 10

    def __init__(self):
        """Initialise the state once, when no instance is registered yet."""
        if ManagePipelineECLAIRs.instance is None:
            ManagePipelineECLAIRs.instance = self
            db_dir = os.path.join(get_root_eclairs(), '../DB_dir')
            self.db = dbsvom.DataBaseSVOMMdbDC0('ECLAIRs', db_dir)
            # Re-entrant lock: submit_sop calls nb_pipeline_running while
            # already holding it.
            self.lock = threading.RLock()
            self.pipelines_running = []
            self.pipelines_run = []
            self.pipeline_last = []

    def submit_sop(self, evt_filenames, att_filenames, s_time_start, cfg):
        """Start a DC0 pipeline in a new process.

        The process target is ``imag_dc0.pipeline_dc0`` and it is called with
        ``(evt_filenames, att_filenames, s_time_start, cfg, proc_id, self.db)``.

        Returns the identifier obtained from ``self.db.get_id()`` (also used
        as the process name), or the string "NOK" when
        ``MAX_PIPELINES_RUNNING`` pipelines are already running.

        TODO: revisit the case of several simultaneous submissions, depending
        on the needs that come out of the use cases.
        """
        # 'with' releases the lock even when get_id() or start() raises;
        # otherwise every later call from another thread would block forever.
        with self.lock:
            if self.nb_pipeline_running() >= self.MAX_PIPELINES_RUNNING:
                # default mode BUSY, can't process the request
                return "NOK"
            proc_id = self.db.get_id()
            margs = (evt_filenames, att_filenames, s_time_start, cfg,
                     proc_id, self.db)
            pipe_proc = Process(target=imag_dc0.pipeline_dc0,
                                name=proc_id, args=margs)
            pipe_proc.start()
            print("submit pipeline %s"%proc_id)
            self.pipelines_running.append(pipe_proc)
            self.pipeline_last = pipe_proc
            return proc_id