"""SDB interface
"""
import os
import json
import logging
import pprint
from datetime import datetime
from astropy.io import fits
import requests
from svom.messaging import SdbIo, HttpIo
logger = logging.getLogger(__name__)

# Values of the CARD header keyword accepted by the send_product_* functions
# of this module before a product is pushed to the SDB.
LIST_PRODUCTS_DC3 = [
    'ECL-SKY-IMA', 'ECL-SOP-IMA', 'ECL-DET-UBC',
    'ECL-EVT-CAL', 'ECL-GTI-1', 'ECL-GTI-3',
]

# Acronyms of the input files fetched by receive_qla1_files_with_sdb().
LIST_INPUTS_DC3 = [
    'ECL-EVT-SEC',
    'SVO-ATT-CNV',
    'SVO-ORB-CNV',
]
class SdbParameters(object):
    """Shared holder of the SDB connection parameters.

    The class is a singleton: every ``SdbParameters()`` call returns the same
    object. Its ``pars`` dictionary stores the protocol, host and path pieces
    used to build the SDB write and search URLs. The dictionary is filled with
    its default values only once, so changes made through one reference stay
    visible to every later ``SdbParameters()`` call.

    #TODO: may be simpler solution can be used than a singleton
    """
    # The unique instance, set by the first __init__ call.
    instance = None

    def __new__(cls, *args, **kargs):
        """Return the shared instance, allocating it on first use."""
        if cls.instance is None:
            # object.__new__ rejects extra arguments when __new__ is
            # overridden, so nothing is forwarded to it.
            return object.__new__(cls)
        return cls.instance

    def __init__(self):
        if SdbParameters.instance is None:
            SdbParameters.instance = self
        # __init__ runs on every SdbParameters() call, even when __new__
        # returned the existing object: keep the parameters already stored.
        if "pars" in self.__dict__:
            return
        self.pars = {
            "protocol_write": "https",
            "host_write": "proxy.svom.org/sdb-import",
            "path_write": "/v0/add_product",
            "protocol_search": "https",
            "host_search": "fsc.svom.org/sdb",
            "path_search": "/server/search",
        }
def get_url_query():
    """Build the base URL of the SDB search service.

    :return: search protocol, host and path of the shared SdbParameters
        object, joined as ``<protocol>://<host><path>``
    :rtype: str
    """
    search_pars = SdbParameters().pars
    pieces = (
        search_pars['protocol_search'],
        '://',
        search_pars['host_search'],
        search_pars['path_search'],
    )
    return ''.join(pieces)
def ping_sdb(count=2):
    """Ping SDB server.

    :param count: number of echo requests, passed to ``ping -c``
    :type count: int
    :return: True when the ``ping`` command exits with status 0
    :rtype: bool
    """
    # Imported here so the block does not depend on a new file-level import.
    import subprocess

    # sdb_url = "svom-sdb.lam.fr"
    sdb_url = "svom-sdb.lal.fr."
    # An argument list (no shell) keeps ``count`` from being interpreted as
    # shell syntax.
    command = ["ping", "-c", str(count), sdb_url]
    try:
        completed = subprocess.run(command, check=False)
    except OSError:
        # The ping executable is missing or cannot be started.
        logger.error("Unable to run the ping command")
        return False
    return completed.returncode == 0
def receive_event_att_files(sdb_filename, dest_dir):
    """Receive event ECL-EVT-SEC_XXX.fits or SVO-ATT-CNV_XXX.fits files from the SDB.

    The file is downloaded with an HTTP GET and written into ``dest_dir``
    under the name ``sdb_filename``.

    :param sdb_filename: name of file to be downloaded from SDB
    :type sdb_filename: str
    :param dest_dir: path of the destination directory
    :type dest_dir: str
    :return: if the operation succeeded
    :rtype: bool
    """
    sdb = SdbParameters()
    if not os.path.exists(dest_dir):
        logger.info(f"Destination directory {dest_dir} does not exist !")
        return False
    try:
        url = 'https://' + sdb.pars['host'] + ':' + \
            str(sdb.pars['port']) + \
            sdb.pars['path_read'] + '/' + sdb_filename
    except KeyError as missing_key:
        # NOTE(review): 'host', 'port' and 'path_read' are not part of the
        # default SdbParameters.pars; they must be set by the caller.
        logger.error(f"SDB parameter {missing_key} is not defined, can't build the download url")
        return False
    try:
        response = requests.get(url=url)
    except requests.exceptions.ConnectionError:
        logger.info(f"Failed to establish a new connection with {url}")
        return False
    # Only a successful response carries the file content.
    if not response.ok:
        logger.info("receive_event_att_files() function has failed.")
        logger.info(f"Status code: {response.status_code}")
        return False
    abspath_filename = os.path.join(dest_dir, sdb_filename)
    with open(abspath_filename, 'wb') as f:
        f.write(response.content)
    return True
