"""
Data Flow is the layer for transmitting the data along
the ECPI pipeline.
Data Flow is a class containing four different classes
each one for a particular type of data to be transmitted
A) GtiFlow concerns all the information and functionality
related to the Good Time Intervals.
B) AttPvtFlow concerns the information about the orbit and
the attitude of the satellite.
C) EventFlow concerns the events and the different type
of related objects
D) DetImagFlow concerns the detector images and its
different representations
All these objects are grouped in a global container which
becomes the data_flow layer in the main pipeline.
"""
import logging
import numpy as np
from astropy.table import Table, hstack
import ecpi.process.dpco.core.gti_earth as gearth
# Loading ECPI objects
# from ecpi.common.io.events import EclairsCalEvtData
from ecpi.common.mission.attitude import AttitudeECLAIRs
s_logger = logging.getLogger(__name__)
[docs]class AavPvtFlow(object):
"""
"""
def __init__(self):
"""
Inits AavPvtFlow
"""
self.status = -1
self._has_eof = False
self.data = None
self.n_data = -1
self.gtol = 1e-4
self.loaded = False
self.ptgori_mean = None
def _check_coherence(self, t_aav, t_pvt):
"""
:param t_aav: attitude data
:type t_aav: table
:param t_pvt: orbit data
:type t_pvt: table
:return:
"""
assert(isinstance(t_aav, Table))
assert(isinstance(t_pvt, Table))
if len(t_aav) != len(t_pvt):
s_logger.error('Size of Attitude and Orbit files does not match.')
return False
tat_start = t_aav['TIME_AAV'][0]
tpv_start = t_pvt['TIME_PVT'][0]
if np.fabs(tat_start-tpv_start) > self.gtol:
s_logger.error('Starting time in Attitude and Orbit files differs.')
return False
tat_stop = t_aav['TIME_AAV'][-1]
tpv_stop = t_pvt['TIME_PVT'][-1]
if np.fabs(tat_stop-tpv_stop) > self.gtol:
s_logger.error('Ending time in Attitude and Orbit files differs.')
return False
return True
def _compute_eof(self):
"""
Adding a new column to QTable
:return:
"""
if self._has_eof:
s_logger.error('Earth occultation fraction already exists!')
return False
eof = gearth.get_earth_occultation_fraction_array(self.data)
if len(eof) != self.n_data:
s_logger.error('The table of Earth occultation fractions has a bad size!')
return False
self.data.add_column(eof, name='EOF')
self._has_eof = True
return True
[docs] def set_orbit(self, t_aav, t_pvt, compute_eof=False):
"""
Create attitude/orbit table file
:param t_aav: attitude data
:type t_aav: table
:param t_pvt: orbit data
:type t_pvt: table
:param compute_eof: Condition on computing the Earth occultation fraction
:type compute_eof: boolean
:return:
"""
# Check coherence
assert(len(t_aav) > 0)
assert(len(t_pvt) > 0)
if not self._check_coherence(t_aav, t_pvt):
return False
# Create table
s_logger.info('Creating ATT/PVT table...')
self.data = hstack([t_aav, t_pvt], metadata_conflicts='silent')
self.n_data = len(self.data)
self.loaded = (self.n_data > 0)
if self.loaded:
s_logger.info('Computing pointing and creating index...')
self.data.add_index('TIME_AAV')
self.compute_ptgori_mean()
if compute_eof:
s_logger.info('Computing earth fraction column...')
self._compute_eof()
self.status = 0
return True
# TODO: check if it is necessary as an independent interface
[docs] def set_eof(self, t_eof):
"""
Adding a new column to QTable
:param t_eof: Earth occultation fraction
:type t_eof: table
:return:
"""
if self._has_eof:
s_logger.error('Earth occultation fraction already exists!')
return False
if len(t_eof) != self.n_data:
s_logger.error('The table of Earth occultation fractions has a bad size!')
return False
self.data.add_column(t_eof, name='EOF')
self._has_eof = True
return True
[docs] def get_eof_table(self):
"""
Returns the table of fractions
:return:
"""
if 'EOF' not in self.data.colnames:
s_logger.error('Earth occultation fraction table does not exist!')
return False
return self.data['EOF'].data
[docs] def get_time_table(self):
"""
Returns the table of time_aav
:return:
"""
l_time = ['TIME', 'TIME_AAV', 'TIME_PVT']
for s_time in l_time:
if s_time not in self.data.colnames:
s_logger.error(f'Orbit table ATT+PVT has not {s_time} column!')
return False
return self.data['TIME_AAV'].data
[docs] def get_size(self):
"""
:return:
"""
if not self.loaded:
return 0
return self.n_data
[docs] def compute_ptgori_mean(self):
"""
:return:
"""
if 'QPARAM' not in self.data.colnames:
s_logger.error('Missing ATT/PVT data. Files are not already loaded')
return False
att = AttitudeECLAIRs()
# TODO: verify if we need to compute the mean
# Warning self.data is an astropy table and
# we access the elements with *.data
quater = self.data['QPARAM'][0]
s_logger.info(f"Recovering quaternion attitude with value : {quater}")
att.set_attitude_svom_quater(quater)
ptg = att.ptg_instru(deg=True)[0]
ori = np.array([att.ori_instru(deg=True)])
self.ptgori_mean = np.concatenate((ptg, ori))
return True