Source code for ecpi.common.io.fits_tools

'''
Created on 20 mars 2018

based on 'Tools.py' python program of Cyril Lachaud
Generic functions to manipulate fits file

@author: Catalano Camille, APC
'''

import os
import logging
from glob import glob
from datetime import datetime
import matplotlib.pyplot as plt
from astropy.io import fits
import astropy.time as astrotime
import ecpi.common.instru.array_convention as ac
from ecpi.common.mission.time import get_mjd_ref

logger = logging.getLogger(__name__) 
CREATOR = 'ECLAIRs_pipeline'


##############################
[docs]def common_keyword(fits_header, tstart, tstop, creator=CREATOR, version='0.1', proc_id="01"): """add to the fits header the common keywords used in all the pipeline products .. seealso:: doc : ECL-GP-FITS-KEYWORDS.pdf ECLAIRs GP fits keywords https://forge.in2p3.fr/dmsf/files/6731/view :param fits_header: header data unit to add the keyword (ex: fits.PrimaryHDU) :param creator: name of the program that has created the fits file :param version: fits file format version :param proc_id: id of the pipeline process :type fits_header: astropy.io.fits.*HDU :type creator: string (default='ECLAIRs_pipeline') :type version: string (default='0.1') :type proc_id: string (default="01") :param tstart: start time of the observation in s from MJDREF :type tstart: string :param tstop: end time of the observation in s from MJDREF :type tstop: string """ creation_date = astrotime.Time(datetime.now(), precision=0).tt.isot fits_header.header['TELESCOP'] = ('SVOM', 'Telescope or mission name') fits_header.header['INSTRUME'] = ('ECL', 'Instrument name') fits_header.header['TIMESYS'] = ('TT', 'Time frame system') fits_header.header['TIMEUNIT'] = ('s', 'Time unit') fits_header.header['TIMEREF'] = ('LOCAL', 'Time reference frame') fits_header.header['MJDREF'] = (get_mjd_ref(), 'Modified Julian Date of origin') fits_header.header['OBS_ID'] = ('1', 'Observation Identifier (type_number)') fits_header.header['OBS_TYPE'] = ('', 'Slew and pointing status') fits_header.header['OBSBOUND'] = ('', 'Reason for Observation ending') fits_header.header['TSTART'] = (tstart, 'Start time of the Observation') fits_header.header['TSTOP'] = (tstop, 'End time of the Observation') fits_header.header['EXPOSURE'] = (int(tstop) - int(tstart), \ 'Effective exposure (dead time corrected)') fits_header.header['TELAPSE'] = ('', 'Total elapsed time of the data') fits_header.header['EXTREL'] = (version, 'FSC release number for template') fits_header.header['CREATOR'] = (creator, 'software name and version') fits_header.header['CONFIGUR'] = ('', 'Software system configuration') fits_header.header['DATE'] = (creation_date, 'file creation date') fits_header.header['STAMP'] = ('', '') fits_header.header['FSCLEVL'] = ('1', 'FSC level of data processing') fits_header.header['APID_'] = ('1', '') fits_header.header['ORIGIN'] = ('FSC', 'Origin of FITS file') fits_header.header['SIMUID'] = (proc_id, 'simulation identifier') fits_header.header['SIMFLAG'] = (1, 'Simulation (1) or real data (0)') fits_header.header['EXTREL'] = ('', 'FSC release number') fits_header.header['LONGSTRN'] = ('OGIP 1.0', 'The HEASARC Long String Convention may be used') fits_header.header['HDUCLASS'] = ('OGIP', 'Format conforms to OGIP standards') fits_header.header['HDUVERS'] = ('1.1.0', 'Version of format') fits_header.header['COMMENT'] = ('', '') fits_header.header['DATAMODE'] = ('', '') fits_header.header['DATATYPE'] = ('', 'Type of data for observation')
##############################
[docs]def grouphdu_keyword(fits_header, grp_table_name, attitude): """ add to the fits header the specific keywords for a group HDU :param fits_header: header data unit to add the keyword (ex: fits.PrimaryHDU) :type fits_header: astropy.io.fits.*HDU :param grp_table_name: grouping table name :type grp_table_name: string """ fits_header.header['EXTVER'] = ('1', 'Assigned by template parser') fits_header.header['GRPID1'] = ('-1', 'EXTVER of Group containing this HDU') fits_header.header['GRPLC1'] = ('', 'URL of file containing Group') fits_header.header['GRPNAME'] = (grp_table_name, 'Grouping Table name') fits_header.header['HDUCLASS'] = ('OGIP', 'Format conforms to OGIP standards') fits_header.header['HDUVERS'] = ('1.1.0', 'Version of format') fits_header.header['LONGSTRN'] = ('OGIP 1.0', 'The HEASARC Long String Convention may be used') fits_header.header['OBSBOUND'] = ('', 'Reason for Observation ending') fits_header.header['SIMUID'] = ('', 'simulation identifier') fits_header.header['SIMFLAG'] = (1, 'Simulation (1) or real data (0)') fits_header.header['RA_PNT'] = (attitude[0], '[deg] RA pointing') fits_header.header['DEC_PNT'] = (attitude[1], '[deg] Dec. pointing') fits_header.header['ORI_PNT'] = (attitude[2], '[deg] Orientation pointing') fits_header.header['ONTIME'] = ('', ' [s] Sum of good time intervals') fits_header.header['RADECSYS'] = ('FK5', 'celestial coord system') fits_header.header['TELAPSE'] = ('', 'Total elapsed time of the data') fits_header.header['CARD'] = ('', 'ECLAIRs file') fits_header.header['DATE-END'] = ('', 'Date of the end of the observation') fits_header.header['DATE-OBS'] = ('', 'Date of the start of the observation') fits_header.header['DEADC'] = ('', 'Dead time correction')
##############################
[docs]def energy_keywords(fits_header, chanmin, chanmax, e_min, e_max): """add to fits header the keywords related to energy :param fits_header: header data unit to add the keyword (ex: fits.PrimaryHDU) :type fits_header: astropy.io.fits.*HDU :param chanmin: lowest channel of the energy range :type chanmin: float :param chanmax: highest channel of the energy range :type chanmax: float :param e_min: lower energy limit in keV :type e_min: float :param e_max: upper energy limit in keV :type e_max: float """ fits_header.header['CHANMIN'] = (chanmin, 'lowest channel of the energy range') fits_header.header['CHANMAX'] = (chanmax, 'highest channel of the energy range') fits_header.header['E_MIN'] = (e_min, '[keV] lower energy limit') fits_header.header['E_MAX'] = (e_max, '[keV] upper energy limit')
[docs]def pointing_keywords(fits_header, attitude, obs_time): """add to fits header the keywords related to pointing of ECLAIRs :param fits_header: header data unit to add the keyword (ex: fits.PrimaryHDU) :type fits_header: astropy.io.fits.*HDU :param attitude: [ra, dec, ori] in degrees :type attitude: [float, float, float] """ fits_header.header['RA_PNT'] = (attitude[0], '[deg] RA pointing') fits_header.header['DEC_PNT'] = (attitude[1], '[deg] Dec. pointing') fits_header.header['RADECSYS'] = ('FK5', 'celestial coord system') fits_header.header['ONTIME'] = (obs_time, ' [s] Sum of good time intervals') # what is the signification of DEADC ? # is it observation time - GTIs ?? fits_header.header['DEADC'] = ('', 'Dead time correction')
##############################
[docs]def write_fits(filename, data): """Save an array into a fits file data are saved in the primaryHDU :param filename: PATH/filename of the fits file :param data: data to write in the fits file :type data: array :type filename: string (PATH) """ try: os.remove(filename) except OSError: pass hdu = fits.PrimaryHDU(data) hdu.writeto(filename)
##############################
[docs]def read_fits(name, ugts_standard=0): """Read an array from a fits file .. warning:: The structure of the fits file must be simple: data into hdu[0].data :param name: PATH/name of the fits file :type name: string :param ugts_standard: flag to read from ugts Idet,Jdet orientation and make the transformation to Yecl,Zecl :type ugts_standard: bool :return: the data from the fits file :rtype: array """ hdu = fits.open(name) tdata = hdu[0].data if ugts_standard: tdata = ac.ijdet_to_yzegp_array(tdata) return tdata
##############################
[docs]def set_array_from_file(filename, ugts_standard=0): """read a fits file (using read_fits) :param filename: PATH/filename of the fits file :type filename: string :param ugts_standard: flag to read from ugts Idet,Jdet orientation and make the transformation to Yecl,Zecl :type ugts_standard: bool :return: array of the data, len of the first dim (=size of the square array) :rtype: array, int """ the_array = read_fits(filename, ugts_standard) if the_array.shape[0] != the_array.shape[1]: logger.error('Arrays must have the same size in both dimensions.') return the_array, the_array.shape[0]
##############################
[docs]def set_array_and_header_from_file(filename, ugts_standard=0): """read a fits file :param filename: PATH/filename of the fits file :type filename: string :param ugts_standard: flag to read from ugts Idet,Jdet orientation and make the transformation to Yecl,Zecl :type ugts_standard: bool :return: array of the data, len of the first dim (=size of the square array), header object :rtype: array, int, HDUList """ hdu = fits.open(filename) the_array = hdu[0].data if the_array.shape[0] != the_array.shape[1]: logger.error('Arrays must have the same size in both dimensions.') if ugts_standard: the_array = ac.ijdet_to_yzegp_array(the_array) return the_array, the_array.shape[0], hdu[0].header
##############################
[docs]def image_plot(image, save=None, show=False, title=None, clim=None, scale_legend=None, **kwargs): # pragma: no cover """plot/show/save the image of the array the function uses imshow :param image: data array to plot :param save: PATH/name of the file to save the fig. If None, no saving. :param show: show the plot :param title: title of the plot :param clim: color limit :param scale_legend: legend for the color scale :param **kwargs: other arguments directly passed to plt.imshow :type image: float array :type save: string (default=None) :type show: bool (default=False) :type title: string (default=None) :type clim: (float, float) (default=None) :type scale_legend: string (default=None) .. warning:: clim has not been tested. """ plt.imshow(image, interpolation="nearest", origin='lower', **kwargs) if title != None: plt.title(title) if clim != None: plt.clim(clim) cbar = plt.colorbar() if scale_legend != None: cbar.set_label(scale_legend) if save != None: plt.savefig(save) if show == True: plt.show()
[docs]def get_fits_files_with_extname(card_name, workdir): """Return list of .fits files whose CARD is equal to card_name. :param card_name: name of the CARD keyword to be searched for :type card_name: string :param workdir: directory containing .fits files :type workdir: string :return: list of files in workdir which has their CARD equal to card_name :rtype: list """ res = list() l_fits_files = glob(workdir + "/*.fits") for fits_file in l_fits_files: f = fits.open(fits_file) try: card = f[1].header['EXTNAME'] except: continue if card == card_name: res.append(fits_file) f.close() return res
[docs]def get_caldb_files_with_name(name, workdir): """Return list of .fits files whose CCNM0001 is equal to name. :param name: name of the CCNM0001 keyword to be searched for :type name: string :param workdir: directory containing .fits files :type workdir: string :return: list of files in workdir which has their CCNM0001 equal to card_name :rtype: list """ res = list() l_fits_files = glob(workdir + "/*.fits") for fits_file in l_fits_files: logger.debug(fits_file) f = fits.open(fits_file) try: ccnm = f[0].header['CCNM0001'] logger.debug(ccnm) except: ccnm = "" continue if ccnm == name: res.append(fits_file) f.close() return res