"""Catalogs of sources
"""
import os.path as osp
import os
import logging
import numpy as np
import astropy.table as astt
import astropy.units as u
import astropy.coordinates as asc
from astropy.io import fits
import matplotlib.pyplot as plt
from ecpi.common import add_path_data_ref_eclairs
import ecpi.common.mission.attitude as ptgsvom
from ecpi.common.io import fits_tools
from ecpi.common.instru.model_geom import InstruECLAIRs
#pylint:disable=W0102
class CatalogBase(object):
    """A simple catalog of sources.

    The sources are held in an astropy table whose columns are
    'elev', 'dir', 'intensity' and 'name'.
    """

    def __init__(self):
        """**constructor**

        Initialized without any source.
        """
        self._catalog = astt.Table([[], [], [], []],
                                   names=('elev', 'dir', 'intensity', 'name'),
                                   dtype=('f4', 'f4', 'f4', 'S20'))
        self._catalog['elev'].unit = u.deg
        self._catalog['dir'].unit = u.deg
        self._catalog['intensity'].unit = 'photon.cm^-2.s^-1'
        # names of the columns used as (x, y) by plot_on_sphere
        self.plot_coordinates = ['dir', 'elev']
        self.name_file_cat = None
        # declare logger
        self.logger = logging.getLogger(__name__)

    def add_src(self, info_src):
        """Add a source to the catalog.

        The values are appended as one row, in the order of the table columns.

        :param info_src: ['elev'or'ra','dir'or'dec','intensity','name']
        :type info_src: [ float, float, float, string ]
        """
        self._catalog.add_row(info_src)

    # TODO: COVERAGE
    def remove_src(self, p_name):
        """Remove a source from the catalog.

        Only the first row whose 'name' equals ``p_name`` is removed.

        :param p_name: name of source to remove
        :type p_name: string
        :raises IndexError: if no source of the catalog has this name
        """
        idx_match = np.where(self._catalog['name'] == p_name)[0]
        if idx_match.size == 0:
            # same exception type as the former bare indexing, but explicit
            raise IndexError(f"no source named {p_name!r} in the catalog")
        self._catalog.remove_row(idx_match[0])

    def get_nb_element(self):
        """Return the number of sources in the catalog.

        This is the number of rows of the catalog table.

        :return: number of sources in the catalog
        :rtype: int
        """
        return len(self._catalog)

    def add_from_catalog(self, other_catalog):
        """Append the sources of another catalog to this one.

        The two tables are stacked vertically; conflicting table metadata
        are ignored silently (``metadata_conflicts='silent'``).

        :param other_catalog: catalog the sources are taken from
        :type other_catalog: CatalogBase
        """
        self._catalog = astt.vstack([self._catalog, other_catalog.get_catalog()],
                                    metadata_conflicts='silent')

    def get_catalog(self):
        """Return the table of the catalog.

        The table itself is returned, not a copy.

        :return: table of the catalog
        :rtype: astropy.table.Table
        """
        return self._catalog

    def save_catalog(self, name_file):
        """Write the catalog table into a fits file.

        If the file already exists, it is overwritten.

        :param name_file: PATH/name of the file to save into
        :type name_file: string
        """
        self._catalog.write(name_file, format='fits', overwrite=True)

    def read_catalog(self, name_file):
        """Initialize the sources table from a file.

        The current table is replaced by the one read from the file, and
        ``name_file_cat`` is set to the base name of the file.

        :param name_file: PATH/name of the file to read
        :type name_file: string
        """
        self._catalog = astt.Table.read(name_file)
        self.name_file_cat = os.path.basename(name_file)

    def plot_on_sphere(self, p_title=None, show=True, save=None):  # pragma: no cover
        """Plot the sources on a sphere (aitoff projection).

        The columns named in ``self.plot_coordinates`` are used as x and y.

        .. warning:: self.name_file_cat must be manually set

        :param p_title: title of the plot. If empty, self.name_file_cat is used.
        :type p_title: string
        :param show: show the plot. Default=True.
        :type show: bool
        :param save: PATH/name of the file to save in. If None, no saving. Default=None.
        :type save: string
        """
        if not p_title:
            p_title = self.name_file_cat
        fig = plt.figure(p_title)
        ax = fig.add_subplot(111, projection="aitoff")
        x_name, y_name = self.plot_coordinates[0], self.plot_coordinates[1]
        coord_1 = self._catalog[x_name].to("rad").data
        coord_2 = self._catalog[y_name].to("rad").data
        ax.scatter(coord_1, coord_2)
        ax.grid(True)
        plt.title(p_title, pad=20)
        ax.set_xlabel(x_name)
        ax.set_ylabel(y_name)
        if save is not None:
            plt.savefig(save)
            # NOTE(review): indentation was lost in the reviewed source; the
            # figure is assumed to be closed only after a save without
            # display - confirm against the repository.
            if not show:
                plt.close()
        if show:
            plt.show()

    # TODO: COVERAGE
    def get_row_with_name(self, name):
        """Return the rows of the catalog whose 'name' equals ``name``.

        :param name: name of the source to look for
        :type name: string
        :return: selection of the catalog table (may be empty)
        """
        mask = (self._catalog['name'] == name)
        self.logger.debug('mask: %s', mask)
        return self._catalog[mask]
