'''
Created on 20 March 2018
Based on the 'Tools.py' Python program by Cyril Lachaud.
Generic functions to manipulate FITS files.
@author: Catalano Camille, APC
'''
import os
import logging
from glob import glob
from datetime import datetime
import matplotlib.pyplot as plt
from astropy.io import fits
import astropy.time as astrotime
import ecpi.common.instru.array_convention as ac
from ecpi.common.mission.time import get_mjd_ref
logger = logging.getLogger(__name__)
CREATOR = 'ECLAIRs_pipeline'
##############################
def common_keyword(fits_header, tstart, tstop, creator=CREATOR, version='0.1', proc_id="01"):
    """Add to the fits header the common keywords used in all the pipeline products.

    .. seealso:: doc : ECL-GP-FITS-KEYWORDS.pdf ECLAIRs GP fits keywords
        https://forge.in2p3.fr/dmsf/files/6731/view

    :param fits_header: header data unit to add the keyword (ex: fits.PrimaryHDU)
    :type fits_header: astropy.io.fits.*HDU
    :param tstart: start time of the observation in s from MJDREF
    :type tstart: string or number (must be accepted by ``int()``)
    :param tstop: end time of the observation in s from MJDREF
    :type tstop: string or number (must be accepted by ``int()``)
    :param creator: name of the program that has created the fits file
    :type creator: string (default='ECLAIRs_pipeline')
    :param version: fits file format version, written to EXTREL
    :type version: string (default='0.1')
    :param proc_id: id of the pipeline process, written to SIMUID
    :type proc_id: string (default="01")
    """
    # Time.now() is expressed in UTC. A naive datetime.now() is local wall-clock
    # time that astropy would interpret as UTC, shifting DATE by the local offset.
    now = astrotime.Time.now()
    now.precision = 0  # no fractional seconds in the ISOT string
    creation_date = now.tt.isot

    header = fits_header.header
    header['TELESCOP'] = ('SVOM', 'Telescope or mission name')
    header['INSTRUME'] = ('ECL', 'Instrument name')
    header['TIMESYS'] = ('TT', 'Time frame system')
    header['TIMEUNIT'] = ('s', 'Time unit')
    header['TIMEREF'] = ('LOCAL', 'Time reference frame')
    header['MJDREF'] = (get_mjd_ref(), 'Modified Julian Date of origin')
    header['OBS_ID'] = ('1', 'Observation Identifier (type_number)')
    header['OBS_TYPE'] = ('', 'Slew and pointing status')
    header['OBSBOUND'] = ('', 'Reason for Observation ending')
    header['TSTART'] = (tstart, 'Start time of the Observation')
    header['TSTOP'] = (tstop, 'End time of the Observation')
    # NOTE(review): both bounds are truncated to whole seconds before the
    # subtraction, and no dead-time correction is applied here.
    header['EXPOSURE'] = (int(tstop) - int(tstart),
                          'Effective exposure (dead time corrected)')
    header['TELAPSE'] = ('', 'Total elapsed time of the data')
    # EXTREL is written once only: a second assignment with an empty value used
    # to overwrite it, silently discarding the ``version`` argument.
    header['EXTREL'] = (version, 'FSC release number for template')
    header['CREATOR'] = (creator, 'software name and version')
    header['CONFIGUR'] = ('', 'Software system configuration')
    header['DATE'] = (creation_date, 'file creation date')
    header['STAMP'] = ('', '')
    header['FSCLEVL'] = ('1', 'FSC level of data processing')
    header['APID_'] = ('1', '')
    header['ORIGIN'] = ('FSC', 'Origin of FITS file')
    header['SIMUID'] = (proc_id, 'simulation identifier')
    header['SIMFLAG'] = (1, 'Simulation (1) or real data (0)')
    header['LONGSTRN'] = ('OGIP 1.0',
                          'The HEASARC Long String Convention may be used')
    header['HDUCLASS'] = ('OGIP', 'Format conforms to OGIP standards')
    header['HDUVERS'] = ('1.1.0', 'Version of format')
    header['COMMENT'] = ('', '')
    header['DATAMODE'] = ('', '')
    header['DATATYPE'] = ('', 'Type of data for observation')
