Source code for ecpi.database.interact_sdb

"""SDB interface
"""

import os
import json
import logging
import pprint
from datetime import datetime
from astropy.io import fits
import requests
from svom.messaging import SdbIo, HttpIo

logger = logging.getLogger(__name__)

LIST_PRODUCTS_DC3 = [
    'ECL-SKY-IMA',
    'ECL-SOP-IMA',
    'ECL-DET-UBC',
    'ECL-EVT-CAL',
    'ECL-GTI-1',
    'ECL-GTI-3'
]
LIST_INPUTS_DC3 = ['ECL-EVT-SEC', 'SVO-ATT-CNV', 'SVO-ORB-CNV']


[docs]class SdbParameters(object): """ #TODO: may be simpler solution can be used than a singleton """ instance = None def __new__(cls, *args, **kargs): """ used singleton design pattern """ if cls.instance is None: return object.__new__(cls, *args, **kargs) else: return cls.instance def __init__(self): if SdbParameters.instance is None: SdbParameters.instance = self self.pars = { "protocol_write": "https", "host_write": "proxy.svom.org/sdb-import", "path_write": "/v0/add_product", "protocol_search": "https", "host_search": "fsc.svom.org/sdb", "path_search": "/server/search" }
[docs]def get_url_query(): """return url SDB query """ sdb = SdbParameters() return sdb.pars['protocol_search']+'://'+sdb.pars['host_search']+sdb.pars['path_search']
[docs]def extract_type(n_file): """from ECL-EVT-CAL_UTCtotootxxx.fits extract ECL-EVT-CAL :param n_file: """ e_type = n_file[:11] e_type.replace('_', '-') if e_type in LIST_INPUTS_DC3 or e_type in LIST_PRODUCTS_DC3: return e_type else: return ""
[docs]def ping_sdb(count=2): """Ping SDB server. """ # sdb_url = "svom-sdb.lam.fr" sdb_url = "svom-sdb.lal.fr." response = os.system(f"ping -c {count} {sdb_url}") return True if response == 0 else False
[docs]def receive_event_att_files(sdb_filename, dest_dir): """Receive event ECL-EVT-SEC_XXX.fits or SVO-ATT-CNV_XXX.fits files from the SDB. :param sdb_filename: name of file to be downloaded from SDB :type sdb_filename: str :param dest_dir: path of the destination directory :type dest_dir: str :return: if the operation succeeded :rtype: bool """ sdb = SdbParameters() if not os.path.exists(dest_dir): logger.info(f"Destination directory {dest_dir} does not exist !") return False try: url = 'https://' + sdb.pars['host'] + ':' + \ str(sdb.pars['port']) + \ sdb.pars['path_read'] + '/' + sdb_filename response = requests.get(url=url) except requests.exceptions.ConnectionError: logger.info(f"Failed to establish a new connection with {url}") return False if response.ok: logger.info("receive_event_att_files() function has failed.") logger.info(f"Status code: {response.status_code}") return False abspath_filename = os.path.join(dest_dir, sdb_filename) with open(abspath_filename, 'wb') as f: f.write(response.content) return True
[docs]def get_url_with_namefile(n_file, prod_acronyme="", level_obj='l1'): # pragma: no cover """ :param n_file: :param prod_acronyme: :param level_obj: """ if prod_acronyme == "": prod_acronyme = extract_type(n_file) # download # memo url_query = 'https://svom-sdb.lal.in2p3.fr/server/search/l1?a=2;4;5;26&c=16::eq::SVO-ATT-CNV;5::eq::4278190334' url_query = get_url_query() if level_obj == "l1": # url_query += f'/l1?a=2;26&c=26::eq::{n_file}' url_query += f'/l1?a=2;4&c=11::eq::{n_file}' if prod_acronyme != "": url_query += f';3::eq::{prod_acronyme}' # meno url_query += f'/l1?a=2;26&c=26::eq::{n_file}' else: # https://svom-sdb.lal.in2p3.fr/server/search/products?a=1;12;2;3;4;6;7;11&c=9::eq::ECL;8::eq::GP;3::eq::ECL-SKY-IMA url_query += f'/products?a=4&c=9::eq::ECL;8::eq::GP;11::eq::{n_file}' if prod_acronyme != "": url_query += f';3::eq::{prod_acronyme}' try: logger.debug(url_query) response = requests.get(url=url_query) except: logger.exception(f"Failed to download file with {url_query}") return "" if response.ok: l_json = response.json() logger.debug(l_json) if len(l_json) == 0: logger.debug(url_query) logger.error(f"No product with name '{n_file}' in SDB") return "" if len(l_json) > 1: logger.warning(f"{len(l_json)} objects in SDB with same filename '{n_file}'") logger.warning("So get last ...") ret_url = l_json[-1]['url'] logger.debug(f"find {ret_url}") return ret_url else: logger.error(f"Status code: {response.status_code}") return ""
# def download_name(f_name, dest_dir, type_obj=""): # pragma: no cover # """ # # :param f_name: # :param dest_dir: # :param type_obj: # """ # if type_obj == "": # type_obj = extract_type(f_name) # if type_obj == "": # l_obj = "l1" # ret_url = get_url_with_namefile(f_name, level_obj=l_obj) # if ret_url == "": # l_obj = "product" # ret_url = get_url_with_namefile(f_name, level_obj=l_obj) # else: # if type_obj in LIST_INPUTS_DC3: # l_obj = "l1" # else: # l_obj = "product" # ret_url = get_url_with_namefile(f_name, type_obj) # if ret_url == "": # logger.error(f"Can't find {f_name} in SDB") # return False # return download_url_and_rename(ret_url, dest_dir, f_name)
[docs]def download_url_and_rename(url_file, dest_dir, new_name=""): # pragma: no cover """ :return: if the operation succeeded :rtype: bool """ if not os.path.exists(dest_dir): logger.info(f"Destination directory {dest_dir} does not exist !") return False if url_file == "": logger.info(f"url is empty!") return False # download try: response = requests.get(url=url_file) except: logger.exception(f"Failed to download file with {url_file}") return False # write if response.ok: if new_name == "": new_name = url_file.split('/')[-1] abspath_filename = os.path.join(dest_dir, new_name) with open(abspath_filename, 'wb') as f: f.write(response.content) return True else: logger.info(f"Status code: {response.status_code}") return False
[docs]def send_product_ecpi(path_product): # pragma: no cover """Send L1 product generated by the ECLAIRs pipeline to the SDB. Check if input file has registered CARD KW value for DC1: ECL-SKY-IMA, ECL-SOP-IMA, ECL-DET-UBC and ECL-EVT-CAL. :param path_product: path to the .fits file to be sent. :type path_product: str :return: if the operation succeeded :rtype: bool """ sdb = SdbParameters() fits_file = fits.open(path_product) assert os.path.exists(path_product) assert path_product.endswith('.fits') try: assert fits_file[0].header['CARD'] in LIST_PRODUCTS_DC3 except AssertionError: logger.error(f"CARD keyword not present in {path_product} file") fits_file.close() return False fits_file.close() try: file = {'product': open(path_product, 'rb')} url = sdb.pars['protocol_write'] + '://' + sdb.pars['host_write'] + \ sdb.pars['path_write'] response = requests.post(url=url, files=file) except: logger.exception(f"Failed to establish a new connection with {url}") return False if not response.ok: logger.error(f"Error POST : {response.status_code}") logger.error(response.url) logger.error(f"response :{response.headers}") return False return True
# # def find_name_last_download(obs_id, type_obj): # pragma: no cover # """ # # :param obs_id: # :param type_obj: # """ # # # curl 'https://svom-sdb.lal.in2p3.fr/server/search/l1?a=2;4;5&c=16::eq::SVO-ATT-CNV;5::eq::4278190334' # # curl -H "Authorization: Bearer $token" \ # # "https://fsc.svom.org/sdb/server/search/l1?a=1;4;6&c=3::eq::SVO-ORB-CNV;2::eq::7164" # url_query = get_url_query() # # url_query += f'/l1?a=2;4;26&c=5::eq::{obs_id};16::eq::{type_obj}' # url_query += f'/l1?a=1;4;6&c=2::eq::{obs_id};3::eq::{type_obj}' # try: # response = requests.get(url=url_query) # except: # logger.exception(f"Failed to download file with {url_query}") # return "" # if response.ok: # l_json = response.json() # logger.debug(l_json) # if len(l_json) == 0: # logger.debug(url_query) # logger.error(f"No product with type {type_obj} at OBS_ID={obs_id}") # return "" # if len(l_json) > 1: # last_product = sorted( # l_json, # key=lambda i: datetime.strptime(i['added_at'], '%Y-%m-%d %H:%M:%S.%f') # )[-1] # logger.debug(last_product) # ret_name = last_product['filename'] # else: # ret_name = l_json[0]['filename'] # logger.debug(f"Last download is {ret_name} at OBS_ID={obs_id}") # return ret_name # else: # logger.info(f"Failed to download file with {url_query}") # logger.info(f"Status code: {response.status_code}") # return "" #
[docs]def find_name_last_download(acronym): # pragma: no cover """ :param obs_id: :param type_obj: """ sdb = SdbIo() try: response = sdb.raw_search( acronym=acronym, outputs=['product_id', 'date', 'upload_filename'] ) except: logger.exception(f"Failed to download file with {acronym}") return "" if response.ok: l_json = response.json() logger.debug(l_json) if len(l_json) == 0: logger.debug(acronym) logger.error(f"No product with type {acronym}") return "" if len(l_json) > 1: last_product = sorted( l_json, key=lambda i: datetime.strptime(i['added_at'], '%Y-%m-%d %H:%M:%S.%f') )[-1] logger.debug(last_product) ret_name = last_product['upload_filename'] else: ret_name = l_json[0]['upload_filename'] logger.debug(f"Last download is {ret_name}") return ret_name else: logger.info(f"Failed to download file with {acronym}") logger.info(f"Status code: {response.status_code}") return ""
[docs]def receive_qla1_files_with_sdb(destination_dir): """ :param destination_dir: path to destination directory :type: destination_dir: string :return: """ if not os.path.exists(destination_dir): logger.info(f"Destination directory {destination_dir} does not exist !") return False sdb = SdbIo() for acronym in LIST_INPUTS_DC3: # filename = f'{acronym}_latest.fits' loc_filename = sdb.save_latest_raw_fits_file( dirname=destination_dir, # filename=filename, acronym=acronym ) # assert filename == loc_filename # Maybe some additional verifications # We must rename files with an official format (UTC date)? return True
[docs]def send_product_ecpi_to_sdb(path_product): # pragma: no cover """Send L1 product generated by the ECLAIRs pipeline to the SDB. Check if input file has registered CARD KW value for DC1: ECL-SKY-IMA, ECL-SOP-IMA, ECL-DET-UBC and ECL-EVT-CAL. :param path_product: path to the .fits file to be sent. :type path_product: str :return: if the operation succeeded :rtype: bool """ assert os.path.exists(path_product) assert path_product.endswith('.fits') with fits.open(path_product) as fits_file: try: assert fits_file[0].header['CARD'] in LIST_PRODUCTS_DC3 except AssertionError: logger.error(f"CARD keyword not present in {path_product} file") return False sdb = SdbIo() try: response = sdb.import_or_update(path_product) except: logger.exception(f"Failed to upload the product {path_product}") return False if not response.ok: logger.error(f"Error POST : {response.status_code}") logger.error(response.url) logger.error(f"response :{response.headers}") return False return True