class CatalogAstroSourcesBasic(CatalogBase):
    """A catalog in equatorial coordinates.

    The table columns are 'ra', 'dec', 'intensity' and 'name'.
    """

    def __init__(self):
        """**constructor**

        Initialized without any source.
        """
        super().__init__()
        # replace the (elev, dir) table of the base class by a (ra, dec) one
        self._catalog = astt.Table([[], [], [], []],
                                   names=('ra', 'dec', 'intensity', 'name'),
                                   meta={'name': 'first table'},
                                   dtype=('f4', 'f4', 'f4', 'S20'))
        self._catalog['ra'].unit = u.deg
        self._catalog['dec'].unit = u.deg
        self._catalog['intensity'].unit = 'photon.cm^-2.s^-1'
        self.plot_coordinates = ['ra', 'dec']
        self.name_file_cat = None

    # TODO: COVERAGE
    def read_cat_swift_light(self):
        """Set the source table with sources from the swift-BAT catalog.

        The catalog file is 'cat/BAT_70m_catalog_20nov2012.fits', resolved with
        add_path_data_ref_eclairs. The current table is discarded.

        * 'ra' is wrapped at 180 deg
        * 'name' is taken from the 'counterpart_name' column
        * 'intensity' is 'flux' * 1e-12 erg converted to keV and divided by 20
        """
        self._catalog = astt.Table()
        p_name = add_path_data_ref_eclairs(osp.join('cat', 'BAT_70m_catalog_20nov2012.fits'))
        self.name_file_cat = p_name
        # the file is kept open while its data are read and is always closed,
        # even when a column is missing or a conversion fails
        with fits.open(p_name) as hdul:
            cat_data = hdul[1].data
            ra = asc.Longitude(cat_data['ra'] * u.deg)
            ra.wrap_angle = 180 * u.deg
            self._catalog.add_column(astt.Column(ra, name='ra', dtype='f4'), copy=True)
            self._catalog.add_column(
                astt.Column(cat_data['dec'] * u.deg, name='dec', dtype='f4'), copy=True)
            self._catalog.add_column(
                astt.Column(cat_data['counterpart_name'], name='name', dtype='S50'), copy=True)
            # photons @ 20keV
            intensity = (cat_data['flux'] * 1e-12 * u.erg).to('keV') / 20.0
            self._catalog.add_column(
                astt.Column(intensity, name='intensity', dtype='f4'), copy=True)
        self._catalog['intensity'].unit = 'photon.cm^-2.s^-1'
        self.logger.debug('intensities: %s', self._catalog["intensity"])
