'''Main module to run the ECLAIRs' IMAG pipeline from the server
Created on 26 sept. 2018
author: Camille Catalano, APC/IN2P3/CNRS
'''
import os
import subprocess
import sys, traceback
import time, datetime
import json
import numpy as np
from database.interface_db import DataBaseSVOMInterface
from common import get_root_eclairs
from process.pipeline.ecpi_flows import BubeImag
from svom.messaging.natsio import NatsIo
import common.mission.attitude as ptgsvom
from common.mission.observation import EclairsObservation
from process.pipeline.ecpi_config import PipelineConfiguration
from common.instru.model_effect import ECLAIRsDetectorEffectDefault
def _build_nats_message(service_descriptor, message):
    """Serialize a status message to be published on the NATS bus.

    :param service_descriptor: description of the emitting service
    :param message: free text describing the pipeline status
    :type service_descriptor: dict
    :type message: string
    :return: encoded JSON document holding the service descriptor, the
        current local date (ISO format, microseconds dropped) and the message
    :rtype: bytes
    """
    response_dict = {
        "service_descriptor": service_descriptor,
        "date": datetime.datetime.now().replace(microsecond=0).isoformat(),
        "message": message,
    }
    return json.dumps(response_dict).encode()


def pipeline_source_detection(start_time, end_time, snr_limit, s_time_start, proc_id, db_interface: DataBaseSVOMInterface):
    """controller function to run the EclairsSourcesDetectionPipelineV1

    The function creates a working directory named after proc_id, publishes a
    'beginning of pipeline' message on the NATS subject "science.core", runs
    the BubeImag pipeline with stdout and stderr redirected to
    ``stdout.txt`` / ``stderr.txt`` in the working directory, stores the
    return status with ``db_interface.insert_status``, passes the working
    directory to ``db_interface.insert_product``, deletes the working
    directory and finally publishes an end (or error) message.

    Any exception raised while preparing or running the pipeline is caught:
    its text becomes the return status and its traceback is written to
    ``stderr.txt``. sys.stdout and sys.stderr are always restored and the
    NATS client is always stopped, even if a later step raises.

    :param start_time: starting time of the observation
    :param end_time: ending time of the observation
    :param snr_limit: snr limit to stop at the source detection algorithm
    :param s_time_start: start time of the docker
    :param proc_id: id number of the process
    :param db_interface: interface object to the database
    :type start_time: float (in s from mjdref) or string (isot)
    :type end_time: float (in s from mjdref) or string (isot) > start_time
    :type snr_limit: float >0
    :type s_time_start: string
    :type proc_id: string
    :type db_interface: DataBaseSVOMInterface
    :return: None
    """
    # Local import: keeps this function self-contained with respect to the
    # module-level import list.
    import shutil

    service_descriptor = {"name": "Server HTTP ECLAIRs General Program",
                          "instrument": ["ECLAIRs"],
                          "creation_date": s_time_start,
                          "version": "iteration 1.1"
                          }
    work_dir = os.path.join(get_root_eclairs(), '../temp/ECLAIRs_pipeline_dir/' + proc_id)
    try:
        os.makedirs(work_dir)
    except FileExistsError:
        # Left over from a previous run with the same proc_id: start again
        # from an empty directory. No shell is involved, so proc_id cannot be
        # interpreted as a command.
        shutil.rmtree(work_dir)
        os.makedirs(work_dir)

    nats_client = NatsIo(host=os.environ['NATS_HOST'], port=os.environ['NATS_PORT'], streaming_id='eclairs_proc', cluster='svom-cluster')
    try:
        nats_client.publish("science.core",
                            _build_nats_message(service_descriptor, 'beginning of pipeline %s' % proc_id))

        # ppline stays None when the failure happens before the pipeline
        # object could be built; the except branch below relies on this.
        ppline = None
        saveout, saveerr = sys.stdout, sys.stderr
        with open(os.path.join(work_dir, 'stdout.txt'), 'w') as f_out, \
                open(os.path.join(work_dir, 'stderr.txt'), 'w') as f_err:
            sys.stdout = f_out
            sys.stderr = f_err
            try:
                sys.stdout.write('#################################')
                sys.stdout.write(datetime.datetime.now().strftime("\nStart date : %Y-%m-%d %H:%M:%S"))
                sys.stdout.write('\nName pipeline : ECL_SOP_IMA')
                sys.stdout.write('\nName processing: %s' % proc_id)
                sys.stdout.write('\n#################################\n')
                tcpu_start = time.time()
                try:
                    svom_quaternion = db_interface.get_sat_attitude(0)
                    eclairs_pointing = ptgsvom.AttitudeECLAIRs()
                    eclairs_pointing.set_attitude_svom_quater(svom_quaternion)
                    attitude = np.append(eclairs_pointing.ptg_radec, eclairs_pointing.ori_rad)
                    conf = PipelineConfiguration('1.1')
                    conf.build_bube_config([[0, 1023]])
                    conf.build_imag_config('1.1', 3, snr_limit, db_interface.cat_FOV)
                    obs = EclairsObservation(start_time, end_time, attitude, [0, 0, 0], [0, 0, 0], ECLAIRsDetectorEffectDefault())
                    input_dir = db_interface.get_files_evt_between_time(obs.start_time, obs.end_time, proc_id)
                    ppline = BubeImag(work_dir, input_dir, obs, conf, proc_id)
                    ppline.run(verbose=True, show=True)
                    error_code = ppline.error_code
                    error_message = ppline.error_message
                    if error_code != 'OK':
                        sys.stderr.write("{}".format(error_message))
                except Exception as e:
                    # Top-level boundary: whatever goes wrong is reported as
                    # the return status instead of crashing the server.
                    error_code = str(e)
                    if ppline is not None:
                        ppline.error_code = error_code
                        error_message = ppline.error_message
                    else:
                        # The pipeline object does not exist yet, so it has
                        # no message to offer: describe the exception itself.
                        error_message = '{}: {}'.format(type(e).__name__, e)
                    traceback.print_exc(file=sys.stderr)
                db_interface.insert_status(error_code, proc_id)
                sys.stdout.write('\n#################################')
                sys.stdout.write('\nReturn status : %s' % error_code)
                sys.stdout.write('\nReturn message : %s' % error_message)
                sys.stdout.write('\nTime clock : %s s' % (time.time() - tcpu_start))
                sys.stdout.write('\n#################################')
            finally:
                # Never leave the process streams pointing at the log files.
                sys.stdout = saveout
                sys.stderr = saveerr

        # The log files are closed at this point, so they are complete when
        # the working directory is handed over to the database interface.
        db_interface.insert_product(work_dir, proc_id)
        shutil.rmtree(work_dir, ignore_errors=True)

        if error_code == 'OK':
            final_message = 'end of pipeline %s' % proc_id
        else:
            final_message = '[pipeline {}] {}: {}'.format(proc_id, error_code, error_message)
        nats_client.publish("science.core", _build_nats_message(service_descriptor, final_message))
    finally:
        nats_client.stop()
    print('Fin du pipeline %s' % proc_id)