Source code for ecpi.common.io.fits_tools

"""Generic functions to manipulate fits file
"""

import os.path as osp
import os
import logging
from glob import glob
from datetime import datetime as dt
import matplotlib.pyplot as plt
from astropy.io import fits
import astropy.time as astrotime
import ecpi.common.instru.array_convention as ac
from ecpi.common.mission.time import get_mjd_ref

logger = logging.getLogger(__name__)

CREATOR = 'ECLAIRs_pipeline'


[docs]def isot_whitout_sign(isot): """ Remove '.', ':', '-' from isot string :param isot: time in isot format :type isot: str """ isot = isot.split('.')[0] isot = isot.replace(':', '') isot = isot.replace('-', '') return isot
[docs]def get_name_fits(prod, t_start, obs_id="0", vers=None): """get name file FITS for a product template : prod_{obs_id_}t_start_at_t_create.fits :param prod: ECLAIRs product like ECL-SKY-IMA :type prod: string :param t_start: time start of event file associated to image :type t_start: string, isot format :param obs_id: observation identificator :type obs_id: string """ t_start_ns = isot_whitout_sign(t_start) if vers is None: # version is time creation. vers = isot_whitout_sign(astrotime.Time(dt.now(), precision=0).tt.isot) ret_name = f"{prod}_{obs_id}_{t_start_ns}_vers_{vers}.fits" return ret_name
[docs]def common_keyword(fits_header, tstart, tstop, creator=CREATOR, version='0.1', proc_id="01"): """add to the fits header the common keywords used in all the pipeline products .. seealso:: doc : ECL-GP-FITS-KEYWORDS.pdf ECLAIRs GP fits keywords https://forge.in2p3.fr/dmsf/files/6731/view :param fits_header: header data unit to add the keyword (ex: fits.PrimaryHDU) :param creator: name of the program that has created the fits file :param version: fits file format version :param proc_id: id of the pipeline process :type fits_header: astropy.io.fits.*HDU :type creator: string (default='ECLAIRs_pipeline') :type version: string (default='0.1') :type proc_id: string (default="01") :param tstart: start time of the observation in s from MJDREF :type tstart: string :param tstop: end time of the observation in s from MJDREF :type tstop: string """ creation_date = astrotime.Time(dt.now(), precision=0).tt.isot fits_header.header['TELESCOP'] = ('SVOM', 'Telescope or mission name') fits_header.header['INSTRUME'] = ('ECL', 'Instrument name') fits_header.header['TIMESYS'] = ('TT', 'Time frame system') fits_header.header['TIMEUNIT'] = ('s', 'Time unit') fits_header.header['TIMEREF'] = ('LOCAL', 'Time reference frame') fits_header.header['MJDREF'] = (get_mjd_ref(), 'Modified Julian Date of origin') fits_header.header['OBS_ID'] = ('1', 'Observation Identifier (type_number)') fits_header.header['OBS_TYPE'] = ('', 'Slew and pointing status') fits_header.header['OBSBOUND'] = ('', 'Reason for Observation ending') fits_header.header['TSTART'] = (tstart, 'Start time of the Observation') fits_header.header['TSTOP'] = (tstop, 'End time of the Observation') fits_header.header['EXPOSURE'] = (int(tstop) - int(tstart), 'Effective exposure (dead time corrected)') fits_header.header['TELAPSE'] = ('', 'Total elapsed time of the data') fits_header.header['EXTREL'] = (version, 'FSC release number for template') fits_header.header['CREATOR'] = (creator, 'software name and version') fits_header.header['CONFIGUR'] = ('', 'Software system configuration') fits_header.header['DATE'] = (creation_date, 'file creation date') fits_header.header['STAMP'] = ('', '') fits_header.header['FSCLEVL'] = ('1', 'FSC level of data processing') fits_header.header['APID_'] = ('1', '') fits_header.header['ORIGIN'] = ('FSC', 'Origin of FITS file') fits_header.header['SIMUID'] = (proc_id, 'simulation identifier') fits_header.header['SIMFLAG'] = (1, 'Simulation (1) or real data (0)') fits_header.header['LONGSTRN'] = ('OGIP 1.0', 'The HEASARC Long String Convention may be used') fits_header.header['HDUCLASS'] = ('OGIP', 'Format conforms to OGIP standards') fits_header.header['HDUVERS'] = ('1.1.0', 'Version of format') fits_header.header['COMMENT'] = ('', '') fits_header.header['DATAMODE'] = ('', '') fits_header.header['DATATYPE'] = ('', 'Type of data for observation')
[docs]def grouphdu_keyword(fits_header, grp_table_name, attitude): """ add to the fits header the specific keywords for a group HDU :param fits_header: header data unit to add the keyword (ex: fits.PrimaryHDU) :type fits_header: astropy.io.fits.*HDU :param grp_table_name: grouping table name :type grp_table_name: string """ fits_header.header['EXTVER'] = ('1', 'Assigned by template parser') fits_header.header['GRPID1'] = ('-1', 'EXTVER of Group containing this HDU') fits_header.header['GRPLC1'] = ('', 'URL of file containing Group') fits_header.header['GRPNAME'] = (grp_table_name, 'Grouping Table name') fits_header.header['HDUCLASS'] = ('OGIP', 'Format conforms to OGIP standards') fits_header.header['HDUVERS'] = ('1.1.0', 'Version of format') fits_header.header['LONGSTRN'] = ('OGIP 1.0', 'The HEASARC Long String Convention may be used') fits_header.header['OBSBOUND'] = ('', 'Reason for Observation ending') fits_header.header['SIMUID'] = ('', 'simulation identifier') fits_header.header['SIMFLAG'] = (1, 'Simulation (1) or real data (0)') fits_header.header['RA_PNT'] = (attitude[0], '[deg] RA pointing') fits_header.header['DEC_PNT'] = (attitude[1], '[deg] Dec. pointing') fits_header.header['ORI_PNT'] = (attitude[2], '[deg] Orientation pointing') fits_header.header['ONTIME'] = ('', ' [s] Sum of good time intervals') fits_header.header['RADECSYS'] = ('FK5', 'celestial coord system') fits_header.header['TELAPSE'] = ('', 'Total elapsed time of the data') fits_header.header['CARD'] = ('', 'ECLAIRs file') fits_header.header['DATE-END'] = ('', 'Date of the end of the observation') fits_header.header['DATE-OBS'] = ('', 'Date of the start of the observation') fits_header.header['DEADC'] = ('', 'Dead time correction')
[docs]def energy_keywords(fits_header, chanmin, chanmax, e_min, e_max): """add to fits header the keywords related to energy :param fits_header: header data unit to add the keyword (ex: fits.PrimaryHDU) :type fits_header: astropy.io.fits.*HDU :param chanmin: lowest channel of the energy range :type chanmin: float :param chanmax: highest channel of the energy range :type chanmax: float :param e_min: lower energy limit in keV :type e_min: float :param e_max: upper energy limit in keV :type e_max: float """ fits_header.header['CHANMIN'] = (chanmin, 'lowest channel of the energy range') fits_header.header['CHANMAX'] = (chanmax, 'highest channel of the energy range') fits_header.header['E_MIN'] = (e_min, '[keV] lower energy limit') fits_header.header['E_MAX'] = (e_max, '[keV] upper energy limit')
[docs]def pointing_keywords(fits_header, attitude, obs_time): """add to fits header the keywords related to pointing of ECLAIRs :param fits_header: header data unit to add the keyword (ex: fits.PrimaryHDU) :type fits_header: astropy.io.fits.*HDU :param attitude: [ra, dec, ori] in degrees :type attitude: [float, float, float] """ fits_header.header['RA_PNT'] = (attitude[0], '[deg] RA pointing') fits_header.header['DEC_PNT'] = (attitude[1], '[deg] Dec. pointing') fits_header.header['RADECSYS'] = ('FK5', 'celestial coord system') fits_header.header['ONTIME'] = (obs_time, ' [s] Sum of good time intervals') # what is the signification of DEADC ? # is it observation time - GTIs ?? fits_header.header['DEADC'] = ('', 'Dead time correction')
[docs]def write_fits(filename, data): """Save an array into a fits file data are saved in the primaryHDU :param filename: PATH/filename of the fits file :param data: data to write in the fits file :type data: array :type filename: string (PATH) """ try: os.remove(filename) except OSError: pass hdu = fits.PrimaryHDU(data) hdu.writeto(filename)
##############################
[docs]def read_fits(name, ugts_standard=False): """Read an array from a fits file .. warning:: The structure of the fits file must be simple: data into hdu[0].data :param name: PATH/name of the fits file :type name: string :param ugts_standard: flag to read from ugts Idet,Jdet orientation and make the transformation to Yecl,Zecl :type ugts_standard: bool :return: the data from the fits file :rtype: array """ hdu = fits.open(name) tdata = hdu[0].data if ugts_standard: tdata = ac.ijdet_to_yzegp_array(tdata) hdu.close() return tdata
##############################
[docs]def set_array_from_file(filename, ugts_standard=False): """read a fits file (using read_fits) :param filename: PATH/filename of the fits file :type filename: string :param ugts_standard: flag to read from ugts Idet,Jdet orientation and make the transformation to Yecl,Zecl :type ugts_standard: bool :return: array of the data, len of the first dim (=size of the square array) :rtype: array, int """ the_array = read_fits(filename, ugts_standard) if the_array.shape[0] != the_array.shape[1]: logger.error('Arrays must have the same size in both dimensions.') return the_array, the_array.shape[0]
##############################
[docs]def set_array_and_header_from_file(filename, ugts_standard=False): """read a fits file :param filename: PATH/filename of the fits file :type filename: string :param ugts_standard: flag to read from ugts Idet,Jdet orientation and make the transformation to Yecl,Zecl :type ugts_standard: bool :return: array of the data, len of the first dim (=size of the square array), header object :rtype: array, int, HDUList """ hdu = fits.open(filename) the_array = hdu[0].data if the_array.shape[0] != the_array.shape[1]: logger.error('Arrays must have the same size in both dimensions.') if ugts_standard: the_array = ac.ijdet_to_yzegp_array(the_array) hdu.close() return the_array, the_array.shape[0], hdu[0].header
[docs]def image_plot(image, save=None, show=False, title=None, clim=None, scale_legend=None, **kwargs): # pragma: no cover """plot/show/save the image of the array the function uses imshow :param image: data array to plot :param save: PATH/name of the file to save the fig. If None, no saving. :param show: show the plot :param title: title of the plot :param clim: color limit :param scale_legend: legend for the color scale :param **kwargs: other arguments directly passed to plt.imshow :type image: float array :type save: string (default=None) :type show: bool (default=False) :type title: string (default=None) :type clim: (float, float) (default=None) :type scale_legend: string (default=None) .. warning:: clim has not been tested. """ plt.imshow(image, interpolation="nearest", origin='lower', **kwargs) if title is not None: plt.title(title) if clim is not None: plt.clim(clim) cbar = plt.colorbar() if scale_legend is not None: cbar.set_label(scale_legend) if save is not None: plt.savefig(save) if show is True: plt.show()
[docs]def get_fits_files_with_extname(card_name, workdir): """Return list of .fits files whose CARD is equal to card_name. :param card_name: name of the CARD keyword to be searched for :type card_name: string :param workdir: directory containing .fits files :type workdir: string :return: list of files in workdir which has their CARD equal to card_name :rtype: list """ res = list() l_fits_files = glob(osp.join(workdir, "*.fits")) l_fits_files += glob(osp.join(workdir, "*.fits.gz")) for fits_file in l_fits_files: f = fits.open(fits_file) try: card = f[1].header['EXTNAME'] except: continue if card == card_name: res.append(fits_file) f.close() return res
# TODO: COVERAGE. Cannot create or download CALDB file for the moment.
[docs]def get_caldb_files_with_name(name, workdir): """Return list of .fits files whose CCNM0001 is equal to name. :param name: name of the CCNM0001 keyword to be searched for :type name: string :param workdir: directory containing .fits files :type workdir: string :return: list of files in workdir which have their CCNM0001 equal to card_name :rtype: list """ res = list() l_fits_files = glob(osp.join(workdir, "*.fits")) l_fits_files += glob(osp.join(workdir, "*.fits.gz")) for fits_file in l_fits_files: logger.debug(fits_file) f = fits.open(fits_file) try: ccnm = f[0].header['CCNM0001'] logger.debug(ccnm) except: ccnm = "" continue if ccnm == name: res.append(fits_file) f.close() return res