Source code for ecpi.database.interact_caldb

"""CALDB interface
"""

import os
import re
import logging
import subprocess
import urllib
import shutil
import requests
from svom.messaging import HttpIo

# Module-level logger, named after the importing module.
logger = logging.getLogger(__name__)

# Connection settings of the CALDB REST service.  The functions of this
# module assemble URLs as 'https://' + host [+ ':' + port] + path; an empty
# port means that no explicit port is appended.
ext_cfg = dict(
    host="svom-caldb.lal.in2p3.fr",
    path="/api/",
    port="",
)

# Calibration files handled by this module, keyed by their CALDB file type.
#
# Every entry holds two fields:
#   class:      the CALDB <class_type> group of the file, one of
#                   cpf:    Calibration Product Files
#                   bcf:    Basic Calibration Files
#                   pcf:    Primary Calibration Files
#   ecpi_name:  the name that replaces the CALDB file type in the name of a
#               downloaded file (see CaldbIo.save_caldb_last_fits_files)
#
# See the official documentation for more information :
#    https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/memos/cal_gen_93_006/cal_gen_93_006.html
caldb_files = {
    caldb_type: {'class': class_type, 'ecpi_name': ecpi_name}
    for caldb_type, class_type, ecpi_name in (
        ('arf_efarea', 'cpf', 'ECL-RSP-ARF'),
        ('irf_matrix', 'cpf', 'ECL-RSP-IRF'),
        ('rmf_matrix', 'cpf', 'ECL-RSP-RMF'),
        ('gain_coef', 'bcf', 'ECL-PIX-CAL'),
    )
}


def send_file_to_caldb(file):
    """Upload an arbitrary file to the CALDB admin endpoint.

    Useful for tests. Not to be used in production.

    The following keywords MUST appear in the primary HDU of the file::

        "TELESCOP" = "SVOM"
        "INSTRUME" = "ECL"
        "CCLS0001" = "BCF"
        "CDTP0001" = "DATA"
        "CCNM0001" = "ARF"
        "CVSD0001" = "1999-01-01"
        "CVST0001" = "00:00:00"
        "CDES0001" = "test dataset"

    The keyword CCNM0001 specifies the type in your request.

    The whole file is read into memory and sent as the body of a single
    POST request; the response is only logged at debug level.

    :param file: path to the file to be sent to CALDB
    :type file: str
    """
    with open(file, 'rb') as stream:
        payload = stream.read()

    # NOTE(review): ext_cfg['path'] already ends with '/', so the resulting
    # URL contains a double slash ('/api//admin/'); the configured port is
    # not used here -- confirm that both are intended.
    admin_url = ''.join(('https://', ext_cfg['host'], ext_cfg['path'], "/admin/"))
    response = requests.post(admin_url, data=payload)
    logger.debug(f"CALDB response: {response}")
def download_file_from_caldb(dest_dir, file_calib,
                             path_caldb="user/data/svom/ecl/cpf/arf_effarea"):
    """Download a calibration file from the CALDB REST API using ``curl``.

    Simple download with API REST interface / JM Colley

    .. note:: for the moment the download does not account for the port
       5000. Otherwise it does not work.

    The output of ``curl`` is written to ``dest_dir/file_calib``; the file is
    then renamed by :func:`rename_arf_dc2` whatever the download result.

    :param dest_dir: destination directory
    :type dest_dir: str
    :param file_calib: name of the calibration file to be downloaded
    :type file_calib: str
    :param path_caldb: root path in caldb
    :type path_caldb: str
    :return: ``True`` when a non-empty file was written, ``False`` when the
        destination directory does not exist or the downloaded file is empty
    :rtype: bool
    """
    if not os.path.exists(dest_dir):
        logger.info(f"Destination directory {dest_dir} does not exist !")
        return False

    # Compare the port by value: ``is ""`` tests object identity, which is
    # not guaranteed for equal strings (and is a SyntaxWarning on Python 3.8+).
    port = ext_cfg['port']
    raw_url = 'https://' + ext_cfg['host']
    if port != "" and port is not None:
        raw_url += ':' + str(port)
    raw_url += ext_cfg['path']

    # https://svom-caldb.lal.in2p3.fr/api/user/data/svom/ecl/bcf/aux-ecl-arf-rsp/aux-ecl-arf-rsp_2017-01-01.fits
    # URLs always use '/', so the pieces are joined explicitly instead of
    # with os.path.join (platform separator, drops the host on a leading '/').
    url_parts = (raw_url.rstrip('/'), path_caldb.strip('/'), file_calib)
    url = '/'.join(part for part in url_parts if part)
    p_file = os.path.join(dest_dir, file_calib)
    logger.debug(f"cmd : curl {url} > {p_file}")

    # Run curl without a shell so that neither the URL nor the destination
    # path is interpreted by it.  The file is opened here, so it exists even
    # when curl fails -- just as with the former shell redirection.
    with open(p_file, 'wb') as out_stream:
        try:
            subprocess.run(['curl', url], stdout=out_stream, check=False)
        except OSError as err:
            # e.g. curl is not installed: leave the file empty, report False
            logger.error(f"Cannot run curl: {err}")

    ret = os.stat(p_file).st_size > 0
    rename_arf_dc2(p_file)
    return ret
