Source code for process.pipeline.ecpi_flows

'''main module to run a ECLAIRs' pipeline's flows

Created on 20 mars. 2018

@author: Catalano Camille, APC/IN2P3/CNRS
'''

    
import os
import astropy.table as at
import astropy.units as u
import numpy as np
from copy import deepcopy

import common.io.events as Xevt
from process.bube.core import BUBE
from process.imag.core import IMAG
import process.bube.io.outputs as bube_out
import process.imag.io.outputs as imag_out
from common.io.fits_tools import set_array_from_file
from process.bube.core.BubeProcesses import DetectorImages
from analysis.lib.images import SkyImageStatistics


[docs]class WorkFlow(object): """ECLAIRs pipeline workflow base object init the attributes checking the parameters """ def __init__(self, work_dir, input_dir, obs, conf, proc_id): """**Constructor** get the different parameters used by the process. check the validity of the parameters. construct the right deconvolution module for the process. :param work_dir: working directory where to save scientific products :param input_dir: local directory with evts in it :param obs: observation parameters :param conf: pipeline config :param proc_id: id number of the process :type work_dir: string (absolute path) :type input_dir: string (absolute path) :type obs: EclairsObservation :type conf: PipelineConfiguration :type proc_id: string """ self.error_code = "OK" self.error_message = "I love it when a plan comes together." self.check_work_dir(work_dir) self.check_input_dir(input_dir) self.work_dir = work_dir self.input_dir = input_dir self.config = conf self.obs = obs self.proc_id = proc_id if self.config.error_code != "OK": self.error_code = self.config.error_code self.error_message = self.config.error_message elif self.obs.error_code != "OK": self.error_code = self.obs.error_code self.error_message = self.obs.error_message
[docs] def check_work_dir(self, work_dir): """check the working directory parameter :param work_dir: working directory where to save scientific products :type work_dir: any (should be string (absolute path)) """ if not os.path.exists(work_dir): self.error_code = "ConfigError" self.error_message = "InputParamsError : working directory is not a valid directory path"
[docs] def check_input_dir(self, input_dir): """check the input local directory parameter check in addition if there is any fits file in the directory :param input_dir: local directory with evts in it :type input_dir: any (should be string (absolute path)) """ if not os.path.exists(input_dir): self.error_code = "ConfigError" self.error_message = "InputParamsError : input evts directory is not a valid directory path" if not any(fname.endswith('.fits') for fname in os.listdir(input_dir)): self.error_code = "ConfigError" self.error_message = "InputParamsError : input evts directory has no fits files"
[docs] def is_ok(self): """answer to the question if the pipeline status is 'OK' :return: True if status=='OK', False if not :rtype: bool """ return self.error_code == 'OK'
[docs] def run(self): pass
def _read_evts(self, evts_dirname): """read events from a directory :param evts_dirname: PATH of the directory with all the evts files. The directory must contains only evts files. :type evts_dirname: string :return: events :rtype: XEvtData """ evts = Xevt.EclairsCalEvtData() for evts_file in os.listdir(path=evts_dirname): try: evts.read(evts_dirname + '/' + evts_file, add_data=True) except (ValueError, KeyError): return 0, 'IOError', 'ReadEventsFileError : corrupted table Fits files. Impossible to read the events' return evts, 'OK', "I love it when a plan comes together." def _dpco(self, verbose=None): """run the dpco module read the L1 evts file :param verbose: activate print outputs :type verbose: bool (default False) """ corr_evts, self.error_code, self.error_message = self._read_evts(self.input_dir) if self.error_code != "OK": return corr_evts.tb_evts.remove_column('EBD') try: FLAG = at.Column(np.zeros(len(corr_evts.tb_evts)), name='FLAG', meta={'FLAG' : 'Flag pixel status'}, dtype='b') corr_evts.tb_evts.add_column(FLAG) except: corr_evts.tb_evts['FLAG'] = corr_evts.tb_evts['FLAG'] * 0 if verbose: print("No correction is applied to the detector counts") self.evts = corr_evts def _cali(self, verbose=None): """run the cali module apply gain and offset :param verbose: activate print outputs :type verbose: bool (default False) """ try: PI = at.Column(np.array(self.evts.tb_evts['PHA']), name='PI', meta={'PI' : 'Pulse height Invariant energy channel (0-1023)'}, dtype='i2', unit=u.adu) self.evts.tb_evts.add_column(PI, 4) except: self.evts.tb_evts['PI'] = self.evts.tb_evts['PHA'] if verbose: print("No calibration: PHA=PI") def _bube(self, verbose=None): """run the bube module create the shadowgrams from the events save detector images and computed gti :param verbose: activate print outputs :type verbose: bool (default False) """ if verbose: print('select and binning of the calibrated events...') self.detector_images, self.obs, self.error_code, self.error_message = BUBE.bube( self.evts, self.obs, self.config.bube ) if self.error_code != 'OK': return bube_out.save_detector_images(self.detector_images, self.obs.start_time, self.obs.end_time, self.work_dir, self.config.name, self.proc_id) bube_out.save_gti(self.obs, self.work_dir, self.config.name, self.proc_id) def _off_axis_correction(self, sky_images, catalog_sources, verbose=None): """apply the off axis correction on imag outputs :param sky_images: sky images from imag :type sky_images: SkyImages :param catalog_sources: identified sources catalog from imag :type catalog_sources: CatalogIdentifiedSources :param verbose: activate print outputs :type verbose: bool (default False) :return: corrected sky images and catalog :rtype: SkyImages, CatalogIdentifiedSources """ if self.config.imag.off_axis == 'irf': sky_images.irf_correction(self.obs.dpix) catalog_sources.irf_correction(self.obs.dpix, verbose) elif self.config.imag.off_axis == 'cos_theta': sky_images.cos_theta_correction() catalog_sources.cos_theta_correction(verbose=verbose) elif verbose: print('No off axis effects correction') return sky_images, catalog_sources def _imag(self, show=False, verbose=None): """run the imag module create and save the sky images, models images and found sources catalog :param show: show and save plots of the sky :type show: bool (defaut False) :param verbose: activate print outputs :type verbose: bool (default False) """ energy_ranges = self.detector_images.energy_ranges if verbose: print('detection of x-ray sources...') self.catalogs_final = [] self.catalogs_sources = [] self.sky_images = [] self.new_source_flag = [] # imag for each shadowgrams and energy ranges for shadowgram, shadowgram_var, energy_range in zip(self.detector_images.shadowgrams, self.detector_images.shadowgrams_var, energy_ranges): catalog_sources, sky_images, new_source_flag = IMAG.imag( shadowgram, shadowgram_var, self.obs, self.config.imag, self.proc_id ) sky_images, catalog_sources = self._off_axis_correction(sky_images, catalog_sources, verbose) self.catalogs_sources.append(catalog_sources) self.sky_images.append(sky_images) self.new_source_flag.append(new_source_flag) #final statistics analysis image_analysis = SkyImageStatistics(sky_images.cleaned_sky_count, sky_images.cleaned_sky_snr) stats, srcs = image_analysis.run(100, self.config.imag.snr_limit) self.catalogs_final.append(image_analysis._build_catalog(srcs)) if catalog_sources.get_nb_element() == 100: print(f'WARNING : max findable sources reached (100) for E={energy_range}keV') elif catalog_sources.get_nb_element() == 0: print(f'WARNING : No source was found for E={energy_range}keV') if new_source_flag == 1: print(f"WARNING: !!!! An unkown source has been found !!! for E={energy_range}keV") if verbose: print(f"{catalog_sources.get_nb_element()} sources were found for E={energy_range}keV") print("stats on sky remains: mean={:.2E}, median={:.2E}, std={:.2E}".format(stats[0], stats[1], stats[2])) catalog_sources.plot_on_sphere(f"identified x-ray sources for E={energy_range}keV", show=show, save=self.work_dir + "/id_sources_sky_plot_{}.png".format(self.proc_id)) debug = 1 if debug: imag_out.save_sky_images_debug(self.sky_images, self.work_dir, self.obs.start_time, self.obs.end_time, energy_ranges, self.config.name, self.proc_id) imag_out.save_catalogs(self.catalogs_sources, self.work_dir, self.obs.start_time, self.obs.end_time, energy_ranges, self.config.name, self.proc_id) imag_out.save_catalogs(self.catalogs_final, self.work_dir, self.obs.start_time, self.obs.end_time, energy_ranges, self.config.name, self.proc_id+'_final') imag_out.save_sky_images(self.sky_images, self.work_dir, self.obs.start_time, self.obs.end_time, energy_ranges, self.config.name, self.proc_id) if verbose: print(f'found sources catalog in {self.work_dir}') if self.config.imag.ref_catalog_fov is not None: self.config.imag.ref_catalog_fov.plot_on_astro_sphere("x-ray sources in FOV", show=show, save=self.work_dir + "/sources_inFOV_sky_plot.png")
[docs]class BubeImag(WorkFlow): """find sources in the evts between start_time and end_time Class to process the pipeline finding sources during a stable pointing observation. Save a catalog of the detected x-ray sources. """ def __init__(self, work_dir, input_dir, obs, conf, proc_id): """**Constructor** get the different parameters used by the process. check the validity of the parameters. construct the right deconvolution module for the process. :param work_dir: working directory where to save scientific products :param input_dir: local directory with evts in it :param obs: observation parameters :param conf: pipeline config :param proc_id: id number of the process :type work_dir: string (absolute path) :type input_dir: string (absolute path) :type obs: EclairsObservation :type conf: PipelineConfiguration :type proc_id: string """ super().__init__(work_dir, input_dir, obs, conf, proc_id) if not hasattr(self.config, 'imag') or not hasattr(self.config, 'bube'): self.error_code = 'ConfigError' self.error_message = 'Missing configuration for BubeImag workflow'
[docs] def set_snr_limit(self, snr_limit): self.config.snr_limit = snr_limit
[docs] def run(self, show=False, verbose=False): """main process get the evts files and create the shadowgrams to process deconvolution of the shadowgrams and iterative searching/cleaning on sources :outputfiles: ECL-DET-IMA.fits: Prepared detector images\n ECL-DET-UBC.fits: Uniformity & Background Corrected detector images\n ECL-GTI-3.fits: Good Time Intervals\n ECL-DET-MOD.fits: Source Model Images\n ECL-SKY-IMA.fits: Reconstructed Sky Images\n ECL-SOP-IMA.fits: Sources parameters from sky Images\n .. note:: for the output products see: General Program Software Description Document Requirements for the Scientific Data Products and Processing [...] (SR4) :param show: show and save plots of the sky :type show: bool (defaut False) :param verbose: activate print outputs :type verbose: bool (default False) """ if self.error_code != 'OK': return if verbose: print('evts files in {}'.format(self.input_dir)) if self.obs.attitude is not None: print("ECLAIRs' attitude: RA={:05.2f}deg, DEC={:05.2f}deg, orientation={:05.2f}deg".format(self.obs.attitude[0], self.obs.attitude[1], self.obs.attitude[2])) ###################################### #read calibrated events self.evts, self.error_code, self.error_message = self._read_evts(self.input_dir) if self.error_code != 'OK': return ###################################### #bube module self._bube(verbose) if self.error_code != 'OK': return ##################################### #imaging module self._imag(show, verbose)
[docs]class DpcoCaliBubeImag(WorkFlow): """find sources in the L1 evts between start_time and end_time Class to process the pipeline finding sources during a stable pointing observation. Save a catalog of the detected x-ray sources. """ def __init__(self, work_dir, input_dir, obs, conf, proc_id): """**Constructor** get the different parameters used by the process. check the validity of the parameters. construct the right deconvolution module for the process. :param work_dir: working directory where to save scientific products :param input_dir: local directory with evts in it :param obs: observation parameters :param conf: pipeline config :param proc_id: id number of the process :type work_dir: string (absolute path) :type input_dir: string (absolute path) :type obs: EclairsObservation :type conf: PipelineConfiguration :type proc_id: string """ super().__init__(work_dir, input_dir, obs, conf, proc_id) if not hasattr(self.config, 'imag') or not hasattr(self.config, 'bube'): self.error_code = 'ConfigError' self.error_message = 'Missing configuration for DpcoCaliBubeImag workflow'
[docs] def set_snr_limit(self, snr_limit): self.config.snr_limit = snr_limit
[docs] def run(self, show=False, verbose=False): """main process get the evts files and create the shadowgrams to process deconvolution of the shadowgrams and iterative searching/cleaning on sources :outputfiles: ECL-DET-IMA.fits: Prepared detector images\n ECL-DET-UBC.fits: Uniformity & Background Corrected detector images\n ECL-GTI-3.fits: Good Time Intervals\n ECL-DET-MOD.fits: Source Model Images\n ECL-SKY-IMA.fits: Reconstructed Sky Images\n ECL-SOP-IMA.fits: Sources parameters from sky Images\n .. note:: for the output products see: General Program Software Description Document Requirements for the Scientific Data Products and Processing [...] (SR4) :param show: show and save plots of the sky :type show: bool (defaut False) :param verbose: activate print outputs :type verbose: bool (default False) """ if self.error_code != 'OK': return if verbose: print('evts files in {}'.format(self.input_dir)) if self.obs.attitude is not None: print("ECLAIRs' attitude: RA={:05.2f}deg, DEC={:05.2f}deg, orientation={:05.2f}deg".format(self.obs.attitude[0], self.obs.attitude[1], self.obs.attitude[2])) ###################################### #dpco module self._dpco(verbose) if self.error_code != 'OK': return ###################################### #cali module self._cali(verbose) if self.error_code != 'OK': return ###################################### #bube module self._bube(verbose) if self.error_code != 'OK': return ##################################### #imaging module self._imag(show, verbose)
[docs]class Imag(WorkFlow): """find sources in the shadowgrams between start_time and end_time Class to process the pipeline finding sources during a stable pointing observation. Save a catalog of the detected x-ray sources. """ def __init__(self, work_dir, input_dir, obs, conf, proc_id): """**Constructor** get the different parameters used by the process. check the validity of the parameters. construct the right deconvolution module for the process. :param work_dir: working directory where to save scientific products :param input_dir: local directory with evts in it :param obs: observation parameters :param conf: pipeline config :param proc_id: id number of the process :type work_dir: string (absolute path) :type input_dir: string (absolute path) :type obs: EclairsObservation :type conf: PipelineConfiguration :type proc_id: string """ super().__init__(work_dir, input_dir, obs, conf, proc_id) if not hasattr(self.config, 'imag') or not hasattr(self.config, 'bube'): self.error_code = 'ConfigError' self.error_message = 'Missing configuration for Imag workflow'
[docs] def set_snr_limit(self, snr_limit): self.config.snr_limit = snr_limit
[docs] def run(self, show=False, verbose=False): """main process get the shadowgrams deconvolution of the shadowgram and iterative searching/cleaning on sources :outputfiles: ECL-DET-MOD.fits: Source Model Images\n ECL-SKY-IMA.fits: Reconstructed Sky Images\n ECL-SOP-IMA.fits: Sources parameters from sky Images\n .. note:: for the output products see: General Program Software Description Document Requirements for the Scientific Data Products and Processing [...] (SR4) :param show: show and save plots of the sky :type show: bool (defaut False) :param verbose: activate print outputs :type verbose: bool (default False) """ if self.error_code != 'OK': return if verbose: print('shadowgram files in {}'.format(self.input_dir)) if self.obs.attitude is not None: print("ECLAIRs' attitude: RA={:05.2f}deg, DEC={:05.2f}deg, orientation={:05.2f}deg".format(self.obs.attitude[0], self.obs.attitude[1], self.obs.attitude[2])) ###################################### #read sahdowgrams and loop on imaging module proc_id_base = deepcopy(self.proc_id) for shadowgram_file in os.listdir(path=self.input_dir): self.proc_id = proc_id_base + shadowgram_file.split('.')[0][10:] shadowgram = set_array_from_file(self.input_dir + '/' + shadowgram_file, 1) self.detector_images = DetectorImages(shadowgram, shadowgram, self.obs.obs_time, self.config.bube.energy_ranges) self._imag(show, verbose)