Source code for ecpi.pipeline.io.data_flow

"""
    Data Flow is the layer that transmits the data along
    the ECPI pipeline.
    Data Flow is a class containing four different classes,
    each one for a particular type of data to be transmitted:
    A) GtiFlow concerns all the information and functionality
        related to the Good Time Intervals.
    B) AavPvtFlow concerns the information about the orbit and
        the attitude of the satellite.
    C) EventFlow concerns the events and the different types
        of related objects.
    D) DetImagFlow concerns the detector images and their
        different representations.

    All these objects are grouped in a global container which
    becomes the data_flow layer in the main pipeline.
"""

import logging

# Loading ECPI objects
import os.path as osp
import os
from glob import glob
import numpy
import astropy.units as u
from ecpi.common import add_path_caldb_eclairs, add_path_root_eclairs, get_caldb_dir
from ecpi.common.io.events import EclairsCalEvtData
from ecpi.process.bube.core.detector_images import DetectorImages
from ecpi.pipeline.io.gti_flow import GtiFlow
from ecpi.pipeline.io.aav_pvt_flow import AavPvtFlow
from ecpi.pipeline.io.inputs import download_common_keywords
from ecpi.common.instru.model_effect import ECLAIRsDetectorEffect, ECLAIRsDetectorEffectDefault
from ecpi.common.sky.catalog import CatalogIdentifiedSources

s_logger = logging.getLogger(__name__)


class DetImagFlow:
    """
    Detector images and representations

    A fresh instance starts with ``status`` set to -1 and ``loaded``
    set to False.
    """

    def __init__(self):
        """
        Inits DetImagFlow
        """
        self.loaded = False
        self.status = -1
class EventFlow:
    """
    Events and related objects

    A fresh instance starts with ``status`` set to -1 and no events
    (``evt`` is None).
    """

    def __init__(self):
        """
        Inits EventFlow
        """
        self.evt = None
        self.status = -1

    def set_events(self, events):
        """
        Store an event container, provided it holds at least one row.

        On success ``evt`` references ``events`` and ``status`` becomes 0.

        :param events: events to store
        :type events: instance of EclairsCalEvtData
        :return: True when the events were stored, False when
            ``events.nb_row > 0`` does not hold (nothing is stored)
        :rtype: boolean
        """
        assert isinstance(events, EclairsCalEvtData)
        has_rows = events.nb_row > 0
        if has_rows:
            self.evt = events
            self.status = 0
            return True
        return False
