Source code for ecpi.process.pipeline.script_ecpi

#!/usr/bin/env python3
'''This is an executable Python script to run the ECLAIRs pipeline in a local configuration.

The script needs a configuration file in argument.

.. seealso:: process/pipeline/readme.md

The script has 2 inputs:
* configuration file: example here ecpi/common/data/config_files
* output directory

usage: ecpi_local_script.py [-h] [-v] [-s]
                                           config_file output_dir

local script to run the ECLAIRs pipeline v1

positional arguments:
  config_file    configuration file absolute path
  output_dir     directory to store the output files from the pipeline. For
                 security reasons, files are store in a sub-directory named
                 ECLAIRs_pipeline_dir_*, cleaned each time.

optional arguments:
  -h, --help     show this help message and exit
  -v, --verbose  increase output verbosity for additional information in
                 stdout (e.g., no -v : quiet, -v : info, -vv : debug
  -s, --show     show flag for display some graphs

This is a executable python script to run the ECLAIRs pipeline in a local
configuration. The script needs a configuration file in argument. An example
of a working configuration file could be found in
ecpi/common/data/config_files/ecpi_config.cfg

@author: Catalano Camille, APC/IN2P3/CNRS

'''

import argparse
import configparser
import json
import logging
import os
import shutil

import astropy.time as astrotime
import numpy as np

import ecpi.process.pipeline.ecpi_flows as flow
import ecpi.common.sky.catalog as cat
import ecpi.common.mission.attitude as ptgsvom
from ecpi.logger_ecpi import ECPILogger
from ecpi.common.mission.observation import EclairsObservation
from ecpi.process.pipeline.ecpi_config import PipelineConfiguration
from ecpi.common.instru.model_effect import ECLAIRsDetectorEffectDefault


def read_catalog(cfg, obs_id, catalog_fov, log=None):
    """Fill the reference catalog used for sources identification.

    The origin of the catalog is selected by
    ``cfg[obs_id]['catalog_ref_type']``:

    - ``sources_config``: configuration file with one section per source,
      each providing ``coord_type``, ``Xs``, ``Ys`` and ``flux`` (a JSON
      list; the number of channels is taken from the first section)
    - ``ECLAIRs_cat``: an ECLAIRs sources catalog file
    - ``swift_cat``: the swift-BAT catalog

    For ``sources_config`` and ``ECLAIRs_cat`` the file path is read from
    ``cfg[obs_id]['catalog_ref_path']``.

    :param cfg: configuration container from the configuration file
    :type cfg: ConfigParser
    :param obs_id: observation section
    :type obs_id: string
    :param catalog_fov: sources reference catalog, updated in place
    :type catalog_fov: CatalogFovEclairs
    :param log: optional logger for unit tests
    :type log: logger
    :return: the updated ``catalog_fov``, or None (after a warning) when the
        catalog type or a source ``coord_type`` is not recognized, or when
        the sources configuration file contains no section
    :rtype: CatalogFovEclairs
    """

    def _warn(message):
        # Prefer the caller's logger, then the script-level ``ecpi_logger``
        # (it only exists when this file is run as a script), then a plain
        # module logger, so that calling this function from an imported
        # module can never end in a NameError or in a silent failure.
        if log is not None:
            target = log.logger
        else:
            script_log = globals().get('ecpi_logger')
            if script_log is not None:
                target = script_log.logger
            else:
                target = logging.getLogger(__name__)
        target.warning(message)

    ref_type = cfg[obs_id]['catalog_ref_type']

    if ref_type == 'sources_config':
        src_cfg = configparser.ConfigParser(
            interpolation=configparser.ExtendedInterpolation())
        # ConfigParser.read() silently skips unreadable files, so an empty
        # section list also covers a wrong ``catalog_ref_path``.
        src_cfg.read(cfg[obs_id]['catalog_ref_path'])
        sources = src_cfg.sections()
        if not sources:
            _warn("no source found in sources config file")
            return None

        nb_chan = len(json.loads(src_cfg[sources[0]]['flux']))
        catalog_fov.intensity_with_channel(nb_chan)
        for src in sources:
            section = src_cfg[src]
            coord_type = section['coord_type']
            if coord_type not in ('0', '1'):
                _warn("coord_type not recognized in sources config file")
                return None
            src_desc = [float(section['Xs']),
                        float(section['Ys']),
                        json.loads(section['flux']),
                        src]
            if coord_type == '0':
                catalog_fov.add_src_pix(src_desc)
            else:
                astro_cat = cat.CatalogAstroWithEnergySpecSampling(np.ones(nb_chan))
                astro_cat.add_src(src_desc)
                catalog_fov.from_astro_catalog(astro_cat)
        return catalog_fov

    if ref_type == 'ECLAIRs_cat':
        catalog_fov.read_catalog(cfg[obs_id]['catalog_ref_path'])
        return catalog_fov

    if ref_type == 'swift_cat':
        swift_cat = cat.CatalogAstroSourcesBasic()
        swift_cat.read_cat_swift_light()
        catalog_fov.from_astro_catalog(swift_cat)
        return catalog_fov

    _warn("catalog_ref_type not recognized")
    return None
