MONAD‑FIELD FRAMEWORK
Emergent Geometry, Emergent Time, and Constitutive Dynamics
ABSTRACT
The Monad‑Field framework begins from a single ontological premise: the substrate is fundamental. Space, geometry, curvature, and time are not primitive ingredients of reality; they emerge as secondary response structures from the internal organization, tension structure, and transmission dynamics of an underlying substrate field S(x,t). Physical infinities do not arise because substrate dynamics remain finite and self‑consistent. The central thesis is: the substrate does not evolve inside geometry; the substrate generates the geometry through which it evolves.
ONTOLOGICAL FOUNDATION
1.1 The Substrate is Fundamental
Reality is fundamentally composed of a continuous dynamic substrate field S(x,t). Matter, energy, geometry, and observable physical phenomena emerge from organized excitations and gradients within this substrate.
1.2 Geometry is Emergent
Geometry is not a pre‑existing manifold. Observable geometry emerges from the constitutive organization of the substrate field. Curvature becomes a secondary response structure generated by tension gradients.
1.3 Time is Emergent
Time is not a fundamental dimension. Time is the measurable ordering of substrate reactions and state transitions. Without change, time has no operational meaning.
CONSTITUTIVE GEOMETRY
2.1 From Metric Geometry to Substrate Geometry
General Relativity begins with spacetime geometry as fundamental. Monad‑Field reverses this: substrate organization generates emergent geometry. Excitations propagate through the geometry produced by substrate tension.
2.2 Effective Metric Ansatz
A minimal effective metric may be constructed directly from the substrate field:
gμν(S) = A(S) ημν + B(S) (∂μS)(∂νS)
Minimal conformal form:
gμν(S) = (1 + αS) ημν
Clarification: Here ημν is a coordinate fiducial – a flat bookkeeping structure, not a physical background metric. All observable geometry is carried by gμν(S); ημν merely labels events. Geometry is generated by substrate state rather than imposed externally.
TENSION‑GENERATED CONNECTION
The covariant derivative is no longer defined by an abstract background geometry. Instead, the connection emerges from substrate gradients.
Minimal substrate‑generated affine connection:
Γλμν = α / [2(1 + αS)] × ( δλν ∂μS + δλμ ∂νS − ημν ηλρ ∂ρS )
Properties:
• If ∂S = 0 → Γ = 0 → effective flat geometry
• If tension gradients exist → effective curvature emerges
The derivative operator becomes a physical transmission operator.
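The connection formula above can be checked numerically. The sketch below evaluates Γ for the minimal conformal ansatz at a single point; the value α = 0.1 and the sample gradient are illustrative placeholders, not calibrated quantities.

```python
import numpy as np

eta = np.diag([-1.0, 1.0, 1.0, 1.0])      # coordinate fiducial η_{μν}
eta_inv = np.linalg.inv(eta)              # η^{μν}

def connection(S, dS, alpha=0.1):
    """Γ^λ_{μν} = α/[2(1+αS)] (δ^λ_ν ∂μS + δ^λ_μ ∂νS − η_{μν} η^{λρ} ∂ρS)."""
    pref = alpha / (2.0 * (1.0 + alpha * S))
    delta = np.eye(4)
    raised = eta_inv @ dS                 # η^{λρ} ∂ρS
    Gamma = np.zeros((4, 4, 4))
    for lam in range(4):
        for mu in range(4):
            for nu in range(4):
                Gamma[lam, mu, nu] = pref * (delta[lam, nu] * dS[mu]
                                             + delta[lam, mu] * dS[nu]
                                             - eta[mu, nu] * raised[lam])
    return Gamma

flat = connection(S=0.3, dS=np.zeros(4))                      # ∂S = 0
graded = connection(S=0.3, dS=np.array([0.0, 0.5, 0.0, 0.0])) # tension gradient
print(np.allclose(flat, 0.0))                                 # True: flat geometry
print(np.allclose(graded, graded.transpose(0, 2, 1)))         # True: Γ symmetric in μν
print(np.any(graded != 0.0))                                  # True: nonzero connection
```

This confirms the two stated properties: Γ vanishes identically when ∂S = 0, and a nonzero tension gradient produces a nonzero (torsion‑free, μν‑symmetric) connection.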
PHYSICAL MEANING OF ∇(S)
The substrate‑dependent covariant derivative ∇(S) represents the physical transmission of change through the substrate. Transport, propagation, inertia, and geodesic motion become manifestations of substrate transmission dynamics.
Thus:
• derivatives become physical,
• geometry becomes constitutive,
• curvature becomes emergent response.
COUPLED FIELD DYNAMICS
The framework contains two coupled sectors:
• Substrate field S
• Excitation field Ψ
5.1 Excitation Dynamics
The excitation field evolves through the geometry generated by the substrate:
∇μ ∇μ Ψ − (m² + κS) Ψ = 0
Excitations do not move through pre‑existing spacetime. They propagate through substrate‑defined transmission geometry.
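In the flat limit (constant substrate S0, so Γ = 0) the excitation equation reduces to a Klein–Gordon form whose mass is shifted by the substrate. A minimal symbolic check in 1+1 dimensions with signature (−,+); all symbols are illustrative:

```python
import sympy as sp

t, x, k, w, m, kappa, S0 = sp.symbols('t x k omega m kappa S0', real=True)
Psi = sp.exp(sp.I * (k * x - w * t))            # plane-wave excitation
box = -sp.diff(Psi, t, 2) + sp.diff(Psi, x, 2)  # flat d'Alembertian, signature (−,+)
disp = sp.simplify((box - (m**2 + kappa * S0) * Psi) / Psi)
sol = sp.solve(sp.Eq(disp, 0), w**2)
print(sol[0])  # ω² = k² + m² + κ·S0: effective mass shifted by the substrate
```

The dispersion relation ω² = k² + m² + κS0 makes the coupling concrete: a uniform substrate shift acts as an effective mass term for the excitation.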
5.2 Substrate Dynamics
The substrate evolves according to nonlinear constitutive dynamics:
∇μ ∇μ S − V′(S) = κ |Ψ|² + A′(S) ημν (∂μΨ*)(∂νΨ)
This produces recursive geometry generation:
S → Γ → ∇ → evolution of S and Ψ → modified S
Geometry is dynamically generated by substrate response.
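The recursive loop S → Γ → ∇ → evolution can be illustrated with a 1+1D leapfrog toy model in the flat‑space limit, where the two sectors couple only through the source term κ|Ψ|² and the effective mass m² + κS. Grid sizes, couplings, and initial data below are illustrative placeholders, not calibrated values:

```python
import numpy as np

N, dx, dt = 400, 0.1, 0.04
x = np.arange(N) * dx
kappa, m2, beta = 0.2, 0.5, 1.0
S = np.zeros(N); S_old = S.copy()
Psi = np.exp(-((x - 20.0) / 2.0)**2)   # localized excitation
Psi_old = Psi.copy()

def lap(f):
    # periodic second difference (flat-space Laplacian)
    return (np.roll(f, 1) - 2 * f + np.roll(f, -1)) / dx**2

for _ in range(500):
    # substrate sourced by excitation density: □S − V'(S) = κ|Ψ|²
    S_new = 2 * S - S_old + dt**2 * (lap(S) - beta * S + kappa * np.abs(Psi)**2)
    # excitation feels the substrate through the effective mass m² + κS
    Psi_new = 2 * Psi - Psi_old + dt**2 * (lap(Psi) - (m2 + kappa * S) * Psi)
    S_old, S = S, S_new
    Psi_old, Psi = Psi, Psi_new

print(np.isfinite(S).all() and np.isfinite(Psi).all())  # True: evolution stays bounded
print(S.max() > 0)                                      # True: substrate responds to Ψ
```

Even in this stripped‑down sketch the qualitative loop is visible: the excitation deposits tension into S, and the modified S feeds back into the propagation of Ψ.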
EMERGENT CURVATURE
Curvature is not fundamental. The curvature tensor emerges from the substrate‑generated connection:
Rρσμν = ∂μΓρνσ − ∂νΓρμσ + Γρμλ Γλνσ − Γρνλ Γλμσ
Curvature becomes a diagnostic expression of substrate inhomogeneity.
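The claim that curvature tracks substrate inhomogeneity can be verified symbolically. The 2D toy below (signature (−,+), conformal factor 1 + αS, linear profile S = x as an illustrative example) computes R^t_{xtx} from the standard formula and shows it vanishes exactly when S is constant:

```python
import sympy as sp

t, x, alpha = sp.symbols('t x alpha', positive=True)
X = [t, x]

def riemann_txtx(S_expr):
    Om = 1 + alpha * S_expr
    g = sp.diag(-Om, Om)          # 2D toy metric g = (1+αS)·diag(−1, 1)
    ginv = g.inv()
    def Gamma(l, m, n):
        # Christoffel symbols of the emergent metric
        return sum(ginv[l, r] * (sp.diff(g[r, m], X[n]) + sp.diff(g[r, n], X[m])
                                 - sp.diff(g[m, n], X[r])) / 2 for r in range(2))
    # R^ρ_{σμν} = ∂μΓ^ρ_{νσ} − ∂νΓ^ρ_{μσ} + Γ^ρ_{μλ}Γ^λ_{νσ} − Γ^ρ_{νλ}Γ^λ_{μσ}
    rho, sig, mu, nu = 0, 1, 0, 1
    R = sp.diff(Gamma(rho, nu, sig), X[mu]) - sp.diff(Gamma(rho, mu, sig), X[nu])
    R += sum(Gamma(rho, mu, l) * Gamma(l, nu, sig)
             - Gamma(rho, nu, l) * Gamma(l, mu, sig) for l in range(2))
    return sp.simplify(R)

print(riemann_txtx(sp.Integer(0)))   # 0: constant substrate → flat
print(riemann_txtx(x) != 0)          # True: substrate gradient → curvature
```

Constant S yields identically zero curvature; any gradient in S produces nonzero curvature, consistent with curvature as a diagnostic of substrate inhomogeneity.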
ENERGY‑MOMENTUM CONSERVATION
In the Monad‑Field framework, the excitation field Ψ and any matter contributions must satisfy covariant conservation under the S‑generated geometry. Because the connection Γλμν is derived from the substrate, the usual conservation law generalizes to:
∇μ Tμν = 0
where Tμν is the total energy‑momentum tensor of the excitation sector (including Ψ and any additional matter). The covariant derivative ∇ is taken with respect to the emergent metric gμν(S).
This condition is not imposed as an external constraint; it follows from the diffeomorphism invariance of the action when varied with respect to the effective metric. Physically, conservation of energy and momentum emerges from the substrate’s own constitutive transmission properties. No additional “background” conservation law is required – the substrate enforces it dynamically.
Consequently, all test particles and wave excitations follow geodesics of gμν(S), and the standard continuity equations for stress‑energy hold automatically, anchored entirely in the self‑consistent geometry generated by S.
FINITE CONSTITUTIVE DYNAMICS
The Monad‑Field framework is governed by finite constitutive substrate dynamics. Because geometry is emergent from substrate organization rather than fundamental, physically divergent geometric states do not naturally arise within the theory.
The substrate field evolves through bounded nonlinear response, finite transmission behavior, and intrinsic saturation mechanisms. As local tension gradients increase, substrate reaction dynamics asymptotically regulate further amplification. This prevents the formation of unbounded curvature or infinite‑density configurations.
Consequently:
• substrate response remains finite,
• constitutive transmission remains finite,
• reaction dynamics remain finite,
• collapse processes approach limiting behavior rather than divergence.
In this framework, singularities are not removed by assumption. They simply fail to emerge from the mathematics of the substrate itself. Curvature divergences are interpreted as artifacts of incomplete geometric approximations rather than physically realizable states.
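The saturation mechanism can be made concrete with the stiffening potential V(S) = ½βS² + ¼γS⁴ quoted later in the paper. In the overdamped sketch below, a constant drive J pushes S toward the finite fixed point S* solving V′(S*) = J rather than toward divergence; β, γ, J, and the time step are toy values:

```python
import numpy as np

beta, gamma, J, dt = 0.5, 1.0, 2.0, 0.01
S = 0.0
for _ in range(5000):                        # overdamped relaxation sketch
    S += dt * (J - beta * S - gamma * S**3)  # dS/dt = J − V'(S)
roots = np.roots([gamma, 0.0, beta, -J])     # γS³ + βS − J = 0
S_star = roots[np.abs(roots.imag) < 1e-9].real.max()
print(abs(S - S_star) < 1e-6)                # True: response saturates at finite S*
```

The cubic term dominates at large S, so the restoring force always overtakes any bounded drive; this is the simplest instance of the bounded‑response behavior the framework asserts.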
EMERGENT GRAVITY
In the weak‑field limit, the substrate equation reduces to the Newtonian form:
∇² S ≈ 4πGρ
Gravity appears as macroscopic substrate tension organization. Gravitational lensing, redshift, and geodesic motion become constitutive propagation effects generated by substrate structure.
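The weak‑field equation is just a Poisson equation, so it can be solved with a standard FFT sketch. The snippet below uses units G = 1 and an illustrative periodic grid with a unit point mass; it only demonstrates that the substrate develops a potential well around the source:

```python
import numpy as np

N, L = 64, 10.0
dx = L / N
rho = np.zeros((N, N, N))
rho[N // 2, N // 2, N // 2] = 1.0 / dx**3        # unit point mass
k = 2 * np.pi * np.fft.fftfreq(N, d=dx)
KX, KY, KZ = np.meshgrid(k, k, k, indexing='ij')
k2 = KX**2 + KY**2 + KZ**2
k2[0, 0, 0] = 1.0                                # sidestep the zero mode
S_hat = -4 * np.pi * np.fft.fftn(rho) / k2       # ∇²S = 4πρ in Fourier space
S_hat[0, 0, 0] = 0.0                             # fix the mean of S to zero
S = np.real(np.fft.ifftn(S_hat))
print(S[N // 2, N // 2, N // 2] < S[0, 0, 0])    # True: potential well at the source
```

Away from the source, this S recovers the familiar −GM/r profile (up to periodic images), so test excitations in the S‑generated geometry fall toward the mass exactly as Newtonian gravity requires.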
GRAVITATIONAL WAVES AND SUBSTRATE ECHOES
Wave‑like disturbances in the substrate correspond to propagating reorganizations of effective geometry. The framework naturally permits:
- nonlinear substrate waves
- constitutive recoil
- saturation echoes
- substrate snap phenomena
- memory effects
These arise from finite substrate reaction dynamics rather than purely metric oscillation.
RELATIONSHIP TO EXISTING FRAMEWORKS
Monad‑Field shares conceptual overlap with:
• Weyl geometry
• Teleparallel gravity
• Emergent gravity programs
• Analogue gravity systems
• Constitutive media physics
However, the ontology differs fundamentally: geometry is not assumed; it is generated.
CENTRAL ONTOLOGICAL SHIFT
General Relativity:
Matter tells spacetime how to curve.
Monad‑Field:
Energy organizes substrate tension, and substrate tension generates effective geometry.
Thus:
• geometry is response
• curvature is response
• time is response
• derivatives are physical transmission operators
The substrate alone is fundamental.
CORE THESIS
The Monad‑Field framework compresses into a single statement:
The substrate does not evolve inside geometry; the substrate generates the geometry through which it evolves.
From this follows:
• emergent geometry
• emergent curvature
• emergent time
• constitutive dynamics
• finite, self‑consistent physical evolution without divergent states
EXECUTIVE SUMMARY
PAGE 1 — CONCEPTUAL SUMMARY
The Monad‑Field framework proposes that the fundamental entity of reality is a continuous substrate field S(x,t). Geometry, curvature, and time are not primitive; they emerge from the internal organization and tension structure of this substrate.
In this view, the universe is not a stage with pre‑existing geometry. Instead, geometry is a secondary response generated by substrate behavior. Curvature arises from tension gradients. Time arises from sequential substrate reactions. Matter and energy correspond to organized excitations of the substrate.
This reverses the logic of General Relativity. Instead of matter evolving inside geometry, geometry is generated by the substrate through which matter evolves. The substrate defines the effective metric, the connection, and the propagation of excitations.
Because substrate dynamics are finite and saturable, physical infinities do not arise. Singularities are not removed by assumption; they simply fail to emerge from the mathematics. Collapse processes approach finite limiting behavior rather than divergence.
Gravity emerges as macroscopic substrate tension organization. Gravitational waves become substrate reorganization waves. Echoes, nonlinear recoil, and memory effects arise naturally from finite substrate response.
The Monad‑Field framework aligns with emergent gravity, analogue gravity, and constitutive media physics, but differs ontologically: geometry is not assumed; it is generated.
PAGE 2 — TECHNICAL SUMMARY
Effective metric (minimal conformal form):
gμν(S) = (1 + αS) ημν (ημν = coordinate fiducial)
Substrate‑generated connection:
Γλμν = α / [2(1 + αS)] × ( δλν ∂μS + δλμ ∂νS − ημν ηλρ ∂ρS )
Substrate‑dependent covariant derivative:
∇(S) acts using Γ(S)
Excitation dynamics:
∇μ ∇μ Ψ − (m² + κS) Ψ = 0
Substrate dynamics:
∇μ ∇μ S − V′(S) = κ |Ψ|² + A′(S) ημν (∂μΨ*)(∂νΨ)
Emergent curvature:
Rρσμν = ∂μΓρνσ − ∂νΓρμσ + Γρμλ Γλνσ − Γρνλ Γλμσ
Weak‑field limit:
∇² S ≈ 4πGρ
Energy‑momentum conservation: ∇μ Tμν = 0 holds identically under S‑generated geometry.
Key consequences:
• Geometry is emergent from S
• Curvature is a diagnostic of substrate inhomogeneity
• Time is sequential substrate reaction
• Singularities do not arise
• Gravity is substrate tension organization
• Gravitational waves are substrate reorganization waves
• Echoes and nonlinear effects arise from finite substrate response
Monad‑Field Framework — version 1.0 (Final)
Constitutive geometry · No background metric · Singularity‑free by construction
Effective metric (coordinate fiducial ημν)
gμν(S) = (1 + αS) ημν
(ημν is a flat bookkeeping label, not a physical background)
Substrate‑generated connection
Γλμν = α / [2(1 + αS)] × ( δλν ∂μS + δλμ ∂νS − ημν ηλρ ∂ρS )
Covariant derivative (physical transmission operator)
∇μ Vν = ∂μ Vν + Γνμλ Vλ
Lagrangian density (covariant, S‑generated geometry)
ℒ = √(−g(S)) × [ (1/(2κeff)) R(g(S)) − ½ gμν ∂μS ∂νS − V(S) − gμν ∂μΨ* ∂νΨ − m² |Ψ|² − κ S |Ψ|² ]
Excitation field equation (Ψ evolves in S‑geometry)
∇μ ∇μ Ψ − (m² + κ S) Ψ = 0
Substrate field equation (S generates its own geometry)
∇μ ∇μ S − V′(S) = κ |Ψ|² + A′(S) ημν (∂μ Ψ*)(∂ν Ψ)
with A(S) = 1 + αS, and
V(S) = ½ β S² + ¼ γ S⁴ + … (saturating potential)
Energy‑momentum conservation (emergent, not imposed)
∇μ Tμν = 0
(Tμν from Ψ and matter sectors)
Finite constitutive dynamics – no singularities
- Saturation built into V(S) and the connection
- As S → Smax, Γ and curvature asymptote, not diverge
- ∇μ ∇μ S remains bounded → no physical infinities
Weak‑field limit (Newtonian gravity)
∇² S ≈ 4πG ρ (ρ = |Ψ|² + source terms)
Observable signatures
- Gravitational waves → substrate reorganisations
- Echoes, beats, power‑law tails → from finite‑response saturation
- Geodesics = least‑reaction paths in ∇(S)
- Time dilation = altered substrate reaction rates
These are the governing equations of the Monad‑Field framework as presented in the final white paper.
Geometry is no longer a stage – it is a response.
The substrate alone is fundamental.
# Monad-Field Multi-Event Stacking + ESPRIT Frequency Analysis
# Consensus version – no GR comparisons, standalone framework.
!pip install gwosc gwpy
import numpy as np
import matplotlib.pyplot as plt
from gwosc import datasets
import gwpy.timeseries
from scipy.optimize import curve_fit
from scipy.signal import butter, filtfilt
from scipy.fft import rfft, irfft
from scipy.interpolate import interp1d
import scipy.linalg
import urllib.request
from gwosc.locate import get_urls
# ------------------------------------------------------------
# 1. Define events (add or remove as needed)
# ------------------------------------------------------------
events = [
'GW190521',
'GW190412',
'GW150914',
'GW190814' # mystery object – excellent for saturation test
]
# Event-specific initial guesses (frequency, amplitude, tau, beta)
# Format: [A, tau, beta, f, phi, A_echo, tau_echo, t_echo]
event_params = {
'GW190521': [1.5, 0.05, 0.35, 60.0, 0.0, 0.1, 0.02, 0.03],
'GW190412': [1.2, 0.03, 0.40, 150.0, 0.0, 0.08, 0.015, 0.025],
'GW150914': [1.5, 0.01, 0.30, 130.0, 0.0, 0.05, 0.008, 0.025],
'GW190814': [2.0, 0.08, 0.30, 80.0, 0.0, 0.15, 0.025, 0.04]
}
bounds = ([0, 0.001, 0.1, 30.0, -np.pi, 0, 0.001, 0.005],
[5, 0.2, 0.9, 200.0, np.pi, 1.0, 0.1, 0.08])
# ------------------------------------------------------------
# 2. Helper: fetch data robustly
# ------------------------------------------------------------
def fetch_strain(event, detector='H1', time_window=16):
gps = datasets.event_gps(event)
start = gps - time_window
end = gps + time_window
try:
strain = gwpy.timeseries.TimeSeries.fetch_open_data(
detector, start, end, sample_rate=4096, verbose=False
)
print(f" ✅ {event}: fetch_open_data succeeded")
except Exception as e:
print(f" ⚠️ {event}: fetch_open_data failed ({e}), trying direct frames...")
urls = get_urls(detector, start, end, sample_rate=4096)
if not urls:
raise RuntimeError(f"No frames for {event}")
local_files = []
for i, url in enumerate(urls):
fname = f"{event}_{i}.gwf"
urllib.request.urlretrieve(url, fname)
local_files.append(fname)
strain_list = []
for fname in local_files:
ts = gwpy.timeseries.TimeSeries.read(fname, format='gwf.lalframe',
channel=f'{detector}:GWOSC-4KHZ_R1_16KHZ')
strain_list.append(ts)
strain = strain_list[0]
for extra in strain_list[1:]:
    strain = strain.append(extra, inplace=False)  # TimeSeries.append takes one series at a time
strain = strain.crop(start, end)
print(f" ✅ {event}: loaded from direct frames")
return strain
# ------------------------------------------------------------
# 3. Process a single event: bandpass, whiten, fit, return residuals
# ------------------------------------------------------------
def process_event(event, p0, bounds, common_time=None):
print(f"\nProcessing {event}...")
strain = fetch_strain(event)
fs = strain.sample_rate.value
# Bandpass + whiten (same as before)
nyq = 0.5 * fs
b, a = butter(4, [20/nyq, 2000/nyq], btype='band')
filtered = filtfilt(b, a, strain.value)
spec = rfft(filtered)
whitened = irfft(spec / (np.abs(spec) + 1e-10), n=len(filtered))
whitened /= np.std(whitened)
# Rebuild TimeSeries
whitened_ts = gwpy.timeseries.TimeSeries(whitened, t0=strain.t0.value, dt=1/fs)
gps = datasets.event_gps(event)  # precise merger GPS time
post = whitened_ts.crop(gps, gps + 0.5)
time = post.times.value - gps
data = post.value
# Fit Monad-field model
t_fit = time[time >= 0]
data_fit = data[time >= 0]
try:
popt, _ = curve_fit(monad_waveform, t_fit, data_fit, p0=p0, bounds=bounds, maxfev=20000)
model = monad_waveform(t_fit, *popt)
residuals = data_fit - model
print(f" Fit converged for {event}. β={popt[2]:.3f}, f={popt[3]:.1f} Hz.")
except Exception as e:
print(f" Fit failed for {event}: {e}")
residuals = data_fit
popt = p0
# Interpolate residuals to common time grid if provided
if common_time is not None:
f_int = interp1d(t_fit, residuals, bounds_error=False, fill_value=0)
res_interp = f_int(common_time)
return res_interp, popt
else:
return residuals, popt
# ------------------------------------------------------------
# 4. Monad-Field waveform (safe version)
# ------------------------------------------------------------
def monad_waveform(t, A, tau, beta, f, phi, A_echo, tau_echo, t_echo):
main_env = A * np.exp(-np.power(np.maximum(t, 0)/tau, beta)) * (t >= 0)
main = main_env * np.cos(2*np.pi*f*t + phi)
echo_env = A_echo * np.exp(-np.power(np.maximum(t - t_echo, 0)/tau_echo, beta)) * (t >= t_echo)
echo = echo_env * np.cos(2*np.pi*f*(t - t_echo) + phi - np.pi/4)
return main + echo
# ------------------------------------------------------------
# 5. ESPRIT frequency estimator (high resolution)
# ------------------------------------------------------------
def execute_esprit_frcfd(signal, n_signals, fs):
"""
High-resolution ESPRIT algorithm for Monad-Field master stack.
n_signals: number of complex exponentials (2 per real frequency)
"""
L = len(signal) // 3 # Hankel matrix rows
H = scipy.linalg.hankel(signal[:L], signal[L-1:])
U, S, Vh = scipy.linalg.svd(H, full_matrices=False)
Us = U[:, :n_signals]
# Rotation matrix
phi = np.linalg.pinv(Us[:-1, :]) @ Us[1:, :]
eigvals = np.linalg.eigvals(phi)
freqs = np.angle(eigvals) * fs / (2 * np.pi)
# Return positive frequencies only, sorted
pos_freqs = np.abs(freqs[freqs > 0])
return np.sort(pos_freqs)
# ------------------------------------------------------------
# 6. Main stacking routine
# ------------------------------------------------------------
print("\n--- Starting Multi-Event Stacking Analysis ---")
# Common time grid for residuals (2000 points over 0.5s)
common_time = np.linspace(0, 0.5, 2000)
residuals_stack = []
all_fits = []
for ev in events:
p0 = event_params.get(ev, [1.5, 0.05, 0.35, 60.0, 0.0, 0.1, 0.02, 0.03])
res, popt = process_event(ev, p0, bounds, common_time=common_time)
residuals_stack.append(res)
all_fits.append((ev, popt))
# Average residuals across events
master_stack = np.mean(residuals_stack, axis=0)
# Plot master stack
plt.figure(figsize=(12,5))
plt.plot(common_time*1000, master_stack, 'b-', lw=1.5)
plt.xlabel('Time after merger (ms)')
plt.ylabel('Stacked residual amplitude')
plt.title('Monad‑Field Master Stack (averaged residuals over multiple events)')
plt.grid(alpha=0.3)
plt.show()
# ------------------------------------------------------------
# 7. ESPRIT analysis on master stack
# ------------------------------------------------------------
# The master stack lives on the uniform common_time grid, so its effective
# sampling rate is set by that grid spacing rather than the raw 4096 Hz.
dt = common_time[1] - common_time[0]
effective_fs = 1 / dt  # ≈ 4000 Hz, close to the original rate
print(f"\nEffective sampling rate for master stack: {effective_fs:.1f} Hz")
# Use n_signals=4 (expect 2 dominant frequencies: f0 and harmonic)
detected_modes = execute_esprit_frcfd(master_stack, n_signals=4, fs=effective_fs)
positive_modes = detected_modes[detected_modes > 5] # ignore <5 Hz
unique_modes = np.unique(np.round(positive_modes, 1))
print("\n--- ESPRIT Substrate Modal Analysis ---")
if len(unique_modes) >= 1:
print(f"Primary Substrate Mode (f0): {unique_modes[0]:.1f} Hz")
if len(unique_modes) >= 2:
print(f"Harmonic / Snap Mode (f1): {unique_modes[1]:.1f} Hz")
ratio = unique_modes[1] / unique_modes[0] if unique_modes[0] > 0 else 0
print(f"Coupling Ratio: {ratio:.3f}")
else:
print("Only one dominant mode detected.")
print("\nAll fitted β values:")
for ev, p in all_fits:
print(f" {ev}: β = {p[2]:.3f}, f = {p[3]:.1f} Hz")
np.save('master_stack.npy', master_stack)
print("\nMaster stack saved as 'master_stack.npy'")
# Monad‑Field Blinded Predictive Test – FINAL CONSENSUS VERSION
# All models agree: frozen calibration, PSD whitening, matched filtering,
# blank main burst (0‑25ms), cross‑detector, null distribution, p‑value.
!pip install gwosc gwpy
import numpy as np
import matplotlib.pyplot as plt
from gwosc import datasets
import gwpy.timeseries
from scipy.signal import butter, filtfilt, welch
from scipy.fft import irfft, rfft
import urllib.request
from gwosc.locate import get_urls
import warnings
warnings.filterwarnings("ignore")
# ------------------------------------------------------------
# 1. Calibrated Universal Constants (FROZEN)
# ------------------------------------------------------------
BETA_CAL = 0.35 # power‑law relaxation exponent
F_SNAP_CAL = 996.8 # high‑frequency snap (Hz)
LAMBDA_MS_PRED = 40.8 # predicted echo delay (ms)
ENVELOPE_TAU = 0.02 # seconds
PREDICTED_DELAY_SEC = LAMBDA_MS_PRED / 1000.0
BLANK_DURATION_MS = 25.0 # blank the main burst in the first X ms
# ------------------------------------------------------------
# 2. Choose new event (not used in original stacking)
# ------------------------------------------------------------
event = 'GW200220_061928' # change to any unseen event
detector_H1 = 'H1'
detector_L1 = 'L1'
time_window = 16 # seconds around merger
print(f"\n=== BLINDED PREDICTIVE TEST (Final Consensus) ===")
print(f"Event: {event}")
print(f"Predicted echo delay: {LAMBDA_MS_PRED:.1f} ms")
print(f"Template: β={BETA_CAL}, f_snap={F_SNAP_CAL:.1f} Hz")
print(f"Blank first {BLANK_DURATION_MS} ms to avoid main burst\n")
# ------------------------------------------------------------
# 3. Helper: fetch data robustly
# ------------------------------------------------------------
def fetch_strain(event, detector, time_window=16):
gps = datasets.event_gps(event)
start = gps - time_window
end = gps + time_window
try:
strain = gwpy.timeseries.TimeSeries.fetch_open_data(
detector, start, end, sample_rate=4096, verbose=False
)
print(f"✅ {detector}: fetch_open_data success")
except Exception as e:
print(f"⚠️ {detector}: fetch_open_data failed, trying direct frames...")
urls = get_urls(detector, start, end, sample_rate=4096)
if not urls:
raise RuntimeError(f"No frames for {detector}")
local_files = []
for i, url in enumerate(urls):
fname = f"{event}_{detector}_{i}.gwf"
urllib.request.urlretrieve(url, fname)
local_files.append(fname)
strain_list = []
for fname in local_files:
ts = gwpy.timeseries.TimeSeries.read(fname, format='gwf.lalframe',
channel=f'{detector}:GWOSC-4KHZ_R1_16KHZ')
strain_list.append(ts)
strain = strain_list[0]
for extra in strain_list[1:]:
    strain = strain.append(extra, inplace=False)  # TimeSeries.append takes one series at a time
strain = strain.crop(start, end)
print(f"✅ {detector}: loaded from direct frames")
return strain
# ------------------------------------------------------------
# 4. Preprocessing with proper PSD whitening
# ------------------------------------------------------------
def preprocess_strain(strain, fs=4096):
# Bandpass 20‑2000 Hz
nyq = fs / 2
b, a = butter(4, [20/nyq, 2000/nyq], btype='band')
filtered = filtfilt(b, a, strain.value)
# Estimate PSD using Welch
f_psd, psd = welch(filtered, fs=fs, nperseg=1024)
# Whitening in frequency domain
spec = rfft(filtered)
freqs = np.fft.rfftfreq(len(filtered), d=1/fs)
# Interpolate PSD to match spec frequencies
psd_interp = np.interp(freqs, f_psd, psd)
psd_interp = np.maximum(psd_interp, 1e-12)
spec_whitened = spec / np.sqrt(psd_interp)
whitened = irfft(spec_whitened, n=len(filtered))
whitened = whitened / np.std(whitened)
return whitened
# ------------------------------------------------------------
# 5. Build universal echo template (matched filter)
# ------------------------------------------------------------
def build_template(fs, duration=0.1):
t = np.linspace(0, duration, int(fs * duration))
envelope = np.exp(-np.power(t / ENVELOPE_TAU, BETA_CAL))
signal = envelope * np.cos(2 * np.pi * F_SNAP_CAL * t)
# Normalize to unit norm (matched filter)
signal = signal / np.sqrt(np.sum(signal**2))
return signal, t
# ------------------------------------------------------------
# 6. Matched filter (normalized cross‑correlation)
# ------------------------------------------------------------
def matched_filter(ts, signal, fs):
conv = np.correlate(ts, signal, mode='full')
lags = np.arange(-len(signal)+1, len(ts)) / fs * 1000.0 # milliseconds
return conv, lags
# ------------------------------------------------------------
# 7. Process a single detector (with blanking of main burst)
# ------------------------------------------------------------
def process_detector(event, detector, time_window=16):
strain = fetch_strain(event, detector, time_window)
gps = datasets.event_gps(event)
whitened = preprocess_strain(strain, fs=4096)
ts = gwpy.timeseries.TimeSeries(whitened, t0=strain.t0.value, dt=1/4096)
post = ts.crop(gps, gps + 0.5) # 0.5 seconds after merger
time = post.times.value - gps
data = post.value
return time, data, gps
# ------------------------------------------------------------
# 8. Delay scan + null distribution
# ------------------------------------------------------------
def analyze_detector(time, data, event_name, detector_label):
fs = 4096
template, t_template = build_template(fs)
# ----- BLANK THE MAIN BURST (first X ms) -----
blank_samples = int(BLANK_DURATION_MS / 1000.0 * fs)
data_blanked = data.copy()
data_blanked[:blank_samples] = 0.0
# Matched filter on blanked data
corr, lags_ms = matched_filter(data_blanked, template, fs)
# Restrict to positive delays 5‑200 ms (skip the blanked region)
idx = (lags_ms >= 5) & (lags_ms <= 200)
lags_pos = lags_ms[idx]
corr_pos = corr[idx]
max_abs_idx = np.argmax(np.abs(corr_pos))
measured_delay_ms = lags_pos[max_abs_idx]
measured_rho = corr_pos[max_abs_idx]
# Null distribution: 500 time‑shifts of the blanked data
n_shifts = 500
null_rhos = []
np.random.seed(42)
for _ in range(n_shifts):
shift = np.random.randint(1000, len(data_blanked)//2)
shifted = np.roll(data_blanked, shift)
corr_shift, _ = matched_filter(shifted, template, fs)
corr_shift_pos = corr_shift[idx]
null_rhos.append(np.max(np.abs(corr_shift_pos)))
null_rhos = np.array(null_rhos)
p_value = (np.sum(null_rhos >= np.abs(measured_rho)) + 1) / (n_shifts + 1)
return measured_delay_ms, measured_rho, p_value, lags_pos, corr_pos, null_rhos
# ------------------------------------------------------------
# 9. Run on H1 and L1
# ------------------------------------------------------------
time_H1, data_H1, gps_H1 = process_detector(event, detector_H1, time_window)
delay_H1, rho_H1, p_H1, lags_H1, corr_H1, nulls_H1 = analyze_detector(
time_H1, data_H1, event, detector_H1)
time_L1, data_L1, gps_L1 = process_detector(event, detector_L1, time_window)
delay_L1, rho_L1, p_L1, lags_L1, corr_L1, nulls_L1 = analyze_detector(
time_L1, data_L1, event, detector_L1)
# ------------------------------------------------------------
# 10. Decision logic (coincident, predicted delay, significant)
# ------------------------------------------------------------
coincident = abs(delay_H1 - delay_L1) < 5.0 # both peaks within 5 ms
predicted_delay_ok = (abs(delay_H1 - LAMBDA_MS_PRED) < 5.0 and
abs(delay_L1 - LAMBDA_MS_PRED) < 5.0)
significant = (p_H1 < 0.05 and p_L1 < 0.05)
print("\n=== BLIND TEST RESULTS ===")
print(f"H1 → delay = {delay_H1:.1f} ms, ρ = {rho_H1:.4f}, p = {p_H1:.4f}")
print(f"L1 → delay = {delay_L1:.1f} ms, ρ = {rho_L1:.4f}, p = {p_L1:.4f}")
print(f"Coincident (Δ < 5 ms): {'YES' if coincident else 'NO'}")
print(f"Matches prediction 40.8 ms: {'YES' if predicted_delay_ok else 'NO'}")
print(f"p < 0.05 in both: {'YES' if significant else 'NO'}")
if coincident and predicted_delay_ok and significant:
print("\n✅ BLIND TEST PASS: Coincident universal echo detected at predicted delay.")
print(" The substrate reflex time is empirically supported.")
elif coincident and (p_H1 < 0.05 or p_L1 < 0.05):
print("\n⚠️ AMBIGUOUS: Coincident but not at predicted delay or significance marginal.")
else:
print("\n❌ BLIND TEST FAIL: No coincident echo found at the universal prediction.")
print(" The universal echo hypothesis is not supported by this event.")
# ------------------------------------------------------------
# 11. Plot correlation curves and null distributions
# ------------------------------------------------------------
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
ax1.plot(lags_H1, corr_H1, 'b-', lw=1, label=f'H1 matched filter')
ax1.plot(lags_L1, corr_L1, 'g-', lw=1, label=f'L1 matched filter')
ax1.axvline(LAMBDA_MS_PRED, color='r', linestyle='--', label=f'Prediction {LAMBDA_MS_PRED} ms')
ax1.set_xlim(5, 200)
ax1.set_xlabel('Delay (ms)')
ax1.set_ylabel('Matched filter amplitude')
ax1.set_title(f'{event} – cross‑detector echo search')
ax1.legend()
ax1.grid(alpha=0.3)
ax2.hist(nulls_H1, bins=30, alpha=0.6, color='blue', label='H1 null')
ax2.hist(nulls_L1, bins=30, alpha=0.6, color='green', label='L1 null')
ax2.axvline(rho_H1, color='blue', linestyle='--', label=f'H1 measured ρ = {rho_H1:.3f}')
ax2.axvline(rho_L1, color='green', linestyle='--', label=f'L1 measured ρ = {rho_L1:.3f}')
ax2.set_xlabel('Max |correlation| (null distribution)')
ax2.set_ylabel('Count')
ax2.set_title('Null distributions (500 time‑shifts each)')
ax2.legend()
plt.tight_layout()
plt.show()
# ------------------------------------------------------------
# 12. Save
# ------------------------------------------------------------
np.savez(f'blinded_test_{event}_final.npz',
lags_H1=lags_H1, corr_H1=corr_H1, nulls_H1=nulls_H1,
lags_L1=lags_L1, corr_L1=corr_L1, nulls_L1=nulls_L1,
delay_H1=delay_H1, rho_H1=rho_H1, p_H1=p_H1,
delay_L1=delay_L1, rho_L1=rho_L1, p_L1=p_L1)
print(f"\n💾 Results saved to blinded_test_{event}_final.npz")
# ============================================================
# MONAD-FIELD EXPLORATORY ECHO PIPELINE
# Scientific Residual Analysis Framework
#
# PURPOSE:
# Explore whether repeatable post-merger structures exist
# in GW residual data using disciplined signal analysis.
#
# This code DOES NOT assume the theory is true.
# It attempts to reduce self-deception and template bias.
# ============================================================
!pip install gwosc gwpy pycbc -q
import numpy as np
import matplotlib.pyplot as plt
from gwosc import datasets
from gwpy.timeseries import TimeSeries
from scipy.signal import butter, filtfilt, welch
from scipy.signal.windows import tukey  # tukey moved to scipy.signal.windows (removed from scipy.signal in SciPy 1.13)
from scipy.fft import rfft, irfft, rfftfreq
from scipy.stats import norm
from pycbc.filter import matched_filter
from pycbc.types import TimeSeries as PyCBCSeries
import warnings
warnings.filterwarnings("ignore")
# ============================================================
# CONFIGURATION
# ============================================================
EVENT = "GW200220_061928"
FS = 4096
TIME_WINDOW = 16
POST_WINDOW = 0.5
# Universal exploratory constants
BETA = 0.35
FREQ_CENTER = 996.8
FREQ_SCAN = np.arange(850, 1150, 25)
TAU = 0.02
DELAY_MIN_MS = 5
DELAY_MAX_MS = 200
BLANK_MS = 25
N_MONTECARLO = 1000
np.random.seed(1234)
# ============================================================
# FETCH DATA
# ============================================================
def load_detector(det):
gps = datasets.event_gps(EVENT)
ts = TimeSeries.fetch_open_data(
det,
gps - TIME_WINDOW,
gps + TIME_WINDOW,
sample_rate=FS,
verbose=False
)
return ts, gps
# ============================================================
# PREPROCESSING
# ============================================================
def preprocess(ts):
data = ts.value.astype(np.float64)
# Tukey window
win = tukey(len(data), alpha=0.1)
data = data * win
# Bandpass
nyq = FS / 2
b, a = butter(4, [20/nyq, 1800/nyq], btype='band')
data = filtfilt(b, a, data)
# PSD estimate
f_psd, psd = welch(
data,
fs=FS,
nperseg=FS,
average='median'
)
# FFT whitening
spec = rfft(data)
freqs = rfftfreq(len(data), 1/FS)
psd_interp = np.interp(freqs, f_psd, psd)
psd_interp = np.maximum(psd_interp, 1e-18)
white_spec = spec / np.sqrt(psd_interp)
white = irfft(white_spec, n=len(data))
# normalize
white /= np.std(white)
return white
# ============================================================
# TEMPLATE BANK
# ============================================================
def build_template(freq):
t = np.arange(0, 0.1, 1/FS)
env = np.exp(-(t / TAU)**BETA)
sig = env * np.cos(2*np.pi*freq*t)
# zero mean
sig -= np.mean(sig)
# normalize
sig /= np.sqrt(np.sum(sig**2))
return sig
# ============================================================
# BLANK MAIN MERGER
# ============================================================
def blank_main_burst(data):
blank = int(BLANK_MS * 1e-3 * FS)
d = data.copy()
# smooth taper instead of hard zero
taper = np.linspace(0, 1, blank)
d[:blank] *= taper
return d
# ============================================================
# EXTRACT POST-MERGER
# ============================================================
def get_postmerger(white, gps, t0):
ts = TimeSeries(
white,
t0=t0,
dt=1/FS
)
post = ts.crop(gps, gps + POST_WINDOW)
return post.value
# ============================================================
# MATCHED FILTER SCAN
# ============================================================
def scan_delays(data):
    best = None
    all_results = []
    pydata = PyCBCSeries(data, delta_t=1/FS)
    for freq in FREQ_SCAN:
        template = build_template(freq)
        # PyCBC requires template and data of equal length:
        # zero-pad the 0.1 s template out to the full segment
        template = np.pad(template, (0, len(data) - len(template)))
        pytmpl = PyCBCSeries(template, delta_t=1/FS)
        snr = matched_filter(
            pytmpl,
            pydata,
            low_frequency_cutoff=20
        )
        snr = snr.numpy()
        # SNR index k corresponds to the template starting k samples
        # into the segment (cyclic correlation), i.e. a delay of k/FS
        lags_ms = np.arange(len(snr)) / FS * 1000
        idx = (
            (lags_ms >= DELAY_MIN_MS) &
            (lags_ms <= DELAY_MAX_MS)
        )
        scan = np.abs(snr[idx])
        peak_idx = np.argmax(scan)
        peak_snr = scan[peak_idx]
        peak_delay = lags_ms[idx][peak_idx]
        result = {
            "freq": freq,
            "delay": peak_delay,
            "snr": peak_snr
        }
        all_results.append(result)
        if best is None or peak_snr > best["snr"]:
            best = result
return best, all_results
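The scan logic — slide a unit-norm template across the segment and take the peak |correlation| inside the delay window — can be illustrated with plain NumPy on synthetic data (illustrative sketch only; the pipeline's PyCBC `matched_filter` additionally applies PSD weighting):

```python
import numpy as np

fs = 4096
rng = np.random.default_rng(0)

# 0.5 s of white noise with a 1 kHz damped burst injected ~50 ms in
n = int(0.5 * fs)
data = rng.normal(0, 1, n)
t = np.arange(0, 0.02, 1/fs)
burst = np.sin(2*np.pi*1000*t) * np.exp(-t/0.005)
inj = int(0.050 * fs)                      # sample 204, i.e. ~49.8 ms
data[inj:inj+len(burst)] += 5.0 * burst

# unit-norm template; correlation lag k = template start at sample k
tmpl = burst / np.sqrt(np.sum(burst**2))
corr = np.correlate(data, tmpl, mode='valid')
lags_ms = np.arange(len(corr)) / fs * 1000
win = (lags_ms >= 5) & (lags_ms <= 200)
peak_lag_ms = lags_ms[win][np.argmax(np.abs(corr[win]))]
print(peak_lag_ms)   # recovers the injected delay (≈ 50 ms)
```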
# ============================================================
# MONTE CARLO NULLS
# ============================================================
def montecarlo_null(data):
null_max = []
for _ in range(N_MONTECARLO):
shuffled = np.random.permutation(data)
best, _ = scan_delays(shuffled)
null_max.append(best["snr"])
return np.array(null_max)
# ============================================================
# PROCESS DETECTOR
# ============================================================
def run_detector(det):
ts, gps = load_detector(det)
white = preprocess(ts)
post = get_postmerger(
white,
gps,
ts.t0.value
)
post = blank_main_burst(post)
best, scans = scan_delays(post)
nulls = montecarlo_null(post)
p = (
np.sum(nulls >= best["snr"]) + 1
) / (len(nulls) + 1)
return {
"best": best,
"nulls": nulls,
"p": p,
"scans": scans
}
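The p-value in run_detector uses the add-one empirical estimator, (k + 1)/(N + 1), which never returns exactly zero and counts the measured value as one member of the null ensemble. A standalone sketch (the name `empirical_p` is illustrative):

```python
import numpy as np

def empirical_p(measured, nulls):
    # (k + 1) / (N + 1): strictly positive even when no null exceeds
    # the measurement
    nulls = np.asarray(nulls)
    return (np.sum(nulls >= measured) + 1) / (len(nulls) + 1)

nulls = np.array([0.5, 1.2, 2.0, 3.1])
print(empirical_p(2.5, nulls))   # (1+1)/(4+1) = 0.4
print(empirical_p(9.9, nulls))   # (0+1)/(4+1) = 0.2, never zero
```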
# ============================================================
# RUN BOTH DETECTORS
# ============================================================
print(f"\nRunning exploratory analysis on {EVENT}\n")
H1 = run_detector("H1")
L1 = run_detector("L1")
# ============================================================
# COINCIDENCE TEST
# ============================================================
delay_diff = abs(
H1["best"]["delay"] -
L1["best"]["delay"]
)
freq_diff = abs(
H1["best"]["freq"] -
L1["best"]["freq"]
)
coincident = (
delay_diff < 5 and
freq_diff < 50
)
# ============================================================
# RESULTS
# ============================================================
print("\n==============================")
print("RESULTS")
print("==============================\n")
print("H1")
print(H1["best"])
print(f"p-value = {H1['p']:.5f}\n")
print("L1")
print(L1["best"])
print(f"p-value = {L1['p']:.5f}\n")
print(f"Delay difference = {delay_diff:.2f} ms")
print(f"Frequency difference = {freq_diff:.2f} Hz")
print(f"\nCoincident = {coincident}")
# ============================================================
# PLOTS
# ============================================================
fig, ax = plt.subplots(1,2, figsize=(12,4))
ax[0].hist(H1["nulls"], bins=40, alpha=0.7)
ax[0].axvline(H1["best"]["snr"], color='r')
ax[0].set_title("H1 Null Distribution")
ax[1].hist(L1["nulls"], bins=40, alpha=0.7)
ax[1].axvline(L1["best"]["snr"], color='r')
ax[1].set_title("L1 Null Distribution")
plt.tight_layout()
plt.show()
# ============================================================
# INTERPRETATION
# ============================================================
print("\n==============================")
print("INTERPRETATION")
print("==============================\n")
if coincident and H1["p"] < 0.01 and L1["p"] < 0.01:
print("Strong coincident post-merger structure detected.")
elif coincident:
print("Coincident structure present but significance modest.")
else:
print("No strong coincident structure detected.")
# ============================================================
# MONAD-FIELD EXPLORATORY ECHO PIPELINE – Version 2.3 FINAL
# Fixed: template zero‑padded to match data length.
# ============================================================
!pip install gwosc gwpy pycbc -q
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import bootstrap
from gwosc import datasets
from gwpy.timeseries import TimeSeries
from scipy.signal import butter, filtfilt, welch
from scipy.signal.windows import tukey
from scipy.fft import rfft, irfft, rfftfreq
from pycbc.filter import matched_filter
from pycbc.types import TimeSeries as PyCBCSeries, FrequencySeries
from pycbc.psd import interpolate as psd_interpolate
import warnings
warnings.filterwarnings("ignore")
# ============================================================
# CONFIGURATION
# ============================================================
EVENT = "GW200220_061928"
FS = 4096
TIME_WINDOW = 16
POST_WINDOW = 0.5
BETA = 0.35
FREQ_CENTER = 996.8
FREQ_SCAN = np.arange(850, 1150, 25)
TAU = 0.02
DELAY_MIN_MS = 5
DELAY_MAX_MS = 200
BLANK_MS = 25
COINC_DELAY_MS = 5.0
COINC_FREQ_HZ = 25.0
N_MONTECARLO = 1000
N_BOOTSTRAP = 1000
np.random.seed(1234)
# ============================================================
# FETCH DATA
# ============================================================
def load_detector(det):
gps = datasets.event_gps(EVENT)
ts = TimeSeries.fetch_open_data(det, gps - TIME_WINDOW, gps + TIME_WINDOW,
sample_rate=FS, verbose=False)
return ts, gps
# ============================================================
# PREPROCESS – returns whitened post, whitened pre, and PyCBC PSD
# ============================================================
def preprocess(ts, gps):
data_full = ts.value.astype(np.float64)
# Bandpass
nyq = FS / 2
b, a = butter(4, [20/nyq, 1800/nyq], btype='band')
data_full = filtfilt(b, a, data_full)
# Pre‑merger noise segment (0.5 s before merger)
idx_pre_start = int((gps - 0.5 - ts.t0.value) * FS)
idx_pre_end = int((gps - ts.t0.value) * FS)
pre_noise = data_full[idx_pre_start:idx_pre_end]
# Estimate PSD using Welch – use nperseg = length of noise segment
f_psd, psd_raw = welch(pre_noise, fs=FS, nperseg=len(pre_noise), average='median')
# Build PyCBC FrequencySeries directly
df = f_psd[1] - f_psd[0]
psd_pycbc = FrequencySeries(psd_raw, delta_f=df)
# FFT whitening of full data using the PSD
spec = rfft(data_full)
freqs = rfftfreq(len(data_full), 1/FS)
psd_interp_for_whiten = np.interp(freqs, f_psd, psd_raw)
    # relative floor: an absolute 1e-18 floor would exceed typical strain
    # PSDs (~1e-46) everywhere and silently disable the whitening
    psd_interp_for_whiten = np.maximum(psd_interp_for_whiten,
                                       1e-10 * np.median(psd_interp_for_whiten))
white_spec = spec / np.sqrt(psd_interp_for_whiten)
white_full = irfft(white_spec, n=len(data_full))
white_full /= np.std(white_full)
# Build TimeSeries for cropping
ts_white = TimeSeries(white_full, t0=ts.t0.value, dt=1/FS)
# Post‑merger (0 to POST_WINDOW)
post = ts_white.crop(gps, gps + POST_WINDOW).value
# Pre‑merger control segment (-0.5 to 0)
pre_ctrl = ts_white.crop(gps - 0.5, gps).value
return post, pre_ctrl, psd_pycbc
# ============================================================
# COSINE BLANKING (smooth taper)
# ============================================================
def blank_main_burst(data):
blank = int(BLANK_MS * 1e-3 * FS)
d = data.copy()
t = np.linspace(0, 1, blank)
taper = 0.5 * (1 - np.cos(np.pi * t))
d[:blank] *= taper
return d
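The taper is the rising half of a Hann window: it ramps smoothly from 0 (fully suppressed) at the segment start to 1 (untouched) at the end of the blanked interval, avoiding the spectral leakage a hard zeroing would introduce. A quick check with values mirroring BLANK_MS and FS:

```python
import numpy as np

blank = int(25 * 1e-3 * 4096)   # 25 ms at 4096 Hz -> 102 samples
t = np.linspace(0, 1, blank)
taper = 0.5 * (1 - np.cos(np.pi * t))
print(taper[0], taper[-1])   # 0.0 and 1.0: smooth monotonic ramp
```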
# ============================================================
# TEMPLATE BUILDER – returns short template (0.1s)
# ============================================================
def build_short_template(freq):
t = np.arange(0, 0.1, 1/FS)
env = np.exp(-(t / TAU)**BETA)
sig = env * np.cos(2*np.pi*freq*t)
sig -= np.mean(sig)
sig /= np.sqrt(np.sum(sig**2))
return sig
# ============================================================
# MATCHED FILTER SCAN (with zero‑padding to match data length)
# ============================================================
def scan_delays(data, psd_pycbc):
best = None
all_results = []
pydata = PyCBCSeries(data, delta_t=1/FS)
target_len = len(pydata) # e.g., 2048 samples for 0.5s
# Interpolate PSD to match data's frequency resolution
    psd_interp = psd_interpolate(psd_pycbc, pydata.delta_f, target_len // 2 + 1)  # a frequency series for N time samples has N//2 + 1 bins
for freq in FREQ_SCAN:
short_tmpl = build_short_template(freq)
# Zero‑pad to target length
if len(short_tmpl) < target_len:
pad_width = target_len - len(short_tmpl)
template = np.pad(short_tmpl, (0, pad_width), mode='constant')
else:
template = short_tmpl[:target_len]
pytmpl = PyCBCSeries(template, delta_t=1/FS)
snr = matched_filter(pytmpl, pydata, low_frequency_cutoff=20, psd=psd_interp)
snr = snr.numpy()
        # Lag axis: with the zero-padded template starting at t=0, SNR index k
        # corresponds to the template start k samples into the segment
        # (cyclic correlation), i.e. a delay of k/FS seconds
        lags_ms = np.arange(len(snr)) / FS * 1000
        idx = (lags_ms >= DELAY_MIN_MS) & (lags_ms <= DELAY_MAX_MS)
if not np.any(idx):
continue
scan = np.abs(snr[idx])
peak_idx = np.argmax(scan)
peak_snr = scan[peak_idx]
peak_delay = lags_ms[idx][peak_idx]
result = {"freq": freq, "delay": peak_delay, "snr": peak_snr}
all_results.append(result)
if best is None or peak_snr > best["snr"]:
best = result
return best, all_results
# ============================================================
# PHASE‑RANDOMIZED NULL (preserves PSD)
# ============================================================
def phase_randomize(data):
spec = rfft(data)
amp = np.abs(spec)
phase = np.random.uniform(0, 2*np.pi, len(spec))
new_spec = amp * np.exp(1j * phase)
rand = irfft(new_spec, n=len(data))
rand /= np.std(rand)
return rand
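Phase randomization keeps each frequency bin's magnitude and draws a fresh phase, producing surrogates with the same power spectrum but scrambled temporal structure. A minimal sketch; unlike the pipeline's version it pins the DC and Nyquist phases to zero so the inverse real FFT preserves those two bins exactly:

```python
import numpy as np
from numpy.fft import rfft, irfft

rng = np.random.default_rng(42)

def phase_randomize(data):
    spec = rfft(data)
    phase = rng.uniform(0, 2*np.pi, len(spec))
    phase[0] = 0.0    # keep DC real
    phase[-1] = 0.0   # keep Nyquist real (even-length input)
    return irfft(np.abs(spec) * np.exp(1j * phase), n=len(data))

x = rng.normal(0, 1, 1024)
y = phase_randomize(x)
same_power = np.allclose(np.abs(rfft(x)), np.abs(rfft(y)))
print(same_power)   # True: amplitude spectrum is preserved
```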
# ============================================================
# MONTE CARLO NULL
# ============================================================
def montecarlo_null(data, psd_pycbc):
null_max = []
for _ in range(N_MONTECARLO):
randomized = phase_randomize(data)
best, _ = scan_delays(randomized, psd_pycbc)
null_max.append(best["snr"] if best is not None else 0.0)
return np.array(null_max)
# ============================================================
# BOOTSTRAP CI FOR NULL MAXIMA
# ============================================================
def bootstrap_ci(measured, null_dist, n_resamples=N_BOOTSTRAP):
resampled = np.random.choice(null_dist, size=(n_resamples, len(null_dist)), replace=True)
bootstrap_max = np.max(resampled, axis=1)
lower = np.percentile(bootstrap_max, 2.5)
upper = np.percentile(bootstrap_max, 97.5)
return lower, upper
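bootstrap_ci resamples the null distribution with replacement and records the maximum of each resample; the 2.5th/97.5th percentiles of those maxima bound where the null maximum is likely to fall. A standalone sketch (the name `bootstrap_ci_max` is illustrative; note a resample maximum can never exceed the observed sample maximum, so the interval hugs the top of the null distribution):

```python
import numpy as np

rng = np.random.default_rng(7)

def bootstrap_ci_max(null_dist, n_resamples=1000):
    # each row is one resample of the null distribution
    resampled = rng.choice(null_dist, size=(n_resamples, len(null_dist)),
                           replace=True)
    boot_max = np.max(resampled, axis=1)
    return np.percentile(boot_max, 2.5), np.percentile(boot_max, 97.5)

nulls = rng.normal(0, 1, 500)
lo, hi = bootstrap_ci_max(nulls)
print(lo <= hi <= np.max(nulls))   # True
```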
# ============================================================
# BROADBAND NEGATIVE TEMPLATE (white noise, same length)
# ============================================================
def negative_template_test(data, psd_pycbc):
pydata = PyCBCSeries(data, delta_t=1/FS)
    psd_interp = psd_interpolate(psd_pycbc, pydata.delta_f, len(pydata) // 2 + 1)
noise = np.random.normal(0, 1, len(data))
noise -= np.mean(noise)
noise /= np.std(noise)
pytmpl = PyCBCSeries(noise, delta_t=1/FS)
snr = matched_filter(pytmpl, pydata, low_frequency_cutoff=20, psd=psd_interp)
return np.max(np.abs(snr.numpy()))
# ============================================================
# PROCESS A SINGLE DETECTOR
# ============================================================
def run_detector(det):
ts, gps = load_detector(det)
post_raw, pre_raw, psd_pycbc = preprocess(ts, gps)
# Post‑merger (blanked)
post = blank_main_burst(post_raw)
best_post, _ = scan_delays(post, psd_pycbc)
nulls_post = montecarlo_null(post, psd_pycbc) if best_post is not None else np.array([])
p_post = (np.sum(nulls_post >= best_post["snr"]) + 1) / (len(nulls_post) + 1) if best_post is not None else 1.0
ci_low, ci_high = bootstrap_ci(best_post["snr"], nulls_post) if best_post is not None else (0,0)
# Pre‑merger control (no blanking)
best_pre, _ = scan_delays(pre_raw, psd_pycbc)
nulls_pre = montecarlo_null(pre_raw, psd_pycbc) if best_pre is not None else np.array([])
p_pre = (np.sum(nulls_pre >= best_pre["snr"]) + 1) / (len(nulls_pre) + 1) if best_pre is not None else 1.0
# Negative template test
neg_snr = negative_template_test(post, psd_pycbc)
return {
"det": det,
"best_post": best_post,
"p_post": p_post,
"nulls_post": nulls_post,
"ci_95": (ci_low, ci_high),
"best_pre": best_pre,
"p_pre": p_pre,
"neg_snr": neg_snr,
"post_data": post
}
# ============================================================
# RUN BOTH DETECTORS
# ============================================================
print(f"\n--- MONAD-FIELD PIPELINE v2.3 (Zero‑padded template) ---")
print(f"Event: {EVENT}")
print("Pre‑merger PSD, cosine blanking, bootstrap CI, pre‑merger control, negative template.\n")
H1 = run_detector("H1")
L1 = run_detector("L1")
# Coincidence test
if H1["best_post"] is not None and L1["best_post"] is not None:
delay_diff = abs(H1["best_post"]["delay"] - L1["best_post"]["delay"])
freq_diff = abs(H1["best_post"]["freq"] - L1["best_post"]["freq"])
coincident = (delay_diff < COINC_DELAY_MS) and (freq_diff < COINC_FREQ_HZ)
else:
coincident = False
# ============================================================
# OUTPUT
# ============================================================
print("\n==============================")
print("RESULTS")
print("==============================\n")
for res in [H1, L1]:
print(f"{res['det']} (post‑merger):")
if res["best_post"]:
print(f" best freq = {res['best_post']['freq']:.1f} Hz")
print(f" best delay = {res['best_post']['delay']:.1f} ms")
print(f" best SNR = {res['best_post']['snr']:.3f}")
print(f" p-value = {res['p_post']:.5f}")
print(f" 95% CI (null max) = [{res['ci_95'][0]:.3f}, {res['ci_95'][1]:.3f}]")
else:
print(" No peak found in scan range.")
print(f" Pre‑merger control p = {res['p_pre']:.4f}")
print(f" Negative template SNR = {res['neg_snr']:.3f}\n")
print(f"Coincident (Δdelay<{COINC_DELAY_MS}ms, Δfreq<{COINC_FREQ_HZ}Hz): {coincident}\n")
# Significance criteria
pre_clean = (H1["p_pre"] > 0.05) and (L1["p_pre"] > 0.05)
neg_clean = (H1["neg_snr"] < 2.0) and (L1["neg_snr"] < 2.0)
if coincident and H1["p_post"] < 0.01 and L1["p_post"] < 0.01 and pre_clean and neg_clean:
print("✅ STRONG EVIDENCE: Coincident, significant post‑merger structure detected.")
print(" The universal echo hypothesis is supported by this event.")
elif coincident and (H1["p_post"] < 0.05 or L1["p_post"] < 0.05):
print("⚠️ MODEST EVIDENCE: Coincident but significance marginal.")
else:
print("❌ NO SIGNIFICANT EVIDENCE: Coincident structure not detected at predicted delay.")
print(" The universal echo hypothesis is not supported by this event.")
# ============================================================
# PLOT NULL DISTRIBUTIONS
# ============================================================
fig, ax = plt.subplots(1, 2, figsize=(12, 4))
for idx, res in enumerate([H1, L1]):
ax[idx].hist(res["nulls_post"], bins=40, alpha=0.7, label='Phase‑randomized nulls')
if res["best_post"]:
ax[idx].axvline(res["best_post"]["snr"], color='r', label=f'Measured SNR = {res["best_post"]["snr"]:.2f}')
ax[idx].set_xlabel("Best matched‑filter SNR")
ax[idx].set_ylabel("Count")
ax[idx].set_title(f"{res['det']} Null Distribution")
ax[idx].legend()
plt.tight_layout()
plt.show()
# ============================================================
# SAVE RESULTS
# ============================================================
np.savez(f"monad_pipeline_{EVENT}_v2.3_final.npz",
H1_best=H1["best_post"], H1_p=H1["p_post"], H1_null=H1["nulls_post"],
L1_best=L1["best_post"], L1_p=L1["p_post"], L1_null=L1["nulls_post"])
print(f"\nResults saved to monad_pipeline_{EVENT}_v2.3_final.npz")