def receive_arf_file_from_caldb(dest_dir):
    """Receive the ARF file from the CALDB.

    The download URL is obtained from :func:`call_quzcif`, the file name from
    the ``content-disposition`` header of the response. The file is written
    into ``dest_dir`` and then renamed by :func:`rename_arf`.

    :param dest_dir: path of the destination directory
    :type dest_dir: str
    :return: if the operation succeeded
    :rtype: bool
    """
    if not os.path.exists(dest_dir):
        logger.info(f"Destination directory {dest_dir} does not exist !")
        return False

    # Ask for a fresh copy rather than a cached one.
    no_cache_headers = {
        "Cache-Control": "no-cache",
        "Pragma": "no-cache"
    }
    try:
        url = call_quzcif('ARF')
        response = requests.get(url, headers=no_cache_headers)
    except requests.exceptions.ConnectionError:
        logger.info("Failed to establish a new connection with CALDB")
        return False

    # Do not store the body of an error page as if it were a calibration file.
    if not response.ok:
        logger.error(f"CALDB request {url} failed "
                     f"with status {response.status_code}")
        return False

    disposition = response.headers.get('content-disposition', '')
    found = re.search(r"filename=(.+)", disposition)
    if found is None:
        logger.error("CALDB response does not provide a file name")
        return False

    # The name comes from the server: drop surrounding quotes and any
    # directory part so that the file cannot be written outside dest_dir.
    caldb_filename = os.path.basename(found.group(1).strip().strip('"\''))
    if not caldb_filename:
        logger.error("CALDB response does not provide a file name")
        return False

    abspath_filename = os.path.join(dest_dir, caldb_filename)
    with open(abspath_filename, 'wb') as f_out:
        f_out.write(response.content)
    # upper-case the leading part of the file name (see rename_arf)
    rename_arf(abspath_filename)
    return True
def execute(cmd):
    """Run ``cmd`` through the shell and wait for it to finish.

    Only the standard output is captured; the standard error stream is
    inherited from the current process.

    :param cmd: command line to run
    :return: tuple ``(stdout, stderr)`` where ``stdout`` is the captured
        output as bytes and ``stderr`` is always ``None``
    :rtype: tuple
    """
    finished = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True,
                              check=False)
    return finished.stdout, finished.stderr
def call_quzcif(filetype='ECL-RSP-ARF'):
    """Query the CALDB with the ``quzcif`` command-line tool.

    An error is logged for each of the environment variables ``CALDB``,
    ``CALDBCONFIG`` and ``CALDBALIAS`` that is not set; the command is run
    regardless.

    :param filetype: calibration file type passed to ``quzcif``
    :type filetype: str
    :return: first whitespace-separated token of the ``quzcif`` output
    :rtype: str
    """
    required_variables = (
        ("CALDB", "CALDB should be in environment"),
        ("CALDBCONFIG", "CALDBCONFIG should be in environment"),
        ("CALDBALIAS", "CALDBALIAS should be in environment"),
    )
    for variable, complaint in required_variables:
        if variable not in os.environ:
            logger.error(complaint)

    # filetype = 'ECL-RSP-ARF'
    command = ["quzcif svom ecl - - %s now now -" % (filetype)]
    stdout, _ = execute(command)
    tokens = stdout.decode('utf-8').split()
    return tokens[0]
def rename_arf(arf_path):
    """Rename an ARF file so that the start of its name is upper case.

    Only the first 11 characters of the file name are upper-cased; the
    directory part and the rest of the name are left untouched. The path is
    split with :mod:`os.path`, so a bare file name stays in the current
    directory and the platform path separator is honoured.

    :param arf_path: path of the file to rename
    :type arf_path: str
    """
    directory, filename = os.path.split(arf_path)
    # Slice instead of str.replace(): replace() would also upper-case any
    # later repetition of the prefix inside the name.
    renamed = filename[:11].upper() + filename[11:]
    os.rename(arf_path, os.path.join(directory, renamed))
def rename_arf_dc2(arf_path):
    """Rename an ARF file by prefixing its name with ``ECL-RSP-ARF_``.

    The directory part of the path is left untouched. The path is split with
    :mod:`os.path`, so a bare file name stays in the current directory and
    the platform path separator is honoured.

    :param arf_path: path of the file to rename
    :type arf_path: str
    """
    directory, filename = os.path.split(arf_path)
    os.rename(arf_path, os.path.join(directory, 'ECL-RSP-ARF_' + filename))