def read_observation_config(cfg, obs_id, log=None):
    """Read the observation config parameters.

    Every parameter is optional; a missing or unparsable value is replaced
    by a default:

    - ``t_obs``: observation duration in seconds, default 1000000000
    - ``t_start``: ISO time string read on the TT scale, default
      ``2017-01-01T00:00:00.000``
    - ``pointing_ra``, ``pointing_dec``, ``pointing_ori``: default 0, 0, 0
      (a warning is logged when this default is used)

    :param cfg: pipeline input configuration
    :type cfg: ConfigParser
    :param obs_id: observation section
    :type obs_id: string
    :param log: optional logger for unit tests
    :type log: logger
    :return: t_0, t_1, eclairs_attitude with t_1 = t_0 + t_obs
    :rtype: Time, Time, array([ra, dec, ori])
    """
    # Errors that mean "value absent or not parsable". Anything else
    # (KeyboardInterrupt, programming errors...) must not be swallowed.
    recoverable = (KeyError, ValueError, TypeError)

    try:
        t_obs = float(cfg[obs_id]['t_obs'])
    except recoverable:
        t_obs = 1000000000

    try:
        t_0 = astrotime.Time(cfg[obs_id]['t_start'], format='isot', scale='tt')
    except recoverable:
        t_0 = astrotime.Time('2017-01-01T00:00:00.000', format='isot', scale='tt')
    t_1 = t_0 + astrotime.TimeDelta(t_obs, format='sec')

    try:
        eclairs_attitude = np.array([float(cfg[obs_id]['pointing_ra']),
                                     float(cfg[obs_id]['pointing_dec']),
                                     float(cfg[obs_id]['pointing_ori'])])
    except recoverable:
        eclairs_attitude = np.array([0, 0, 0])
        # Warn even without an explicit logger: fall back on the script-level
        # ``ecpi_logger`` (only defined when run as a script), then on a
        # plain module logger.
        if log is not None:
            target = log.logger
        else:
            script_log = globals().get('ecpi_logger')
            if script_log is not None:
                target = script_log.logger
            else:
                target = logging.getLogger(__name__)
        target.warning("WARNING: no attitude in input. Default is 0,0,0.")

    return t_0, t_1, eclairs_attitude
def read_imag_config(cfg, log=None):
    """Read the general IMAG config parameters from ``cfg['general']``.

    Side effect: when ``random_seed`` is not ``'0'`` the global numpy random
    generator is seeded with it.

    :param cfg: pipeline input configuration
    :type cfg: ConfigParser
    :param log: optional logger for unit tests
    :type log: logger
    :return: dictionary containing the necessary parameters for the IMAG
        module, or None (after a warning) when ``IMAG_simulation_method`` is
        neither ``projection`` nor ``ray_tracing``
    :rtype: dict

    :var version: pipeline version to determine which deconv and simu module
        to use: '1.1' for ``projection``, '1' for ``ray_tracing``
    :type version: string
    :var src_subimage_size: nb of pixels around the detected source (radius)
        kept in the final sky image
    :type src_subimage_size: int
    :var snr_limit: snr limit to stop at the source detection algorithm
    :type snr_limit: float
    :var fit_function: function to fit the precise source location
    :type fit_function: string
    :var fit_sigma: standard deviation of the gaussian fit_function.
        Default is 1.155 when absent or not a number.
    :type fit_sigma: float
    :var off_axis_correction: off axis correction, passed through unchanged
    :type off_axis_correction: string
    """
    general = cfg['general']
    snr_limit = float(general['snr_limit'])
    src_subimage_size = int(general['src_subimage_size'])

    if general['random_seed'] != '0':
        np.random.seed(int(general['random_seed']))

    versions = {'projection': "1.1", 'ray_tracing': "1"}
    method = general['IMAG_simulation_method']
    if method not in versions:
        # Prefer the caller's logger, then the script-level ``ecpi_logger``
        # (only defined when run as a script), then a plain module logger so
        # that an imported module does not fail with a NameError here.
        if log is not None:
            target = log.logger
        else:
            script_log = globals().get('ecpi_logger')
            if script_log is not None:
                target = script_log.logger
            else:
                target = logging.getLogger(__name__)
        target.warning("IMAG simulation method not recognized")
        return None

    off_axis_correction = general['off_axis_correction']
    fit_function = general['fit_function']
    try:
        fit_sigma = float(general['fit_sigma'])
    except (KeyError, ValueError, TypeError):
        # Optional parameter: absent or unparsable falls back on the default.
        fit_sigma = 1.155

    return {
        'snr_limit': snr_limit,
        'src_subimage_size': src_subimage_size,
        'version': versions[method],
        'off_axis_correction': off_axis_correction,
        'fit_function': fit_function,
        'fit_sigma': fit_sigma,
    }
