Source code for arpes.analysis.mask

"""Utilities for applying masks to data."""

from __future__ import annotations

from typing import TYPE_CHECKING

import numpy as np
import xarray as xr
from matplotlib.path import Path as mPath

from arpes.provenance import update_provenance
from arpes.utilities import normalize_to_spectrum

if TYPE_CHECKING:
    from collections.abc import Iterable

    from _typeshed import Incomplete
    from numpy.typing import NDArray


__all__ = (
    "apply_mask",
    "apply_mask_to_coords",
    "polys_to_mask",
    "raw_poly_to_mask",
)


def raw_poly_to_mask(poly: Incomplete) -> dict[str, Incomplete]:
    """Converts a polygon into a mask definition.

    There's not currently much metadata attached to masks, but this is around if we ever decide
    that we need to implement more complicated masking schemes.

    In particular, we might want to store also whether the interior or exterior is the masked
    region, but this is functionally achieved for now with the `invert` flag in other functions.

    Args:
        poly: Polygon implementing a masked region. It is stored as-is, without copying or
            validation.

    Returns:
        A dict with the single key ``"poly"`` mapping to the polygon that was passed in.
    """
    return {"poly": poly}
def polys_to_mask(
    mask_dict: dict[str, Incomplete],
    coords: xr.Coordinates,
    shape: Iterable[int],
    radius: float = 0,
    *,
    invert: bool = False,
) -> NDArray[np.bool_]:
    """Converts a mask definition in terms of the underlying polygon to a True/False mask array.

    Uses the coordinates and shape of the target data in order to determine which pixels
    should be masked. This process "specializes" a mask to a particular shape, whereas masks
    given by polygon definitions are general to any data with appropriate dimensions, because
    waypoints are given in unitful values rather than index values.

    Args:
        mask_dict: dict object to represent the mask. The ``"dims"`` and ``"polys"`` keys are
            required: ``"dims"`` names the coordinate axis for each component of a waypoint and
            ``"polys"`` is a sequence of polygons, each a sequence of waypoints.
        coords: Coordinates of the target data, indexable by the names in ``mask_dict["dims"]``.
            Waypoints are converted to pixel indices with ``np.searchsorted``, which assumes
            each coordinate axis is sorted in ascending order.
        shape: Shape of the mask. May be any iterable; it is consumed exactly once.
        radius: Additional margin on the path in coordinates of *points* (i.e. pixel indices),
            forwarded to ``matplotlib.path.Path.contains_points``.
        invert: if true, flip True/False in mask.

    Returns:
        Boolean array of shape ``shape`` that is True for every pixel contained in at least one
        polygon (or the logical negation of that when ``invert`` is set). With an empty polygon
        list no pixel is selected.

    Raises:
        KeyError: If ``mask_dict`` lacks the ``"dims"`` or ``"polys"`` key.
    """
    dims = mask_dict["dims"]
    polys = mask_dict["polys"]

    # `shape` is only promised to be an Iterable; materialize it so a one-shot iterator
    # is not exhausted before the final reshape.
    grid_shape = tuple(shape)

    # Translate every waypoint from unitful coordinate values to pixel indices.
    index_polys = [
        [
            [np.searchsorted(coords[dims[axis]], value) for axis, value in enumerate(point)]
            for point in poly
        ]
        for poly in polys
    ]

    # One row of pixel indices per pixel, enumerated in C order so that the containment
    # result can be reshaped straight back to `grid_shape`.
    pixel_indices = np.indices(grid_shape).reshape(len(grid_shape), -1).T

    # Start from "nothing selected" and accumulate the union over all polygons.
    mask = np.zeros(grid_shape, dtype=bool)
    for poly in index_polys:
        inside = mPath(poly).contains_points(pixel_indices, radius=radius)
        mask |= inside.reshape(grid_shape)

    return np.logical_not(mask) if invert else mask
