Source code for ecpi.pipeline.io.gti_flow

"""
    Data Flow is the layer for transmitting the data along
    the ECPI pipeline.
    Data Flow is a class containing four different classes
    each one for a particular type of data to be transmitted
    A) GtiFlow concerns all the information and functionality
        related to the Good Time Intervals.
    B) AttPvtFlow concerns the information about the orbit and
        the attitude of the satellite.
    C) EventFlow concerns the events and the different type
        of related objects
    D) DetImagFlow concerns the detector images and its
        different representations

    All these objects are grouped in a global container which
    becomes the data_flow layer in the main pipeline.
"""

import logging

# TODO: clean and document code
import ecpi.process.dpco.core.gti_earth as gearth

s_logger = logging.getLogger(__name__)


[docs]def union(gtia, gtib): """ Computes the union of two gtis :param gtia: first gti to combine :type gtia: numpy.ndarray :param gtib: second gti to combine :type gtib: numpy.ndarray :return: gti_union: final gti made by the union of the two input gtis :rtype: gti_union: numpy.ndarray """ tot = gtia + gtib if len(tot) < 2: s_logger.error("Malformed gti set!") return False s = [[gti[0], -1] for gti in tot] [s.append([gti[1], 1]) for gti in tot] s.sort() gti_union = [] count = 0 # We fix the first element to be the lower bound gti_ini = s[0][0] for i in range(len(s) - 1): count += s[i][1] if count == 0: gti_union.append([gti_ini, s[i][0]]) gti_ini = s[i + 1][0] gti_union.append([gti_ini, s[-1][0]]) return gti_union
[docs]def intersects_gti(gti_a, gti_b): """ Computes the intersection of 2 GTI (defined as sorted numpy arrays). Returns the intersected array. :param gtia: first gti to be intersect :type gtia: numpy.ndarray :param gtib: second gti to be intersect :type gtib: numpy.ndarray :return: gti_intersection: intersection of the two input gtis :rtype gti_intersection: numpy.ndarray """ n = len(gti_a) m = len(gti_b) print(gti_a, gti_b) if n < 1 or m < 1: s_logger.error("Empty GTI set !") return [[]] gti_intersection = [] i = j = 0 while i < n and j < m: # Left bound for intersecting segment left_b = max(gti_a[i][0], gti_b[j][0]) # Right bound for intersecting segment right_b = min(gti_a[i][1], gti_b[j][1]) if left_b < right_b: gti_intersection.append([left_b, right_b]) if gti_a[i][1] < gti_b[j][1]: i += 1 else: j += 1 return gti_intersection
# TODO: place the docstrings for this file
[docs]class GtiFlow(object): """ """ def __init__(self): """ inits GtiFlow """ self.status = -1 self.loaded = False self.level = 0 self.names = None self.gtis = {} self.gtis_eoc = {}
[docs] def set_gti_types(self, gti_types): """ :param gti_types: dict of different possible gtis :type gti_types: dictionnary :return: """ assert(isinstance(gti_types, dict)) self.names = gti_types
[docs] def set_basic_gtis(self, gtis): """ :param gtis: :type gtis: dictionnary :return: """ assert(isinstance(gtis, dict)) if not gtis: s_logger.warning("Empty GTIs dictionary, nothing changes.") return False # We use a two step setter : # We parse the dictionary before adding attributes # this avoids adding partial dictionaries for key in gtis: if key in self.gtis: s_logger.error(f"GTI {key}, already exists in GtiFlow!") return False # Now we add the elements as attributes for GtiFlow for key in gtis: self.gtis.update({key: gtis[key]}) return True
[docs] def set_earth_gtis(self, gtis, exist_ok=False): """ :param gtis: :type gtis: dictionnary :param exist_ok: :type exist_ok: boolean :return: """ assert(isinstance(gtis, dict)) if not gtis: s_logger.warning("Empty GTIs dictionary, nothing changes.") return False # We use a two step setter : # We parse the dictionary before adding attributes # this avoids adding partial dictionaries if exist_ok: self.gtis_eoc = {} else: for key in gtis: if key in self.gtis_eoc: s_logger.error(f"GTI {key}, already exists in GtiFlow!") return False # Now we add the elements as attributes for GtiFlow for key in gtis: self.gtis_eoc.update({key: gtis[key]}) return True
[docs] def set_user_gtis(self, gtis): """ User gtis can be given with arbitrary bounds and a preliminary check is made for setting in the right decimation :param gtis: :type gtis: dictionnary :return: """ assert(isinstance(gtis, dict)) if not gtis: s_logger.warning("Empty GTIs dictionary, nothing changes.") return False # We use a two step setter : # We parse the dictionary before adding attributes # this avoids adding partial dictionaries for key in gtis: if key in self.gtis: s_logger.error(f"GTI {key}, already exists in GtiFlow!") return False # Now we add the elements as attributes for GtiFlow for key in gtis: self.gtis.update({key: gtis[key]}) return True
[docs] def join_gti(self, gtia, gtib): """ Computes the union of two gtis :param gtia: :param gtib: :return: """ # TODO: set all the gtis in the right way tot = gtia + gtib if len(tot) < 2: s_logger.error("Malformed gti set!") return False s = [[gti[0], -1] for gti in tot] [s.append([gti[1], 1]) for gti in tot] s.sort() gti_union = [] count = 0 # We fix the first element to be the lower bound gti_ini = s[0][0] for i in range(len(s) - 1): count += s[i][1] if count == 0: gti_union.append([gti_ini, s[i][0]]) gti_ini = s[i + 1][0] gti_union.append([gti_ini, s[-1][0]]) return gti_union
[docs] def intersect_gtis(self, in_gti=""): """ Computes the intersection of a earth gti with the dpco basic gtis. :param in_gti: :type in_gti: string :return: """ gti_list = [a for a in self.gtis.values()] print(f'\nBasic Gtis are :{self.gtis}, \nthe list is {gti_list}\nand in_gti is {in_gti}') if self.gtis_eoc is not None: print( f'\n' f'Earth Gtis are :{self.gtis_eoc}, \n' f'the list is {gti_list}\n' f'and in_gti is {in_gti}' ) in_gti = in_gti.lower() if in_gti in self.gtis_eoc: gti_list.append(self.gtis_eoc[in_gti]) elif not in_gti == "": s_logger.error(f"GTI {in_gti} is not a valid earth gti") return False print(f'\nList Gtis is {gti_list} before intersecting') if len(gti_list) == 1: return gti_list[0] gti_intersected = intersects_gti(gti_list[0], gti_list[1]) counter = 2 while counter < len(gti_list): gti_intersected = intersects_gti(gti_list[counter], gti_intersected) counter += 1 print(f'\nIntersected gti is {gti_intersected}', flush=True) return gti_intersected
[docs] def compute_earth_gtis(self, t_orb, peof_min, peof_max): """ :param t_orb: :type t_orb: table :param peof_min: :type peof_min: float :param peof_max: :type peof_max: float :return: """ eag = gearth.get_gtis_by_interval( t_orb.data["EOF"], t_orb.data["TIME_AAV"], peof_min, peof_max ) self.set_earth_gtis(eag)
[docs] def show_gtis(self): """ :return: """ for git in [self.gtis, self.gtis_eoc]: [print(f'Gti {a} : {self.gtis[a]}') for a in git.keys()] return True