def _clean_directory(path):
    """Delete the visible content of *path*, keeping the directory itself."""
    with os.scandir(path) as scan:
        entries = list(scan)
    for entry in entries:
        # Same selection as the former ``rm -rf path/*``: a shell glob does
        # not match dot-entries, so they are left untouched.
        if entry.name.startswith('.'):
            continue
        try:
            if entry.is_dir(follow_symlinks=False):
                shutil.rmtree(entry.path)
            else:
                os.unlink(entry.path)
        except OSError as err:
            # Cleaning stays best-effort, but a leftover is now reported.
            ecpi_logger.logger.warning('cannot remove {} : {}'.format(entry.path, err))


def main(args, cfg):
    """This is the main function of the executable python script to run the
    ECLAIRs pipeline in local configuration.

    The general IMAG parameters are read once, then every configuration
    section but the first one is processed as an observation: its working
    directory ``ECLAIRs_pipeline_dir_<obs_id>`` is created under
    ``args.output_dir`` and cleaned, the observation, catalog and pipeline
    configuration are built, and the pipeline flow is run.

    Relies on the module-level ``ecpi_logger`` set by the script entry point.

    :param args: argument parser result (uses ``output_dir``, ``show`` and
        ``verbose``)
    :type args: parse_args
    :param cfg: configuration container from the configuration file
    :type cfg: ConfigParser
    :raises ValueError: if the general IMAG configuration is not valid
    """
    dict_imag_cfg = read_imag_config(cfg)
    if dict_imag_cfg is None:
        # Fail before any output directory is wiped, with an explicit message
        # instead of a later "'NoneType' object is not subscriptable".
        raise ValueError("invalid IMAG configuration in section 'general'")

    try:
        energy_ranges = json.loads(cfg['general']['channels_ranges'])
    except (KeyError, ValueError, TypeError):
        # json.JSONDecodeError is a ValueError: absent or malformed ranges
        # fall back on a single default range.
        energy_ranges = [[0, 0]]

    ecpi_logger.logger.info('########################## ECLAIRs pipeline local script ##########################')
    ecpi_logger.logger.info('reading config file')

    # The first section is skipped: it is assumed to be the 'general' one.
    for obs_id in cfg.sections()[1:]:
        # env config
        ecpi_logger.logger.info('########################## NEW OBSERVATION ##########################')
        ecpi_logger.logger.info('##########################')
        ecpi_logger.logger.info('configuration of the pipeline environment')
        working_dir = os.path.join(args.output_dir, './ECLAIRs_pipeline_dir_{}'.format(obs_id))
        working_dir = os.path.abspath(working_dir)
        os.makedirs(working_dir, exist_ok=True)
        # No shell involved: safe with spaces or metacharacters in the path.
        _clean_directory(working_dir)

        # observation config
        ecpi_logger.logger.info('working directory : {}'.format(working_dir))
        ecpi_logger.logger.info('##########################')
        ecpi_logger.logger.info('configuration of the observation {}'.format(obs_id))
        t_0, t_1, eclairs_attitude = read_observation_config(cfg, obs_id)

        # early conf to check params
        obs = EclairsObservation(t_0.isot, t_1.isot, eclairs_attitude,
                                 [0, 0, 0], [0, 0, 0],
                                 ECLAIRsDetectorEffectDefault())
        obs.gti_compute()

        catalog_fov = cat.CatalogFovEclairs()
        eclairs_pointing = ptgsvom.AttitudeECLAIRs()
        eclairs_pointing.set_attitude_instru_ptgori(obs.attitude)
        catalog_fov.set_ptg_instru(eclairs_pointing)

        conf = PipelineConfiguration(dict_imag_cfg['version'])
        bkg_correction_mode = cfg[obs_id]['bkg_correction_mode']
        conf.build_bube_config(energy_ranges, bkg_correction_mode)
        cat_ref = read_catalog(cfg, obs_id, catalog_fov)
        conf.build_imag_config(dict_imag_cfg, cat_ref)

        # start the process
        ecpi_logger.logger.info('##########################')
        ecpi_logger.logger.info('start the pipeline v1')
        try:
            input_files_type = cfg[obs_id]['input_files_type']
        except KeyError:
            input_files_type = 'events'
            ecpi_logger.logger.warning("No input files type. Assuming events")

        if input_files_type == 'shadowgrams':
            pipeline = flow.Imag(working_dir, cfg, obs, conf, obs_id)
        else:
            pipeline = flow.DpcoCaliBubeImag(working_dir, cfg, obs, conf, obs_id)

        # ``verbose`` is None without -v; -vv and above enable the flow
        # verbosity (a strict ``== 2`` switched it off again for -vvv).
        pipeline.run(show=args.show, verbose=((args.verbose or 0) >= 2))
        conf.imag.ref_catalog_fov.save_catalog_pix("{}/ref_catalog_fov.fits".format(working_dir))
        ecpi_logger.logger.info('##########################')
        ecpi_logger.logger.info('{} : {}'.format(pipeline.error_code, pipeline.error_message))
        ecpi_logger.logger.info('observation {} finished !'.format(obs_id))

    ecpi_logger.logger.info('########################## all observations have been processed ##########################')
