Mojito Processing Pipeline

Demonstrates the full TDI data processing pipeline using the process_pipeline utility from MojitoProcessor.

The pipeline applies the following steps in order:

Step

Operation

Default

1

Band-pass filter

configurable (f_low, f_high), order

2

Trim edge artefacts

2.2% from each end

3

Truncate to working length

configurable

4

Downsample

configurable target rate

5

Tukey window

α = 0.025

The final cell verifies consistency by comparing the periodogram against the Mojito L1 noise estimate.

[ ]:
import logging
import numpy as np
import matplotlib.pyplot as plt

from mojito import MojitoL1File
from MojitoProcessor import process_pipeline

# Show pipeline progress at INFO level
logging.basicConfig(
    level=logging.INFO,
    format="%(name)s | %(message)s",
)

1. Load Data

[ ]:
mojito_data_file = (
    "../Mojito_Data/NOISE_731d_0.25s_L1_source0_0_20251206T220508924302Z.h5"
)

# ── How many days to load (lazy slicing — only reads what is needed) ───────────
load_days = None  # set to None to load the full dataset

with MojitoL1File(mojito_data_file) as f:
    tdi_sampling = f.tdis.time_sampling
    ltt_sampling = f.ltts.time_sampling
    orbit_sampling = f.orbits.time_sampling

    # Consistent sample counts across all data streams
    n_tdi = int(load_days * 86400 * tdi_sampling.fs) if load_days else tdi_sampling.size
    n_ltt = int(load_days * 86400 * ltt_sampling.fs) if load_days else ltt_sampling.size
    n_orbit = (
        int(load_days * 86400 * orbit_sampling.fs) if load_days else orbit_sampling.size
    )

    data = {
        # ── TDI observables ──────────────────────────────────────────────────
        "tdis": {
            "X": f.tdis.x2[:n_tdi],
            "Y": f.tdis.y2[:n_tdi],
            "Z": f.tdis.z2[:n_tdi],
            "A": f.tdis.a2[:n_tdi],
            "E": f.tdis.e2[:n_tdi],
            "T": f.tdis.t2[:n_tdi],
        },
        "fs": tdi_sampling.fs,
        "dt": tdi_sampling.dt,
        "t_tdi": tdi_sampling.t()[:n_tdi],
        # ── Light travel times ───────────────────────────────────────────────
        "ltts": {
            "12": f.ltts.ltt_12[:n_ltt],
            "13": f.ltts.ltt_13[:n_ltt],
            "21": f.ltts.ltt_21[:n_ltt],
            "23": f.ltts.ltt_23[:n_ltt],
            "31": f.ltts.ltt_31[:n_ltt],
            "32": f.ltts.ltt_32[:n_ltt],
        },
        "ltt_derivatives": {
            "12": f.ltts.ltt_derivative_12[:n_ltt],
            "13": f.ltts.ltt_derivative_13[:n_ltt],
            "21": f.ltts.ltt_derivative_21[:n_ltt],
            "23": f.ltts.ltt_derivative_23[:n_ltt],
            "31": f.ltts.ltt_derivative_31[:n_ltt],
            "32": f.ltts.ltt_derivative_32[:n_ltt],
        },
        "ltt_times": ltt_sampling.t()[:n_ltt],
        # ── Spacecraft orbits ────────────────────────────────────────────────
        "orbits": f.orbits.positions[:n_orbit],  # (n_orbit, 3, 3)
        "velocities": f.orbits.velocities[:n_orbit],  # (n_orbit, 3, 3)
        "orbit_times": orbit_sampling.t()[:n_orbit],
        # ── Noise estimates (frequency-domain, not truncated) ────────────────
        "noise_estimates": {
            "xyz": f.noise_estimates.xyz[:],
            "aet": f.noise_estimates.aet[:],
            "freqs": f.noise_estimates.freq_sampling.f(),
        },
        # ── Metadata ─────────────────────────────────────────────────────────
        "metadata": {
            "laser_frequency": f.laser_frequency,
            "pipeline_name": f.pipeline_names,
        },
    }

