# Source code for ecpi.process.imag.io.outputs

"""IMAG products
"""

import logging
import os.path as osp
from datetime import datetime as dt
import numpy as np
from astropy.time import Time
from json_product_generator.fits_product_builder import product_builder as fpb
from ecpi.common.mission.time import create_time_tag_from_tstart
import ecpi.common.mission.time as mt
logger = logging.getLogger(__name__)


def _create_ecl_sky_ima_sdp(ima_sdp_data, filename_with_path, gti_type):
    """
    Create the product ECL-SKY-IMA using json-product-generator (a.k.a. genfits)

    For every energy band, the following image extensions are added, in this order:
        - INTENSITY: cleaned sky count image
        - VARIANCE: cleaned sky variance image
        - SIGNIFICANCE: cleaned sky signal to noise ratio image
        - EXPOSURE: sky exposure image
        - MODEL-SUM: added sources model image (for debug)

    The initial sky intensity image is not written by this function.

    :param ima_sdp_data: Data dictionary created by Data_Flow() containing the data
                         organized by gti_type
    :type ima_sdp_data:  Dictionary
    :param filename_with_path: Absolute path of the fits product to be created
    :type filename_with_path:  String
    :param gti_type: Name of the gti for recovering the right dictionary
    :type gti_type:  String
    """
    model = 'ECL-SKY-IMA'
    sky_images = ima_sdp_data['sky_images'][gti_type]['sky_images']
    energy_channels = ima_sdp_data['det_images'][gti_type]['det_ima'].energy_channels
    fpe = fpb.FitsProductEclairs(model, filename_with_path)
    logger.info('FITS_output_file_path: %s', fpe.fits_output_file_path)

    tstart = ima_sdp_data['tstart']
    tstop = ima_sdp_data['tstop']
    # TODO: All this values must be recovered from Data_Flow in common_kws
    # 'SIMUID' used to be listed twice ("0" then '001'); only the last value
    # ever took effect, so that single effective value is kept here.
    vals = {
        'TSTART': tstart,
        'TSTOP': tstop,
        'CREATOR': 'eclairs-gp',
        'DATE': dt.now().isoformat(),
        'SIMUID': '001',
        'RA_PNT': ima_sdp_data['radec'][0],
        'DEC_PNT': ima_sdp_data['radec'][1],
        'CARD': 'IMAGES',
        'PIPLEVEL': 'IMAG',
        'FSCLEVEL': 'L3',
        'SJDSTART': tstart / 86400,
        'SJDSTOP': tstop / 86400,
        'SIMFLAG': 1,
        'TELAPSE': round(tstop - tstart, 2),
    }
    # fpe.update_common_kw(ima_sdp_data['common_kws'])
    fpe.update_common_kw(vals)
    fpe.update_dict_data({'CARD': 'GROUP', 'EXTNAME': f'{model}-GRP', 'GRPNAME': f"{model}-GRP"}, 1)

    # NOTE(review): image_hdu_params[0] is presumably the index of the reference
    # (template) image HDU - confirm against FitsProductEclairs.
    idx_ref = fpe.image_hdu_params[0]
    cur_idx = idx_ref
    for image_idx, sky in enumerate(sky_images):
        chanmin = energy_channels[image_idx][0]
        chanmax = energy_channels[image_idx][1]
        e_min = ima_sdp_data['dpix'].get_energymin_val(chanmin)
        e_max = ima_sdp_data['dpix'].get_energymax_val(chanmax)

        # Extension type paired with the image it stores; the order defines
        # the order of the image HDUs for this energy band.
        layers = (
            ('INTENSITY', sky.cleaned_sky_count),
            ('VARIANCE', sky.cleaned_sky_var),
            ('SIGNIFICANCE', sky.cleaned_sky_snr),
            ('EXPOSURE', sky.exposure),
            ('MODEL-SUM', sky.model_image),
        )
        for imatyp, image in layers:
            data = np.array(image, dtype='float32')
            fpe.add_image_hdu(idx_ref, cur_idx, chanmin, chanmax, imatyp, data,
                              E_MIN=e_min, E_MAX=e_max)
            cur_idx += 1
    try:
        # Generate the table of HDU 1 (grouping) and insert it into dict_data
        fpe.update_dict_data([fpe._add_rows_table_hdu1d()], 1, 'd')
    except Exception as e:
        # Best effort: the product is still written without the grouping table,
        # but the traceback is kept in the log for diagnosis.
        logger.exception("................Error : %s", e)
    # logger.debug(f'Content of the fits dictionaries: {fpe.print_dict()}')
    fpe.fill_fits_product(fits_validator=True, use_check_dict=True)


