Source code for ecpi.process.pipeline.ecpi_flows

'''main module to run a ECLAIRs' pipeline's flows

Created on 20 mars. 2018

@author: Catalano Camille, APC/IN2P3/CNRS
'''

    
import os
import logging
import astropy.table as at
import astropy.units as u
import numpy as np
from copy import deepcopy

import ecpi.common.io.events as Xevt
from ecpi.process.bube.core import main_bube
from ecpi.process.imag.core import main_imag
import ecpi.process.bube.io.outputs as bube_out
import ecpi.process.imag.io.outputs as imag_out
from ecpi.common.io.fits_tools import set_array_from_file
from ecpi.analysis.lib.images import SkyImageStatistics
from ecpi.common.mission.observation import EclairsObservation
from ecpi.process.pipeline.ecpi_config import PipelineConfiguration


[docs]class WorkFlow(object): """ECLAIRs pipeline workflow base object init the attributes checking the parameters """ def __init__(self, work_dir, cfg, obs, conf, proc_id): """**Constructor** get the different parameters used by the process. check the validity of the parameters. construct the right deconvolution module for the process. :param work_dir: working directory where to save scientific products :param cfg: pipeline input configuration :param obs: observation parameters :param conf: pipeline config :param proc_id: id number of the process :type work_dir: string (absolute path) :type cfg: ConfigParser :type obs: EclairsObservation :type conf: PipelineConfiguration :type proc_id: string """ # declare logger self.logger = logging.getLogger(__name__) self.logger.info('creating an instance of Workflow') self.error_code = "OK" self.error_message = "I love it when a plan comes together." self.check_work_dir(work_dir) self.work_dir = work_dir self.input_dir = self.check_input_dir(cfg, proc_id) self.cfg = cfg self.config = conf self.obs = obs self.proc_id = proc_id if self.config.error_code != "OK": self.error_code = self.config.error_code self.error_message = self.config.error_message elif self.obs.error_code != "OK": self.error_code = self.obs.error_code self.error_message = self.obs.error_message
[docs] def check_work_dir(self, work_dir): """check the working directory parameter :param work_dir: working directory where to save scientific products :type work_dir: any (should be string (absolute path)) """ if not os.path.exists(work_dir): self.error_code = "ConfigError" self.error_message = "InputParamsError : working directory is not a valid directory path"
[docs] def check_input_dir(self, cfg, proc_id): """check the working directory parameter :param cfg: pipeline input configuration :type cfg: ConfigParser :param proc_id: id number of the process :type proc_id: string :return: input directory :rtype: string """ input_dir = '' try: input_dir = cfg[proc_id]['input'] except: self.error_code = "ConfigError" self.error_message = "InputParamsError : key error with {}".format(proc_id) if not os.path.exists(input_dir): self.error_code = "ConfigError" self.error_message = "InputParamsError : input evts directory is not a valid directory path" else: if not any(fname.endswith('.fits') for fname in os.listdir(input_dir)): self.error_code = "ConfigError" self.error_message = "InputParamsError : input evts directory has no fits files" return input_dir
[docs] def is_ok(self): """answer to the question if the pipeline status is 'OK' :return: True if status=='OK', False if not :rtype: bool """ return self.error_code == 'OK'
[docs] def run(self): pass
def _read_evts(self, evts_dirname): """read events from a directory :param evts_dirname: PATH of the directory with all the evts files. The directory must contains only evts files. :type evts_dirname: string :return: events :rtype: XEvtData """ evts = Xevt.EclairsCalEvtData() for evts_file in os.listdir(path=evts_dirname): try: evts.read(evts_dirname + '/' + evts_file, add_data=True) except (ValueError, KeyError): return 0, 'IOError', 'ReadEventsFileError : corrupted table Fits files. Impossible to read the events' return evts, 'OK', "I love it when a plan comes together." def _dpco(self, verbose=None): """run the dpco module read the L1 evts file :param verbose: activate print outputs :type verbose: bool (default False) """ corr_evts, self.error_code, self.error_message = self._read_evts(self.input_dir) if self.error_code != "OK": return corr_evts.tb_evts.remove_column('EBD') try: FLAG = at.Column(np.zeros(len(corr_evts.tb_evts)), name='FLAG', meta={'FLAG' : 'Flag pixel status'}, dtype='b') corr_evts.tb_evts.add_column(FLAG) except: corr_evts.tb_evts['FLAG'] = corr_evts.tb_evts['FLAG'] * 0 if verbose: self.logger.info('No correction is applied to the detector counts') self.evts = corr_evts def _cali(self, verbose=None): """run the cali module apply gain and offset :param verbose: activate print outputs :type verbose: bool (default False) """ try: PI = at.Column(np.array(self.evts.tb_evts['PHA']), name='PI', meta={'PI' : 'Pulse height Invariant energy channel (0-1023)'}, dtype='i2', unit=u.adu) self.evts.tb_evts.add_column(PI, 4) except: self.evts.tb_evts['PI'] = self.evts.tb_evts['PHA'] if verbose: self.logger.info('No calibration: PHA=PI') def _bube(self, verbose=None): """run the bube module create the shadowgrams from the events save detector images and computed gti :param verbose: activate print outputs :type verbose: bool (default False) """ if verbose: self.logger.info('Selection and binning of the calibrated events...') self.detector_images, self.obs, self.error_code, self.error_message = main_bube.bube( self.evts, self.obs, self.config.bube ) if self.error_code != 'OK': return bube_out.save_detector_images(self.detector_images, self.obs.attitude, self.obs.start_time, self.obs.end_time, self.work_dir, self.config.name, self.proc_id) bube_out.save_gti(self.obs, self.work_dir, self.obs.attitude, self.config.name, self.proc_id) def _off_axis_correction(self, sky_images, catalog_sources, verbose=None): """apply the off axis correction on imag outputs :param sky_images: sky images from imag :type sky_images: SkyImages :param catalog_sources: identified sources catalog from imag :type catalog_sources: CatalogIdentifiedSources :param verbose: activate print outputs :type verbose: bool (default False) :return: corrected sky images and catalog :rtype: SkyImages, CatalogIdentifiedSources """ if self.config.imag.off_axis == 'irf': sky_images.irf_correction(self.obs.dpix) catalog_sources.irf_correction(self.obs.dpix, verbose) elif self.config.imag.off_axis == 'cos_theta': sky_images.cos_theta_correction() catalog_sources.cos_theta_correction(verbose=verbose) elif verbose: self.logger.info('No off axis effects correction') return sky_images, catalog_sources def _imag(self, show=False, verbose=None): """run the imag module create and save the sky images, models images and found sources catalog :param show: show and save plots of the sky :type show: bool (defaut False) :param verbose: activate print outputs :type verbose: bool (default False) """ energy_ranges = self.detector_images.energy_ranges if verbose: self.logger.info('Searching for x-ray sources...') self.catalogs_final = [] self.catalogs_sources = [] self.sky_images = [] self.new_source_flag = [] # imag for each shadowgrams and energy ranges for shadowgram, shadowgram_var, energy_range in zip(self.detector_images.shadowgrams, self.detector_images.shadowgrams_var, energy_ranges): # call IMAG module. catalog_sources, sky_images, new_source_flag = main_imag.imag( shadowgram, shadowgram_var, self.obs, self.config.imag, self.proc_id ) # correct off axis effects sky_images, catalog_sources = self._off_axis_correction( sky_images, catalog_sources, verbose ) self.catalogs_sources.append(catalog_sources) self.sky_images.append(sky_images) self.new_source_flag.append(new_source_flag) #final statistics analysis image_analysis = SkyImageStatistics(sky_images.cleaned_sky_count, sky_images.cleaned_sky_snr) stats, srcs = image_analysis.run(100, self.config.imag.snr_limit) self.catalogs_final.append(image_analysis._build_catalog(srcs)) if catalog_sources.get_nb_element() == 100: self.logger.warning(f'Max number of possible sources reached (100) for E={energy_range}keV') elif catalog_sources.get_nb_element() == 0: self.logger.warning(f'No source was found for E={energy_range}keV') if new_source_flag == 1: self.logger.warning(f"An unknown source has been found !!! for E={energy_range}keV") if verbose: self.logger.info(f"{catalog_sources.get_nb_element()} sources were found for E={energy_range}keV") self.logger.info(f"Stats on sky remains: \ mean={stats[0]:.2E}, median={stats[1]:.2E}, std={stats[2]:.2E}") catalog_sources.plot_on_sphere(f"identified x-ray sources for E={energy_range}keV", show=show, save=self.work_dir + "/id_sources_sky_plot_{}.png".format(self.proc_id)) debug = 1 obs_info_dict = { 'tstart': self.obs.start_time, 'tstop': self.obs.end_time, 'energy_ranges': energy_ranges, 'attitude': self.obs.attitude } if debug: imag_out.save_sky_images_debug(self.sky_images, self.work_dir, obs_info_dict, self.config.name, self.proc_id) imag_out.save_catalogs(self.catalogs_sources, self.work_dir, obs_info_dict, self.config.name, self.proc_id) imag_out.save_catalogs(self.catalogs_final, self.work_dir, obs_info_dict, self.config.name, self.proc_id+'_final') imag_out.save_sky_images(self.sky_images, self.work_dir, obs_info_dict, self.config.name, self.proc_id) if verbose: self.logger.info(f'Found sources catalog in {self.work_dir}') if self.config.imag.ref_catalog_fov is not None: self.config.imag.ref_catalog_fov.plot_on_astro_sphere("x-ray sources in FOV", show=show, save=self.work_dir + "/sources_inFOV_sky_plot.png")
[docs]class BubeImag(WorkFlow): """find sources in the evts between start_time and end_time Class to process the pipeline finding sources during a stable pointing observation. Save a catalog of the detected x-ray sources. """ def __init__(self, work_dir, cfg, obs, conf, proc_id): """**Constructor** get the different parameters used by the process. check the validity of the parameters. construct the right deconvolution module for the process. :param work_dir: working directory where to save scientific products :param cfg: pipeline input configuration :param obs: observation parameters :param conf: pipeline config :param proc_id: id number of the process :type work_dir: string (absolute path) :type cfg: ConfigParser print(cfg['general']['snr_limit']) print(cvb) :type obs: EclairsObservation :type conf: PipelineConfiguration :type proc_id: string """ super().__init__(work_dir, cfg, obs, conf, proc_id) if not hasattr(self.config, 'imag') or not hasattr(self.config, 'bube'): self.error_code = 'ConfigError' self.error_message = 'Missing configuration for BubeImag workflow'
[docs] def set_snr_limit(self, snr_limit): self.config.snr_limit = snr_limit
[docs] def run(self, show=False, verbose=False): """main process get the evts files and create the shadowgrams to process deconvolution of the shadowgrams and iterative searching/cleaning on sources :outputfiles: ECL-DET-IMA.fits: Prepared detector images\n ECL-DET-UBC.fits: Uniformity & Background Corrected detector images\n ECL-GTI-3.fits: Good Time Intervals\n ECL-DET-MOD.fits: Source Model Images\n ECL-SKY-IMA.fits: Reconstructed Sky Images\n ECL-SOP-IMA.fits: Sources parameters from sky Images\n .. note:: for the output products see: General Program Software Description Document Requirements for the Scientific Data Products and Processing [...] (SR4) :param show: show and save plots of the sky :type show: bool (defaut False) :param verbose: activate print outputs :type verbose: bool (default False) """ if self.error_code != 'OK': return if verbose: self.logger.info(f'Events files in {self.input_dir}') if self.obs.attitude is not None: self.logger.info(f'Attitude ECLAIRs: \ RA={self.obs.attitude[0]:05.2f} deg \ DEC={self.obs.attitude[1]:05.2f} deg \ ORI={self.obs.attitude[2]:05.2f}deg') ###################################### #read calibrated events self.evts, self.error_code, self.error_message = self._read_evts(self.input_dir) if self.error_code != 'OK': return ###################################### #bube module self._bube(verbose) if self.error_code != 'OK': return ##################################### #imaging module self._imag(show, verbose)
[docs]class DpcoCaliBubeImag(WorkFlow): """find sources in the L1 evts between start_time and end_time Class to process the pipeline finding sources during a stable pointing observation. Save a catalog of the detected x-ray sources. """ def __init__(self, work_dir, cfg, obs : EclairsObservation, conf : PipelineConfiguration, proc_id): """**Constructor** get the different parameters used by the process. check the validity of the parameters. construct the right deconvolution module for the process. :param work_dir: working directory where to save scientific products :param cfg: pipeline input configuration :param obs: observation parameters :param conf: pipeline config :param proc_id: id number of the process :type work_dir: string (absolute path) :type cfg: ConfigParser :type obs: EclairsObservation :type conf: PipelineConfiguration :type proc_id: string """ super().__init__(work_dir, cfg, obs, conf, proc_id) if not hasattr(self.config, 'imag') or not hasattr(self.config, 'bube'): self.error_code = 'ConfigError' self.error_message = 'Missing configuration for DpcoCaliBubeImag workflow'
[docs] def set_snr_limit(self, snr_limit): self.config.snr_limit = snr_limit
[docs] def run(self, show=False, verbose=False): """main process get the evts files and create the shadowgrams to process deconvolution of the shadowgrams and iterative searching/cleaning on sources :outputfiles: ECL-DET-IMA.fits: Prepared detector images\n ECL-DET-UBC.fits: Uniformity & Background Corrected detector images\n ECL-GTI-3.fits: Good Time Intervals\n ECL-DET-MOD.fits: Source Model Images\n ECL-SKY-IMA.fits: Reconstructed Sky Images\n ECL-SOP-IMA.fits: Sources parameters from sky Images\n .. note:: for the output products see: General Program Software Description Document Requirements for the Scientific Data Products and Processing [...] (SR4) :param show: show and save plots of the sky :type show: bool (defaut False) :param verbose: activate print outputs :type verbose: bool (default False) """ if self.error_code != 'OK': return if verbose: self.logger.info(f'Events files in {self.input_dir}') if self.obs.attitude is not None: self.logger.info(f'Attitude ECLAIRs: \ RA={self.obs.attitude[0]:05.2f} deg \ DEC={self.obs.attitude[1]:05.2f} deg \ ORI={self.obs.attitude[2]:05.2f}deg') ###################################### #dpco module self._dpco(verbose) if self.error_code != 'OK': return ###################################### #cali module self._cali(verbose) if self.error_code != 'OK': return ###################################### #bube module self._bube(verbose) if self.error_code != 'OK': return ##################################### #imaging module self._imag(show, verbose)
[docs]class Imag(WorkFlow): """find sources in the shadowgrams between start_time and end_time Class to process the pipeline finding sources during a stable pointing observation. Save a catalog of the detected x-ray sources. """ def __init__(self, work_dir, cfg, obs, conf, proc_id): """**Constructor** get the different parameters used by the process. check the validity of the parameters. construct the right deconvolution module for the process. :param work_dir: working directory where to save scientific products :param cfg: pipeline input configuration :param obs: observation parameters :param conf: pipeline config :param proc_id: id number of the process :type work_dir: string (absolute path) :type cfg: ConfigParser :type obs: EclairsObservation :type conf: PipelineConfiguration :type proc_id: string """ super().__init__(work_dir, cfg, obs, conf, proc_id) if not hasattr(self.config, 'imag') or not hasattr(self.config, 'bube'): self.error_code = 'ConfigError' self.error_message = 'Missing configuration for Imag workflow'
[docs] def set_snr_limit(self, snr_limit): self.config.snr_limit = snr_limit
[docs] def run(self, show=False, verbose=False): """main process get the shadowgrams deconvolution of the shadowgram and iterative searching/cleaning on sources :outputfiles: ECL-DET-MOD.fits: Source Model Images\n ECL-SKY-IMA.fits: Reconstructed Sky Images\n ECL-SOP-IMA.fits: Sources parameters from sky Images\n .. note:: for the output products see: General Program Software Description Document Requirements for the Scientific Data Products and Processing [...] (SR4) :param show: show and save plots of the sky :type show: bool (defaut False) :param verbose: activate print outputs :type verbose: bool (default False) """ if self.error_code != 'OK': return if verbose: self.logger.info(f'Shadowgram files in {self.input_dir}') if self.obs.attitude is not None: self.logger.info(f'Attitude ECLAIRs: \ RA={self.obs.attitude[0]:05.2f} deg \ DEC={self.obs.attitude[1]:05.2f} deg \ ORI={self.obs.attitude[2]:05.2f}deg') ###################################### #read sahdowgrams and loop on imaging module proc_id_base = deepcopy(self.proc_id) for shadowgram_file in os.listdir(path=self.input_dir): self.proc_id = proc_id_base + shadowgram_file.split('.')[0][10:] shadowgram = set_array_from_file(self.input_dir + '/' + shadowgram_file, 1) self.detector_images = main_bube.DetectorImages(shadowgram, shadowgram, self.obs.obs_time, self.config.bube.energy_ranges) self._imag(show, verbose)