Source code for arpes.utilities.conversion.api

"""Public API for momentum-space (k-space) conversion of ARPES data.

This module defines the user-facing entry points for converting ARPES
data from angle/energy space into momentum space. The functions provided
here are stable, documented, and intended to be imported and used directly
by analysis scripts and notebooks.

Design principles:
    - This module exposes *high-level orchestration functions* only.
    - Implementation details (coordinate transforms, interpolation,
      grid construction) are delegated to lower-level modules.
    - Functions here operate on `xarray.DataArray` objects and return
      new objects without modifying inputs in place.

Scope:
    - Backward (interpolating) k-space conversion via `convert_to_kspace`
    - Chunk-aware conversion for large energy stacks
    - Provenance tracking for reproducibility

Non-goals:
    - This module does not implement low-level coordinate transforms.
    - This module does not define interpolation kernels or numerical solvers.
    - Forward (coordinate-only) conversion routines live elsewhere.

Typical usage:
    >>> from arpes.utilities.conversion.api import convert_to_kspace
    >>> kdata = convert_to_kspace(data, kx=..., ky=...)

Module structure:
    - convert_to_kspace:
        Primary public entry point for k-space conversion.
    - _chunk_convert:
        Internal helper for chunk-wise conversion (not part of the public API).

See Also:
    - arpes.utilities.conversion.coordinates:
        Low-level coordinate transformation utilities.
    - arpes.utilities.conversion.core:
        Internal conversion machinery and interpolation logic.
"""

from __future__ import annotations

import warnings
from logging import DEBUG, INFO
from typing import TYPE_CHECKING, Unpack

import numpy as np
import xarray as xr

from arpes.debug import setup_logger
from arpes.provenance import update_provenance
from arpes.utilities import normalize_to_spectrum
from arpes.xarray_extensions.accessor.spectrum_type import AngleUnit

from .coordinates import convert_coordinates
from .core import _is_dims_match_coordinate_convert
from .grids import (
    determine_momentum_axes_from_measurement_axes,
    is_dimension_convertible_to_momentum,
)
from .kx_ky_conversion import ConvertKp, ConvertKxKy
from .kz_conversion import ConvertKpKz

if TYPE_CHECKING:
    from collections.abc import Hashable

    from numpy.typing import NDArray

    from arpes._typing.attrs_property import KspaceCoords
    from arpes._typing.base import MOMENTUM
    from arpes.utilities.conversion.base import CoordinateConverter
    from arpes.utilities.conversion.calibration import DetectorCalibration


LOGLEVELS = (DEBUG, INFO)
LOGLEVEL = LOGLEVELS[1]
logger = setup_logger(__name__, LOGLEVEL)


