Source code for arpes.endstations.plugin.Elettra_spectromicroscopy

"""Implements data loading for the spectromicroscopy beamline at Elettra."""

from __future__ import annotations

import os
import warnings
from pathlib import Path
from typing import TYPE_CHECKING, ClassVar

import h5py
import numpy as np
import xarray as xr

from arpes.endstations import HemisphericalEndstation, SynchrotronEndstation
from arpes.utilities import unwrap_xarray_item
from arpes.configuration.interface import get_data_path

if TYPE_CHECKING:
    from _typeshed import Incomplete
    from numpy.typing import NDArray

    from arpes._typing.attrs_property import ScanDesc

# Public API of this module: only the endstation plugin class is exported by
# ``from ... import *``; the HDF5 helper functions below are not listed.
__all__ = ("SpectromicroscopyElettraEndstation",)


def collect_coord(index: int, dset: h5py.Dataset) -> tuple[str, NDArray[np.float64]]:
    """Uses the beamline metadata to normalize the coordinate information for a given axis.

    The axis name is read from the first element of the ``"Dim{index} Name Units"``
    attribute and the axis values are rebuilt from the ``(start, delta)`` pair stored in
    ``"Dim{index} Values"`` together with the dataset length along that axis.

    Args:
        index: The index of the coordinate to extract from metadata.
        dset: The HDF dataset containing Elettra spectromicroscopy data.

    Returns:
        The coordinate extracted at `index` from the metadata. The return convention here is to
        provide a tuple consisting of the extracted coordinate name, and the values for that
        coordinate. An axis named "P" is reported as "phi".
    """
    raw_name = dset.attrs[f"Dim{index} Name Units"][0]
    # String attributes may arrive as bytes or as str depending on how they were stored
    # (h5py returns variable-length strings as str), so only decode when it is needed.
    name = raw_name.decode() if isinstance(raw_name, bytes) else str(raw_name)

    start, delta = dset.attrs[f"Dim{index} Values"]
    num = dset.shape[index]
    # Evenly spaced axis: `num` points starting at `start` with step `delta`.
    coords = np.linspace(start, start + delta * (num - 1), num)

    if name == "P":
        name = "phi"
    return name, coords


def h5_dataset_to_dataarray(dset: h5py.Dataset) -> xr.DataArray:
    """Converts a single HDF5 dataset into an `xr.DataArray`.

    One named coordinate axis is built per dataset dimension with `collect_coord`. The
    remaining HDF5 attributes are decoded from bytes where necessary and attached as
    `attrs`, after a small amount of normalization:

    * "Angular Coord" is split into the attributes "T" (first element, rounded to one
      decimal) and "P" (second element); "P" is also stored as a scalar coordinate.
    * A handful of metadata entries are discarded (see ``DISCARDED_KEYS`` below).

    Args:
        dset: The HDF dataset containing Elettra spectromicroscopy data.

    Returns:
        The dataset contents with named dimensions, their coordinates and the normalized
        attributes.

    Raises:
        KeyError: If the dataset has no "Angular Coord" attribute.
    """
    flat_coords = [collect_coord(i, dset) for i in range(len(dset.shape))]

    def unwrap_bytestring(
        possibly_bytestring: bytes | list | tuple | NDArray[np.float64],
    ) -> str | list:
        """Recursively decodes bytes to str; sequences and arrays become plain lists."""
        if isinstance(possibly_bytestring, bytes):
            return possibly_bytestring.decode()

        if isinstance(possibly_bytestring, list | tuple | np.ndarray):
            return [unwrap_bytestring(elem) for elem in possibly_bytestring]

        return possibly_bytestring

    # Per-axis metadata is already consumed by `collect_coord`, so it is not copied to attrs.
    DROP_KEYS = {
        "Dim0 Name Units",
        "Dim1 Name Units",
        "Dim2 Name Units",
        "Dim3 Name Units",
        "Dim0 Values",
        "Dim1 Values",
        "Dim2 Values",
        "Dim3 Values",
    }

    # Metadata that is currently thrown away (marked as temporary by the original author).
    # The ring/gap/photon entry is dropped because it is not trustworthy; the photon energy
    # is instead expected to be determined later in the loading pipeline.
    DISCARDED_KEYS = (
        "Date Time Start Stop",
        "Temperature (K)",
        "DET Limits",
        "Energy Window (eV)",
        "Ring Current (mA)",
        "Stage Coord (XYZR)",
        "Ring En (GeV) GAP (mm) Photon (eV)",
    )

    coords = dict(flat_coords)
    attrs = {k: unwrap_bytestring(v) for k, v in dset.attrs.items() if k not in DROP_KEYS}

    # attr normalization
    angular_coord = attrs.pop("Angular Coord")
    attrs["T"] = round(angular_coord[0], 1)
    attrs["P"] = angular_coord[1]

    coords["P"] = attrs["P"]

    # Tolerate files that lack one of these entries: they are only being removed anyway.
    for key in DISCARDED_KEYS:
        attrs.pop(key, None)

    return xr.DataArray(
        dset[:],
        coords=coords,
        dims=[flat_coord[0] for flat_coord in flat_coords],
        attrs=attrs,
    )


