Monad Coupled Field Dynamics | Emergent Geometry, Emergent Time, Constitutive Dynamics

----------------------------------------------------------------------------------
Monad-Field Framework | Emergent Geometry, Emergent Time, Constitutive Dynamics

MONAD‑FIELD FRAMEWORK

Emergent Geometry, Emergent Time, and Constitutive Dynamics

ABSTRACT

The Monad‑Field framework begins from a single ontological premise: the substrate is fundamental. Space, geometry, curvature, and time are not primitive ingredients of reality. They emerge from the internal organization, tension structure, and transmission dynamics of an underlying substrate field S(x,t). In this framework, geometry, curvature, and time are secondary response structures generated by substrate behavior. Physical infinities fail to arise because substrate dynamics remain finite and self‑consistent. The central thesis is: the substrate does not evolve inside geometry; the substrate generates the geometry through which it evolves.

ONTOLOGICAL FOUNDATION

1.1 The Substrate is Fundamental

Reality is fundamentally composed of a continuous dynamic substrate field S(x,t). Matter, energy, geometry, and observable physical phenomena emerge from organized excitations and gradients within this substrate.

1.2 Geometry is Emergent

Geometry is not a pre‑existing manifold. Observable geometry emerges from the constitutive organization of the substrate field. Curvature becomes a secondary response structure generated by tension gradients.

1.3 Time is Emergent

Time is not a fundamental dimension. Time is the measurable ordering of substrate reactions and state transitions. Without change, time has no operational meaning.

CONSTITUTIVE GEOMETRY

2.1 From Metric Geometry to Substrate Geometry

General Relativity begins with spacetime geometry as fundamental. Monad‑Field reverses this: substrate organization generates emergent geometry. Excitations propagate through the geometry produced by substrate tension.

2.2 Effective Metric Ansatz

A minimal effective metric may be constructed directly from the substrate field:

gμν(S) = A(S) ημν + B(S) (∂μS)(∂νS)

Minimal conformal form:

gμν(S) = (1 + αS) ημν

Clarification: Here ημν is a coordinate fiducial – a flat bookkeeping structure, not a physical background metric. All observable geometry is carried by gμν(S); ημν merely labels events. Geometry is generated by substrate state rather than imposed externally.

TENSION‑GENERATED CONNECTION

The covariant derivative is no longer defined by an abstract background geometry. Instead, the connection emerges from substrate gradients.

Minimal substrate‑generated affine connection:

Γλμν = [α / 2(1 + αS)] × ( δλνμS + δλμνS − ημν ηλρρS )

Properties:
• If ∂S = 0 → Γ = 0 → effective flat geometry
• If tension gradients exist → effective curvature emerges

The derivative operator becomes a physical transmission operator.

PHYSICAL MEANING OF ∇(S)

The substrate‑dependent covariant derivative ∇(S) represents the physical transmission of change through the substrate. Transport, propagation, inertia, and geodesic motion become manifestations of substrate transmission dynamics.

Thus:
• derivatives become physical,
• geometry becomes constitutive,
• curvature becomes emergent response.

COUPLED FIELD DYNAMICS

The framework contains two coupled sectors:
• Substrate field S
• Excitation field Ψ

5.1 Excitation Dynamics

The excitation field evolves through the geometry generated by the substrate:

μμ Ψ − (m² + κS) Ψ = 0

Excitations do not move through pre‑existing spacetime. They propagate through substrate‑defined transmission geometry.

5.2 Substrate Dynamics

The substrate evolves according to nonlinear constitutive dynamics:

μμ S − V′(S) = κ |Ψ|² + A′(S) ημν (∂μΨ*)(∂νΨ)

This produces recursive geometry generation:

S → Γ → ∇ → evolution of S and Ψ → modified S

Geometry is dynamically generated by substrate response.

EMERGENT CURVATURE

Curvature is not fundamental. The curvature tensor emerges from the substrate‑generated connection:

Rρσμν = ∂μΓρνσ − ∂νΓρμσ + Γρμλ Γλνσ − Γρνλ Γλμσ

Curvature becomes a diagnostic expression of substrate inhomogeneity.

ENERGY‑MOMENTUM CONSERVATION

In the Monad‑Field framework, the excitation field Ψ and any matter contributions must satisfy covariant conservation under the S‑generated geometry. Because the connection Γλμν is derived from the substrate, the usual conservation law generalizes to:

μ Tμν = 0

where Tμν is the total energy‑momentum tensor of the excitation sector (including Ψ and any additional matter). The covariant derivative ∇ is taken with respect to the emergent metric gμν(S).

This condition is not imposed as an external constraint; it follows from the diffeomorphism invariance of the action when varied with respect to the effective metric. Physically, conservation of energy and momentum emerges from the substrate’s own constitutive transmission properties. No additional “background” conservation law is required – the substrate enforces it dynamically.

Consequently, all test particles and wave excitations follow geodesics of gμν(S), and the standard continuity equations for stress‑energy hold automatically, anchored entirely in the self‑consistent geometry generated by S.

FINITE CONSTITUTIVE DYNAMICS

The Monad‑Field framework is governed by finite constitutive substrate dynamics. Because geometry is emergent from substrate organization rather than fundamental, physically divergent geometric states do not naturally arise within the theory.

The substrate field evolves through bounded nonlinear response, finite transmission behavior, and intrinsic saturation mechanisms. As local tension gradients increase, substrate reaction dynamics asymptotically regulate further amplification. This prevents the formation of unbounded curvature or infinite‑density configurations.

Consequently:
• substrate response remains finite,
• constitutive transmission remains finite,
• reaction dynamics remain finite,
• collapse processes approach limiting behavior rather than divergence.

In this framework, singularities are not removed by assumption. They simply fail to emerge from the mathematics of the substrate itself. Curvature divergences are interpreted as artifacts of incomplete geometric approximations rather than physically realizable states.

EMERGENT GRAVITY

In the weak‑field limit, the substrate equation must reduce to:

∇² S ≈ 4πGρ

Gravity appears as macroscopic substrate tension organization. Gravitational lensing, redshift, and geodesic motion become constitutive propagation effects generated by substrate structure.

GRAVITATIONAL WAVES AND SUBSTRATE ECHOES

Wave‑like disturbances in the substrate correspond to propagating reorganizations of effective geometry. The framework naturally permits:

  • nonlinear substrate waves
  • constitutive recoil
  • saturation echoes
  • substrate snap phenomena
  • memory effects

These arise from finite substrate reaction dynamics rather than purely metric oscillation.

RELATIONSHIP TO EXISTING FRAMEWORKS

Monad‑Field shares conceptual overlap with:
• Weyl geometry
• Teleparallel gravity
• Emergent gravity programs
• Analogue gravity systems
• Constitutive media physics

However, the ontology differs fundamentally: geometry is not assumed; it is generated.

CENTRAL ONTOLOGICAL SHIFT

General Relativity:
Matter tells spacetime how to curve.

Monad‑Field:
Energy organizes substrate tension, and substrate tension generates effective geometry.

Thus:
• geometry is response
• curvature is response
• time is response
• derivatives are physical transmission operators

The substrate alone is fundamental.

CORE THESIS

The Monad‑Field framework compresses into a single statement:

The substrate does not evolve inside geometry; the substrate generates the geometry through which it evolves.

From this follows:
• emergent geometry
• emergent curvature
• emergent time
• constitutive dynamics
• finite, self‑consistent physical evolution without divergent states


EXECUTIVE SUMMARY

PAGE 1 — CONCEPTUAL SUMMARY

The Monad‑Field framework proposes that the fundamental entity of reality is a continuous substrate field S(x,t). Geometry, curvature, and time are not primitive; they emerge from the internal organization and tension structure of this substrate.

In this view, the universe is not a stage with pre‑existing geometry. Instead, geometry is a secondary response generated by substrate behavior. Curvature arises from tension gradients. Time arises from sequential substrate reactions. Matter and energy correspond to organized excitations of the substrate.

