Source code for ecpi.common.sky.catalog

"""Catalogs of sources
"""

import os.path as osp
import os
import logging
import numpy as np
import astropy.table as astt
import astropy.units as u
import astropy.coordinates as asc
from astropy.io import fits
import matplotlib.pyplot as plt

from ecpi.common import add_path_data_ref_eclairs
import ecpi.common.mission.attitude as ptgsvom
from ecpi.common.io import fits_tools
from ecpi.common.instru.model_geom import InstruECLAIRs

#pylint:disable=W0102


class CatalogBase(object):
    """A simple catalog of sources.

    The sources are stored in an ``astropy.table.Table`` with the columns
    ``elev`` and ``dir`` (degrees), ``intensity`` (photon.cm^-2.s^-1) and
    ``name``.
    """

    def __init__(self):
        """**constructor**

        Initialized without any source.
        """
        self._catalog = astt.Table([[], [], [], []],
                                   names=('elev', 'dir', 'intensity', 'name'),
                                   dtype=('f4', 'f4', 'f4', 'S20'))
        self._catalog['elev'].unit = u.deg
        self._catalog['dir'].unit = u.deg
        self._catalog['intensity'].unit = 'photon.cm^-2.s^-1'
        # names of the two columns used as (x, y) by plot_on_sphere()
        self.plot_coordinates = ['dir', 'elev']
        self.name_file_cat = None
        # declare logger
        self.logger = logging.getLogger(__name__)

    def add_src(self, info_src):
        """add a source to the catalog

        The values must follow the column order of the table.

        :param info_src: ['elev'or'ra','dir'or'dec','intensity','name']
        :type info_src: [ float, float, float, string ]
        """
        self._catalog.add_row(info_src)

    # TODO: COVERAGE
    def remove_src(self, p_name):
        """remove a source from the catalog

        Only the first source whose name matches is removed.

        :param p_name: name of source to remove
        :type p_name: string
        :raises IndexError: if no source has this name
        """
        matches = np.where(self._catalog['name'] == p_name)[0]
        if len(matches) == 0:
            # same exception type as the former blind [0][0] indexing,
            # but with a message that tells what went wrong
            raise IndexError(f"no source named {p_name!r} in the catalog")
        self._catalog.remove_row(matches[0])

    def get_nb_element(self):
        """return the number of sources in the catalog

        This is the number of rows of the table.

        :return: number of sources in the catalog
        :rtype: int
        """
        return len(self._catalog)

    def add_from_catalog(self, other_catalog):
        """add to this catalog the sources from the other catalog

        The two tables are stacked vertically; conflicting metadata are
        silently ignored (``metadata_conflicts='silent'``).

        :param other_catalog: catalog the sources are added from
        :type other_catalog: CatalogBase
        """
        self._catalog = astt.vstack([self._catalog, other_catalog.get_catalog()],
                                    metadata_conflicts='silent')

    def get_catalog(self):
        """return the table of the catalog

        :return: table of the catalog
        :rtype: astropy.table.Table
        """
        return self._catalog

    def save_catalog(self, name_file):
        """write the catalog table into a fits file

        If the file already exists, it is overwritten.

        :param name_file: PATH/name of the file to save into
        :type name_file: string
        """
        self._catalog.write(name_file, format='fits', overwrite=True)

    def read_catalog(self, name_file):
        """init the sources table from a fits file

        Sources already in the catalog are discarded. The base name of the
        file is kept in ``name_file_cat``.

        :param name_file: PATH/name of the file to read
        :type name_file: string
        """
        self._catalog = astt.Table.read(name_file)
        self.name_file_cat = os.path.basename(name_file)

    def plot_on_sphere(self, p_title=None, show=True, save=None):  # pragma: no cover
        """plot the sources on a sphere (aitoff projection)

        The columns named in ``plot_coordinates`` are used as coordinates.

        .. warning:: when p_title is not given, name_file_cat is used as
                     title, so it must have been set

        :param p_title: title of the plot. Default=None
        :type p_title: string
        :param show: show the plot. Default=True.
        :type show: bool
        :param save: PATH/name of the file to save in. If None, no saving. Default=None.
        :type save: string
        """
        if not p_title:
            p_title = self.name_file_cat
        x_name, y_name = self.plot_coordinates[0], self.plot_coordinates[1]
        fig = plt.figure(p_title)
        ax = fig.add_subplot(111, projection="aitoff")
        coord_1 = self._catalog[x_name].to("rad").data
        coord_2 = self._catalog[y_name].to("rad").data
        ax.scatter(coord_1, coord_2)
        ax.grid(True)
        plt.title(p_title, pad=20)
        ax.set_xlabel(x_name)
        ax.set_ylabel(y_name)
        if save is not None:
            plt.savefig(save)
            # NOTE(review): close() is kept under the save branch; the
            # original nesting could not be confirmed - TODO confirm
            if not show:
                plt.close()
        if show:
            plt.show()

    # TODO: COVERAGE
    def get_row_with_name(self, name):
        """return the rows of the catalog whose name equals ``name``

        :param name: name of the source(s) to select
        :type name: string
        :return: sub-table with the matching rows (empty if none matches)
        :rtype: astropy.table.Table
        """
        mask = (self._catalog['name'] == name)
        self.logger.debug(f'mask: {mask}')
        return self._catalog[mask]
