Source code for arpes.analysis.general

"""Some general purpose analysis routines otherwise defying categorization."""

import warnings
from logging import DEBUG, INFO

import numpy as np
import xarray as xr
from lmfit.models import ConstantModel

from arpes._typing.base import DataType, ReduceMethod
from arpes.constants import K_BOLTZMANN_EV_KELVIN
from arpes.debug import setup_logger
from arpes.fits import AffineBroadenedFD
from arpes.fits.fit_models.functional_forms import fermi_dirac
from arpes.provenance import update_provenance
from arpes.utilities import normalize_to_spectrum

from .filters import gaussian_filter_arr

__all__ = (
    "condense",
    "fit_fermi_edge",
    "normalize_by_fermi_distribution",
    "rebin",
    "symmetrize_axis",
)

LOGLEVELS = (DEBUG, INFO)
LOGLEVEL = LOGLEVELS[1]
logger = setup_logger(__name__, LOGLEVEL)


@update_provenance("Fit Fermi Edge")
def fit_fermi_edge(
    data: xr.DataArray,
    energy_range: slice | None = None,
) -> xr.Dataset:
    """Fits a Fermi edge.

    Not much easier than doing it manually, but this can be
    useful sometimes inside procedures where you don't want to reimplement this logic.

    Args:
        data(DataArray): ARPES data
        energy_range(slice | tuple[float, float]): energy range for fitting

    Returns:
        The Fermi edge location.
    """
    assert isinstance(data, xr.DataArray)
    if energy_range is None:
        energy_range = slice(-0.1, 0.1)

    model = AffineBroadenedFD() + ConstantModel()
    params = AffineBroadenedFD().guess(
        data.sel(eV=energy_range).sel(phi=0, method="nearest").values,
        data.sel(eV=energy_range).coords["eV"].values,
    )
    return data.sel(eV=energy_range).S.modelfit("eV", model=model, params=params)


@update_provenance("Normalized by the 1/Fermi Dirac Distribution at sample temp")
def normalize_by_fermi_distribution(
    data: xr.DataArray,
    max_gain: float | np.floating = 0.0,
    rigid_shift: float = 0,
    instrumental_broadening: float = 0,
    total_broadening: float = 0,
) -> xr.DataArray:
    """Normalizes a scan by 1/the fermi dirac distribution.

    You can control the maximum gain with ``clamp``, and whether
    the Fermi edge needs to be shifted (this is for those desperate situations where you want
    something that "just works") via ``rigid_shift``.

    Args:
        data: Input
        max_gain: Maximum value for the gain. By default the value used
            is the mean of the spectrum.
        rigid_shift: How much to shift the spectrum chemical potential.
        instrumental_broadening: Instrumental broadening to use for
            convolving the distribution
        total_broadening: the value for total broadning.

    Pass the nominal value for the chemical potential in the scan. I.e. if the chemical potential is
    at BE=0.1, pass rigid_shift=0.1.

    Returns:
        Normalized DataArray
    """
    data = data if isinstance(data, xr.DataArray) else normalize_to_spectrum(data)
    if total_broadening:
        distrib = fermi_dirac(
            x=data.coords["eV"].values,
            center=rigid_shift,
            width=total_broadening,
        )
    else:
        distrib = fermi_dirac(
            x=data.coords["eV"].values,
            center=rigid_shift,
            width=K_BOLTZMANN_EV_KELVIN * data.S.temp,
        )
    assert isinstance(distrib, np.ndarray)
    # don't boost by more than 90th percentile of input, by default
    max_gain = (
        max_gain or min(
            float(np.mean(data.values, dtype=np.float64)),
            float(np.percentile(data.values, 10)),
        )
    )

    np.place(distrib, distrib < 1 / max_gain, 1 / max_gain)
    distrib_arr = xr.DataArray(distrib, {"eV": data.coords["eV"].values}, ["eV"])

    distrib_arr = (
        gaussian_filter_arr(distrib_arr, sigma={"eV": instrumental_broadening})
        if instrumental_broadening
        else distrib_arr
    )

    return data / distrib_arr