# def read_custom_cat(self, path_cat):
# self._catalog = astt.Table()
# p_name = path_cat
# self.name_file_cat = p_name
# hdul = fits.open(p_name)
# cat_data = hdul[1].data
# hdul.close()
# ra = asc.Longitude(cat_data['ra'] * u.deg)
# ra.wrap_angle = 180 * u.deg
# col_add = astt.Column(ra, 'ra', 'f4')
# self._catalog.add_column(col_add, copy=True)
# col_add = astt.Column(cat_data['dec'] * u.deg, 'dec', 'f4')
# self._catalog.add_column(col_add, copy=True)
# col_add = astt.Column(cat_data['counterpart_name'], 'name', 'S50')
# self._catalog.add_column(col_add, copy=True)
# # photons @ 20keV
# col_add = astt.Column((cat_data['flux'] * 1e-12 * u.erg).to('keV') / 20.0, \
# 'intensity', 'f4')
# self._catalog.add_column(col_add, copy=True)
# self._catalog['intensity'].unit = 'photon.cm^-2.s^-1'
# self.logger.debug(f'intensities: {self._catalog["intensity"]}')
class CatalogAstroWithEnergySpecSampling(CatalogBase):
    """
    Represent astro catalog source in equatorial coordinate
    with its energy spectrum sampling (ph/cm2/s)

    The 'intensity' column holds one array of ``nb_channel`` values per source.
    """

    # y-axis label of the flux plots; braces are required for mathtext to put
    # the whole exponent (sign included) in superscript
    _FLUX_LABEL = r"$ph.cm^{-2}.s^{-1}$"

    def __init__(self, p_energy_sampling):
        """
        :param p_energy_sampling: defined boundary of energy spectrum
        :type p_energy_sampling: numpy.array with dimension 1
        """
        super().__init__()
        assert isinstance(p_energy_sampling, np.ndarray)
        assert p_energy_sampling.ndim == 1
        # private copy, so later changes of the caller's array have no effect
        self.energy_sampling = p_energy_sampling.copy()
        self.nb_channel = self.energy_sampling.size
        self._catalog = astt.Table(dtype=[('ra', 'f4'),
                                          ('dec', 'f4'),
                                          ('intensity', 'f4', (self.nb_channel,)),
                                          ('name', 'S50')])
        self._catalog['ra'].unit = u.deg
        self._catalog['dec'].unit = u.deg
        self._catalog['intensity'].unit = 'ph.cm^-2.s^-1'
        self.plot_coordinates = ['ra', 'dec']

    def sort_by_flux(self):
        """Sort the catalog on the 'intensity' column.

        NOTE(review): 'intensity' is an array per source, so the actual sort
        criterion is unclear (the first element?) - to be confirmed.
        """
        self._catalog.sort('intensity')

    def _finish_flux_axes(self):  # pragma: no cover
        """Set the axis labels and the grid shared by all the flux plots."""
        plt.xlabel("keV")
        plt.ylabel(self._FLUX_LABEL)
        plt.grid()

    def plot_flux_name(self, p_name):  # pragma: no cover
        """Plot (log-log) the spectrum of the first source named ``p_name``.

        :param p_name: name of the source to plot
        :type p_name: string
        """
        tab_name = self.get_row_with_name(p_name)
        first_row = tab_name[0]
        self.logger.debug('type of tab_name[0]: %s', type(first_row))
        self.logger.debug('tab_name[0]: %s', first_row)
        plt.figure()
        plt.title("flux of " + first_row["name"])
        plt.loglog(self.energy_sampling, first_row["intensity"])
        self._finish_flux_axes()

    def plot_flux(self, idx):  # pragma: no cover
        """Plot (log-log) the spectrum of the source at row ``idx``.

        :param idx: row index of the source in the catalog
        :type idx: int
        """
        src = self._catalog[idx]
        plt.figure()
        plt.title("flux of " + src["name"])
        plt.loglog(self.energy_sampling, src["intensity"])
        self._finish_flux_axes()

    def plot_fluxes(self, l_idx):  # pragma: no cover
        """Plot (log-log) the spectra of several sources on one figure.

        :param l_idx: row indexes of the sources in the catalog
        :type l_idx: list of int
        """
        plt.figure()
        plt.title("fluxes of sources")
        for idx in l_idx:
            src = self._catalog[idx]
            plt.loglog(self.energy_sampling, src["intensity"], label=src["name"])
        plt.legend()
        self._finish_flux_axes()

    def plot_first_fluxes(self, nb):  # pragma: no cover
        """Plot the spectra of the ``nb`` first sources of the catalog.

        :param nb: number of sources to plot
        :type nb: int
        """
        self.plot_fluxes(list(range(nb)))
