Mojito GW Processing Pipeline¶
Demonstrates the full TDI data processing pipeline for gravitational wave sources using the process_pipeline utility from MojitoProcessor.
The pipeline applies the following steps in order:
| Step | Operation | Default |
|---|---|---|
| 1 | Band-pass filter | configurable (f_low, f_high), order |
| 2 | Downsample | configurable target rate |
| 3 | Trim edge artefacts | 2.2% from each end |
| 4 | Truncate to working length | configurable |
| 5 | Tukey window | α = 0.025 |
The final cell verifies consistency by comparing the periodogram against the Mojito L1 noise estimate.
[ ]:
# Standard library
import logging
import numpy as np
from pathlib import Path
import matplotlib.pyplot as plt

# Mojito L1 file reader plus download/combination helpers
from mojito import MojitoL1File
from mojito.download import download_brick
from mojito.writer import combine_bricks
# Full TDI processing pipeline (filter → downsample → trim → truncate → window)
from MojitoProcessor import process_pipeline

# Show pipeline progress at INFO level
logging.basicConfig(
    level=logging.INFO,
    format="%(name)s | %(message)s",
)
1. (Down)Load Data¶
We load only EMRIs, MBHBs, and noise using the mojito package. The process_pipeline utility automatically applies the correct processing steps for each data type, so we can load them together and process them together.
[ ]:
DOWNLOAD_DATA = True
[ ]:
# Path of the combined Mojito L1 data file (written below, read in later cells).
mojito_data_file = "../Mojito_Data/emri_mbh_noise.h5"
Path(mojito_data_file).parent.mkdir(parents=True, exist_ok=True)

# Let's make a cocktail with one EMRI, one MBHB, and noise
if DOWNLOAD_DATA:
    # Fetch each source "brick" individually (download location/caching is
    # handled inside download_brick — see the mojito docs for details).
    emri_brick_path = download_brick("emri", source_id=1)
    mbhb_brick_path = download_brick("mbhb", source_id=4)
    noise_brick_path = download_brick("noise")
    # Merge all bricks into the single L1 file used by the rest of the notebook.
    brick_paths = [emri_brick_path, mbhb_brick_path, noise_brick_path]
    combine_bricks(brick_paths, output_path=mojito_data_file)
[ ]:
# ── How many days to load (lazy slicing — only reads what is needed) ───────────
load_days = None  # set to None to load the full dataset

