'''main modules to run the ECLAIRs' IMAG pipeline for the dc0
Created on 25 oct 2018
author: Camille Catalano, APC/IN2P3/CNRS
'''
import os
import subprocess
import sys, traceback
import time, datetime
from random import gauss
import numpy as np
from astropy.io import fits
import json
from database.interface_db import DataBaseSVOMInterface
from common import get_root_eclairs
from svom.messaging.natsio import NatsIo
from common.sky.catalog import CatalogIdentifiedSources
from process.soex.core.SoexProcesses import LightCurve, CountSpectrum
import process.soex.io.outputs as soex_out
[docs]def process_message(response_dict, cfg, stream_id):
"""Not use for now
threads block the process anyway if no nats
"""
item = json.dumps(response_dict).encode()
nats_client = NatsIo(
host=cfg['messaging']['HOST'],
port=cfg['messaging']['PORT'],
user=cfg['messaging']['USER'],
password=cfg['messaging']['PASSWORD'],
streaming_id=stream_id,
cluster=cfg['messaging']['CLUSTER'])
nats_client.publish("science.core", item)
#nats_client.stop()
[docs]def read_evt_sec(evt_filename, dirin):
"""read the ecl_evt_sec file and return the times of the beginning and end of the observation
:param evt_filename: name of the event file
:type evt_filename: string
:param dirin: PATH to the directory of the event file
:type dirin: string
attention ! avec le vrai ECL-EVT-SEC le temps sera en seconde (seconde depuis une ref ?)
:return: start_time, end_time in s from mjdref
:rtype: float, float
"""
hdu = fits.open(dirin + '/' + evt_filename)
data = hdu[1].data
return data[0][0], data[-1][0]
[docs]def read_svo_att(att_filename, dirin, time_obs):
"""read the svo_att fits file and return the attitude corresponding to the time_obs
if time_obs < time in file -> return first attitude
if time_obs > time in file -> return last attitude
:param att_filename: name of the attitude file
:type att_filename: string
:param dirin: PATH to the directory of the attitude file
:type dirin: string
:param time_obs: time of the beginning of the observation in s from mjdref
:type time_obs: float
:return: attitude in s [ra, dec, ori]
:rtype: [float, float, float]
"""
hdu = fits.open(dirin + '/' + att_filename)
data = hdu[1].data
indice = 0
if data[-1][0] < time_obs:
return data[-1][-1]
while(data[indice][0] < time_obs):
indice =+ 1
return data[indice][-1]
[docs]def fake_sps(start_time, end_time, file_path, proc_id):
"""create a fake ECL-SPS-S1.fits file product
:param start_time: start time of the observation in s from mjdref
:type start_time: float
:param end_time: end time of the observation in s from mjdref
:type end_time: float
:param file_path: PATH directory to save the file
:type file_path: string
:param proc_id: id number of the process
:type proc_id: string
"""
sps = CountSpectrum()
for pi in np.arange(0, 1024):
sps.add_bin([pi, 1/(pi+1), 0.1/(pi+1)])
soex_out.save_spectrum(file_path, sps, start_time, end_time, 'ECLAIRs_dc0', proc_id)
[docs]def fake_lcs(start_time, end_time, file_path, proc_id):
"""create a fake ECL-LCS-S1.fits file product
:param start_time: start time of the observation in s from mjdref
:type start_time: float
:param end_time: end time of the observation in s from mjdref
:type end_time: float
:param file_path: PATH directory to save the file
:type file_path: string
:param proc_id: id number of the process
:type proc_id: string
"""
lcs = LightCurve()
time_step = 10 # 10s
t = start_time
while (t + time_step < end_time):
lcs.add_bin([t, t+time_step, round(gauss(5,1)), 0.2])
t = t + time_step
soex_out.save_lightcurve(file_path, lcs, start_time, end_time, 'ECLAIRs_dc0', proc_id)
[docs]def fake_sop_ima(attitude, tstart, tstop, file_path, proc_id):
"""create a fake ECL-SOP-IMA.fits file product
'sourceID', 'Y', 'Z', 'Y_fit', 'error_Y', 'Z_fit', 'error_Z', 'ra', 'dec', 'errorrad', 'flux_1', 'pflux_1', 'errflux1', 'snr', 'name', 'class'
:param attitude: [ra, dec, ori] in degrees
:type attitude: [float, float, float]
:param tstart: start time of the observation in s from mjdref
:type tstart: float
:param tstop: end time of the observation in s from mjdref
:type tstop: float
:param file_path: PATH directory to save the file
:type file_path: string
:param proc_id: id number of the process
:type proc_id: string
"""
fake_catal = CatalogIdentifiedSources()
fake_catal.add_src(["ECL_HBEwDo_1","0","0","-1.393073145300E-02","0.5","3.241939097643E-02","0.5",attitude[0],attitude[1],"0.5","8.147397677148E-01","7.415390680642E-01","5.713292557245E-03","1.298988541909E+02","CrabDC0","0"])
fake_catal.save_catalog(file_path, attitude, tstart, tstop, "ECLAIRs_dc0", proc_id)
[docs]def pipeline_dc0(evt_filenames, att_filenames, s_time_start, cfg, proc_id, db_interface: DataBaseSVOMInterface):
"""ECLAIRs pipeline for dc0
The function saves stdout info into a file as well as stderr
:param evt_filenames: names of the event files to process
:param att_filenames: names of the attitude files to process
:param proc_id: id number of the process
:param db_interface: interface object to the database
:param s_time_start: start time of the docker
:type evt_filenames: list(string)
:type att_filenames: list(string)
:type proc_id: string
:type db_interface: DataBaseSVOMInterface
:type s_time_start: string
"""
service_descriptor = {"name" : "Server HTTP ECLAIRs General Program",
"instrument" : ["ECLAIRs"],
"creation_date" : s_time_start,
"version" : "dc0"
}
work_dir = os.path.join(get_root_eclairs(),'../ECLAIRs_pipeline_dir/'+proc_id)
try:
os.makedirs(work_dir)
except FileExistsError:
os.system('rm -rf ' + work_dir + '/*')
saveout = sys.stdout
f_out = open(work_dir+'/stdout.txt', 'w')
response_dict = {
"service_descriptor" : service_descriptor,
"date" : datetime.datetime.now().replace(microsecond=0).isoformat(),
"message" : 'beginning of pipeline %s'%proc_id
}
item = json.dumps(response_dict).encode()
nats_client = NatsIo(
host=cfg['messaging']['HOST'],
port=cfg['messaging']['PORT'],
user=cfg['messaging']['USER'],
password=cfg['messaging']['PASSWORD'],
streaming_id='eclairs_proc',
cluster=cfg['messaging']['CLUSTER'])
nats_client.publish("science.core", item)
sys.stdout = f_out
sys.stdout.write('#################################')
sys.stdout.write(datetime.datetime.now().strftime("\nStart date : %Y-%m-%d %H:%M:%S"))
sys.stdout.write('\nName pipeline : ECL_DC0')
sys.stdout.write('\nName processing: %s'%proc_id)
sys.stdout.write('\n#################################\n')
saveerr = sys.stderr
f_err = open(work_dir+'/stderr.txt', 'w')
sys.stderr = f_err
tcpu_start = time.time()
try:
input_dir = db_interface.get_files_from_names(evt_filenames, att_filenames, proc_id, cfg)
if input_dir:
# donwloaded file in db_interface.dir_proc/evt_filenames[0] ou att_filenames[0]
start_time, end_time = read_evt_sec(evt_filenames[0], input_dir)
# remplacer time_obs avec un temps entre end_time et start_time
attitude = read_svo_att(att_filenames[0], input_dir, 3254647.09408)
fake_sop_ima(attitude, start_time, end_time, work_dir, proc_id)
fake_lcs(start_time, end_time, attitude, work_dir, proc_id)
fake_sps(attitude, start_time, end_time, work_dir, proc_id)
error_code = 'OK'
error_message = "everything's fine"
else:
error_code = 'InputDataError'
error_message = 'Input files could not be provided'
sys.stderr.write("{}".format(error_message))
except Exception as e:
error_code = str(e)
error_message = "something went wrong"
traceback.print_exc(file=sys.stderr)
# JMC : tempo to catch status running easier
time.sleep(20)
db_interface.insert_status(error_code, proc_id)
sys.stdout.write('\n#################################')
sys.stdout.write('\nReturn status : %s'%error_code)
sys.stdout.write('\nReturn message : %s'%error_message)
sys.stdout.write('\nTime clock : %s s'%(time.time() - tcpu_start))
sys.stdout.write('\n#################################')
sys.stdout = saveout
sys.stderr = saveerr
f_out.close()
f_err.close()
db_interface.insert_product(work_dir, proc_id)
subprocess.run('rm -rf ' + work_dir, shell=True)
response_dict = {
"service_descriptor" : service_descriptor,
"date" : datetime.datetime.now().replace(microsecond=0).isoformat(),
"message" : 'beginning of pipeline %s'%proc_id
}
if error_code == 'OK':
response_dict = {
"service_descriptor" : service_descriptor,
"date" : datetime.datetime.now().replace(microsecond=0).isoformat(),
"message" : 'end of pipeline %s'%proc_id
}
else:
response_dict = {
"service_descriptor" : service_descriptor,
"date" : datetime.datetime.now().replace(microsecond=0).isoformat(),
"message" : '[pipeline {}] {}: {}'.format(proc_id, error_code, error_message)
}
item = json.dumps(response_dict).encode()
nats_client = NatsIo(
host=cfg['messaging']['HOST'],
port=cfg['messaging']['PORT'],
user=cfg['messaging']['USER'],
password=cfg['messaging']['PASSWORD'],
streaming_id='eclairs_proc2',
cluster=cfg['messaging']['CLUSTER'])
nats_client.publish("science.core", item)
nats_client.stop()
print('Fin du pipeline %s'%proc_id)