def _create_ecl_sop_ima_sdp(ima_sdp_data, filename_with_path, gti_type):
    """
    Create the product ECL-SOP-IMA using json-product-generator (a.k.a. genfits)

    The file contains one source catalog per energy band; the catalogs are
    stored from HDU index 2 onwards, HDU 1 holding the grouping table.

    :param ima_sdp_data: Data dictionary created by Data_Flow() containing the data
                         organized by gti_type
    :type ima_sdp_data:  Dictionary
    :param filename_with_path: Absolute path of the fits product to be created
    :type filename_with_path:  String
    :param gti_type: Name of the gti for recovering the right dictionary
    :type gti_type:  String
    """
    model = 'ECL-SOP-IMA'
    cat_sources = ima_sdp_data['sky_images'][gti_type]['cat_sources']
    energy_channels = ima_sdp_data['det_images'][gti_type]['det_ima'].energy_channels
    new_sources = ima_sdp_data['sky_images'][gti_type]['new_sources']

    fpe = fpb.FitsProductEclairs(model, filename_with_path)
    logger.info('FITS_output_file_path: %s', fpe.fits_output_file_path)

    tstart = ima_sdp_data['tstart']
    tstop = ima_sdp_data['tstop']
    # 'SIMUID' used to be listed twice ("0" then '001'); only the last value
    # ever took effect, so that single effective value is kept here.
    vals = {
        'TSTART': tstart,
        'TSTOP': tstop,
        'CREATOR': 'eclairs-gp',
        'DATE': dt.now().isoformat(),
        'SIMUID': '001',
        'RA_PNT': ima_sdp_data['radec'][0],
        'DEC_PNT': ima_sdp_data['radec'][1],
        'CARD': 'IMAGES',
        'PIPLEVEL': 'IMAG',
        'FSCLEVEL': 'L3',
        'SJDSTART': tstart / 86400,
        'SJDSTOP': tstop / 86400,
        'SIMFLAG': 1,
        'TELAPSE': round(tstop - tstart, 2),
    }
    # fpe.update_common_kw(ima_sdp_data['common_kws'])
    fpe.update_common_kw(vals)
    fpe.update_dict_data({'CARD': 'GROUP', 'EXTNAME': f'{model}-GRP', 'GRPNAME': f"{model}-GRP"}, 1)

    hdu_idx = 2
    for cat_idx, catalog in enumerate(cat_sources):
        chanmin = energy_channels[cat_idx][0]
        chanmax = energy_channels[cat_idx][1]
        e_min = ima_sdp_data['dpix'].get_energymin_val(chanmin)
        e_max = ima_sdp_data['dpix'].get_energymax_val(chanmax)
        # TODO: complete the information from the real output from
        #       main_imag. For the moment, many values are missing
        #       (the literal constants below are placeholders).
        # For each catalog we need to store every element
        for src in catalog.get_catalog():
            src_fill = {
                'NEW_SOURCE': new_sources[cat_idx],
                'OBJECTID': src['sourceID'],
                'SOURCEID': src['name'].rstrip(),
                'Y_OBJ': src['Y'].astype(dtype='float64'),
                'Z_OBJ': src['Z'].astype(dtype='float64'),
                'RA_PEAK': src['ra'],
                'DEC_PEAK': src['dec'],
                'Y_PEAK': 2,
                'Z_PEAK': 2,
                'RA_FIN': 0.,
                'DEC_FIN': 0.,
                'Y_FIN': src['Y_fit'],
                'Z_FIN': src['Z_fit'],
                'YZ_OFFSET_FIN': 0.,
                'ANG_OFFSET_FIN': 0.,
                'YZ_ERR_FIN': 0.,
                'ELP_HMAJ': 0.,
                'ELP_HMIN': 0.,
                'ELP_ANGLE': 0.,
                'FLUX': src['flux'],
                'FLUX_ERR': src['errflux'],
                'RAWSNR': src['snr'],
                'FITSNR': 6.5,
                'FIT_MET': 'Y',
                'FIT_QUAL': 1,
            }
            fpe.update_dict_data(src_fill, hdu_idx, 'd')
        fpe.update_some_keywords(hdu_idx, chanmin, chanmax, E_MIN=e_min, E_MAX=e_max)
        hdu_idx += 1
    try:
        # To generate Table of HDU 1 Grouping and insert into dict_data
        fpe.update_dict_data([fpe._add_rows_table_hdu1d()], 1, 'd')
    except Exception as e:
        # Best effort: the product is still written without the grouping table,
        # but the traceback is kept in the log for diagnosis.
        logger.exception("................Error : %s", e)
    # logger.debug(f'Content of the fits dictionaries: {fpe.print_dict()}')
    fpe.fill_fits_product(True, True)