class DataFlow:
    """
    Scientific Data Products and its component

    ``comp_by_prod`` maps every scientific data product name to the
    name of the pipeline component associated with it.
    """
    comp_by_prod = {
        **dict.fromkeys(("ecl-gti-1", "ecl-pix-lif", "ecl-evt-cor"), "dpco"),
        **dict.fromkeys(("ecl-gti-2", "ecl-evt-cal"), "cali"),
        **dict.fromkeys(("ecl-gti-3", "ecl-det-ima", "ecl-det-ubc"), "bube"),
        **dict.fromkeys(("ecl-sky-ima", "ecl-sop-ima", "ecl-det-mod"), "imag"),
        **dict.fromkeys(("ecl-evs-soe", "ecl-lcs-soe", "ecl-sps-soe"), "spex"),
        **dict.fromkeys(("ecl-sky-mos", "ecl-sop-mos"), "mosa"),
    }

    def __init__(self, load_kws=True, only_check=False):
        """
        Inits DataFlow

        :param load_kws: when True, the common keywords are loaded
            through ``load_common_keywords``
        :type load_kws: boolean
        :param only_check: when True, the default detector effect is
            not set
        :type only_check: boolean
        """
        # define ECPI components & associated classes
        self.status = -1
        self.current_module = None
        # Data slots, all empty until the matching setter is called.
        self.detector_images = self.catalogue_sources = None
        self.sky_images = self.new_sources = None
        self.events = self.detector_effect = None

        # TODO: Verify if EvenFlow() and DetImagFlow() classes
        #       are needed. Maybe just the original objects
        self.gtis = GtiFlow()
        self.aav_pvt = AavPvtFlow()

        # TODO: Remove the detector-effect default below when effects and
        #       instrumental information be in place.
        if not only_check:
            self.set_detector_effect()

        # Must exist before load_common_keywords(), which inspects it.
        self.common_kws = None
        if load_kws:
            self.load_common_keywords()

    def _get_data_by_model(self, model):
        """
        Collect the data attached to a scientific product.

        :param model: name of desired scientific product
        :type model: string
        :return: dictionary with the data of the product; empty for any
            product name not handled here
        :rtype: dictionary
        """
        if model in ("ecl-gti-1", "ecl-gti-2", "ecl-gti-3"):
            # Entries of gtis_eoc take precedence on duplicated keys.
            return {"gtis": {**self.gtis.gtis, **self.gtis.gtis_eoc}}
        if model in ("ecl-evt-cor", "ecl-evt-cal"):
            return {"events": self.events}
        if model in ("ecl-det-ima", "ecl-det-ubc"):
            return {"det_images": self.detector_images}
        if model in ("ecl-sky-ima", "ecl-sop-ima"):
            return {
                "det_images": self.detector_images,
                "sky_images": self.sky_images,
            }
        # TODO: Check the elements for each of the remaining SDPs:
        #       ecl-pix-lif, ecl-det-mod, ecl-evs-soe, ecl-lcs-soe,
        #       ecl-sps-soe, ecl-sky-mos, ecl-sop-mos
        return {}

    #
    # Setters
    #
