Source code for MojitoProcessor.process.sigprocess

"""
Signal Processing utilities for LISA TDI data

Provides a SignalProcessor class for filtering, decimating, trimming, and windowing
multi-channel time series data with automatic state tracking.
"""

import logging
from fractions import Fraction
from typing import Dict, List, Optional, Tuple

import numpy as np
from scipy import signal
from scipy.signal.windows import blackman, blackmanharris, hamming, hann, tukey

__all__ = [
    "SignalProcessor",
    "process_pipeline",
]

# Default Kaiser window beta parameter for anti-aliasing filter
KAISER_BETA_DEFAULT = 31.0

logger = logging.getLogger(__name__)


def planck_window(N, alpha=0.05):
    """
    Construct a Planck-taper window of length N.

    Parameters
    ----------
    N : int
        Number of points in the window.
    alpha : float, optional
        Fraction of the window length to taper at each end.
        Must be between 0 and 0.5. Default is 0.05.

    Returns
    -------
    w : numpy.ndarray
        The window function of length N.
    """
    # Ensure epsilon is within valid bounds
    epsilon = alpha
    epsilon = np.clip(epsilon, 1e-9, 0.5)

    n = np.arange(N)
    w = np.zeros(N)

    # Define the transition regions
    n1 = epsilon * (N - 1)
    n2 = (1 - epsilon) * (N - 1)

    # Region 1: Rising taper (0 <= n < n1)
    mask1 = n < n1
    z1 = np.where(
        mask1,
        epsilon * (N - 1) / (n + 1e-15)
        + epsilon * (N - 1) / (n - epsilon * (N - 1) + 1e-15),
        0.0,
    )
    # w = np.where(mask1, 1.0 / (1.0 + np.exp(z1)), w)
    z1 = np.clip(z1, -700.0, 700.0)
    w = np.where(mask1, 1.0 / (1.0 + np.exp(z1)), w)

    # Region 2: Flat top (n1 <= n <= n2)
    mask2 = (n >= n1) & (n <= n2)
    w = np.where(mask2, 1.0, w)

    # Region 3: Falling taper (n2 < n < N)
    mask3 = n > n2
    z2 = np.where(
        mask3,
        epsilon * (N - 1) / (N - 1 - n + 1e-15)
        + epsilon * (N - 1) / (N - 1 - n - epsilon * (N - 1) + 1e-15),
        0.0,
    )
    z2 = np.clip(z2, -700.0, 700.0)
    w = np.where(mask3, 1.0 / (1.0 + np.exp(z2)), w)

    return w