# Dispatch table of the IMAG scientific data products: maps each supported
# product model name to the function that builds it. genfits_ecl_sdp_ima
# rejects any model that is not a key of this table.
IMAG_SDP = {
    "ECL-SKY-IMA": _create_ecl_sky_ima_sdp,
    "ECL-SOP-IMA": _create_ecl_sop_ima_sdp,
    # "ECL-DET-MOD": _create_ecl_det_mod_sdp
}


def genfits_ecl_sdp_ima(sky_ima_data, model, dir_path):
    """
    Interface for handling the scientific data products (SDPs) for the IMAG component

    One fits product is created per gti type found in ``sky_ima_data['sky_images']``.

    :param sky_ima_data: Data dictionary created by Data_Flow() containing the data
                         organized by gti_type
    :type sky_ima_data:  Dictionary
    :param model: Model of the scientific data product to be constructed
    :type model:  String
    :param dir_path: Absolute path of the pipeline's output folder
    :type dir_path:  String
    :return: Final state of the function: False when the model is unknown, True otherwise
    :rtype:  Bool
    :raises KeyError: if 'tstart' or 'sky_images' is missing from sky_ima_data
    """
    if model not in IMAG_SDP:
        # Not inside an exception handler, so logger.error (not logger.exception,
        # which would append a meaningless "NoneType: None" traceback).
        logger.error('[%s] is not available as scientific product in component IMAG', model)
        return False
    create_product = IMAG_SDP[model]
    time_isot_tag = create_time_tag_from_tstart(sky_ima_data['tstart'])
    for gti_type in sky_ima_data['sky_images']:
        if gti_type == 'basic':
            # basic gtis does not contain earth fraction information
            file_name = f"{model}_UTC{time_isot_tag}.fits"
        else:
            # other gtis contain earth fraction information
            file_name = f"{model}-{gti_type.upper()}_UTC{time_isot_tag}.fits"
        filename_with_path = osp.join(dir_path, file_name)
        create_product(sky_ima_data, filename_with_path, gti_type)
    return True
# TODO: check for inserting this function in a global location
# def create_time_tag_from_tstart(tstart):
#     """Creates time tag of scientific products.
#
#     :param tstart: starting time of the observation
#     :type tstart: float
#     """
#     time_isot_tag = Time(tstart / (24 * 3600), scale='tt', format='mjd').isot
#     time_isot_tag = time_isot_tag.replace("-", "")
#     time_isot_tag = time_isot_tag.replace(":", "")
#     return time_isot_tag.split(".")[0]