"""
Data Flow is the layer for transmitting the data along
the ECPI pipeline.
Data Flow is a class containing four different classes
each one for a particular type of data to be transmitted
A) GtiFlow concerns all the information and functionality
related to the Good Time Intervals.
B) AttPvtFlow concerns the information about the orbit and
the attitude of the satellite.
C) EventFlow concerns the events and the different type
of related objects
D) DetImagFlow concerns the detector images and its
different representations
All these objects are grouped in a global container which
becomes the data_flow layer in the main pipeline.
"""
import logging
# TODO: clean and document code
import ecpi.process.dpco.core.gti_earth as gearth
s_logger = logging.getLogger(__name__)
[docs]def union(gtia, gtib):
"""
Computes the union of two gtis
:param gtia: first gti to combine
:type gtia: numpy.ndarray
:param gtib: second gti to combine
:type gtib: numpy.ndarray
:return: gti_union: final gti made by the union of the two input gtis
:rtype: gti_union: numpy.ndarray
"""
tot = gtia + gtib
if len(tot) < 2:
s_logger.error("Malformed gti set!")
return False
s = [[gti[0], -1] for gti in tot]
[s.append([gti[1], 1]) for gti in tot]
s.sort()
gti_union = []
count = 0
# We fix the first element to be the lower bound
gti_ini = s[0][0]
for i in range(len(s) - 1):
count += s[i][1]
if count == 0:
gti_union.append([gti_ini, s[i][0]])
gti_ini = s[i + 1][0]
gti_union.append([gti_ini, s[-1][0]])
return gti_union
[docs]def intersects_gti(gti_a, gti_b):
"""
Computes the intersection of 2 GTI (defined as sorted numpy arrays).
Returns the intersected array.
:param gtia: first gti to be intersect
:type gtia: numpy.ndarray
:param gtib: second gti to be intersect
:type gtib: numpy.ndarray
:return: gti_intersection: intersection of the two input gtis
:rtype gti_intersection: numpy.ndarray
"""
n = len(gti_a)
m = len(gti_b)
print(gti_a, gti_b)
if n < 1 or m < 1:
s_logger.error("Empty GTI set !")
return [[]]
gti_intersection = []
i = j = 0
while i < n and j < m:
# Left bound for intersecting segment
left_b = max(gti_a[i][0], gti_b[j][0])
# Right bound for intersecting segment
right_b = min(gti_a[i][1], gti_b[j][1])
if left_b < right_b:
gti_intersection.append([left_b, right_b])
if gti_a[i][1] < gti_b[j][1]:
i += 1
else:
j += 1
return gti_intersection
# TODO: place the docstrings for this file
[docs]class GtiFlow(object):
"""
"""
def __init__(self):
"""
inits GtiFlow
"""
self.status = -1
self.loaded = False
self.level = 0
self.names = None
self.gtis = {}
self.gtis_eoc = {}
[docs] def set_gti_types(self, gti_types):
"""
:param gti_types: dict of different possible gtis
:type gti_types: dictionnary
:return:
"""
assert(isinstance(gti_types, dict))
self.names = gti_types
[docs] def set_basic_gtis(self, gtis):
"""
:param gtis:
:type gtis: dictionnary
:return:
"""
assert(isinstance(gtis, dict))
if not gtis:
s_logger.warning("Empty GTIs dictionary, nothing changes.")
return False
# We use a two step setter :
# We parse the dictionary before adding attributes
# this avoids adding partial dictionaries
for key in gtis:
if key in self.gtis:
s_logger.error(f"GTI {key}, already exists in GtiFlow!")
return False
# Now we add the elements as attributes for GtiFlow
for key in gtis:
self.gtis.update({key: gtis[key]})
return True
[docs] def set_earth_gtis(self, gtis, exist_ok=False):
"""
:param gtis:
:type gtis: dictionnary
:param exist_ok:
:type exist_ok: boolean
:return:
"""
assert(isinstance(gtis, dict))
if not gtis:
s_logger.warning("Empty GTIs dictionary, nothing changes.")
return False
# We use a two step setter :
# We parse the dictionary before adding attributes
# this avoids adding partial dictionaries
if exist_ok:
self.gtis_eoc = {}
else:
for key in gtis:
if key in self.gtis_eoc:
s_logger.error(f"GTI {key}, already exists in GtiFlow!")
return False
# Now we add the elements as attributes for GtiFlow
for key in gtis:
self.gtis_eoc.update({key: gtis[key]})
return True
[docs] def set_user_gtis(self, gtis):
"""
User gtis can be given with arbitrary bounds and a preliminary
check is made for setting in the right decimation
:param gtis:
:type gtis: dictionnary
:return:
"""
assert(isinstance(gtis, dict))
if not gtis:
s_logger.warning("Empty GTIs dictionary, nothing changes.")
return False
# We use a two step setter :
# We parse the dictionary before adding attributes
# this avoids adding partial dictionaries
for key in gtis:
if key in self.gtis:
s_logger.error(f"GTI {key}, already exists in GtiFlow!")
return False
# Now we add the elements as attributes for GtiFlow
for key in gtis:
self.gtis.update({key: gtis[key]})
return True
[docs] def join_gti(self, gtia, gtib):
"""
Computes the union of two gtis
:param gtia:
:param gtib:
:return:
"""
# TODO: set all the gtis in the right way
tot = gtia + gtib
if len(tot) < 2:
s_logger.error("Malformed gti set!")
return False
s = [[gti[0], -1] for gti in tot]
[s.append([gti[1], 1]) for gti in tot]
s.sort()
gti_union = []
count = 0
# We fix the first element to be the lower bound
gti_ini = s[0][0]
for i in range(len(s) - 1):
count += s[i][1]
if count == 0:
gti_union.append([gti_ini, s[i][0]])
gti_ini = s[i + 1][0]
gti_union.append([gti_ini, s[-1][0]])
return gti_union
[docs] def intersect_gtis(self, in_gti=""):
"""
Computes the intersection of a earth gti with the dpco basic gtis.
:param in_gti:
:type in_gti: string
:return:
"""
gti_list = [a for a in self.gtis.values()]
print(f'\nBasic Gtis are :{self.gtis}, \nthe list is {gti_list}\nand in_gti is {in_gti}')
if self.gtis_eoc is not None:
print(
f'\n'
f'Earth Gtis are :{self.gtis_eoc}, \n'
f'the list is {gti_list}\n'
f'and in_gti is {in_gti}'
)
in_gti = in_gti.lower()
if in_gti in self.gtis_eoc:
gti_list.append(self.gtis_eoc[in_gti])
elif not in_gti == "":
s_logger.error(f"GTI {in_gti} is not a valid earth gti")
return False
print(f'\nList Gtis is {gti_list} before intersecting')
if len(gti_list) == 1:
return gti_list[0]
gti_intersected = intersects_gti(gti_list[0], gti_list[1])
counter = 2
while counter < len(gti_list):
gti_intersected = intersects_gti(gti_list[counter], gti_intersected)
counter += 1
print(f'\nIntersected gti is {gti_intersected}', flush=True)
return gti_intersected
[docs] def compute_earth_gtis(self, t_orb, peof_min, peof_max):
"""
:param t_orb:
:type t_orb: table
:param peof_min:
:type peof_min: float
:param peof_max:
:type peof_max: float
:return:
"""
eag = gearth.get_gtis_by_interval(
t_orb.data["EOF"],
t_orb.data["TIME_AAV"],
peof_min,
peof_max
)
self.set_earth_gtis(eag)
[docs] def show_gtis(self):
"""
:return:
"""
for git in [self.gtis, self.gtis_eoc]:
[print(f'Gti {a} : {self.gtis[a]}') for a in git.keys()]
return True