[docs] def set_aav_pvt(self, t_aav, t_pvt, compute_eof=False): """ :param t_aav: :type t_aav: table :param t_pvt: :type t_pvt: table :param compute_eof: :type compute_eof: boolean :return: """ return self.aav_pvt.set_orbit(t_aav, t_pvt, compute_eof=compute_eof)
[docs] def set_eof(self, t_eof): """ :param t_eof: table of Earth occultation fractions :type t_eof: table :return: """ return self.aav_pvt.set_eof(t_eof)
[docs] def set_events(self, events): """ :param events: :type events: instance of EclairsCalEvtData :return: :rtype: """ assert isinstance(events, EclairsCalEvtData) self.events = events return True
[docs] def set_basic_gtis(self, d_gtis): """ :param d_gtis: good time intervals :type d_gtis: dictionnary :return: """ assert isinstance(d_gtis, dict) return self.gtis.set_basic_gtis(d_gtis)
[docs] def set_earth_gtis(self, d_gtis, exist_ok=False): """ :param d_gtis: good time intervals :type d_gtis: dictionnary :param exist_ok: :type exist_ok: boolean :return: """ assert isinstance(d_gtis, dict) return self.gtis.set_earth_gtis(d_gtis, exist_ok=exist_ok)
[docs] def set_user_gtis(self, d_gtis): """ User gtis can be given with arbitrary bounds and a preliminary check is made for setting in the right decimation :param d_gtis: good time intervals :type d_gtis: dictionnary :return: """ assert isinstance(d_gtis, dict) if len(d_gtis) > 1: s_logger.error(f'User gtis must be configured in a unique list') return False user_gtis = d_gtis["user"] fitted_gtis = [] for gti in user_gtis: if gti[0] < self.get_t_start() or self.get_t_stop() < gti[1]: s_logger.error(f'User gti {gti} is out of bounds') return False mask = self.aav_pvt.data['TIME_AAV'] <= gti[0] gti_st = self.aav_pvt.data['TIME_AAV'][mask][-1] mask = gti[1] <= self.aav_pvt.data['TIME_AAV'] gti_sp = self.aav_pvt.data['TIME_AAV'][mask][0] fitted_gtis.append([gti_st, gti_sp]) s_logger.info(f'Fitting user gtis [{user_gtis}] => [{fitted_gtis}] ') return self.gtis.set_user_gtis({"user": fitted_gtis})
[docs] def set_det_images(self, det_images_by_gti): """ :param det_images_by_gti: :type det_images_by_gti: dictionnary :return: """ # dict of neo, peo, teo, npe dicts assert isinstance(det_images_by_gti, dict) for gti in det_images_by_gti.keys(): assert (isinstance(det_images_by_gti[gti]['det_ima'], DetectorImages)) self.detector_images = det_images_by_gti return True
[docs] def set_detector_effect(self, detector_effect=None): """ :param detector_effect: :type detector_effect: instance of ECLAIRsDetectorEffectDefault :return: """ if detector_effect is None: self.detector_effect = ECLAIRsDetectorEffectDefault() else: assert(isinstance(detector_effect, ECLAIRsDetectorEffect)) self.detector_effect = detector_effect return True
[docs] def set_imag_output(self, d_sky_images_by_gti): """ :param d_sky_images_by_gti: :type d_sky_images_by_gti: dictionnary :return: """ # dict of neo, peo, teo, npe dicts assert isinstance(d_sky_images_by_gti, dict) for gti in d_sky_images_by_gti.keys(): assert ( isinstance( d_sky_images_by_gti[gti]['cat_sources'][0], CatalogIdentifiedSources ) ) self.sky_images = d_sky_images_by_gti return True
[docs] def set_pi_vector(self, pi_vect): """ :param pi_vect: :type pi_vect: numpy.ndarray :return: """ assert isinstance(pi_vect, numpy.ndarray) assert len(self.events.tb_evts) == len(pi_vect) self.events.tb_evts.add_column(pi_vect, name='PI') self.events.tb_evts['PI'].meta = {'PI': 'Pulse height Invariant energy channel (0-1023)'} self.events.tb_evts['PI'].dtype = 'f' self.events.tb_evts['PI'].unit = u.keV s_logger.info(f'Events header: \n{self.events.tb_evts[0:3]}') return True
# # Getters #
[docs] def get_aav_size(self): """ :return: """ return self.aav_pvt.n_data
[docs] def get_eof_table(self): """ :return: """ if 'EOF' not in self.aav_pvt.data.colnames: s_logger.error('The table of Earth occultation fractions is not present!') return False return self.aav_pvt.get_eof_table()
[docs] def get_pvt_time_column(self): """ :return: """ return self.aav_pvt.get_time_table()
[docs] def get_ptgori_mean(self): """ :return: """ return self.aav_pvt.ptgori_mean
[docs] def get_sdp_data(self, sdp_name): """ :param sdp_name: :type sdp_name: string :return: """ assert isinstance(sdp_name, str) if not self.is_ready(self.comp_by_prod[sdp_name.lower()]): s_logger.error(f'No data loaded related [{sdp_name.upper()}] in Data_Flow object') return None info = { "tstart": self.aav_pvt.data['TIME_AAV'].data[0], "tstop": self.aav_pvt.data['TIME_AAV'].data[-1], "radec": self.aav_pvt.ptgori_mean, "common_kws": self.common_kws, "dpix": self.detector_effect } info.update(self._get_data_by_model(sdp_name.lower())) return info
[docs] def get_t_start(self): """ :return: """ if not self.aav_pvt.loaded: s_logger.error(f'No data loaded in ATT-ORB object') return -1 return self.aav_pvt.data['TIME_AAV'][0]
[docs] def get_t_stop(self): """ :return: """ if not self.aav_pvt.loaded: s_logger.error(f'No data loaded in ATT-ORB object') return -1 return self.aav_pvt.data['TIME_AAV'][-1]
[docs] def get_index_by_time(self, time): """ :param time: :type time: :return: """ # print(f'Looking for index element with time [{time}]') if time < self.get_t_start() or self.get_t_stop() < time: s_logger.error(f'Time {time} is out of bound from actual data') return -1 return self.aav_pvt.data.loc_indices[time] # ['TIME_AAV'][idx]
[docs] def get_time_by_index(self, idx): """ :param idx: :type idx: integer :return: """ if idx == 0 or idx == -1: return self.aav_pvt.data['TIME_AAV'][idx] return self.aav_pvt.data['TIME_AAV'][idx]
# # Interface #
[docs] def compute_ptgori_mean(self): """ :return: """ return self.aav_pvt.compute_ptgori_mean()
[docs] def compute_earth_gtis(self, peof_min, peof_max): """ :param peof_min: :type peof_min: float :param peof_max: :type peof_max: float :return: """ assert isinstance(peof_max, float) assert isinstance(peof_min, float) self.gtis.compute_earth_gtis(self.aav_pvt, peof_min, peof_max)
[docs] def intersect_gtis(self, gti=""): """ :param gti: :type gti: string :return: """ return self.gtis.intersect_gtis(gti)
[docs] def show_gtis(self): """ :return: """ self.gtis.show_gtis()
[docs] def is_ready(self, c_name): """ :param c_name: :type c_name: string :return: """ print(f"Checking component [{c_name}]") assert(isinstance(c_name, str)) # TODO: Check for others elements to # validate each component. For the # moment, we assume the only constraint for # Dpco, Cali and Bube is to load the AavPvt table # For DPCO, CALI or BUBE, we need aav_pvt and evt ape_svo = ["dpco", "cali", "bube"] ape_usr = ["mosa", "spex"] if c_name in ape_svo: return self.aav_pvt.loaded if c_name == "imag": print(f"Checking component [{c_name}]") return isinstance(self.detector_images, dict) and len(self.detector_images.keys()) > 0 if c_name in ape_usr: # TODO: check the conditions for MOSA and SPEX return False return False
[docs] def load_detector_effect_from_dir(self, caldb_dir): """ :param caldb_dir: :type caldb_dir: string :return: """ if not osp.isdir(caldb_dir): s_logger.error(f"Directory {caldb_dir} does not exist. Skipping !!") return False s_logger.info(f"Loading Detector Effect files from {caldb_dir}") detpix = ECLAIRsDetectorEffect() detpix.path_arf = glob(osp.join(caldb_dir, "ECL-RSP-ARF*.fits"))[0] s_logger.info(f"Loading Detector Effect files from {detpix.path_arf}") detpix.path_irf = glob(osp.join(caldb_dir, "ECL-RSP-IRF*.fits"))[0] s_logger.info(f"Loading Detector Effect files from {detpix.path_irf}") detpix.path_rmf = glob(osp.join(caldb_dir, "ECL-RSP-RMF*.fits"))[0] s_logger.info(f"Loading Detector Effect files from {detpix.path_rmf}") # TODO: Change when the CalDB file AUX_ECL_UNI detpix.path_efficiency = add_path_root_eclairs( osp.join("ecpi", "extras", "caldb", "efficiency_matrix.fits") ) detpix.path_nonuniformity = add_path_root_eclairs( osp.join("ecpi", "extras", "caldb", "non_uniformity_matrix.fits") ) detpix.set_arf(detpix.path_arf) # detpix.set_irf(detpix.path_irf) detpix.set_rmf(detpix.path_rmf) detpix.set_efficiency(detpix.path_efficiency) detpix.set_nonuniformity(detpix.path_nonuniformity) self.detector_effect = detpix return True
[docs] def load_common_keywords(self): """ :return: """ if isinstance(self.common_kws, dict): self.show_common_keywords(lim=5) s_logger.error("Common KWs already loaded. Skipping !!") return False s_logger.info("Loading Common KWs from http://fsc.svom.eu") self.common_kws = download_common_keywords() return True
[docs] def show_common_keywords(self, lim=-1): """ :param lim: :type lim: integer :return: """ if self.common_kws is None: s_logger.error(f'None Keywords loaded. Load common keywords in advance!') return False for kw in self.common_kws.values(): print(f'Keyword {kw["name"]}') for l_name in kw['product_keywords'][:lim]: print( f'Name: {l_name["name"]}\t' f'{l_name["unit"]}\t' f'{l_name["data_type"]}\t' f'{l_name["description"]}' ) return True