with MojitoL1File(mojito_data_file) as f:
    # Per-stream time-sampling metadata (rate `fs`, step `dt`, total `size`).
    tdi_sampling = f.tdis.time_sampling
    ltt_sampling = f.ltts.time_sampling
    orbit_sampling = f.orbits.time_sampling

    # Consistent sample counts across all data streams
    # (86400 s per day; when load_days is None/0, take the full stream).
    n_tdi = int(load_days * 86400 * tdi_sampling.fs) if load_days else tdi_sampling.size
    n_ltt = int(load_days * 86400 * ltt_sampling.fs) if load_days else ltt_sampling.size
    n_orbit = (
        int(load_days * 86400 * orbit_sampling.fs) if load_days else orbit_sampling.size
    )

    data = {
        # ── TDI observables ──────────────────────────────────────────────────
        # Second-generation TDI channels, lazily sliced to n_tdi samples.
        "tdis": {
            "X": f.tdis.x2[:n_tdi],
            "Y": f.tdis.y2[:n_tdi],
            "Z": f.tdis.z2[:n_tdi],
            "A": f.tdis.a2[:n_tdi],
            "E": f.tdis.e2[:n_tdi],
            "T": f.tdis.t2[:n_tdi],
        },
        "fs": tdi_sampling.fs,  # TDI sampling rate [Hz]
        "dt": tdi_sampling.dt,  # TDI sampling interval [s]
        "t_tdi": tdi_sampling.t()[:n_tdi],  # TDI time stamps [s]
        # ── Light travel times ───────────────────────────────────────────────
        # Keys "ij" index the six inter-spacecraft links (presumably link
        # j → i — TODO confirm the convention against the mojito docs).
        "ltts": {
            "12": f.ltts.ltt_12[:n_ltt],
            "13": f.ltts.ltt_13[:n_ltt],
            "21": f.ltts.ltt_21[:n_ltt],
            "23": f.ltts.ltt_23[:n_ltt],
            "31": f.ltts.ltt_31[:n_ltt],
            "32": f.ltts.ltt_32[:n_ltt],
        },
        # Time derivatives of the same six light travel times.
        "ltt_derivatives": {
            "12": f.ltts.ltt_derivative_12[:n_ltt],
            "13": f.ltts.ltt_derivative_13[:n_ltt],
            "21": f.ltts.ltt_derivative_21[:n_ltt],
            "23": f.ltts.ltt_derivative_23[:n_ltt],
            "31": f.ltts.ltt_derivative_31[:n_ltt],
            "32": f.ltts.ltt_derivative_32[:n_ltt],
        },
        "ltt_times": ltt_sampling.t()[:n_ltt],  # LTT time stamps [s]
        # ── Spacecraft orbits ────────────────────────────────────────────────
        "orbits": f.orbits.positions[:n_orbit],  # (n_orbit, 3, 3)
        "velocities": f.orbits.velocities[:n_orbit],  # (n_orbit, 3, 3)
        "orbit_times": orbit_sampling.t()[:n_orbit],
        # ── Noise estimates (frequency-domain, not truncated) ────────────────
        "noise_estimates": {
            "xyz": f.noise_estimates.xyz[:],
            "aet": f.noise_estimates.aet[:],
        },
        # ── Metadata ─────────────────────────────────────────────────────────
        "metadata": {
            "laser_frequency": f.laser_frequency,
            "pipeline_name": f.pipeline_names,
        },
    }
    # NOTE(review): leftover debug print of the noise-estimate shape — consider
    # removing before sharing the notebook.
    print(f.noise_estimates.xyz[:].shape)

# Summarise what was actually loaded.
n_samples = len(data["tdis"]["X"])
duration = n_samples * data["dt"]
print(f"Loaded: {n_samples:,} samples @ {data['fs']} Hz ({duration / 86400:.2f} days)")
print(f"TDI channels: {list(data['tdis'].keys())}")
2. Run the Processing Pipeline¶
All pipeline parameters are configurable here. The pipeline runs in this order:
bandpass/highpass filter → downsample → trim → truncate → window
We remark that the time-domain data will be normalised by the central frequency of the laser through the processing pipeline. The units are thus dimensionless.
[ ]:
# ── Pipeline parameters ───────────────────────────────────────────────────────

# Downsampling: resample the TDI streams down to `target_fs`.
downsample_kwargs = {
    "target_fs": 0.2,       # Hz — target sampling rate (None = no downsampling).
    "kaiser_window": 31.0,  # Kaiser beta (higher = more aggressive anti-aliasing).
}

# Filtering: Butterworth band-pass applied before downsampling.
# The low-pass corner tracks the target sampling rate.
# NOTE(review): 0.8 * target_fs = 0.16 Hz sits ABOVE the decimated Nyquist
# (target_fs / 2 = 0.1 Hz); confirm whether 0.8 * (target_fs / 2) was intended.
_lowpass_cutoff = 0.8 * downsample_kwargs["target_fs"]  # Hz; None = high-pass only
filter_kwargs = {
    "highpass_cutoff": 5e-6,  # Hz — high-pass cutoff (always applied).
    "lowpass_cutoff": _lowpass_cutoff,
    "order": 2,               # Butterworth filter order.
}

# Trimming: drop `fraction` of the post-downsample duration from EACH end,
# i.e. roughly (1 - 2 * fraction) * N samples remain for N samples after
# downsampling.
trim_kwargs = {"fraction": 0.02}

# Segmentation: split the dataset into fixed-length chunks.
truncate_kwargs = {"days": 7.0}  # segment length in days