class CatalogFovBasic(CatalogBase):
    """a catalog of sources in the FOV """

    def __init__(self):
        """**constructor**

        init with no source
        """
        # The base constructor builds the empty ('elev', 'dir', 'intensity',
        # 'name') table, its units, plot_coordinates, name_file_cat and logger.
        # NOTE(review): this class used to build the same table with
        # meta={'name': 'first table'} *before* calling the base constructor,
        # which overwrote it; that dead code is removed and the effective
        # behaviour (no table meta) is kept - confirm the meta is not wanted.
        super().__init__()

    def set_ptg_instru(self, p_ptg_instru: ptgsvom.AttitudeVT):
        """
        set the pointing object associated to this catalog

        :param p_ptg_instru: pointing of the instrument for this catalog
        :type p_ptg_instru: pointing_instrument.AttitudeVT
        """
        self.ptg_instru = p_ptg_instru

    def intensity_with_channel(self, nb_chan):
        """Reset the catalog with an array-valued 'intensity' column.

        All the sources are discarded. The new empty table has the columns
        'elev', 'dir', 'intensity' (``nb_chan`` values per source) and 'name';
        no unit is set on its columns.

        :param nb_chan: number of values of 'intensity' for each source
        :type nb_chan: int
        """
        # we must redefine intensity column with array shape
        self._catalog = astt.Table(dtype=[('elev', 'f4'),
                                          ('dir', 'f4'),
                                          ('intensity', 'f4', (nb_chan,)),
                                          ('name', 'S50')])

    def from_astro_catalog_with_spectrum(self, p_cat_radec):
        """Define catalog with astronomical catalog (in equatorial coordinate) with spectrum

        The current table is replaced by an empty one with the dtype of the
        table of ``p_cat_radec``, then filled by from_astro_catalog.

        .. note:: need set_ptg_instru

        :param p_cat_radec: sky catalog with source spectrum
        :type p_cat_radec: CatalogAstroWithEnergySpecSampling
        """
        # we must redefine intensity column with array shape
        self._catalog = astt.Table(dtype=p_cat_radec._catalog.dtype)
        self._catalog.rename_column('ra', 'elev')
        self._catalog.rename_column('dec', 'dir')
        # select source in FOV and convert radec to direlev
        self.from_astro_catalog(p_cat_radec)

    def from_astro_catalog(self, p_cat_radec: CatalogAstroSourcesBasic, p_limit_elevation=0):
        """add sources from an astro catalog and convert coordinate in elevation, direction

        Only the sources whose elevation is strictly above ``p_limit_elevation``
        are added.

        .. note:: need set_ptg_instru

        :param p_cat_radec: catalog in ra, dec coordinates
        :type p_cat_radec: CatalogAstroSourcesBasic
        :param p_limit_elevation: limit in elevation in deg. Default is 0.
        :type p_limit_elevation: float
        """
        src_table = p_cat_radec._catalog
        # convert ra, dec to xyz instru frame
        radec = np.empty((p_cat_radec.get_nb_element(), 2))
        radec[:, 0] = src_table['ra'].data
        radec[:, 1] = src_table['dec'].data
        xyz = self.ptg_instru.cat_user_to_instru(radec, deg=True)
        # compute elevation and direction angle
        direlev = ptgsvom.xyz_to_direlev(xyz, deg=True)
        dir_a = direlev[:, 0]
        elev = direlev[:, 1]
        # add sources in largest cone of FOV
        for idx in np.where(elev > p_limit_elevation)[0]:
            self.add_src([elev[idx], dir_a[idx],
                          src_table['intensity'][idx], src_table['name'][idx]])

    def to_astro_catalog(self, verbose=False):
        """return an astro catalog in ra, dec coordinates based on the source table

        .. note:: need set_ptg_instru

        :param verbose: if True, log (debug level) the shape of the cartesian
            coordinates computed with astropy. Default=False.
        :type verbose: bool
        :return: astro catalog with the columns 'ra', 'dec', 'name', 'intensity'
        :rtype: CatalogAstroSourcesBasic
        """
        cat_astro = CatalogAstroSourcesBasic()
        dir_elev = np.empty((self.get_nb_element(), 2), dtype=np.float64)
        dir_elev[:, 0] = self._catalog['dir']
        dir_elev[:, 1] = self._catalog['elev']
        xyz = ptgsvom.direlev_to_xyz(dir_elev, deg=True)
        if verbose:
            # only used for the debug message, not for the conversion itself
            oconv = asc.UnitSphericalRepresentation(self._catalog['dir'],
                                                    self._catalog['elev'])
            ptg_xyz = np.transpose(oconv.to_cartesian().xyz)
            self.logger.debug('shape of ptg_xyz: %s', ptg_xyz.shape)
        radec = self.ptg_instru.instru_to_cat_user(xyz, deg=True)
        ra = asc.Longitude(radec[:, 0] * u.deg)
        ra.wrap_angle = 180 * u.deg
        dec = asc.Latitude(radec[:, 1] * u.deg)
        # NOTE(review): wrap_angle is a Longitude property; on a Latitude this
        # assignment presumably has no effect - kept as is, to be confirmed.
        dec.wrap_angle = 90 * u.deg
        table = astt.Table()
        table.add_column(astt.Column(ra, 'ra', 'f4'), copy=True)
        table.add_column(astt.Column(dec, 'dec', 'f4'), copy=True)
        table.add_column(astt.Column(self._catalog["name"], 'name', 'S50'), copy=True)
        table.add_column(astt.Column(self._catalog["intensity"], 'intensity', 'f4'), copy=True)
        table['intensity'].unit = 'photon.cm^-2.s^-1'
        cat_astro._catalog = table
        return cat_astro

    def plot_on_astro_sphere(self, p_title=None, show=True, save=None):  # pragma: no cover
        """plot the sources on the sky, with RA and DEC

        .. note:: different to plot_on_sphere that plot with elev and dir
        .. note:: need set_ptg_instru

        :param p_title: title of the plot. If empty, self.name_file_cat is used.
        :type p_title: string
        :param show: show the plot. Default=True.
        :type show: bool
        :param save: PATH/name of the file to save in. If None, no saving. Default=None.
        :type save: string
        """
        if not p_title:
            p_title = self.name_file_cat
        fig = plt.figure(p_title)
        ax = fig.add_subplot(111, projection="aitoff")
        # convert once and read both coordinates from the same table
        astro_table = self.to_astro_catalog()._catalog
        ra = astro_table['ra'].to("rad").data
        dec = astro_table['dec'].to("rad").data
        ax.scatter(ra, dec)
        ax.grid(True)
        plt.title(p_title, pad=20)
        ax.set_xlabel('ra')
        ax.set_ylabel('dec')
        if save is not None:
            plt.savefig(save)
            # NOTE(review): indentation was lost in the reviewed source; the
            # figure is assumed to be closed only after a save without
            # display - confirm against the repository.
            if not show:
                plt.close()
        if show:
            plt.show()
