Source code for arpes.analysis.decomposition

"""Provides array decomposition approaches like principal component analysis for xarray types."""

from __future__ import annotations

from functools import wraps
from typing import TYPE_CHECKING, Literal, TypedDict, Unpack, cast

import sklearn
import xarray as xr
from sklearn.decomposition import NMF, PCA, FactorAnalysis, FastICA
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

from arpes.constants import TWO_DIMENSION
from arpes.provenance import Provenance, provenance
from arpes.utilities import normalize_to_spectrum

if TYPE_CHECKING:
    import numpy as np
    from numpy.typing import NDArray
    from sklearn.base import BaseEstimator

__all__ = (
    "factor_analysis_along",
    "ica_along",
    "nmf_along",
    "pca_along",
)


class PCAParam(TypedDict, total=False):
    n_composition: float | Literal["mle", "auto"] | None
    copy: bool
    whiten: str | bool
    svd_solver: Literal["auto", "full", "arpack", "randomiozed"]
    tol: float
    iterated_power: int | Literal["auto"]
    n_oversamples: int
    power_interation_normalizer: Literal["auto", "QR", "LU", "none"]
    random_state: int | None


class FastICAParam(TypedDict, total=False):
    n_composition: float | None
    algorithm: Literal["Parallel", "deflation"]
    whiten: bool | Literal["unit-variance", "arbitrary-variance"]
    fun: Literal["logosh", "exp", "cube"]
    fun_args: dict[str, float] | None
    max_iter: int
    tol: float
    w_int: NDArray[np.float64]
    whiten_solver: Literal["eigh", "svd"]
    random_state: int | None


class NMFParam(TypedDict, total=False):
    n_composition: int | Literal["auto"] | None
    init: Literal["random", "nndsvd", "nndsvda", "nndsvdar", "custom"] | None
    solver: Literal["cd", "mu"]
    beta_loss: float | Literal["frobenius", "kullback-leibler", "itakura-saito"]
    tol: float
    max_iter: int
    random_state: int | None
    alpha_W: float
    alpha_H: float
    l1_ratio: float
    verbose: int
    shuffle: bool


class FactorAnalysisParam(TypedDict, total=False):
    n_composition: int | None
    tol: float
    copy: bool
    max_iter: int
    noise_variance_init: NDArray[np.float64] | None
    svd_method: Literal["lapack", "randomized"]
    iterated_power: int
    rotation: Literal["varimax", "quartimax"] | None
    random_state: int | None


class DecompositionParam(PCAParam, FastICAParam, NMFParam, FactorAnalysisParam):  # type: ignore[misc]
    pass


[docs] def decomposition_along( data: xr.DataArray, axes: list[str], *, decomposition_cls: type[BaseEstimator], correlation: bool = False, **kwargs: Unpack[DecompositionParam], ) -> tuple[xr.DataArray, sklearn.base.BaseEstimator]: """Change the basis of multidimensional data according to `sklearn` decomposition classes. This allows for robust and simple PCA, ICA, factor analysis, and other decompositions of your data even when it is very high dimensional. Generally speaking, PCA and similar techniques work when data is 2D, i.e. a sequence of 1D observations. We can make the same techniques work by unravelling a ND dataset into 1D (i.e. np.ndarray.ravel()) and unravelling a KD set of observations into a 1D set of observations. This is basically grouping axes. As an example, if you had a 4D dataset which consisted of 2D-scanning valence band ARPES, then the dimensions on our dataset would be "[x, y, eV, phi]". We can group these into [spatial=(x, y), spectral=(eV, phi)] and perform PCA or another analysis of the spectral features over different spatial observations. If our data was called `f`, this can be accomplished with: ``` transformed, decomp = decomposition_along(f.stack(spectral=['eV', 'phi']), ['x', 'y'], PCA) transformed.dims # -> [X, Y, components] ``` The results of `decomposition_along` can be explored with `arpes.widgets.pca_explorer`, regardless of the decomposition class. Args: data: Input data, can be N-dimensional but should only include one "spectral" axis. axes: Several axes to be treated as a single axis labeling the list of observations. decomposition_cls: A sklearn.decomposition class (such as PCA or ICA) to be used to perform the decomposition. correlation: Controls whether StandardScaler() is used as the first stage of the data ingestion pipeline for sklearn. kwargs: forward to ``decomposition_cls`` Returns: A tuple containing the projected data and the decomposition fit instance. """ data = data if isinstance(data, xr.DataArray) else normalize_to_spectrum(data) if len(axes) > 1: flattened_data: xr.DataArray = data.stack(fit_axis=axes) stacked = True else: flattened_data = data.transpose(..., axes[0]) stacked = False if len(flattened_data.dims) != TWO_DIMENSION: msg = f"Inappropriate number of dimensions after flattening: [{flattened_data.dims}]" raise ValueError( msg, ) pipeline = sklearn.Pipeline = ( make_pipeline(StandardScaler(), decomposition_cls(**kwargs)) if correlation else make_pipeline(decomposition_cls(**kwargs)) ) pipeline.fit(flattened_data.values.T) decomp = pipeline.steps[-1][1] transform = decomp.transform(flattened_data.values.T) into = flattened_data.copy(deep=True) into_first = into.dims[0] into = into.isel({into_first: slice(0, transform.shape[1])}) into = into.rename({into_first: "components"}) into.values = transform.T if stacked: into = into.unstack("fit_axis") provenance_context: Provenance = cast( "Provenance", { "what": "sklearn decomposition", "by": "decomposition_along", "axes": axes, "correlation": False, "decomposition_cls": decomposition_cls.__name__, }, ) provenance(into, data, provenance_context) return into, decomp
[docs] @wraps(decomposition_along) def pca_along( data: xr.DataArray, axes: list[str], *, correlation: bool = False, **kwargs: Unpack[PCAParam], ) -> tuple[xr.DataArray, sklearn.decomposition.PCA]: """Specializes `decomposition_along` with `sklearn.decomposition.PCA`.""" return decomposition_along( data, axes, correlation=correlation, decomposition_cls=PCA, **kwargs, )
[docs] @wraps(decomposition_along) def factor_analysis_along( data: xr.DataArray, axes: list[str], *, correlation: bool = False, **kwargs: Unpack[FactorAnalysisParam], ) -> tuple[xr.DataArray, sklearn.decomposition.FactorAnalysis]: """Specializes `decomposition_along` with `sklearn.decomposition.FactorAnalysis`.""" return decomposition_along( data, axes, correlation=correlation, decomposition_cls=FactorAnalysis, **kwargs, )
[docs] @wraps(decomposition_along) def ica_along( data: xr.DataArray, axes: list[str], *, correlation: bool = False, **kwargs: Unpack[FastICAParam], ) -> tuple[xr.DataArray, sklearn.decomposition.FastICA]: """Specializes `decomposition_along` with `sklearn.decomposition.FastICA`.""" return decomposition_along( data, axes, correlation=correlation, decomposition_cls=FastICA, **kwargs, )
[docs] @wraps(decomposition_along) def nmf_along( data: xr.DataArray, axes: list[str], *, correlation: bool = False, **kwargs: Unpack[NMFParam], ) -> tuple[xr.DataArray, sklearn.decomposition.NMF]: """Specializes `decomposition_along` with `sklearn.decomposition.NMF`.""" return decomposition_along( data, axes, decomposition_cls=NMF, correlation=correlation, **kwargs, )