"""SDB interface
"""
import os
import json
import logging
import pprint
from datetime import datetime
from astropy.io import fits
import requests
logger = logging.getLogger(__name__)
LIST_PRODUCTS_DC1 = ['ECL-SKY-IMA', 'ECL-SOP-IMA', 'ECL-DET-UBC', 'ECL-EVT-CAL']
LIST_INPUTS_DC1 = ['ECL-EVT-SEC', 'SVO-ATT-CNV', 'SVO-ORB-CNV']
[docs]class SdbParameters(object):
"""
#TODO: may be simpler solution can be used than a singleton
"""
instance = None
def __new__(cls, *args, **kargs):
"""
used singleton design pattern
"""
if cls.instance is None:
return object.__new__(cls, *args, **kargs)
else:
return cls.instance
def __init__(self):
if SdbParameters.instance is None:
SdbParameters.instance = self
self.pars = {
"protocol_write": "https",
"host_write": "proxy.svom.org/sdb-import",
"path_write": "/v0/add_product",
"protocol_search": "https",
"host_search": "fsc.svom.org/sdb",
"path_search": "/server/search"
}
[docs]def get_url_query():
"""return url SDB query
"""
sdb = SdbParameters()
return sdb.pars['protocol_search']+'://'+sdb.pars['host_search']+sdb.pars['path_search']
[docs]def ping_sdb(count=2):
"""Ping SDB server.
"""
sdb_url = "svom-sdb.lam.fr"
response = os.system(f"ping -c {count} {sdb_url}")
return True if response == 0 else False
[docs]def receive_event_att_files(sdb_filename, dest_dir):
"""Receive event ECL-EVT-SEC_XXX.fits or SVO-ATT-CNV_XXX.fits files from the SDB.
:param sdb_filename: name of file to be downloaded from SDB
:type sdb_filename: str
:param dest_dir: path of the destination directory
:type dest_dir: str
:return: if the operation succeeded
:rtype: bool
"""
sdb = SdbParameters()
if not os.path.exists(dest_dir):
logger.info(f"Destination directory {dest_dir} does not exist !")
return False
try:
url = 'https://' + sdb.pars['host'] + ':' + \
str(sdb.pars['port']) + \
sdb.pars['path_read'] + '/' + sdb_filename
response = requests.get(url=url)
except requests.exceptions.ConnectionError:
logger.info(f"Failed to establish a new connection with {url}")
return False
if response.ok:
logger.info("receive_event_att_files() function has failed.")
logger.info(f"Status code: {response.status_code}")
return False
abspath_filename = os.path.join(dest_dir, sdb_filename)
with open(abspath_filename, 'wb') as f:
f.write(response.content)
return True
[docs]def get_url_with_namefile(n_file, prod_acronyme="", level_obj='l1'): # pragma: no cover
"""
:param n_file:
:param prod_acronyme:
:param level_obj:
"""
if prod_acronyme == "":
prod_acronyme = extract_type(n_file)
# download
# memo url_query = 'https://svom-sdb.lal.in2p3.fr/server/search/l1?a=2;4;5;26&c=16::eq::SVO-ATT-CNV;5::eq::4278190334'
url_query = get_url_query()
if level_obj == "l1":
url_query += f'/l1?a=2;26&c=26::eq::{n_file}'
if prod_acronyme != "":
url_query += f';16::eq::{prod_acronyme}'
# meno url_query += f'/l1?a=2;26&c=26::eq::{n_file}'
else:
# https://svom-sdb.lal.in2p3.fr/server/search/products?a=1;12;2;3;4;6;7;11&c=9::eq::ECL;8::eq::GP;3::eq::ECL-SKY-IMA
url_query += f'/products?a=4&c=9::eq::ECL;8::eq::GP;11::eq::{n_file}'
if prod_acronyme != "":
url_query += f';3::eq::{prod_acronyme}'
try:
logger.debug(url_query)
response = requests.get(url=url_query)
except:
logger.exception(f"Failed to download file with {url_query}")
return ""
if response.ok:
l_json = response.json()
logger.debug(l_json)
if len(l_json) == 0:
logger.debug(url_query)
logger.error(f"No product with name '{n_file}' in SDB")
return ""
if len(l_json) > 1:
logger.warning(f"{len(l_json)} objects in SDB with same filename '{n_file}'")
logger.warning("So get last ...")
ret_url = l_json[-1]['url']
logger.debug(f"find {ret_url}")
return ret_url
else:
logger.error(f"Status code: {response.status_code}")
return ""
[docs]def download_name(f_name, dest_dir, type_obj=""): # pragma: no cover
"""
:param f_name:
:param dest_dir:
:param type_obj:
"""
if type_obj == "":
type_obj = extract_type(f_name)
if type_obj == "":
l_obj = "l1"
ret_url = get_url_with_namefile(f_name, level_obj=l_obj)
if ret_url == "":
l_obj = "product"
ret_url = get_url_with_namefile(f_name, level_obj=l_obj)
else:
if type_obj in LIST_INPUTS_DC1:
l_obj = "l1"
else:
l_obj = "product"
ret_url = get_url_with_namefile(f_name, type_obj)
if ret_url == "":
logger.error(f"Can't find {f_name} in SDB")
return False
return download_url_and_rename(ret_url, dest_dir, f_name)
[docs]def download_url_and_rename(url_file, dest_dir, new_name=""): # pragma: no cover
"""
:return: if the operation succeeded
:rtype: bool
"""
if not os.path.exists(dest_dir):
logger.info(f"Destination directory {dest_dir} does not exist !")
return False
if url_file == "":
logger.info(f"url is empty!")
return False
# download
try:
response = requests.get(url=url_file)
except:
logger.exception(f"Failed to download file with {url_file}")
return False
# write
if response.ok:
if new_name == "":
new_name = url_file.split('/')[-1]
abspath_filename = os.path.join(dest_dir, new_name)
with open(abspath_filename, 'wb') as f:
f.write(response.content)
return True
else:
logger.info(f"Status code: {response.status_code}")
return False
[docs]def send_product_ecpi(path_product): # pragma: no cover
"""Send L1 product generated by the ECLAIRs pipeline to the SDB.
Check if input file has registered CARD KW value for DC1:
ECL-SKY-IMA, ECL-SOP-IMA, ECL-DET-UBC and ECL-EVT-CAL.
:param path_product: path to the .fits file to be sent.
:type path_product: str
:return: if the operation succeeded
:rtype: bool
"""
sdb = SdbParameters()
fits_file = fits.open(path_product)
assert os.path.exists(path_product)
assert path_product.endswith('.fits')
try:
assert fits_file[0].header['CARD'] in LIST_PRODUCTS_DC1
except AssertionError:
logger.error(f"CARD keyword not present in {path_product} file")
fits_file.close()
return False
fits_file.close()
try:
file = {'product': open(path_product, 'rb')}
url = sdb.pars['protocol_write'] + '://' + sdb.pars['host_write'] + \
sdb.pars['path_write']
response = requests.post(url=url, files=file)
except:
logger.exception(f"Failed to establish a new connection with {url}")
return False
if not response.ok:
logger.error(f"Error POST : {response.status_code}")
logger.error(response.url)
logger.error(f"response :{response.headers}")
return False
return True
[docs]def find_name_last_download(obs_id, type_obj): # pragma: no cover
"""
:param obs_id:
:param type_obj:
"""
# curl 'https://svom-sdb.lal.in2p3.fr/server/search/l1?a=2;4;5&c=16::eq::SVO-ATT-CNV;5::eq::4278190334'
url_query = get_url_query()
url_query += f'/l1?a=2;4;26&c=5::eq::{obs_id};16::eq::{type_obj}'
try:
response = requests.get(url=url_query)
except:
logger.exception(f"Failed to download file with {url_query}")
return ""
if response.ok:
l_json = response.json()
logger.debug(l_json)
if len(l_json) == 0:
logger.debug(url_query)
logger.error(f"No product with type {type_obj} at OBS_ID={obs_id}")
return ""
if len(l_json) > 1:
last_product = sorted(
l_json,
key=lambda i: datetime.strptime(i['added_at'], '%Y-%m-%d %H:%M:%S.%f')
)[-1]
logger.debug(last_product)
ret_name = last_product['filename']
else:
ret_name = l_json[0]['filename']
logger.debug(f"Last download is {ret_name} at OBS_ID={obs_id}")
return ret_name
else:
logger.info(f"Failed to download file with {url_query}")
logger.info(f"Status code: {response.status_code}")
return ""