[docs] class SignalProcessor: """ Signal processor for multi-channel time series data. Handles filtering, downsampling, trimming, and windowing while automatically tracking sampling parameters (fs, N, T, dt). Parameters ---------- data : dict Dictionary of channel data, e.g., {'X': array, 'Y': array, 'Z': array} fs : float Sampling frequency in Hz Attributes ---------- data : dict Current processed data (updated after each operation). Includes a ``'t'`` key giving the time array ``[t0, t0+dt, ..., t0+(N-1)*dt]`` in seconds (or ``[0, dt, ..., (N-1)*dt]`` when ``t0`` is ``None``). fs : float Current sampling frequency in Hz N : int Current number of samples per channel T : float Current duration in seconds dt : float Current sampling period in seconds t : ndarray Time array ``[t0, t0+dt, ..., t0+(N-1)*dt]`` in seconds. channels : list List of channel names Example ------- >>> sp = SignalProcessor({'X': x_data, 'Y': y_data}, fs=4.0) >>> filtered = sp.filter(low=1e-4, high=1.0, order=6) >>> trimmed = sp.trim(fraction=0.02) # Trim 2% total (1% each end) >>> windowed = sp.apply_window(window='tukey', alpha=0.05) >>> t = sp.data['t'] # time array [t0, t0+dt, ..., t0+(N-1)*dt] """ def __init__( self, data: Dict[str, np.ndarray], fs: float, t0: Optional[float] = None, ): """ Initialize SignalProcessor with multi-channel data. Parameters ---------- data : dict Dictionary mapping channel names to 1D numpy arrays fs : float Sampling frequency in Hz t0 : float, optional TCB timestamp of the first sample in seconds. Should be set from the L1 data file (``data["t_tdi"][0]``). Defaults to ``None`` when working outside of a full Mojito pipeline. """ self._data = {ch: arr.copy() for ch, arr in data.items()} self.fs = float(fs) self.t0 = float(t0) if t0 is not None else None self.channels = list(data.keys()) # Validate all channels have same length lengths = [len(arr) for arr in self._data.values()] if len(set(lengths)) != 1: raise ValueError(f"All channels must have same length. Got: {lengths}") self._update_params() def _update_params(self): """Update derived parameters N, T, dt based on current data and fs.""" self.N = len(self._data[self.channels[0]]) self.dt = 1.0 / self.fs self.T = self.N * self.dt @property def data(self) -> dict: """ Channel data as a dict, including a ``'t'`` key for the time array. The time array is ``t0 + np.arange(N) * dt`` (seconds). When ``t0`` is ``None``, the array starts from 0. Returns ------- dict All channel arrays plus ``'t'``. """ result = dict(self._data) result["t"] = self.t return result @property def t(self) -> np.ndarray: """Time array ``[t0, t0+dt, ..., t0+(N-1)*dt]`` in seconds.""" t_start = self.t0 if self.t0 is not None else 0.0 return t_start + np.arange(self.N) * self.dt
[docs] def filter( self, *, low: Optional[float] = None, high: Optional[float] = None, order: int = 2, filter_type: str = "butterworth", zero_phase: bool = True, ) -> Dict[str, np.ndarray]: """ Apply filter to all channels (auto-detects highpass/lowpass/bandpass). Automatically determines filter type based on provided cutoff frequencies: - Only `low` set: highpass filter - Only `high` set: lowpass filter - Both `low` and `high` set: bandpass filter Parameters ---------- low : float, optional Lower cutoff frequency in Hz (highpass) high : float, optional Upper cutoff frequency in Hz (lowpass) order : int, optional Filter order (default: 6) filter_type : str, optional Filter type: 'butterworth', 'chebyshev1', 'chebyshev2', 'bessel' (default: 'butterworth') zero_phase : bool, optional Use zero-phase filtering (filtfilt) if True, else single-pass (default: True) Returns ------- filtered_data : dict Dictionary of filtered channel data Raises ------ ValueError If neither `low` nor `high` is provided Examples -------- >>> # Highpass only >>> sp.filter(low=5e-6, order=2) >>> # Lowpass only >>> sp.filter(high=0.1, order=2) >>> # Bandpass >>> sp.filter(low=1e-4, high=0.1, order=6) """ if low is None and high is None: raise ValueError("Must specify at least one of 'low' or 'high' cutoff") nyquist = self.fs / 2 # Validate cutoff frequencies if low is not None: if low <= 0: raise ValueError(f"low cutoff must be positive, got {low}") if low >= nyquist: raise ValueError( f"low cutoff ({low} Hz) must be below Nyquist ({nyquist} Hz)" ) if high is not None: if high <= 0: raise ValueError(f"high cutoff must be positive, got {high}") if high >= nyquist: raise ValueError( f"high cutoff ({high} Hz) must be below Nyquist ({nyquist} Hz)" ) if low is not None and high is not None and low >= high: raise ValueError( f"low cutoff ({low} Hz) must be less than high cutoff ({high} Hz)" ) if order <= 0 or not isinstance(order, int): raise ValueError(f"Filter order must be a positive integer, got {order}") # Auto-detect filter type if low is not None and high is not None: btype = "bandpass" Wn = [low, high] elif low is not None: btype = "highpass" Wn = low else: # high is not None btype = "lowpass" Wn = high # Design filter — cheby1/cheby2 require extra ripple/attenuation args # butterworth/bessel called via partial to unify the interface def _butter(n, wn, **kw): return signal.butter(n, wn, **kw) def _bessel(n, wn, **kw): return signal.bessel(n, wn, **kw) def _cheby1(n, wn, **kw): return signal.cheby1(n, 1.0, wn, **kw) def _cheby2(n, wn, **kw): return signal.cheby2(n, 40.0, wn, **kw) filter_funcs = { "butterworth": _butter, "bessel": _bessel, "chebyshev1": _cheby1, "chebyshev2": _cheby2, } if filter_type not in filter_funcs: raise ValueError( f"Unknown filter type: {filter_type}. " f"Choose from {list(filter_funcs.keys())}" ) sos = filter_funcs[filter_type]( order, Wn, btype=btype, fs=self.fs, output="sos" ) # Apply filter to all channels filtered_data = {} for ch in self.channels: if zero_phase: filtered_data[ch] = signal.sosfiltfilt(sos, self._data[ch]) else: filtered_data[ch] = signal.sosfilt(sos, self._data[ch]) # Update internal state self._data = filtered_data # fs, N, T, dt remain unchanged after filtering return filtered_data
[docs] def downsample( self, target_fs: float, window: tuple = ("kaiser", KAISER_BETA_DEFAULT), padtype: str = "line", ) -> Tuple[Dict[str, np.ndarray], float]: """ Resample all channels to a target sampling rate using polyphase filtering. Uses ``scipy.signal.resample_poly`` which applies a zero-phase FIR anti-aliasing filter via polyphase decomposition. Accepts arbitrary rational target rates (e.g., 4 Hz -> 0.4 Hz), unlike ``decimate`` which requires an integer factor. Parameters ---------- target_fs : float Desired output sampling frequency in Hz. Must be positive and less than or equal to the current sampling frequency (this method is a downsampler). window : tuple or array_like, optional Window specification passed to ``scipy.signal.resample_poly`` for FIR anti-aliasing filter design. Default ``('kaiser', 5.0)`` is scipy's own default and gives good stopband attenuation. padtype : str, optional Edge-padding strategy. Options: ``'line'`` (default), ``'constant'``, ``'mean'``, ``'median'``, ``'maximum'``, ``'minimum'``. ``'line'`` extends the signal linearly from each end, reducing edge transients for slowly-varying data such as LISA TDI channels. Returns ------- resampled_data : dict Dictionary mapping channel names to resampled 1D arrays. new_fs : float Actual output sampling frequency in Hz (exact rational result ``self.fs * up / down``). Raises ------ ValueError If ``target_fs`` is not positive. ValueError If ``target_fs`` exceeds the current sampling frequency. ValueError If the rational approximation of ``target_fs / self.fs`` produces ``up == 0``. Notes ----- The up/down integers are computed via:: ratio = Fraction(target_fs / self.fs).limit_denominator(10000) up, down = ratio.numerator, ratio.denominator Common use cases from 4 Hz source data: * 4 Hz -> 1 Hz: up=1, down=4 * 4 Hz -> 0.4 Hz: up=1, down=10 * 4 Hz -> 2 Hz: up=1, down=2 * 4 Hz -> 3 Hz: up=3, down=4 Examples -------- >>> sp = SignalProcessor({'X': x_data, 'Y': y_data}, fs=4.0) >>> sp.filter(low=5e-6, order=2) >>> sp.trim(fraction=0.022) # Trim 2.2% total >>> resampled, new_fs = sp.downsample(target_fs=1.0) >>> print(new_fs) # 1.0 """ if target_fs <= 0: raise ValueError(f"target_fs must be positive, got {target_fs}") if target_fs > self.fs: raise ValueError( f"target_fs ({target_fs} Hz) exceeds current sampling frequency " f"({self.fs} Hz). resample_poly is a downsampler; for upsampling " f"use scipy.signal.resample_poly directly." ) ratio = Fraction(target_fs / self.fs).limit_denominator(10000) up, down = ratio.numerator, ratio.denominator if up == 0: raise ValueError( f"Cannot represent target_fs={target_fs} Hz as a rational " f"fraction of {self.fs} Hz within limit_denominator=10000. " f"The target rate is too far below the source rate." ) resampled_data = {} for ch in self.channels: resampled_data[ch] = signal.resample_poly( self._data[ch], up, down, window=window, padtype=padtype ) self._data = resampled_data self.fs = self.fs * up / down self._update_params() return resampled_data, self.fs
[docs] def trim(self, fraction: float) -> Dict[str, np.ndarray]: """ Trim data by removing a fraction of the dataset. Parameters ---------- fraction : float Total fraction of data to remove (e.g., 0.01 = 1%). Returns ------- trimmed_data : dict Dictionary of trimmed channel data Raises ------ ValueError If fraction is not in range [0, 1] or would remove all data Examples -------- >>> # Trim 2% total (1% from each end) >>> sp.trim(fraction=0.02) >>> # Trim 5% from start only >>> sp.trim(fraction=0.05) """ if not 0 <= fraction < 1: raise ValueError(f"fraction must be in [0, 1), got {fraction}") # No trimming needed if fraction == 0: return dict(self._data) # Split fraction equally between both ends trim_samples = int(self.N * fraction / 2) if trim_samples == 0: logger.warning( "trim: fraction %.2e too small to remove any samples " "(need at least %d samples per end). Skipping trim.", fraction, 1, ) return dict(self._data) if 2 * trim_samples >= self.N: raise ValueError( f"Cannot trim {fraction*100:.1f}% from both ends " f"({2*trim_samples} samples). Would remove all data." ) trimmed_data = { ch: arr[trim_samples:-trim_samples] for ch, arr in self._data.items() } # Update internal state self._data = trimmed_data if self.t0 is not None: self.t0 += trim_samples * self.dt self._update_params() return trimmed_data
[docs] def apply_window( self, window: str = "tukey", **window_params ) -> Dict[str, np.ndarray]: """ Apply window function to all channels. Parameters ---------- window : str, optional Window type: 'tukey', 'blackmanharris', 'hann', 'hamming', 'blackman', 'planck' (default: 'tukey') **window_params : Additional parameters for the window function. ``alpha`` (float, default 0.05) is accepted by 'tukey' and 'planck'. Other windows ignore extra keyword arguments (a warning is emitted). Returns ------- windowed_data : dict Dictionary of windowed channel data Examples -------- >>> sp.apply_window('tukey', alpha=0.05) >>> sp.apply_window('blackmanharris') >>> sp.apply_window('hann') """ _supported = { "tukey", "blackmanharris", "hann", "hamming", "blackman", "planck", } if window not in _supported: raise ValueError( f"Unknown window type: {window!r}. " f"Choose from {sorted(_supported)}" ) # Warn if extra kwargs passed to windows that do not accept them if window not in {"tukey", "planck"} and window_params: logger.warning( "apply_window: extra parameters %s ignored for '%s' window", list(window_params.keys()), window, ) alpha = window_params.get("alpha", 0.05) if window == "tukey": win = tukey(self.N, alpha=alpha) elif window == "planck": win = planck_window(self.N, alpha=alpha) elif window == "blackmanharris": win = blackmanharris(self.N) elif window == "hann": win = hann(self.N) elif window == "hamming": win = hamming(self.N) else: # blackman win = blackman(self.N) # Apply window to all channels windowed_data = {ch: arr * win for ch, arr in self._data.items()} # Update internal state self._data = windowed_data # fs, N, T, dt remain unchanged after windowing return windowed_data
[docs] def periodogram(self) -> Tuple[np.ndarray, Dict[str, np.ndarray]]: """ Compute the one-sided power spectral density for each channel. The data is assumed to have already been windowed (e.g. by :meth:`apply_window`), so no additional window is applied here. Normalisation follows Parseval's theorem: the integral of the one-sided PSD over positive frequencies equals the mean square of the signal. Returns ------- freqs : ndarray Frequency array in Hz, shape ``(N//2 + 1,)``. psds : dict Dictionary mapping channel names to one-sided PSD arrays (units²/Hz), each with the same shape as ``freqs``. Examples -------- >>> freqs, psds = sp.periodogram() >>> plt.loglog(freqs[1:], psds['X'][1:]) """ freqs = np.fft.rfftfreq(self.N, d=self.dt) psds = {} for ch in self.channels: fft_vals = np.fft.rfft(self._data[ch]) psd = (np.abs(fft_vals) ** 2) / (self.fs * self.N) psd[1:-1] *= 2 # double non-DC/Nyquist bins for one-sided spectrum psds[ch] = psd return freqs, psds
[docs] def fft(self) -> Tuple[np.ndarray, Dict[str, np.ndarray]]: """ Compute the one-sided complex FFT spectrum for each channel. The data is assumed to have already been windowed (e.g. by :meth:`apply_window`), so no additional window is applied here. Returns raw complex amplitudes from ``numpy.fft.rfft``. Returns ------- freqs : ndarray Frequency array in Hz, shape ``(N//2 + 1,)``. ffts : dict Dictionary mapping channel names to complex FFT arrays, each with shape ``(N//2 + 1,)``. Examples -------- >>> freqs, ffts = sp.fft() >>> plt.loglog(freqs[1:], np.abs(ffts['X'][1:])) """ freqs = np.fft.rfftfreq(self.N, d=self.dt) ffts = {} for ch in self.channels: ffts[ch] = np.fft.rfft(self._data[ch]) return freqs, ffts
[docs] def to_aet(self) -> "SignalProcessor": """ Transform XYZ Michelson channels to noise-orthogonal AET channels. Uses the standard equal-arm combination: .. code-block:: text A = (Z - X) / sqrt(2) E = (X - 2Y + Z) / sqrt(6) T = (X + Y + Z) / sqrt(3) Returns a new :class:`SignalProcessor` with channels ``['A', 'E', 'T']``, inheriting ``fs``, ``t0``, and all derived parameters from the original. Returns ------- SignalProcessor New processor with AET channel data. Raises ------ ValueError If any of the channels ``'X'``, ``'Y'``, ``'Z'`` are missing. Examples -------- >>> sp_xyz = processed_segments['segment0'] >>> sp_aet = sp_xyz.to_aet() >>> freqs, psds = sp_aet.periodogram() """ if set(self.channels) == {"A", "E", "T"}: raise ValueError( "to_aet() converts XYZ Michelson channels to AET. " "This SignalProcessor already holds AET channels — " "no conversion is needed." ) missing = {"X", "Y", "Z"} - set(self.channels) if missing: raise ValueError( f"to_aet() requires channels {{'X', 'Y', 'Z'}}. " f"Missing: {missing}" ) X, Y, Z = self._data["X"], self._data["Y"], self._data["Z"] aet_data = { "A": (Z - X) / np.sqrt(2), "E": (X - 2 * Y + Z) / np.sqrt(6), "T": (X + Y + Z) / np.sqrt(3), } return SignalProcessor(aet_data, fs=self.fs, t0=self.t0)
[docs] def get_params(self) -> dict: """ Get current signal parameters. Returns ------- params : dict Dictionary containing fs, N, T, dt, and channels """ return { "fs": self.fs, "N": self.N, "T": self.T, "dt": self.dt, "channels": self.channels, }
def __repr__(self): t0_str = f"{self.t0:.6g} s" if self.t0 is not None else "None" return ( f"SignalProcessor(channels={self.channels}, " f"N={self.N}, fs={self.fs:.3f} Hz, T={self.T:.2f} s, t0={t0_str})" )
[docs] def process_pipeline( data: dict, channels: Optional[List[str]] = None, *, filter_kwargs: Optional[Dict] = None, downsample_kwargs: Optional[Dict] = None, trim_kwargs: Optional[Dict] = None, truncate_kwargs: Optional[Dict] = None, window_kwargs: Optional[Dict] = None, ) -> dict: """ Run the full TDI data processing pipeline on a MojitoData object. Applies the following steps in order: 1. **Filter** — band-pass (if ``lowpass_cutoff`` given) or high-pass only 2. **Downsample** — polyphase resampling to ``target_fs`` (optional) 3. **Trim** — removes edge artefacts introduced by the filter from both ends 4. **Truncate** — selects the first ``truncate_days`` of the processed data 5. **Window** — tapers edges to reduce spectral leakage Pipeline progress is emitted at ``logging.INFO`` level via the ``MojitoUtils.SigProcessing`` logger. Parameters ---------- data : dict Loaded LISA L1 data dict (from :func:`~MojitoProcessor.io.read.load_file`). Must contain ``data['tdis']`` (channel arrays) and ``data['fs']`` (sampling rate in Hz). channels : list of str, optional TDI channels to process. Default ``['X', 'Y', 'Z']``. filter_kwargs : dict, optional Filter parameters. Keys: - ``highpass_cutoff`` (float): High-pass cutoff in Hz (default: 5e-6) - ``lowpass_cutoff`` (float, optional): Low-pass cutoff for band-pass - ``order`` (int): Filter order (default: 2) - ``filter_type`` (str): Filter type (default: 'butterworth') downsample_kwargs : dict, optional Downsampling parameters. Keys: - ``target_fs`` (float): Target sampling rate in Hz - ``kaiser_window`` (float): Kaiser window beta parameter (default: 31.0) trim_kwargs : dict, optional Trimming parameters. Omit (or pass ``None``) to skip trimming. Keys: - ``fraction`` (float): Fraction to trim from each end (default: 0.0) truncate_kwargs : dict, optional Segmentation parameters. Keys: - ``days`` (float): Segment length in days (default: 4.0) Dataset is split into non-overlapping segments of this length. Each segment is independently windowed. Set to ``None`` to disable segmentation (returns single segment with full dataset). Note: Remainder samples shorter than a full segment are discarded. window_kwargs : dict, optional Windowing parameters. Omit (or pass ``None``) to skip windowing. Keys: - ``window`` (str): Window type - 'tukey', 'hann', etc. (default: 'tukey') - ``alpha`` (float): Taper fraction for Tukey window (default: 0.025) Returns ------- segments : dict of SignalProcessor Dictionary mapping segment names (``'segment0'``, ``'segment1'``, …) to :class:`SignalProcessor` objects. Each segment contains windowed data ready for FFT analysis. Access via ``segments['segment0'].data``, ``segments['segment0'].fs``, etc. """ # Set defaults if channels is None: channels = ["X", "Y", "Z"] if filter_kwargs is None: filter_kwargs = {} if downsample_kwargs is None: downsample_kwargs = {} if trim_kwargs is None: trim_kwargs = {} if truncate_kwargs is None: truncate_kwargs = {} if window_kwargs is None: window_kwargs = {} # Capture whether the user actually requested windowing / trimming before # we normalise the dicts — empty dict is falsy, so omitting the kwarg # (None → {}) and passing an empty dict both mean "skip". do_window = bool(window_kwargs) # Extract filter parameters with defaults highpass_cutoff = filter_kwargs.get("highpass_cutoff", 5e-6) lowpass_cutoff = filter_kwargs.get("lowpass_cutoff", None) filter_order = filter_kwargs.get("order", 2) # Extract downsample parameters target_fs = downsample_kwargs.get("target_fs", None) kaiser_window = downsample_kwargs.get("kaiser_window", KAISER_BETA_DEFAULT) # Extract trim parameters (fraction=0.0 → no-op) trim_fraction = trim_kwargs.get("fraction", 0.0) # Extract truncate parameters truncate_days = truncate_kwargs.get("days", 4.0) if truncate_kwargs else None # Extract window parameters window = window_kwargs.get("window", "tukey") if window == "planck": window_alpha = window_kwargs.get("alpha", 0.05) else: window_alpha = window_kwargs.get("alpha", 0.025) # Validate Tukey window alpha (only relevant when windowing is requested) if do_window and window == "tukey" and not 0 <= window_alpha <= 1: raise ValueError(f"Tukey window alpha must be in [0, 1], got {window_alpha}") # Validate filter order filter_order_raw = filter_kwargs.get("order", 2) if not isinstance(filter_order_raw, int) or filter_order_raw <= 0: raise ValueError( f"filter order must be a positive integer, got {filter_order_raw}" ) # Validate kaiser_window beta if kaiser_window < 0: raise ValueError( f"kaiser_window beta must be non-negative, got {kaiser_window}" ) # Validate truncate_days if truncate_days is not None and truncate_days <= 0: raise ValueError( f"truncate_kwargs 'days' must be positive, got {truncate_days}" ) # Warn if lowpass_cutoff exceeds Nyquist of the target sampling rate if lowpass_cutoff is not None and target_fs is not None: target_nyquist = target_fs / 2 if lowpass_cutoff > target_nyquist: logger.warning( "lowpass_cutoff (%.4g Hz) exceeds Nyquist of target_fs " "(%.4g Hz). Frequencies above %.4g Hz will be aliased after " "downsampling. Consider setting lowpass_cutoff <= %.4g Hz.", lowpass_cutoff, target_nyquist, target_nyquist, target_nyquist, ) missing = [ch for ch in channels if ch not in data["tdis"]] if missing: raise ValueError( f"Channels {missing} not found in data. " f"Available: {list(data['tdis'].keys())}" ) if "t_tdi" not in data: raise ValueError( "'t_tdi' is required in the data dict. " "Set data['t_tdi'] to the array of TCB timestamps for the TDI samples " "(e.g. tdi_sampling.t() from MojitoL1File)." ) try: laser_frequency = float(data["metadata"]["laser_frequency"]) except KeyError as exc: raise ValueError( "'metadata[\"laser_frequency\"]' is required in the data dict. " "Set data['metadata']['laser_frequency'] to the central laser frequency " "in Hz (e.g. f.laser_frequency from MojitoL1File)." ) from exc # ------------------------------------------------------------------ # # Step 1 — initialise with the full dataset, normalised by laser freq # ------------------------------------------------------------------ # sp = SignalProcessor( {ch: data["tdis"][ch] / laser_frequency for ch in channels}, fs=data["fs"], t0=float(data["t_tdi"][0]), ) logger.info( "Step 1/5 | Init: %d samples @ %.4g Hz (%.2f days), channels=%s", sp.N, sp.fs, sp.T / 86400, channels, ) # ------------------------------------------------------------------ # # Step 2 — filter (band-pass or high-pass) # ------------------------------------------------------------------ # if lowpass_cutoff is not None: sp.filter( low=highpass_cutoff, high=lowpass_cutoff, order=filter_order, zero_phase=True, ) logger.info( "Step 2/5 | Band-pass: [%.1e, %.1e] Hz, order=%d (zero-phase Butterworth)", highpass_cutoff, lowpass_cutoff, filter_order, ) else: sp.filter( low=highpass_cutoff, order=filter_order, zero_phase=True, ) logger.info( "Step 2/5 | High-pass: cutoff=%.1e Hz, order=%d (zero-phase Butterworth)", highpass_cutoff, filter_order, ) # ------------------------------------------------------------------ # # Step 3 — downsample (optional) # ------------------------------------------------------------------ # if target_fs is not None and target_fs != sp.fs: pre_fs, pre_N = sp.fs, sp.N sp.downsample(target_fs=target_fs, window=("kaiser", kaiser_window)) logger.info( "Step 3/5 | Resample: %.4g Hz → %.4g Hz, %d%d samples " "(Nyquist = %.4g Hz)", pre_fs, sp.fs, pre_N, sp.N, sp.fs / 2, ) else: logger.info("Step 3/5 | Resample: skipped (fs = %.4g Hz)", sp.fs) # ------------------------------------------------------------------ # # Step 4 — trim edge artefacts # ------------------------------------------------------------------ # sp.trim(fraction=trim_fraction) logger.info( "Step 4/5 | Trim: %.2f%% from each end → %d samples (%.2f days)", trim_fraction, sp.N, sp.T / 86400, ) # ------------------------------------------------------------------ # # Step 5 — segment into chunks and window each independently # ------------------------------------------------------------------ # if truncate_days is not None and truncate_days > 0: segment_samples = int(truncate_days * 86400 * sp.fs) n_segments = sp.N // segment_samples if n_segments == 0: logger.warning( "Step 5/5 | Segment: data (%.2f days) shorter than segment " "length (%.2f days) - creating single segment", sp.T / 86400, truncate_days, ) n_segments = 1 segment_samples = sp.N segments = {} for i in range(n_segments): start_idx = i * segment_samples end_idx = min(start_idx + segment_samples, sp.N) # Create segment — t0 advances by i full segment lengths after trimming segment_data = {ch: sp.data[ch][start_idx:end_idx] for ch in sp.channels} segment_t0 = ( sp.t0 + i * segment_samples * sp.dt if sp.t0 is not None else None ) seg_sp = SignalProcessor(segment_data, fs=sp.fs, t0=segment_t0) # Apply window to this segment (optional) if do_window: seg_sp.apply_window(window=window, alpha=window_alpha) segments[f"segment{i}"] = seg_sp logger.info( "Step 5/5 | Segment: created %d segments × %.2f days each | Window: %s", n_segments, truncate_days, f"{window} (alpha={window_alpha:.4g})" if do_window else "none", ) return segments # No segmentation — apply window to full dataset (optional) if do_window: sp.apply_window(window=window, alpha=window_alpha) logger.info( "Step 5/5 | Window: %s | Ready — N=%d, fs=%.4g Hz, dt=%.4g s, T=%.4f days", f"{window} (alpha={window_alpha:.4g})" if do_window else "none", sp.N, sp.fs, sp.dt, sp.T / 86400, ) return {"segment0": sp}