"""
Creates output .fits files for IMAG
Created on 21 february 2019
@author: Catalano Camille, APC
"""
import numpy as np
import astropy.table as astt
import astropy.units as u
from astropy.io import fits
from astropy.time import Time
from ecpi.common.io import fits_tools
from ecpi.common.instru.model_effect import ECLAIRsDetectorEffect
from ecpi.common.instru.array_convention import yzegp_to_ijdet_array, ijdet_to_yzegp_array
[docs]def fill_hdu_count(hdu_count, time_exp_mn, attitude, obs_time):
"""Fill headers for count images in ECL-SKY-IMA.
:param hdu_count: header
:type hdu_count: fits.Header
:param attitude: TBC
:type attitude: AttitudeECLAIRs object
:param time_exp_mn: time of the observation
:type time_exp_mn: float
"""
hdu_count.name = 'Intensity'
hdu_count.header['timeexp'] = str(time_exp_mn) + ' minutes'
hdu_count.header['comment'] = "Intensity Image"
hdu_count.header['BUNIT'] = 'ct.s^-1.cm^-2'
hdu_count.header['IMATYPE'] = ('Intensity', "Type of image")
hdu_count.header['HDUCLAS1'] = ('IMAGE', 'Dataset contains a sky image')
hdu_count.header['BSCALE'] = (1, 'Real value=value*BSCALE + BZERO')
hdu_count.header['BZERO'] = (0, 'Offset applied to true pixel values')
hdu_count.header['BUNIT'] = ('counts/sec', 'Unit for pixel values')
hdu_count.header['CHANTYPE'] = ('PI', 'Type of detector channels')
hdu_count.header['EXTVER'] = ('1', 'Assigned by template parser')
hdu_count.header['HDUCLASS'] = ('OGIP', 'Format conforms to OGIP standards')
hdu_count.header['HDUVERS'] = ('1.1.0', 'Version of format')
hdu_count.header['LONGSTRN'] = ('OGIP 1.0', 'The HEASARC Long String Convention may be used')
hdu_count.header['OBSBOUND'] = ('', 'Reason for Observation ending')
hdu_count.header['SIMUID'] = ('', 'simulation identifier')
hdu_count.header['SIMFLAG'] = (1, 'Simulation (1) or real data (0)')
hdu_count.header['DEADC'] = ('', 'Dead time correction')
hdu_count.header['RA_PNT'] = (attitude[0], '[deg] RA pointing')
hdu_count.header['DEC_PNT'] = (attitude[1], '[deg] Dec. pointing')
hdu_count.header['ONTIME'] = (obs_time, ' [s] Sum of good time intervals')
hdu_count.header['RADECSYS'] = ('FK5', 'celestial coord system')
[docs]def fill_hdu_var(hdu_var, time_exp_mn, attitude, obs_time):
"""Fill headers for var images in ECL-SKY-IMA.
:param hdu_count: header
:type hdu_count: fits.Header
:param attitude: TBC
:type attitude: AttitudeECLAIRs object
:param time_exp_mn: time of the observation
:type time_exp_mn: float
"""
hdu_var.name = 'Variance'
hdu_var.header['timeexp'] = str(time_exp_mn) + ' minutes'
hdu_var.header['comment'] = "Variance Image"
hdu_var.header['BUNIT'] = 'ct^2.s^-2.cm^-4'
hdu_var.header['IMATYPE'] = ('Variance', "Type of image")
hdu_var.header['HDUCLAS1'] = ('IMAGE', 'Dataset contains a sky image')
hdu_var.header['BSCALE'] = (1, 'Real value=value*BSCALE + BZERO')
hdu_var.header['BZERO'] = (0, 'Offset applied to true pixel values')
hdu_var.header['BUNIT'] = ('counts/sec', 'Unit for pixel values')
hdu_var.header['CHANTYPE'] = ('PI', 'Type of detector channels')
hdu_var.header['EXTVER'] = ('1', 'Assigned by template parser')
hdu_var.header['HDUCLASS'] = ('OGIP', 'Format conforms to OGIP standards')
hdu_var.header['HDUVERS'] = ('1.1.0', 'Version of format')
hdu_var.header['LONGSTRN'] = ('OGIP 1.0', 'The HEASARC Long String Convention may be used')
hdu_var.header['OBSBOUND'] = ('', 'Reason for Observation ending')
hdu_var.header['SIMUID'] = ('', 'simulation identifier')
hdu_var.header['SIMFLAG'] = (1, 'Simulation (1) or real data (0)')
hdu_var.header['DEADC'] = ('', 'Dead time correction')
hdu_var.header['RA_PNT'] = (attitude[0], '[deg] RA pointing')
hdu_var.header['DEC_PNT'] = (attitude[1], '[deg] Dec. pointing')
hdu_var.header['ONTIME'] = (obs_time, ' [s] Sum of good time intervals')
hdu_var.header['RADECSYS'] = ('FK5', 'celestial coord system')
[docs]def fill_hdu_snr(hdu_snr, time_exp_mn, attitude, obs_time):
"""Fill headers for snr images in ECL-SKY-IMA.
:param hdu_count: header
:type hdu_count: fits.Header
:param attitude: TBC
:type attitude: AttitudeECLAIRs object
:param time_exp_mn: time of the observation
:type time_exp_mn: float
"""
hdu_snr.name = 'SNR'
hdu_snr.header['timeexp'] = str(time_exp_mn) + ' minutes'
hdu_snr.header['comment'] = "Signal to Noise Image"
hdu_snr.header['IMATYPE'] = ('SNR', "Type of image")
hdu_snr.header['HDUCLAS1'] = ('IMAGE', 'Dataset contains a sky image')
hdu_snr.header['BSCALE'] = (1, 'Real value=value*BSCALE + BZERO')
hdu_snr.header['BZERO'] = (0, 'Offset applied to true pixel values')
hdu_snr.header['BUNIT'] = ('counts/sec', 'Unit for pixel values')
hdu_snr.header['CHANTYPE'] = ('PI', 'Type of detector channels')
hdu_snr.header['EXTVER'] = ('1', 'Assigned by template parser')
hdu_snr.header['HDUCLASS'] = ('OGIP', 'Format conforms to OGIP standards')
hdu_snr.header['HDUVERS'] = ('1.1.0', 'Version of format')
hdu_snr.header['LONGSTRN'] = ('OGIP 1.0', 'The HEASARC Long String Convention may be used')
hdu_snr.header['OBSBOUND'] = ('', 'Reason for Observation ending')
hdu_snr.header['SIMUID'] = ('', 'simulation identifier')
hdu_snr.header['SIMFLAG'] = (1, 'Simulation (1) or real data (0)')
hdu_snr.header['DEADC'] = ('', 'Dead time correction')
hdu_snr.header['RA_PNT'] = (attitude[0], '[deg] RA pointing')
hdu_snr.header['DEC_PNT'] = (attitude[1], '[deg] Dec. pointing')
hdu_snr.header['ONTIME'] = (obs_time, ' [s] Sum of good time intervals')
hdu_snr.header['RADECSYS'] = ('FK5', 'celestial coord system')
[docs]def fill_hdu_exp(hdu_exp, time_exp_mn, attitude, obs_time):
"""Fill headers for exp images in ECL-SKY-IMA.
:param hdu_count: header
:type hdu_count: fits.Header
:param attitude: TBC
:type attitude: AttitudeECLAIRs object
:param time_exp_mn: time of the observation
:type time_exp_mn: float
"""
hdu_exp.name = 'Exposure'
hdu_exp.header['timeexp'] = str(time_exp_mn) + ' minutes'
hdu_exp.header['comment'] = "Exposure Image"
hdu_exp.header['BUNIT'] = 'seconds'
hdu_exp.header['IMATYPE'] = ('Exposure', "Type of image")
hdu_exp.header['BSCALE'] = (1, 'Real value=value*BSCALE + BZERO')
hdu_exp.header['BZERO'] = (0, 'Offset applied to true pixel values')
hdu_exp.header['BUNIT'] = ('counts/sec', 'Unit for pixel values')
hdu_exp.header['CHANTYPE'] = ('PI', 'Type of detector channels')
hdu_exp.header['EXTVER'] = ('1', 'Assigned by template parser')
hdu_exp.header['HDUCLASS'] = ('OGIP', 'Format conforms to OGIP standards')
hdu_exp.header['HDUVERS'] = ('1.1.0', 'Version of format')
hdu_exp.header['LONGSTRN'] = ('OGIP 1.0', 'The HEASARC Long String Convention may be used')
hdu_exp.header['OBSBOUND'] = ('', 'Reason for Observation ending')
hdu_exp.header['SIMUID'] = ('', 'simulation identifier')
hdu_exp.header['SIMFLAG'] = (1, 'Simulation (1) or real data (0)')
hdu_exp.header['DEADC'] = ('', 'Dead time correction')
hdu_exp.header['RA_PNT'] = (attitude[0], '[deg] RA pointing')
hdu_exp.header['DEC_PNT'] = (attitude[1], '[deg] Dec. pointing')
hdu_exp.header['ONTIME'] = (obs_time, ' [s] Sum of good time intervals')
hdu_exp.header['RADECSYS'] = ('FK5', 'celestial coord system')
[docs]def fill_hdu_mod(hdu_mod, time_exp_mn, attitude, obs_time):
"""Fill headers for mod images in ECL-SKY-IMA.
:param hdu_count: header
:type hdu_count: fits.Header
:param attitude: TBC
:type attitude: AttitudeECLAIRs object
:param time_exp_mn: time of the observation
:type time_exp_mn: float
"""
hdu_mod.name = 'Model'
hdu_mod.header['timeexp'] = str(time_exp_mn) + ' minutes'
hdu_mod.header['comment'] = "Source models Image"
hdu_mod.header['BUNIT'] = 'ct.s^-1.cm^-2'
hdu_mod.header['IMATYPE'] = ('Model', "Type of image")
hdu_mod.header['HDUCLAS1'] = ('IMAGE', 'Dataset contains a sky image')
hdu_mod.header['BSCALE'] = (1, 'Real value=value*BSCALE + BZERO')
hdu_mod.header['BZERO'] = (0, 'Offset applied to true pixel values')
hdu_mod.header['BUNIT'] = ('counts/sec', 'Unit for pixel values')
hdu_mod.header['CHANTYPE'] = ('PI', 'Type of detector channels')
hdu_mod.header['EXTVER'] = ('1', 'Assigned by template parser')
hdu_mod.header['HDUCLASS'] = ('OGIP', 'Format conforms to OGIP standards')
hdu_mod.header['HDUVERS'] = ('1.1.0', 'Version of format')
hdu_mod.header['LONGSTRN'] = ('OGIP 1.0', 'The HEASARC Long String Convention may be used')
hdu_mod.header['OBSBOUND'] = ('', 'Reason for Observation ending')
hdu_mod.header['SIMUID'] = ('', 'simulation identifier')
hdu_mod.header['SIMFLAG'] = (1, 'Simulation (1) or real data (0)')
hdu_mod.header['DEADC'] = ('', 'Dead time correction')
hdu_mod.header['RA_PNT'] = (attitude[0], '[deg] RA pointing')
hdu_mod.header['DEC_PNT'] = (attitude[1], '[deg] Dec. pointing')
hdu_mod.header['ONTIME'] = (obs_time, ' [s] Sum of good time intervals')
hdu_mod.header['RADECSYS'] = ('FK5', 'celestial coord system')
[docs]def save_sky_images(sky_images, file_path, eded : ECLAIRsDetectorEffect, obs_info_dict,
creator='IMAG?', proc_id="01"):
"""
save images in a fits file ECL-SKY-IMA
use common keywords
filename is 'ECL-SKY-IMA-' + proc_id + '.fits'
file contains:
- Intensity: cleaned sky count image
- Variance: cleaned sky variance image
- SNR: cleaned sky signal to noise ratio image
- Exposure: initial sky Exposure image
- Model: Added sources model images (for debug)
- Initial Sky: Initial sky intensity image (for debug)
save also the models images in a fits file: see save_models
:param sky_images: list of sky images and models
:type sky_images: list(SkyImages)
:param file_path: PATH to the directory where to write the file
:type file_path: string
:param eded: ECLAIRsDetectorEffect object containing path to ARF file
:type eded: ECLAIRsDetectorEffect object
:param obs_info_dict: dictionnary containing relevant variables for observation
Includes following keys: tstart (type: float), tstop (type: float),
energy_ranges (type: [[int, int]]) and attitude (type: [ra, dec, ori])
:type obs_info_dict: dict
:param creator: program that has generated the file. Default='IMAG?'
:type creator: string
:param proc_id: id of the process. Default="01".
:type proc_id: string
"""
# unpack observation related variables
tstart = obs_info_dict['tstart']
tstop = obs_info_dict['tstop']
energy_ranges = obs_info_dict['energy_ranges']
attitude = obs_info_dict['attitude']
# construction of primary and grouping headers
prihdr = fits.Header()
prihdu = fits.PrimaryHDU(header=prihdr)
obstime = tstop - tstart
fits_tools.pointing_keywords(prihdu, attitude, obstime)
fits_tools.common_keyword(prihdu, tstart, tstop, creator, 1.0, proc_id)
prihdu.header['CARD'] = ("ECL-SKY-IMA", "Product type")
hdu_list = [prihdu]
hdu_images = []
grp_table = astt.Table([[]]*11,
names=('member_xtension', 'member_name', 'member_version',
'member_position', 'member_location', 'member_uri_type',
'imatype', 'chanmin', 'chanmax', 'e_min', 'e_max'),
dtype=('S8', 'S32', 'i4', 'i4', 'S256', 'S3',
'S8', 'i2', 'i2', 'f4', 'f4'))
grp_table['e_min'].unit = u.keV
grp_table['e_max'].unit = u.keV
grp_table['member_version'].null = 0 # does not work...
for image_idx in range(len(sky_images)):
chanmin = energy_ranges[image_idx][0]
chanmax = energy_ranges[image_idx][1]
e_min = eded.get_energymin_val(chanmin)
e_max = eded.get_energymax_val(chanmax)
# add row in astropy table for each imatype.
grp_table.add_row(['IMAGE ', 'Intensity', '1', image_idx*5+2, '', '',
'intensity', chanmin, chanmax, e_min, e_max])
grp_table.add_row(['IMAGE ', 'Variance', '1', image_idx*5+3, '', '',
'variance', chanmin, chanmax, e_min, e_max])
grp_table.add_row(['IMAGE ', 'SNR', '1', image_idx*5+4, '', '',
'SNR', chanmin, chanmax, e_min, e_max])
grp_table.add_row(['IMAGE ', 'Exposure', '1', image_idx*5+5, '', '',
'Exposure', chanmin, chanmax, e_min, e_max])
grp_table.add_row(['IMAGE ', 'Model', '1', image_idx*5+6, '', '',
'Model', chanmin, chanmax, e_min, e_max])
# declare elementary HDU for each imatype.
hdu_cnt = fits.ImageHDU(np.array(sky_images[image_idx].cleaned_sky_count, dtype='float32'))
hdu_var = fits.ImageHDU(np.array(sky_images[image_idx].cleaned_sky_var, dtype='float32'))
hdu_snr = fits.ImageHDU(np.array(sky_images[image_idx].cleaned_sky_snr, dtype='float32'))
hdu_exp = fits.ImageHDU(np.array(sky_images[image_idx].exposure, dtype='float32'))
hdu_mod = fits.ImageHDU(np.array(sky_images[image_idx].model_image, dtype='float32'))
# fill each elementary HDU with appropriate KW.
fits_tools.common_keyword(hdu_cnt, tstart, tstop, creator, 1.0, proc_id)
fits_tools.energy_keywords(hdu_cnt, chanmin, chanmax, e_min, e_max)
hdu_cnt.header.extend(sky_images[image_idx].wcs.to_header().cards)
hdu_cnt.header['CARD'] = ("ECL-SKY-IMA", "Product type")
fits_tools.common_keyword(hdu_var, tstart, tstop, creator, 1.0, proc_id)
fits_tools.energy_keywords(hdu_var, chanmin, chanmax, e_min, e_max)
hdu_var.header.extend(sky_images[image_idx].wcs.to_header().cards)
hdu_var.header['CARD'] = ("ECL-SKY-IMA", "Product type")
fits_tools.common_keyword(hdu_snr, tstart, tstop, creator, 1.0, proc_id)
fits_tools.energy_keywords(hdu_snr, chanmin, chanmax, e_min, e_max)
hdu_snr.header.extend(sky_images[image_idx].wcs.to_header().cards)
hdu_snr.header['CARD'] = ("ECL-SKY-IMA", "Product type")
fits_tools.common_keyword(hdu_exp, tstart, tstop, creator, 1.0, proc_id)
fits_tools.energy_keywords(hdu_exp, chanmin, chanmax, e_min, e_max)
hdu_exp.header.extend(sky_images[image_idx].wcs.to_header().cards)
hdu_exp.header['CARD'] = ("ECL-SKY-IMA", "Product type")
fits_tools.common_keyword(hdu_mod, tstart, tstop, creator, 1.0, proc_id)
fits_tools.energy_keywords(hdu_mod, chanmin, chanmax, e_min, e_max)
hdu_mod.header.extend(sky_images[image_idx].wcs.to_header().cards)
hdu_mod.header['CARD'] = ("ECL-SKY-IMA", "Product type")
# additional KW are added in update_images_headers()
dict_imag = {
'count': hdu_cnt,
'var': hdu_var,
'snr': hdu_snr,
'exp': hdu_exp,
'mod': hdu_mod
}
sky_img = sky_images[image_idx].time_exp_mn
hdu_count, hdu_var, hdu_snr, hdu_exp, hdu_mod = update_images_headers(dict_imag,
sky_img,
attitude, obstime)
hdu_images.extend([hdu_count, hdu_var, hdu_snr, hdu_exp, hdu_mod])
hdu_grp = fits.table_to_hdu(grp_table)
hdu_grp.header['comment'] = 'ECLAIRs reconstructed and cleaned individual Sky Images'
hdu_grp.header['CARD'] = ("ECL-SKY-IMA", "Product type")
fits_tools.common_keyword(hdu_grp, tstart, tstop, creator, 1.0, proc_id)
fits_tools.grouphdu_keyword(hdu_grp, 'ECL-SKY-IMA-GRP', attitude)
hdu_list.append(hdu_grp)
hdu_list.extend(hdu_images)
thdulist = fits.HDUList(hdu_list)
time_isot_tag = create_time_tag_from_tstart(tstart)
thdulist.writeto(file_path + '/ECL-SKY-IMA-UTC' + time_isot_tag + '.fits',
overwrite=True, checksum=True)
[docs]def save_catalogs(catalogs_output, file_path, eded, obs_info_dict, creator, proc_id="01"):
"""save output sources catalogs in fits file ECL-SOP-IMA
use common keywords
filename is 'ECL-SOP-IMA-' + proc_id + '.fits'
file contains: for each energy bands, the catalog of identified sources
:param catalogs_output: list of sources catalogs
:type catalogs_output: list(CatalogIdentifiedSources)
:param file_path: PATH to the directory where to write the file
:type file_path: string
:param eded: ECLAIRsDetectorEffect object containing path to ARF file
:type eded: ECLAIRsDetectorEffect object
:param obs_info_dict: dictionnary containing relevant variables for observation
Includes following keys: tstart (type: float), tstop (type: float),
energy_ranges (type: [[int, int]]) and attitude (type: [ra, dec, ori])
:type obs_info_dict: dict
:param creator: program that has generated the file.
:type creator: string
:param proc_id: id of the process. Default="01".
:type proc_id: string
"""
# unpack observation related variables
tstart = obs_info_dict['tstart']
tstop = obs_info_dict['tstop']
energy_ranges = obs_info_dict['energy_ranges']
attitude = obs_info_dict['attitude']
prihdr = fits.Header()
prihdu = fits.PrimaryHDU(header=prihdr)
obstime = tstop - tstart
fits_tools.pointing_keywords(prihdu, attitude, obstime)
fits_tools.common_keyword(prihdu, tstart, tstop, creator, 1.0, proc_id)
prihdu.header['CARD'] = ("ECL-SOP-IMA", "Product type")
hdu_list = [prihdu]
hdu_catalogs = []
grp_table = astt.Table([[]]*10,
names=('member_xtension', 'member_name', 'member_version',
'member_position', 'member_location', 'member_uri_type',
'chanmin', 'chanmax', 'e_min', 'e_max'),
dtype=('S8', 'S32', 'i4', 'i4', 'S256', 'S3', 'i2', 'i2', 'f4', 'f4'))
grp_table['e_min'].unit = u.keV
grp_table['e_max'].unit = u.keV
for catalog_idx in range(len(catalogs_output)):
chanmin = energy_ranges[catalog_idx][0]
chanmax = energy_ranges[catalog_idx][1]
e_min = eded.get_energymin_val(chanmin)
e_max = eded.get_energymax_val(chanmax)
grp_table.add_row(['BINTABLE ', "ECL-SKY-RES", '1',
catalog_idx+2, '',
'', chanmin, chanmax, e_min, e_max])
hdu = fits.table_to_hdu(catalogs_output[catalog_idx]._catalog)
hdu.name = "ECL-SKY-RES"
fits_tools.common_keyword(hdu, tstart, tstop, creator, 1.0, proc_id)
fits_tools.energy_keywords(hdu, chanmin, chanmax, e_min, e_max)
fits_tools.pointing_keywords(hdu, attitude, obstime)
hdu.header['comment'] = 'source parameters from sky images'
hdu.header['CARD'] = ('ECL-SOP-IMA', 'ECLAIRs file')
hdu.header['CHANTYPE'] = ('', 'Channel type (PHA, PI etc)')
hdu_catalogs.append(hdu)
hdu_grp = fits.table_to_hdu(grp_table)
hdu_grp.header['comment'] = 'Source Parameters derived from single reconstructed sky image'
fits_tools.common_keyword(hdu_grp, tstart, tstop, creator, 1.0, proc_id)
fits_tools.grouphdu_keyword(hdu_grp, 'ECL-SOP-RES-GRP', attitude)
hdu_list.append(hdu_grp)
hdu_list.extend(hdu_catalogs)
thdulist = fits.HDUList(hdu_list)
time_isot_tag = create_time_tag_from_tstart(tstart)
thdulist.writeto(file_path + '/ECL-SOP-IMA-UTC' + time_isot_tag + '.fits',
overwrite=True, checksum=True)
[docs]def create_time_tag_from_tstart(tstart):
"""Creates time tag of scientific products.
:param tstart: starting time of the observation
:type tstart: float
"""
time_isot_tag = Time(tstart/(24*3600), scale='tt', format='mjd').isot
time_isot_tag = time_isot_tag.replace("-", "")
time_isot_tag = time_isot_tag.replace(":", "")
return time_isot_tag.split(".")[0]
[docs]def read_ecl_det_ubc_file(l_files):
"""Load information contained in ECL-DET-UBC files.
..warning: works for a single ECL-DET-UBC file for the moment (DC1).
:param l_files: list of ECL-DET-UBC files as specified in parameter file.
:type l_files: list
:return: dictionary of read values in ECL-DET-UBC file
:rtype: dict
"""
hdul = fits.open(l_files[0])
num_ext = len(list(hdul))
dict_att = {
'shadowgrams': np.array([
ijdet_to_yzegp_array(hdul[k].data) for k in range(2, num_ext, 3)
]),
'shadowgrams_var': np.array([
ijdet_to_yzegp_array(hdul[k].data) for k in range(3, num_ext+1, 3)
]),
'energy_ranges': [
[int(hdul[k].header['CHANMIN']), int(hdul[k].header['CHANMAX'])]
for k in range(2, num_ext, 3)
],
'tstart': float(hdul[1].header['TSTART']),
'tstop': float(hdul[1].header['TSTOP']),
'obstime': float(hdul[1].header['EXPOSURE'])
}
hdul.close()
return dict_att