# Source code for ecpi.database.interact_sdb

'''
Created on Oct 15th 2019

@author: Bacon Philippe, APC/IN2P3/CNRS
'''

import os
import json
import logging
import requests
from astropy.io import fits

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# CARD header values accepted by send_product_ecpi() for DC1 products.
LIST_PRODUCTS_DC1 = [
    'ECL-SKY-IMA',
    'ECL-SOP-IMA',
    'ECL-DET-UBC',
    'ECL-EVT-CAL',
]
# CARD header values accepted by send_event_att_files() for DC1 inputs.
LIST_INPUTS_DC1 = [
    'ECL-EVT-SEC',
    'SVO-ATT-CNV',
]

# Connection settings used to build the SDB URLs. An empty "port" means
# that no explicit port is inserted in the URL.
ext_cfg = dict(
    host="svom-sdb.lal.in2p3.fr",
    path_read="/data/raw/DC1",
    path_write="/import/v0/add_product",
    port="",
)


def ping_sdb(count=2):
    """Ping SDB server.

    :param count: number of echo requests to send (value given to ``ping -c``).
    :type count: int
    :return: True if the ``ping`` command exits with status 0, False
        otherwise (including when the ``ping`` executable cannot be started).
    :rtype: bool
    :raises ValueError: if ``count`` is a string that is not an integer.
    :raises TypeError: if ``count`` cannot be converted to an integer.
    """
    # Imported locally so this function does not depend on a file-level import.
    import subprocess

    sdb_url = "svom-sdb.lam.fr"

    # Coerce to int before building the command so that nothing but a number
    # can ever reach the command line.
    n_requests = int(count)

    try:
        # Argument list, no shell: the values are never parsed by a shell.
        completed = subprocess.run(["ping", "-c", str(n_requests), sdb_url])
    except OSError:
        # The ping executable is missing or could not be launched.
        return False
    return completed.returncode == 0
def send_event_att_files(path_file):
    """Send event ECL-EVT-SEC_XXX.fits or SVO-ATT-CNV_XXX.fits files to the SDB.

    The file is posted to ``ext_cfg['path_write']`` on ``ext_cfg['host']``
    under the form field ``ecpi_input_file``.

    :param path_file: path to the event file to be sent.
    :type path_file: str
    :return: ``(response, dest_dir)`` when the server answers with status
        200, where ``response`` is the ``requests`` response and ``dest_dir``
        is the ``product_url`` field of the JSON reply with its last path
        component removed (and a trailing ``/``). ``None`` when the status
        code is not 200.
    :rtype: tuple or None
    :raises FileNotFoundError: if ``path_file`` does not exist.
    :raises AssertionError: if ``path_file`` does not end with ``.fits`` or
        if the CARD value of its first HDU is not in ``LIST_INPUTS_DC1``.
    :raises KeyError: if the first HDU has no CARD keyword, or if the JSON
        reply has no ``product_url`` field.
    """
    # Validate the path before opening anything. Explicit raises are used
    # instead of ``assert`` so the checks survive ``python -O``; the exception
    # types are the ones callers already received.
    if not os.path.exists(path_file):
        raise FileNotFoundError(f"No such file: {path_file}")
    if not path_file.endswith('.fits'):
        raise AssertionError(f"{path_file} is not a .fits file")

    # The context manager closes the FITS file even if the lookup fails.
    with fits.open(path_file) as fits_file:
        card = fits_file[0].header['CARD']
    if card not in LIST_INPUTS_DC1:
        raise AssertionError(
            f"CARD value {card!r} of {path_file} is not in {LIST_INPUTS_DC1}")

    # An empty port means "no explicit port in the URL".
    port = ext_cfg['port']
    if port == "":
        url = 'https://' + ext_cfg['host'] + ext_cfg['path_write']
    else:
        url = 'https://' + ext_cfg['host'] + ':' + str(port) \
            + ext_cfg['path_write']

    # Keep the upload handle open only for the duration of the request.
    with open(path_file, 'rb') as payload:
        response = requests.post(url=url, files={'ecpi_input_file': payload})

    if response.status_code != 200:
        logger.info("send_event_att_files() function has failed.")
        logger.info("Something's wrong on the MDB side !")
        logger.info("The MDB could not properly get the product")
        return None

    infos = json.loads(response.text)
    dest_url = infos["product_url"]
    # Drop the last path component to obtain the destination directory.
    dest_dir = "/".join(dest_url.split("/")[:-1]) + "/"
    return response, dest_dir