This reverses the logic of General Relativity. Instead of matter evolving inside geometry, geometry is generated by the substrate through which matter evolves. The substrate defines the effective metric, the connection, and the propagation of excitations.

Because substrate dynamics are finite and saturable, physical infinities do not arise. Singularities are not removed by assumption; they simply fail to emerge from the mathematics. Collapse processes approach finite limiting behavior rather than divergence.

Gravity emerges as macroscopic substrate tension organization. Gravitational waves become substrate reorganization waves. Echoes, nonlinear recoil, and memory effects arise naturally from finite substrate response.

The Monad‑Field framework aligns with emergent gravity, analogue gravity, and constitutive media physics, but differs ontologically: geometry is not assumed; it is generated.

PAGE 2 — TECHNICAL SUMMARY

Effective metric (minimal conformal form):
gμν(S) = (1 + αS) ημν  (ημν = coordinate fiducial)

Substrate‑generated connection:
Γλμν = [α / 2(1 + αS)] × ( δλνμS + δλμνS − ημν ηλρρS )

Substrate‑dependent covariant derivative:
(S) acts using Γ(S)

Excitation dynamics:
μμ Ψ − (m² + κS) Ψ = 0

Substrate dynamics:
μμ S − V′(S) = κ |Ψ|² + A′(S) ημν (∂μΨ*)(∂νΨ)

Emergent curvature:
Rρσμν = ∂μΓρνσ − ∂νΓρμσ + Γρμλ Γλνσ − Γρνλ Γλμσ

Weak‑field limit:
∇² S ≈ 4πGρ

Energy‑momentum conservation: μ Tμν = 0 holds identically under S‑generated geometry.

Key consequences:
• Geometry is emergent from S
• Curvature is a diagnostic of substrate inhomogeneity
• Time is sequential substrate reaction
• Singularities do not arise
• Gravity is substrate tension organization
• Gravitational waves are substrate reorganization waves
• Echoes and nonlinear effects arise from finite substrate response

Monad‑Field Framework — version 1.0 (Final)
Constitutive geometry · No background metric · Singularity‑free by construction

Effective metric (coordinate fiducial ημν)

gμν(S) = (1 + αS) ημν
μν is a flat bookkeeping label, not a physical background)


Substrate‑generated connection

Γλμν = α / [2(1 + αS)] · ( δλνμ S + δλμν S − ημν ηλρρ S )


Covariant derivative (physical transmission operator)

μ Vν = ∂μ Vν + Γνμλ Vλ


Lagrangian density (covariant, S‑generated geometry)

ℒ = √(−g(S)) · [
 1/(2κeff) · R(g(S))
 − ½ gμνμ S ∂ν S − V(S)
 − gμνμ Ψ* ∂ν Ψ − m² |Ψ|² − κ S |Ψ|²
]


Excitation field equation (Ψ evolves in S‑geometry)

μμ Ψ − (m² + κ S) Ψ = 0


Substrate field equation (S generates its own geometry)

μμ S − V′(S) = κ |Ψ|² + A′(S) ημν (∂μ Ψ*)(∂ν Ψ)

with A(S) = 1 + αS, and V(S) = ½ β S² + ¼ γ S⁴ + … (saturating potential)


Energy‑momentum conservation (emergent, not imposed)

μ Tμν = 0  (Tμν from Ψ and matter sectors)


Finite constitutive dynamics – no singularities

  • Saturation built into V(S) and the connection
  • As S → Smax, Γ and curvature asymptote, not diverge
  • μμ S remains bounded → no physical infinities

Weak‑field limit (Newtonian gravity)

∇² S ≈ 4πG ρ   (ρ = |Ψ|² + source terms)


Observable signatures

  • Gravitational waves → substrate reorganisations
  • Echoes, beats, power‑law tails → from finite‑response saturation
  • Geodesics = least‑reaction paths in ∇(S)
  • Time dilation = altered substrate reaction rates

These are the equations of the Monad‑Field Framework as of the final white paper.
Geometry is no longer a stage – it is a response.
The substrate alone is fundamental.

