'''database simulation module
Created on 15 mai 2018
@author: Colley Jean-Marc, APC/IN2P3/CNRS
'''
import subprocess
import database.interface_db as dbs
import numpy as np
import os, os.path
import astropy.time as astrotime
from astropy.coordinates.angles import Angle
import common.sky.catalog as catX
import simu.lib.InstruX as simX
import string
import random
import tarfile
from common.instru.model_geom import InstruECLAIRs
import common.mission.attitude as ptginst
from common.mission.time import convert_svomref_seconds_in_mjd
[docs]class DataBaseSVOMSimV1(dbs.DataBaseSVOMInterface):
def __init__(self, p_name, p_local_path, p_pwd=None, p_url=None):
'''
open one connection between database SVOM and pipeline ECLAIRs with identifiant p_name
return exception if can't open connexion
'''
self.local_path = p_local_path
if not os.path.exists(p_local_path):
try:
os.makedirs(p_local_path)
except FileNotFoundError:
raise dbs.DBSvomError("Can't create directory %s"%self.local_path)
self.dir_proc = os.path.join(p_local_path, 'processing')
try:
os.makedirs(self.dir_proc)
except FileExistsError:
pass
self.name_com = p_name
# evts simu config
self.file_size = 3600.0
self.cat_astro = catX.CatalogAstroSourcesBasic()
self.cat_astro.read_cat_swift_light()
ra_ang = Angle("22h53m57.7")
dec_ang = Angle("16d08m53.6s")
ra_dec = np.array([ra_ang.deg, dec_ang.deg])
self._set_attitude_ECLAIRs(ra_dec, 0)
def _get_name_base(self, p_id):
return os.path.join(self.dir_proc, p_id)
def _get_name_input(self, p_id):
base = self._get_name_base(p_id)
return os.path.join(base, 'input')
def _get_name_output(self, p_id):
base = self._get_name_base(p_id)
return os.path.join(base, 'output')
def _create_directory(self, p_id):
try:
os.makedirs(self._get_name_base(p_id))
except FileExistsError:
pass
try:
os.makedirs(self._get_name_input(p_id))
except FileExistsError:
pass
try:
os.makedirs(self._get_name_output(p_id))
except FileExistsError:
pass
[docs] def del_all_products(self):
os.system("rm -rf %s/*"%self.dir_proc)
def _create_name(self, p_path, p_pref, p_suf):
return os.path.join(p_path, p_pref + '_%s.fits'%p_suf)
[docs] def get_files_evt_between_time(self, p_tstart, p_tstop, p_id):
"""
p_tstart and p_tstop in s from mjdref
simu evt files and save them into dir_in
evt files names are ECL_EVT_CAL + t_start in s from mjdref
"""
dir_in = self._get_name_input(p_id)
# do simulation evt ECLAIRs
# event file size in seconds
t_start_simu = p_tstart
file_size_seconds = self.file_size
p_tstop_while = p_tstop - file_size_seconds
#simu_obs_x = simX.SimuECLAIRsV1() # pb multi thread mist be local
simu_obs_x = simX.SimuECLAIRsMaskProjection() # pb multi thread mist be local
simu_obs_x.set_catalog(self.cat_FOV)
while((t_start_simu + file_size_seconds) < (p_tstop_while)):
# simulation
# CAREFUL simu takes time argument in seconds
simu_obs_x.simu_catalog(self.file_size)
simu_obs_x.add_cxb()
evt_name = self._create_name(dir_in, 'ECL_EVT_CAL', int(t_start_simu) )
simu_obs_x.save_cal_evts(evt_name, t_start=t_start_simu)
t_start_simu += file_size_seconds
# simulation of the last file
# CAREFUL simu takes time argument in seconds
# security test: if last observation too short, add_cxb() can crash trying to add 0 photon from bkg
# -> if the remaining time is < 1s, it is simulated with the last hour
# if not : independant simulation and file
if (p_tstop - file_size_seconds - t_start_simu) < 1. :
simu_obs_x.simu_catalog(p_tstop - t_start_simu)
simu_obs_x.add_cxb()
evt_name = self._create_name(dir_in, 'ECL_EVT_CAL', int(t_start_simu) )
simu_obs_x.save_cal_evts(evt_name, t_start=t_start_simu)
return dir_in
else:
simu_obs_x.simu_catalog(self.file_size)
simu_obs_x.add_cxb()
evt_name = self._create_name(dir_in, 'ECL_EVT_CAL', int(t_start_simu) )
simu_obs_x.save_cal_evts(evt_name, t_start=t_start_simu)
t_start_simu += file_size_seconds
simu_obs_x.simu_catalog(p_tstop - t_start_simu)
simu_obs_x.add_cxb()
evt_name = self._create_name(dir_in, 'ECL_EVT_CAL', int(t_start_simu) )
simu_obs_x.save_cal_evts(evt_name, t_start=t_start_simu)
return dir_in
def _set_attitude_ECLAIRs(self, p_ptg_radec, p_ori):
"""
p_ptg_radec and p_ori in degrees
"""
ptgo = ptginst.AttitudeECLAIRs()
ptgo.set_attitude_instru_ptgori([p_ptg_radec[0], p_ptg_radec[1], p_ori])
self._set_attitude(ptgo._q_j2000_grf)
def _set_attitude(self, p_quater):
self.attitude = p_quater
#ptgo = att.ptgsvom.AttitudeECLAIRs()
# pour simplier l'interface on considere que le pointage est celui du VT
ptgo = ptginst.AttitudeECLAIRs()
ptgo.set_attitude_svom_quater(p_quater)
self.cat_FOV = catX.CatalogFovEclairs(InstruECLAIRs())
self.cat_FOV.set_ptg_instru(ptgo)
self.cat_FOV.from_astro_catalog(self.cat_astro)
#self.simu_obsX.set_catalog(self.cat_FOV)
[docs] def get_sat_attitude(self, p_time):
"""
return rotation : quaternion like numpy array with 4 values
"""
return self.attitude
[docs] def get_sat_position(self, p_time):
return np.zeros(3, dtype=np.float32)
[docs] def get_id(self):
msize = 6
mchars=string.ascii_letters + string.digits
idp = 'ECL_'+''.join(random.choice(mchars) for _ in range(msize))
self._create_directory(idp)
return idp
[docs] def close(self, p_clean=False):
if p_clean:
os.system("rm -rf %s"%self.local_path )
[docs] def insert_product(self, p_dir, p_id):
"""
insert a product file in database
copy the file from input in dir_science_products directory of the database
:param p_dir: output dir
:type p_product_fits: string
"""
dir_out = self._get_name_output(p_id)
cmd = 'cp %s/* %s'%(p_dir, dir_out)
subprocess.run(cmd, shell=True)
[docs] def insert_log(self,p_file, p_id):
pass
[docs] def insert_status(self, p_string, p_id):
f_status = open( self._get_name_base(p_id)+'/status.txt',"w")
f_status.write(p_string)
f_status.close()
[docs] def get_product(self, p_id):
tar_name = p_id + ".tar.gz"
if os.path.exists(os.path.join(self.dir_proc, tar_name)):
return os.path.join(self.dir_proc, tar_name)
elif os.path.exists(self._get_name_output(p_id)):
os.chdir(self.dir_proc)
tar=tarfile.open(tar_name, "w:gz")
tar.add(p_id)
tar.close()
return os.path.join(self.dir_proc, tar_name)
else:
return "_NOK"
[docs] def get_log(self, p_id):
pass
[docs] def get_status(self, p_id):
file_status = self._get_name_base(p_id)+'/status.txt'
if os.path.isfile(file_status):
f_status = open(file_status,"r")
status = f_status.read()
f_status.close()
return status
else:
return "ID_PB"