def check_directory(value_array):
    """argparse ``type`` callback validating an existing directory.

    :param value_array: path given on the command line
    :type value_array: string
    :return: the path, unchanged
    :rtype: string
    :raises argparse.ArgumentTypeError: if the path does not exist or is not
        a directory
    """
    # isdir() rather than exists(): a regular file is not a usable output
    # directory and must be rejected at parsing time.
    if not os.path.isdir(value_array):
        raise argparse.ArgumentTypeError("%s isn't a valid directory" % value_array)
    return value_array
def check_logger_level(str_level):
    """Check the validity of the logger level specified in ecpi_config
    and choose the appropriate one.

    The name is matched regardless of case and surrounding blanks.

    :param str_level: level name among debug, info, warning, error, critical
    :type str_level: string
    :return: the matching ``logging`` level; ``logging.WARNING`` when the
        name is unknown or is not a string
    :rtype: int
    """
    dict_lvls = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'critical': logging.CRITICAL,
    }
    if not isinstance(str_level, str):
        return dict_lvls['warning']
    return dict_lvls.get(str_level.strip().lower(), dict_lvls['warning'])
if __name__ == '__main__':
    # Command line definition. argparse re-wraps description, epilog and help
    # texts, so the line breaks inside these strings are not significant.
    parser = argparse.ArgumentParser(
        description='local script to run the ECLAIRs pipeline',
        epilog="""This is a executable python script to run the ECLAIRs
        pipeline in a local configuration. The script needs a configuration
        file in argument. An example of a working configuration file could be
        found in ecpi/common/data/config_files/ecpi_config.cfg """)
    parser.add_argument(
        'config_file',
        help='configuration file absolute path',
        type=argparse.FileType('r'))
    parser.add_argument(
        'output_dir',
        help="""directory to store the output files from the pipeline. For
        security reasons, files are store in a sub-directory named
        ECLAIRs_pipeline_dir_*, cleaned each time.""",
        type=check_directory)
    # A mutually exclusive group holding a single option adds nothing: the
    # option is registered directly on the parser.
    parser.add_argument(
        '-v', '--verbose',
        action='count',
        help='increase output verbosity for additional information in stdout (e.g., no -v : quiet, -v : info, -vv : debug')
    parser.add_argument(
        '-s', '--show',
        action='store_true',
        help='show flag for display some graphs')
    args = parser.parse_args()

    # parse ecpi_config
    cfg = configparser.ConfigParser(interpolation=configparser.ExtendedInterpolation())
    # argparse already opened the file to validate it: parse that handle and
    # close it, instead of leaking it and reopening the path by name.
    with args.config_file as config_handle:
        cfg.read_file(config_handle)

    # set logger for ECPI script (module-level name used by the functions above)
    ecpi_logger = ECPILogger(cfg)

    main(args, cfg)