def apply_mask_to_coords(
    data: xr.Dataset,  # data.data_vars is used
    mask: dict[str, NDArray[np.float64] | Iterable[Iterable[float]]],  # (N, 2) array
    dims: list[str],
    *,
    invert: bool = True,
) -> NDArray[np.bool_]:
    """Performs broadcasted masking along a given dimension.

    The data variables named in `dims` are stacked into points (one component per entry of
    `dims`) and each point is tested for containment in the polygon ``mask["poly"]``.

    Args:
        data: The data you want to mask. The variables ``data.data_vars[d]`` for each ``d`` in
            `dims` must share a common shape so that they can be stacked.
        mask: The mask to apply, should be dimensionally equivalent to what you request in
            `dims`. Only the ``"poly"`` entry is read.
        dims: The dimensions which should be masked.
        invert: Whether the mask should be inverted. Note the default is True, in which case
            the result is True for points that are *not* contained in the polygon.

    Returns:
        Boolean array with the shape of the stacked data variables. This function does not
        modify or return `data` itself.
    """
    stacked = np.stack([data.data_vars[d].values for d in dims], axis=-1)
    dest_shape = stacked.shape[:-1]

    # Flatten to (n_points, len(dims)). Using -1 instead of np.prod(dest_shape) avoids the
    # float 1.0 that np.prod returns for an empty shape (0-d variables).
    points = stacked.reshape(-1, len(dims))

    inside = mPath(np.array(mask["poly"])).contains_points(points).reshape(dest_shape)
    return np.logical_not(inside) if invert else inside
@update_provenance("Apply boolean mask to data")
def apply_mask(
    data: xr.DataArray,
    mask: dict[str, Incomplete] | NDArray[np.bool_],
    replace: float = np.nan,
    radius: float = 0.0,
    *,
    invert: bool = False,
) -> xr.DataArray:
    """Applies a logical mask, i.e. one given in terms of polygons, to a specific piece of data.

    This can be used to set values outside or inside a series of polygon masks to a given
    value or to NaN.

    Expanding or contracting the masked region can be accomplished with the
    radius argument, but by default strict inclusion is used.

    Some masks include a `fermi` parameter which allows for clipping the detector
    boundaries in a semi-automated fashion. If this is included, the returned data is
    truncated along the ``eV`` axis to values no greater than ``fermi + 0.2`` (i.e. only 200meV
    above the Fermi level is kept). This helps to prevent very large and undesirable regions
    filled with only the replacement value which can complicate automated analyses that rely
    on masking.

    Args:
        data: Data to mask. Anything that is not an `xr.DataArray` is first passed through
            `normalize_to_spectrum`.
        mask: Either a logical definition of the mask, appropriate for passing to
            `polys_to_mask`, or an already-computed boolean array. If a mask definition has no
            ``"dims"`` entry, the dimensions of `data` are used.
        replace: The value to substitute for pixels masked.
        radius: Radius by which to expand the masked area. Ignored when `mask` is an array.
        invert: Allows logical inversion of the masked parts of the data. By default,
            the area inside the polygon sequence is replaced by `replace`. Ignored when `mask`
            is an array.

    Returns:
        A deep copy of the data with values masked out; the input is not modified.
    """
    data = data if isinstance(data, xr.DataArray) else normalize_to_spectrum(data)
    fermi: float | None = None

    if isinstance(mask, dict):
        fermi = mask.get("fermi")
        dims: tuple[str, ...] = mask.get("dims", data.dims)
        mask_arr: NDArray[np.bool_] = polys_to_mask(
            # Pass the resolved dims along: polys_to_mask reads mask_dict["dims"] directly,
            # so the default chosen above must be present in the dict it receives.
            mask_dict={**mask, "dims": dims},
            coords=data.coords,
            # Sizes of the masked dimensions, in the order they appear in the data.
            shape=[size for dim, size in zip(data.dims, data.shape) if dim in dims],
            radius=radius,
            invert=invert,
        )
    else:
        mask_arr = mask

    masked_data = data.copy(deep=True)
    # Multiplying by 1.0 promotes integer data to floating point so that the default NaN
    # replacement value can be stored.
    masked_data.values = masked_data.values * 1.0
    masked_data.values[mask_arr] = replace

    if fermi is not None:
        return masked_data.sel(eV=slice(None, fermi + 0.2))
    return masked_data