# Windowing: taper applied to each segment.
window_kwargs = {
    "window": "tukey",  # 'tukey', 'hann', 'hamming', 'blackman', 'planck', or None
    "alpha": 0.0125,    # taper fraction for the Tukey window
}
# ─────────────────────────────────────────────────────────────────────────────
# Run the full processing chain on the loaded data. The result is indexed by
# segment name ("segment0", ...) — presumably one entry per truncated chunk;
# see the MojitoProcessor docs for the exact return structure.
processed_segments = process_pipeline(
    data,
    downsample_kwargs=downsample_kwargs,
    filter_kwargs=filter_kwargs,
    trim_kwargs=trim_kwargs,
    truncate_kwargs=truncate_kwargs,
    window_kwargs=window_kwargs,
)

# For the rest of the notebook, use the first segment
sp_0 = processed_segments["segment0"]
[ ]:
# Time axis relative to the segment start, expressed in days.
t_days = (sp_0.data["t"] - sp_0.data["t"][0]) / 86400

# One panel per XYZ channel, stacked and sharing the time axis.
fig, axes = plt.subplots(3, 1, figsize=(14, 7), sharex=True)
for idx, (ax, ch) in enumerate(zip(axes, ["X", "Y", "Z"])):
    ax.plot(t_days, sp_0.data[ch], linewidth=0.4, color=f"C{idx}", label=f"TDI-{ch}")
    ax.set_ylabel("Frac. freq.", fontsize=12)
    ax.legend(loc="upper right", fontsize=11)
    ax.grid(True, alpha=0.3)
    ax.set_title(f"TDI-{ch}", fontsize=12)

# Only the bottom panel needs an x-label (the axis is shared).
axes[2].set_xlabel("Time [days]", fontsize=12)
fig.suptitle(
    f"TDI X, Y, Z — segment 0 ({truncate_kwargs['days']:.0f}-day window)",
    fontsize=14,
    fontweight="bold",
)
plt.tight_layout()
plt.show()
3. Compute FFT and Periodogram¶
The one-sided periodogram estimate of the noise Power Spectral Density \(S\) is

\[
\hat{S}(f_k) = \frac{2\,\Delta t}{N}\,\bigl|\tilde{n}(f_k)\bigr|^2,
\]

where \(\tilde{n}\) is the FFT of the processed time series, \(\Delta t\) the sampling interval and \(N\) the length of the truncated data set.
[ ]:
# Data is already normalised by laser_frequency inside process_pipeline.
# Use the new periodogram() and to_aet() methods directly.
CENTRAL_FREQ = data["metadata"]["laser_frequency"]

# FFT of the processed XYZ segment, and of its AET recombination.
freq, fft_xyz = sp_0.fft()
sp_0_aet = sp_0.to_aet()
_, fft_aet = sp_0_aet.fft()

# psd_norm for reference (this is the factor baked into periodogram()):
# one-sided periodogram normalisation S(f) = (2*dt/N) |FFT|^2.
psd_norm = 2 * sp_0.dt / sp_0.N
psd_xyz = {ch: (np.abs(fft_xyz[ch]) ** 2) * psd_norm for ch in ["X", "Y", "Z"]}
psd_aet = {ch: (np.abs(fft_aet[ch]) ** 2) * psd_norm for ch in ["A", "E", "T"]}