class CatalogAstroSourcesBasic(CatalogBase):
    """A catalog in equatorial coordinates (``ra``, ``dec``)."""

    def __init__(self):
        """**constructor**

        Initialized without any source.
        """
        super().__init__()
        # replace the (elev, dir) table of the base class by a (ra, dec) one
        self._catalog = astt.Table([[], [], [], []],
                                   names=('ra', 'dec', 'intensity', 'name'),
                                   meta={'name': 'first table'},
                                   dtype=('f4', 'f4', 'f4', 'S20'))
        self._catalog['ra'].unit = u.deg
        self._catalog['dec'].unit = u.deg
        self._catalog['intensity'].unit = 'photon.cm^-2.s^-1'
        self.plot_coordinates = ['ra', 'dec']
        self.name_file_cat = None

    # TODO: COVERAGE
    def read_cat_swift_light(self):
        """set the source table with sources from the swift-BAT catalog

        The file ``cat/BAT_70m_catalog_20nov2012.fits`` is located through
        ``add_path_data_ref_eclairs``; its full path is stored in
        ``name_file_cat``. Any source already in the catalog is discarded.

        * ``ra`` is wrapped at 180 deg
        * ``name`` is taken from the 'counterpart_name' column
        * ``intensity`` is the 'flux' column scaled by 1e-12 erg, converted
          to keV and divided by 20 (photons at 20 keV)
        """
        self._catalog = astt.Table()
        p_name = add_path_data_ref_eclairs(osp.join('cat', 'BAT_70m_catalog_20nov2012.fits'))
        self.name_file_cat = p_name
        # the context manager closes the file even if a column is missing
        # or a conversion fails; all reads of the data happen before closing
        with fits.open(p_name) as hdul:
            cat_data = hdul[1].data
            ra = asc.Longitude(cat_data['ra'] * u.deg)
            ra.wrap_angle = 180 * u.deg
            self._catalog.add_column(astt.Column(ra, 'ra', 'f4'), copy=True)
            self._catalog.add_column(astt.Column(cat_data['dec'] * u.deg, 'dec', 'f4'),
                                     copy=True)
            self._catalog.add_column(astt.Column(cat_data['counterpart_name'], 'name', 'S50'),
                                     copy=True)
            # photons @ 20keV
            photons = (cat_data['flux'] * 1e-12 * u.erg).to('keV') / 20.0
            self._catalog.add_column(astt.Column(photons, 'intensity', 'f4'), copy=True)
        self._catalog['intensity'].unit = 'photon.cm^-2.s^-1'
        self.logger.debug(f'intensities: {self._catalog["intensity"]}')
