Source code for arpes.endstations.plugin.merlin

"""The MERLIN ARPES Endstation at the Advanced Light Source."""

from __future__ import annotations

import re
from logging import DEBUG, INFO
from pathlib import Path
from typing import TYPE_CHECKING, ClassVar

import numpy as np
import xarray as xr

from arpes.debug import setup_logger
from arpes.endstations import (
    HemisphericalEndstation,
    SESEndstation,
    SynchrotronEndstation,
)

if TYPE_CHECKING:
    from collections.abc import Callable

    from _typeshed import Incomplete

    from arpes._typing.attrs_property import ScanDesc, Spectrometer

__all__ = ("BL403ARPESEndstation",)

LOGLEVELS = (DEBUG, INFO)
LOGLEVEL = LOGLEVELS[1]
logger = setup_logger(__name__, LOGLEVEL)


[docs] class BL403ARPESEndstation(SynchrotronEndstation, HemisphericalEndstation, SESEndstation): """The MERLIN ARPES Endstation at the Advanced Light Source.""" PRINCIPAL_NAME = "ALS-BL403" ALIASES: ClassVar[list[str]] = [ "BL403", "BL4", "BL4.0.3", "ALS-BL403", "ALS-BL4", ] _TOLERATED_EXTENSIONS: ClassVar[set[str]] = { ".pxt", } _SEARCH_PATTERNS = ( r"[\-a-zA-Z0-9_\w+]+_{}_S[0-9][0-9][0-9]$", r"[\-a-zA-Z0-9_\w+]+_{}_R[0-9][0-9][0-9]$", r"[\-a-zA-Z0-9_\w+]+_[0]+{}_S[0-9][0-9][0-9]$", r"[\-a-zA-Z0-9_\w+]+_[0]+{}_R[0-9][0-9][0-9]$", # more generic r"[\-a-zA-Z0-9_\w]+_[0]+{}$", r"[\-a-zA-Z0-9_\w]+_{}$", r"[\-a-zA-Z0-9_\w]+{}$", r"[\-a-zA-Z0-9_\w]+[0]{}$", ) RENAME_KEYS: ClassVar[dict[str, str]] = { "Polar": "theta", "Polar Compens": "theta", # these are caps-ed because they are dimensions in some cases! "BL Energy": "hv", "tilt": "beta", "polar": "theta", "azimuth": "chi", "temperature_sensor_a": "temperature_cryotip", "temperature_sensor_b": "temperature", "cryostat_temp_a": "temp_cryotip", "cryostat_temp_b": "temp", "bl_energy": "hv", "polar_compens": "theta", "K2200 V": "volts", "pwr_supply_v": "volts", "mcp": "mcp_voltage", "slit_plate": "slit_number", "user": "experimenter", "sample": "sample_name", "mesh_current": "photon_flux", "ring_energy": "beam_energy", "epu_pol": "undulator_polarization", "epu_gap": "undulator_gap", "epu_z": "undulator_z", "center_energy": "daq_center_energy", "low_energy": "sweep_low_energy", "high_energy": "sweep_high_energy", "energy_step": "sweep_step", "number_of_sweeps": "n_sweeps", } MERGE_ATTRS: ClassVar[Spectrometer] = { "analyzer": "R8000", "analyzer_name": "Scienta R8000", "parallel_deflectors": False, "perpendicular_deflectors": False, "analyzer_radius": np.nan, "analyzer_type": "hemispherical", "repetition_rate": 5e8, "undulator_harmonic": 2, # TODO: "undulator_type": "elliptically_polarized_undulator", } ATTR_TRANSFORMS: ClassVar[dict[str, Callable[..., dict[str, float | list[str] | str]]]] = { "acquisition_mode": lambda _: _.lower(), "lens_mode": lambda _: { "lens_mode": None, "lens_mode_name": _, }, "undulator_polarization": int, "region_name": lambda _: { "daq_region_name": _, "daq_region": _, }, } def concatenate_frames( self, frames: list[xr.Dataset], scan_desc: ScanDesc | None = None, ) -> xr.Dataset: """Concatenates frames from different files into a single scan. Above standard process here, we need to look for a Motor_Pos.txt file which contains the coordinates of the scanned axis so that we can stitch the different elements together. """ if len(frames) < 2: # noqa: PLR2004 return super().concatenate_frames(frames) if scan_desc is None: scan_desc = {} # determine which axis to stitch them together along, and then do this original_filename = scan_desc.get("file", scan_desc.get("path")) assert original_filename is not None internal_match = re.match( r"([a-zA-Z0-9\w+_]+)_[S][0-9][0-9][0-9]\.pxt", Path(original_filename).name, ) if internal_match is not None: if internal_match.groups(): motors_path = str( Path(original_filename).parent / f"{internal_match.groups()[0]}_Motor_Pos.txt", ) try: with Path(motors_path).open() as f: lines = f.readlines() axis_name = lines[0].strip() axis_name = self.RENAME_KEYS.get(axis_name, axis_name) values = [float(_.strip()) for _ in lines[1 : len(frames) + 1]] for v, frame in zip(values, frames, strict=True): frame.coords[axis_name] = v frames.sort(key=lambda x: x.coords[axis_name]) for frame in frames: # promote x, y, z to coords so they get concatted for _ in [ frame, *[dv for dv in frame.data_vars.values() if "eV" in dv.dims], ]: for c in ["x", "y", "z"]: if c not in _.coords: _.coords[c] = _.attrs[c] return xr.concat(frames, axis_name, coords="different") except Exception: logger.exception("Exception occurs.") else: internal_match = re.match( r"([a-zA-Z0-9\w+_]+)_[R][0-9][0-9][0-9]\.pxt", Path(original_filename).name, ) if internal_match is not None and internal_match.groups(): return xr.merge(frames) return super().concatenate_frames(frames) def load_single_frame( self, frame_path: str | Path = "", scan_desc: ScanDesc | None = None, **kwargs: Incomplete, ) -> xr.Dataset: """Loads all regions for a single .pxt frame, and perform per-frame normalization.""" from arpes.load_pxt import find_ses_files_associated, read_single_pxt if scan_desc is None: scan_desc = {} ext = Path(frame_path).suffix if "nc" in ext: # was converted to hdf5/NetCDF format with Conrad's Igor scripts scan_desc["path"] = frame_path return self.load_SES_nc(scan_desc=scan_desc, **kwargs) p = Path(scan_desc.get("path", scan_desc.get("file", ""))) # find files with same name stem, indexed in format R### regions = find_ses_files_associated(p, separator="R") if len(regions) == 1: pxt_data = read_single_pxt(frame_path).assign_coords( {"eV": -read_single_pxt(frame_path).eV.values}, ) # negate energy return xr.Dataset({"spectrum": pxt_data}, attrs=pxt_data.attrs) # need to merge several different detector 'regions' in the same scan region_files = [self.load_single_region(region_path) for region_path in regions] # can they share their energy axes? all_same_energy = True for reg in region_files[1:]: dim = "eV" + reg.attrs["Rnum"] all_same_energy = all_same_energy and np.array_equal( region_files[0].coords["eV000"], reg.coords[dim], ) if all_same_energy: for i, reg in enumerate(region_files): dim = "eV" + reg.attrs["Rnum"] region_files[i] = reg.rename({dim: "eV"}) else: pass return self.concatenate_frames(region_files, scan_desc=scan_desc) def load_single_region( self, region_path: str | Path = "", scan_desc: ScanDesc | None = None, **kwargs: Incomplete, ) -> xr.Dataset: """Loads a single region for multi-region scans.""" if scan_desc: logger.debug("BL403ARPESEndstation: scan_desc is not used in this class.") if kwargs: for k, v in kwargs.items(): logger.debug(f"BL403ARPESEndstation: key {k}: value{v} is not used in this class.") from arpes.load_pxt import read_single_pxt name = Path(region_path).stem num = name[-3:] pxt_data = read_single_pxt(region_path).assign_coords( {"eV": -read_single_pxt(region_path).eV.values}, ) # negate energy pxt_data = pxt_data.rename({"eV": "eV" + num}) pxt_data.attrs["Rnum"] = num pxt_data.attrs["alpha"] = np.pi / 2 return xr.Dataset( {"spectrum" + num: pxt_data}, attrs=pxt_data.attrs, ) # separate spectra for possibly unrelated data def postprocess_final( self, data: xr.Dataset, scan_desc: ScanDesc | None = None, ) -> xr.Dataset: """Performs final data normalization for MERLIN data. Additional steps we perform here are: 1. We attach the slit information for the R8000 used on MERLIN. 2. We normalize the undulator polarization from the sentinel values recorded by the beamline. 3. We convert angle units to radians. Args: data: The input data scan_desc: Originating load parameters Returns: Processed copy of the data """ ls = [data, *[dv for dv in data.data_vars.values() if "eV" in dv.dims]] for dat in ls: if "slit_number" in dat.attrs: slit_lookup = { 1: ("straight", 0.1), 7: ("curved", 0.5), } shape, width = slit_lookup.get(dat.attrs["slit_number"], (None, None)) dat.attrs["slit_shape"] = shape dat.attrs["slit_width"] = width if "undulator_polarization" in dat.attrs: phase_angle_lookup = {0: (0, 0), 2: (np.pi / 2, 0)} # LH # LV polarization_theta, polarization_alpha = phase_angle_lookup[ int(dat.attrs["undulator_polarization"]) ] dat.attrs["probe_polarization_theta"] = polarization_theta dat.attrs["probe_polarization_alpha"] = polarization_alpha for angle_attr in ("alpha", "beta", "chi", "psi", "theta"): if angle_attr in dat.attrs: dat.attrs[angle_attr] = np.deg2rad(float(dat.attrs[angle_attr])) for cname in ("theta", "beta", "chi", "phi"): if cname not in dat.attrs and cname not in dat.coords and cname in dat.attrs: dat.attrs[cname] = data.attrs[cname] dat.attrs["grating"] = "HEG" dat.attrs["alpha"] = np.pi / 2 dat.attrs["psi"] = 0 for c in ("beta", "chi", "psi", "phi", "theta"): if c in data.dims: data.coords[c] = np.deg2rad(data.coords[c]) return super().postprocess_final(data, scan_desc)