n_samples = len(data["tdis"]["X"])
duration = n_samples * data["dt"]
print(f"Loaded: {n_samples:,} samples @ {data['fs']} Hz ({duration / 86400:.2f} days)")
print(f"TDI channels: {list(data['tdis'].keys())}")

2. Run the Processing Pipeline

All pipeline parameters are configurable here. The pipeline runs in this order:

bandpass/highpass filter → downsample → trim → truncate → window

We remark that the time-domain data will be normalised by the central frequency of the laser through the processing pipeline. The units are thus dimensionless.

[ ]:
# ── Pipeline parameters ───────────────────────────────────────────────────────

# Downsampling parameters
downsample_kwargs = {
    "target_fs": 0.2,  # Hz — target sampling rate (None = no downsampling).
    "kaiser_window": 31.0,  # Kaiser window beta parameter (higher = more aggressive anti-aliasing)
}

# Filter parameters
filter_kwargs = {
    "highpass_cutoff": 5e-6,  # Hz — high-pass cutoff (always applied)
    "lowpass_cutoff": 0.8
    * downsample_kwargs[
        "target_fs"
    ],  # Hz — low-pass cutoff (set None for high-pass only)
    "order": 2,  # Butterworth filter order
}

# Trim parameters
trim_kwargs = {
    "fraction": 0.02,  # Fraction of post-downsample duration trimmed from each end.
    # Total amount of data remaining is (1 - fraction) * N, for N
    # the number of samples after downsampling.
}

# Segmentation parameters
truncate_kwargs = {
    "days": 7.0,  # Segment length in days (splits dataset into 7-day chunks)
}

# Window parameters
window_kwargs = {
    "window": "tukey",  # Window type: 'tukey', 'hann', 'hamming', 'blackman', 'planck', or None for no windowing
    "alpha": 0.0125,  # Taper fraction for Tukey window
}
# ─────────────────────────────────────────────────────────────────────────────

processed_segments = process_pipeline(
    data,
    downsample_kwargs=downsample_kwargs,
    filter_kwargs=filter_kwargs,
    trim_kwargs=trim_kwargs,
    truncate_kwargs=truncate_kwargs,
    window_kwargs=window_kwargs,
)

# For the rest of the notebook, use the first segment
sp_0 = processed_segments["segment0"]
[ ]:
t_days = (sp_0.data["t"] - sp_0.data["t"][0]) / 86400  # relative time in days

fig, axes = plt.subplots(3, 1, figsize=(14, 7), sharex=True)

for i, ch in enumerate(["X", "Y", "Z"]):
    axes[i].plot(t_days, sp_0.data[ch], linewidth=0.4, color=f"C{i}", label=f"TDI-{ch}")
    axes[i].set_ylabel("Frac. freq.", fontsize=12)
    axes[i].legend(loc="upper right", fontsize=11)
    axes[i].grid(True, alpha=0.3)
    axes[i].set_title(f"TDI-{ch}", fontsize=12)

axes[2].set_xlabel("Time [days]", fontsize=12)
fig.suptitle(
    f"TDI X, Y, Z — segment 0 ({truncate_kwargs['days']:.0f}-day window)",
    fontsize=14,
    fontweight="bold",
)
plt.tight_layout()
plt.show()

3. Compute FFT and Periodogram

The one-sided periodogram estimate of the noise Power Spectral Density \(S\) is

\[\hat{S}(f_k) = \frac{2\,\Delta t}{N} \left|\tilde{n}(f_k)\right|^2\]

where \(\tilde{n}\) is the FFT of the processed time series, \(\Delta t\) the sampling interval and \(N\) the length of the truncated data set.

[ ]:
# Data is already normalised by laser_frequency inside process_pipeline.
# Use the new periodogram() and to_aet() methods directly.
CENTRAL_FREQ = data["metadata"]["laser_frequency"]

freq, fft_xyz = sp_0.fft()

sp_0_aet = sp_0.to_aet()
_, fft_aet = sp_0_aet.fft()

# psd_norm for reference (this is the factor baked into periodogram())
psd_norm = 2 * sp_0.dt / sp_0.N