##############################
def grouphdu_keyword(fits_header, grp_table_name, attitude):
    """Add to the fits header the specific keywords for a group HDU.

    :param fits_header: header data unit to add the keyword (ex: fits.PrimaryHDU)
    :type fits_header: astropy.io.fits.*HDU
    :param grp_table_name: grouping table name, written to GRPNAME
    :type grp_table_name: string
    :param attitude: indexable whose items 0, 1 and 2 are written to
        RA_PNT, DEC_PNT and ORI_PNT (commented as degrees)
    :type attitude: sequence
    """
    def _put(keyword, value, comment):
        # One header card per call, written in call order.
        fits_header.header[keyword] = (value, comment)

    # Grouping information.
    _put('EXTVER', '1', 'Assigned by template parser')
    _put('GRPID1', '-1', 'EXTVER of Group containing this HDU')
    _put('GRPLC1', '', 'URL of file containing Group')
    _put('GRPNAME', grp_table_name, 'Grouping Table name')

    # Format conventions.
    _put('HDUCLASS', 'OGIP', 'Format conforms to OGIP standards')
    _put('HDUVERS', '1.1.0', 'Version of format')
    _put('LONGSTRN', 'OGIP 1.0', 'The HEASARC Long String Convention may be used')

    # Observation / simulation status.
    _put('OBSBOUND', '', 'Reason for Observation ending')
    _put('SIMUID', '', 'simulation identifier')
    _put('SIMFLAG', 1, 'Simulation (1) or real data (0)')

    # Pointing.
    _put('RA_PNT', attitude[0], '[deg] RA pointing')
    _put('DEC_PNT', attitude[1], '[deg] Dec. pointing')
    _put('ORI_PNT', attitude[2], '[deg] Orientation pointing')
    _put('ONTIME', '', ' [s] Sum of good time intervals')
    _put('RADECSYS', 'FK5', 'celestial coord system')

    # Placeholders left empty by this function.
    _put('TELAPSE', '', 'Total elapsed time of the data')
    _put('CARD', '', 'ECLAIRs file')
    _put('DATE-END', '', 'Date of the end of the observation')
    _put('DATE-OBS', '', 'Date of the start of the observation')
    _put('DEADC', '', 'Dead time correction')
##############################
def energy_keywords(fits_header, chanmin, chanmax, e_min, e_max):
    """Add to the fits header the keywords related to energy.

    :param fits_header: header data unit to add the keyword (ex: fits.PrimaryHDU)
    :type fits_header: astropy.io.fits.*HDU
    :param chanmin: lowest channel of the energy range
    :type chanmin: float
    :param chanmax: highest channel of the energy range
    :type chanmax: float
    :param e_min: lower energy limit in keV
    :type e_min: float
    :param e_max: upper energy limit in keV
    :type e_max: float
    """
    # (keyword, value, comment) triples, written in this order.
    energy_cards = (
        ('CHANMIN', chanmin, 'lowest channel of the energy range'),
        ('CHANMAX', chanmax, 'highest channel of the energy range'),
        ('E_MIN', e_min, '[keV] lower energy limit'),
        ('E_MAX', e_max, '[keV] upper energy limit'),
    )
    for keyword, value, comment in energy_cards:
        fits_header.header[keyword] = (value, comment)
def pointing_keywords(fits_header, attitude, obs_time):
    """Add to the fits header the keywords related to the pointing of ECLAIRs.

    Only items 0 and 1 of ``attitude`` are written (RA_PNT, DEC_PNT).

    :param fits_header: header data unit to add the keyword (ex: fits.PrimaryHDU)
    :type fits_header: astropy.io.fits.*HDU
    :param attitude: [ra, dec, ori] in degrees
    :type attitude: [float, float, float]
    :param obs_time: value written to ONTIME (commented as seconds)
    """
    def _put(keyword, value, comment):
        # One header card per call, written in call order.
        fits_header.header[keyword] = (value, comment)

    _put('RA_PNT', attitude[0], '[deg] RA pointing')
    _put('DEC_PNT', attitude[1], '[deg] Dec. pointing')
    _put('RADECSYS', 'FK5', 'celestial coord system')
    _put('ONTIME', obs_time, ' [s] Sum of good time intervals')
    # Open question: what does DEADC mean exactly?
    # Is it the observation time minus the GTIs?
    _put('DEADC', '', 'Dead time correction')
##############################
def write_fits(filename, data):
    """Save an array into a fits file.

    The data are saved in the primary HDU. An existing file with the same
    name is replaced.

    :param filename: PATH/filename of the fits file
    :type filename: string (PATH)
    :param data: data to write in the fits file
    :type data: array
    """
    hdu = fits.PrimaryHDU(data)
    # Let astropy replace an existing file instead of removing it by hand
    # and swallowing any OSError raised by the removal.
    hdu.writeto(filename, overwrite=True)
##############################
def read_fits(name, ugts_standard=0):
    """Read an array from a fits file.

    .. warning:: The structure of the fits file must be simple: data into hdu[0].data

    :param name: PATH/name of the fits file
    :type name: string
    :param ugts_standard: flag to read from ugts Idet,Jdet orientation
        and make the transformation to Yecl,Zecl
    :type ugts_standard: bool
    :return: the data from the fits file
    :rtype: array
    """
    # The context manager closes the file handle, which was previously leaked.
    # memmap=False loads the data in memory so the returned array does not
    # depend on the closed file.
    with fits.open(name, memmap=False) as hdul:
        tdata = hdul[0].data
    if ugts_standard:
        tdata = ac.ijdet_to_yzegp_array(tdata)
    return tdata
