#!/usr/bin/env python3
'''This is a executable python script to run the ECLAIRs pipeline in a local configuration.
The script needs a configuration file in argument.
.. seealso:: process/pipeline/readme.md
Le script has 2 inputs:
* configuration file: example here ecpi/common/data/config_files
* output directory
usage: ecpi_local_script.py [-h] [-v] [-s]
config_file output_dir
local script to run the ECLAIRs pipeline v1
positional arguments:
config_file configuration file absolute path
output_dir directory to store the output files from the pipeline. For
security reasons, files are store in a sub-directory named
ECLAIRs_pipeline_dir_*, cleaned each time.
optional arguments:
-h, --help show this help message and exit
-v, --verbose increase output verbosity for additional information in
stdout (e.g., no -v : quiet, -v : info, -vv : debug
-s, --show show flag for display some graphs
This is a executable python script to run the ECLAIRs pipeline in a local
configuration. The script needs a configuration file in argument. An example
of a working configuration file could be found in
ecpi/common/data/config_files/ecpi_config.cfg
@author: Catalano Camille, APC/IN2P3/CNRS
'''
import argparse
import os
import logging
import json
import configparser
import numpy as np
import astropy.time as astrotime
import ecpi.process.pipeline.ecpi_flows as flow
import ecpi.common.sky.catalog as cat
import ecpi.common.mission.attitude as ptgsvom
from ecpi.logger_ecpi import ECPILogger
from ecpi.common.mission.observation import EclairsObservation
from ecpi.process.pipeline.ecpi_config import PipelineConfiguration
from ecpi.common.instru.model_effect import ECLAIRsDetectorEffectDefault
[docs]def read_catalog(cfg, obs_id, catalog_fov, log=None):
"""get the catalog for sources identification
catalog_ref_type :
- sources_file : configuration file with a list of sources
- swift_cat : swift-BAT catalog
- ECLAIRs_cat : a ECLAIRs sources catalog
:param cfg: configuration container from the configuration file
:type cfg: ConfigParser
:param obs_id: observation section
:type obs_id: string
:param catalog_fov: sources reference catalog
:type catalog_fov: CatalogFovEclairs
:param log: optionnal logger for unit tests
:type log: logger
:return: updated sources catalog used in the simulation
:rtype: CatalogFovEclairs
"""
if cfg[obs_id]['catalog_ref_type'] == 'sources_config':
src_cfg = configparser.ConfigParser(interpolation=configparser.ExtendedInterpolation())
src_cfg.read(cfg[obs_id]['catalog_ref_path'])
first_section = src_cfg.sections()[0]
nb_chan = len(json.loads(src_cfg[first_section]['flux']))
catalog_fov.intensity_with_channel(nb_chan)
for src in src_cfg.sections():
if src_cfg[src]['coord_type'] == '0':
catalog_fov.add_src_pix([float(src_cfg[src]['Xs']),
float(src_cfg[src]['Ys']),
json.loads(src_cfg[src]['flux']),
src])
elif src_cfg[src]['coord_type'] == '1':
astro_cat = cat.CatalogAstroWithEnergySpecSampling(np.ones(nb_chan))
astro_cat.add_src([float(src_cfg[src]['Xs']),float(src_cfg[src]['Ys']),json.loads(src_cfg[src]['flux']),src])
catalog_fov.from_astro_catalog(astro_cat)
else:
if log is not None:
log.logger.warning("coord_type not recognized in sources config file")
else:
ecpi_logger.logger.warning("coord_type not recognized in sources config file")
return
return catalog_fov
elif cfg[obs_id]['catalog_ref_type'] == 'ECLAIRs_cat':
catalog_fov.read_catalog(cfg[obs_id]['catalog_ref_path'])
return catalog_fov
elif cfg[obs_id]['catalog_ref_type'] == 'swift_cat':
swift_cat = cat.CatalogAstroSourcesBasic()
swift_cat.read_cat_swift_light()
catalog_fov.from_astro_catalog(swift_cat)
return catalog_fov
else:
if log is not None:
log.logger.warning("catalog_ref_type not recognized")
return
[docs]def read_observation_config(cfg, obs_id, log=None):
"""read the observation config parameters
:param cfg: pipeline input configuration
:type cfg: ConfigParser
:param obs_id: observation section
:type obs_id: string
:param log: optionnal logger for unit tests
:type log: logger
:return: t_0, t_1, eclairs_attitude
:rtype: Time, Time, array([ra, dec, ori])
"""
try:
t_obs = float(cfg[obs_id]['t_obs'])
except:
t_obs = 1000000000
try:
t_0 = astrotime.Time(cfg[obs_id]['t_start'], format='isot', scale='tt')
except:
t_0 = astrotime.Time('2017-01-01T00:00:00.000', format='isot', scale='tt')
t_1 = t_0 + astrotime.TimeDelta(t_obs, format='sec')
try:
eclairs_attitude = np.array([float(cfg[obs_id]['pointing_ra']),
float(cfg[obs_id]['pointing_dec']),
float(cfg[obs_id]['pointing_ori'])])
except:
eclairs_attitude = np.array([0, 0, 0])
if log is not None:
log.logger.warning("WARNING: no attitude in input. Default is 0,0,0.")
return t_0, t_1, eclairs_attitude
[docs]def read_imag_config(cfg, log=None):
"""read the general imag config parameters
:param cfg: pipeline input configuration
:type cfg: ConfigParser
:param log: optionnal logger for unit tests
:type log: logger
:return: dictionary containing the necesary parameters for the IMAG module.
:type: dict
:var version: pipeline version to determine which deconv and simu module to use ['1' or '1.1']
:type version: string
:var src_subimage_size: nb of pixels around the detected source (radius) kept in the final sky image
:type src_subimage_size: int
:var snr_limit: snr limit to stop at the source detection algorithm
:type snr_limit: float >0
:var fit_function: function to fit the precise source location.
:type fit_function: string
:var fit_sigma: standard deviation of the gaussian fit_function. Default is 1.555.
:type fit_sigma: float>0
:var off_axis: off axis correction. Default=None
:type off_axis: None, cos_theta or irf
"""
snr_limit = float(cfg['general']['snr_limit'])
src_subimage_size = int(cfg['general']['src_subimage_size'])
if cfg['general']['random_seed'] != '0':
np.random.seed(int(cfg['general']['random_seed']))
if cfg['general']['IMAG_simulation_method'] == 'projection':
version="1.1"
elif cfg['general']['IMAG_simulation_method'] == 'ray_tracing':
version="1"
else:
if log is not None:
log.logger.warning("IMAG simulation method not recognized")
else:
ecpi_logger.logger.warning("IMAG simulation method not recognized")
return
off_axis_correction = cfg['general']['off_axis_correction']
fit_function = cfg['general']['fit_function']
try:
fit_sigma = float(cfg['general']['fit_sigma'])
except:
fit_sigma = 1.155
dict_imag_cfg = {
'snr_limit': snr_limit,
'src_subimage_size': src_subimage_size,
'version': version,
'off_axis_correction': off_axis_correction,
'fit_function': fit_function,
'fit_sigma': fit_sigma
}
return dict_imag_cfg
[docs]def main(args, cfg):
"""This is the main function of the executable python script to run the ECLAIRs pipeline in local configuration.
:param args: argument parser
:type args: parse_args
:param cfg: configuration container from the configuration file
:type cfg: ConfigParser
"""
dict_imag_cfg = read_imag_config(cfg)
try:
energy_ranges = json.loads(cfg['general']['channels_ranges'])
except:
energy_ranges = [[0, 0]]
ecpi_logger.logger.info('########################## ECLAIRs pipeline local script ##########################')
ecpi_logger.logger.info('reading config file')
for obs_id in cfg.sections()[1:]:
# env config
ecpi_logger.logger.info('########################## NEW OBSERVATION ##########################')
ecpi_logger.logger.info('##########################')
ecpi_logger.logger.info('configuration of the pipeline environment')
working_dir = os.path.join(args.output_dir,'./ECLAIRs_pipeline_dir_{}'.format(obs_id))
working_dir = os.path.abspath(working_dir)
try:
os.makedirs(working_dir)
except FileExistsError:
pass
os.system('rm -rf ' + working_dir + '/*')
# observation config
ecpi_logger.logger.info('working directory : {}'.format(working_dir))
ecpi_logger.logger.info('##########################')
ecpi_logger.logger.info('configuration of the observation {}'.format(obs_id))
t_0, t_1, eclairs_attitude = read_observation_config(cfg, obs_id)
#early conf to check params
obs = EclairsObservation(t_0.isot, t_1.isot, eclairs_attitude, [0,0,0], [0,0,0], ECLAIRsDetectorEffectDefault())
obs.gti_compute()
catalog_fov = cat.CatalogFovEclairs()
eclairs_pointing = ptgsvom.AttitudeECLAIRs()
eclairs_pointing.set_attitude_instru_ptgori(obs.attitude)
catalog_fov.set_ptg_instru(eclairs_pointing)
conf = PipelineConfiguration(dict_imag_cfg['version'])
bkg_correction_mode = cfg[obs_id]['bkg_correction_mode']
conf.build_bube_config(energy_ranges, bkg_correction_mode)
cat_ref = read_catalog(cfg, obs_id, catalog_fov)
conf.build_imag_config(dict_imag_cfg, cat_ref)
#start the process
ecpi_logger.logger.info('##########################')
ecpi_logger.logger.info('start the pipeline v1')
try:
input_files_type = cfg[obs_id]['input_files_type']
except:
input_files_type = 'events'
ecpi_logger.logger.warning("No input files type. Assuming events")
if input_files_type == 'shadowgrams':
pipeline = flow.Imag(working_dir, cfg, obs, conf, obs_id)
else:
pipeline = flow.DpcoCaliBubeImag(working_dir, cfg, obs, conf, obs_id)
#####
pipeline.run(show=args.show, verbose=(args.verbose == 2))
conf.imag.ref_catalog_fov.save_catalog_pix("{}/ref_catalog_fov.fits".format(working_dir))
ecpi_logger.logger.info('##########################')
ecpi_logger.logger.info(pipeline.error_code + ' : ' + pipeline.error_message)
ecpi_logger.logger.info('observation {} finished !'.format(obs_id))
ecpi_logger.logger.info('########################## all observations have been processed ##########################')
[docs]def check_directory(value_array):
if not os.path.exists(value_array):
raise argparse.ArgumentTypeError("%s isn't a valid directory" % value_array)
return value_array
[docs]def check_logger_level(str_level):
"""Check the validity of the logger level specified in ecpi_config
and choose the appropriate one.
"""
dict_lvls = {
'debug': logging.DEBUG,
'info': logging.INFO,
'warning': logging.WARNING,
'error': logging.ERROR,
'critical': logging.CRITICAL
}
try:
return dict_lvls[str_level]
except:
return dict_lvls['warning']
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='local script to run the ECLAIRs pipeline',
epilog="""This is a executable python script to run the ECLAIRs pipeline in a local configuration.
The script needs a configuration file in argument. An example of a working configuration file could be found in ecpi/common/data/config_files/ecpi_config.cfg
""")
parser.add_argument(
'config_file',
help='configuration file absolute path',
type=argparse.FileType('r'))
parser.add_argument(
'output_dir',
help="""directory to store the output files from the pipeline.
For security reasons, files are store in a sub-directory named ECLAIRs_pipeline_dir_*, cleaned each time.""",
type=check_directory)
group = parser.add_mutually_exclusive_group()
group.add_argument('-v', '--verbose',
action='count',
help='increase output verbosity for additional information in stdout (e.g., no -v : quiet, -v : info, -vv : debug' )
parser.add_argument('-s', '--show',
action='store_true',
help='show flag for display some graphs' )
args = parser.parse_args()
# parse ecpi_config
cfg = configparser.ConfigParser(interpolation=configparser.ExtendedInterpolation())
cfg.read(args.config_file.name)
# set logger for ECPI script
ecpi_logger = ECPILogger(cfg)
main(args, cfg)