# Source code for simu.lib.database

'''database simulation module

Created on 15 May 2018

@author: Colley Jean-Marc, APC/IN2P3/CNRS
'''

import os
import os.path
import random
import shutil
import string
import subprocess
import tarfile

import astropy.time as astrotime
import numpy as np
from astropy.coordinates.angles import Angle

import database.interface_db as dbs
import common.sky.catalog as catX
import simu.lib.InstruX as simX
from common.instru.model_geom import InstruECLAIRs
import common.mission.attitude as ptginst
from common.mission.time import convert_svomref_seconds_in_mjd

class DataBaseSVOMSimV1(dbs.DataBaseSVOMInterface):
    """Simulated SVOM database stored in a local directory tree.

    Layout created under ``p_local_path``::

        processing/<id>/input      simulated event files
        processing/<id>/output     products inserted by the pipeline
        processing/<id>/status.txt last status string
        processing/<id>.tar.gz     archive built by :meth:`get_product`
    """

    # Name of the per-identifier file holding the last status string.
    _STATUS_FILE = 'status.txt'
    # Prefix of the simulated calibrated event files.
    _EVT_PREFIX = 'ECL_EVT_CAL'

    def __init__(self, p_name, p_local_path, p_pwd=None, p_url=None):
        """Open a connection between the SVOM database and the ECLAIRs pipeline.

        :param p_name: identifier of the connection, stored in ``name_com``
        :param p_local_path: root directory of the simulated database,
            created if missing
        :param p_pwd: not used by this simulated implementation
        :param p_url: not used by this simulated implementation
        :raises dbs.DBSvomError: if creating ``p_local_path`` fails with
            ``FileNotFoundError``
        """
        self.local_path = p_local_path
        try:
            # exist_ok avoids the check-then-create race of a prior
            # os.path.exists() test.
            os.makedirs(p_local_path, exist_ok=True)
        except FileNotFoundError as err:
            raise dbs.DBSvomError(
                "Can't create directory %s" % self.local_path) from err
        self.dir_proc = os.path.join(p_local_path, 'processing')
        os.makedirs(self.dir_proc, exist_ok=True)
        self.name_com = p_name
        # event simulation configuration: duration (s) of one event file
        self.file_size = 3600.0
        self.cat_astro = catX.CatalogAstroSourcesBasic()
        self.cat_astro.read_cat_swift_light()
        # default pointing used until _set_attitude() is called again
        ra_ang = Angle("22h53m57.7")
        dec_ang = Angle("16d08m53.6s")
        ra_dec = np.array([ra_ang.deg, dec_ang.deg])
        self._set_attitude_ECLAIRs(ra_dec, 0)

    def _get_name_base(self, p_id):
        """Return the working directory of identifier ``p_id``."""
        return os.path.join(self.dir_proc, p_id)

    def _get_name_input(self, p_id):
        """Return the input directory of identifier ``p_id``."""
        return os.path.join(self._get_name_base(p_id), 'input')

    def _get_name_output(self, p_id):
        """Return the output directory of identifier ``p_id``."""
        return os.path.join(self._get_name_base(p_id), 'output')

    def _get_name_status(self, p_id):
        """Return the path of the status file of identifier ``p_id``."""
        return os.path.join(self._get_name_base(p_id), self._STATUS_FILE)

    def _create_directory(self, p_id):
        """Create the base, input and output directories of ``p_id``.

        Directories that already exist are left untouched.
        """
        # makedirs also creates the common parent (the base directory).
        for path in (self._get_name_input(p_id), self._get_name_output(p_id)):
            os.makedirs(path, exist_ok=True)

    @staticmethod
    def _remove_path(p_path):
        """Remove a file, symlink or directory tree, ignoring failures.

        Mirrors the best-effort behaviour of ``rm -rf`` without going through
        a shell: a symlink is unlinked, never followed.
        """
        if os.path.isdir(p_path) and not os.path.islink(p_path):
            shutil.rmtree(p_path, ignore_errors=True)
            return
        try:
            os.remove(p_path)
        except OSError:
            # missing or unremovable entry: not fatal, like ``rm -f``
            pass

    def del_all_products(self):
        """Delete every non-hidden entry of the processing directory.

        Entries whose name starts with ``.`` are kept, as a shell ``*`` glob
        would not match them. Nothing happens if the processing directory
        cannot be listed.
        """
        try:
            names = os.listdir(self.dir_proc)
        except OSError:
            return
        for name in names:
            if name.startswith('.'):
                continue
            self._remove_path(os.path.join(self.dir_proc, name))

    def _create_name(self, p_path, p_pref, p_suf):
        """Return ``<p_path>/<p_pref>_<p_suf>.fits``."""
        return os.path.join(p_path, p_pref + '_%s.fits' % p_suf)

    def _simu_evt_file(self, p_simu, p_dir, p_tstart, p_duration):
        """Simulate and save one event file.

        :param p_simu: simulator object already configured with a catalog
        :param p_dir: directory receiving the event file
        :param p_tstart: start time of the file in s from mjdref, also used
            (truncated to int) in the file name
        :param p_duration: duration to simulate, in seconds
        """
        # CAREFUL: the simulator takes its time argument in seconds
        p_simu.simu_catalog(p_duration)
        p_simu.add_cxb()
        evt_name = self._create_name(p_dir, self._EVT_PREFIX, int(p_tstart))
        p_simu.save_cal_evts(evt_name, t_start=p_tstart)

    def get_files_evt_between_time(self, p_tstart, p_tstop, p_id):
        """Simulate the event files covering [p_tstart, p_tstop].

        Files are written in the input directory of ``p_id`` and named
        ``ECL_EVT_CAL_<t_start>.fits`` with ``t_start`` in s from mjdref.
        Each file lasts ``self.file_size`` seconds except the last one, which
        holds the remainder.

        :param p_tstart: start time in s from mjdref
        :param p_tstop: stop time in s from mjdref
        :param p_id: identifier returned by :meth:`get_id`
        :return: the directory containing the simulated event files
        """
        dir_in = self._get_name_input(p_id)
        file_size_seconds = self.file_size
        # the simulator must be local to the call (multi-thread issue)
        simu_obs_x = simX.SimuECLAIRsMaskProjection()
        simu_obs_x.set_catalog(self.cat_FOV)
        t_start_simu = p_tstart
        # Write full-size files while at least 1 s would remain afterwards.
        # Safety: add_cxb() can crash trying to add 0 background photon, so
        # a remainder shorter than 1 s is merged into the last file instead
        # of getting its own file.
        # NOTE(review): assumes self.file_size >= 1 s and p_tstop > p_tstart.
        while (p_tstop - file_size_seconds - t_start_simu) >= 1.:
            self._simu_evt_file(simu_obs_x, dir_in, t_start_simu,
                                file_size_seconds)
            t_start_simu += file_size_seconds
        # last file: whatever is left (less than file_size + 1 s)
        self._simu_evt_file(simu_obs_x, dir_in, t_start_simu,
                            p_tstop - t_start_simu)
        return dir_in

    def _set_attitude_ECLAIRs(self, p_ptg_radec, p_ori):
        """Set the attitude from an ECLAIRs pointing.

        :param p_ptg_radec: pointing (RA, Dec) in degrees
        :param p_ori: orientation in degrees
        """
        ptgo = ptginst.AttitudeECLAIRs()
        ptgo.set_attitude_instru_ptgori(
            [p_ptg_radec[0], p_ptg_radec[1], p_ori])
        self._set_attitude(ptgo._q_j2000_grf)

    def _set_attitude(self, p_quater):
        """Store the attitude quaternion and rebuild the field-of-view catalog.

        :param p_quater: attitude quaternion, returned as is by
            :meth:`get_sat_attitude`
        """
        self.attitude = p_quater
        # to simplify the interface, the pointing is assumed to be
        # that of the VT
        ptgo = ptginst.AttitudeECLAIRs()
        ptgo.set_attitude_svom_quater(p_quater)
        self.cat_FOV = catX.CatalogFovEclairs(InstruECLAIRs())
        self.cat_FOV.set_ptg_instru(ptgo)
        self.cat_FOV.from_astro_catalog(self.cat_astro)

    def get_sat_attitude(self, p_time):
        """Return the rotation: quaternion like numpy array with 4 values.

        ``p_time`` is ignored: the simulated attitude is constant.
        """
        return self.attitude

    def get_sat_position(self, p_time):
        """Return a null position (3 float32 zeros); ``p_time`` is ignored."""
        return np.zeros(3, dtype=np.float32)

    def get_id(self):
        """Create a new processing identifier and its directories.

        :return: ``'ECL_'`` followed by 6 random letters/digits
        """
        msize = 6
        mchars = string.ascii_letters + string.digits
        while True:
            idp = 'ECL_' + ''.join(random.choice(mchars) for _ in range(msize))
            try:
                # atomic reservation: fails if the identifier is already used
                os.makedirs(self._get_name_base(idp))
            except FileExistsError:
                continue
            break
        self._create_directory(idp)
        return idp

    def close(self, p_clean=False):
        """Close the connection.

        :param p_clean: if true, delete the whole local database directory
        """
        if p_clean:
            self._remove_path(self.local_path)

    def insert_product(self, p_dir, p_id):
        """Insert product files in the database.

        Copy every non-hidden regular file of ``p_dir`` into the output
        directory of ``p_id``. Sub-directories are not copied.

        :param p_dir: directory containing the products to insert
        :type p_dir: string
        :param p_id: identifier returned by :meth:`get_id`
        :type p_id: string
        :raises OSError: if ``p_dir`` cannot be read or a copy fails
        """
        dir_out = self._get_name_output(p_id)
        with os.scandir(p_dir) as entries:
            for entry in entries:
                if entry.name.startswith('.') or not entry.is_file():
                    continue
                # explicit destination file so that a missing output
                # directory raises instead of creating a file named 'output'
                shutil.copy(entry.path, os.path.join(dir_out, entry.name))

    def insert_log(self, p_file, p_id):
        """Do nothing: logs are not stored by this simulated database."""
        pass

    def insert_status(self, p_string, p_id):
        """Write ``p_string`` as the status of ``p_id``, replacing any previous one."""
        with open(self._get_name_status(p_id), "w") as f_status:
            f_status.write(p_string)

    def get_product(self, p_id):
        """Return the path of the gzipped tar archive of ``p_id``.

        The archive contains the whole ``<p_id>`` directory and is built on
        first request, then reused.

        :return: path of ``<p_id>.tar.gz``, or ``"_NOK"`` if ``p_id`` has
            neither an archive nor an output directory
        """
        tar_path = os.path.join(self.dir_proc, p_id + ".tar.gz")
        if os.path.exists(tar_path):
            return tar_path
        if not os.path.exists(self._get_name_output(p_id)):
            return "_NOK"
        # arcname keeps member names relative ('<p_id>/...') without
        # changing the working directory of the whole process
        with tarfile.open(tar_path, "w:gz") as tar:
            tar.add(self._get_name_base(p_id), arcname=p_id)
        return tar_path

    def get_log(self, p_id):
        """Do nothing: logs are not stored by this simulated database."""
        pass

    def get_status(self, p_id):
        """Return the status string of ``p_id``.

        :return: content of the status file, or ``"ID_PB"`` if it does not
            exist
        """
        file_status = self._get_name_status(p_id)
        if not os.path.isfile(file_status):
            return "ID_PB"
        with open(file_status, "r") as f_status:
            return f_status.read()