Source code for xrtpy.response.effective_area

__all__ = [
    "EffectiveAreaFundamental",
]

import datetime
import math
from functools import cached_property
from pathlib import Path
from typing import NamedTuple

import astropy.time
import numpy as np
import scipy.interpolate
import scipy.io
import sunpy.io.special
import sunpy.time
from astropy import units as u
from astropy.utils.data import get_pkg_data_filename
from scipy import interpolate

from xrtpy.response.channel import Channel, resolve_filter_name
from xrtpy.util.time import epoch

index_mapping_to_fw1_name = {
    "Open": 0,
    "Al-poly": 1,
    "C-poly": 2,
    "Be-thin": 3,
    "Be-med": 4,
    "Al-med": 5,
}

index_mapping_to_fw2_name = {
    "Open": 0,
    "Al-mesh": 1,
    "Ti-poly": 2,
    "G-band": 3,
    "Al-thick": 4,
    "Be-thick": 5,
}

index_mapping_to_multi_filter = {
    "Al-poly/Al-mesh": 9,
    "Al-poly/Ti-poly": 10,
    "Al-poly/Al-thick": 11,
    "Al-poly/Be-thick": 12,
    "C-poly/Ti-poly": 13,
}


_ccd_contam_filename = (
    Path(__file__).parent.absolute() / "data" / "xrt_contam_on_ccd.sav"
)

_filter_contam_filename = (
    Path(__file__).parent.absolute() / "data" / "xrt_contam_on_filter.geny"
)


class ParsedFilter(NamedTuple):
    filter1: str
    filter2: str | None
    is_combo: bool


def parse_filter_input(filter_string):
    """
    Parses and validates a filter input string using official filter mappings.

    Parameters
    ----------
    filter_string : str
        A string representing either a single filter (e.g., "Al-poly")
        or a filter combo (e.g., "Al-poly/Ti-poly").

    Returns
    -------
    ParsedFilter
        Named tuple with 'filter1', 'filter2', and 'is_combo' boolean.

    Raises
    ------
    ValueError
        If the filter or combo is invalid.
    """
    if not isinstance(filter_string, str):
        raise TypeError("Filter name must be a string.")

    standardized = resolve_filter_name(filter_string.strip())

    if "/" in standardized:
        if standardized not in index_mapping_to_multi_filter:
            raise ValueError(
                f"'{standardized}' is not a valid filter combination.\n"
                f"Valid combinations are: {sorted(index_mapping_to_multi_filter)}"
            )
        f1, f2 = standardized.split("/")
        return ParsedFilter(filter1=f1, filter2=f2, is_combo=True)

    elif (
        standardized in index_mapping_to_fw1_name
        or standardized in index_mapping_to_fw2_name
    ):
        return ParsedFilter(filter1=standardized, filter2=None, is_combo=False)

    else:
        raise ValueError(
            f"'{standardized}' is not a recognized filter in either filter wheel.\n"
            f"Valid FW1 filters: {sorted(index_mapping_to_fw1_name)}\n"
            f"Valid FW2 filters: {sorted(index_mapping_to_fw2_name)}"
        )


