Source code for process.server.ManagePipelineECLAIRs

'''
Created on 27 juin 2018

@author: Colley Jean-Marc, APC/IN2P3/CNRS
'''

from multiprocessing import Process
import threading
import process.server.ecpi_server as ppecl
import process.server.ecpi_dc0 as imag_dc0
import copy as cp
import database.interface_MdbSim as dbsvom
import os
from common import get_root_eclairs


class ManagePipelineECLAIRs(object):
    """Singleton manager that launches and tracks ECLAIRs pipeline processes.

    Each submitted pipeline runs in its own ``multiprocessing.Process``.
    The manager keeps three pieces of bookkeeping:

    * ``pipelines_running``: processes that were still alive at the last
      status update,
    * ``pipelines_run``: processes found terminated at a status update,
    * ``pipeline_last``: the most recently submitted process (an empty list
      until the first submission).

    All bookkeeping is protected by a re-entrant lock (``self.lock``) so that
    public methods can call each other while holding it.
    """

    # Shared singleton slot; filled in by the first ``__init__`` that runs.
    instance = None

    def __new__(cls, *args, **kargs):
        """Return the existing singleton, or allocate a new object if none.

        Singleton design pattern: only one instance of
        ManagePipelineECLAIRs is meant to exist.
        """
        if cls.instance is None:
            # object.__new__ rejects extra arguments when __new__ is
            # overridden, so they are deliberately not forwarded.
            return object.__new__(cls)
        return cls.instance

    def __init__(self):
        """Initialise the database handle, the lock and the process lists.

        ``__init__`` is invoked on every ``ManagePipelineECLAIRs()`` call,
        including those where ``__new__`` handed back the existing
        singleton; the guard makes sure the state is only built once and is
        never reset afterwards.
        """
        if ManagePipelineECLAIRs.instance is None:
            ManagePipelineECLAIRs.instance = self
            db_dir = os.path.join(get_root_eclairs(), '../DB_dir')
            self.db = dbsvom.DataBaseSVOMMdbSim('ECLAIRs', db_dir)
            self.lock = threading.RLock()
            self.pipelines_running = []
            self.pipelines_run = []
            self.pipeline_last = []

    def whoiam(self):
        """Print this object's default representation."""
        print(self)

    def _update_status_pipeline(self):
        """Move every terminated process from the running list to the run list."""
        with self.lock:
            # Iterate over a snapshot because the list is mutated in the loop.
            for ppline in list(self.pipelines_running):
                if not ppline.is_alive():
                    self.pipelines_running.remove(ppline)
                    self.pipelines_run.append(ppline)

    def nb_pipeline_running(self):
        """Return the number of pipeline processes still alive."""
        self._update_status_pipeline()
        return len(self.pipelines_running)

    def nb_pipeline_run(self):
        """Return the number of pipeline processes that have terminated."""
        self._update_status_pipeline()
        return len(self.pipelines_run)

    def submit_sop(self, snr, t0, t1, s_time_start):
        """Start a source-detection pipeline in a new process.

        TODO: revisit the handling of several simultaneous submissions
        according to the needs that emerge from the use cases.

        :param snr: forwarded to ``pipeline_source_detection`` (third argument)
        :param t0: forwarded to ``pipeline_source_detection`` (first argument)
        :param t1: forwarded to ``pipeline_source_detection`` (second argument)
        :param s_time_start: forwarded to ``pipeline_source_detection``
        :return: the process identifier obtained from the database when the
            pipeline was started, or the string ``"NOK"`` when 10 pipelines
            are already running
        """
        # ``with`` guarantees the lock is released even if the database or
        # the process start-up raises.
        with self.lock:
            # Default status: busy, the request cannot be processed.
            ret_status = "NOK"
            if self.nb_pipeline_running() < 10:
                proc_id = self.db.get_id()
                margs = (t0, t1, snr, s_time_start, proc_id, self.db)
                pipe_proc = Process(target=ppecl.pipeline_source_detection,
                                    name=proc_id,
                                    args=margs)
                pipe_proc.start()
                print("submit pipeline %s"%proc_id)
                self.pipelines_running.append(pipe_proc)
                self.pipeline_last = pipe_proc
                ret_status = proc_id
            return ret_status

    def is_running(self, p_id):
        """Return True if a live pipeline process is named ``p_id``.

        :param p_id: process identifier as returned by :meth:`submit_sop`
        """
        with self.lock:
            # nb_pipeline_running() refreshes the running list first.
            if not self.nb_pipeline_running():
                return False
            return any(ppline.name == p_id
                       for ppline in self.pipelines_running)
class ManagePipelineECLAIRsDC0(ManagePipelineECLAIRs):
    """Pipeline manager variant that runs the DC0 pipeline.

    It differs from the parent class in the database class used
    (``DataBaseSVOMMdbDC0``) and in the pipeline started by
    :meth:`submit_sop` (``imag_dc0.pipeline_dc0``).

    ``__new__`` is inherited and the singleton slot used is
    ``ManagePipelineECLAIRs.instance``, so this class shares its singleton
    with the parent class rather than having one of its own.
    """

    def __init__(self):
        """Initialise the DC0 database handle, the lock and the process lists.

        The state is only built when no singleton is registered yet, so a
        repeated ``ManagePipelineECLAIRsDC0()`` call does not reset the
        bookkeeping of already submitted pipelines.
        """
        if ManagePipelineECLAIRs.instance is None:
            ManagePipelineECLAIRs.instance = self
            db_dir = os.path.join(get_root_eclairs(), '../DB_dir')
            self.db = dbsvom.DataBaseSVOMMdbDC0('ECLAIRs', db_dir)
            self.lock = threading.RLock()
            self.pipelines_running = []
            self.pipelines_run = []
            self.pipeline_last = []

    def submit_sop(self, evt_filenames, att_filenames, s_time_start, cfg):
        """Start a DC0 pipeline in a new process.

        TODO: revisit the handling of several simultaneous submissions
        according to the needs that emerge from the use cases.

        :param evt_filenames: forwarded to ``pipeline_dc0`` (first argument)
        :param att_filenames: forwarded to ``pipeline_dc0`` (second argument)
        :param s_time_start: forwarded to ``pipeline_dc0`` (third argument)
        :param cfg: forwarded to ``pipeline_dc0`` (fourth argument)
        :return: the process identifier obtained from the database when the
            pipeline was started, or the string ``"NOK"`` when 10 pipelines
            are already running
        """
        # ``with`` guarantees the lock is released even if the database or
        # the process start-up raises.
        with self.lock:
            # Default status: busy, the request cannot be processed.
            ret_status = "NOK"
            if self.nb_pipeline_running() < 10:
                proc_id = self.db.get_id()
                margs = (evt_filenames, att_filenames, s_time_start, cfg,
                         proc_id, self.db)
                pipe_proc = Process(target=imag_dc0.pipeline_dc0,
                                    name=proc_id,
                                    args=margs)
                pipe_proc.start()
                print("submit pipeline %s"%proc_id)
                self.pipelines_running.append(pipe_proc)
                self.pipeline_last = pipe_proc
                ret_status = proc_id
            return ret_status