Source code for process.pipeline.ecpi_script

#!/usr/bin/env python3
'''This is an executable Python script to run the ECLAIRs pipeline in a local configuration.

The script needs a configuration file as an argument.

.. seealso:: process/pipeline/readme.md

The script has 2 inputs:
* configuration file: example here ecpi/common/data/config_files
* output directory

usage: ecpi_local_script.py [-h] [-v] [-s]
                                           config_file output_dir

local script to run the ECLAIRs pipeline v1

positional arguments:
  config_file    configuration file absolute path
  output_dir     directory to store the output files from the pipeline. For
                 security reasons, files are store in a sub-directory named
                 ECLAIRs_pipeline_dir_*, cleaned each time.

optional arguments:
  -h, --help     show this help message and exit
  -v, --verbose  increase output verbosity for additional information in
                 stdout (e.g., no -v : quiet, -v : info, -vv : debug
  -s, --show     show flag for display some graphs

This is a executable python script to run the ECLAIRs pipeline in a local
configuration. The script needs a configuration file in argument. An example
of a working configuration file could be found in
ecpi/common/data/config_files/ecpi_config.cfg

@author: Catalano Camille, APC/IN2P3/CNRS

'''

import argparse
import os
import configparser
import astropy.time as astrotime
import numpy as np
import logging
import json

import process.pipeline.ecpi_flows as flow
import common.sky.catalog as cat
from common.mission.observation import EclairsObservation
from process.pipeline.ecpi_config import PipelineConfiguration
from common.instru.model_effect import ECLAIRsDetectorEffectDefault
import common.mission.attitude as ptgsvom


def read_catalog(cfg, obs_id, catalog_fov):
    """Fill the reference catalog used for sources identification.

    The kind of reference is selected by ``cfg[obs_id]['catalog_ref_type']``:

    - ``sources_config``: configuration file listing one source per section
    - ``ECLAIRs_cat``: an ECLAIRs sources catalog file
    - ``swift_cat``: the swift-BAT catalog

    For ``sources_config`` and ``ECLAIRs_cat`` the file is taken from
    ``cfg[obs_id]['catalog_ref_path']``.

    :param cfg: configuration container from the configuration file
    :type cfg: ConfigParser
    :param obs_id: observation section
    :type obs_id: string
    :param catalog_fov: sources reference catalog, updated in place
    :type catalog_fov: CatalogFovEclairs
    :return: the updated sources catalog, or None when the catalog type or
        the coordinate type of a source is not recognized
    :rtype: CatalogFovEclairs
    """
    obs_cfg = cfg[obs_id]
    ref_type = obs_cfg['catalog_ref_type']

    if ref_type == 'ECLAIRs_cat':
        catalog_fov.read_catalog(obs_cfg['catalog_ref_path'])
        return catalog_fov

    if ref_type == 'swift_cat':
        swift_sources = cat.CatalogAstroSourcesBasic()
        swift_sources.read_cat_swift_light()
        catalog_fov.from_astro_catalog(swift_sources)
        return catalog_fov

    if ref_type != 'sources_config':
        logging.warning("catalog_ref_type not recognized")
        return None

    # From here on: list of sources described in a dedicated config file.
    sources = configparser.ConfigParser()
    sources.read(obs_cfg['catalog_ref_path'])
    names = sources.sections()

    def source_row(name):
        """Build the [x, y, flux, name] row describing one source section."""
        entry = sources[name]
        return [float(entry['Xs']),
                float(entry['Ys']),
                json.loads(entry['flux']),
                name]

    # The number of energy channels is given by the flux list of the first
    # source.
    nb_chan = len(json.loads(sources[names[0]]['flux']))
    catalog_fov.intensity_with_channel(nb_chan)

    for name in names:
        coord_type = sources[name]['coord_type']
        if coord_type == '0':
            catalog_fov.add_src_pix(source_row(name))
        elif coord_type == '1':
            sky_cat = cat.CatalogAstroWithEnergySpecSampling(np.ones(nb_chan))
            sky_cat.add_src(source_row(name))
            catalog_fov.from_astro_catalog(sky_cat)
        else:
            logging.warning("coord_type not recognized in sources config file")
            return None
    return catalog_fov
