Source code for ecpi.pipeline.io.aav_pvt_flow

"""
    Data Flow is the layer that transmits the data along
    the ECPI pipeline.
    Data Flow is a class containing four different classes,
    each one for a particular type of data to be transmitted:
    A) GtiFlow concerns all the information and functionality
        related to the Good Time Intervals.
    B) AttPvtFlow concerns the information about the orbit and
        the attitude of the satellite.
    C) EventFlow concerns the events and the different types
        of related objects.
    D) DetImagFlow concerns the detector images and their
        different representations.

    All these objects are grouped in a global container which
    becomes the data_flow layer in the main pipeline.
"""

import logging

import numpy as np
from astropy.table import Table, hstack
import ecpi.process.dpco.core.gti_earth as gearth

# Loading ECPI objects
# from ecpi.common.io.events import EclairsCalEvtData
from ecpi.common.mission.attitude import AttitudeECLAIRs

# Module-level logger, named after this module; used by every method below.
s_logger = logging.getLogger(__name__)


class AavPvtFlow(object):
    """Hold the merged attitude (AAV) / orbit (PVT) table of the pipeline.

    The table is built by :meth:`set_orbit`, which stacks the attitude and
    orbit tables side by side.  An optional ``EOF`` column (Earth occultation
    fraction) can be attached either by :meth:`set_orbit` itself or through
    :meth:`set_eof`.

    Attributes set by ``__init__``:

    * ``status``: ``-1`` until :meth:`set_orbit` succeeds, then ``0``.
    * ``data``: the stacked table, ``None`` until data are loaded.
    * ``n_data``: number of rows of ``data`` (``-1`` until loaded).
    * ``gtol``: tolerance used when comparing the first/last time stamps of
      the attitude and orbit tables.
    * ``loaded``: ``True`` once a non-empty table has been stored.
    * ``ptgori_mean``: result of :meth:`compute_ptgori_mean`, else ``None``.
    """

    def __init__(self):
        """Inits AavPvtFlow with an empty, not-yet-loaded state."""
        self.status = -1
        self._has_eof = False
        self.data = None
        self.n_data = -1
        self.gtol = 1e-4
        self.loaded = False
        self.ptgori_mean = None

    def _has_column(self, name):
        """Tell whether the loaded table owns a column called ``name``.

        :param name: column name to look for
        :type name: str
        :return: False when no table is loaded or the column is absent
        """
        # self.data stays None until set_orbit() succeeds; guarding here keeps
        # the public accessors from raising AttributeError before loading.
        return self.data is not None and name in self.data.colnames

    def _check_coherence(self, t_aav, t_pvt):
        """Check that attitude and orbit tables describe the same time span.

        Both tables must have the same number of rows, and their first and
        last time stamps (``TIME_AAV`` / ``TIME_PVT``) must agree within
        ``self.gtol``.

        :param t_aav: attitude data
        :type t_aav: table
        :param t_pvt: orbit data
        :type t_pvt: table
        :return: True when the tables are coherent, False otherwise
        """
        assert(isinstance(t_aav, Table))
        assert(isinstance(t_pvt, Table))
        if len(t_aav) != len(t_pvt):
            s_logger.error('Size of Attitude and Orbit files does not match.')
            return False
        tat_start = t_aav['TIME_AAV'][0]
        tpv_start = t_pvt['TIME_PVT'][0]
        if np.fabs(tat_start - tpv_start) > self.gtol:
            s_logger.error('Starting time in Attitude and Orbit files differs.')
            return False
        tat_stop = t_aav['TIME_AAV'][-1]
        tpv_stop = t_pvt['TIME_PVT'][-1]
        if np.fabs(tat_stop - tpv_stop) > self.gtol:
            s_logger.error('Ending time in Attitude and Orbit files differs.')
            return False
        return True

    def _attach_eof(self, eof):
        """Append ``eof`` to the table as column ``EOF`` after a size check.

        :param eof: Earth occultation fraction, one value per table row
        :return: True when the column was added, False otherwise
        """
        if len(eof) != self.n_data:
            s_logger.error('The table of Earth occultation fractions has a bad size!')
            return False
        self.data.add_column(eof, name='EOF')
        self._has_eof = True
        return True

    def _compute_eof(self):
        """Compute the Earth occultation fraction and add it as column ``EOF``.

        :return: True when the column was added, False otherwise
        """
        # Test the flag first so the computation is skipped when useless.
        if self._has_eof:
            s_logger.error('Earth occultation fraction already exists!')
            return False
        if self.data is None:
            s_logger.error('Missing ATT/PVT data. Files are not already loaded')
            return False
        eof = gearth.get_earth_occultation_fraction_array(self.data)
        return self._attach_eof(eof)

    def set_orbit(self, t_aav, t_pvt, compute_eof=False):
        """Create the attitude/orbit table from its two halves.

        The two tables are stacked column-wise, indexed on ``TIME_AAV`` and
        the pointing is derived through :meth:`compute_ptgori_mean`.  Failures
        of the pointing or EOF steps are only logged: the method still returns
        True and sets ``status`` to 0 once the table itself has been built.

        :param t_aav: attitude data
        :type t_aav: table
        :param t_pvt: orbit data
        :type t_pvt: table
        :param compute_eof: Condition on computing the Earth occultation fraction
        :type compute_eof: boolean
        :return: False when the tables are not coherent, True otherwise
        """
        # Check coherence
        assert(len(t_aav) > 0)
        assert(len(t_pvt) > 0)
        if not self._check_coherence(t_aav, t_pvt):
            return False

        # Create table
        s_logger.info('Creating ATT/PVT table...')
        self.data = hstack([t_aav, t_pvt], metadata_conflicts='silent')
        self.n_data = len(self.data)
        self.loaded = (self.n_data > 0)

        # The new table replaces any previous one: derived state must follow
        # it, otherwise a second load would report a stale EOF column or keep
        # the pointing of the former table.
        self._has_eof = 'EOF' in self.data.colnames
        self.ptgori_mean = None

        if self.loaded:
            s_logger.info('Computing pointing and creating index...')
            self.data.add_index('TIME_AAV')
            self.compute_ptgori_mean()
            if compute_eof:
                s_logger.info('Computing earth fraction column...')
                self._compute_eof()
        self.status = 0
        return True

    # TODO: check if it is necessary as an independent interface
    def set_eof(self, t_eof):
        """Add an externally computed ``EOF`` column to the table.

        :param t_eof: Earth occultation fraction
        :type t_eof: table
        :return: True when the column was added, False when an EOF column
            already exists or the size does not match the table
        """
        if self._has_eof:
            s_logger.error('Earth occultation fraction already exists!')
            return False
        return self._attach_eof(t_eof)

    def get_eof_table(self):
        """Returns the table of fractions.

        :return: the ``.data`` of column ``EOF``, or False when the column
            (or the whole table) is missing
        """
        if not self._has_column('EOF'):
            s_logger.error('Earth occultation fraction table does not exist!')
            return False
        return self.data['EOF'].data

    def get_time_table(self):
        """Returns the table of time_aav.

        The columns ``TIME``, ``TIME_AAV`` and ``TIME_PVT`` must all be
        present in the table.

        :return: the ``.data`` of column ``TIME_AAV``, or False when one of
            the time columns (or the whole table) is missing
        """
        for s_time in ('TIME', 'TIME_AAV', 'TIME_PVT'):
            if not self._has_column(s_time):
                s_logger.error(f'Orbit table ATT+PVT has not {s_time} column!')
                return False
        return self.data['TIME_AAV'].data

    def get_size(self):
        """Return the number of rows of the table.

        :return: ``n_data`` when data are loaded, 0 otherwise
        """
        if not self.loaded:
            return 0
        return self.n_data

    def compute_ptgori_mean(self):
        """Derive pointing and orientation from the first attitude quaternion.

        The result, the concatenation of the instrument pointing and
        orientation given by ``AttitudeECLAIRs``, is stored in
        ``self.ptgori_mean``.

        :return: True on success, False when column ``QPARAM`` (or the whole
            table) is missing
        """
        if not self._has_column('QPARAM'):
            s_logger.error('Missing ATT/PVT data. Files are not already loaded')
            return False
        att = AttitudeECLAIRs()
        # TODO: verify if we need to compute the mean
        # Warning self.data is an astropy table and
        # we access the elements with *.data
        quater = self.data['QPARAM'][0]
        s_logger.info(f"Recovering quaternion attitude with value : {quater}")
        att.set_attitude_svom_quater(quater)
        ptg = att.ptg_instru(deg=True)[0]
        ori = np.array([att.ori_instru(deg=True)])
        self.ptgori_mean = np.concatenate((ptg, ori))
        return True