[docs] @update_provenance("Automatically k-space converted") def convert_to_kspace( # noqa: PLR0913 arr: xr.DataArray, *, bounds: dict[MOMENTUM, tuple[float, float]] | None = None, resolution: dict[MOMENTUM, float] | None = None, calibration: DetectorCalibration | None = None, coords: KspaceCoords | None = None, allow_chunks: bool = False, **kwargs: Unpack[KspaceCoords], ) -> xr.DataArray: """Converts volumetric the data to momentum space ("backwards"). Typically what you want. Works in general by regridding the data into the new coordinate space and then interpolating back into the original data. For forward conversion, see sibling methods. Forward conversion works by totally unchanged by the conversion (if we do not apply a Jacobian correction), but the converting the coordinates, rather than by interpolating the data. As a result, the data will be coordinates will no longer have equal spacing. This is only really useful for zero and one dimensional data because for two dimensional data, the coordinates must become two dimensional in order to fully specify every data point (this is true in generality, in 3D the coordinates must become 3D as well). The only exception to this is if the extra axes do not need to be k-space converted. As is the case where one of the dimensions is `cycle` or `delay`, for instance. You can request a particular resolution for the new data with the `resolution=` parameter, or a specific set of bounds with the `bounds=` Examples: Convert a 2D cut with automatically inferred range and resolution. >>> convert_to_kspace(arpes.io.load_example_data()) # doctest: +SKIP xr.DataArray(...) Convert a 3D map with a specified momentum window >>> convert_to_kspace( # doctest: +SKIP fermi_surface_map, kx=np.linspace(-1, 1, 200), ky=np.linspace(-1, 1, 350), ) xr.DataArray(...) Args: arr (xr.DataArray): ARPES data bounds (dict[MOMENTUM, tuple[float, float]], optional): The key is the axis name. The value is the bounds. Defaults to {}. If not set this arg, set coords. resolution (dict[Momentum, float], optional): dict for the energy/angular resolution. calibration (DetectorCalibration, optional): DetectorCalibration object. Defaults to None. coords (KspaceCoords, optional): Coordinate of k-space. Defaults to {}. allow_chunks (bool): [description]. Defaults to False. **kwargs: treated as coords. Raises: NotImplementedError: [description] AnalysisError: [description] ValueError: [description] Returns: xr.DataArray: Converted ARPES (k-space) data. """ coords = {} if coords is None else coords assert coords is not None coords.update(kwargs) bounds = bounds or {} arr = arr if isinstance(arr, xr.DataArray) else normalize_to_spectrum(arr) assert isinstance(arr, xr.DataArray) if arr.S.angle_unit is AngleUnit.DEG: arr = arr.S.switched_angle_unit() logger.debug(f"bounds (covnert_to_kspace): {bounds}") logger.debug(f"keys in coords (convert_to_kspace): {coords.keys()}") # Chunking logic if allow_chunks and ("eV" in arr.dims) and len(arr.eV) > 50: # noqa: PLR2004 return _chunk_convert( arr=arr, bounds=bounds, resolution=resolution, calibration=calibration, coords=coords, **kwargs, ) momentum_incompatibles: list[str] = [ str(d) for d in arr.dims if not is_dimension_convertible_to_momentum(str(d)) and str(d) != "eV" ] momentum_compatibles: list[str] = sorted( # Literal["phi", "theta", "beta", "chi", "psi", "hv"] [str(d) for d in arr.dims if is_dimension_convertible_to_momentum(str(d))], ) if not momentum_compatibles: return arr # no need to convert, might be XPS or similar converted_dims: list[str] = ( (["eV"] if ("eV" in arr.dims) else []) + determine_momentum_axes_from_measurement_axes( momentum_compatibles, ) # axis_names: list[Literal["phi", "beta", "psi", "theta", "hv"]], + momentum_incompatibles ) tupled_momentum_compatibles = tuple(momentum_compatibles) convert_cls: type[ConvertKp | ConvertKxKy | ConvertKpKz] | None = None if _is_dims_match_coordinate_convert(tupled_momentum_compatibles): convert_cls = { ("phi",): ConvertKp, ("beta", "phi"): ConvertKxKy, ("phi", "theta"): ConvertKxKy, ("phi", "psi"): ConvertKxKy, # ('chi', 'phi',): ConvertKxKy, ("hv", "phi"): ConvertKpKz, }.get(tupled_momentum_compatibles) assert convert_cls is not None, "Cannot select convert class" converter: CoordinateConverter = convert_cls( arr=arr, dim_order=converted_dims, calibration=calibration, ) converted_coordinates: dict[Hashable, NDArray[np.floating]] = converter.get_coordinates( resolution=resolution, bounds=bounds, ) if not set(coords.keys()).issubset(converted_coordinates.keys()): extra = set(coords.keys()).difference(converted_coordinates.keys()) msg = f"Unexpected passed coordinates: {extra}" raise ValueError(msg) converted_coordinates.update(**coords) # type: ignore[misc] result = convert_coordinates( arr, target_coordinates=converted_coordinates, coordinate_transform={ "dims": converted_dims, "transforms": {str(dim): converter.conversion_for(dim) for dim in arr.dims}, }, ) assert isinstance(result, xr.DataArray) return result
def _chunk_convert( arr: xr.DataArray, bounds: dict[MOMENTUM, tuple[float, float]] | None = None, resolution: dict[MOMENTUM, float] | None = None, calibration: DetectorCalibration | None = None, coords: KspaceCoords | None = None, **kwargs: Unpack[KspaceCoords], ) -> xr.DataArray: DESIRED_CHUNK_SIZE = 1000 * 1000 * 20 TOO_LARGE_CHUNK_SIZE = 100 n_chunks: np.int_ = np.prod(arr.shape) // DESIRED_CHUNK_SIZE if n_chunks == 0: warnings.warn( "Data size is sufficiently small, set allow_chunks=False", stacklevel=2, ) n_chunks += 1 if n_chunks > TOO_LARGE_CHUNK_SIZE: warnings.warn( "Input array is very large. Please consider resampling.", stacklevel=2, ) chunk_thickness = np.max(len(arr.eV) // n_chunks, 1) logger.debug(f"Chunking along energy: {n_chunks}, thickness {chunk_thickness}") finished = [] low_idx = 0 high_idx = chunk_thickness while low_idx < len(arr.eV): chunk = arr.isel(eV=slice(low_idx, high_idx)) if len(chunk.eV) == 1: chunk = chunk.squeeze(dim="eV") kchunk = convert_to_kspace( arr=chunk, bounds=bounds, resolution=resolution, calibration=calibration, coords=coords, allow_chunks=False, **kwargs, ) if "eV" not in kchunk.dims: kchunk = kchunk.expand_dims("eV") assert isinstance(kchunk, xr.DataArray) finished.append(kchunk) low_idx = high_idx high_idx = min(len(arr.eV), high_idx + chunk_thickness) return xr.concat(finished, dim="eV")