# def read_custom_cat(self, path_cat): # self._catalog = astt.Table() # p_name = path_cat # self.name_file_cat = p_name # hdul = fits.open(p_name) # cat_data = hdul[1].data # hdul.close() # ra = asc.Longitude(cat_data['ra'] * u.deg) # ra.wrap_angle = 180 * u.deg # col_add = astt.Column(ra, 'ra', 'f4') # self._catalog.add_column(col_add, copy=True) # col_add = astt.Column(cat_data['dec'] * u.deg, 'dec', 'f4') # self._catalog.add_column(col_add, copy=True) # col_add = astt.Column(cat_data['counterpart_name'], 'name', 'S50') # self._catalog.add_column(col_add, copy=True) # # photons @ 20keV # col_add = astt.Column((cat_data['flux'] * 1e-12 * u.erg).to('keV') / 20.0, \ # 'intensity', 'f4') # self._catalog.add_column(col_add, copy=True) # self._catalog['intensity'].unit = 'photon.cm^-2.s^-1' # self.logger.debug(f'intensities: {self._catalog["intensity"]}')
class CatalogAstroWithEnergySpecSampling(CatalogBase):
    """
    Astro catalog of sources in equatorial coordinates, each source carrying
    its energy spectrum sampled on a common grid (ph/cm2/s)
    """

    def __init__(self, p_energy_sampling):
        """
        :param p_energy_sampling: defined boundary of energy spectrum
        :type p_energy_sampling: numpy.array with dimension 1
        """
        super().__init__()
        assert isinstance(p_energy_sampling, np.ndarray)
        assert p_energy_sampling.ndim == 1
        # keep a private copy so later changes by the caller have no effect
        self.energy_sampling = p_energy_sampling.copy()
        self.nb_channel = self.energy_sampling.size
        # one intensity value per energy channel for every source
        columns = [('ra', 'f4'),
                   ('dec', 'f4'),
                   ('intensity', 'f4', (self.nb_channel,)),
                   ('name', 'S50')]
        self._catalog = astt.Table(dtype=columns)
        for angle_col in ('ra', 'dec'):
            self._catalog[angle_col].unit = u.deg
        self._catalog['intensity'].unit = 'ph.cm^-2.s^-1'
        self.plot_coordinates = ['ra', 'dec']

    def _decorate_flux_axes(self):  # pragma: no cover
        """label the axes of the current flux figure and draw the grid"""
        plt.xlabel("keV")
        plt.ylabel(r"$ph.cm^-2.s^-1$")
        plt.grid()

    def sort_by_flux(self):
        """sort the catalog on the 'intensity' column

        NOTE(review): 'intensity' is an array per source, so the sort
        criterion is unclear (the first element?) - TODO confirm
        """
        self._catalog.sort('intensity')

    def plot_flux_name(self, p_name):  # pragma: no cover
        """plot the spectrum of the first source named ``p_name``

        :param p_name: name of the source
        :type p_name: string
        """
        src = self.get_row_with_name(p_name)[0]
        self.logger.debug(f'type of tab_name[0]: {type(src)}')
        self.logger.debug(f'tab_name[0]: {src}')
        plt.figure()
        plt.title("flux of " + src["name"])
        plt.loglog(self.energy_sampling, src["intensity"])
        self._decorate_flux_axes()

    def plot_flux(self, idx):  # pragma: no cover
        """plot the spectrum of the source at row ``idx``

        :param idx: row index of the source in the catalog
        :type idx: int
        """
        src = self._catalog[idx]
        plt.figure()
        plt.title("flux of " + src["name"])
        plt.loglog(self.energy_sampling, src["intensity"])
        self._decorate_flux_axes()

    def plot_fluxes(self, l_idx):  # pragma: no cover
        """plot on one figure the spectra of the sources at rows ``l_idx``

        :param l_idx: row indexes of the sources in the catalog
        :type l_idx: list of int
        """
        plt.figure()
        plt.title("fluxes of sources")
        for row_idx in l_idx:
            src = self._catalog[row_idx]
            plt.loglog(self.energy_sampling, src["intensity"], label=src["name"])
        plt.legend()
        self._decorate_flux_axes()

    def plot_first_fluxes(self, nb):  # pragma: no cover
        """plot the spectra of the first ``nb`` sources of the catalog

        :param nb: number of sources to plot
        :type nb: int
        """
        self.plot_fluxes(list(range(nb)))
