Source code for process.server.ecpi_dc0

'''main modules to run the ECLAIRs' IMAG pipeline for the dc0

Created on 25 oct 2018

author: Camille Catalano, APC/IN2P3/CNRS
'''


import os
import subprocess
import sys, traceback
import time, datetime
from random import gauss
import numpy as np
from astropy.io import fits
import json

from database.interface_db import DataBaseSVOMInterface
from common import get_root_eclairs
from svom.messaging.natsio import NatsIo
from common.sky.catalog import CatalogIdentifiedSources
from process.soex.core.SoexProcesses import LightCurve, CountSpectrum
import process.soex.io.outputs as soex_out


[docs]def process_message(response_dict, cfg, stream_id): """Not use for now threads block the process anyway if no nats """ item = json.dumps(response_dict).encode() nats_client = NatsIo( host=cfg['messaging']['HOST'], port=cfg['messaging']['PORT'], user=cfg['messaging']['USER'], password=cfg['messaging']['PASSWORD'], streaming_id=stream_id, cluster=cfg['messaging']['CLUSTER']) nats_client.publish("science.core", item)
#nats_client.stop()
[docs]def read_evt_sec(evt_filename, dirin): """read the ecl_evt_sec file and return the times of the beginning and end of the observation :param evt_filename: name of the event file :type evt_filename: string :param dirin: PATH to the directory of the event file :type dirin: string attention ! avec le vrai ECL-EVT-SEC le temps sera en seconde (seconde depuis une ref ?) :return: start_time, end_time in s from mjdref :rtype: float, float """ hdu = fits.open(dirin + '/' + evt_filename) data = hdu[1].data return data[0][0], data[-1][0]
[docs]def read_svo_att(att_filename, dirin, time_obs): """read the svo_att fits file and return the attitude corresponding to the time_obs if time_obs < time in file -> return first attitude if time_obs > time in file -> return last attitude :param att_filename: name of the attitude file :type att_filename: string :param dirin: PATH to the directory of the attitude file :type dirin: string :param time_obs: time of the beginning of the observation in s from mjdref :type time_obs: float :return: attitude in s [ra, dec, ori] :rtype: [float, float, float] """ hdu = fits.open(dirin + '/' + att_filename) data = hdu[1].data indice = 0 if data[-1][0] < time_obs: return data[-1][-1] while(data[indice][0] < time_obs): indice =+ 1 return data[indice][-1]
[docs]def fake_sps(start_time, end_time, file_path, proc_id): """create a fake ECL-SPS-S1.fits file product :param start_time: start time of the observation in s from mjdref :type start_time: float :param end_time: end time of the observation in s from mjdref :type end_time: float :param file_path: PATH directory to save the file :type file_path: string :param proc_id: id number of the process :type proc_id: string """ sps = CountSpectrum() for pi in np.arange(0, 1024): sps.add_bin([pi, 1/(pi+1), 0.1/(pi+1)]) soex_out.save_spectrum(file_path, sps, start_time, end_time, 'ECLAIRs_dc0', proc_id)
[docs]def fake_lcs(start_time, end_time, file_path, proc_id): """create a fake ECL-LCS-S1.fits file product :param start_time: start time of the observation in s from mjdref :type start_time: float :param end_time: end time of the observation in s from mjdref :type end_time: float :param file_path: PATH directory to save the file :type file_path: string :param proc_id: id number of the process :type proc_id: string """ lcs = LightCurve() time_step = 10 # 10s t = start_time while (t + time_step < end_time): lcs.add_bin([t, t+time_step, round(gauss(5,1)), 0.2]) t = t + time_step soex_out.save_lightcurve(file_path, lcs, start_time, end_time, 'ECLAIRs_dc0', proc_id)
[docs]def fake_sop_ima(attitude, tstart, tstop, file_path, proc_id): """create a fake ECL-SOP-IMA.fits file product 'sourceID', 'Y', 'Z', 'Y_fit', 'error_Y', 'Z_fit', 'error_Z', 'ra', 'dec', 'errorrad', 'flux_1', 'pflux_1', 'errflux1', 'snr', 'name', 'class' :param attitude: [ra, dec, ori] in degrees :type attitude: [float, float, float] :param tstart: start time of the observation in s from mjdref :type tstart: float :param tstop: end time of the observation in s from mjdref :type tstop: float :param file_path: PATH directory to save the file :type file_path: string :param proc_id: id number of the process :type proc_id: string """ fake_catal = CatalogIdentifiedSources() fake_catal.add_src(["ECL_HBEwDo_1","0","0","-1.393073145300E-02","0.5","3.241939097643E-02","0.5",attitude[0],attitude[1],"0.5","8.147397677148E-01","7.415390680642E-01","5.713292557245E-03","1.298988541909E+02","CrabDC0","0"]) fake_catal.save_catalog(file_path, attitude, tstart, tstop, "ECLAIRs_dc0", proc_id)
[docs]def pipeline_dc0(evt_filenames, att_filenames, s_time_start, cfg, proc_id, db_interface: DataBaseSVOMInterface): """ECLAIRs pipeline for dc0 The function saves stdout info into a file as well as stderr :param evt_filenames: names of the event files to process :param att_filenames: names of the attitude files to process :param proc_id: id number of the process :param db_interface: interface object to the database :param s_time_start: start time of the docker :type evt_filenames: list(string) :type att_filenames: list(string) :type proc_id: string :type db_interface: DataBaseSVOMInterface :type s_time_start: string """ service_descriptor = {"name" : "Server HTTP ECLAIRs General Program", "instrument" : ["ECLAIRs"], "creation_date" : s_time_start, "version" : "dc0" } work_dir = os.path.join(get_root_eclairs(),'../ECLAIRs_pipeline_dir/'+proc_id) try: os.makedirs(work_dir) except FileExistsError: os.system('rm -rf ' + work_dir + '/*') saveout = sys.stdout f_out = open(work_dir+'/stdout.txt', 'w') response_dict = { "service_descriptor" : service_descriptor, "date" : datetime.datetime.now().replace(microsecond=0).isoformat(), "message" : 'beginning of pipeline %s'%proc_id } item = json.dumps(response_dict).encode() nats_client = NatsIo( host=cfg['messaging']['HOST'], port=cfg['messaging']['PORT'], user=cfg['messaging']['USER'], password=cfg['messaging']['PASSWORD'], streaming_id='eclairs_proc', cluster=cfg['messaging']['CLUSTER']) nats_client.publish("science.core", item) sys.stdout = f_out sys.stdout.write('#################################') sys.stdout.write(datetime.datetime.now().strftime("\nStart date : %Y-%m-%d %H:%M:%S")) sys.stdout.write('\nName pipeline : ECL_DC0') sys.stdout.write('\nName processing: %s'%proc_id) sys.stdout.write('\n#################################\n') saveerr = sys.stderr f_err = open(work_dir+'/stderr.txt', 'w') sys.stderr = f_err tcpu_start = time.time() try: input_dir = db_interface.get_files_from_names(evt_filenames, att_filenames, proc_id, cfg) if input_dir: # donwloaded file in db_interface.dir_proc/evt_filenames[0] ou att_filenames[0] start_time, end_time = read_evt_sec(evt_filenames[0], input_dir) # remplacer time_obs avec un temps entre end_time et start_time attitude = read_svo_att(att_filenames[0], input_dir, 3254647.09408) fake_sop_ima(attitude, start_time, end_time, work_dir, proc_id) fake_lcs(start_time, end_time, attitude, work_dir, proc_id) fake_sps(attitude, start_time, end_time, work_dir, proc_id) error_code = 'OK' error_message = "everything's fine" else: error_code = 'InputDataError' error_message = 'Input files could not be provided' sys.stderr.write("{}".format(error_message)) except Exception as e: error_code = str(e) error_message = "something went wrong" traceback.print_exc(file=sys.stderr) # JMC : tempo to catch status running easier time.sleep(20) db_interface.insert_status(error_code, proc_id) sys.stdout.write('\n#################################') sys.stdout.write('\nReturn status : %s'%error_code) sys.stdout.write('\nReturn message : %s'%error_message) sys.stdout.write('\nTime clock : %s s'%(time.time() - tcpu_start)) sys.stdout.write('\n#################################') sys.stdout = saveout sys.stderr = saveerr f_out.close() f_err.close() db_interface.insert_product(work_dir, proc_id) subprocess.run('rm -rf ' + work_dir, shell=True) response_dict = { "service_descriptor" : service_descriptor, "date" : datetime.datetime.now().replace(microsecond=0).isoformat(), "message" : 'beginning of pipeline %s'%proc_id } if error_code == 'OK': response_dict = { "service_descriptor" : service_descriptor, "date" : datetime.datetime.now().replace(microsecond=0).isoformat(), "message" : 'end of pipeline %s'%proc_id } else: response_dict = { "service_descriptor" : service_descriptor, "date" : datetime.datetime.now().replace(microsecond=0).isoformat(), "message" : '[pipeline {}] {}: {}'.format(proc_id, error_code, error_message) } item = json.dumps(response_dict).encode() nats_client = NatsIo( host=cfg['messaging']['HOST'], port=cfg['messaging']['PORT'], user=cfg['messaging']['USER'], password=cfg['messaging']['PASSWORD'], streaming_id='eclairs_proc2', cluster=cfg['messaging']['CLUSTER']) nats_client.publish("science.core", item) nats_client.stop() print('Fin du pipeline %s'%proc_id)