##############################
def set_array_from_file(filename, ugts_standard=0):
    """Load the array stored in a fits file (through read_fits).

    A non-square array is only reported with an error log message; the
    array is returned anyway.

    :param filename: PATH/filename of the fits file
    :type filename: string
    :param ugts_standard: flag to read from ugts Idet,Jdet orientation
        and make the transformation to Yecl,Zecl
    :type ugts_standard: bool
    :return: array of the data, len of the first dim (=size of the square array)
    :rtype: array, int
    """
    data = read_fits(filename, ugts_standard)
    first_dim = data.shape[0]
    if first_dim != data.shape[1]:
        logger.error('Arrays must have the same size in both dimensions.')
    return data, first_dim
##############################
def set_array_and_header_from_file(filename, ugts_standard=0):
    """Read the primary HDU data and header of a fits file.

    A non-square array is only reported with an error log message; the
    array is returned anyway.

    :param filename: PATH/filename of the fits file
    :type filename: string
    :param ugts_standard: flag to read from ugts Idet,Jdet orientation
        and make the transformation to Yecl,Zecl
    :type ugts_standard: bool
    :return: array of the data, len of the first dim (=size of the square array),
        header of the primary HDU
    :rtype: array, int, astropy.io.fits.Header
    """
    # The context manager closes the file handle, which was previously leaked.
    # memmap=False loads the data in memory so the returned array does not
    # depend on the closed file.
    with fits.open(filename, memmap=False) as hdul:
        the_array = hdul[0].data
        header = hdul[0].header
    # The shape is checked on the raw array, before any orientation change.
    if the_array.shape[0] != the_array.shape[1]:
        logger.error('Arrays must have the same size in both dimensions.')
    if ugts_standard:
        the_array = ac.ijdet_to_yzegp_array(the_array)
    return the_array, the_array.shape[0], header
##############################
def image_plot(image, save=None, show=False, title=None, clim=None, scale_legend=None, **kwargs):  # pragma: no cover
    """Plot/show/save the image of the array.

    The function uses imshow with nearest interpolation and a lower origin,
    and always adds a colorbar.

    :param image: data array to plot
    :type image: float array
    :param save: PATH/name of the file to save the fig. If None, no saving.
    :type save: string (default=None)
    :param show: show the plot
    :type show: bool (default=False)
    :param title: title of the plot
    :type title: string (default=None)
    :param clim: color limit
    :type clim: (float, float) (default=None)
    :param scale_legend: legend for the color scale
    :type scale_legend: string (default=None)
    :param **kwargs: other arguments directly passed to plt.imshow

    .. warning:: clim has not been tested.
    """
    plt.imshow(image, interpolation="nearest", origin='lower', **kwargs)
    # Identity tests against None: ``!= None`` is evaluated element-wise on
    # array-like arguments and then fails in a boolean context.
    if title is not None:
        plt.title(title)
    if clim is not None:
        plt.clim(clim)
    cbar = plt.colorbar()
    if scale_legend is not None:
        cbar.set_label(scale_legend)
    if save is not None:
        plt.savefig(save)
    if show:
        plt.show()
def get_fits_files_with_extname(card_name, workdir):
    """Return the list of .fits files whose EXTNAME keyword is equal to card_name.

    The EXTNAME keyword is read from the header of the second HDU (index 1).
    Files without a second HDU or without EXTNAME in it are skipped.

    :param card_name: value of the EXTNAME keyword to be searched for
    :type card_name: string
    :param workdir: directory containing .fits files
    :type workdir: string
    :return: list of files in workdir whose EXTNAME is equal to card_name
    :rtype: list
    """
    res = []
    for fits_file in glob(workdir + "/*.fits"):
        # The context manager closes every file, including the skipped ones
        # (the handle used to leak when the keyword lookup failed).
        with fits.open(fits_file) as hdul:
            try:
                extname = hdul[1].header['EXTNAME']
            except (IndexError, KeyError):
                # No second HDU, or no EXTNAME keyword in it.
                continue
        if extname == card_name:
            res.append(fits_file)
    return res
def get_caldb_files_with_name(name, workdir):
    """Return the list of .fits files whose CCNM0001 keyword is equal to name.

    The CCNM0001 keyword is read from the header of the primary HDU (index 0).
    Files without this keyword are skipped.

    :param name: value of the CCNM0001 keyword to be searched for
    :type name: string
    :param workdir: directory containing .fits files
    :type workdir: string
    :return: list of files in workdir whose CCNM0001 is equal to name
    :rtype: list
    """
    res = []
    for fits_file in glob(workdir + "/*.fits"):
        logger.debug(fits_file)
        # The context manager closes every file, including the skipped ones
        # (the handle used to leak when the keyword lookup failed).
        with fits.open(fits_file) as hdul:
            try:
                ccnm = hdul[0].header['CCNM0001']
            except (IndexError, KeyError):
                # No primary HDU, or no CCNM0001 keyword in it.
                continue
        logger.debug(ccnm)
        if ccnm == name:
            res.append(fits_file)
    return res