"""
Data Flow is the layer for transmitting the data along
the ECPI pipeline.
Data Flow is a class containing four different classes
each one for a particular type of data to be transmitted
A) GtiFlow concerns all the information and functionality
related to the Good Time Intervals.
B) AavPvtFlow concerns the information about the orbit and
the attitude of the satellite.
C) EventFlow concerns the events and the different type
of related objects
D) DetImagFlow concerns the detector images and its
different representations
All these objects are grouped in a global container which
becomes the data_flow layer in the main pipeline.
"""
import logging
# Loading ECPI objects
import os.path as osp
import os
from glob import glob
import numpy
import astropy.units as u
from ecpi.common import add_path_caldb_eclairs, add_path_root_eclairs, get_caldb_dir
from ecpi.common.io.events import EclairsCalEvtData
from ecpi.process.bube.core.detector_images import DetectorImages
from ecpi.pipeline.io.gti_flow import GtiFlow
from ecpi.pipeline.io.aav_pvt_flow import AavPvtFlow
from ecpi.pipeline.io.inputs import download_common_keywords
from ecpi.common.instru.model_effect import ECLAIRsDetectorEffect, ECLAIRsDetectorEffectDefault
from ecpi.common.sky.catalog import CatalogIdentifiedSources
s_logger = logging.getLogger(__name__)
[docs]class DetImagFlow(object):
"""
Detector images and representations
"""
def __init__(self):
self.status = -1
self.loaded = False
[docs]class EventFlow(object):
"""
Events and related objects
"""
def __init__(self):
"""
Inits EventFlow
"""
self.status = -1
self.evt = None
[docs] def set_events(self, events):
"""
:param events:
:type events: instance of EclairsCalEvtData
:return:
:rtype: boolean
"""
assert(isinstance(events, EclairsCalEvtData))
if not (events.nb_row > 0):
return False
self.evt = events
self.status = 0
return True
[docs]class DataFlow:
"""
Scientific Data Products and its component
"""
comp_by_prod = {
"ecl-gti-1": "dpco", "ecl-pix-lif": "dpco", "ecl-evt-cor": "dpco",
"ecl-gti-2": "cali", "ecl-evt-cal": "cali",
"ecl-gti-3": "bube", "ecl-det-ima": "bube", "ecl-det-ubc": "bube",
"ecl-sky-ima": "imag", "ecl-sop-ima": "imag", "ecl-det-mod": "imag",
"ecl-evs-soe": "spex", "ecl-lcs-soe": "spex", "ecl-sps-soe": "spex",
"ecl-sky-mos": "mosa", "ecl-sop-mos": "mosa"
}
def __init__(self, load_kws=True, only_check=False):
"""
Inits DataFlow
:param load_kws: condition on loading keywords
:type load_kws: boolean
:param only_check: condition on only_check
:type only_check: boolean
"""
# define ECPI components & associated classes
self.status = -1
self.current_module = None
# self.eclairs_observation = None
self.detector_images = None
self.catalogue_sources = None
self.sky_images = None
self.new_sources = None
self.events = None
self.detector_effect = None
# TODO: Verify if EvenFlow() and DetImagFlow() classes
# are needed. Maybe just the original objects
# self.evt = EventFlow()
self.gtis = GtiFlow()
self.aav_pvt = AavPvtFlow()
# self.det_ima = DetImagFlow()
# TODO: Remove the following lines when effects and instrumental
# information be in place.
if not only_check:
self.set_detector_effect()
self.common_kws = None
if load_kws:
self.load_common_keywords()
def _get_data_by_model(self, model):
"""
:param model: name of desired scientific product
:type model: string
:return:
"""
if model in ["ecl-gti-1", "ecl-gti-2", "ecl-gti-3"]:
return {"gtis": {**self.gtis.gtis, **self.gtis.gtis_eoc} }
# if model == "ecl-pix-lif":
# return {}
if model == "ecl-evt-cor":
return {"events": self.events}
if model == "ecl-evt-cal":
return {"events": self.events}
if model == "ecl-det-ima":
return {"det_images": self.detector_images}
if model == "ecl-det-ubc":
return {"det_images": self.detector_images}
if model == "ecl-sky-ima":
return {"det_images": self.detector_images, "sky_images": self.sky_images}
if model == "ecl-sop-ima":
return {"det_images": self.detector_images, "sky_images": self.sky_images}
# TODO: Check the elements for each SDP in the following list
# if model == "ecl-det-mod":
# return {}
# if model == "ecl-evs-soe":
# return {}
# if model == "ecl-lcs-soe":
# return {}
# if model == "ecl-sps-soe":
# return {}
# if model == "ecl-sky-mos":
# return {}
# if model == "ecl-sop-mos":
# return {}
return {}
#
# Setters
#
[docs] def set_aav_pvt(self, t_aav, t_pvt, compute_eof=False):
"""
:param t_aav:
:type t_aav: table
:param t_pvt:
:type t_pvt: table
:param compute_eof:
:type compute_eof: boolean
:return:
"""
return self.aav_pvt.set_orbit(t_aav, t_pvt, compute_eof=compute_eof)
[docs] def set_eof(self, t_eof):
"""
:param t_eof: table of Earth occultation fractions
:type t_eof: table
:return:
"""
return self.aav_pvt.set_eof(t_eof)
[docs] def set_events(self, events):
"""
:param events:
:type events: instance of EclairsCalEvtData
:return:
:rtype:
"""
assert isinstance(events, EclairsCalEvtData)
self.events = events
return True
[docs] def set_basic_gtis(self, d_gtis):
"""
:param d_gtis: good time intervals
:type d_gtis: dictionnary
:return:
"""
assert isinstance(d_gtis, dict)
return self.gtis.set_basic_gtis(d_gtis)
[docs] def set_earth_gtis(self, d_gtis, exist_ok=False):
"""
:param d_gtis: good time intervals
:type d_gtis: dictionnary
:param exist_ok:
:type exist_ok: boolean
:return:
"""
assert isinstance(d_gtis, dict)
return self.gtis.set_earth_gtis(d_gtis, exist_ok=exist_ok)
[docs] def set_user_gtis(self, d_gtis):
"""
User gtis can be given with arbitrary bounds and a preliminary
check is made for setting in the right decimation
:param d_gtis: good time intervals
:type d_gtis: dictionnary
:return:
"""
assert isinstance(d_gtis, dict)
if len(d_gtis) > 1:
s_logger.error(f'User gtis must be configured in a unique list')
return False
user_gtis = d_gtis["user"]
fitted_gtis = []
for gti in user_gtis:
if gti[0] < self.get_t_start() or self.get_t_stop() < gti[1]:
s_logger.error(f'User gti {gti} is out of bounds')
return False
mask = self.aav_pvt.data['TIME_AAV'] <= gti[0]
gti_st = self.aav_pvt.data['TIME_AAV'][mask][-1]
mask = gti[1] <= self.aav_pvt.data['TIME_AAV']
gti_sp = self.aav_pvt.data['TIME_AAV'][mask][0]
fitted_gtis.append([gti_st, gti_sp])
s_logger.info(f'Fitting user gtis [{user_gtis}] => [{fitted_gtis}] ')
return self.gtis.set_user_gtis({"user": fitted_gtis})
[docs] def set_det_images(self, det_images_by_gti):
"""
:param det_images_by_gti:
:type det_images_by_gti: dictionnary
:return:
"""
# dict of neo, peo, teo, npe dicts
assert isinstance(det_images_by_gti, dict)
for gti in det_images_by_gti.keys():
assert (isinstance(det_images_by_gti[gti]['det_ima'], DetectorImages))
self.detector_images = det_images_by_gti
return True
[docs] def set_detector_effect(self, detector_effect=None):
"""
:param detector_effect:
:type detector_effect: instance of ECLAIRsDetectorEffectDefault
:return:
"""
if detector_effect is None:
self.detector_effect = ECLAIRsDetectorEffectDefault()
else:
assert(isinstance(detector_effect, ECLAIRsDetectorEffect))
self.detector_effect = detector_effect
return True
[docs] def set_imag_output(self, d_sky_images_by_gti):
"""
:param d_sky_images_by_gti:
:type d_sky_images_by_gti: dictionnary
:return:
"""
# dict of neo, peo, teo, npe dicts
assert isinstance(d_sky_images_by_gti, dict)
for gti in d_sky_images_by_gti.keys():
assert (
isinstance(
d_sky_images_by_gti[gti]['cat_sources'][0],
CatalogIdentifiedSources
)
)
self.sky_images = d_sky_images_by_gti
return True
[docs] def set_pi_vector(self, pi_vect):
"""
:param pi_vect:
:type pi_vect: numpy.ndarray
:return:
"""
assert isinstance(pi_vect, numpy.ndarray)
assert len(self.events.tb_evts) == len(pi_vect)
self.events.tb_evts.add_column(pi_vect, name='PI')
self.events.tb_evts['PI'].meta = {'PI': 'Pulse height Invariant energy channel (0-1023)'}
self.events.tb_evts['PI'].dtype = 'f'
self.events.tb_evts['PI'].unit = u.keV
s_logger.info(f'Events header: \n{self.events.tb_evts[0:3]}')
return True
#
# Getters
#
[docs] def get_aav_size(self):
"""
:return:
"""
return self.aav_pvt.n_data
[docs] def get_eof_table(self):
"""
:return:
"""
if 'EOF' not in self.aav_pvt.data.colnames:
s_logger.error('The table of Earth occultation fractions is not present!')
return False
return self.aav_pvt.get_eof_table()
[docs] def get_pvt_time_column(self):
"""
:return:
"""
return self.aav_pvt.get_time_table()
[docs] def get_ptgori_mean(self):
"""
:return:
"""
return self.aav_pvt.ptgori_mean
[docs] def get_sdp_data(self, sdp_name):
"""
:param sdp_name:
:type sdp_name: string
:return:
"""
assert isinstance(sdp_name, str)
if not self.is_ready(self.comp_by_prod[sdp_name.lower()]):
s_logger.error(f'No data loaded related [{sdp_name.upper()}] in Data_Flow object')
return None
info = {
"tstart": self.aav_pvt.data['TIME_AAV'].data[0],
"tstop": self.aav_pvt.data['TIME_AAV'].data[-1],
"radec": self.aav_pvt.ptgori_mean,
"common_kws": self.common_kws,
"dpix": self.detector_effect
}
info.update(self._get_data_by_model(sdp_name.lower()))
return info
[docs] def get_t_start(self):
"""
:return:
"""
if not self.aav_pvt.loaded:
s_logger.error(f'No data loaded in ATT-ORB object')
return -1
return self.aav_pvt.data['TIME_AAV'][0]
[docs] def get_t_stop(self):
"""
:return:
"""
if not self.aav_pvt.loaded:
s_logger.error(f'No data loaded in ATT-ORB object')
return -1
return self.aav_pvt.data['TIME_AAV'][-1]
[docs] def get_index_by_time(self, time):
"""
:param time:
:type time:
:return:
"""
# print(f'Looking for index element with time [{time}]')
if time < self.get_t_start() or self.get_t_stop() < time:
s_logger.error(f'Time {time} is out of bound from actual data')
return -1
return self.aav_pvt.data.loc_indices[time] # ['TIME_AAV'][idx]
[docs] def get_time_by_index(self, idx):
"""
:param idx:
:type idx: integer
:return:
"""
if idx == 0 or idx == -1:
return self.aav_pvt.data['TIME_AAV'][idx]
return self.aav_pvt.data['TIME_AAV'][idx]
#
# Interface
#
[docs] def compute_ptgori_mean(self):
"""
:return:
"""
return self.aav_pvt.compute_ptgori_mean()
[docs] def compute_earth_gtis(self, peof_min, peof_max):
"""
:param peof_min:
:type peof_min: float
:param peof_max:
:type peof_max: float
:return:
"""
assert isinstance(peof_max, float)
assert isinstance(peof_min, float)
self.gtis.compute_earth_gtis(self.aav_pvt, peof_min, peof_max)
[docs] def intersect_gtis(self, gti=""):
"""
:param gti:
:type gti: string
:return:
"""
return self.gtis.intersect_gtis(gti)
[docs] def show_gtis(self):
"""
:return:
"""
self.gtis.show_gtis()
[docs] def is_ready(self, c_name):
"""
:param c_name:
:type c_name: string
:return:
"""
print(f"Checking component [{c_name}]")
assert(isinstance(c_name, str))
# TODO: Check for others elements to
# validate each component. For the
# moment, we assume the only constraint for
# Dpco, Cali and Bube is to load the AavPvt table
# For DPCO, CALI or BUBE, we need aav_pvt and evt
ape_svo = ["dpco", "cali", "bube"]
ape_usr = ["mosa", "spex"]
if c_name in ape_svo:
return self.aav_pvt.loaded
if c_name == "imag":
print(f"Checking component [{c_name}]")
return isinstance(self.detector_images, dict) and len(self.detector_images.keys()) > 0
if c_name in ape_usr:
# TODO: check the conditions for MOSA and SPEX
return False
return False
[docs] def load_detector_effect_from_dir(self, caldb_dir):
"""
:param caldb_dir:
:type caldb_dir: string
:return:
"""
if not osp.isdir(caldb_dir):
s_logger.error(f"Directory {caldb_dir} does not exist. Skipping !!")
return False
s_logger.info(f"Loading Detector Effect files from {caldb_dir}")
detpix = ECLAIRsDetectorEffect()
detpix.path_arf = glob(osp.join(caldb_dir, "ECL-RSP-ARF*.fits"))[0]
s_logger.info(f"Loading Detector Effect files from {detpix.path_arf}")
detpix.path_irf = glob(osp.join(caldb_dir, "ECL-RSP-IRF*.fits"))[0]
s_logger.info(f"Loading Detector Effect files from {detpix.path_irf}")
detpix.path_rmf = glob(osp.join(caldb_dir, "ECL-RSP-RMF*.fits"))[0]
s_logger.info(f"Loading Detector Effect files from {detpix.path_rmf}")
# TODO: Change when the CalDB file AUX_ECL_UNI
detpix.path_efficiency = add_path_root_eclairs(
osp.join("ecpi", "extras", "caldb", "efficiency_matrix.fits")
)
detpix.path_nonuniformity = add_path_root_eclairs(
osp.join("ecpi", "extras", "caldb", "non_uniformity_matrix.fits")
)
detpix.set_arf(detpix.path_arf)
# detpix.set_irf(detpix.path_irf)
detpix.set_rmf(detpix.path_rmf)
detpix.set_efficiency(detpix.path_efficiency)
detpix.set_nonuniformity(detpix.path_nonuniformity)
self.detector_effect = detpix
return True
[docs] def load_common_keywords(self):
"""
:return:
"""
if isinstance(self.common_kws, dict):
self.show_common_keywords(lim=5)
s_logger.error("Common KWs already loaded. Skipping !!")
return False
s_logger.info("Loading Common KWs from http://fsc.svom.eu")
self.common_kws = download_common_keywords()
return True
[docs] def show_common_keywords(self, lim=-1):
"""
:param lim:
:type lim: integer
:return:
"""
if self.common_kws is None:
s_logger.error(f'None Keywords loaded. Load common keywords in advance!')
return False
for kw in self.common_kws.values():
print(f'Keyword {kw["name"]}')
for l_name in kw['product_keywords'][:lim]:
print(
f'Name: {l_name["name"]}\t'
f'{l_name["unit"]}\t'
f'{l_name["data_type"]}\t'
f'{l_name["description"]}'
)
return True