Source code for process.server.ecpi_server

'''main modules to run the ECLAIRs' IMAG pipeline from the server

Created on 26 sept. 2018

author: Camille Catalano, APC/IN2P3/CNRS
'''


import os
import subprocess
import sys, traceback
import time, datetime
import json
import numpy as np

from database.interface_db import DataBaseSVOMInterface
from common import get_root_eclairs
from process.pipeline.ecpi_flows import BubeImag
from svom.messaging.natsio import NatsIo
import common.mission.attitude as ptgsvom
from common.mission.observation import EclairsObservation
from process.pipeline.ecpi_config import PipelineConfiguration
from common.instru.model_effect import ECLAIRsDetectorEffectDefault


[docs]def pipeline_source_detection(start_time, end_time, snr_limit, s_time_start, proc_id, db_interface: DataBaseSVOMInterface): """controller function to run the EclairsSourcesDetectionPipelineV1 The function saves stdout info into a file as well as stderr :param start_time: starting time of the observation :param end_time: ending time of the observation :param snr_limit: snr limit to stop at the source detection algorithm :param proc_id: id number of the process :param db_interface: interface object to the database :param s_time_start: start time of the docker :type start_time: float (in s from mjdref) or string (isot) :type end_time: float (in s from mjdref) or string (isot) > start_time :type snr_limit: float >0 :type proc_id: string :type db_interface: DataBaseSVOMInterface :type s_time_start: string """ service_descriptor = {"name" : "Server HTTP ECLAIRs General Program", "instrument" : ["ECLAIRs"], "creation_date" : s_time_start, "version" : "iteration 1.1" } work_dir = os.path.join(get_root_eclairs(),'../temp/ECLAIRs_pipeline_dir/'+proc_id) try: os.makedirs(work_dir) except FileExistsError: os.system('rm -rf ' + work_dir + '/*') saveout = sys.stdout f_out = open(work_dir+'/stdout.txt', 'w') response_dict = { "service_descriptor" : service_descriptor, "date" : datetime.datetime.now().replace(microsecond=0).isoformat(), "message" : 'beginning of pipeline %s'%proc_id } item = json.dumps(response_dict).encode() nats_client = NatsIo(host=os.environ['NATS_HOST'], port=os.environ['NATS_PORT'], streaming_id='eclairs_proc', cluster='svom-cluster') nats_client.publish("science.core", item) sys.stdout = f_out sys.stdout.write('#################################') sys.stdout.write(datetime.datetime.now().strftime("\nStart date : %Y-%m-%d %H:%M:%S")) sys.stdout.write('\nName pipeline : ECL_SOP_IMA') sys.stdout.write('\nName processing: %s'%proc_id) sys.stdout.write('\n#################################\n') saveerr = sys.stderr f_err = open(work_dir+'/stderr.txt', 'w') sys.stderr = f_err tcpu_start = time.time() try: svom_quaternion = db_interface.get_sat_attitude(0) eclairs_pointing = ptgsvom.AttitudeECLAIRs() eclairs_pointing.set_attitude_svom_quater(svom_quaternion) attitude = np.append(eclairs_pointing.ptg_radec, eclairs_pointing.ori_rad) conf = PipelineConfiguration('1.1') conf.build_bube_config([[0, 1023]]) conf.build_imag_config('1.1', 3, snr_limit, db_interface.cat_FOV) obs = EclairsObservation(start_time, end_time, attitude, [0,0,0], [0,0,0], ECLAIRsDetectorEffectDefault()) input_dir = db_interface.get_files_evt_between_time(obs.start_time, obs.end_time, proc_id) ppline = BubeImag(work_dir, input_dir, obs, conf, proc_id) ppline.run(verbose=True, show=True) if ppline.error_code != 'OK': sys.stderr.write("{}".format(ppline.error_message)) except Exception as e: ppline.error_code = str(e) traceback.print_exc(file=sys.stderr) # db_interface.insert_status(ppline.error_code, proc_id) sys.stdout.write('\n#################################') sys.stdout.write('\nReturn status : %s'%ppline.error_code) sys.stdout.write('\nReturn message : %s'%ppline.error_message) sys.stdout.write('\nTime clock : %s s'%(time.time() - tcpu_start)) sys.stdout.write('\n#################################') sys.stdout = saveout sys.stderr = saveerr f_out.close() f_err.close() db_interface.insert_product(work_dir, proc_id) subprocess.run('rm -rf ' + work_dir, shell=True) response_dict = { "service_descriptor" : service_descriptor, "date" : datetime.datetime.now().replace(microsecond=0).isoformat(), "message" : 'beginning of pipeline %s'%proc_id } if ppline.error_code == 'OK': response_dict = { "service_descriptor" : service_descriptor, "date" : datetime.datetime.now().replace(microsecond=0).isoformat(), "message" : 'end of pipeline %s'%proc_id } else: response_dict = { "service_descriptor" : service_descriptor, "date" : datetime.datetime.now().replace(microsecond=0).isoformat(), "message" : '[pipeline {}] {}: {}'.format(proc_id, ppline.error_code, ppline.error_message) } item = json.dumps(response_dict).encode() nats_client.publish("science.core", item) nats_client.stop() print('Fin du pipeline %s'%proc_id)