def get_url_with_namefile(n_file, prod_acronyme="", level_obj='l1'):  # pragma: no cover
    """Query the SDB search service for the download url of a file.

    :param n_file: file name used as search criterion
    :type n_file: str
    :param prod_acronyme: product acronym added to the search criteria when
        not empty; when empty it is first derived with ``extract_type(n_file)``
    :type prod_acronyme: str
    :param level_obj: ``'l1'`` searches the ``/l1`` collection, any other
        value searches ``/products``
    :type level_obj: str
    :return: the ``url`` field of the last object returned by the SDB, or an
        empty string when the request fails or nothing matches
    :rtype: str
    """
    if prod_acronyme == "":
        # NOTE(review): extract_type is not defined in the visible part of
        # this module; presumably provided elsewhere - confirm.
        prod_acronyme = extract_type(n_file)
    # memo url_query = 'https://svom-sdb.lal.in2p3.fr/server/search/l1?a=2;4;5;26&c=16::eq::SVO-ATT-CNV;5::eq::4278190334'
    url_query = get_url_query()
    if level_obj == "l1":
        url_query += f'/l1?a=2;4&c=11::eq::{n_file}'
    else:
        # memo https://svom-sdb.lal.in2p3.fr/server/search/products?a=1;12;2;3;4;6;7;11&c=9::eq::ECL;8::eq::GP;3::eq::ECL-SKY-IMA
        url_query += f'/products?a=4&c=9::eq::ECL;8::eq::GP;11::eq::{n_file}'
    # The acronym criterion is the same for both collections.
    if prod_acronyme != "":
        url_query += f';3::eq::{prod_acronyme}'
    logger.debug(url_query)
    try:
        response = requests.get(url=url_query)
    except requests.exceptions.RequestException:
        logger.exception(f"Failed to download file with {url_query}")
        return ""
    if not response.ok:
        logger.error(f"Status code: {response.status_code}")
        return ""
    try:
        l_json = response.json()
    except ValueError:
        # The body of the response is not valid JSON.
        logger.exception(f"Failed to download file with {url_query}")
        return ""
    logger.debug(l_json)
    if len(l_json) == 0:
        logger.debug(url_query)
        logger.error(f"No product with name '{n_file}' in SDB")
        return ""
    if len(l_json) > 1:
        logger.warning(f"{len(l_json)} objects in SDB with same filename '{n_file}'")
        logger.warning("So get last ...")
    ret_url = l_json[-1]['url']
    logger.debug(f"find {ret_url}")
    return ret_url