def read_observation_config(cfg, obs_id):
    """Read the observation parameters of one configuration section.

    Every parameter falls back to a default when it is missing from the
    section or cannot be parsed:

    - ``t_obs``: 1000000000 seconds
    - ``t_start``: '2017-01-01T00:00:00.000' (isot format, TT scale)
    - ``pointing_ra``/``pointing_dec``/``pointing_ori``: 0, 0, 0 (a warning
      is logged in that case)

    :param cfg: pipeline input configuration
    :type cfg: ConfigParser
    :param obs_id: observation section
    :type obs_id: string
    :return: t_0, t_1, eclairs_attitude
    :rtype: Time, Time, array([ra, dec, ori])
    """
    # Only "value absent" (KeyError) and "value not parsable"
    # (ValueError/TypeError) trigger the defaults. A bare ``except`` would
    # also hide KeyboardInterrupt, SystemExit and genuine programming errors.
    recoverable = (KeyError, ValueError, TypeError)

    try:
        t_obs = float(cfg[obs_id]['t_obs'])
    except recoverable:
        t_obs = 1000000000

    try:
        t_0 = astrotime.Time(cfg[obs_id]['t_start'], format='isot', scale='tt')
    except recoverable:
        t_0 = astrotime.Time('2017-01-01T00:00:00.000', format='isot', scale='tt')
    t_1 = t_0 + astrotime.TimeDelta(t_obs, format='sec')

    try:
        eclairs_attitude = np.array([float(cfg[obs_id]['pointing_ra']),
                                     float(cfg[obs_id]['pointing_dec']),
                                     float(cfg[obs_id]['pointing_ori'])])
    except recoverable:
        eclairs_attitude = np.array([0, 0, 0])
        logging.warning("WARNING: no attitude in input. Default is 0,0,0.")

    return t_0, t_1, eclairs_attitude
def read_imag_config(cfg):
    """Extract the general IMAG parameters from the ``general`` section.

    When ``random_seed`` is anything other than ``'0'`` it is also used to
    seed numpy's global random generator.

    :param cfg: pipeline input configuration
    :type cfg: ConfigParser
    :return: snr_limit, src_subimage_size, version, off_axis_correction;
        None when ``IMAG_simulation_method`` is not recognized
    :rtype: float, int, string, string
    """
    general = cfg['general']
    snr_limit = float(general['snr_limit'])
    src_subimage_size = int(general['src_subimage_size'])

    seed = general['random_seed']
    if seed != '0':
        np.random.seed(int(seed))

    # Simulation method name -> IMAG version string.
    version_by_method = {'projection': "1.1", 'ray_tracing': "1"}
    version = version_by_method.get(general['IMAG_simulation_method'])
    if version is None:
        logging.warning("IMAG simulation method not recognized")
        return None

    return snr_limit, src_subimage_size, version, general['off_axis_correction']
def _clean_directory(directory):
    """Remove every non-hidden entry of *directory* without invoking a shell."""
    import glob
    import shutil

    # glob's '*' skips dot-files exactly like the shell glob it replaces, and
    # glob.escape keeps special characters of the path itself literal.
    for path in glob.glob(os.path.join(glob.escape(directory), '*')):
        if os.path.isdir(path) and not os.path.islink(path):
            shutil.rmtree(path, ignore_errors=True)
            continue
        try:
            os.remove(path)
        except OSError as err:
            # Best effort, like ``rm -f``: report and carry on.
            logging.warning("could not remove %s: %s", path, err)