class SpectromicroscopyElettraEndstation(
    HemisphericalEndstation,
    SynchrotronEndstation,
):
    """Data loading for the nano-ARPES beamline "Spectromicroscopy Elettra".

    Information available on the beamline can be accessed
    `here <https://www.elettra.trieste.it/elettra-beamlines/spectromicroscopy>`_.
    """

    PRINCIPAL_NAME = "Spectromicroscopy Elettra"
    ALIASES: ClassVar[list[str]] = ["Spectromicroscopy", "nano-ARPES Elettra"]

    # Only HDF5 files are considered when searching for scans.
    _TOLERATED_EXTENSIONS: ClassVar[set[str]] = {
        ".hdf5",
    }
    # Templates (filled via ``{}``) used to match a scan identifier against file names.
    # The last pattern uses the platform path separator to match "<id>/<name>_001".
    _SEARCH_PATTERNS = (
        r"[\-a-zA-Z0-9_\w]+_[0]+{}$",
        r"[\-a-zA-Z0-9_\w]+_{}$",
        r"[\-a-zA-Z0-9_\w]+{}$",
        r"[\-a-zA-Z0-9_\w]+[0]{}$",
        r"{}" + (r"\\" if os.path.sep == "\\" else "/") + r"[\-a-zA-Z0-9_\w]+_001$",
    )

    @classmethod
    def files_for_search(cls: type, directory: str | Path) -> list[Path]:
        """Determines which files should be considered as candidates.

        Spectromicroscopy Elettra uses directories to group associated files together, so we
        have to find those.

        Args:
            directory: The directory to scan. Its direct children and the children of its
                immediate sub-directories are inspected.

        Returns:
            The paths (as yielded by `Path.iterdir`, i.e. including `directory`) whose suffix
            is one of `_TOLERATED_EXTENSIONS`.
        """
        candidates: list[Path] = []
        for entry in Path(directory).iterdir():
            # `iterdir` already yields paths that include `directory`; joining `directory`
            # onto them again would double the prefix for relative directories and make
            # every sub-directory look like a (non-existent) plain file.
            if entry.is_dir():
                candidates.extend(entry.iterdir())
            else:
                candidates.append(entry)

        return [f for f in candidates if f.suffix in cls._TOLERATED_EXTENSIONS]

    ANALYZER_INFORMATION: ClassVar[dict[str, str | float | bool]] = {
        "analyzer": "Custom: in vacuum hemispherical",
        "analyzer_name": "Spectromicroscopy analyzer",
        "parallel_deflectors": False,
        "perpendicular_deflectors": False,
        "analyzer_radius": np.nan,
        "analyzer_type": "hemispherical",
    }

    # Beamline coordinate names -> names used after loading (applied in postprocess_final).
    RENAME_COORDS: ClassVar[dict[str, str]] = {
        "KE": "eV",
        "X": "x",
        "Y": "y",
        "Z": "z",
        "P": "psi",
        "Angle": "phi",
    }

    # Beamline attribute names -> normalized attribute names.
    RENAME_KEYS: ClassVar[dict[str, str]] = {
        "Ep (eV)": "pass_energy",
        "Dwell Time (s)": "dwell_time",
        "Lens Mode": "lens_mode",
        "MCP Voltage": "mcp_voltage",
        "N of Scans": "n_scans",
        "Pressure (mbar)": "pressure",
        "Ring Current (mA)": "ring_current",
        # 'Ring En (GeV) Gap (mm) Photon(eV)': None,
        "Sample ID": "sample",
        "Stage Coord (XYZR)": "stage_coords",
        "Temperature (K)": "temperature",
    }

    # Candidate coordinates along which multi-file scans may be stitched together.
    CONCAT_COORDS: ClassVar[list[str]] = ["T", "P"]

    def concatenate_frames(
        self,
        frames: list[xr.Dataset],
        scan_desc: ScanDesc | None = None,
    ) -> xr.Dataset:
        """Concatenates frame for spectromicroscopy at Elettra.

        The scan axis is determined dynamically by checking for uniqueness across frames.
        The truth here is a bit more complicated because Elettra supports "diagonal" scans
        but frequently users set a very small offset in the other angular coordinate.

        Args:
            frames: The datasets loaded from the individual files of one scan.
            scan_desc: Not supported; a warning is emitted when it is truthy.

        Returns:
            The single frame unchanged when only one was given, otherwise a dataset whose
            "spectrum" variable is the frames' spectra concatenated along the chosen scan
            coordinate.

        Raises:
            ValueError: If `frames` is empty.
            AssertionError: If none of `CONCAT_COORDS` qualifies as a scan coordinate.
        """
        if scan_desc:
            warnings.warn("scan_desc is not supported", stacklevel=2)
        if not frames:
            msg = "Could not read any frames."
            raise ValueError(msg)

        if len(frames) == 1:
            return frames[0]

        # determine which axis to stitch them together along, and then do this
        scan_coord = None
        max_different_values = -np.inf
        best_coordinates = []

        for possible_scan_coord in self.CONCAT_COORDS:
            coordinates = [f.coords.get(possible_scan_coord, None) for f in frames]
            # Coordinates with a non-empty shape are array-valued and cannot label a frame,
            # so they are marked as None, which disqualifies this candidate below.
            coordinates = [
                None if hasattr(c, "shape") and len(c.shape) else unwrap_xarray_item(c)
                for c in coordinates
            ]

            # Prefer the candidate that takes the most distinct values across the frames.
            n_different_values = len(set(coordinates))
            if n_different_values > max_different_values and None not in coordinates:
                max_different_values = n_different_values
                scan_coord = possible_scan_coord
                best_coordinates = coordinates

        assert scan_coord is not None

        fs = []
        for c, f in zip(best_coordinates, frames, strict=True):
            frame_spectrum = f.spectrum
            frame_spectrum.coords[scan_coord] = c
            fs.append(frame_spectrum)

        return xr.Dataset({"spectrum": xr.concat(fs, scan_coord)})

    def resolve_frame_locations(
        self,
        scan_desc: ScanDesc | None = None,
    ) -> list[Path]:
        """Determines all files associated with a given scan.

        This beamline saves several HDF files in scan associated folders, so this amounts to
        checking whether the scan is multi-file and associating sibling files if so.

        Args:
            scan_desc: Scan description; its "path" entry (or, failing that, "file") locates
                the data. A location that does not exist as given is resolved against
                `get_data_path()`.

        Returns:
            Every ``*.hdf5`` file in the scan's folder when the folder's parent is named
            "data" or is one of `_SEARCH_DIRECTORIES`; otherwise just the resolved path.

        Raises:
            ValueError: If `scan_desc` is None.
        """
        if scan_desc is None:
            msg = "Must pass dictionary as file scan_desc to all endstation loading code."
            raise ValueError(
                msg,
            )

        original_data_loc = scan_desc.get("path", scan_desc.get("file"))
        assert original_data_loc is not None
        if not Path(original_data_loc).exists():
            data_path = get_data_path()
            assert data_path is not None
            original_data_loc = Path(data_path) / original_data_loc

        p = Path(original_data_loc)
        if p.parent.parent.stem in ([*list(self._SEARCH_DIRECTORIES), "data"]):
            return list(p.parent.glob("*.hdf5"))

        return [p]

    def load_single_frame(
        self,
        frame_path: str | Path = "",
        scan_desc: ScanDesc | None = None,
        **kwargs: Incomplete,
    ) -> xr.Dataset:
        """Loads a single HDF file with spectromicroscopy Elettra data.

        Args:
            frame_path: Path of the HDF5 file to read.
            scan_desc: Not supported; a warning is emitted when it is truthy.
            **kwargs: Not supported; a warning is emitted when any are given.

        Returns:
            A dataset with one variable per top-level HDF5 entry. When the file holds a
            single entry, that variable is named "spectrum".
        """
        if scan_desc:
            warnings.warn("scan_desc is not supported.", stacklevel=2)
        if kwargs:
            warnings.warn("Any kwargs is not supported.", stacklevel=2)
        with h5py.File(frame_path, "r") as f:
            arrays = {k: h5_dataset_to_dataarray(f[k]) for k in f}

            if len(arrays) == 1:
                arrays = {"spectrum": next(iter(arrays.values()))}

            return xr.Dataset(arrays)

    def postprocess_final(
        self,
        data: xr.Dataset,
        scan_desc: ScanDesc | None = None,
    ) -> xr.Dataset:
        """Performs final postprocessing of the data.

        This mostly amounts to:

        1. Adjusting for the work function and converting kinetic to binding energy
        2. Adjusting angular coordinates to standard conventions
        3. Microns -> millimeters on spatial coordinates

        Args:
            data: The loaded (and, for multi-file scans, concatenated) dataset.
            scan_desc: Forwarded unchanged to the parent implementation.

        Returns:
            The result of the parent class's `postprocess_final` on the adjusted dataset.
        """
        data = data.rename({k: v for k, v in self.RENAME_COORDS.items() if k in data.coords})

        if "eV" in data.coords:
            approx_workfunction = 3.46
            # The photon energy is guessed from the mean kinetic energy: 27.0 below the
            # threshold, 74.0 otherwise.
            data.coords["hv"] = 27.0 if data.eV.mean().item() < 29 else 74.0  # noqa: PLR2004
            data.eV.values += approx_workfunction - data.coords["hv"].item()

        for coord, default in {"psi": 90.0, "phi": 0.0}.items():
            if coord not in data.coords:
                data.coords[coord] = default

        # Degrees -> radians. The 90 offset on psi must be applied exactly once: a second
        # application would offset and convert a value that is already in radians.
        data.coords["psi"] = np.deg2rad(data.coords["psi"] - 90)
        data.coords["phi"] = np.deg2rad(data.coords["phi"] + data.spectrum.attrs["T"])

        data.coords["beta"] = 0.0
        data.coords["chi"] = 0.0
        data.coords["alpha"] = np.pi / 2
        data.coords["theta"] = 0.0

        for i, dim_name in enumerate(["x", "y", "z"]):
            if dim_name in data.coords:
                data.coords[dim_name] = data.coords[dim_name] / 1000.0
            else:
                try:
                    data.coords[dim_name] = [
                        dv for dv in data.data_vars.values() if "eV" in dv.dims
                    ][0].attrs["stage_coords"][i] / 1000.0
                except (IndexError, KeyError):
                    # No variable with an "eV" axis, too few stage coordinates, or no
                    # "stage_coords" attribute at all: fall back to the origin.
                    data.coords[dim_name] = 0.0
        return super().postprocess_final(data, scan_desc)