class CatalogFovBasic(CatalogBase):
    """a catalog of sources in the FOV, in (elev, dir) coordinates"""

    def __init__(self):
        """**constructor**

        init with no source
        """
        # The base constructor builds the empty (elev, dir, intensity, name)
        # table, plot_coordinates and name_file_cat. The former code set all
        # of these here first and then let super().__init__() overwrite
        # them, so only the base values ever survived.
        super().__init__()
        # pointing of the instrument, to be provided with set_ptg_instru()
        self.ptg_instru = None

    def _get_ptg_instru(self):
        """return the pointing object, or fail clearly when it is not set"""
        ptg = getattr(self, 'ptg_instru', None)
        if ptg is None:
            raise AttributeError("pointing not defined: call set_ptg_instru() first")
        return ptg

    def set_ptg_instru(self, p_ptg_instru: ptgsvom.AttitudeVT):
        """set the pointing object associated to this catalog

        :param p_ptg_instru: pointing of the instrument for this catalog
        :type p_ptg_instru: pointing_instrument.AttitudeVT
        """
        self.ptg_instru = p_ptg_instru

    def intensity_with_channel(self, nb_chan):
        """reset the whole catalog and switch 'intensity' to one value per channel

        All sources are discarded.

        :param nb_chan: number of energy channels of the intensity column
        :type nb_chan: int
        """
        # we must redefine intensity column with array shape
        self._catalog = astt.Table(dtype=[('elev', 'f4'),
                                          ('dir', 'f4'),
                                          ('intensity', 'f4', (nb_chan,)),
                                          ('name', 'S50')])

    def from_astro_catalog_with_spectrum(self, p_cat_radec):
        """Define catalog with astronomical catalog (in equatorial coordinate)
        with spectrum

        The current content of the catalog is discarded.

        .. note:: need set_ptg_instru

        :param p_cat_radec: sky catalog with source spectrum
        :type p_cat_radec: CatalogAstroWithEnergySpecSampling
        """
        # we must redefine intensity column with array shape
        self._catalog = astt.Table(dtype=p_cat_radec._catalog.dtype)
        self._catalog.rename_column('ra', 'elev')
        self._catalog.rename_column('dec', 'dir')
        # select source in FOV and convert radec to direlev
        self.from_astro_catalog(p_cat_radec)

    def from_astro_catalog(self, p_cat_radec: CatalogAstroSourcesBasic, p_limit_elevation=0):
        """add sources from an astro catalog and convert coordinate in
        elevation, direction

        Only the sources whose elevation is strictly above
        ``p_limit_elevation`` are added.

        .. note:: need set_ptg_instru

        :param p_cat_radec: catalog in ra, dec coordinates
        :type p_cat_radec: CatalogAstroSourcesBasic
        :param p_limit_elevation: limit in elevation in deg. Default is 0.
        :type p_limit_elevation: float
        :raises AttributeError: if the pointing has not been set
        """
        ptg = self._get_ptg_instru()
        src_table = p_cat_radec._catalog
        # convert ra ,dec to xyz instru frame
        radec = np.empty((p_cat_radec.get_nb_element(), 2))
        radec[:, 0] = src_table['ra'].data
        radec[:, 1] = src_table['dec'].data
        xyz = ptg.cat_user_to_instru(radec, deg=True)
        # compute elevation and direction angle
        direlev = ptgsvom.xyz_to_direlev(xyz, deg=True)
        dir_a = direlev[:, 0]
        elev = direlev[:, 1]
        # add sources in largest cone of FOV
        for idx in np.where(elev > p_limit_elevation)[0]:
            self.add_src([elev[idx], dir_a[idx],
                          src_table['intensity'][idx], src_table['name'][idx]])

    def to_astro_catalog(self, verbose=False):
        """return an astro catalog in ra, dec coordinates based on the source table

        .. note:: need set_ptg_instru

        :param verbose: also log (debug level) the shape of the cartesian
                        pointing vectors. Default=False.
        :type verbose: bool
        :return: astro catalog
        :rtype: CatalogAstroSourcesBasic
        :raises AttributeError: if the pointing has not been set
        """
        ptg = self._get_ptg_instru()
        cat_astro = CatalogAstroSourcesBasic()
        dir_elev = np.empty((self.get_nb_element(), 2), dtype=np.float64)
        dir_elev[:, 0] = self._catalog['dir']
        dir_elev[:, 1] = self._catalog['elev']
        xyz = ptgsvom.direlev_to_xyz(dir_elev, deg=True)
        if verbose:
            lon = self._catalog['dir']
            lat = self._catalog['elev']
            oconv = asc.UnitSphericalRepresentation(lon, lat)
            ptg_xyz = np.transpose(oconv.to_cartesian().xyz)
            self.logger.debug(f'shape of ptg_xyz: {ptg_xyz.shape}')
        radec = ptg.instru_to_cat_user(xyz, deg=True)
        ra = asc.Longitude(radec[:, 0] * u.deg)
        ra.wrap_angle = 180 * u.deg
        # a Latitude has no wrap angle, so none is set on dec
        dec = asc.Latitude(radec[:, 1] * u.deg)
        table = astt.Table()
        table.add_column(astt.Column(ra, 'ra', 'f4'), copy=True)
        table.add_column(astt.Column(dec, 'dec', 'f4'), copy=True)
        table.add_column(astt.Column(self._catalog["name"], 'name', 'S50'), copy=True)
        table.add_column(astt.Column(self._catalog["intensity"], 'intensity', 'f4'),
                         copy=True)
        table['intensity'].unit = 'photon.cm^-2.s^-1'
        cat_astro._catalog = table
        return cat_astro

    def plot_on_astro_sphere(self, p_title=None, show=True, save=None):  # pragma: no cover
        """plot the sources on the sky, with RA and DEC

        .. note:: different to plot_on_sphere that plot with elev and dir

        :param p_title: title of the plot. Default=None
        :type p_title: string
        :param show: show the plot. Default=True.
        :type show: bool
        :param save: PATH/name of the file to save in. If None, no saving. Default=None.
        :type save: string
        """
        if not p_title:
            p_title = self.name_file_cat
        fig = plt.figure(p_title)
        ax = fig.add_subplot(111, projection="aitoff")
        # convert the catalog once and read both coordinates from the result
        radec_table = self.to_astro_catalog()._catalog
        ra = radec_table['ra'].to("rad").data
        dec = radec_table['dec'].to("rad").data
        ax.scatter(ra, dec)
        ax.grid(True)
        plt.title(p_title, pad=20)
        ax.set_xlabel('ra')
        ax.set_ylabel('dec')
        if save is not None:
            plt.savefig(save)
            # NOTE(review): close() is kept under the save branch; the
            # original nesting could not be confirmed - TODO confirm
            if not show:
                plt.close()
        if show:
            plt.show()