class CatalogFovEclairs(CatalogFovBasic):
    """catalog of x-ray sources for the ECLAIRs instrument
    """

    def __init__(self, instru_eclairs=None):
        """**constructor**

        Initialized without any source.

        :param instru_eclairs: ECLAIRs instrument. If None, a default
            InstruECLAIRs() is created.
        :type instru_eclairs: InstruECLAIRs
        """
        if instru_eclairs is None:
            instru_eclairs = InstruECLAIRs()
        self.instru_eclairs = instru_eclairs
        self.dist_mask_detect = self.instru_eclairs.dist_mask_detect
        super().__init__()

    def from_astro_catalog(self, p_cat_radec: CatalogAstroSourcesBasic, p_limit_elevation=0):
        """add sources from an astro catalog

        Only sources in the FOV will be added: the sources are first selected
        with the elevation limit given by the instrument
        (get_elevation_limit_fov_corner), then those whose projected surface,
        summed over the masks of the instrument, is zero are removed.

        .. note:: need set_ptg_instru

        :param p_cat_radec: catalog in ra, dec coordinates
        :type p_cat_radec: CatalogAstroSourcesBasic
        :param p_limit_elevation: not used by this class, the limit comes from
            the instrument; kept for compatibility with CatalogFovBasic.
        :type p_limit_elevation: float
        """
        # convert in elevation, direction in FOV
        super().from_astro_catalog(p_cat_radec,
                                   self.instru_eclairs.get_elevation_limit_fov_corner())
        # remove source out of FOV for ECLAIRs square aperture
        instru = self.instru_eclairs
        not_in_fov = []
        for idx, src in enumerate(self._catalog):
            elev = np.deg2rad(src['elev'])
            dira = np.deg2rad(src['dir'])
            surface = 0.0
            for mask in instru._l_mask:
                instru.set_mask(mask)
                instru.set_pos_source_fov(elev, dira)
                surface += instru.get_surface_rect_proj()
            if surface == 0.0:
                not_in_fov.append(idx)
        # rows are removed after the loop, not while iterating on the table
        self._catalog.remove_rows(not_in_fov)
        self.logger.info('final number of sources in FOV: %s', self.get_nb_element())

    def add_src_pix(self, info_src_pix):
        """add a source in sky pixel coordinates to the catalog

        The pixel position is converted with skypix_to_elevdir of the
        instrument and the source is stored as (elev, dir, intensity, name).
        (0,0) is at the center of the sky array

        :param info_src_pix: sky pixel info of the source ['pix_y','pix_z','intensity','name']
        :type info_src_pix: [ float, float, float, string]
        """
        pix_y, pix_z, intensity, name = (info_src_pix[0], info_src_pix[1],
                                         info_src_pix[2], info_src_pix[3])
        elev, direction = self.instru_eclairs.skypix_to_elevdir(pix_y, pix_z)
        self._catalog.add_row([elev, direction, intensity, name])

    def get_catalog_sky_pix(self):
        """return the catalog with the position of the sources in pixel of the sky array

        A copy of the table is returned: 'elev' and 'dir' are renamed 'sky_y'
        and 'sky_z' and filled with the result of elevdir_to_skypix of the
        instrument. The table of this catalog is not modified.
        (0,0) is at the center of the sky array

        :return: catalog table with sources positions in sky_pix format
        :rtype: astropy.table.Table
        """
        cat = self.get_catalog().copy()
        pix_y, pix_z = self.instru_eclairs.elevdir_to_skypix(np.array(cat['elev']),
                                                             np.array(cat['dir']))
        cat.rename_column('elev', 'sky_y')
        cat.rename_column('dir', 'sky_z')
        cat['sky_y'].unit = u.pix
        cat['sky_z'].unit = u.pix
        cat['sky_y'][:] = pix_y
        cat['sky_z'][:] = pix_z
        return cat

    def save_catalog_pix(self, name_file):
        """save the catalog in pix format to a fits file

        If the file already exists, it is overwritten.

        :param name_file: PATH/name of the fits file
        :type name_file: string
        """
        self.get_catalog_sky_pix().write(name_file, format='fits', overwrite=True)
