Source code for pygsdata.readers

"""Module defining the readers for the different file formats."""

import pickle
import warnings
from pathlib import Path
from typing import Any

import h5py
import hickle
import numpy as np
from astropy import units as un
from astropy.coordinates import Longitude
from astropy.time import Time

from .gsdata import GSData
from .history import History
from .select import (
    freq_selector,
    load_selector,
    lst_selector,
    time_selector,
)
from .telescope import Telescope

GSDATA_READERS = {}


[docs] def gsdata_reader( formats: list[str], select_on_read: bool = False, ): """Register a function as a reader for GSData objects.""" def inner(func): func.select_on_read = select_on_read func.suffices = formats GSDATA_READERS[func.__name__] = func return func return inner
@gsdata_reader(select_on_read=True, formats=["gsh5"]) def read_gsh5( filename: str | Path | h5py.File, selectors: dict[str, Any], group: str = "/" ) -> GSData: """Read a GSH5 file to construct the object.""" if isinstance(filename, str | Path): with h5py.File(filename, "r") as fl: return read_gsh5(fl[group], selectors) version = filename.attrs.get("version", "1.0") major = version.split(".")[0] reader = getattr(_GSH5Readers, f"v{major}", None) if reader is None: raise ValueError(f"Unsupported file format version: {version}") return reader(filename, selectors) @gsdata_reader(select_on_read=False, formats=["gspkl"]) def read_gspkl( filename: str | Path, ) -> GSData: """Read a GSPKL file to construct the object.""" with Path(filename).open("rb") as fl: return pickle.load(fl) class _GSH5Readers: @staticmethod def v2( fl: h5py.File, selectors: dict[str, Any], ): meta = fl["metadata"] telescope = Telescope.from_hdf5(meta["telescope"]) loads = [x.decode() for x in meta["loads"][()]] times = meta["times"][()] times = Time(times, format="jd", location=telescope.location) time_mask = None time_mask = time_selector( times, loads, **selectors.get("time_selector", {}), ) if time_mask is None: time_mask = np.ones(len(times), dtype=bool) lsts = Longitude(meta["lsts"][()] * un.hourangle) lst_mask = lst_selector( lsts, loads, **selectors.get("lst_selector", {}), ) if lst_mask is None: lst_mask = np.ones(len(lsts), dtype=bool) time_mask &= lst_mask extra_kw = {} if "time_ranges" in meta: time_ranges = meta["time_ranges"][time_mask] time_ranges = Time(time_ranges, format="jd", location=telescope.location) extra_kw["time_ranges"] = time_ranges lst_ranges = Longitude(meta["lst_ranges"][time_mask] * un.hourangle) extra_kw["lst_ranges"] = lst_ranges else: warnings.warn( "You wrote this file with a buggy version of pygsdata that did not " "include time_ranges and lst_ranges in the file. The time_ranges and " "lst_ranges in your object will be set to the default values given " "your integration time and times.", stacklevel=2, ) freqs = un.Quantity(meta["freqs"][:], unit=meta["freqs"].attrs["unit"]) freq_mask = freq_selector(freqs, **selectors.get("freq_selector", {})) data_unit = meta.attrs["data_unit"] objname = meta.attrs["name"] intg_time = meta["effective_integration_time"][()] * un.s load_mask = load_selector( loads, **selectors.get("load_selector", {"indx": slice(None)}), ) auxiliary_measurements = { name: fl["auxiliary_measurements"][name][time_mask] for name in fl["auxiliary_measurements"] } or None dgrp = fl["data"] data = dgrp["data"][load_mask][:, :, time_mask][..., freq_mask] nsamples = dgrp["nsamples"][load_mask][:, :, time_mask][..., freq_mask] if intg_time.size > 1: intg_time = intg_time[load_mask][:, :, time_mask] flg_grp = dgrp["flags"] flags = {} if "names" in flg_grp.attrs: flag_keys = flg_grp.attrs["names"] for name in flag_keys: flags[name] = hickle.load(fl, f"data/flags/{name}") if "load" in flags[name].axes and not np.all(load_mask): flags[name] = flags[name].select(idx=load_mask, axis="load") if "time" in flags[name].axes and not np.all(time_mask): flags[name] = flags[name].select(idx=time_mask, axis="time") if "freq" in flags[name].axes and not np.all(freq_mask): flags[name] = flags[name].select(idx=freq_mask, axis="freq") history = History.from_repr(meta.attrs["history"]) residuals = ( dgrp["residuals"][load_mask][:, :, time_mask][..., freq_mask] if "residuals" in dgrp else None ) return GSData( data=data, times=times[time_mask][:, load_mask], lsts=lsts[time_mask][:, load_mask], freqs=freqs[freq_mask], data_unit=data_unit, loads=loads, auxiliary_measurements=auxiliary_measurements, filename=fl.file.filename, nsamples=nsamples, flags=flags, history=history, telescope=telescope, residuals=residuals, name=objname, effective_integration_time=intg_time, **extra_kw, )