"""CALDB interface
"""
import os
import re
import logging
import subprocess
import urllib
import shutil
import requests
from svom.messaging import HttpIo
logger = logging.getLogger(__name__)
ext_cfg = {
"host": "svom-caldb.lal.in2p3.fr",
"path": "/api/",
"port": ""
}
# Available calibration files are classified by
# <class_type> in three groups :
# cpf: Calibration Product Files
# bcf: Basic Calibration Files
# pcf: Primary Calibration Files
# See the official documentation for more information :
# https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/memos/cal_gen_93_006/cal_gen_93_006.html
caldb_files = {
'arf_efarea': {
'class': 'cpf',
'ecpi_name': 'ECL-RSP-ARF'
},
'irf_matrix': {
'class': 'cpf',
'ecpi_name': 'ECL-RSP-IRF'
},
'rmf_matrix': {
'class': 'cpf',
'ecpi_name': 'ECL-RSP-RMF'
},
'gain_coef': {
'class': 'bcf',
'ecpi_name': 'ECL-PIX-CAL'
}
}
[docs]def send_file_to_caldb(file):
"""Send whatever file to CALDB.
Usefull for tests. Not to be used in production.
Following keywords MUST appear in primary hdu file:
"TELESCOP" = "SVOM"
"INSTRUME" = "ECL"
"CCLS0001" = "BCF"
"CDTP0001" = "DATA"
"CCNM0001" = "ARF"
"CVSD0001" = "1999-01-01"
"CVST0001" = "00:00:00"
"CDES0001" = "test dataset"
The KW CCNM0001 specifies the type in your request.
:param file: path to the file to be sent to CALDB
:type file: str
"""
with open(file, 'rb') as fp:
data = fp.read()
url = 'https://' + ext_cfg['host'] + ext_cfg['path']
response = requests.post(url + "/admin/", data=data)
logger.debug(f"CALDB response: {response}")
[docs]def download_file_from_caldb(dest_dir, file_calib,
path_caldb="user/data/svom/ecl/cpf/arf_effarea"):
"""Simple download with API REST interface / JM Colley
.. note: for the moment the download does not account for the port 5000.
Otherwise it does not work.
:param dest_dir: destination directory
:type dest_dir: str
:param file_calib: name of the calibration file to be downloaded
:type file_calib: str
:param path_caldb: root path in caldb
:type path_caldb: str
"""
if not os.path.exists(dest_dir):
logger.info(f"Destination directory {dest_dir} does not exist !")
return False
if ext_cfg['port'] is "":
raw_url = 'https://' + ext_cfg['host'] + ext_cfg['path']
else:
raw_url = 'https://' + ext_cfg['host'] + ':' + \
str(ext_cfg['port']) + ext_cfg['path']
# https://svom-caldb.lal.in2p3.fr/api/user/data/svom/ecl/bcf/aux-ecl-arf-rsp/aux-ecl-arf-rsp_2017-01-01.fits
url = os.path.join(raw_url, path_caldb, file_calib)
p_file = os.path.join(dest_dir, file_calib)
cmd = f'curl {url} > {p_file}'
logger.debug(f"cmd : {cmd}")
os.system(cmd)
ret = os.stat(p_file).st_size > 0
rename_arf_dc2(p_file)
return ret
[docs]def receive_arf_file_from_caldb(dest_dir):
"""Receive ARf file from the CALDB.
:param dest_dir: path of the destination directory
:type dest_dir: str
:return: if the operation succeeded
:rtype: bool
"""
if not os.path.exists(dest_dir):
logger.info(f"Destination directory {dest_dir} does not exist !")
return False
filetype = 'ARF'
headers = {
"Cache-Control": "no-cache",
"Pragma": "no-cache"
}
try:
url = call_quzcif(filetype)
response = requests.get(url, headers=headers)
except requests.exceptions.ConnectionError:
logger.info("Failed to establish a new connection with CALDB")
return False
response_header = response.headers['content-disposition']
caldb_filename = re.findall("filename=(.+)", response_header)[0]
abspath_filename = os.path.join(dest_dir, caldb_filename)
with open(abspath_filename, 'wb') as f:
f.write(response.content)
# rename from ecl_rsp_ene to ECL-RSP-ARF.
rename_arf(abspath_filename)
return True
[docs]def execute(cmd):
"""Simple subprocess run
"""
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
return proc.communicate()
[docs]def call_quzcif(filetype='ECL-RSP-ARF'):
""" CALDB request quzcif
"""
if "CALDB" not in os.environ:
logger.error("CALDB should be in environment")
if "CALDBCONFIG" not in os.environ:
logger.error("CALDBCONFIG should be in environment")
if "CALDBALIAS" not in os.environ:
logger.error("CALDBALIAS should be in environment")
# filetype = 'ECL-RSP-ARF'
cmd = ["quzcif svom ecl - - %s now now -" % (filetype)]
out, _ = execute(cmd)
return out.decode('utf-8').split()[0]
# TODO: Check the os independence
[docs]def rename_arf(arf_path):
"""Rename arf with correct syntaxe
"""
root = '/'.join(arf_path.split('/')[:-1]) + '/'
f = arf_path.split('/')[-1]
f = f.replace(f[:11], f[:11].upper())
os.rename(arf_path, root + f)
# TODO: Check the os independence
[docs]def rename_arf_dc2(arf_path):
"""Rename arf with correct syntaxe
"""
root = '/'.join(arf_path.split('/')[:-1]) + '/'
f_arf = arf_path.split('/')[-1]
f_arf = 'ECL-RSP-ARF_' + f_arf
os.rename(arf_path, root + f_arf)
[docs]class CaldbIo(HttpIo): #pylint: disable=R0902
"""
(A)synchronous HTTP client
"""
def __init__(self, search_url='https://fsc.svom.org/caldb/api', max_tries=5, #pylint: disable=R0913
ssl_context=None, asynchronous=False, loop=None, use_tokens=True):
super().__init__(server_url='',
max_tries=max_tries,
ssl_context=ssl_context,
asynchronous=asynchronous,
loop=loop,
use_tokens=use_tokens)
self.search_endpoint = f"{search_url.rstrip('/')}/user/search/ecl"
self.list_endpoint = f"{search_url.rstrip('/')}/user/list/ecl"
[docs] def get_list_cal_files(self, class_type):
class_type = class_type.strip('/')
if class_type not in caldb_files.keys():
logger.error(f'File type '
f'[{class_type}] '
f'is not supported or not a valid calibration file!')
return None
url = self.search_endpoint + '/' + class_type.strip('/')
response = self.get(url)
if response.status_code//200 not in [2, 4]:
logger.error(f'File type '
f'[{class_type}] '
f'is not supported or not a valid calibration file!')
return None
return response.json()
[docs] def save_caldb_last_fits_files(self, dir_name=None, **kwargs):
"""
Dumps fits file as {dirname}/{filename} for a given search.
if the keyword args search leads to multiple entries
filename should not be provided. Returns list of file paths
"""
if not os.path.exists(dir_name):
logger.info(f"Destination directory {dir_name} does not exist !")
return False
dumped_files = []
# We use tokens
# auth_header = self.kc_tokens.get_authorization_header()
session = requests.Session()
# session.headers.update(auth_header)
for cal_file in caldb_files.keys():
fits_url = f'{self.search_endpoint.rstrip("/")}/{cal_file}/now'
print(f'Recovering {cal_file} from {fits_url}')
auth_header = self.kc_tokens.get_authorization_header()
session.headers.update(auth_header)
with session.request('get', fits_url) as f_in:
if f_in.status_code != 200:
logger.debug(f'Cannot download from {fits_url}')
continue
filename = f_in.headers.get('Content-disposition').split('=')[-1]
filename = filename.replace('"', '').replace(cal_file,
caldb_files[cal_file]['ecpi_name'])
local_file = f'{dir_name.rstrip("/")}/{filename}'
logger.info('Dumping remote file %s as %s', fits_url, local_file)
with open(local_file, 'wb') as f_out:
f_out.write(f_in.content)
dumped_files.append(local_file)
return dumped_files