'''database simulation module
Created on 15 mai 2018
@author: Colley Jean-Marc, APC/IN2P3/CNRS
'''
import subprocess
import numpy as np
import os.path
import string
import random
import tarfile
from astropy.coordinates.angles import Angle
from ecpi.common.instru.model_geom import InstruECLAIRs
import ecpi.common.mission.attitude as ptginst
import ecpi.database.interface_db as dbs
import ecpi.common.sky.catalog as catX
import ecpi.simu.lib.instru_x as simX
class DataBaseSVOMSimV1(dbs.DataBaseSVOMInterface):
"""
"""
def __init__(self, p_name, p_local_path, p_pwd=None, p_url=None):
"""**Constructor**
Open one connection between database SVOM and pipeline ECLAIRs with identifiant p_name
return exception if can't open connexion.
:param p_name: local name
:type p_name: string
:param p_local_path: local path
:type p_local_path: string
:param p_pwd: password
:type p_pwd: string
:param p_url: URL
:type p_url: string
"""
self.local_path = p_local_path
if not os.path.exists(p_local_path):
try:
os.makedirs(p_local_path)
except FileNotFoundError:
raise dbs.DBSvomError("Can't create directory %s"%self.local_path)
self.dir_proc = os.path.join(p_local_path, 'processing')
try:
os.makedirs(self.dir_proc)
except FileExistsError:
pass
self.name_com = p_name
# evts simu config
self.file_size = 3600.0
self.cat_astro = catX.CatalogAstroSourcesBasic()
self.cat_astro.read_cat_swift_light()
ra_ang = Angle("22h53m57.7")
dec_ang = Angle("16d08m53.6s")
ra_dec = np.array([ra_ang.deg, dec_ang.deg])
self._set_attitude_ECLAIRs(ra_dec, 0)
def _get_name_base(self, p_id):
return os.path.join(self.dir_proc, p_id)
def _get_name_input(self, p_id):
base = self._get_name_base(p_id)
return os.path.join(base, 'input')
def _get_name_output(self, p_id):
base = self._get_name_base(p_id)
return os.path.join(base, 'output')
def _create_directory(self, p_id):
try:
os.makedirs(self._get_name_base(p_id))
except FileExistsError:
pass
try:
os.makedirs(self._get_name_input(p_id))
except FileExistsError:
pass
try:
os.makedirs(self._get_name_output(p_id))
except FileExistsError:
pass
def del_all_products(self):
os.system("rm -rf %s/*"%self.dir_proc)
def _create_name(self, p_path, p_pref, p_suf):
return os.path.join(p_path, p_pref + '_%s.fits'%p_suf)
def get_files_evt_between_time(self, p_tstart, p_tstop, p_id):
"""Return set of event files occurring between tow instants.
p_tstart and p_tstop in s from mjdref
simu evt files and save them into dir_in
evt files names are ECL_EVT_CAL + t_start in s from mjdref
:param p_tstart: starting time
:type p_tstart: float
:param p_tstop: ending time
:type p_tstop: float
:param p_id: identification number
:type p_id: string
"""
dir_in = self._get_name_input(p_id)
# do simulation evt ECLAIRs
# event file size in seconds
t_start_simu = p_tstart
file_size_seconds = self.file_size
p_tstop_while = p_tstop - file_size_seconds
simu_obs_x = simX.SimuECLAIRsMaskProjection() # pb multi thread mist be local
simu_obs_x.set_catalog(self.cat_FOV)
while((t_start_simu + file_size_seconds) < (p_tstop_while)):
# simulation
# CAREFUL simu takes time argument in seconds
simu_obs_x.simu_catalog(self.file_size)
simu_obs_x.add_cxb()
evt_name = self._create_name(dir_in, 'ECL_EVT_CAL', int(t_start_simu) )
simu_obs_x.save_cal_evts(evt_name, t_start=t_start_simu)
t_start_simu += file_size_seconds
# simulation of the last file
# CAREFUL simu takes time argument in seconds
# security test: if last observation too short, add_cxb() can crash trying to add 0 photon from bkg
# -> if the remaining time is < 1s, it is simulated with the last hour
# if not : independant simulation and file
if (p_tstop - file_size_seconds - t_start_simu) < 1. :
simu_obs_x.simu_catalog(p_tstop - t_start_simu)
simu_obs_x.add_cxb()
evt_name = self._create_name(dir_in, 'ECL_EVT_CAL', int(t_start_simu) )
simu_obs_x.save_cal_evts(evt_name, t_start=t_start_simu)
return dir_in
else:
simu_obs_x.simu_catalog(self.file_size)
simu_obs_x.add_cxb()
evt_name = self._create_name(dir_in, 'ECL_EVT_CAL', int(t_start_simu) )
simu_obs_x.save_cal_evts(evt_name, t_start=t_start_simu)
t_start_simu += file_size_seconds
simu_obs_x.simu_catalog(p_tstop - t_start_simu)
simu_obs_x.add_cxb()
evt_name = self._create_name(dir_in, 'ECL_EVT_CAL', int(t_start_simu) )
simu_obs_x.save_cal_evts(evt_name, t_start=t_start_simu)
return dir_in
def _set_attitude_ECLAIRs(self, p_ptg_radec, p_ori):
"""
p_ptg_radec and p_ori in degrees
"""
ptgo = ptginst.AttitudeECLAIRs()
ptgo.set_attitude_instru_ptgori([p_ptg_radec[0], p_ptg_radec[1], p_ori])
self._set_attitude(ptgo._q_j2000_grf)
def _set_attitude(self, p_quater):
self.attitude = p_quater
#ptgo = att.ptgsvom.AttitudeECLAIRs()
# pour simplier l'interface on considere que le pointage est celui du VT
ptgo = ptginst.AttitudeECLAIRs()
ptgo.set_attitude_svom_quater(p_quater)
self.cat_FOV = catX.CatalogFovEclairs(InstruECLAIRs())
self.cat_FOV.set_ptg_instru(ptgo)
self.cat_FOV.from_astro_catalog(self.cat_astro)
#self.simu_obsX.set_catalog(self.cat_FOV)
def get_sat_attitude(self, p_time):
"""
return rotation : quaternion like numpy array with 4 values
"""
return self.attitude
def get_sat_position(self, p_time):
return np.zeros(3, dtype=np.float32)
def get_id(self):
msize = 6
mchars=string.ascii_letters + string.digits
idp = 'ECL_'+''.join(random.choice(mchars) for _ in range(msize))
self._create_directory(idp)
return idp
def close(self, p_clean=False):
if p_clean:
os.system("rm -rf %s"%self.local_path )
def insert_product(self, p_dir, p_id):
"""
insert a product file in database
copy the file from input in dir_science_products directory of the database
:param p_dir: output dir
:type p_product_fits: string
"""
dir_out = self._get_name_output(p_id)
cmd = 'cp %s/* %s'%(p_dir, dir_out)
subprocess.run(cmd, shell=True)
def insert_log(self,p_file, p_id):
pass
def insert_status(self, p_string, p_id):
f_status = open( self._get_name_base(p_id)+'/status.txt',"w")
f_status.write(p_string)
f_status.close()
def get_product(self, p_id):
tar_name = p_id + ".tar.gz"
if os.path.exists(os.path.join(self.dir_proc, tar_name)):
return os.path.join(self.dir_proc, tar_name)
elif os.path.exists(self._get_name_output(p_id)):
os.chdir(self.dir_proc)
tar=tarfile.open(tar_name, "w:gz")
tar.add(p_id)
tar.close()
return os.path.join(self.dir_proc, tar_name)
else:
return "_NOK"
def get_log(self, p_id):
pass
def get_status(self, p_id):
file_status = self._get_name_base(p_id)+'/status.txt'
if os.path.isfile(file_status):
f_status = open(file_status,"r")
status = f_status.read()
f_status.close()
return status
else:
return "ID_PB"