# def download_name(f_name, dest_dir, type_obj=""): # pragma: no cover
# """
#
# :param f_name:
# :param dest_dir:
# :param type_obj:
# """
# if type_obj == "":
# type_obj = extract_type(f_name)
# if type_obj == "":
# l_obj = "l1"
# ret_url = get_url_with_namefile(f_name, level_obj=l_obj)
# if ret_url == "":
# l_obj = "product"
# ret_url = get_url_with_namefile(f_name, level_obj=l_obj)
# else:
# if type_obj in LIST_INPUTS_DC3:
# l_obj = "l1"
# else:
# l_obj = "product"
# ret_url = get_url_with_namefile(f_name, type_obj)
# if ret_url == "":
# logger.error(f"Can't find {f_name} in SDB")
# return False
# return download_url_and_rename(ret_url, dest_dir, f_name)
def download_url_and_rename(url_file, dest_dir, new_name=""):  # pragma: no cover
    """Download a file and store it in a directory, optionally renamed.

    :param url_file: url of the file to download
    :type url_file: str
    :param dest_dir: path of the existing destination directory
    :type dest_dir: str
    :param new_name: name of the written file; when empty, the last
        ``/``-separated component of ``url_file`` is used
    :type new_name: str
    :return: if the operation succeeded
    :rtype: bool
    """
    if not os.path.exists(dest_dir):
        logger.info(f"Destination directory {dest_dir} does not exist !")
        return False
    if url_file == "":
        logger.info("url is empty!")
        return False
    # download
    try:
        response = requests.get(url=url_file)
    except requests.exceptions.RequestException:
        logger.exception(f"Failed to download file with {url_file}")
        return False
    if not response.ok:
        logger.info(f"Status code: {response.status_code}")
        return False
    # write
    if new_name == "":
        new_name = url_file.split('/')[-1]
    abspath_filename = os.path.join(dest_dir, new_name)
    with open(abspath_filename, 'wb') as f:
        f.write(response.content)
    return True
def send_product_ecpi(path_product):  # pragma: no cover
    """Send L1 product generated by the ECLAIRs pipeline to the SDB.

    The product is sent only when the CARD keyword of its primary header is
    one of the values of LIST_PRODUCTS_DC3. It is posted as the ``product``
    file field to the write url built from the SdbParameters values.

    :param path_product: path to the .fits file to be sent.
    :type path_product: str
    :return: if the operation succeeded
    :rtype: bool
    :raises AssertionError: if ``path_product`` does not exist or does not
        end with ``.fits``
    """
    # Check the path before trying to open it as a FITS file.
    assert os.path.exists(path_product)
    assert path_product.endswith('.fits')
    with fits.open(path_product) as fits_file:
        # .get() returns None when the keyword is absent instead of raising.
        card = fits_file[0].header.get('CARD')
    # Explicit test: an assert would be removed when running with -O.
    if card not in LIST_PRODUCTS_DC3:
        logger.error(f"CARD keyword missing or not registered in {path_product} file")
        return False
    sdb = SdbParameters()
    # Built outside the try block so the failure message can always use it.
    url = sdb.pars['protocol_write'] + '://' + sdb.pars['host_write'] + \
        sdb.pars['path_write']
    try:
        # The context manager closes the product file once it is posted.
        with open(path_product, 'rb') as product:
            response = requests.post(url=url, files={'product': product})
    except (OSError, requests.exceptions.RequestException):
        logger.exception(f"Failed to establish a new connection with {url}")
        return False
    if not response.ok:
        logger.error(f"Error POST : {response.status_code}")
        logger.error(response.url)
        logger.error(f"response :{response.headers}")
        return False
    return True
