Source code for firefly_instrument

"""

Provides a set of functions to handle instrumental effects.

:func:`log_rebin` has been pulled from
:mod:`mangadap.contrib.ppxf_util.py` and modified.

*Source location*:
    $MANGADAP_DIR/python/mangadap/util/instrument.py

*Python2/3 compliance*::

    from __future__ import division
    from __future__ import print_function
    from __future__ import absolute_import
    
    import sys
    if sys.version > '3':
        long = int

*Imports*::

    import warnings
    import numpy
    from scipy.interpolate import InterpolatedUnivariateSpline
    import astropy.constants

*Revision history*:
    | **24 May 2017 - Implemented the new downgrader version (D. Goddard)

"""

from __future__ import division
from __future__ import print_function
from __future__ import absolute_import

import sys
if sys.version > '3':
    long = int

import warnings
from scipy.interpolate import InterpolatedUnivariateSpline
import numpy as np
import astropy
from astropy.constants import c as speedOfLight
c = speedOfLight.to('km/s').value
from constants import *

[docs]def where_not(indx, size): """ Return a tuple with the indices of a vector that were *not* selected by a call to `np.where`_. **The function currently only works for 1D vectors.** Args: indx (tuple): Tuple returned by a call to `np.where`_ for a 1D vector. size (int): Length of the original vector in the call to `np.where`_. .. warning:: Performs **no** checks of the input. """ return (np.setdiff1d(np.arange(0,size), indx[0]),)
[docs]def spectral_coordinate_step(wave, log=False, base=10.0): """ Return the sampling step for the input wavelength vector. If the sampling is logarithmic, return the change in the logarithm of the wavelength; otherwise, return the linear step in angstroms. Args: wave (np.ndarray): Wavelength coordinates of each spectral channel in angstroms. log (bool): (**Optional**) Input spectrum has been sampled geometrically. base (float): (**Optional**) If sampled geometrically, the sampling is done using a logarithm with this base. For natural logarithm, use np.exp(1). Returns: float: Spectral sampling step in either angstroms (log=False) or the step in log(angstroms). """ dw = np.diff(np.log(wave))/np.log(base) if log else np.diff(wave) # print('mean: ', np.mean(dw)) # print('sdev: ', np.std(dw)) return np.mean(dw)
# return (np.diff(np.log(wave[0:2]))/np.log(base) if log else np.diff(wave[0:2]))[0]
[docs]def spectrum_velocity_scale(wave): """ Determine the velocity sampling of an input wavelength coordinate vector. .. note:: The wavelength vector is assumed to be geometrically sampled! However, the input units expected to be in angstroms, not, e.g., log(angstrom). Args: wave (np.ndarray): Wavelength coordinates of each spectral channel in angstroms. It is expected that the spectrum has been sampled geometrically Returns: float: Velocity scale of the spectrum in km/s. """ return astropy.constants.c.to('km/s').value*spectral_coordinate_step(wave, log=True, base=np.exp(1.))
[docs]class convolution_integral_element: """ Support class for variable sigma convolution. See :func:`convolution_variable_sigma`. OUT OF DATE; DO NOT USE Args: y (np.ndarray): Vector to convolve sigma (np.ndarray): Coordinate-dependent standard deviation of the Gaussian kernel ye (np.ndarray): (**Optional**) Error in the vector to convolve Raises: ValueError: Raised if *y* is not a 1D vector, or if the shape of *y* and *sigma* (and *ye* if provided) are different. Attributes: x (np.ndarray): Pixel coordinate vector y (np.ndarray): Vector to convolve ye (np.ndarray): Error in the vector to convolve sigma (np.ndarray): Coordinate-dependent standard deviation of the Gaussian kernel norm (np.ndarray): Gaussian normalization; calculated once for efficiency .. todo:: - Allow to switch to pixel sampled Gaussian kernel? """ def __init__(self, y, sigma, ye=None): if len(y.shape) != 1: raise ValueError('y must be a 1D array!') if y.shape != sigma.shape: raise ValueError('y and sigma must have the same shape!') if ye is not None and ye.shape != y.shape: raise ValueError('y and ye must have the same shape!') self.x = np.arange(sigma.size, dtype=np.float64) self.y = y self.ye = ye self.sigma = sigma self.norm = np.sqrt(2.0*np.pi)*self.sigma def _get_kernel(self, xc): """Calculate the kernel vector when centered at *xc*. .. todo:: - Function is about 30% slower when using erf() as opposed to exp(). erf() needed when sigma is small, but may be efficient to include some decision on when it's safe to use the quick way. """ d = (self.x-xc) gf = np.square(d/self.sigma) close_value = np.where(gf < 50.0) # outkern = np.exp(-0.5*gf[close_value])/self.norm[close_value] outkern = (erf((d[close_value]+0.5)/np.sqrt(2)/self.sigma[close_value]) - erf((d[close_value]-0.5)/np.sqrt(2)/self.sigma[close_value]))/2. return close_value, outkern # def _get_kernel(self, xc): # """Calculate the kernel vector when centered at *xc*.""" # return np.exp(-0.5*np.square((self.x-xc)/self.sigma))/self.norm def __call__(self, xc): """ Calculates the weighted mean of :attr:`y`, where the weights are defined by a Gaussian with standard deviation :attr:`sigma` and centered at xc. Args: xc (float): Center for the Gaussian weighting function Returns: float: The weighted mean of :attr:`y` """ # kernel = self._get_kernel(xc) # return np.sum(self.y*kernel) / np.sum(kernel) close_array, kernel = self._get_kernel(xc) return np.sum(self.y[close_array]*kernel) / np.sum(kernel) # Need to test equivalence and speed of these two implementations # return integrate.simps(self.y*kernel) / integrate.simps(kernel)
[docs] def error(self, xc): """ Calculates the error in the weighted mean of :attr:`y` using nominal error propagation. The weights are defined by a Gaussian with standard deviation :attr:`sigma` and centered at xc. Args: xc (float): Center for the Gaussian weighting function Returns: float: The error in the weighted mean of :attr:`y` """ close_array, kernel = self._get_kernel(xc) return np.sqrt(np.sum(np.square(self.ye[close_array]*kernel)) / np.sum(kernel))
# kernel = self._get_kernel(xc) # return np.sqrt(np.sum(np.square(self.ye*kernel)) / np.sum(kernel))
[docs]class VariableGaussianKernel: """ Support class for variable sigma convolution. See :func:`convolution_variable_sigma`. Stolen from M. Cappellari's gaussian_filter1d function. Args: y (np.ndarray): Vector to convolve sigma (np.ndarray): Coordinate-dependent standard deviation of the Gaussian kernel ye (np.ndarray): (**Optional**) Error in the vector to convolve Raises: ValueError: Raised if *y* is not a 1D vector, or if the shape of *y* and *sigma* (and *ye* if provided) are different. Attributes: x (np.ndarray): Pixel coordinate vector y (np.ndarray): Vector to convolve ye (np.ndarray): Error in the vector to convolve sigma (np.ndarray): Coordinate-dependent standard deviation of the Gaussian kernel norm (np.ndarray): Gaussian normalization; calculated once for efficiency .. todo:: - Allow to switch to pixel sampled Gaussian kernel? """ def __init__(self, sigma, minsig=0.01, nsig=3.0, integral=False): self.n = sigma.size # Vector length self.sigma = sigma.clip(min=minsig) # Force sigmas to minimum self.p = int(np.ceil(np.amax(nsig*self.sigma))) # Kernel covers up to nsig*sigma self.m = 2*self.p + 1 # Kernel length x2 = np.square(np.linspace(-self.p, self.p, self.m)) # X^2 for kernel # Kernel will have size m x n self.kernel = (erf((x2[:,None]+0.5)/np.sqrt(2)/self.sigma) - erf((x2[:,None]-0.5)/np.sqrt(2)/self.sigma))/2. if integral else \ np.exp(-x2[:, None]/(2*np.square(self.sigma))) self.kernel /= np.sum(self.kernel, 0)[None, :] # Normalize kernel def _check_shape(self, y, ye=None): """ Make sure that the shapes are appropriate for the defined kernel. """ if len(y.shape) != 1: raise ValueError('y must be a 1D array!') if y.size != self.n: raise ValueError('y and sigma must have the same shape!') if ye is not None and (len(ye.shape) != 1 or ye.size != self.n): raise ValueError('ye length does not must have the correct shape!') def _create_a(self, y): a = np.zeros(self.kernel.shape) for i in range(self.m): a[i,self.p:-self.p] = y[i:self.n-self.m+i+1] return a def convolve(self, y, ye=None): self._check_shape(y, ye=ye) # Create m copies of the shifted input function a = self._create_a(y) if ye is None: return np.sum(a*self.kernel,0) # Construct the error ae = self._create_a(np.square(ye)) return np.sum(a*self.kernel,0), np.sqrt(np.sum(ae*self.kernel,0))
[docs]def convolution_variable_sigma(y, sigma, ye=None, integral=False): r""" Convolve a discretely sampled function :math:`y(x)` with a Gaussian kernel, :math:`g`, where the standard deviation of the kernel is a function of :math:`x`, :math:`\sigma(x)`. Nominal calculations can be performed to propagate the error in the result; these calculations **do not** include the covariance between the pixels, which will mean that the calculations likely have significant error! The convolution is defined as: .. math:: (y\ast g)(x) &= \int_{-\infty}^{\infty} y(X)\ g(\sigma,x-X)\ dX \\ &= \int_{-\infty}^{\infty} \frac{y(X)}{\sqrt{2\pi}\ \sigma(X)}\ \exp\left(-\frac{(x-X)^2}{2\ \sigma(X)^2}\right) dX . To minimize edge effects and account for the censoring of the data (finite range in :math:`x`), the convolution is actually calculated as a definite integral and normalized as follows: .. math:: (y\ast g)(x) \sim\frac{ \int_{x-n_\sigma*\sigma(x)}^{x+n_\sigma*\sigma(x)} y(X)\ g(\sigma,x-X)\ dX}{ \int_{x-n_\sigma*\sigma(x)}^{x+n_\sigma*\sigma(x)} g(\sigma,x-X)\ dX} . The above is identical to getting the weighted mean of :math:`y` at each position :math:`x`, where the weights are defined by a Gaussian kernel centered at :math:`x` with a variable dispersion. Use of this function requires: - *y* and *sigma* must be 1D vectors - *y* and *sigma* must be uniformly sampled on the same grid - *sigma* must be in pixel units. Args: y (np.ndarray): A uniformly sampled function to convolve. sigma (np.ndarray): The standard deviation of the Gaussian kernel sampled at the same positions as *y*. The units of *sigma* **must** be in pixels. ye (np.ndarray): (**Optional**) Errors in the function :math:`y(x)`. Returns: np.ndarray: Arrays with the convolved function :math:`(y\ast g)(x)` sampled at the same positions as the input :math:`x` vector and its error. The second array will be returned as None if the error vector is not provided. """ # kernel = convolution_integral_element(y,sigma,ye=ye) # conv = np.array([kernel(x) for x in kernel.x]) # if ye is None: # return conv # return conv, np.array([kernel.error(x) for x in kernel.x]) return VariableGaussianKernel(sigma, integral=integral).convolve(y,ye=ye)
[docs]class spectral_resolution: r""" Container class for the resolution, :math:`R = \lambda/\Delta\lambda`, of a spectrum. The primary functionality is to determine the parameters necessary to match the resolution of one spectrum to another. It can also be used as a function to interpolate the spectral resolution at a given wavelenth. Args: wave (np.ndarray): A 1D vector with the wavelength in angstroms. The sampling may be either in linear steps of wavelength or :math:`\log_{10}` steps. sres (np.ndarray): A 1D vector with the spectral resolution, :math:`R`, sampled at the positions of the provided wavelength vector. log10 (bool): (**Optional**) Flag that the spectrum has been binned logarithmically (base 10) in wavelength interp_ext (int or str): (**Optional**) The value to pass as *ext* to the interpolator, which defines its behavior when attempting to sample the spectral resolution beyond where it is defined. See `scipy.interpolate.InterpolatedUnivariateSpline`_. Default is to extrapolate. Raises: ValueError: Raised if *wave* is not a 1D vector or if *wave* and *sres* do not have the same shape. Attributes: interpolator (`scipy.interpolate.InterpolatedUnivariateSpline`_): An object used to interpolate the spectral resolution at any given wavelength. The interpolation is hard-wired to be **linear** and its extrapolation behavior is defined by *interp_ext*. The wavelength and resolution vectors are held by this object for later reference if needed. log10 (bool): Flag that the spectrum has been binned logarithmically (base 10) in wavelength cnst (:class:`mangadap.util.constants`): Used to define the conversion factor between a Gaussian sigma and FWHM c (float): The speed of light; provided by `astropy.constants`_. dv (float): The velocity step per pixel in km/s. Defined using :func:`spectrum_velocity_scale` if :attr:`log10` is True; otherwise set to None. dw (float): The wavelength step per pixel in angstroms. Defined as the wavelength step between the first two pixels if :attr:`log10` is False; otherwise set to None. min_sig (float): Minimum standard deviation allowed for the kernel used to match two spectral resolutions. See :func:`GaussianKernelDifference`. sig_pd (np.ndarray): The standard deviation in pixels required to match the spectral resolution of this object to the resolution defined by a different spectral_resolution object. See :func:`GaussianKernelDifference`. sig_mask (np.ndarray): A *uint* vector used to identify measurements of :attr:`sig_pd` that should **not** be used to match the spectral resolution of this object to the resolution defined by a different spectral_resolution object. See :func:`GaussianKernelDifference`. sig_vo (float): A constant offset of the kernal standard deviation **in km/s** that has been applied to :attr:`sig_pd`. See :func:`GaussianKernelDifference`. .. todo:: - Allow it to determine if the binning is linear or geometric, then use the *log10* keyword to distinguish between natural log and :math:`log_{10}` binning. - Allow for more than one type of line-spread function? .. warning:: The default behavior of the interpolator is to extrapolate the input spectral resolution vector when trying to sample from regions beyond where it is sampled. Use *interp_ext* change this; see `scipy.interpolate.InterpolatedUnivariateSpline`_. .. _scipy.interpolate.InterpolatedUnivariateSpline: http://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.InterpolatedUnivariateSpline.html .. _astropy.constants: http://docs.astropy.org/en/stable/constants/index.html """ def __init__(self, wave, sres, log10=False, interp_ext='extrapolate'): # Check the sizes if len(wave.shape) != 1: raise ValueError('wave must be a 1D array!') if wave.shape != sres.shape: raise ValueError('wave and sres must have the same shape!') # k=1; always use linear interpolation if sys.version < '3': #print('WARNING: InterpolatedUnivariateSpline may have different behavior in python2!') self.interpolator = InterpolatedUnivariateSpline(wave, sres, k=1, ext = interp_ext) else: self.interpolator = InterpolatedUnivariateSpline(wave, sres, k=1, ext = interp_ext) self.log10 = log10 self.cnst = constants() self.c = astropy.constants.c.to('km/s').value self.dv = spectrum_velocity_scale(wave) if log10 else None self.dw = None if log10 else wave[1] - wave[0] # No resolution matching calculated yet self.min_sig = None self.sig_pd = None self.sig_mask = None self.sig_vo = None def __call__(self, w): """Interpolate the spectral resolution at wavelength *w*.""" return self.interpolator(w) def _finalize_GaussianKernelDifference(self, sig2_pd): r""" Given the calculated :math:`\sigma^2_{p,d}`, calculate and save the attributes :attr:`sig_pd` and :attr:`sig_mask`. See :func:`GaussianKernelDifference`. """ indx = np.where(np.isclose(sig2_pd, 0.0)) nindx = where_not(indx, sig2_pd.size) self.sig_pd = sig2_pd.copy() self.sig_pd[nindx] = sig2_pd[nindx]/np.sqrt(np.absolute(sig2_pd[nindx])) self.sig_pd[indx] = 0.0 self.sig_mask = np.array(self.sig_pd < -self.min_sig).astype(np.uint) def _convert_vd2pd(self, sig2_vd): r""" Convert from :math:`\sigma^2_{v,d}` to :math:`\sigma^2_{p,d}`. See :func:`GaussianKernelDifference`. """ return sig2_vd / np.square(self.dv) if self.log10 else \ sig2_vd / np.square(self.c*self.dw/self.wave()) def _convert_pd2vd(self, sig2_pd): r""" Convert from :math:`\sigma^2_{p,d}` to :math:`\sigma^2_{v,d}`. See :func:`GaussianKernelDifference`. """ return sig2_pd * np.square(self.dv) if self.log10 else \ sig2_pd * np.square(self.c*self.dw/self.wave())
[docs] def wave(self): """ Return the wavelength vector; held by :attr:`interpolator`. """ return self.interpolator._data[0]
[docs] def sres(self): """ Return the resolution vector; held by :attr:`interpolator`. """ return self.interpolator._data[1]
[docs] def match(self, new_sres, no_offset=True, min_sig_pix=0.0): """ Currently only an alias for :func:`GaussianKernelDifference`. """ self.GaussianKernelDifference(new_sres, no_offset=no_offset, min_sig_pix=min_sig_pix)
[docs] def GaussianKernelDifference(self, new_sres, no_offset=True, min_sig_pix=0.0): r""" Determine the parameters of a Gaussian kernel required to convert the resolution of this object to the resolution of a different the :class:`spectral_resolution` object, *new_sres*. The spectral resolution is defined as :math:`R = \lambda/\Delta\lambda`, where :math:`\Delta\lambda` is the FWHM of the spectral resolution element. The standard deviation of the resolution element in angstroms is then .. math:: \sigma_\lambda = \frac{\lambda}{f R}, \ \ {\rm where} \ \ f = \frac{{\rm FWHM_\lambda}}{\sigma_\lambda}. Assuming a Gaussian (in angstroms) line-spread function: .. math:: \sigma^2_{\lambda,2} = \sigma^2_{\lambda,1} + \sigma^2_{\lambda,d} such that .. math:: \sigma^2_{\lambda,d} = \left(\frac{\lambda}{f}\right)^2 (R^{-2}_2 - R^{-2}_1) is the defining parameter of the Gaussian kernel needed to take a spectrum of resolution :math:`R_1` to one with a resolution of :math:`R_2`. For input to :func:`convolution_variable_sigma`, the *wavelength-dependent* parameter of the Gaussian kernel is converted to pixels. This function allows for the input spectra to be linearly sampled in angstroms or log10(angstroms). For the former (*log10=False*), .. math:: \sigma^2_{p,d} = \left(\frac{\lambda}{f\ \delta\lambda}\right)^2 (R^{-2}_2 - R^{-2}_1) where :math:`\delta\lambda` is the size of the pixel in angstroms. If the units are log10(angstrom) (*log10=True*), we approximate the velocity width of each pixel to be :math:`\delta v = c \ln(10.0) (\log\lambda[1]-\log\lambda[0])`, such that .. math:: \sigma^2_{p,d} &= \left(\frac{c}{ \delta v \lambda}\right)^2 \sigma^2_{\lambda,d} \\ &= \left(\frac{c}{ f\ \delta v}\right)^2 (R^{-2}_2 - R^{-2}_1)\ ; :math:`c` is the speed of light in km/s. The nominal use of this algorithm assumes :math:`R_1 \geq R_2`. However, in practice, :func:`convolution_variable_sigma` only uses a Gaussian kernel up to some minimum value of :math:`\epsilon_\sigma`; below this, the kernel is assumed to be a Delta function. Therefore, as long as .. math:: \sigma_{p,d} \equiv \sigma^2_{p,d}/\sqrt{|\sigma^2_{p,d}|} \geq -\epsilon_\sigma\ , the behavior of :func:`convolution_variable_sigma` should not be affected. Even so, there may be spectral regions that do not have :math:`\sigma_{p,d} \geq -\epsilon_\sigma`; for such spectral regions there are three choices: (**Option 1**) trim the spectral range to only those spectral regions where the existing resolution is better than the target resolution, (**Option 2**) match the existing resolution to the target resolution up to some constant offset that must be accounted for in subsequent analyses, or (**Option 3**) allow for a wavelength dependent difference in the spectral resolution that must be accounted for in subsequent analyses. The choice of either Option 1 or 2 is selected by setting *no_offset* to, respectively, True or False; Option 1 is the default behavior. Currently, Option 3 is not allowed. For Option 1, pixels with :math:`\sigma_{p,d} < -\epsilon_\sigma` are masked (*sigma_mask = 1*); however, the returned values of :math:`\sigma_{p,d}` are left unchanged. For Option 2, we define .. math:: \sigma^2_{v,o} = -{\rm min}(\sigma^2_{v,d}) - {\rm max}(\epsilon_\sigma \delta v)^2 where :math:`\delta v` is constant for the logarithmically binned spectrum and is wavelength dependent for the linearly binned spectra; in the latter case, the velocity step is determined for each pixel:: _wave = self.wave() dv = self.c * (2.0*(_wave[1:] - _wave[0:-1]) / (_wave[1:] + _wave[0:-1])) If :math:`\sigma^2_{v,o} > 0.0`, it must be that :math:`{\rm min}(\sigma^2_{v,d}) < -{\rm max}(\epsilon_\sigma \delta v)^2`, such that an offset should be applied. In that case, the returned kernel parameters are .. math:: \sigma^\prime_{v,d} = \sqrt{\sigma^2_{v,d} + \sigma^2_{v,o}}\ . with the units converted to pixels using the equations above, no pixels are masked, and :math:`\sqrt{\sigma^2_{v,o}}` is returned for the offset. Otherwise, the offset is set to 0. .. todo:: Allow to check cases when the convolution kernel is indpendent of wavelength such that the convolution can be sped up by performing the convolution using an FFT. For example, in the case where the spectrum is logarithmically binned and both :math:`R_1` and :math:`R_2` are *independent* of wavelength, the convolution kernel is independent of wavelength. Args: new_sres (:class:`spectral_resolution`): Spectral resolution to match to. no_offset (bool): (**Optional**) Force :math:`\sigma^2_{v,o} = 0` by masking regions with :math:`\sigma_{p,d} < -\epsilon_\sigma`; i.e., the value of this arguments selects Option 1 (True) or Option 2 (False). min_sig_pix (float): (**Optional**) Minimum value of the standard deviation allowed before assuming the kernel is a Delta function. """ # Save the minimum pixel sigma to allow self.min_sig = min_sig_pix # Interpolate the new spectral resolution vector at the wavelengths # of the input spectral resolution _wave = self.wave() _sres = self.sres() interp_sres = new_sres(_wave) # Determine the variance (in angstroms) of Gaussian needed to match # input resolution to the new values sig2_wd = np.square(_wave/self.cnst.sig2fwhm) \ * (1.0/np.square(interp_sres) - 1.0/np.square(_sres)) # Convert to km/s sig2_vd = np.square(self.c/_wave) * sig2_wd # Option 1: if no_offset: # Convert the variance to pixel coordinates sig2_pd = sig2_vd / np.square(self.dv) if self.log10 else \ sig2_wd / np.square(self.dw) # No offset applied self.sig_vo = 0.0 # Option 2: else: # Calculate the velocity step of each pixel dv = self.c * (2.0*(_wave[1:] - _wave[0:-1]) / (_wave[1:] + _wave[0:-1])) # Get the needed *velocity* offset (this is the square) self.sig_vo = - np.amin(sig2_vd) - np.square(self.min_sig * np.amax(dv)) # Apply it if it's larger than 0 if self.sig_vo > 0: sig2_vd += self.sig_vo self.sig_vo = np.sqrt(self.sig_vo) else: self.sig_vo = 0.0 # Convert the variance to pixel coordinates sig2_pd = self._convert_vd2pd(sig2_vd) self._finalize_GaussianKernelDifference(sig2_pd)
# def ZeroGaussianKernelDifference(self, min_sig_pix=0.0): # self.min_sig = min_sig_pix # sig2_pd = np.zeros(self.wave().shape, dtype=np.float64) # self._finalize_GaussianKernelDifference(sig2_pd)
[docs] def offset_GaussianKernelDifference(self, offset): r""" If the properties required to match the resolution of one spectrum to another has already been calculated (see :func:`GaussianKernelDifference`), this allows for one to apply an additional offset. The additional offset **must** be in km/s (not pixels). The offset is applied in quadrature; however, the offset can be negative such that one can reduce :attr:`sig_pd`. Once converted to km/s, the offset is applied by calculating: .. math:: \sigma^{\prime\ 2}_{v,d} = \sigma^{2}_{v,d} + \sigma_{off}|\sigma_{off}|\ . :attr:`sig_vo` is adjusted in the same way, and the change in :math:`\sigma^{\prime\ 2}_{v,d}` is then propagated to :attr:`sig_pd` and :attr:`sig_mask`. Args: offset (float): Value of the standard deviation in km/s to add in quadrature to a previously calculated :attr:`sig_pd`. Raises: ValueError: Raised if the kernel properties have not yet been defined. """ if None in [self.min_sig, self.sig_pd, self.sig_mask, self.sig_vo]: # print('WARNING: No kernel difference yet defined. Assuming 0.') # self.ZeroGaussianKernelDifference() raise ValueError('No kernel defined yet. Run GaussianKernelDifference first.') if np.isclose(offset,0.0): return off2 = offset*np.absolute(offset) sig2_vo = self.sig_vo*np.absolute(self.sig_vo) + off2 self.sig_vo = 0.0 if np.isclose(sig2_vo, 0.0) \ else sig2_vo/np.sqrt(np.absolute(sig2_vo)) sig2_vd = self._convert_pd2vd(self.sig_pd*np.absolute(sig_pd)) + off2 self._finalize_GaussianKernelDifference(self._convert_vd2pd(sig2_vd))
[docs] def adjusted_resolution(self, indx=None): r""" Return the resolution that should result from applying :func:`convolution_variable_sigma` to the spectrum associated with this spectral resolution object using :attr:`sigma_pd`. I.e., calculate: .. math:: R_2 = \left[ \left(\frac{f}{c}\right)^2 \sigma^2_{v,d} + R^{-2}_1\right]^{-1/2}\ . Args: indx (tuple): (**Optional**) Selection tuple used to return a subset of the full resolution vector. Returns: np.ndarray: The (full or selected) vector with the adjusted resolution. .. todo:: Allow to reset the resolution of this object to the adjusted resolution and reset the kernel variables to None. """ if indx is None: return 1.0/np.sqrt( np.square(self.cnst.sig2fwhm/self.c) \ * self._convert_pd2vd(self.sig_pd*np.absolute(self.sig_pd)) \ + 1.0/np.square(self.sres()) ) return 1.0/np.sqrt( np.square(self.cnst.sig2fwhm/self.c) \ * (self._convert_pd2vd(self.sig_pd*np.absolute(self.sig_pd)))[indx] \ + 1.0/np.square(self.sres()[indx]) )
[docs]def match_spectral_resolution(wave, flux, sres, new_sres_wave, new_sres, ivar=None, mask=None, min_sig_pix=0.0, no_offset=True, variable_offset=False, log10=False, new_log10=False): r""" Adjust the existing spectral resolution of a spectrum to a **lower** resolution as best as possible. The primary functionality is in :class:`spectral_resolution`, which determines the Gaussian kernel parameters needed to match the resolution, and :func:`convolve_variable_sigma`, which actually performs the convolution to match the resolution. In particular, see :func:`spectral_resolution.GaussianKernelDifference` for a description of how the kernel parameters are determined and how regions where the target resolution is **higher** than the existing resolution. In this case, one of the options is to adopt an offset of the resolution (in km/s) that could be corrected for in subsequent analysis. In this case, setting *variable_offset* to True allows the offset to be different for all input spectra. If one expects to combine the spectra, the default behavior should be used, forcing all the spectra to have a constant offset. Args: wave (np.ndarray): A 1D or 2D (:math:`N_{\rm spec}\times N_{\rm pix}`) array with the wavelength in angstroms for a set of spectra. The sampling may be either in linear steps of wavelength or :math:`\log_{10}` steps, as set using *log10*. flux (np.ndarray): A 1D or 2D (:math:`N_{\rm spec}\times N_{\rm pix}`) array with the flux sampled at the provided wavelengths. sres (np.ndarray): A 1D or 2D (:math:`N_{\rm spec}\times N_{\rm pix}`) array with the spectral resolution, :math:`R`, at the provided wavelengths. new_sres_wave (np.ndarray): A 1D vector with the wavelength in angstroms at which the new resolution of the input spectra has been sampled. The sampling may be either in linear steps of wavelength or :math:`\log_{10}` steps, as set using *new_log10*. new_sres (np.ndarray): A 1D vector with the new resolution for the input spectra. ivar (np.ndarray): (**Optional**) A 1D or 2D (:math:`N_{\rm spec}\times N_{\rm pix}`) array with the inverse variance of the flux sampled at the provided wavelengths. This vector is used to estimate the noise in the resolution-matched spectra. .. warning:: The accuracy of the errors still remain untested! mask (np.ndarray): (**Optional**) A 1D or 2D (:math:`N_{\rm spec}\times N_{\rm pix}`) array with a *uint* mask for the flux sampled at the provided wavelengths. no_offset (bool): (**Optional**) Force :math:`\sigma^2_{v,o} = 0` by masking regions with :math:`\sigma_{p,d} < -\epsilon_\sigma`; i.e., the value of this arguments selects Option 1 (True) or Option 2 (False). See :func:`spectral_resolution.GaussianKernelDifference`. min_sig_pix (float): (**Optional**) Minimum value of the standard deviation in pixels allowed before assuming the kernel is a Delta function. variable_offset (bool): (**Optional**) Flag to allow the offset applied to each spectrum (when the input contains more than one spectraum) to be tailored to each spectrum. Otherwise (*variable_offset=False*) the offset is forced to be the same for all spectra. log10 (bool): (**Optional**) Flag that the spectrum has been binned logarithmically (base 10) in wavelength new_log10 (bool): (**Optional**) Flag that the coordinates of the new spectral resolution are spectrum as been binned logarithmically (base 10) in wavelength. Returns: np.ndarray: Five objects are returned: - A 1D or 2D (:math:`N_{\rm spec}\times N_{\rm pix}`) array with the resolution-matched flux sampled at the input wavelengths. - A 1D or 2D (:math:`N_{\rm spec}\times N_{\rm pix}`) array with the spectral resolution, :math:`R`, of the resolution-matched spectra at the provided wavelengths. - A 1D vector with any constant offset in resolution **in km/s** between the targetted value and the result. See :func:`spectral_resolution.GaussianKernelDifference`. - A 1D or 2D (:math:`N_{\rm spec}\times N_{\rm pix}`) array with a *uint* mask for the resolution-matched flux sampled at the input wavelengths. This is returned regardless of whether an input mask was provided. Any pixel that had a resolution that was lower than the target resolution (up to some tolerance defined by *min_sig_pix*) is returned as masked. - A 1D or 2D (:math:`N_{\rm spec}\times N_{\rm pix}`) array with the inverse variance of the resolution-matched flux sampled at the input wavelengths. If *ivar* is not provided, a 'None' returned as the last element Raises: ValueError: Raised if: - the input *wave* array is 2D and the *sres* array is not; a 1D wavelength array is allowed for a 2D *sres* array but not vice versa - the number of spectral pixels in *wave*, *flux*, and *sres* is not the same - the shape of the *flux*, *mask* (if provided), and *ivar* (if provided) are not the same - the shape of the *new_sres_wave* and *new_sres* arrays are not the same and/or not 1D .. todo:: - Add interp_ext != 'extrapolate' option? - Better way to use warnings? """ # Check the dimensionality of wave and sres wave_matrix = len(wave.shape) == 2 sres_matrix = len(sres.shape) == 2 if wave_matrix and not sres_matrix: raise ValueError('If input wavelength array is 2D, the spectral resolution array must' \ ' also be 2D') # Check the shapes if (wave_matrix == sres_matrix and wave.shape != sres.shape) or \ (not wave_matrix and sres_matrix and wave.shape[0] != sres.shape[1]): raise ValueError('Input spectral resolution and coordinate arrays must have the same' \ ' number of spectral channels!') if (wave_matrix and wave.shape != flux.shape) or \ (not wave_matrix and len(flux.shape) == 2 and wave.shape[0] != flux.shape[1]) or \ (not wave_matrix and len(flux.shape) == 1 and wave.shape != flux.shape): raise ValueError('Input flux and coordinate arrays must have the same number of' \ ' spectral channels!') if (mask is not None and mask.shape != flux.shape): raise ValueError('Input flux and mask arrays must have the same shape!') if (ivar is not None and ivar.shape != flux.shape): raise ValueError('Input flux and ivar arrays must have the same shape!') if len(sres.shape) > len(flux.shape): raise ValueError('Shape of the spectral resolution array must be <= to the flux array.') if len(new_sres_wave.shape) != 1 or len(new_sres.shape) != 1: raise ValueError('New spectral resolution and coordinate arrays must be 1D!') if new_sres_wave.shape != new_sres.shape: raise ValueError('New spectral resolution and coordinate arrays must have the same shape!') # Raise a warning if the new_sres vector will have to be # extrapolated for the input wavelengths if np.amin(wave) < new_sres_wave[0] or np.amax(wave) > new_sres_wave[-1]: warnings.warn('Mapping to the new spectral resolution will require extrapolating the' \ ' provided input vectors!') # Initialize some variables nspec = 1 if len(flux.shape) == 1 else flux.shape[0] nsres = 1 if len(sres.shape) == 1 else sres.shape[0] if sres_matrix and nspec != nsres: raise ValueError('For 2D matrices, number of spectral resolution vectors must match the ' \ 'number of spectra.') spec_dim = len(flux.shape) sres_dim = len(sres.shape) sigma_offset = np.zeros(nspec, dtype=np.float64) new_res = spectral_resolution(new_sres_wave, new_sres, log10=new_log10) res = np.empty(nspec, dtype=object) # Get the kernel parameters necessary to match all spectra to the # new resolution if nsres == 1 and sres_dim == 1: res[0] = spectral_resolution(wave, sres, log10=log10) res[0].match(new_res, no_offset=no_offset, min_sig_pix=min_sig_pix) sigma_offset[0] = res[0].sig_vo for i in range(1,nspec): res[i] = res[0] # pyplot.plot(wave, res[0].sig_pd) # pyplot.show() else: for i in range(0,nsres): _wave = wave[i,:].ravel() if wave_matrix else wave _sres = sres[i,:].ravel() if sres_matrix else sres res[i] = spectral_resolution(_wave, _sres, log10=log10) res[i].match(new_res, no_offset=no_offset, min_sig_pix=min_sig_pix) sigma_offset[i] = res[i].sig_vo # Force all the offsets to be the same, if requested if not no_offset and not variable_offset: common_offset = np.max(sigma_offset) offset_diff = np.sqrt( np.square(common_offset) - np.square(sigma_offset)) for r, o in zip(res,offset_diff): r.offset_GaussianKernelDifference(o) # Perform the convolutions out_flux = flux.copy() out_ivar = None if ivar is None else np.ma.MaskedArray(ivar.copy()) noise = None if ivar is None else np.ma.sqrt(1.0/out_ivar) out_sres = sres.copy() if mask is None: mask = np.zeros(flux.shape, dtype=np.uint) out_mask = mask.copy() if nspec == 1 and spec_dim == 1: indx = np.where(res[0].sig_pd > min_sig_pix) if ivar is None: out_flux[indx] = convolution_variable_sigma(flux[indx], res[0].sig_pd[indx]) else: out_flux[indx], out_ivar[indx] \ = convolution_variable_sigma(flux[indx], res[0].sig_pd[indx], ye=None if ivar is None else noise[indx]) out_sres[indx] = res[0].adjusted_resolution(indx=indx) out_mask = np.array((res[0].sig_mask == 1) | (mask == 1)).astype(np.uint) else: for i in range(0,nspec): #print('Matching resolution ... {0}/{1}'.format(i+1,nspec), end='\r') indx = np.where(res[i].sig_pd > min_sig_pix) if ivar is None: out_flux[i,indx] = convolution_variable_sigma(flux[i,indx].ravel(), res[i].sig_pd[indx]) else: out_flux[i,indx], out_ivar[i,indx] \ = convolution_variable_sigma(flux[i,indx].ravel(), res[i].sig_pd[indx], ye=None if ivar is None else noise[i,indx].ravel()) # out_flux[i,indx] = convolution_variable_sigma(flux[i,indx].ravel(), res[i].sig_pd[indx]) out_mask[i,:] = np.array((res[i].sig_mask == 1) \ | (mask[i,:] == 1)).astype(np.uint) if nsres == 1 and i == 0: out_sres[indx] = res[i].adjusted_resolution(indx=indx) continue elif nsres > 1: out_sres[i,indx] = res[i].adjusted_resolution(indx=indx) #print('Matching resolution ... DONE ') # TODO: Add this functionality from the IDL version? # # Finally, the code masks a number of pixels at the beginning and # end of the spectra to remove regions affected by errors in the # convolution due to the censoring of the data. The number of # pixels is the FWHM of the largest Gaussian applied in the # convolution: ceil(sig2fwhm*max(diff_sig_w)/dw). This is currently # hard-wired and should be tested. if ivar is not None: out_ivar = np.square(1.0/out_ivar) # When returning out_ivar, convert it to a normal array return out_flux, out_sres, sigma_offset, out_mask, np.asarray(out_ivar) #return out_flux, out_sres, sigma_offset, out_mask, None return out_flux, out_sres, sigma_offset, out_mask
#def log_rebin(lamRange, spec, oversample=None, velscale=None, flux=False, log10=False, # newRange=None, wave_in_ang=False, unobs=0.0): # """ # .. note:: # # Copyright (C) 2001-2014, Michele Cappellari # E-mail: cappellari_at_astro.ox.ac.uk # # This software is provided as is without any warranty whatsoever. # Permission to use, for non-commercial purposes is granted. # Permission to modify for personal or internal use is granted, # provided this copyright and disclaimer are included unchanged at # the beginning of the file. All other rights are reserved. # # Logarithmically rebin a spectrum, while rigorously conserving the # flux. Basically the photons in the spectrum are simply # ridistributed according to a new grid of pixels, with non-uniform # size in the spectral direction. # # This routine makes the `standard' zero-order assumption that the # spectrum is *constant* within each pixels. It is possible to perform # log-rebinning by assuming the spectrum is represented by a # piece-wise polynomial of higer degree, while still obtaining a # uniquely defined linear problem, but this reduces to a deconvolution # and amplifies noise. # # .. warning:: # # This assumption can be poor for sharp features in the spectrum. # Beware if resampling spectra with strong, marginally sampled # features! # # This same routine can be used to compute approximate errors of the # log-rebinned spectrum. To do this type the command # # >>> err2New, logLam, velscale = log_rebin(lamRange, np.square(err)) # # and the desired errors will be given by np.sqrt(err2New). # # .. warning:: # # This rebinning of the error-spectrum is very *approximate* as it # does not consider the correlation introduced by the rebinning! # # Args: # # lamRange (np.ndarray): two elements vector containing the # central wavelength of the first and last pixels in the # spectrum, which is assumed to have constant wavelength # scale! E.g. from the values in the standard FITS keywords: # LAMRANGE = CRVAL1 + [0,CDELT1*(NAXIS1-1)]. It must be # LAMRANGE[0] < LAMRANGE[1]. # spec (np.ndarray): Input spectrum. # oversample (int): (**Optional**) Oversampling can be done, not # to loose spectral resolution, especally for extended # wavelength ranges and to avoid aliasing. Default is to # provide the same number of output pixels as input. # velscale (float): (**Optional**) Velocity scale in km/s per # pixels. If this variable is not defined, then it will # contain in output the velocity scale. If this variable is # defined by the user it will be used to set the output number # of pixels and wavelength scale. # flux (bool): (**Optional**) Set this keyword to preserve total # flux. In this case the log rebinning changes the pixels # flux in proportion to their dLam so the following command # will show large differences beween the spectral shape before # and after :func:`log_rebin`:: # # # Plot log-rebinned spectrum # pyplot.plot(exp(logLam), specNew) # pyplot.plot(np.arange(lamRange[0],lamRange[1],spec.size), spec, 'g') # pyplot.show() # # By default, when this keyword is *not* set, the above two # lines produce two spectra that almost perfectly overlap each # other. # log10 (bool): (**Optional**) Flag that the spectrum should be # binned in units of base-10 log wavelength, instead of # natural log # newRange (np.ndarray): (**Optional**) Force the spectrum to # be sampled to a new spectral range (lamRange is the # *existing* spectral range). # wave_in_ang (bool): (**Optional**) Return the wavelength # coordinates in angstroms, not log(angstroms) # unobs (float): (**Optional**) Default value for unobserved # spectral regions. # # Returns: # np.ndarray, float: Returns three variables: logarithmically # rebinned spectrum, the log of the wavelength at the geometric # center of each pixel, and the velocity scale of each pixel in # km/s. # # Raises: # ValueError: Raised if the input spectrum is not a # one-dimensional np.ndarray. # # *Modification History*: # | **V1.0.0**: Using interpolation. Michele Cappellari, Leiden, # 22 October 2001 # | **V2.0.0**: Analytic flux conservation. MC, Potsdam, 15 June # 2003 # | **V2.1.0**: Allow a velocity scale to be specified by the # user. MC, Leiden, 2 August 2003 # | **V2.2.0**: Output the optional logarithmically spaced # wavelength at the geometric mean of the wavelength at the # border of each pixel. Thanks to Jesus Falcon-Barroso. MC, # Leiden, 5 November 2003 # | **V2.2.1**: Verify that lamRange[0] < lamRange[1]. MC, # Vicenza, 29 December 2004 # | **V2.2.2**: Modified the documentation after feedback from # James Price. MC, Oxford, 21 October 2010 # | **V2.3.0**: By default now preserve the shape of the spectrum, # not the total flux. This seems what most users expect from # the procedure. Set the keyword /FLUX to preserve flux like # in previous version. MC, Oxford, 30 November 2011 # | **V3.0.0**: Translated from IDL into Python. MC, Santiago, 23 # November 2013 # | **V3.1.0**: Fully vectorized log_rebin. Typical speed up by # two orders of magnitude. MC, Oxford, 4 March 2014 # | **05 Jun 2015**: (K. Westfall, KBW) Pulled from ppxf_util.py. # Conform to mangadap documentation standard. Transcribe # edits made to IDL version that provides for the log10 and # newRange arguments. Add option to return wavelength in # angstroms, not log(angstroms). Break out determination of # input and output spectrum pixel coordinates to a new # function, :func:`log_rebin_pix`. Added default value for # unobserved pixels. Default behavior unchanged. # # .. todo:: # # - Allow to resample an already geometrically binned spectrum # # """ # lamRange = np.asarray(lamRange) # # if type(spec) != np.ndarray: # raise ValueError('Input spectrum must be a np.ndarray') # s = spec.shape # if len(s) != 1: # raise ValueError('input spectrum must be a vector') # n = s[0] # # # This is broken out into a separate procedure so that it can be # # called to determine the size of the rebinned spectra without # # actually doing the rebinning # dLam, m, logscale, velscale = \ # log_rebin_pix(lamRange, n, oversample=oversample, velscale=velscale, log10=log10, # newRange=newRange) # print(dLam) # print(m) # print(logscale) # print(velscale) # # # Get the sampling of the existing spectrum # lim = lamRange/dLam + [-0.5, 0.5] # All in units of dLam # borders = np.linspace(*lim, num=n+1) # Linearly sampled pixels # # print(borders) # print(dLam) # # # Set limits to a new wavelength range # if newRange is not None: # lim = np.asarray(newRange)/dLam + [-0.5, 0.5] # # # Set the limits to the (base-10 or natural) log of the wavelength # logLim = np.log(lim) if not log10 else np.log10(lim) # logLim[1] = logLim[0] + m*logscale # Set last wavelength, based on integer # of pixels # # # Geometrically spaced pixel borders for the new spectrum ## newBorders = np.logspace(*logLim, num=m+1, base=(10.0 if log10 else np.exp(1))) # newBorders = np.power(10., np.linspace(*logLim, num=m+1)) if log10 else \ # np.exp(np.linspace(*logLim, num=m+1)) # # # print(newBorders) # print(m) # print(logscale) # # # Get the new spectrum by performing an analytic integral # k = (newBorders - borders[0]).clip(0, n-1).astype(int) # specNew = np.add.reduceat(spec, k)[:-1] # specNew *= np.diff(k) > 0 # fix for design flaw of reduceat() # specNew += np.diff((newBorders - borders[k])*spec[k]) # # # Don't conserve the flux # if not flux: # specNew /= np.diff(newBorders) # # # Output log(wavelength): log of geometric mean # LamNew = np.sqrt(newBorders[1:]*newBorders[:-1])*dLam # # # Set values for unobserved regions # if newRange is not None and (newRange[0] < lamRange[0] or newRange[1] > lamRange[1]): # specNew[ (LamNew < lamRange[0]) | (LamNew > lamRange[1]) ] = unobs # # # Return log(wavelength), if requested # if not wave_in_ang: # LamNew = np.log10(LamNew) if log10 else np.log(LamNew) # # # Return spectrum, wavelength coordinates, and pixel size in km/s # return specNew, LamNew, velscale # # # #def log_rebin_pix(lamRange, n, oversample=None, velscale=None, log10=False, newRange=None): # """ # Determine the number of new pixels and their coordinate step when # rebinning a spectrum in geometrically stepped bins. The input # spectrum must be sampled linearly in wavelength. This is primarily # a support routine for :func:`log_rebin`. # # Although breaking this out from the main :func:`log_rebin` function # leads to a few repeat calculations in that function, the use of this # function is in determine a common wavelength range for a large # number of spectra before resampling the spectra themselves. See # :class:`mangadap.TemplateLibrary` . # # Args: # lamRange (np.ndarray): two elements vector containing the # central wavelength of the first and last pixels in the # spectrum, which is assumed to have constant wavelength # scale! E.g. from the values in the standard FITS keywords: # LAMRANGE = CRVAL1 + [0,CDELT1*(NAXIS1-1)]. It must be # LAMRANGE[0] < LAMRANGE[1]. # n (int): Number of pixels in the original spectrum. # oversample (int): (**Optional**) Oversampling can be done, not # to loose spectral resolution, especally for extended # wavelength ranges and to avoid aliasing. Default is to # provide the same number of output pixels as input. # velscale (float): (**Optional**) Velocity scale in km/s per # pixels. If this variable is not defined, then it will # contain in output the velocity scale. If this variable is # defined by the user it will be used to set the output number # of pixels and wavelength scale. # log10 (bool): (**Optional**) Flag that the spectrum should be # binned in units of base-10 log wavelength, instead of # natural log # newRange (np.ndarray): (**Optional**) Force the spectrum to # be sampled to a new spectral range (lamRange is the # *existing* spectral range). # # Returns: # float, int: Returns # # 1. the linear wavelength step of each pixel in the input # spectrum, # 2. the number of pixels for the rebinned spectrum, # 3. the log-linear wavelength step for each pixel in the new # spectrum, and # 4. the velocity step for each pixel in the new spectrum. # # Raises: # ValueError: Raised if the input wavelength range (*lamRange* or # *newRange*) does not have two elements or is not sorted. # """ # lamRange = np.asarray(lamRange) # if len(lamRange) != 2: # raise ValueError('lamRange must contain two elements') # if lamRange[0] >= lamRange[1]: # raise ValueError('It must be lamRange[0] < lamRange[1]') # # # Size of output spectrum # m = int(n) if oversample is None else int(n*oversample) # # # Get the sampling of the existing spectrum # dLam = np.diff(lamRange)/(n - 1.) # Assume constant dLam # lim = lamRange/dLam + [-0.5, 0.5] # All in units of dLam # # # Get the sampling for the new spectrum, if requested to be # # different # if newRange is not None: # newRange = np.asarray(newRange) # if len(newRange) != 2: # raise ValueError('newRange must contain two elements') # if newRange[0] >= newRange[1]: # raise ValueError('It must be newRange[0] < newRange[1]') # lim = newRange/dLam + [-0.5, 0.5] # Set limits to a new wavelength range # # # Adjust the length # nn = int((lamRange[1]-lamRange[0]-newRange[1]+newRange[0])/dLam) # m = m-nn if oversample is None else m-nn*oversample # # # Set the limits to the (base-10 or natural) log of the wavelength # logLim = np.log(lim) if not log10 else np.log10(lim) # # c = astropy.constants.c.to('km/s').value # Speed of light in km/s (use astropy definition) # # # Set the velocity scale, if velscale not provided; otherwise force # # the sampling based in the input velscale # if velscale is None: # Velocity scale is not set by user # velscale = np.diff(logLim)[0]/m*c # Only for output # if log10: # velscale *= np.log(10.) # Adjust to log base-10 # # logscale = velscale/c # dlambda/lambda = dln(lambda) # if log10: # logscale /= np.log(10.) # Convert to dlog10(lambda) # m = int(np.diff(logLim)/logscale) # Number of output pixels # # return dLam, m, logscale, velscale def _pixel_borders(xlim, npix, log=False, base=10.0): """ Determine the borders of the pixels in a vector. Args: xlim (np.ndarray) : (Geometric) Centers of the first and last pixel in the vector. npix (int) : Number of pixels in the vector. log (bool) : (**Optional**) The input range is (to be) logarithmically sampled. base (float) : (**Optional**) The base of the logarithmic sampling. The default is 10.0; use np.exp(1.) for the natural logarithm. Returns: np.ndarray, float: A vector with the (npix+1) borders of the pixels and the sampling rate. If logarithmically binned, the sampling is the step in :math`\log x`. """ if log: logRange = np.log(xlim)/np.log(base) dlogx = np.diff(logRange)/(npix-1.) borders = np.power(base, np.linspace(*(logRange/dlogx + [-0.5, 0.5]), num=npix+1)*dlogx) return borders, dlogx dx = np.diff(xlim)/(npix-1.) borders = np.linspace(*(xlim/dx + np.array([-0.5, 0.5])), num=npix+1)*dx return borders, dx
[docs]def resample_vector_npix(outRange=None, dx=None, log=False, base=10.0, default=None): """ Determine the number of pixels needed to resample the vector. Args: outRange (list or np.ndarray) : Two-element array with the starting and ending x coordinate of the pixel centers to divide into pixels of a given width. If *log* is True, this must still be the linear value of the x coordinate, not log(x)!. dx (float) : Linear or logarithmic pixel width. log (bool) : Flag that the range should be logarithmically binned. base (float) : Base for the logarithm default (int) : Default number of pixels to use. The default is returned if either *outRange* or *dx* are not provided. Returns: int, np.ndarray: Returns two objects: The number of pixels to cover *outRange* with pixels of width *dx* and the adjusted range such that number of pixels of size dx is the exact integer. Raises: ValueError: Raised if the range is not a two-element vector """ # If the range or sampling are not provided, the number of pixels is # already set if outRange is None or dx is None: return default, outRange if len(outRange) != 2: raise ValueError('Output range must be a 2-element vector.') _outRange = np.atleast_1d(outRange).copy() npix = int( np.diff(np.log(_outRange))/np.log(base) / dx) + 1 if log else \ int(np.diff(_outRange)/dx) + 1 # _outRange = outRange _outRange[1] = np.power(base, np.log(_outRange[0])/np.log(base) + dx*(npix-1)) \ if log else _outRange[0] + dx*(npix-1) return npix, _outRange
[docs]def resample_vector(y, xRange=None, inLog=False, newRange=None, newpix=None, newLog=True, dx=None, base=10.0, ext_value=0.0, conserve=False, flat=True): """ Resample the provided vector to a new grid using integration. This is a generalization of the routine :func:`log_rebin` provided by Michele Cappellari in the pPXF package. Args: y (np.ndarray): Vector to resample. Must be 1-D. xRange (array): (**Optional**) A two-element array with the starting and ending value for the coordinates of the centers of the first and last pixels in y. If not provided, the pixel coordinates are used; i.e., xRange = [0,y.size-1]. inLog (bool): (**Optional**) Flag that the input vector is logarithmically spaced within xRange. Cannot be used if xRange is not provided! newRange (array): (**Optional**) Coordinates for the centers of the first and last pixel in the output vector. If not provided, assumed to be the same as the input range. newpix (int): (**Optional**) Number of pixels for the output vector. If not provided, assumed to be the same as the input vector. newLog (bool): (**Optional**) The output vector should be logarithmically binned in the x-coordinates. dx (float): (**Optional**) The sampling step for the output vector. If *newLog* is True, this has to be the change in the logarithm of x for the output vector! If not provided, the sampling is set by the output range (see *newRange* above) and number of pixels (see *newpix* above). base (float): (**Optional**) When logarithmically binning the output vector, use this as the base. The default is 10.0; use np.exp(1) for natural logarithm. ext_value (float): (**Optional**) Set extrapolated values to the provided float. conserve (bool): (**Optional**) Conserve the integral of the input vector. For example, if the input vector is a spectrum in flux units, you should conserve the flux in the resampling; if the spectrum is in units of flux density, you do not want to conserve the integral. flat (bool): (**Optional**) Assume the 'true' y function is flat across a pixel, as is done in M. Cappellari's log_rebin routine; this is the default behavior. If set to False, the integration follows a basic linear interpolation across the pixel. Returns: np.ndarray: Two numpy arrays with the new x coordinates and new y values for the resampled vector. Raises: ValueError: Raised if *y* is not of type np.ndarray, if *y* is not one-dimensional, or if *xRange* is not provided and the input vector is logarithmically binned (see *inLog* above). .. todo: - Need to check if this works rebinning from log to log! """ # Check operation can be performed if not isinstance(y, np.ndarray): raise ValueError('Input vector must be a np.ndarray!') if len(y.shape) != 1: raise ValueError('Input must be a 1-D vector!') if xRange is None and inLog: raise ValueError('To specify the input vector as logarithmically binned, you must ' \ 'provided the coordinates of the first and last pixel!') # Get the pixel borders of the input vector n = int(y.shape[0]) inRange = np.array([0, n-1]) if xRange is None else np.array(xRange) inBorders, inPscale = _pixel_borders(inRange, n, log=inLog, base=base) # Set the output range, number of pixels, pixel borders, and output # coordinate vector outRange = inRange if newRange is None else np.array(newRange) m, _outRange = resample_vector_npix(outRange=outRange, log=newLog, base=base, dx=dx, default=(n if newpix is None else newpix)) # print(outRange, m, _outRange) outRange = outRange if _outRange is None else _outRange outBorders, outPscale = _pixel_borders(outRange, m, log=newLog, base=base) outX = np.sqrt(outBorders[1:]*outBorders[:-1]) if newLog \ else (outBorders[1:]+outBorders[:-1])/2.0 #If the input is logarithmically binned, work in log space if inLog: inBorders = np.log(inBorders)/np.log(base) outBorders = np.log(outBorders)/np.log(base) # Convert the borders to the input pixel units outBorders /= inPscale inBorders /= inPscale # Perform the integration if flat: # Get the new spectrum by performing an analytic integral # assuming the value is flat across each pixel # - From M. Cappellari k = (outBorders - inBorders[0]).clip(0, n-1).astype(int) outY = np.add.reduceat(y, k)[:-1] outY *= np.diff(k) > 0 outY += np.diff((outBorders - inBorders[k])*y[k]) else: # Get the new spectrum by performing an analytic integral # assuming a basic linear interpolation between the pixel values # - Not as elegant as Michele's code above, but it gets the # job done # Create a sorted list of the pixel centers and the output # borders inCenters = np.sqrt(inBorders[1:]*inBorders[:-1]) if newLog \ else (inBorders[1:]+inBorders[:-1])/2.0 _x = np.append( outBorders, inCenters ) srt = np.argsort(_x) _x = _x[srt] # Linearly interpolate the input function at the output border positions _y = np.append(y, y[-1]) d = (outBorders-inCenters[0]) k = d.clip(0,n-1).astype(int) _y = np.append( (1-(d-k))*y[k] + (d-k)*_y[k+1], y)[srt] # Flag the input pixel centers for removal from reduceat flg = np.zeros(_x.size, dtype=int) flg[m+1:] = 1 _f = flg[srt] # Compute the integrand if inLog: _y *= np.power(base, _x*inPscale) integ = (_y[1:]+_y[:-1])*np.diff(_x)/2.0 # Use reduceat to calculate the integral indx = _f < 1 k = np.arange(_y.size)[indx] outY = np.add.reduceat(integ, k[:-1]) if k[-1] == _y.size-1 \ else np.add.reduceat(integ, k)[:-1] # Do not conserve the integral over the size of the pixel if not conserve: outY /= np.diff(outBorders) if inLog: outY /= outX # Set values for extrapolated regions if outRange is not None and (outRange[0] < inRange[0] or outRange[1] > inRange[1]): outY[ (outX < inRange[0]) | (outX > inRange[1]) ] = ext_value # Return new coordinates and rebinned values return outX, outY
[docs]def downgrade(wave, flux, deltal_in, sigma_galaxy, wave_instrument, r_instrument): """ Adapted from the manga DAP downgrader from Kyle Westfall. Downgrades an input spectrum to a given galaxy velocity dispersion using the input SEDs resolution and the resolution of the observation. Returns flux of downgraded SED. """ sig2fwhm = 2.0 * np.sqrt(2.0 * np.log(2.0)) fwhm = deltal_in/wave*c sigma = fwhm/sig2fwhm sres = wave/deltal_in new_sig = np.zeros(wave.shape, dtype=np.float64) # match wavelength between model and instrument to downgrade def find_nearest(array,value): idx = (np.abs(array-value)).argmin() return idx,array[idx] for wi,w in enumerate(wave): index, value = find_nearest(wave_instrument,w) sig_instrument = c/r_instrument[index]/sig2fwhm new_sig[wi] = np.sqrt(sigma_galaxy**2.0 +sig_instrument**2.0) new_fwhm = sig2fwhm * new_sig new_sres = c / new_fwhm if len(wave)<5: raise ValueError("Not enough wavelength points...!") a_wave = wave[2]-wave[1] b_wave = wave[3]-wave[2] if b_wave-a_wave < 0.000001*a_wave: log_wave = False else: log_wave = True new_flux, matched_sres, sigma_offset, new_mask = match_spectral_resolution(wave, flux, sres, wave, new_sres, min_sig_pix=0.0, log10=log_wave, new_log10=log_wave) return new_flux