class CatalogIdentifiedSources(CatalogFovBasic):
    """catalog of x-ray sources identified by IMAG

    format from General Program Software Description
    """

    def __init__(self):
        """**constructor**

        Initialized without any source. The columns of the table are listed in
        ``self.name_col``; their description is stored in the table meta.

        .. note:: the constructor of the parent class is not called, the table
            and the logger are fully defined here.
        """
        self.name_col = ('sourceID', 'Y', 'Z', 'Y_fit', 'error_Y', 'Z_fit', 'error_Z',
                         'ra', 'dec', 'errorrad', 'flux', 'pflux', 'errflux', 'snr',
                         'name', 'dist_src', 'class')
        meta_col = {'sourceID': 'Source ID',
                    'Y': "rounded sky pixel [-99:99]",
                    'Z': "rounded sky pixel [-99:99]",
                    'Y_fit': "sky pixel [-99:99]",
                    'error_Y': 'error in x at 90% c.l.',
                    'Z_fit': "sky pixel [-99:99]",
                    'error_Z': 'error in y at 90% c.l.',
                    'ra': 'Right Ascension (0.:360.)',
                    'dec': 'Declination (-90.:90.)',
                    'errorrad': 'error radius at 90% c.l.',
                    'flux': 'fit flux',
                    'pflux': 'pixel flux',
                    'errflux': 'error in fit flux',
                    'snr': 'snr at the source position',
                    'name': 'identified source name',
                    'dist_src': 'distance to identified, catalog source',
                    'class': 'identified source type code'
                    }
        # one dtype per entry of name_col, in the same order
        dtype_col = ('S20', 'i2', 'i2', 'f8', 'f8', 'f8', 'f8', 'f8', 'f8', 'f8', 'f8', 'f8',
                     'f8', 'f8', 'S20', 'f8', 'i2')
        self._catalog = astt.Table(names=self.name_col, meta=meta_col, dtype=dtype_col)
        for col in ('Y', 'Z', 'Y_fit', 'error_Y', 'Z_fit', 'error_Z', 'dist_src'):
            self._catalog[col].unit = u.pix
        for col in ('ra', 'dec', 'errorrad'):
            self._catalog[col].unit = u.deg
        for col in ('flux', 'pflux', 'errflux'):
            self._catalog[col].unit = 'ct.s^-1.cm^-2'
        for col in ('ra', 'dec', 'flux', 'errflux', 'snr'):
            self._catalog[col].format = "%6.3f"
        self.plot_coordinates = ['ra', 'dec']
        self.name_file_cat = 'identified sources'
        # declare logger
        self.logger = logging.getLogger(__name__)

    # This method is superseded in IMAG by the genfits_ecl_sop_ima function
    def save_catalog(self, file_path, attitude=None, tstart='0', tstop='1',
                     creator='IMAG?', proc_id="01"):
        """write the catalog table into a fits file ECL-SOP-IMA

        common keywords are included
        filename is 'ECL-SOP-IMA-' + proc_id + '.fits'
        If the file already exists, it is overwritten.

        :param file_path: PATH to the directory where to write the file
        :type file_path: string
        :param attitude: [ra, dec, ori] in degrees. Default is None, replaced
            by ['', '', ''].
        :type attitude: [float, float, float]
        :param tstart: start time of the observation in s from mjdref
        :type tstart: float
        :param tstop: end time of the observation in s from mjdref
        :type tstop: float
        :param creator: program that has generated the file. Default='IMAG?'
        :type creator: string
        :param proc_id: id of the process. Default="01".
        :type proc_id: string
        """
        if attitude is None:
            # a fresh list for each call instead of a shared mutable default
            attitude = ['', '', '']
        # construction of the fits file
        tbhdu = fits.table_to_hdu(self._catalog)
        prihdu = fits.PrimaryHDU(header=fits.Header())
        obstime = float(tstop) - float(tstart)
        fits_tools.pointing_keywords(prihdu, attitude, obstime)
        fits_tools.common_keyword(prihdu, tstart, tstop, creator, 1.0, proc_id)
        prihdu.header['CARD'] = ("ECL-SOP-IMA", "Product type")
        fits_tools.common_keyword(tbhdu, tstart, tstop, creator, 1.0, proc_id)
        thdulist = fits.HDUList([prihdu, tbhdu])
        thdulist[1].header['comment'] = "source parameters from sky images"
        thdulist[1].name = 'SOPIMA'
        filename = 'ECL-SOP-IMA-' + proc_id + '.fits'
        thdulist.writeto(osp.join(file_path, filename), overwrite=True, checksum=True)

    def add_from_catalogFOVECLAIRs(self, eclairs_catal):
        """
        add to the sources catalog a simple ECLAIRs catalog CatalogFovEclairs

        The sky pixel table and the (ra, dec) table of ``eclairs_catal`` are
        joined, 'intensity' is renamed 'flux', 'sky_y'/'sky_z' are renamed
        'Y_fit'/'Z_fit', and the result is stacked under the current table.
        'Y' and 'Z' are then recomputed for all the rows as the rounded
        'Y_fit' and 'Z_fit'. The unknown data are left blank.

        .. note:: need set_ptg_instru on eclairs_catal

        :param eclairs_catal: sources catalog the sources are added
                              from (elev, dir, intensity, name)
        :type eclairs_catal: CatalogFovEclairs
        """
        # reformatting of eclairs_catal
        catalog_to_convert = astt.join(
            eclairs_catal.get_catalog_sky_pix(),
            eclairs_catal.to_astro_catalog().get_catalog()
        )
        catalog_to_convert['intensity'].name = 'flux'
        catalog_to_convert['flux'].unit = self._catalog['flux'].unit
        catalog_to_convert['sky_y'].name = 'Y_fit'
        catalog_to_convert['sky_z'].name = 'Z_fit'
        self._catalog = astt.vstack([self._catalog, catalog_to_convert],
                                    metadata_conflicts='silent')
        self._catalog['Y'] = np.array(self._catalog['Y_fit']).round().astype(int)
        self._catalog['Z'] = np.array(self._catalog['Z_fit']).round().astype(int)
        # back to the reference order of the columns
        self._catalog = self._catalog[self.name_col]

    def cos_theta_correction(self, instru_eclairs=None, verbose=None):
        """apply off axis corrections on the flux of the sources

        Each flux is divided by the sine of the elevation returned by
        skypix_to_elevdir: 'flux' and 'errflux' with the position
        ('Y_fit', 'Z_fit'), 'pflux' with the position ('Y', 'Z').

        :param instru_eclairs: ECLAIRs instrument. If None, a default
            InstruECLAIRs() is created for this call.
        :type instru_eclairs: InstruECLAIRs
        :param verbose: log an info message when the correction is done
        :type verbose: bool (default False)
        """
        if instru_eclairs is None:
            # created on demand, not once at definition time
            instru_eclairs = InstruECLAIRs()
        cat = self._catalog
        elev_fit = instru_eclairs.skypix_to_elevdir(cat['Y_fit'], cat['Z_fit'])[0]
        elev_pix = instru_eclairs.skypix_to_elevdir(cat['Y'], cat['Z'])[0]
        sin_elev_fit = np.sin(np.radians(elev_fit))
        sin_elev_pix = np.sin(np.radians(elev_pix))
        cat['flux'] = cat['flux'] / sin_elev_fit
        cat['pflux'] = cat['pflux'] / sin_elev_pix
        cat['errflux'] = cat['errflux'] / sin_elev_fit
        if verbose:
            self.logger.info('off axis correction with cos(theta)')

    def irf_correction(self, dpix, verbose=None):
        """Apply the irf correction matrix to the sources fluxes.

        'flux' and 'errflux' (fit values) are divided by the irf value at
        ('Y_fit', 'Z_fit'); 'pflux' (pixel flux) by the irf value at the
        rounded pixel ('Y', 'Z'), the same pairing as cos_theta_correction.

        NOTE(review): the positions used for 'flux' and 'pflux' were
        previously swapped, which contradicted the column descriptions and
        cos_theta_correction - confirm with the IMAG specification.

        :param dpix: instrument effects
        :type dpix: ECLAIRsDetectorEffectDefault
        :param verbose: log an info message when the correction is done
        :type verbose: bool (default False)
        """
        for source in self._catalog:
            irf_fit = dpix.get_irf_val_ecllos(source['Y_fit'], source['Z_fit'])
            irf_pix = dpix.get_irf_val_ecllos(source['Y'], source['Z'])
            source['flux'] = source['flux'] / irf_fit
            source['pflux'] = source['pflux'] / irf_pix
            source['errflux'] = source['errflux'] / irf_fit
        if verbose:
            self.logger.info('off axis correction with the irf')

    def arf_correction(self, dpix, energy_range):
        """apply the arf correction to the sources fluxes

        'flux', 'pflux' and 'errflux' of every source are divided by the mean
        of dpix.arf_percent() over the energy range (bounds included),
        divided by 100.

        :param dpix: instrument effects
        :type dpix: ECLAIRsDetectorEffectDefault
        :param energy_range: energy range (in PI) [low_energy, high_energy]
        :type energy_range: [int, int]
        """
        # the coefficient does not depend on the source: computed once
        low, high = energy_range[0], energy_range[1]
        arf_coef = np.mean(dpix.arf_percent()[low:high + 1]) / 100.0
        for source in self._catalog:
            source['flux'] = source['flux'] / arf_coef
            source['pflux'] = source['pflux'] / arf_coef
            source['errflux'] = source['errflux'] / arf_coef