'''main module to run a ECLAIRs' pipeline's flows
Created on 20 mars. 2018
@author: Catalano Camille, APC/IN2P3/CNRS
'''
import os
import logging
import astropy.table as at
import astropy.units as u
import numpy as np
from copy import deepcopy
import ecpi.common.io.events as Xevt
from ecpi.process.bube.core import main_bube
from ecpi.process.imag.core import main_imag
import ecpi.process.bube.io.outputs as bube_out
import ecpi.process.imag.io.outputs as imag_out
from ecpi.common.io.fits_tools import set_array_from_file
from ecpi.analysis.lib.images import SkyImageStatistics
from ecpi.common.mission.observation import EclairsObservation
from ecpi.process.pipeline.ecpi_config import PipelineConfiguration
[docs]class WorkFlow(object):
"""ECLAIRs pipeline workflow base object
init the attributes checking the parameters
"""
def __init__(self, work_dir, cfg, obs, conf, proc_id):
"""**Constructor**
get the different parameters used by the process.
check the validity of the parameters.
construct the right deconvolution module for the process.
:param work_dir: working directory where to save scientific products
:param cfg: pipeline input configuration
:param obs: observation parameters
:param conf: pipeline config
:param proc_id: id number of the process
:type work_dir: string (absolute path)
:type cfg: ConfigParser
:type obs: EclairsObservation
:type conf: PipelineConfiguration
:type proc_id: string
"""
# declare logger
self.logger = logging.getLogger(__name__)
self.logger.info('creating an instance of Workflow')
self.error_code = "OK"
self.error_message = "I love it when a plan comes together."
self.check_work_dir(work_dir)
self.work_dir = work_dir
self.input_dir = self.check_input_dir(cfg, proc_id)
self.cfg = cfg
self.config = conf
self.obs = obs
self.proc_id = proc_id
if self.config.error_code != "OK":
self.error_code = self.config.error_code
self.error_message = self.config.error_message
elif self.obs.error_code != "OK":
self.error_code = self.obs.error_code
self.error_message = self.obs.error_message
[docs] def check_work_dir(self, work_dir):
"""check the working directory parameter
:param work_dir: working directory where to save scientific products
:type work_dir: any (should be string (absolute path))
"""
if not os.path.exists(work_dir):
self.error_code = "ConfigError"
self.error_message = "InputParamsError : working directory is not a valid directory path"
[docs] def is_ok(self):
"""answer to the question if the pipeline status is 'OK'
:return: True if status=='OK', False if not
:rtype: bool
"""
return self.error_code == 'OK'
def _read_evts(self, evts_dirname):
"""read events from a directory
:param evts_dirname: PATH of the directory with all the evts files.
The directory must contains only evts files.
:type evts_dirname: string
:return: events
:rtype: XEvtData
"""
evts = Xevt.EclairsCalEvtData()
for evts_file in os.listdir(path=evts_dirname):
try:
evts.read(evts_dirname + '/' + evts_file, add_data=True)
except (ValueError, KeyError):
return 0, 'IOError', 'ReadEventsFileError : corrupted table Fits files. Impossible to read the events'
return evts, 'OK', "I love it when a plan comes together."
def _dpco(self, verbose=None):
"""run the dpco module
read the L1 evts file
:param verbose: activate print outputs
:type verbose: bool (default False)
"""
corr_evts, self.error_code, self.error_message = self._read_evts(self.input_dir)
if self.error_code != "OK":
return
corr_evts.tb_evts.remove_column('EBD')
try:
FLAG = at.Column(np.zeros(len(corr_evts.tb_evts)), name='FLAG', meta={'FLAG' : 'Flag pixel status'}, dtype='b')
corr_evts.tb_evts.add_column(FLAG)
except:
corr_evts.tb_evts['FLAG'] = corr_evts.tb_evts['FLAG'] * 0
if verbose:
self.logger.info('No correction is applied to the detector counts')
self.evts = corr_evts
def _cali(self, verbose=None):
"""run the cali module
apply gain and offset
:param verbose: activate print outputs
:type verbose: bool (default False)
"""
try:
PI = at.Column(np.array(self.evts.tb_evts['PHA']), name='PI', meta={'PI' : 'Pulse height Invariant energy channel (0-1023)'}, dtype='i2', unit=u.adu)
self.evts.tb_evts.add_column(PI, 4)
except:
self.evts.tb_evts['PI'] = self.evts.tb_evts['PHA']
if verbose:
self.logger.info('No calibration: PHA=PI')
def _bube(self, verbose=None):
"""run the bube module
create the shadowgrams from the events
save detector images and computed gti
:param verbose: activate print outputs
:type verbose: bool (default False)
"""
if verbose:
self.logger.info('Selection and binning of the calibrated events...')
self.detector_images, self.obs, self.error_code, self.error_message = main_bube.bube(
self.evts,
self.obs,
self.config.bube
)
if self.error_code != 'OK':
return
bube_out.save_detector_images(self.detector_images, self.obs.attitude, self.obs.start_time, self.obs.end_time, self.work_dir, self.config.name, self.proc_id)
bube_out.save_gti(self.obs, self.work_dir, self.obs.attitude, self.config.name, self.proc_id)
def _off_axis_correction(self, sky_images, catalog_sources, verbose=None):
"""apply the off axis correction on imag outputs
:param sky_images: sky images from imag
:type sky_images: SkyImages
:param catalog_sources: identified sources catalog from imag
:type catalog_sources: CatalogIdentifiedSources
:param verbose: activate print outputs
:type verbose: bool (default False)
:return: corrected sky images and catalog
:rtype: SkyImages, CatalogIdentifiedSources
"""
if self.config.imag.off_axis == 'irf':
sky_images.irf_correction(self.obs.dpix)
catalog_sources.irf_correction(self.obs.dpix, verbose)
elif self.config.imag.off_axis == 'cos_theta':
sky_images.cos_theta_correction()
catalog_sources.cos_theta_correction(verbose=verbose)
elif verbose:
self.logger.info('No off axis effects correction')
return sky_images, catalog_sources
def _imag(self, show=False, verbose=None):
"""run the imag module
create and save the sky images, models images and found sources catalog
:param show: show and save plots of the sky
:type show: bool (defaut False)
:param verbose: activate print outputs
:type verbose: bool (default False)
"""
energy_ranges = self.detector_images.energy_ranges
if verbose:
self.logger.info('Searching for x-ray sources...')
self.catalogs_final = []
self.catalogs_sources = []
self.sky_images = []
self.new_source_flag = []
# imag for each shadowgrams and energy ranges
for shadowgram, shadowgram_var, energy_range in zip(self.detector_images.shadowgrams,
self.detector_images.shadowgrams_var,
energy_ranges):
# call IMAG module.
catalog_sources, sky_images, new_source_flag = main_imag.imag(
shadowgram, shadowgram_var, self.obs, self.config.imag, self.proc_id
)
# correct off axis effects
sky_images, catalog_sources = self._off_axis_correction(
sky_images, catalog_sources, verbose
)
self.catalogs_sources.append(catalog_sources)
self.sky_images.append(sky_images)
self.new_source_flag.append(new_source_flag)
#final statistics analysis
image_analysis = SkyImageStatistics(sky_images.cleaned_sky_count, sky_images.cleaned_sky_snr)
stats, srcs = image_analysis.run(100, self.config.imag.snr_limit)
self.catalogs_final.append(image_analysis._build_catalog(srcs))
if catalog_sources.get_nb_element() == 100:
self.logger.warning(f'Max number of possible sources reached (100) for E={energy_range}keV')
elif catalog_sources.get_nb_element() == 0:
self.logger.warning(f'No source was found for E={energy_range}keV')
if new_source_flag == 1:
self.logger.warning(f"An unknown source has been found !!! for E={energy_range}keV")
if verbose:
self.logger.info(f"{catalog_sources.get_nb_element()} sources were found for E={energy_range}keV")
self.logger.info(f"Stats on sky remains: \
mean={stats[0]:.2E}, median={stats[1]:.2E}, std={stats[2]:.2E}")
catalog_sources.plot_on_sphere(f"identified x-ray sources for E={energy_range}keV", show=show, save=self.work_dir + "/id_sources_sky_plot_{}.png".format(self.proc_id))
debug = 1
obs_info_dict = {
'tstart': self.obs.start_time,
'tstop': self.obs.end_time,
'energy_ranges': energy_ranges,
'attitude': self.obs.attitude
}
if debug:
imag_out.save_sky_images_debug(self.sky_images, self.work_dir, obs_info_dict, self.config.name, self.proc_id)
imag_out.save_catalogs(self.catalogs_sources, self.work_dir, obs_info_dict, self.config.name, self.proc_id)
imag_out.save_catalogs(self.catalogs_final, self.work_dir, obs_info_dict, self.config.name, self.proc_id+'_final')
imag_out.save_sky_images(self.sky_images, self.work_dir, obs_info_dict, self.config.name, self.proc_id)
if verbose:
self.logger.info(f'Found sources catalog in {self.work_dir}')
if self.config.imag.ref_catalog_fov is not None:
self.config.imag.ref_catalog_fov.plot_on_astro_sphere("x-ray sources in FOV", show=show, save=self.work_dir + "/sources_inFOV_sky_plot.png")
[docs]class BubeImag(WorkFlow):
"""find sources in the evts between start_time and end_time
Class to process the pipeline finding sources during a stable pointing observation.
Save a catalog of the detected x-ray sources.
"""
def __init__(self, work_dir, cfg, obs, conf, proc_id):
"""**Constructor**
get the different parameters used by the process.
check the validity of the parameters.
construct the right deconvolution module for the process.
:param work_dir: working directory where to save scientific products
:param cfg: pipeline input configuration
:param obs: observation parameters
:param conf: pipeline config
:param proc_id: id number of the process
:type work_dir: string (absolute path)
:type cfg: ConfigParser print(cfg['general']['snr_limit'])
print(cvb)
:type obs: EclairsObservation
:type conf: PipelineConfiguration
:type proc_id: string
"""
super().__init__(work_dir, cfg, obs, conf, proc_id)
if not hasattr(self.config, 'imag') or not hasattr(self.config, 'bube'):
self.error_code = 'ConfigError'
self.error_message = 'Missing configuration for BubeImag workflow'
[docs] def set_snr_limit(self, snr_limit):
self.config.snr_limit = snr_limit
[docs] def run(self, show=False, verbose=False):
"""main process
get the evts files and create the shadowgrams to process
deconvolution of the shadowgrams and iterative searching/cleaning on sources
:outputfiles:
ECL-DET-IMA.fits: Prepared detector images\n
ECL-DET-UBC.fits: Uniformity & Background Corrected detector images\n
ECL-GTI-3.fits: Good Time Intervals\n
ECL-DET-MOD.fits: Source Model Images\n
ECL-SKY-IMA.fits: Reconstructed Sky Images\n
ECL-SOP-IMA.fits: Sources parameters from sky Images\n
.. note::
for the output products see:
General Program Software Description Document
Requirements for the Scientific Data Products and Processing [...] (SR4)
:param show: show and save plots of the sky
:type show: bool (defaut False)
:param verbose: activate print outputs
:type verbose: bool (default False)
"""
if self.error_code != 'OK':
return
if verbose:
self.logger.info(f'Events files in {self.input_dir}')
if self.obs.attitude is not None:
self.logger.info(f'Attitude ECLAIRs: \
RA={self.obs.attitude[0]:05.2f} deg \
DEC={self.obs.attitude[1]:05.2f} deg \
ORI={self.obs.attitude[2]:05.2f}deg')
######################################
#read calibrated events
self.evts, self.error_code, self.error_message = self._read_evts(self.input_dir)
if self.error_code != 'OK':
return
######################################
#bube module
self._bube(verbose)
if self.error_code != 'OK':
return
#####################################
#imaging module
self._imag(show, verbose)
[docs]class DpcoCaliBubeImag(WorkFlow):
"""find sources in the L1 evts between start_time and end_time
Class to process the pipeline finding sources during a stable pointing observation.
Save a catalog of the detected x-ray sources.
"""
def __init__(self, work_dir, cfg, obs : EclairsObservation, conf : PipelineConfiguration, proc_id):
"""**Constructor**
get the different parameters used by the process.
check the validity of the parameters.
construct the right deconvolution module for the process.
:param work_dir: working directory where to save scientific products
:param cfg: pipeline input configuration
:param obs: observation parameters
:param conf: pipeline config
:param proc_id: id number of the process
:type work_dir: string (absolute path)
:type cfg: ConfigParser
:type obs: EclairsObservation
:type conf: PipelineConfiguration
:type proc_id: string
"""
super().__init__(work_dir, cfg, obs, conf, proc_id)
if not hasattr(self.config, 'imag') or not hasattr(self.config, 'bube'):
self.error_code = 'ConfigError'
self.error_message = 'Missing configuration for DpcoCaliBubeImag workflow'
[docs] def set_snr_limit(self, snr_limit):
self.config.snr_limit = snr_limit
[docs] def run(self, show=False, verbose=False):
"""main process
get the evts files and create the shadowgrams to process
deconvolution of the shadowgrams and iterative searching/cleaning on sources
:outputfiles:
ECL-DET-IMA.fits: Prepared detector images\n
ECL-DET-UBC.fits: Uniformity & Background Corrected detector images\n
ECL-GTI-3.fits: Good Time Intervals\n
ECL-DET-MOD.fits: Source Model Images\n
ECL-SKY-IMA.fits: Reconstructed Sky Images\n
ECL-SOP-IMA.fits: Sources parameters from sky Images\n
.. note::
for the output products see:
General Program Software Description Document
Requirements for the Scientific Data Products and Processing [...] (SR4)
:param show: show and save plots of the sky
:type show: bool (defaut False)
:param verbose: activate print outputs
:type verbose: bool (default False)
"""
if self.error_code != 'OK':
return
if verbose:
self.logger.info(f'Events files in {self.input_dir}')
if self.obs.attitude is not None:
self.logger.info(f'Attitude ECLAIRs: \
RA={self.obs.attitude[0]:05.2f} deg \
DEC={self.obs.attitude[1]:05.2f} deg \
ORI={self.obs.attitude[2]:05.2f}deg')
######################################
#dpco module
self._dpco(verbose)
if self.error_code != 'OK':
return
######################################
#cali module
self._cali(verbose)
if self.error_code != 'OK':
return
######################################
#bube module
self._bube(verbose)
if self.error_code != 'OK':
return
#####################################
#imaging module
self._imag(show, verbose)
[docs]class Imag(WorkFlow):
"""find sources in the shadowgrams between start_time and end_time
Class to process the pipeline finding sources during a stable pointing observation.
Save a catalog of the detected x-ray sources.
"""
def __init__(self, work_dir, cfg, obs, conf, proc_id):
"""**Constructor**
get the different parameters used by the process.
check the validity of the parameters.
construct the right deconvolution module for the process.
:param work_dir: working directory where to save scientific products
:param cfg: pipeline input configuration
:param obs: observation parameters
:param conf: pipeline config
:param proc_id: id number of the process
:type work_dir: string (absolute path)
:type cfg: ConfigParser
:type obs: EclairsObservation
:type conf: PipelineConfiguration
:type proc_id: string
"""
super().__init__(work_dir, cfg, obs, conf, proc_id)
if not hasattr(self.config, 'imag') or not hasattr(self.config, 'bube'):
self.error_code = 'ConfigError'
self.error_message = 'Missing configuration for Imag workflow'
[docs] def set_snr_limit(self, snr_limit):
self.config.snr_limit = snr_limit
[docs] def run(self, show=False, verbose=False):
"""main process
get the shadowgrams
deconvolution of the shadowgram and iterative searching/cleaning on sources
:outputfiles:
ECL-DET-MOD.fits: Source Model Images\n
ECL-SKY-IMA.fits: Reconstructed Sky Images\n
ECL-SOP-IMA.fits: Sources parameters from sky Images\n
.. note::
for the output products see:
General Program Software Description Document
Requirements for the Scientific Data Products and Processing [...] (SR4)
:param show: show and save plots of the sky
:type show: bool (defaut False)
:param verbose: activate print outputs
:type verbose: bool (default False)
"""
if self.error_code != 'OK':
return
if verbose:
self.logger.info(f'Shadowgram files in {self.input_dir}')
if self.obs.attitude is not None:
self.logger.info(f'Attitude ECLAIRs: \
RA={self.obs.attitude[0]:05.2f} deg \
DEC={self.obs.attitude[1]:05.2f} deg \
ORI={self.obs.attitude[2]:05.2f}deg')
######################################
#read sahdowgrams and loop on imaging module
proc_id_base = deepcopy(self.proc_id)
for shadowgram_file in os.listdir(path=self.input_dir):
self.proc_id = proc_id_base + shadowgram_file.split('.')[0][10:]
shadowgram = set_array_from_file(self.input_dir + '/' + shadowgram_file, 1)
self.detector_images = main_bube.DetectorImages(shadowgram, shadowgram, self.obs.obs_time, self.config.bube.energy_ranges)
self._imag(show, verbose)