# Monad-Field Multi-Event Stacking + ESPRIT Frequency Analysis # Consensus version – no GR comparisons, standalone framework. !pip install gwosc gwpy import numpy as np import matplotlib.pyplot as plt from gwosc import datasets import gwpy.timeseries from scipy.optimize import curve_fit from scipy.signal import butter, filtfilt from scipy.fft import rfft, irfft from scipy.interpolate import interp1d import scipy.linalg import urllib.request from gwosc.locate import get_urls # ------------------------------------------------------------ # 1. Define events (add or remove as needed) # ------------------------------------------------------------ events = [ 'GW190521', 'GW190412', 'GW150914', 'GW190814' # mystery object – excellent for saturation test ] # Event-specific initial guesses (frequency, amplitude, tau, beta) # Format: [A, tau, beta, f, phi, A_echo, tau_echo, t_echo] event_params = { 'GW190521': [1.5, 0.05, 0.35, 60.0, 0.0, 0.1, 0.02, 0.03], 'GW190412': [1.2, 0.03, 0.40, 150.0, 0.0, 0.08, 0.015, 0.025], 'GW150914': [1.5, 0.01, 0.30, 130.0, 0.0, 0.05, 0.008, 0.025], 'GW190814': [2.0, 0.08, 0.30, 80.0, 0.0, 0.15, 0.025, 0.04] } bounds = ([0, 0.001, 0.1, 30.0, -np.pi, 0, 0.001, 0.005], [5, 0.2, 0.9, 200.0, np.pi, 1.0, 0.1, 0.08]) # ------------------------------------------------------------ # 2. Helper: fetch data robustly # ------------------------------------------------------------ def fetch_strain(event, detector='H1', time_window=16): gps = datasets.event_gps(event) start = gps - time_window end = gps + time_window try: strain = gwpy.timeseries.TimeSeries.fetch_open_data( detector, start, end, sample_rate=4096, verbose=False ) print(f" ✅ {event}: fetch_open_data succeeded") except Exception as e: print(f" ⚠️ {event}: fetch_open_data failed ({e}), trying direct frames...") urls = get_urls(detector, start, end, sample_rate=4096) if not urls: raise RuntimeError(f"No frames for {event}") local_files = [] for i, url in enumerate(urls): fname = f"{event}_{i}.gwf" urllib.request.urlretrieve(url, fname) local_files.append(fname) strain_list = [] for fname in local_files: ts = gwpy.timeseries.TimeSeries.read(fname, format='gwf.lalframe', channel=f'{detector}:GWOSC-4KHZ_R1_16KHZ') strain_list.append(ts) strain = strain_list[0].append(strain_list[1:]) strain = strain.crop(start, end) print(f" ✅ {event}: loaded from direct frames") return strain # ------------------------------------------------------------ # 3. Process a single event: bandpass, whiten, fit, return residuals # ------------------------------------------------------------ def process_event(event, p0, bounds, common_time=None): print(f"\nProcessing {event}...") strain = fetch_strain(event) fs = strain.sample_rate.value # Bandpass + whiten (same as before) nyq = 0.5 * fs b, a = butter(4, [20/nyq, 2000/nyq], btype='band') filtered = filtfilt(b, a, strain.value) spec = rfft(filtered) whitened = irfft(spec / (np.abs(spec) + 1e-10), n=len(filtered)) whitened /= np.std(whitened) # Rebuild TimeSeries whitened_ts = gwpy.timeseries.TimeSeries(whitened, t0=strain.t0.value, dt=1/fs) gps = strain.t0.value + len(strain)/fs / 2 # approximate; better: use event_gps # Actually get precise gps for this event gps = datasets.event_gps(event) post = whitened_ts.crop(gps, gps + 0.5) time = post.times.value - gps data = post.value # Fit Monad-field model t_fit = time[time >= 0] data_fit = data[time >= 0] try: popt, _ = curve_fit(monad_waveform, t_fit, data_fit, p0=p0, bounds=bounds, maxfev=20000) model = monad_waveform(t_fit, *popt) residuals = data_fit - model print(f" Fit converged for {event}. β={popt[2]:.3f}, f={popt[3]:.1f} Hz.") except Exception as e: print(f" Fit failed for {event}: {e}") residuals = data_fit popt = p0 # Interpolate residuals to common time grid if provided if common_time is not None: f_int = interp1d(t_fit, residuals, bounds_error=False, fill_value=0) res_interp = f_int(common_time) return res_interp, popt else: return residuals, popt # ------------------------------------------------------------ # 4. Monad-Field waveform (safe version)) # ------------------------------------------------------------ def monad_waveform(t, A, tau, beta, f, phi, A_echo, tau_echo, t_echo): main_env = A * np.exp(-np.power(np.maximum(t, 0)/tau, beta)) * (t >= 0) main = main_env * np.cos(2*np.pi*f*t + phi) echo_env = A_echo * np.exp(-np.power(np.maximum(t - t_echo, 0)/tau_echo, beta)) * (t >= t_echo) echo = echo_env * np.cos(2*np.pi*f*(t - t_echo) + phi - np.pi/4) return main + echo # ------------------------------------------------------------ # 5. ESPRIT frequency estimator (high resolution) # ------------------------------------------------------------ def execute_esprit_frcfd(signal, n_signals, fs): """ High-resolution ESPRIT algorithm for Monad-Field master stack. n_signals: number of complex exponentials (2 per real frequency) """ L = len(signal) // 3 # Hankel matrix rows H = scipy.linalg.hankel(signal[:L], signal[L-1:]) U, S, Vh = scipy.linalg.svd(H, full_matrices=False) Us = U[:, :n_signals] # Rotation matrix phi = np.linalg.pinv(Us[:-1, :]) @ Us[1:, :] eigvals = np.linalg.eigvals(phi) freqs = np.angle(eigvals) * fs / (2 * np.pi) # Return positive frequencies only, sorted pos_freqs = np.abs(freqs[freqs > 0]) return np.sort(pos_freqs) # ------------------------------------------------------------ # 6. Main stacking routine # ------------------------------------------------------------ print("\n--- Starting Multi-Event Stacking Analysis ---") # Common time grid for residuals (2000 points over 0.5s) common_time = np.linspace(0, 0.5, 2000) residuals_stack = [] all_fits = [] for ev in events: p0 = event_params.get(ev, [1.5, 0.05, 0.35, 60.0, 0.0, 0.1, 0.02, 0.03]) res, popt = process_event(ev, p0, bounds, common_time=common_time) residuals_stack.append(res) all_fits.append((ev, popt)) # Average residuals across events master_stack = np.mean(residuals_stack, axis=0) # Plot master stack plt.figure(figsize=(12,5)) plt.plot(common_time*1000, master_stack, 'b-', lw=1.5) plt.xlabel('Time after merger (ms)') plt.ylabel('Stacked residual amplitude') plt.title('Monad‑Field Master Stack (averaged residuals over multiple events)') plt.grid(alpha=0.3) plt.show() # ------------------------------------------------------------ # 7. ESPRIT analysis on master stack # ------------------------------------------------------------ fs_process = 4096 # original sample rate, but master stack is on common_time grid # The time steps are not uniform? Actually common_time is uniform. dt = common_time[1] - common_time[0] effective_fs = 1 / dt # ~4000 Hz, close to original print(f"\nEffective sampling rate for master stack: {effective_fs:.1f} Hz") # Use n_signals=4 (expect 2 dominant frequencies: f0 and harmonic) detected_modes = execute_esprit_frcfd(master_stack, n_signals=4, fs=effective_fs) positive_modes = detected_modes[detected_modes > 5] # ignore <5 Hz unique_modes = np.unique(np.round(positive_modes, 1)) print("\n--- ESPRIT Substrate Modal Analysis ---") if len(unique_modes) >= 1: print(f"Primary Substrate Mode (f0): {unique_modes[0]:.1f} Hz") if len(unique_modes) >= 2: print(f"Harmonic / Snap Mode (f1): {unique_modes[1]:.1f} Hz") ratio = unique_modes[1] / unique_modes[0] if unique_modes[0] > 0 else 0 print(f"Coupling Ratio: {ratio:.3f}") else: print("Only one dominant mode detected.") print("\nAll fitted β values:") for ev, p in all_fits: print(f" {ev}: β = {p[2]:.3f}, f = {p[3]:.1f} Hz") print("\nMaster stack saved as 'master_stack.npy'") np.save('master_stack.npy', master_stack) # Monad‑Field Blinded Predictive Test – FINAL CONSENSUS VERSION # All models agree: frozen calibration, PSD whitening, matched filtering, # blank main burst (0‑25ms), cross‑detector, null distribution, p‑value. !pip install gwosc gwpy import numpy as np import matplotlib.pyplot as plt from gwosc import datasets import gwpy.timeseries from scipy.signal import butter, filtfilt, welch from scipy.fft import irfft, rfft import urllib.request from gwosc.locate import get_urls import warnings warnings.filterwarnings("ignore") # ------------------------------------------------------------ # 1. Calibrated Universal Constants (FROZEN) # ------------------------------------------------------------ BETA_CAL = 0.35 # power‑law relaxation exponent F_SNAP_CAL = 996.8 # high‑frequency snap (Hz) LAMBDA_MS_PRED = 40.8 # predicted echo delay (ms) ENVELOPE_TAU = 0.02 # seconds PREDICTED_DELAY_SEC = LAMBDA_MS_PRED / 1000.0 BLANK_DURATION_MS = 25.0 # blank the main burst in the first X ms # ------------------------------------------------------------ # 2. Choose new event (not used in original stacking) # ------------------------------------------------------------ event = 'GW200220_061928' # change to any unseen event detector_H1 = 'H1' detector_L1 = 'L1' time_window = 16 # seconds around merger print(f"\n=== BLINDED PREDICTIVE TEST (Final Consensus) ===") print(f"Event: {event}") print(f"Predicted echo delay: {LAMBDA_MS_PRED:.1f} ms") print(f"Template: β={BETA_CAL}, f_snap={F_SNAP_CAL:.1f} Hz") print(f"Blank first {BLANK_DURATION_MS} ms to avoid main burst\n") # ------------------------------------------------------------ # 3. Helper: fetch data robustly # ------------------------------------------------------------ def fetch_strain(event, detector, time_window=16): gps = datasets.event_gps(event) start = gps - time_window end = gps + time_window try: strain = gwpy.timeseries.TimeSeries.fetch_open_data( detector, start, end, sample_rate=4096, verbose=False ) print(f"✅ {detector}: fetch_open_data success") except Exception as e: print(f"⚠️ {detector}: fetch_open_data failed, trying direct frames...") urls = get_urls(detector, start, end, sample_rate=4096) if not urls: raise RuntimeError(f"No frames for {detector}") local_files = [] for i, url in enumerate(urls): fname = f"{event}_{detector}_{i}.gwf" urllib.request.urlretrieve(url, fname) local_files.append(fname) strain_list = [] for fname in local_files: ts = gwpy.timeseries.TimeSeries.read(fname, format='gwf.lalframe', channel=f'{detector}:GWOSC-4KHZ_R1_16KHZ') strain_list.append(ts) strain = strain_list[0].append(strain_list[1:]) strain = strain.crop(start, end) print(f"✅ {detector}: loaded from direct frames") return strain # ------------------------------------------------------------ # 4. Preprocessing with proper PSD whitening # ------------------------------------------------------------ def preprocess_strain(strain, fs=4096): # Bandpass 20‑2000 Hz nyq = fs / 2 b, a = butter(4, [20/nyq, 2000/nyq], btype='band') filtered = filtfilt(b, a, strain.value) # Estimate PSD using Welch f_psd, psd = welch(filtered, fs=fs, nperseg=1024) # Whitening in frequency domain spec = rfft(filtered) freqs = np.fft.rfftfreq(len(filtered), d=1/fs) # Interpolate PSD to match spec frequencies psd_interp = np.interp(freqs, f_psd, psd) psd_interp = np.maximum(psd_interp, 1e-12) spec_whitened = spec / np.sqrt(psd_interp) whitened = irfft(spec_whitened, n=len(filtered)) whitened = whitened / np.std(whitened) return whitened # ------------------------------------------------------------ # 5. Build universal echo template (matched filter) # ------------------------------------------------------------ def build_template(fs, duration=0.1): t = np.linspace(0, duration, int(fs * duration)) envelope = np.exp(-np.power(t / ENVELOPE_TAU, BETA_CAL)) signal = envelope * np.cos(2 * np.pi * F_SNAP_CAL * t) # Normalize to unit norm (matched filter) signal = signal / np.sqrt(np.sum(signal**2)) return signal, t # ------------------------------------------------------------ # 6. Matched filter (normalized cross‑correlation) # ------------------------------------------------------------ def matched_filter(ts, signal, fs): conv = np.correlate(ts, signal, mode='full') lags = np.arange(-len(signal)+1, len(ts)) / fs * 1000.0 # milliseconds return conv, lags # ------------------------------------------------------------ # 7. Process a single detector (with blanking of main burst) # ------------------------------------------------------------ def process_detector(event, detector, time_window=16): strain = fetch_strain(event, detector, time_window) gps = datasets.event_gps(event) whitened = preprocess_strain(strain, fs=4096) ts = gwpy.timeseries.TimeSeries(whitened, t0=strain.t0.value, dt=1/4096) post = ts.crop(gps, gps + 0.5) # 0.5 seconds after merger time = post.times.value - gps data = post.value return time, data, gps # ------------------------------------------------------------ # 8. Delay scan + null distribution # ------------------------------------------------------------ def analyze_detector(time, data, event_name, detector_label): fs = 4096 template, t_template = build_template(fs) # ----- BLANK THE MAIN BURST (first X ms) ----- blank_samples = int(BLANK_DURATION_MS / 1000.0 * fs) data_blanked = data.copy() data_blanked[:blank_samples] = 0.0 # Matched filter on blanked data corr, lags_ms = matched_filter(data_blanked, template, fs) # Restrict to positive delays 5‑200 ms (skip the blanked region) idx = (lags_ms >= 5) & (lags_ms <= 200) lags_pos = lags_ms[idx] corr_pos = corr[idx] max_abs_idx = np.argmax(np.abs(corr_pos)) measured_delay_ms = lags_pos[max_abs_idx] measured_rho = corr_pos[max_abs_idx] # Null distribution: 500 time‑shifts of the blanked data n_shifts = 500 null_rhos = [] np.random.seed(42) for _ in range(n_shifts): shift = np.random.randint(1000, len(data_blanked)//2) shifted = np.roll(data_blanked, shift) corr_shift, _ = matched_filter(shifted, template, fs) corr_shift_pos = corr_shift[idx] null_rhos.append(np.max(np.abs(corr_shift_pos))) null_rhos = np.array(null_rhos) p_value = (np.sum(null_rhos >= np.abs(measured_rho)) + 1) / (n_shifts + 1) return measured_delay_ms, measured_rho, p_value, lags_pos, corr_pos, null_rhos # ------------------------------------------------------------ # 9. Run on H1 and L1 # ------------------------------------------------------------ time_H1, data_H1, gps_H1 = process_detector(event, detector_H1, time_window) delay_H1, rho_H1, p_H1, lags_H1, corr_H1, nulls_H1 = analyze_detector( time_H1, data_H1, event, detector_H1) time_L1, data_L1, gps_L1 = process_detector(event, detector_L1, time_window) delay_L1, rho_L1, p_L1, lags_L1, corr_L1, nulls_L1 = analyze_detector( time_L1, data_L1, event, detector_L1) # ------------------------------------------------------------ # 10. Decision logic (coincident, predicted delay, significant) # ------------------------------------------------------------ coincident = abs(delay_H1 - delay_L1) < 5.0 # both peaks within 5 ms predicted_delay_ok = (abs(delay_H1 - LAMBDA_MS_PRED) < 5.0 and abs(delay_L1 - LAMBDA_MS_PRED) < 5.0) significant = (p_H1 < 0.05 and p_L1 < 0.05) print("\n=== BLIND TEST RESULTS ===") print(f"H1 → delay = {delay_H1:.1f} ms, ρ = {rho_H1:.4f}, p = {p_H1:.4f}") print(f"L1 → delay = {delay_L1:.1f} ms, ρ = {rho_L1:.4f}, p = {p_L1:.4f}") print(f"Coincident (Δ < 5 ms): {'YES' if coincident else 'NO'}") print(f"Matches prediction 40.8 ms: {'YES' if predicted_delay_ok else 'NO'}") print(f"p < 0.05 in both: {'YES' if significant else 'NO'}") if coincident and predicted_delay_ok and significant: print("\n✅ BLIND TEST PASS: Coincident universal echo detected at predicted delay.") print(" The substrate reflex time is empirically supported.") elif coincident and (p_H1 < 0.05 or p_L1 < 0.05): print("\n⚠️ AMBIGUOUS: Coincident but not at predicted delay or significance marginal.") else: print("\n❌ BLIND TEST FAIL: No coincident echo found at the universal prediction.") print(" The universal echo hypothesis is not supported by this event.") # ------------------------------------------------------------ # 11. Plot correlation curves and null distributions # ------------------------------------------------------------ fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10)) ax1.plot(lags_H1, corr_H1, 'b-', lw=1, label=f'H1 matched filter') ax1.plot(lags_L1, corr_L1, 'g-', lw=1, label=f'L1 matched filter') ax1.axvline(LAMBDA_MS_PRED, color='r', linestyle='--', label=f'Prediction {LAMBDA_MS_PRED} ms') ax1.set_xlim(5, 200) ax1.set_xlabel('Delay (ms)') ax1.set_ylabel('Matched filter amplitude') ax1.set_title(f'{event} – cross‑detector echo search') ax1.legend() ax1.grid(alpha=0.3) ax2.hist(nulls_H1, bins=30, alpha=0.6, color='blue', label='H1 null') ax2.hist(nulls_L1, bins=30, alpha=0.6, color='green', label='L1 null') ax2.axvline(rho_H1, color='blue', linestyle='--', label=f'H1 measured ρ = {rho_H1:.3f}') ax2.axvline(rho_L1, color='green', linestyle='--', label=f'L1 measured ρ = {rho_L1:.3f}') ax2.set_xlabel('Max |correlation| (null distribution)') ax2.set_ylabel('Count') ax2.set_title('Null distributions (500 time‑shifts each)') ax2.legend() plt.tight_layout() plt.show() # ------------------------------------------------------------ # 12. Save # ------------------------------------------------------------ np.savez(f'blinded_test_{event}_final.npz', lags_H1=lags_H1, corr_H1=corr_H1, nulls_H1=nulls_H1, lags_L1=lags_L1, corr_L1=corr_L1, nulls_L1=nulls_L1, delay_H1=delay_H1, rho_H1=rho_H1, p_H1=p_H1, delay_L1=delay_L1, rho_L1=rho_L1, p_L1=p_L1) print(f"\n💾 Results saved to blinded_test_{event}_final.npz") # ============================================================ # MONAD-FIELD EXPLORATORY ECHO PIPELINE # Scientific Residual Analysis Framework # # PURPOSE: # Explore whether repeatable post-merger structures exist # in GW residual data using disciplined signal analysis. # # This code DOES NOT assume the theory is true. # It attempts to reduce self-deception and template bias. # ============================================================ !pip install gwosc gwpy pycbc -q import numpy as np import matplotlib.pyplot as plt from gwosc import datasets from gwpy.timeseries import TimeSeries from scipy.signal import butter, filtfilt, tukey, welch from scipy.fft import rfft, irfft, rfftfreq from scipy.stats import norm from pycbc.filter import matched_filter from pycbc.types import TimeSeries as PyCBCSeries import warnings warnings.filterwarnings("ignore") # ============================================================ # CONFIGURATION # ============================================================ EVENT = "GW200220_061928" FS = 4096 TIME_WINDOW = 16 POST_WINDOW = 0.5 # Universal exploratory constants BETA = 0.35 FREQ_CENTER = 996.8 FREQ_SCAN = np.arange(850, 1150, 25) TAU = 0.02 DELAY_MIN_MS = 5 DELAY_MAX_MS = 200 BLANK_MS = 25 N_MONTECARLO = 1000 np.random.seed(1234) # ============================================================ # FETCH DATA # ============================================================ def load_detector(det): gps = datasets.event_gps(EVENT) ts = TimeSeries.fetch_open_data( det, gps - TIME_WINDOW, gps + TIME_WINDOW, sample_rate=FS, verbose=False ) return ts, gps # ============================================================ # PREPROCESSING # ============================================================ def preprocess(ts): data = ts.value.astype(np.float64) # Tukey window win = tukey(len(data), alpha=0.1) data = data * win # Bandpass nyq = FS / 2 b, a = butter(4, [20/nyq, 1800/nyq], btype='band') data = filtfilt(b, a, data) # PSD estimate f_psd, psd = welch( data, fs=FS, nperseg=FS, average='median' ) # FFT whitening spec = rfft(data) freqs = rfftfreq(len(data), 1/FS) psd_interp = np.interp(freqs, f_psd, psd) psd_interp = np.maximum(psd_interp, 1e-18) white_spec = spec / np.sqrt(psd_interp) white = irfft(white_spec, n=len(data)) # normalize white /= np.std(white) return white # ============================================================ # TEMPLATE BANK # ============================================================ def build_template(freq): t = np.arange(0, 0.1, 1/FS) env = np.exp(-(t / TAU)**BETA) sig = env * np.cos(2*np.pi*freq*t) # zero mean sig -= np.mean(sig) # normalize sig /= np.sqrt(np.sum(sig**2)) return sig # ============================================================ # BLANK MAIN MERGER # ============================================================ def blank_main_burst(data): blank = int(BLANK_MS * 1e-3 * FS) d = data.copy() # smooth taper instead of hard zero taper = np.linspace(0, 1, blank) d[:blank] *= taper return d # ============================================================ # EXTRACT POST-MERGER # ============================================================ def get_postmerger(white, gps, t0): ts = TimeSeries( white, t0=t0, dt=1/FS ) post = ts.crop(gps, gps + POST_WINDOW) return post.value # ============================================================ # MATCHED FILTER SCAN # ============================================================ def scan_delays(data): best = None all_results = [] pydata = PyCBCSeries(data, delta_t=1/FS) for freq in FREQ_SCAN: template = build_template(freq) pytmpl = PyCBCSeries(template, delta_t=1/FS) snr = matched_filter( pytmpl, pydata, low_frequency_cutoff=20 ) snr = snr.numpy() lags_ms = np.arange(len(snr)) / FS * 1000 idx = ( (lags_ms >= DELAY_MIN_MS) & (lags_ms <= DELAY_MAX_MS) ) scan = np.abs(snr[idx]) peak_idx = np.argmax(scan) peak_snr = scan[peak_idx] peak_delay = lags_ms[idx][peak_idx] result = { "freq": freq, "delay": peak_delay, "snr": peak_snr } all_results.append(result) if best is None or peak_snr > best["snr"]: best = result return best, all_results # ============================================================ # MONTE CARLO NULLS # ============================================================ def montecarlo_null(data): null_max = [] for _ in range(N_MONTECARLO): shuffled = np.random.permutation(data) best, _ = scan_delays(shuffled) null_max.append(best["snr"]) return np.array(null_max) # ============================================================ # PROCESS DETECTOR # ============================================================ def run_detector(det): ts, gps = load_detector(det) white = preprocess(ts) post = get_postmerger( white, gps, ts.t0.value ) post = blank_main_burst(post) best, scans = scan_delays(post) nulls = montecarlo_null(post) p = ( np.sum(nulls >= best["snr"]) + 1 ) / (len(nulls) + 1) return { "best": best, "nulls": nulls, "p": p, "scans": scans } # ============================================================ # RUN BOTH DETECTORS # ============================================================ print(f"\nRunning exploratory analysis on {EVENT}\n") H1 = run_detector("H1") L1 = run_detector("L1") # ============================================================ # COINCIDENCE TEST # ============================================================ delay_diff = abs( H1["best"]["delay"] - L1["best"]["delay"] ) freq_diff = abs( H1["best"]["freq"] - L1["best"]["freq"] ) coincident = ( delay_diff < 5 and freq_diff < 50 ) # ============================================================ # RESULTS # ============================================================ print("\n==============================") print("RESULTS") print("==============================\n") print("H1") print(H1["best"]) print(f"p-value = {H1['p']:.5f}\n") print("L1") print(L1["best"]) print(f"p-value = {L1['p']:.5f}\n") print(f"Delay difference = {delay_diff:.2f} ms") print(f"Frequency difference = {freq_diff:.2f} Hz") print(f"\nCoincident = {coincident}") # ============================================================ # PLOTS # ============================================================ fig, ax = plt.subplots(1,2, figsize=(12,4)) ax[0].hist(H1["nulls"], bins=40, alpha=0.7) ax[0].axvline(H1["best"]["snr"], color='r') ax[0].set_title("H1 Null Distribution") ax[1].hist(L1["nulls"], bins=40, alpha=0.7) ax[1].axvline(L1["best"]["snr"], color='r') ax[1].set_title("L1 Null Distribution") plt.tight_layout() plt.show() # ============================================================ # INTERPRETATION # ============================================================ print("\n==============================") print("INTERPRETATION") print("==============================\n") if coincident and H1["p"] < 0.01 and L1["p"] < 0.01: print("Strong coincident post-merger structure detected.") elif coincident: print("Coincident structure present but significance modest.") else: print("No strong coincident structure detected.") # ============================================================ # MONAD-FIELD EXPLORATORY ECHO PIPELINE – Version 2.3 FINAL # Fixed: direct FrequencySeries construction (no from_numpy_arrays). # All refinements: pre‑merger PSD, cosine blanking, bootstrap CI, # pre‑merger control, negative template. # ============================================================ !pip install gwosc gwpy pycbc -q import numpy as np import matplotlib.pyplot as plt from scipy.stats import bootstrap from gwosc import datasets from gwpy.timeseries import TimeSeries from scipy.signal import butter, filtfilt, welch from scipy.signal.windows import tukey from scipy.fft import rfft, irfft, rfftfreq from pycbc.filter import matched_filter from pycbc.types import TimeSeries as PyCBCSeries, FrequencySeries from pycbc.psd import interpolate as psd_interpolate import warnings warnings.filterwarnings("ignore") # ============================================================ # CONFIGURATION # ============================================================ EVENT = "GW200220_061928" FS = 4096 TIME_WINDOW = 16 POST_WINDOW = 0.5 BETA = 0.35 FREQ_CENTER = 996.8 FREQ_SCAN = np.arange(850, 1150, 25) TAU = 0.02 DELAY_MIN_MS = 5 DELAY_MAX_MS = 200 BLANK_MS = 25 COINC_DELAY_MS = 5.0 COINC_FREQ_HZ = 25.0 N_MONTECARLO = 1000 N_BOOTSTRAP = 1000 np.random.seed(1234) # ============================================================ # FETCH DATA # ============================================================ def load_detector(det): gps = datasets.event_gps(EVENT) ts = TimeSeries.fetch_open_data(det, gps - TIME_WINDOW, gps + TIME_WINDOW, sample_rate=FS, verbose=False) return ts, gps # ============================================================ # PREPROCESS – returns whitened post, whitened pre, and PyCBC PSD # ============================================================ def preprocess(ts, gps): data_full = ts.value.astype(np.float64) # Bandpass nyq = FS / 2 b, a = butter(4, [20/nyq, 1800/nyq], btype='band') data_full = filtfilt(b, a, data_full) # Pre‑merger noise segment (0.5 s before merger) idx_pre_start = int((gps - 0.5 - ts.t0.value) * FS) idx_pre_end = int((gps - ts.t0.value) * FS) pre_noise = data_full[idx_pre_start:idx_pre_end] # Estimate PSD using Welch f_psd, psd_raw = welch(pre_noise, fs=FS, nperseg=FS, average='median') # Build PyCBC FrequencySeries directly – no from_numpy_arrays delta_f = f_psd[1] - f_psd[0] # frequency resolution (should be 1 Hz) psd_pycbc = FrequencySeries(psd_raw, delta_f=delta_f) # FFT whitening of full data using the PSD spec = rfft(data_full) freqs = rfftfreq(len(data_full), 1/FS) psd_interp_for_whiten = np.interp(freqs, f_psd, psd_raw) psd_interp_for_whiten = np.maximum(psd_interp_for_whiten, 1e-18) white_spec = spec / np.sqrt(psd_interp_for_whiten) white_full = irfft(white_spec, n=len(data_full)) white_full /= np.std(white_full) # Build TimeSeries for cropping ts_white = TimeSeries(white_full, t0=ts.t0.value, dt=1/FS) # Post‑merger (0 to POST_WINDOW) post = ts_white.crop(gps, gps + POST_WINDOW).value # Pre‑merger control segment (-0.5 to 0) pre_ctrl = ts_white.crop(gps - 0.5, gps).value return post, pre_ctrl, psd_pycbc # ============================================================ # COSINE BLANKING (smooth taper) # ============================================================ def blank_main_burst(data): blank = int(BLANK_MS * 1e-3 * FS) d = data.copy() t = np.linspace(0, 1, blank) taper = 0.5 * (1 - np.cos(np.pi * t)) d[:blank] *= taper return d # ============================================================ # TEMPLATE BANK # ============================================================ def build_template(freq): t = np.arange(0, 0.1, 1/FS) env = np.exp(-(t / TAU)**BETA) sig = env * np.cos(2*np.pi*freq*t) sig -= np.mean(sig) sig /= np.sqrt(np.sum(sig**2)) return sig # ============================================================ # MATCHED FILTER SCAN (PSD‑weighted) # ============================================================ def scan_delays(data, psd_pycbc): best = None all_results = [] pydata = PyCBCSeries(data, delta_t=1/FS) # Interpolate the PSD to match the data's frequency resolution psd_interp = psd_interpolate(psd_pycbc, pydata.delta_f, len(pydata)) for freq in FREQ_SCAN: template = build_template(freq) pytmpl = PyCBCSeries(template, delta_t=1/FS) snr = matched_filter(pytmpl, pydata, low_frequency_cutoff=20, psd=psd_interp) snr = snr.numpy() # Correct lag axis (milliseconds) lags_ms = (np.arange(len(snr)) - (len(pytmpl) - 1)) / FS * 1000 idx = (lags_ms >= DELAY_MIN_MS) & (lags_ms <= DELAY_MAX_MS) if not np.any(idx): continue scan = np.abs(snr[idx]) peak_idx = np.argmax(scan) peak_snr = scan[peak_idx] peak_delay = lags_ms[idx][peak_idx] result = {"freq": freq, "delay": peak_delay, "snr": peak_snr} all_results.append(result) if best is None or peak_snr > best["snr"]: best = result return best, all_results # ============================================================ # PHASE‑RANDOMIZED NULL (preserves PSD) # ============================================================ def phase_randomize(data): spec = rfft(data) amp = np.abs(spec) phase = np.random.uniform(0, 2*np.pi, len(spec)) new_spec = amp * np.exp(1j * phase) rand = irfft(new_spec, n=len(data)) rand /= np.std(rand) return rand # ============================================================ # MONTE CARLO NULL # ============================================================ def montecarlo_null(data, psd_pycbc): null_max = [] for _ in range(N_MONTECARLO): randomized = phase_randomize(data) best, _ = scan_delays(randomized, psd_pycbc) null_max.append(best["snr"] if best is not None else 0.0) return np.array(null_max) # ============================================================ # BOOTSTRAP CI FOR NULL MAXIMA # ============================================================ def bootstrap_ci(measured, null_dist, n_resamples=N_BOOTSTRAP): resampled = np.random.choice(null_dist, size=(n_resamples, len(null_dist)), replace=True) bootstrap_max = np.max(resampled, axis=1) lower = np.percentile(bootstrap_max, 2.5) upper = np.percentile(bootstrap_max, 97.5) return lower, upper # ============================================================ # BROADBAND NEGATIVE TEMPLATE (white noise, same PSD shaping) # ============================================================ def negative_template_test(data, psd_pycbc): pydata = PyCBCSeries(data, delta_t=1/FS) psd_interp = psd_interpolate(psd_pycbc, pydata.delta_f, len(pydata)) noise = np.random.normal(0, 1, len(data)) noise -= np.mean(noise) noise /= np.std(noise) pytmpl = PyCBCSeries(noise, delta_t=1/FS) snr = matched_filter(pytmpl, pydata, low_frequency_cutoff=20, psd=psd_interp) return np.max(np.abs(snr.numpy())) # ============================================================ # PROCESS A SINGLE DETECTOR # ============================================================ def run_detector(det): ts, gps = load_detector(det) post_raw, pre_raw, psd_pycbc = preprocess(ts, gps) # Post‑merger (blanked) post = blank_main_burst(post_raw) best_post, _ = scan_delays(post, psd_pycbc) nulls_post = montecarlo_null(post, psd_pycbc) if best_post is not None else np.array([]) p_post = (np.sum(nulls_post >= best_post["snr"]) + 1) / (len(nulls_post) + 1) if best_post is not None else 1.0 ci_low, ci_high = bootstrap_ci(best_post["snr"], nulls_post) if best_post is not None else (0,0) # Pre‑merger control (no blanking) best_pre, _ = scan_delays(pre_raw, psd_pycbc) nulls_pre = montecarlo_null(pre_raw, psd_pycbc) if best_pre is not None else np.array([]) p_pre = (np.sum(nulls_pre >= best_pre["snr"]) + 1) / (len(nulls_pre) + 1) if best_pre is not None else 1.0 # Negative template test neg_snr = negative_template_test(post, psd_pycbc) return { "det": det, "best_post": best_post, "p_post": p_post, "nulls_post": nulls_post, "ci_95": (ci_low, ci_high), "best_pre": best_pre, "p_pre": p_pre, "neg_snr": neg_snr, "post_data": post } # ============================================================ # RUN BOTH DETECTORS # ============================================================ print(f"\n--- MONAD-FIELD PIPELINE v2.3 (Direct FrequencySeries) ---") print(f"Event: {EVENT}") print("Pre‑merger PSD, cosine blanking, bootstrap CI, pre‑merger control, negative template.\n") H1 = run_detector("H1") L1 = run_detector("L1") # Coincidence test if H1["best_post"] is not None and L1["best_post"] is not None: delay_diff = abs(H1["best_post"]["delay"] - L1["best_post"]["delay"]) freq_diff = abs(H1["best_post"]["freq"] - L1["best_post"]["freq"]) coincident = (delay_diff < COINC_DELAY_MS) and (freq_diff < COINC_FREQ_HZ) else: coincident = False # ============================================================ # OUTPUT # ============================================================ print("\n==============================") print("RESULTS") print("==============================\n") for res in [H1, L1]: print(f"{res['det']} (post‑merger):") if res["best_post"]: print(f" best freq = {res['best_post']['freq']:.1f} Hz") print(f" best delay = {res['best_post']['delay']:.1f} ms") print(f" best SNR = {res['best_post']['snr']:.3f}") print(f" p-value = {res['p_post']:.5f}") print(f" 95% CI (null max) = [{res['ci_95'][0]:.3f}, {res['ci_95'][1]:.3f}]") else: print(" No peak found in scan range.") print(f" Pre‑merger control p = {res['p_pre']:.4f}") print(f" Negative template SNR = {res['neg_snr']:.3f}\n") print(f"Coincident (Δdelay<{COINC_DELAY_MS}ms, Δfreq<{COINC_FREQ_HZ}Hz): {coincident}\n") # Significance criteria pre_clean = (H1["p_pre"] > 0.05) and (L1["p_pre"] > 0.05) neg_clean = (H1["neg_snr"] < 2.0) and (L1["neg_snr"] < 2.0) if coincident and H1["p_post"] < 0.01 and L1["p_post"] < 0.01 and pre_clean and neg_clean: print("✅ STRONG EVIDENCE: Coincident, significant post‑merger structure detected.") print(" The universal echo hypothesis is supported by this event.") elif coincident and (H1["p_post"] < 0.05 or L1["p_post"] < 0.05): print("⚠️ MODEST EVIDENCE: Coincident but significance marginal.") else: print("❌ NO SIGNIFICANT EVIDENCE: Coincident structure not detected at predicted delay.") print(" The universal echo hypothesis is not supported by this event.") # ============================================================ # PLOT NULL DISTRIBUTIONS # ============================================================ fig, ax = plt.subplots(1, 2, figsize=(12, 4)) for idx, res in enumerate([H1, L1]): ax[idx].hist(res["nulls_post"], bins=40, alpha=0.7, label='Phase‑randomized nulls') if res["best_post"]: ax[idx].axvline(res["best_post"]["snr"], color='r', label=f'Measured SNR = {res["best_post"]["snr"]:.2f}') ax[idx].set_xlabel("Best matched‑filter SNR") ax[idx].set_ylabel("Count") ax[idx].set_title(f"{res['det']} Null Distribution") ax[idx].legend() plt.tight_layout() plt.show() # ============================================================ # SAVE RESULTS # ============================================================ np.savez(f"monad_pipeline_{EVENT}_v2.3_final.npz", H1_best=H1["best_post"], H1_p=H1["p_post"], H1_null=H1["nulls_post"], L1_best=L1["best_post"], L1_p=L1["p_post"], L1_null=L1["nulls_post"]) print(f"\nResults saved to monad_pipeline_{EVENT}_v2.3_final.npz") # ============================================================ # MONAD-FIELD EXPLORATORY ECHO PIPELINE – Version 2.3 FINAL # Fixed: template zero‑padded to match data length. # ============================================================ !pip install gwosc gwpy pycbc -q import numpy as np import matplotlib.pyplot as plt from scipy.stats import bootstrap from gwosc import datasets from gwpy.timeseries import TimeSeries from scipy.signal import butter, filtfilt, welch from scipy.signal.windows import tukey from scipy.fft import rfft, irfft, rfftfreq from pycbc.filter import matched_filter from pycbc.types import TimeSeries as PyCBCSeries, FrequencySeries from pycbc.psd import interpolate as psd_interpolate import warnings warnings.filterwarnings("ignore") # ============================================================ # CONFIGURATION # ============================================================ EVENT = "GW200220_061928" FS = 4096 TIME_WINDOW = 16 POST_WINDOW = 0.5 BETA = 0.35 FREQ_CENTER = 996.8 FREQ_SCAN = np.arange(850, 1150, 25) TAU = 0.02 DELAY_MIN_MS = 5 DELAY_MAX_MS = 200 BLANK_MS = 25 COINC_DELAY_MS = 5.0 COINC_FREQ_HZ = 25.0 N_MONTECARLO = 1000 N_BOOTSTRAP = 1000 np.random.seed(1234) # ============================================================ # FETCH DATA # ============================================================ def load_detector(det): gps = datasets.event_gps(EVENT) ts = TimeSeries.fetch_open_data(det, gps - TIME_WINDOW, gps + TIME_WINDOW, sample_rate=FS, verbose=False) return ts, gps # ============================================================ # PREPROCESS – returns whitened post, whitened pre, and PyCBC PSD # ============================================================ def preprocess(ts, gps): data_full = ts.value.astype(np.float64) # Bandpass nyq = FS / 2 b, a = butter(4, [20/nyq, 1800/nyq], btype='band') data_full = filtfilt(b, a, data_full) # Pre‑merger noise segment (0.5 s before merger) idx_pre_start = int((gps - 0.5 - ts.t0.value) * FS) idx_pre_end = int((gps - ts.t0.value) * FS) pre_noise = data_full[idx_pre_start:idx_pre_end] # Estimate PSD using Welch – use nperseg = length of noise segment f_psd, psd_raw = welch(pre_noise, fs=FS, nperseg=len(pre_noise), average='median') # Build PyCBC FrequencySeries directly df = f_psd[1] - f_psd[0] psd_pycbc = FrequencySeries(psd_raw, delta_f=df) # FFT whitening of full data using the PSD spec = rfft(data_full) freqs = rfftfreq(len(data_full), 1/FS) psd_interp_for_whiten = np.interp(freqs, f_psd, psd_raw) psd_interp_for_whiten = np.maximum(psd_interp_for_whiten, 1e-18) white_spec = spec / np.sqrt(psd_interp_for_whiten) white_full = irfft(white_spec, n=len(data_full)) white_full /= np.std(white_full) # Build TimeSeries for cropping ts_white = TimeSeries(white_full, t0=ts.t0.value, dt=1/FS) # Post‑merger (0 to POST_WINDOW) post = ts_white.crop(gps, gps + POST_WINDOW).value # Pre‑merger control segment (-0.5 to 0) pre_ctrl = ts_white.crop(gps - 0.5, gps).value return post, pre_ctrl, psd_pycbc # ============================================================ # COSINE BLANKING (smooth taper) # ============================================================ def blank_main_burst(data): blank = int(BLANK_MS * 1e-3 * FS) d = data.copy() t = np.linspace(0, 1, blank) taper = 0.5 * (1 - np.cos(np.pi * t)) d[:blank] *= taper return d # ============================================================ # TEMPLATE BUILDER – returns short template (0.1s) # ============================================================ def build_short_template(freq): t = np.arange(0, 0.1, 1/FS) env = np.exp(-(t / TAU)**BETA) sig = env * np.cos(2*np.pi*freq*t) sig -= np.mean(sig) sig /= np.sqrt(np.sum(sig**2)) return sig # ============================================================ # MATCHED FILTER SCAN (with zero‑padding to match data length) # ============================================================ def scan_delays(data, psd_pycbc): best = None all_results = [] pydata = PyCBCSeries(data, delta_t=1/FS) target_len = len(pydata) # e.g., 2048 samples for 0.5s # Interpolate PSD to match data's frequency resolution psd_interp = psd_interpolate(psd_pycbc, pydata.delta_f, target_len) for freq in FREQ_SCAN: short_tmpl = build_short_template(freq) # Zero‑pad to target length if len(short_tmpl) < target_len: pad_width = target_len - len(short_tmpl) template = np.pad(short_tmpl, (0, pad_width), mode='constant') else: template = short_tmpl[:target_len] pytmpl = PyCBCSeries(template, delta_t=1/FS) snr = matched_filter(pytmpl, pydata, low_frequency_cutoff=20, psd=psd_interp) snr = snr.numpy() # Lag axis: the peak corresponds to where the **start** of the template aligns lags_ms = (np.arange(len(snr)) - (len(pytmpl) - 1)) / FS * 1000 idx = (lags_ms >= DELAY_MIN_MS) & (lags_ms <= DELAY_MAX_MS) if not np.any(idx): continue scan = np.abs(snr[idx]) peak_idx = np.argmax(scan) peak_snr = scan[peak_idx] peak_delay = lags_ms[idx][peak_idx] result = {"freq": freq, "delay": peak_delay, "snr": peak_snr} all_results.append(result) if best is None or peak_snr > best["snr"]: best = result return best, all_results # ============================================================ # PHASE‑RANDOMIZED NULL (preserves PSD) # ============================================================ def phase_randomize(data): spec = rfft(data) amp = np.abs(spec) phase = np.random.uniform(0, 2*np.pi, len(spec)) new_spec = amp * np.exp(1j * phase) rand = irfft(new_spec, n=len(data)) rand /= np.std(rand) return rand # ============================================================ # MONTE CARLO NULL # ============================================================ def montecarlo_null(data, psd_pycbc): null_max = [] for _ in range(N_MONTECARLO): randomized = phase_randomize(data) best, _ = scan_delays(randomized, psd_pycbc) null_max.append(best["snr"] if best is not None else 0.0) return np.array(null_max) # ============================================================ # BOOTSTRAP CI FOR NULL MAXIMA # ============================================================ def bootstrap_ci(measured, null_dist, n_resamples=N_BOOTSTRAP): resampled = np.random.choice(null_dist, size=(n_resamples, len(null_dist)), replace=True) bootstrap_max = np.max(resampled, axis=1) lower = np.percentile(bootstrap_max, 2.5) upper = np.percentile(bootstrap_max, 97.5) return lower, upper # ============================================================ # BROADBAND NEGATIVE TEMPLATE (white noise, same length) # ============================================================ def negative_template_test(data, psd_pycbc): pydata = PyCBCSeries(data, delta_t=1/FS) psd_interp = psd_interpolate(psd_pycbc, pydata.delta_f, len(pydata)) noise = np.random.normal(0, 1, len(data)) noise -= np.mean(noise) noise /= np.std(noise) pytmpl = PyCBCSeries(noise, delta_t=1/FS) snr = matched_filter(pytmpl, pydata, low_frequency_cutoff=20, psd=psd_interp) return np.max(np.abs(snr.numpy())) # ============================================================ # PROCESS A SINGLE DETECTOR # ============================================================ def run_detector(det): ts, gps = load_detector(det) post_raw, pre_raw, psd_pycbc = preprocess(ts, gps) # Post‑merger (blanked) post = blank_main_burst(post_raw) best_post, _ = scan_delays(post, psd_pycbc) nulls_post = montecarlo_null(post, psd_pycbc) if best_post is not None else np.array([]) p_post = (np.sum(nulls_post >= best_post["snr"]) + 1) / (len(nulls_post) + 1) if best_post is not None else 1.0 ci_low, ci_high = bootstrap_ci(best_post["snr"], nulls_post) if best_post is not None else (0,0) # Pre‑merger control (no blanking) best_pre, _ = scan_delays(pre_raw, psd_pycbc) nulls_pre = montecarlo_null(pre_raw, psd_pycbc) if best_pre is not None else np.array([]) p_pre = (np.sum(nulls_pre >= best_pre["snr"]) + 1) / (len(nulls_pre) + 1) if best_pre is not None else 1.0 # Negative template test neg_snr = negative_template_test(post, psd_pycbc) return { "det": det, "best_post": best_post, "p_post": p_post, "nulls_post": nulls_post, "ci_95": (ci_low, ci_high), "best_pre": best_pre, "p_pre": p_pre, "neg_snr": neg_snr, "post_data": post } # ============================================================ # RUN BOTH DETECTORS # ============================================================ print(f"\n--- MONAD-FIELD PIPELINE v2.3 (Zero‑padded template) ---") print(f"Event: {EVENT}") print("Pre‑merger PSD, cosine blanking, bootstrap CI, pre‑merger control, negative template.\n") H1 = run_detector("H1") L1 = run_detector("L1") # Coincidence test if H1["best_post"] is not None and L1["best_post"] is not None: delay_diff = abs(H1["best_post"]["delay"] - L1["best_post"]["delay"]) freq_diff = abs(H1["best_post"]["freq"] - L1["best_post"]["freq"]) coincident = (delay_diff < COINC_DELAY_MS) and (freq_diff < COINC_FREQ_HZ) else: coincident = False # ============================================================ # OUTPUT # ============================================================ print("\n==============================") print("RESULTS") print("==============================\n") for res in [H1, L1]: print(f"{res['det']} (post‑merger):") if res["best_post"]: print(f" best freq = {res['best_post']['freq']:.1f} Hz") print(f" best delay = {res['best_post']['delay']:.1f} ms") print(f" best SNR = {res['best_post']['snr']:.3f}") print(f" p-value = {res['p_post']:.5f}") print(f" 95% CI (null max) = [{res['ci_95'][0]:.3f}, {res['ci_95'][1]:.3f}]") else: print(" No peak found in scan range.") print(f" Pre‑merger control p = {res['p_pre']:.4f}") print(f" Negative template SNR = {res['neg_snr']:.3f}\n") print(f"Coincident (Δdelay<{COINC_DELAY_MS}ms, Δfreq<{COINC_FREQ_HZ}Hz): {coincident}\n") # Significance criteria pre_clean = (H1["p_pre"] > 0.05) and (L1["p_pre"] > 0.05) neg_clean = (H1["neg_snr"] < 2.0) and (L1["neg_snr"] < 2.0) if coincident and H1["p_post"] < 0.01 and L1["p_post"] < 0.01 and pre_clean and neg_clean: print("✅ STRONG EVIDENCE: Coincident, significant post‑merger structure detected.") print(" The universal echo hypothesis is supported by this event.") elif coincident and (H1["p_post"] < 0.05 or L1["p_post"] < 0.05): print("⚠️ MODEST EVIDENCE: Coincident but significance marginal.") else: print("❌ NO SIGNIFICANT EVIDENCE: Coincident structure not detected at predicted delay.") print(" The universal echo hypothesis is not supported by this event.") # ============================================================ # PLOT NULL DISTRIBUTIONS # ============================================================ fig, ax = plt.subplots(1, 2, figsize=(12, 4)) for idx, res in enumerate([H1, L1]): ax[idx].hist(res["nulls_post"], bins=40, alpha=0.7, label='Phase‑randomized nulls') if res["best_post"]: ax[idx].axvline(res["best_post"]["snr"], color='r', label=f'Measured SNR = {res["best_post"]["snr"]:.2f}') ax[idx].set_xlabel("Best matched‑filter SNR") ax[idx].set_ylabel("Count") ax[idx].set_title(f"{res['det']} Null Distribution") ax[idx].legend() plt.tight_layout() plt.show() # ============================================================ # SAVE RESULTS # ============================================================ np.savez(f"monad_pipeline_{EVENT}_v2.3_final.npz", H1_best=H1["best_post"], H1_p=H1["p_post"], H1_null=H1["nulls_post"], L1_best=L1["best_post"], L1_p=L1["p_post"], L1_null=L1["nulls_post"]) print(f"\nResults saved to monad_pipeline_{EVENT}_v2.3_final.npz")

Popular posts from this blog

THE GOLDEN BALLROOM/BUNKER

Conceptual Summary #2: (∂t2​S−c2∇2S+βS3)=σ(x,t)⋅FR​(C[Ψ])

ICE PROUDLY ANNOUNCES NEW “ELITE” TASK FORCE COMMANDER JEREMY DEWITTE