class CaldbIo(HttpIo):  #pylint: disable=R0902
    """(A)synchronous HTTP client for the CALDB search and list endpoints."""

    def __init__(self, search_url='https://fsc.svom.org/caldb/api', max_tries=5,  #pylint: disable=R0913
                 ssl_context=None, asynchronous=False, loop=None, use_tokens=True):
        """Set up the HTTP client and the CALDB endpoint URLs.

        :param search_url: base URL of the CALDB API; a trailing '/' is ignored
        :type search_url: str
        :param max_tries: forwarded unchanged to ``HttpIo``
        :param ssl_context: forwarded unchanged to ``HttpIo``
        :param asynchronous: forwarded unchanged to ``HttpIo``
        :param loop: forwarded unchanged to ``HttpIo``
        :param use_tokens: forwarded unchanged to ``HttpIo``
        """
        super().__init__(server_url='', max_tries=max_tries,
                         ssl_context=ssl_context, asynchronous=asynchronous,
                         loop=loop, use_tokens=use_tokens)
        base_url = search_url.rstrip('/')
        self.search_endpoint = f"{base_url}/user/search/ecl"
        self.list_endpoint = f"{base_url}/user/list/ecl"

    def get_list_cal_files(self, class_type):
        """Query the search endpoint for one calibration file type.

        :param class_type: calibration file type, must be a key of
            ``caldb_files``; leading and trailing '/' are ignored
        :type class_type: str
        :return: decoded JSON body of the response, or ``None`` when the
            type is not supported or the HTTP status is not accepted
        """
        class_type = class_type.strip('/')
        if class_type not in caldb_files:
            logger.error(f'File type '
                         f'[{class_type}] '
                         f'is not supported or not a valid calibration file!')
            return None

        # NOTE(review): the search endpoint is queried although a list
        # endpoint is defined in __init__ -- confirm this is intended.
        url = self.search_endpoint + '/' + class_type
        response = self.get(url)
        # Accept the 2xx and 4xx families.  The former test used ``// 200``,
        # which maps every 2xx/3xx status to 1 and therefore rejected all
        # successful replies.
        # NOTE(review): 4xx replies are presumably expected to carry a JSON
        # body -- confirm against the service.
        if response.status_code // 100 not in (2, 4):
            logger.error(f'File type '
                         f'[{class_type}] '
                         f'is not supported or not a valid calibration file!')
            return None
        return response.json()

    def save_caldb_last_fits_files(self, dir_name=None, **kwargs):
        """Download the current version of every file type of ``caldb_files``.

        For each key of ``caldb_files`` the URL ``<search endpoint>/<key>/now``
        is requested. A successful response is written to
        ``{dir_name}/{filename}``, where the file name is taken from the
        ``Content-disposition`` header with the CALDB type replaced by the
        matching ``ecpi_name``. File types that cannot be downloaded are
        skipped.

        :param dir_name: existing destination directory
        :type dir_name: str
        :param kwargs: accepted for compatibility, not used by this method
        :return: list of the paths of the written files, or ``False`` when
            the destination directory is missing
        """
        # dir_name defaults to None, which os.path.exists() does not accept
        if dir_name is None or not os.path.exists(dir_name):
            logger.info(f"Destination directory {dir_name} does not exist !")
            return False

        dumped_files = []
        # The context manager closes the session and its pooled connections.
        with requests.Session() as session:
            for cal_file, description in caldb_files.items():
                fits_url = f'{self.search_endpoint.rstrip("/")}/{cal_file}/now'
                print(f'Recovering {cal_file} from {fits_url}')
                # We use tokens: the authorization header is fetched again
                # for every file (presumably so that a renewed token is
                # picked up -- confirm).
                auth_header = self.kc_tokens.get_authorization_header()
                session.headers.update(auth_header)
                with session.request('get', fits_url) as f_in:
                    if f_in.status_code != 200:
                        logger.debug(f'Cannot download from {fits_url}')
                        continue
                    disposition = f_in.headers.get('Content-disposition')
                    if not disposition:
                        # without the header there is no name to save under
                        logger.error(f'No file name provided by {fits_url}')
                        continue
                    filename = disposition.split('=')[-1]
                    filename = filename.replace('"', '').replace(
                        cal_file, description['ecpi_name'])
                    local_file = f'{dir_name.rstrip("/")}/{filename}'
                    logger.info('Dumping remote file %s as %s',
                                fits_url, local_file)
                    with open(local_file, 'wb') as f_out:
                        f_out.write(f_in.content)
                    dumped_files.append(local_file)
        return dumped_files