class CatalogFovEclairs(CatalogFovBasic):
    """catalog of x-ray sources for the ECLAIRs instrument"""

    def __init__(self, instru_eclairs=None):
        """**constructor**

        Initialized without any source.

        :param instru_eclairs: ECLAIRs instrument; a default InstruECLAIRs
                               is created when none is given
        :type instru_eclairs: InstruECLAIRs
        """
        self.instru_eclairs = instru_eclairs or InstruECLAIRs()
        self.dist_mask_detect = self.instru_eclairs.dist_mask_detect
        super().__init__()

    def from_astro_catalog(self, p_cat_radec: CatalogAstroSourcesBasic, p_limit_elevation=0):
        """add sources from an astro catalog

        Only sources in the FOV will be added: sources are first selected
        with the elevation limit of the FOV corner given by the instrument,
        then every source whose projected surface, summed over the masks of
        the instrument, is zero is removed.

        .. note:: need set_ptg_instru

        :param p_cat_radec: catalog in ra, dec coordinates
        :type p_cat_radec: CatalogAstroSourcesBasic
        :param p_limit_elevation: not used here, the elevation limit of the
                                  FOV corner is always applied
        :type p_limit_elevation: float
        """
        instru = self.instru_eclairs
        # convert in elevation, direction in FOV
        corner_limit = instru.get_elevation_limit_fov_corner()
        super().from_astro_catalog(p_cat_radec, corner_limit)
        # remove source out of FOV for ECLAIRs square aperture
        rejected = []
        for row_idx, row in enumerate(self._catalog):
            elev_rad = np.deg2rad(row['elev'])
            dir_rad = np.deg2rad(row['dir'])
            total_surface = 0.0
            for mask in instru._l_mask:
                instru.set_mask(mask)
                instru.set_pos_source_fov(elev_rad, dir_rad)
                total_surface += instru.get_surface_rect_proj()
            if total_surface == 0.0:
                rejected.append(row_idx)
        self._catalog.remove_rows(rejected)
        self.logger.info(f'final number of sources in FOV: {self.get_nb_element()}')

    def add_src_pix(self, info_src_pix):
        """add a source given in sky pixel coordinates to the catalog

        The pixel position is converted to (elev, dir) by the instrument
        before the row is added.

        :param info_src_pix: sky pixel info of the source
                             ['pix_y','pix_z','intensity','name']
        :type info_src_pix: [ float, float, float, string]
        """
        pix_y = info_src_pix[0]
        pix_z = info_src_pix[1]
        elev, direction = self.instru_eclairs.skypix_to_elevdir(pix_y, pix_z)
        self._catalog.add_row([elev, direction, info_src_pix[2], info_src_pix[3]])

    def get_catalog_sky_pix(self):
        """return a copy of the catalog with the position of the sources in
        pixel of the sky array

        Columns 'elev' and 'dir' are replaced by 'sky_y' and 'sky_z'.

        :return: catalog table with sources positions in sky_pix format
        :rtype: astropy.table.Table
        """
        sky_cat = self.get_catalog().copy()
        elev = np.array(sky_cat['elev'])
        direction = np.array(sky_cat['dir'])
        pix_y, pix_z = self.instru_eclairs.elevdir_to_skypix(elev, direction)
        for old_name, new_name, pix in (('elev', 'sky_y', pix_y),
                                        ('dir', 'sky_z', pix_z)):
            sky_cat.rename_column(old_name, new_name)
            sky_cat[new_name].unit = u.pix
            sky_cat[new_name][:] = pix
        return sky_cat

    def save_catalog_pix(self, name_file):
        """save the catalog in pix format to a fits file

        An existing file is overwritten.

        :param name_file: PATH/name of the fits file
        :type name_file: string
        """
        self.get_catalog_sky_pix().write(name_file, format='fits', overwrite=True)