#
# def find_name_last_download(obs_id, type_obj): # pragma: no cover
# """
#
# :param obs_id:
# :param type_obj:
# """
#
# # curl 'https://svom-sdb.lal.in2p3.fr/server/search/l1?a=2;4;5&c=16::eq::SVO-ATT-CNV;5::eq::4278190334'
# # curl -H "Authorization: Bearer $token" \
# # "https://fsc.svom.org/sdb/server/search/l1?a=1;4;6&c=3::eq::SVO-ORB-CNV;2::eq::7164"
# url_query = get_url_query()
# # url_query += f'/l1?a=2;4;26&c=5::eq::{obs_id};16::eq::{type_obj}'
# url_query += f'/l1?a=1;4;6&c=2::eq::{obs_id};3::eq::{type_obj}'
# try:
# response = requests.get(url=url_query)
# except:
# logger.exception(f"Failed to download file with {url_query}")
# return ""
# if response.ok:
# l_json = response.json()
# logger.debug(l_json)
# if len(l_json) == 0:
# logger.debug(url_query)
# logger.error(f"No product with type {type_obj} at OBS_ID={obs_id}")
# return ""
# if len(l_json) > 1:
# last_product = sorted(
# l_json,
# key=lambda i: datetime.strptime(i['added_at'], '%Y-%m-%d %H:%M:%S.%f')
# )[-1]
# logger.debug(last_product)
# ret_name = last_product['filename']
# else:
# ret_name = l_json[0]['filename']
# logger.debug(f"Last download is {ret_name} at OBS_ID={obs_id}")
# return ret_name
# else:
# logger.info(f"Failed to download file with {url_query}")
# logger.info(f"Status code: {response.status_code}")
# return ""
#
def find_name_last_download(acronym):  # pragma: no cover
    """Return the upload file name of the most recent SDB entry for an acronym.

    :param acronym: acronym passed to ``SdbIo.raw_search``
    :type acronym: str
    :return: value of ``upload_filename`` for the selected entry, or an empty
        string when the search fails or returns nothing
    :rtype: str
    """
    sdb = SdbIo()
    try:
        response = sdb.raw_search(
            acronym=acronym,
            outputs=['product_id', 'date', 'upload_filename']
        )
    except Exception:
        logger.exception(f"Failed to download file with {acronym}")
        return ""
    if not response.ok:
        logger.info(f"Failed to download file with {acronym}")
        logger.info(f"Status code: {response.status_code}")
        return ""
    l_json = response.json()
    logger.debug(l_json)
    if len(l_json) == 0:
        logger.debug(acronym)
        logger.error(f"No product with type {acronym}")
        return ""
    if len(l_json) > 1:
        # NOTE(review): the entries are ordered on 'added_at' while the
        # requested outputs are 'product_id', 'date' and 'upload_filename';
        # confirm the SDB answer really contains 'added_at'.
        last_product = sorted(
            l_json,
            key=lambda i: datetime.strptime(i['added_at'], '%Y-%m-%d %H:%M:%S.%f')
        )[-1]
        logger.debug(last_product)
        ret_name = last_product['upload_filename']
    else:
        ret_name = l_json[0]['upload_filename']
    logger.debug(f"Last download is {ret_name}")
    return ret_name
def receive_qla1_files_with_sdb(destination_dir):
    """Save the latest raw FITS file of each LIST_INPUTS_DC3 acronym.

    :param destination_dir: path to destination directory
    :type destination_dir: str
    :return: False when ``destination_dir`` does not exist, True once
        ``SdbIo.save_latest_raw_fits_file`` has been called for every acronym
    :rtype: bool
    """
    if not os.path.exists(destination_dir):
        logger.info(f"Destination directory {destination_dir} does not exist !")
        return False
    sdb = SdbIo()
    for acronym in LIST_INPUTS_DC3:
        # The value returned by save_latest_raw_fits_file is not used yet.
        sdb.save_latest_raw_fits_file(
            dirname=destination_dir,
            acronym=acronym
        )
        # TODO: maybe some additional verifications
        # TODO: must the files be renamed with an official format (UTC date)?
    return True
def send_product_ecpi_to_sdb(path_product):  # pragma: no cover
    """Send L1 product generated by the ECLAIRs pipeline to the SDB.

    The product is sent with ``SdbIo.import_or_update`` only when the CARD
    keyword of its primary header is one of the values of LIST_PRODUCTS_DC3.

    :param path_product: path to the .fits file to be sent.
    :type path_product: str
    :return: if the operation succeeded
    :rtype: bool
    :raises AssertionError: if ``path_product`` does not exist or does not
        end with ``.fits``
    """
    assert os.path.exists(path_product)
    assert path_product.endswith('.fits')
    with fits.open(path_product) as fits_file:
        # .get() returns None when the keyword is absent instead of raising.
        card = fits_file[0].header.get('CARD')
    # Explicit test: an assert would be removed when running with -O.
    if card not in LIST_PRODUCTS_DC3:
        logger.error(f"CARD keyword missing or not registered in {path_product} file")
        return False
    sdb = SdbIo()
    try:
        response = sdb.import_or_update(path_product)
    except Exception:
        logger.exception(f"Failed to upload the product {path_product}")
        return False
    if not response.ok:
        logger.error(f"Error POST : {response.status_code}")
        logger.error(response.url)
        logger.error(f"response :{response.headers}")
        return False
    return True