[docs] @update_provenance("Symmetrize about axis") def symmetrize_axis( data: DataType, axis_name: str, flip_axes: list[str] | None = None, ) -> DataType: """Symmetrizes data across an axis. It would be better ultimately to be able to implement an arbitrary symmetry (such as a mirror or rotational symmetry about a line or point) and to symmetrize data by that method. Args: data: input data axis_name: name of axis to be symmetrized. flip_axes (list[str]): lis of axis name to be flipped flipping. Returns: Data after symmetrization procedure. """ data = data.assign_coords( {axis_name: (data.coords[axis_name].values - data.coords[axis_name].values[0])}, ) selector = {} selector[axis_name] = slice(None, None, -1) rev: DataType = data.sel(selector) rev = rev.assign_coords({axis_name: -rev.coords[axis_name].values}) if flip_axes is None: flip_axes = [] for axis in flip_axes: selector = {} selector[axis] = slice(None, None, -1) rev = rev.sel(selector) rev = rev.assign_coords({axis_name: -rev.coords[axis_name].values}) return rev.combine_first(data)
[docs] @update_provenance("Condensed array") def condense(data: DataType) -> DataType: # pragma: no cover """Clips the data so that only regions where there is substantial weight are included. In practice this usually means selecting along the ``eV`` axis, although other selections might be made. Args: data: xarray.DataArray Returns: The clipped data. """ warnings.warn( "This method will be deprecated. Don't use it. Instead, use data.sel(eV=slice(0, 0.05))", category=DeprecationWarning, stacklevel=2, ) if "eV" in data.dims: data = data.sel(eV=slice(None, 0.05)) return data
[docs] @update_provenance("Rebinned array") def rebin( data: DataType, shape: dict[str, int] | None = None, bin_width: dict[str, int] | None = None, method: ReduceMethod = "sum", **kwargs: int, ) -> DataType: """Rebins the data onto a different (smaller) shape. (xarray groupby_bins is used internally) By default the behavior is to split the data into chunks that are integrated over. When both ``shape`` and ``bin_width`` are supplied, ``shape`` is used. Dimensions corresponding to missing entries in ``shape`` or ``reduction`` will not be changed Args: data: ARPES data shape(dict[str, int]): Target shape (key is dimension (coords) name, the value is the size of the coords after rebinning.) The priority is higher than that of the reduction argument. bin_width(dict[str, int]): Factor to reduce each dimension by The dict key is dimension name and it's value is the binning width in pixel. method: sum or mean after groupby_bins (default sum) **kwargs: Treated as bin_width. Like as eV=2, phi=3 to set. Returns: The rebinned data. """ bin_width = bin_width or {} for k, v in kwargs.items(): if k in data.dims: bin_width[k] = v if shape is None: shape = {} for k, v in bin_width.items(): shape[k] = len(data.coords[k]) // v assert bool(shape), "Set shape/bin_width" for bin_axis, bins in shape.items(): data = _bin(data, bin_axis, bins, method) return data
def _bin( data: DataType, bin_axis: str, bins: int, method: ReduceMethod, ) -> DataType: """Bin data along a specified axis and replace bin coordinates with bin centers. Args: data: xarray DataArray or Dataset to bin. bin_axis: Name of the coordinate along which to bin. bins: Number of bins. method: Reduction method, either "sum" or "mean". Returns: Binned xarray object with updated coordinates. """ if method not in ("sum", "mean"): msg = "method must be 'sum' or 'mean'" raise TypeError(msg) grouped = data.groupby_bins(bin_axis, bins) data = getattr(grouped, method)().rename({bin_axis + "_bins": bin_axis}) bin_edges = data.coords[bin_axis].values first_left = np.nextafter(bin_edges[0].left, bin_edges[0].right) bin_centers = [(first_left + bin_edges[0].right) / 2] + [ (b.left + b.right) / 2 for b in bin_edges[1:] ] data.coords[bin_axis] = np.array(bin_centers) return data