class CatalogIdentifiedSources(CatalogFovBasic):
    """catalog of x-ray sources identified by IMAG

    format from General Program Software Description
    """

    def __init__(self):
        """**constructor**

        Initialized without any source. The description of every column is
        stored in the metadata of the table.
        """
        self.name_col = ('sourceID', 'Y', 'Z', 'Y_fit', 'error_Y', 'Z_fit', 'error_Z',
                         'ra', 'dec', 'errorrad', 'flux', 'pflux', 'errflux', 'snr',
                         'name', 'dist_src', 'class')
        meta_col = {'sourceID': 'Source ID',
                    'Y': "rounded sky pixel [-99:99]",
                    'Z': "rounded sky pixel [-99:99]",
                    'Y_fit': "sky pixel [-99:99]",
                    'error_Y': 'error in x at 90% c.l.',
                    'Z_fit': "sky pixel [-99:99]",
                    'error_Z': 'error in y at 90% c.l.',
                    'ra': 'Right Ascension (0.:360.)',
                    'dec': 'Declination (-90.:90.)',
                    'errorrad': 'error radius at 90% c.l.',
                    'flux': 'fit flux',
                    'pflux': 'pixel flux',
                    'errflux': 'error in fit flux',
                    'snr': 'snr at the source position',
                    'name': 'identified source name',
                    'dist_src': 'distance to identified, catalog source',
                    'class': 'identified source type code'
                    }
        dtype_col = ('S20', 'i2', 'i2', 'f8', 'f8', 'f8', 'f8', 'f8', 'f8',
                     'f8', 'f8', 'f8', 'f8', 'f8', 'S20', 'f8', 'i2')
        self._catalog = astt.Table(names=self.name_col, meta=meta_col, dtype=dtype_col)
        for col in ('Y', 'Z', 'Y_fit', 'error_Y', 'Z_fit', 'error_Z', 'dist_src'):
            self._catalog[col].unit = u.pix
        for col in ('ra', 'dec', 'errorrad'):
            self._catalog[col].unit = u.deg
        for col in ('flux', 'pflux', 'errflux'):
            self._catalog[col].unit = 'ct.s^-1.cm^-2'
        for col in ('ra', 'dec', 'flux', 'errflux', 'snr'):
            self._catalog[col].format = "%6.3f"
        self.plot_coordinates = ['ra', 'dec']
        self.name_file_cat = 'identified sources'
        # declare logger
        self.logger = logging.getLogger(__name__)

    # This method is superseded in IMAG by the genfits_ecl_sop_ima function
    def save_catalog(self, file_path, attitude=None, tstart='0', tstop='1',
                     creator='IMAG?', proc_id="01"):
        """write the catalog table into a fits file

        ECL-SOP-IMA common keywords are included.
        filename is 'ECL-SOP-IMA-' + proc_id + '.fits'; an existing file is
        overwritten.

        :param file_path: PATH to the directory where to write the file
        :type file_path: string
        :param attitude: [ra, dec, ori] in degrees. Default is None, which
                         stands for ['', '', '']
        :type attitude: [float, float, float]
        :param tstart: start time of the observation in s from mjdref
        :type tstart: float
        :param tstop: end time of the observation in s from mjdref
        :type tstop: float
        :param creator: program that has generated the file. Default='IMAG?'
        :type creator: string
        :param proc_id: id of the process. Default="01".
        :type proc_id: string
        """
        if attitude is None:
            # fresh list on every call instead of a shared mutable default
            attitude = ['', '', '']
        # construction of the fits file
        tbhdu = fits.table_to_hdu(self._catalog)
        prihdu = fits.PrimaryHDU(header=fits.Header())
        obstime = float(tstop) - float(tstart)
        fits_tools.pointing_keywords(prihdu, attitude, obstime)
        fits_tools.common_keyword(prihdu, tstart, tstop, creator, 1.0, proc_id)
        prihdu.header['CARD'] = ("ECL-SOP-IMA", "Product type")
        fits_tools.common_keyword(tbhdu, tstart, tstop, creator, 1.0, proc_id)
        thdulist = fits.HDUList([prihdu, tbhdu])
        thdulist[1].header['comment'] = "source parameters from sky images"
        thdulist[1].name = 'SOPIMA'
        filename = 'ECL-SOP-IMA-' + proc_id + '.fits'
        thdulist.writeto(osp.join(file_path, filename),
                         overwrite=True, checksum=True)

    def add_from_catalogFOVECLAIRs(self, eclairs_catal):
        """add to the sources catalog a simple ECLAIRs catalog CatalogFovEclairs

        the unknown data are left blank

        .. note:: metadata conflicts are silently ignored when stacking

        :param eclairs_catal: sources catalog the sources are added from
                              (elev, dir, intensity, name)
        :type eclairs_catal: CatalogFovEclairs
        """
        # reformatting of eclairs_catal: pixel position and ra/dec of every
        # source gathered in one table
        catalog_to_convert = astt.join(
            eclairs_catal.get_catalog_sky_pix(),
            eclairs_catal.to_astro_catalog().get_catalog()
        )
        catalog_to_convert.rename_column('intensity', 'flux')
        catalog_to_convert['flux'].unit = self._catalog['flux'].unit
        catalog_to_convert.rename_column('sky_y', 'Y_fit')
        catalog_to_convert.rename_column('sky_z', 'Z_fit')
        self._catalog = astt.vstack([self._catalog, catalog_to_convert],
                                    metadata_conflicts='silent')
        # rounded pixel position derived from the fitted one
        self._catalog['Y'] = np.array(self._catalog['Y_fit']).round().astype(int)
        self._catalog['Z'] = np.array(self._catalog['Z_fit']).round().astype(int)
        # restore the reference order of the columns
        self._catalog = self._catalog[self.name_col]

    def cos_theta_correction(self, instru_eclairs=None, verbose=None):
        """apply off axis corrections on the flux of the sources

        Every flux is divided by the sine of the elevation of the source:
        'flux' and 'errflux' use the fitted position (Y_fit, Z_fit), 'pflux'
        uses the rounded pixel position (Y, Z).

        :param instru_eclairs: ECLAIRs instrument. Default is None, in which
                               case a default InstruECLAIRs is created
        :type instru_eclairs: InstruECLAIRs
        :param verbose: activate log outputs
        :type verbose: bool (default False)
        """
        if instru_eclairs is None:
            # built on demand rather than once at import time
            instru_eclairs = InstruECLAIRs()
        cat = self._catalog
        sin_elev_fit = np.sin(np.radians(
            instru_eclairs.skypix_to_elevdir(cat['Y_fit'], cat['Z_fit'])[0]))
        sin_elev_pix = np.sin(np.radians(
            instru_eclairs.skypix_to_elevdir(cat['Y'], cat['Z'])[0]))
        cat['flux'] = cat['flux'] / sin_elev_fit
        cat['pflux'] = cat['pflux'] / sin_elev_pix
        cat['errflux'] = cat['errflux'] / sin_elev_fit
        if verbose:
            self.logger.info('off axis correction with cos(theta)')

    def irf_correction(self, dpix, verbose=None):
        """Apply the irf correction matrix to the sources fluxes.

        'flux' and 'errflux' are divided by the irf value at the fitted
        position (Y_fit, Z_fit), 'pflux' by the irf value at the rounded
        pixel position (Y, Z), as done in cos_theta_correction.

        :param dpix: instrument effects
        :type dpix: ECLAIRsDetectorEffectDefault
        :param verbose: activate log outputs
        :type verbose: bool (default False)
        """
        for source in self._catalog:
            # The two positions were previously swapped, so the fit flux and
            # its error were scaled by different coefficients.
            irf_fit = dpix.get_irf_val_ecllos(source['Y_fit'], source['Z_fit'])
            irf_pix = dpix.get_irf_val_ecllos(source['Y'], source['Z'])
            source['flux'] = source['flux'] / irf_fit
            source['pflux'] = source['pflux'] / irf_pix
            source['errflux'] = source['errflux'] / irf_fit
        if verbose:
            self.logger.info('off axis correction with the irf')

    def arf_correction(self, dpix, energy_range):
        """apply the arf correction to the sources fluxes

        All fluxes are divided by the mean of ``dpix.arf_percent()`` over
        the energy range (bounds included), divided by 100.

        :param dpix: instrument effects
        :type dpix: ECLAIRsDetectorEffectDefault
        :param energy_range: energy range (in PI) [low_energy, high_energy]
        :type energy_range: [int, int]
        """
        if len(self._catalog) == 0:
            # nothing to correct, as before dpix is not queried at all
            return
        # the coefficient does not depend on the source: compute it once
        low, high = energy_range[0], energy_range[1]
        arf_coef = np.mean(dpix.arf_percent()[low:high + 1]) / 100.0
        for source in self._catalog:
            for col in ('flux', 'pflux', 'errflux'):
                source[col] = source[col] / arf_coef