psd_xyz = {ch: (np.abs(fft_xyz[ch]) ** 2) * psd_norm for ch in ["X", "Y", "Z"]}
psd_aet = {ch: (np.abs(fft_aet[ch]) ** 2) * psd_norm for ch in ["A", "E", "T"]}

# Mojito L1 noise estimates, normalised to fractional frequency units
noise_freqs = data["noise_estimates"]["freqs"]
noise_cov_xyz = data["noise_estimates"]["xyz"]
noise_cov_aet = data["noise_estimates"]["aet"]

l1_xyz = {
    ch: noise_cov_xyz[0][:, i, i] / CENTRAL_FREQ**2
    for i, ch in enumerate(["X", "Y", "Z"])
}
l1_aet = {
    ch: noise_cov_aet[0][:, i, i] / CENTRAL_FREQ**2
    for i, ch in enumerate(["A", "E", "T"])
}

4. Periodogram vs Mojito L1 Estimate

A good processing pipeline produces a periodogram (coloured lines) that traces the Mojito L1 noise model (red dashed) throughout the science band (1×10⁻⁴ to 1×10⁻¹ Hz). Potential deviations at the lowest frequencies indicate residual artefacts from filtering or insufficient trimming.

[ ]:
nyquist = sp_0.fs / 2
xyz_colors = ["C0", "C1", "C2"]
aet_colors = ["#e377c2", "#9467bd", "#8c564b"]

fig, axes = plt.subplots(3, 2, figsize=(16, 10), sharex=False, sharey=True)

for i, ch in enumerate(["X", "Y", "Z"]):
    ax = axes[i, 0]
    ax.loglog(
        freq[1:],
        psd_xyz[ch][1:],
        linewidth=1.0,
        color=xyz_colors[i],
        label=f"TDI-{ch}",
    )
    ax.loglog(
        noise_freqs,
        l1_xyz[ch],
        linestyle="dashed",
        color="red",
        linewidth=2.0,
        label="Mojito L1 estimate",
    )
    ax.axvspan(1e-4, 1e-1, alpha=0.15, color="green", label="Science band")
    ax.set_xlim(1e-5, nyquist)
    ax.set_ylim(1e-53, 1e-35)
    ax.set_ylabel("PSD [1/Hz]", fontsize=20)
    ax.grid(True, which="both", alpha=0.3)
    ax.legend(loc="upper left", fontsize=14, framealpha=0.95)
    ax.tick_params(axis="both", which="major", labelsize=16)
for i, ch in enumerate(["A", "E", "T"]):
    ax = axes[i, 1]
    ax.loglog(
        freq[1:],
        psd_aet[ch][1:],
        linewidth=1.0,
        color=aet_colors[i],
        label=f"TDI-{ch}",
    )
    ax.loglog(noise_freqs, l1_aet[ch], linestyle="dashed", color="red", linewidth=2.0)
    ax.axvspan(1e-4, 1e-1, alpha=0.15, color="green")
    ax.set_xlim(1e-5, nyquist)
    ax.set_ylim(1e-53, 1e-35)
    ax.grid(True, which="both", alpha=0.3)
    ax.legend(loc="upper left", fontsize=14, framealpha=0.95)
    ax.tick_params(axis="both", which="major", labelsize=16)

axes[2, 0].set_xlabel("Frequency [Hz]", fontsize=20)
axes[2, 1].set_xlabel("Frequency [Hz]", fontsize=20)

fig.suptitle(
    f"Processed Mojito Periodogram vs L1 Estimate (segment0)\n"
    f"HP={filter_kwargs['highpass_cutoff']:.0e} Hz, "
    f"LP={filter_kwargs['lowpass_cutoff']} Hz, "
    f"trim={trim_kwargs['fraction']:.1%}, "
    f"{truncate_kwargs['days']} days/segment, "
    f"fs={sp_0.fs} Hz, "
    f"window={window_kwargs['window']} α={window_kwargs.get('alpha', 'N/A')}",
    fontsize=20,
    fontweight="bold",
)
plt.tight_layout()
plt.show()