def main(args):
    """Run the ECLAIRs pipeline locally for each observation of the config file.

    The configuration file is read from ``args.config_file.name``. The general
    IMAG parameters come from read_imag_config; then every section but the
    first one is processed as one observation. Outputs of an observation go to
    ``<args.output_dir>/ECLAIRs_pipeline_dir_<obs_id>``, which is created if
    needed and emptied before the run.

    :param args: parsed command-line arguments; ``config_file``,
        ``output_dir``, ``show`` and ``verbose`` are read
    :type args: parse_args
    :raises ValueError: when read_imag_config rejects the general section
    """
    logging.info('########################## ECLAIRs pipeline local script ##########################')
    logging.info('reading config file')
    cfg = configparser.ConfigParser()
    cfg.read(args.config_file.name)

    imag_params = read_imag_config(cfg)
    if imag_params is None:
        # read_imag_config has already logged the reason; fail with a clear
        # message rather than an obscure tuple-unpacking TypeError.
        raise ValueError("invalid IMAG configuration in section 'general'")
    snr_limit, src_subimage_size, version, off_axis_correction = imag_params

    try:
        energy_ranges = json.loads(cfg['general']['channels_ranges'])
    except (KeyError, ValueError):
        # Option absent or not valid JSON (JSONDecodeError is a ValueError).
        energy_ranges = [[0, 0]]

    # -vv and above means debug; args.verbose is None when -v is not given.
    debug = (args.verbose or 0) >= 2

    # NOTE(review): the first section is skipped, presumably because it is
    # expected to be 'general' - confirm against the config file layout.
    for obs_id in cfg.sections()[1:]:
        # Environment configuration.
        logging.info('########################## NEW OBSERVATION ##########################')
        logging.info('##########################')
        logging.info('configuration of the pipeline environment')
        working_dir = os.path.join(args.output_dir, './ECLAIRs_pipeline_dir_{}'.format(obs_id))
        working_dir = os.path.abspath(working_dir)
        os.makedirs(working_dir, exist_ok=True)
        # Shell-free replacement of "rm -rf <dir>/*": output_dir and obs_id
        # are user supplied and must never be interpreted by a shell.
        _clean_directory(working_dir)

        # Observation configuration.
        logging.info('working directory : {}'.format(working_dir))
        logging.info('##########################')
        logging.info('configuration of the observation {}'.format(obs_id))
        t_0, t_1, eclairs_attitude = read_observation_config(cfg, obs_id)

        # Early configuration to check the parameters.
        obs = EclairsObservation(t_0.isot, t_1.isot, eclairs_attitude,
                                 [0, 0, 0], [0, 0, 0],
                                 ECLAIRsDetectorEffectDefault())
        obs.gti_compute()
        catalog_fov = cat.CatalogFovEclairs()
        eclairs_pointing = ptgsvom.AttitudeECLAIRs()
        eclairs_pointing.set_attitude_instru_ptgori(obs.attitude)
        catalog_fov.set_ptg_instru(eclairs_pointing)
        conf = PipelineConfiguration(version)
        conf.build_bube_config(energy_ranges)
        conf.build_imag_config(version, src_subimage_size, snr_limit,
                               read_catalog(cfg, obs_id, catalog_fov),
                               off_axis_correction)

        # Start the process.
        logging.info('##########################')
        logging.info('start the pipeline v1')
        try:
            input_files_type = cfg[obs_id]['input_files_type']
        except KeyError:
            input_files_type = 'events'
            logging.warning("WARNING: no input files type. Assuming events")

        if input_files_type == 'shadowgrams':
            pipeline = flow.Imag(working_dir, cfg[obs_id]['input'], obs, conf, obs_id)
        else:
            pipeline = flow.DpcoCaliBubeImag(working_dir, cfg[obs_id]['input'], obs, conf, obs_id)

        pipeline.run(show=args.show, verbose=debug)
        conf.imag.ref_catalog_fov.save_catalog_pix("{}/ref_catalog_fov.fits".format(working_dir))
        logging.info('##########################')
        logging.info(pipeline.error_code + ' : ' + pipeline.error_message)
        logging.info('observation {} finished !'.format(obs_id))
    logging.info('########################## all observations have been processed ##########################')
def check_directory(value_array):
    """argparse ``type`` callback validating the output directory argument.

    :param value_array: path given on the command line
    :type value_array: string
    :return: the path, unchanged
    :rtype: string
    :raises argparse.ArgumentTypeError: when the path is not an existing
        directory
    """
    # isdir rather than exists: a regular file is not a usable output
    # directory and would only fail later when the working directory is made.
    if not os.path.isdir(value_array):
        raise argparse.ArgumentTypeError("%s isn't a valid directory" % value_array)
    return value_array
if __name__ == '__main__':
    # Command-line entry point: parse the arguments, configure logging from
    # the -v count, then run the pipeline.
    parser = argparse.ArgumentParser(
        description='local script to run the ECLAIRs pipeline',
        epilog="""This is a executable python script to run the ECLAIRs pipeline in a local
        configuration. The script needs a configuration file in argument. An example
        of a working configuration file could be found in
        ecpi/common/data/config_files/ecpi_config.cfg
        """)
    parser.add_argument(
        'config_file',
        help='configuration file absolute path',
        type=argparse.FileType('r'))
    parser.add_argument(
        'output_dir',
        help="""directory to store the output files from the pipeline. For
        security reasons, files are store in a sub-directory named
        ECLAIRs_pipeline_dir_*, cleaned each time.""",
        type=check_directory)
    group = parser.add_mutually_exclusive_group()
    group.add_argument(
        '-v', '--verbose',
        action='count',
        help='increase output verbosity for additional information in stdout (e.g., no -v : quiet, -v : info, -vv : debug')
    parser.add_argument(
        '-s', '--show',
        action='store_true',
        help='show flag for display some graphs')
    args = parser.parse_args()

    # action='count' leaves None when -v is absent. Anything from -vv upwards
    # means debug, so -vvv no longer falls back silently to quiet.
    verbosity = args.verbose or 0
    if verbosity >= 2:
        logging.basicConfig(format='%(message)s', level=logging.DEBUG)
    elif verbosity == 1:
        logging.basicConfig(format='%(message)s', level=logging.INFO)

    try:
        main(args)
    finally:
        # argparse.FileType opened the file; nothing else closes it.
        args.config_file.close()