def receive_event_att_files(sdb_filename, dest_dir):
    """Receive event ECL-EVT-SEC_XXX.fits or SVO-ATT-CNV_XXX.fits files from the SDB.

    The file is fetched from ``ext_cfg['path_read']`` on ``ext_cfg['host']``
    and written to ``dest_dir`` under the same file name.

    :param sdb_filename: name of file to be downloaded from SDB
    :type sdb_filename: str
    :param dest_dir: path of the destination directory
    :type dest_dir: str
    :return: if the operation succeeded. False is returned when ``dest_dir``
        does not exist, when the connection cannot be established, or when
        the server answers with a status code other than 200.
    :rtype: bool
    """
    if not os.path.exists(dest_dir):
        logger.info(f"Destination directory {dest_dir} does not exist !")
        return False

    # Build the URL outside the ``try`` so that only the network call is
    # guarded. An empty port means "no explicit port in the URL".
    port = ext_cfg['port']
    if port == "":
        netloc = ext_cfg['host']
    else:
        netloc = ext_cfg['host'] + ':' + str(port)
    url = 'https://' + netloc + ext_cfg['path_read'] + '/' + sdb_filename

    try:
        response = requests.get(url=url)
    except requests.exceptions.ConnectionError:
        logger.info(f"Failed to establish a new connection with {url}")
        return False

    if response.status_code != 200:
        logger.info("receive_event_att_files() function has failed.")
        logger.info(f"Status code: {response.status_code}")
        return False

    abspath_filename = os.path.join(dest_dir, sdb_filename)
    with open(abspath_filename, 'wb') as f:
        f.write(response.content)
    return True
def send_product_ecpi(path_product):
    """Send L1 product generated by the ECLAIRs pipeline to the SDB.

    Check if input file has registered CARD KW value for DC1:
    ECL-SKY-IMA, ECL-SOP-IMA, ECL-DET-UBC and ECL-EVT-CAL.
    The file is posted to ``ext_cfg['path_write']`` on ``ext_cfg['host']``
    under the form field ``product``.

    :param path_product: path to the .fits file to be sent.
    :type path_product: str
    :return: if the operation succeeded. False is returned when the CARD
        keyword is missing or not in ``LIST_PRODUCTS_DC1``, when the
        connection cannot be established, or when the server answers with a
        status code other than 200.
    :rtype: bool
    :raises FileNotFoundError: if ``path_product`` does not exist.
    :raises AssertionError: if ``path_product`` does not end with ``.fits``.
    """
    # Validate the path before opening anything. Explicit raises are used
    # instead of ``assert`` so the checks survive ``python -O``; the exception
    # types are the ones callers already received.
    if not os.path.exists(path_product):
        raise FileNotFoundError(f"No such file: {path_product}")
    if not path_product.endswith('.fits'):
        raise AssertionError(f"{path_product} is not a .fits file")

    # Read the CARD keyword and close the FITS file straight away; it is not
    # needed during the upload. ``get`` yields None when the keyword is absent.
    with fits.open(path_product) as fits_file:
        card = fits_file[0].header.get('CARD')

    if card is None:
        logger.error(f"CARD keyword not present in {path_product} file")
        return False
    if card not in LIST_PRODUCTS_DC1:
        logger.error(
            f"CARD value {card!r} of {path_product} file is not a "
            f"registered DC1 product")
        return False

    # An empty port means "no explicit port in the URL".
    port = ext_cfg['port']
    if port == "":
        netloc = ext_cfg['host']
    else:
        netloc = ext_cfg['host'] + ':' + str(port)
    url = 'https://' + netloc + ext_cfg['path_write']

    try:
        # The upload handle is closed whether or not the request succeeds.
        with open(path_product, 'rb') as payload:
            response = requests.post(url=url, files={'product': payload})
    except requests.exceptions.ConnectionError:
        logger.info(f"Failed to establish a new connection with {url}")
        return False

    if response.status_code != 200:
        logger.info("send_product_ecpi() function has failed.")
        logger.info(f"Status code: {response.status_code}")
        return False

    return True