[docs] class EffectiveAreaFundamental: """ Class for calculating the effective area for an XRT filter at a specific observation date. This class handles the calculations required to determine the effective area of a filter used in the X-Ray Telescope (XRT) on the Hinode satellite. It considers various factors such as contamination on the CCD and filters, as well as the geometry and transmission of the XRT channel. Parameters ---------- filter_name : str The name of the filter. observation_date : str or datetime.datetime The date of the observation. Acceptable formats include any string or datetime object that can be parsed by `sunpy.time.parse_time`. """ def __init__(self, filter_name, observation_date): self._raw_input_name = filter_name self._parsed_filter = parse_filter_input(filter_name) self._filter1_name = self._parsed_filter.filter1 self._filter2_name = self._parsed_filter.filter2 self._is_combo = self._parsed_filter.is_combo # Store the standardized/resolved name if self._is_combo: self._name = f"{self._filter1_name}/{self._filter2_name}" else: self._name = self._filter1_name self._observation_date = sunpy.time.parse_time(observation_date) self._channel = Channel(self.name) self.observation_date = observation_date self._channel = Channel(self.name) @property def name(self) -> str: """ The resolved name of the filter or filter combination. """ return self._name @property def filter1_name(self) -> str: """ Name of the first filter given. """ return self._filter1_name @property def filter2_name(self) -> str: """ Name of the second filter given. """ return self._filter2_name @property def is_combo(self) -> bool: """ True if the filter is a combination (e.g., 'Al-poly/Ti-poly'), False otherwise. """ return self._is_combo @property def observation_date(self) -> str: """ Date of observation. """ return self._observation_date @observation_date.setter def observation_date(self, date): """ Validates the user's requested observation date. """ observation_date = sunpy.time.parse_time(date) if observation_date <= epoch: raise ValueError( f"\nInvalid date: {observation_date.datetime}.\n" f"Date must be after {epoch}." ) modified_time_path = Path(_ccd_contam_filename).stat().st_mtime modified_time = astropy.time.Time(modified_time_path, format="unix") # REMOVE # latest_available_ccd_data = _ccd_contamination_file_time[-1].datetime.strftime( # "%Y/%m/%d" # ) latest_available_ccd_data = self._ccd_contamination_file_time[ -1 ].datetime.strftime("%Y/%m/%d") modified_time_datetime = datetime.datetime.fromtimestamp( modified_time_path ).strftime("%Y/%m/%d") if observation_date > modified_time: raise ValueError( "\nNo contamination data is presently available for " f"{observation_date.datetime}.\nThe latest available data is on " f"{latest_available_ccd_data}.\nContamination data is " "updated periodically. The last update was on " f"{modified_time_datetime}. If this is more " "than one month ago, please raise an issue at: " "https://github.com/HinodeXRT/xrtpy/issues/new" ) self._observation_date = observation_date @cached_property def _filter_contam_file(self): return scipy.io.readsav(_filter_contam_filename) @cached_property def _filter_contamination_file_time(self): return astropy.time.Time( self._filter_contam_file["p1"], format="utime", scale="utc" ) @cached_property def _filter_contamination(self): return self._filter_contam_file["p2"]
[docs] @cached_property def ccd_contam_data(self): return scipy.io.readsav(_ccd_contam_filename)
@cached_property def _ccd_contamination_file_time(self): """ CCD contamination time axis from the .sav file. """ return astropy.time.Time( self.ccd_contam_data["p1"], format="utime", scale="utc" ) @cached_property def _ccd_contamination(self): """ CCD contamination thickness values from the .sav file. """ return self.ccd_contam_data["p2"] @property def contamination_on_CCD(self): """ Calculate the thickness of the contamination layer on the CCD. This property interpolates the contamination data over time to determine the thickness of the contamination layer on the CCD at the observation date. The contamination layer is measured in Angstroms (Å). Returns ------- astropy.units.Quantity The thickness of the contamination layer on the CCD, in Angstroms. Notes ----- The interpolation is performed using a linear interpolation method over the available contamination data points. The `observation_date` attribute is used to provide the point at which to evaluate the interpolation. Raises ------ ValueError If the observation date is outside the range of the available contamination data. """ interpolater = scipy.interpolate.interp1d( self._ccd_contamination_file_time.utime, self._ccd_contamination, kind="linear", ) return interpolater(self.observation_date.utime) @staticmethod def _get_filter_wheel_number(filter_name: str) -> int: """Returns 0 if the filter is in FW1, 1 if in FW2. Raises error if unknown.""" if filter_name in index_mapping_to_fw1_name: return 0 elif filter_name in index_mapping_to_fw2_name: return 1 else: raise ValueError(f"Filter '{filter_name}' not found in FW1 or FW2.") @property def _filter_wheel_number(self): """Defining chosen filter to its corresponding filter wheel.""" return 0 if self.name in index_mapping_to_fw1_name else 1 @property def _combo_filter_name_split(self): """Returns (filter1, filter2) even for a single filter (filter2 is None).""" return self.filter1_name, self.filter2_name @property def _filter1_wheel(self): """Returns filter wheel number for filter1 (0 or 1).""" return self._get_filter_wheel_number(self.filter1_name) @property def _filter2_wheel(self): """Returns filter wheel number for filter2 if combo; otherwise None.""" if not self.is_combo or self.filter2_name is None: return None return self._get_filter_wheel_number(self.filter2_name) @staticmethod def _get_filter_index(filter_name: str) -> int: """Returns the index value for a filter name from either wheel mapping.""" if filter_name in index_mapping_to_fw1_name: return index_mapping_to_fw1_name[filter_name] elif filter_name in index_mapping_to_fw2_name: return index_mapping_to_fw2_name[filter_name] else: raise ValueError( f"Filter '{filter_name}' not found in filter index mappings." ) @property def filter_data_dates_to_seconds(self): """Returns the contamination file time axis in seconds (utime).""" return self._filter_contamination_file_time.utime @property def filter_observation_date_to_seconds(self): """Returns the observation date in seconds (utime).""" return self.observation_date.utime @property def _filter1_data(self): """Returns filter contamination data for filter 1.""" filter1_index = ( index_mapping_to_fw1_name.get(self.filter1_name) if self._filter1_wheel == 0 else index_mapping_to_fw2_name.get(self.filter1_name) ) return self._filter_contamination[filter1_index][self._filter1_wheel] @property def _filter2_data(self): """Returns filter contamination data for filter 2, if combo.""" if not self.is_combo or self.filter2_name is None: return None filter2_index = ( index_mapping_to_fw1_name.get(self.filter2_name) if self._filter2_wheel == 0 else index_mapping_to_fw2_name.get(self.filter2_name) ) return self._filter_contamination[filter2_index][self._filter2_wheel] @property def contamination_on_filter1(self) -> u.angstrom: """ Interpolates contamination layer thickness on filter1. """ interpolater = scipy.interpolate.interp1d( self._filter_contamination_file_time.utime, self._filter1_data, kind="linear", ) return interpolater(self.observation_date.utime) @property def contamination_on_filter2(self) -> u.Quantity | None: """ Interpolates the contamination thickness on filter2, if using a combo filter. Returns ------- astropy.units.Quantity or None Contamination thickness in Angstroms for filter2 if present; otherwise None. """ if not self.is_combo or self._filter2_data is None: return None interpolater = scipy.interpolate.interp1d( self._filter_contamination_file_time.utime, self._filter2_data, kind="linear", ) return interpolater(self.observation_date.utime)
[docs] @cached_property def n_DEHP_attributes(self): """(Diethylhexylphthalate) Wavelength (nm), Delta, Beta.""" _n_DEHP_filename = get_pkg_data_filename( "data/n_DEHP.txt", package="xrtpy.response" ) with Path(_n_DEHP_filename).open() as n_DEHP: list_of_DEHP_attributes = [] for line in n_DEHP: stripped_line = line.strip() line_list = stripped_line.split() list_of_DEHP_attributes.append(line_list) return list_of_DEHP_attributes
[docs] @cached_property def n_DEHP_wavelength(self): """(Diethylhexylphthalate) Wavelength given in Angstrom (Å).""" # Convert wavelength values from nanometers to Angstroms wavelength_str = [ self.n_DEHP_attributes[i][0] for i in range(2, len(self.n_DEHP_attributes)) ] return np.array([float(i) * 10 for i in wavelength_str])
[docs] @cached_property def n_DEHP_delta(self): """(Diethylhexylphthalate) Delta.""" delta_str = [ self.n_DEHP_attributes[i][1] for i in range(2, len(self.n_DEHP_attributes)) ] # Converting from str to float delta_float = np.array( [float(delta_str[i]) for i in range(len(self.n_DEHP_wavelength))] ) # Interpolate so ranges are the same return interpolate.interp1d(self.n_DEHP_wavelength, delta_float)( self.n_DEHP_wavelength )
[docs] @cached_property def n_DEHP_beta(self): """(Diethylhexylphthalate) Beta.""" beta_str = [ self.n_DEHP_attributes[i][2] for i in range(2, len(self.n_DEHP_attributes)) ] # Converting from str to float beta_float = np.array( [float(beta_str[i]) for i in range(len(self.n_DEHP_wavelength))] ) # Interpolate so ranges are the same return interpolate.interp1d(self.n_DEHP_wavelength, beta_float)( self.n_DEHP_wavelength )
@cached_property def _transmission_equation(self): """ Define equations used to calculate the effective area. This method sets up the necessary equations to calculate the effective area of a filter in the X-Ray Telescope (XRT) on the Hinode satellite. The calculations are based on the principles of modern optics as described in G.R Fowles, "Introduction to Modern Optics," 2nd Edition, pp 96-101. Returns ------- tuple A tuple containing the index of refraction, sine of the angle of incidence, cosine of the angle of incidence, maximum wavelength, index of refraction of the medium at the entrance, index of refraction of the medium at the exit, and the angle of incidence. """ n_o = 1.0 # index of medium at entrance of filter (assumed vacuum) n_t = 1.0 # index of medium at exit of filter (assumed vacuum) incidence_angle = 0 # Angle of incidence on Filter in radians wavelength_max = 4000 # Max wavelength in Angstroms index = [ (complex((1 - self.n_DEHP_delta[i]), (1.0 * self.n_DEHP_beta[i]))) for i in range(4000) ] # Snell's law sin_a = (n_o * np.sin(incidence_angle)) / index cos_a = 1 return (index, sin_a, cos_a, wavelength_max, n_o, n_t, incidence_angle) @cached_property def _angular_wavenumber_CCD(self): """Define angular wavenumber on CCD.""" index, _, cos_a, wavelength_max, _, _, _ = self._transmission_equation # Define wavevector angular_wavenumber = np.array( [ (2.0 * math.pi * index[i] * cos_a) / self.n_DEHP_wavelength[i] for i in range(4000) ] ) # Multiply by thickness angular_wavenumber_thickness = angular_wavenumber * self.contamination_on_CCD real_angular_wavenumber = angular_wavenumber_thickness.real imaginary_angular_wavenumber = angular_wavenumber_thickness.imag return [ (complex(real_angular_wavenumber[i], imaginary_angular_wavenumber[i])) for i in range(4000) ] @cached_property def _filter_contamination_angular_wavenumber(self): """Define angular wavenumber for the filter(s), considering contamination thickness.""" index, _, cos_a, _, _, _, _ = self._transmission_equation angular_wavenumber = np.array( [ (2.0 * math.pi * index[i] * cos_a) / self.n_DEHP_wavelength[i] for i in range(4000) ] ) if self.is_combo: angular_wavenumber_thickness_f1 = ( angular_wavenumber * self.contamination_on_filter1 ) angular_wavenumber_thickness_f2 = ( angular_wavenumber * self.contamination_on_filter2 ) real_angular_f1 = angular_wavenumber_thickness_f1.real imag_angular_f1 = angular_wavenumber_thickness_f1.imag real_angular_f2 = angular_wavenumber_thickness_f2.real imag_angular_f2 = angular_wavenumber_thickness_f2.imag angular_wavenumber_combo = [ complex( real_angular_f1[i] + real_angular_f2[i], imag_angular_f1[i] + imag_angular_f2[i], ) for i in range(4000) ] return angular_wavenumber_combo else: angular_wavenumber_thickness = ( angular_wavenumber * self.contamination_on_filter1 ) real_angular = angular_wavenumber_thickness.real imag_angular = angular_wavenumber_thickness.imag return [complex(real_angular[i], imag_angular[i]) for i in range(4000)] @cached_property def _CCD_contamination_transmission(self): """Calculate transmission matrix coefficient and transmittance on the CCD.""" index, _, _, _, n_o, n_t, _ = self._transmission_equation i_i = complex(0, 1) # Define complex number # Define transfer matrix M = [ [ [ np.cos(self._angular_wavenumber_CCD[i]), (-i_i * np.sin(self._angular_wavenumber_CCD[i])) / index[i], ], [ -i_i * np.sin(self._angular_wavenumber_CCD[i]) * index[i], np.cos(self._angular_wavenumber_CCD[i]), ], ] for i in range(4000) ] transmittance = [ 2 * n_o / ( (M[i][0][0] * n_o) + (M[i][0][1] * n_o * n_t) + (M[i][1][0]) + (M[i][1][1] * n_t) ) for i in range(4000) ] return np.array([abs(transmittance[i] ** 2) for i in range(4000)]) @property def wavelength(self): """Array of wavelengths for every X-ray channel in Angstroms (Å).""" _wave = self._channel.wavelength.to_value("AA") return _wave * u.Angstrom @property def channel_geometry_aperture_area(self): """XRT flight model geometry aperture area.""" return self._channel.geometry.geometry_aperture_area @property def channel_transmission(self): """XRT channel transmission.""" return np.interp( self.wavelength, self._channel.wavelength, self._channel.transmission ) def _contamination_interpolator(self, x, y): return np.interp(self.wavelength.to_value("Angstrom"), x, y) @property def _interpolated_CCD_contamination_transmission(self): """Interpolate filter contam transmission to the wavelength.""" return self._contamination_interpolator( self.n_DEHP_wavelength, self._CCD_contamination_transmission, ) @cached_property def _filter_contamination_transmission(self): """Calculate transmission matrix coefficient and transmittance on the filter(s).""" index, _, _, _, n_o, n_t, _ = self._transmission_equation i_i = complex(0, 1) # angular_wave = self._filterwheel_angular_wavenumber #handles both cases - filter 1 or 2 or angular_wave = self._filter_contamination_angular_wavenumber M = [ [ [ np.cos(angular_wave[i]), (-i_i * np.sin(angular_wave[i])) / index[i], ], [ -i_i * np.sin(angular_wave[i]) * index[i], np.cos(angular_wave[i]), ], ] for i in range(4000) ] transmittance = [ 2 * n_o / ( (M[i][0][0] * n_o) + (M[i][0][1] * n_o * n_t) + (M[i][1][0]) + (M[i][1][1] * n_t) ) for i in range(4000) ] return np.array([abs(transmittance[i] ** 2) for i in range(4000)]) @property def _interpolated_filter_contamination_transmission(self): """Interpolate filter contam transmission to the wavelength.""" return self._contamination_interpolator( self.n_DEHP_wavelength, self._filter_contamination_transmission, )
[docs] @u.quantity_input def effective_area(self) -> u.cm**2: r""" Calculate the Effective Area. The effective area is calculated by considering the geometry of the XRT flight model, the channel transmission, and the contamination layers on both the CCD and the filter. Returns ------- astropy.units.Quantity Effective area in cm\ :math:`^2`. Notes ----- The effective area is a crucial parameter for determining the sensitivity of the XRT to X-ray emissions. This method combines various factors, including the physical properties of the filter and CCD, to compute the total effective area. """ return ( self.channel_geometry_aperture_area * self.channel_transmission * self._interpolated_CCD_contamination_transmission * self._interpolated_filter_contamination_transmission )