# Mojito L1 noise estimates, normalised to fractional frequency units.
# NOTE(review): noise_freqs is a hard-coded 1000-point log grid assumed to be
# the frequency axis of the stored covariances — confirm against the file spec
# (lengths must match for the loglog plots below to work).
noise_freqs = np.logspace(-5, 0, 1000)
noise_cov_xyz = data["noise_estimates"]["xyz"]
noise_cov_aet = data["noise_estimates"]["aet"]
# Diagonal of the 3×3 covariance at outer index 0 — presumably the first
# epoch/entry; verify. Division by CENTRAL_FREQ**2 converts to dimensionless
# fractional-frequency units, matching the processed time series.
l1_xyz = {
    ch: noise_cov_xyz[0][:, i, i] / CENTRAL_FREQ**2
    for i, ch in enumerate(["X", "Y", "Z"])
}
l1_aet = {
    ch: noise_cov_aet[0][:, i, i] / CENTRAL_FREQ**2
    for i, ch in enumerate(["A", "E", "T"])
}
4. Periodogram vs Mojito L1 Estimate¶
A good processing pipeline produces a periodogram (coloured lines) that traces the Mojito L1 noise model (red dashed) throughout the science band (1×10⁻⁴ to 1×10⁻¹ Hz). Potential deviations at the lowest frequencies indicate residual artefacts from filtering or insufficient trimming.
[ ]:
# Nyquist frequency of the processed (downsampled) data — upper x-limit.
nyquist = sp_0.fs / 2
xyz_colors = ["C0", "C1", "C2"]
aet_colors = ["#e377c2", "#9467bd", "#8c564b"]

# 3×2 grid: left column = XYZ channels, right column = AET channels.
fig, axes = plt.subplots(3, 2, figsize=(16, 10), sharex=False, sharey=True)

# ── Left column: XYZ periodograms vs the L1 noise model ──────────────────────
for i, ch in enumerate(["X", "Y", "Z"]):
    ax = axes[i, 0]
    # Skip bin 0 (DC), which cannot be shown on a log frequency axis.
    ax.loglog(
        freq[1:],
        psd_xyz[ch][1:],
        linewidth=1.0,
        color=xyz_colors[i],
        label=f"TDI-{ch}",
    )
    ax.loglog(
        noise_freqs,
        l1_xyz[ch],
        linestyle="dashed",
        color="red",
        linewidth=2.0,
        label="Mojito L1 estimate",
    )
    # Shade the science band (1e-4 to 1e-1 Hz).
    ax.axvspan(1e-4, 1e-1, alpha=0.15, color="green", label="Science band")
    ax.set_xlim(1e-5, nyquist)
    ax.set_ylim(1e-53, 1e-35)
    ax.set_ylabel("PSD [1/Hz]", fontsize=20)
    ax.grid(True, which="both", alpha=0.3)
    ax.legend(loc="upper left", fontsize=14, framealpha=0.95)
    ax.tick_params(axis="both", which="major", labelsize=16)

# ── Right column: AET periodograms (shared labels already on the left) ───────
for i, ch in enumerate(["A", "E", "T"]):
    ax = axes[i, 1]
    ax.loglog(
        freq[1:],
        psd_aet[ch][1:],
        linewidth=1.0,
        color=aet_colors[i],
        label=f"TDI-{ch}",
    )
    ax.loglog(noise_freqs, l1_aet[ch], linestyle="dashed", color="red", linewidth=2.0)
    ax.axvspan(1e-4, 1e-1, alpha=0.15, color="green")
    ax.set_xlim(1e-5, nyquist)
    ax.set_ylim(1e-53, 1e-35)
    ax.grid(True, which="both", alpha=0.3)
    ax.legend(loc="upper left", fontsize=14, framealpha=0.95)
    ax.tick_params(axis="both", which="major", labelsize=16)

# X-labels only on the bottom row.
axes[2, 0].set_xlabel("Frequency [Hz]", fontsize=20)
axes[2, 1].set_xlabel("Frequency [Hz]", fontsize=20)

# Title summarises the pipeline configuration used for this run.
fig.suptitle(
    f"Processed Mojito Periodogram vs L1 Estimate (segment0)\n"
    f"HP={filter_kwargs['highpass_cutoff']:.0e} Hz, "
    f"LP={filter_kwargs['lowpass_cutoff']} Hz, "
    f"trim={trim_kwargs['fraction']:.1%}, "
    f"{truncate_kwargs['days']} days/segment, "
    f"fs={sp_0.fs} Hz, "
    f"window={window_kwargs['window']} α={window_kwargs.get('alpha', 'N/A')}",
    fontsize=20,
    fontweight="bold",
)
plt.tight_layout()
plt.show()