tidal wave script 100 hours runtime sim

# ============================================================ # TEST 10.8 — Critical Transition, Breather & Memory Suite (v4) # Uses BIG_GIANT_SOLVER_FILE_Test10_core_solver.py # ============================================================ import os import json import numpy as np import matplotlib.pyplot as plt from BIG_GIANT_SOLVER_FILE_Test10_core_solver import run_test10_core_solver try: from google.colab import files IN_COLAB = True except ImportError: IN_COLAB = False # ------------------------------------------------------------ # GLOBAL CONFIG # ------------------------------------------------------------ DX = 0.1 DT = 0.001 V = 1.0 MU = -1.0 LAM = 1.0 RESULTS_DIR = "results_10_8" os.makedirs(RESULTS_DIR, exist_ok=True) # Which tests to run (staged) RUN_10_8A = True # Critical transition verification RUN_10_8B = True # Breather survival RUN_10_8C = True # Recovery / attractor RUN_10_8D = False # Deliberate damage RUN_10_8E = False # Critical slowing down RUN_10_8F = False # Hysteresis probe RUN_10_8G = False # Basin of attraction mapping REQUIRED_HISTORY_FIELDS = [ "t", "norm_core_r<2", "norm_shell_10_20", "center_amp2", "energy" ] # ------------------------------------------------------------ # UTILITIES # ------------------------------------------------------------ def load_initial_state(): psi = np.load("Psi_test10_final.npy") pi = np.load("Pi_test10_final.npy") return psi, pi def startup_audit(label, kappa, psi_init): N = psi_init.shape[0] x = np.arange(N) * DX X, Y = np.meshgrid(x, x) center_coord = (N * DX) / 2.0 S = np.exp(-((X - center_coord)**2 + (Y - center_coord)**2) / 4.0) print("====================================================") print(f" STARTUP AUDIT — {label}") print("====================================================") print(f"dt : {DT}") print(f"dx : {DX}") print(f"dtype : {psi_init.dtype}") print(f"mu : {MU}") print(f"lambda : {LAM}") print(f"kappa : {kappa}") print(f"max(S) : {float(S.max()):.6e}") print(f"min(S) : {float(S.min()):.6e}") print("kappa*S*psi term sign: + kappa * S * psi") print("====================================================") def save_history_block(base_dir, tag, history): os.makedirs(base_dir, exist_ok=True) with open(os.path.join(base_dir, f"history_{tag}.json"), "w") as f: json.dump(history, f, indent=2) def fit_log_gamma(t, series, mask=None, min_points=20): t = np.array(t) series = np.array(series) if mask is None: mask = np.ones_like(t, dtype=bool) mask = mask & np.isfinite(t) & np.isfinite(series) if mask.sum() < min_points: return {"gamma": 0.0, "gamma_err": None, "R2": None, "n_points": int(mask.sum())} x = t[mask] y = np.log(np.maximum(series[mask], 1e-20)) finite = np.isfinite(x) & np.isfinite(y) if finite.sum() < min_points: return {"gamma": 0.0, "gamma_err": None, "R2": None, "n_points": int(finite.sum())} x = x[finite] y = y[finite] coeffs = np.polyfit(x, y, 1) a, b = coeffs y_fit = a * x + b ss_res = np.sum((y - y_fit)**2) ss_tot = np.sum((y - np.mean(y))**2) R2 = 1.0 - ss_res / ss_tot if ss_tot > 0 else None n = len(x) if n > 2: sigma2 = ss_res / (n - 2) Sxx = np.sum((x - np.mean(x))**2) gamma_err = np.sqrt(sigma2 / Sxx) if Sxx > 0 else None else: gamma_err = None return { "gamma": float(a), "gamma_err": None if gamma_err is None else float(gamma_err), "R2": None if R2 is None else float(R2), "n_points": int(n) } def rolling_gamma(t, core_norm, window_T=50.0, min_points=20): t = np.array(t) core_norm = np.array(core_norm) gammas = np.full_like(t, np.nan, dtype=float) for i in range(len(t)): t_max = t[i] t_min = t_max - window_T if t_min < t[0]: continue mask = (t >= t_min) & (t <= t_max) res = fit_log_gamma(t, core_norm, mask=mask, min_points=min_points) if res["n_points"] >= min_points and res["R2"] is not None and res["R2"] > 0.9: gammas[i] = res["gamma"] return gammas def dominant_frequency(t, series): t = np.array(t) series = np.array(series) if len(t) < 8: return None dts = np.diff(t) if len(dts) == 0: return None dt_local = float(np.median(dts)) if dt_local <= 0: return None arr = series - np.mean(series) window = np.hanning(len(arr)) arr_w = arr * window fft_vals = np.fft.rfft(arr_w) freqs = np.fft.rfftfreq(len(arr_w), dt_local) amp = np.abs(fft_vals) if len(amp) < 3: return None idx = np.argmax(amp[1:]) + 1 return float(freqs[idx]) def rolling_frequency(t, series, window_T=50.0, min_points=40): t = np.array(t) series = np.array(series) freqs_out = np.full_like(t, np.nan, dtype=float) for i in range(len(t)): t_max = t[i] t_min = t_max - window_T if t_min < t[0]: continue mask = (t >= t_min) & (t <= t_max) if mask.sum() < min_points: continue tt = t[mask] ss = series[mask] dts = np.diff(tt) if len(dts) == 0: continue dt_local = float(np.median(dts)) if dt_local <= 0: continue arr = ss - np.mean(ss) window = np.hanning(len(arr)) arr_w = arr * window fft_vals = np.fft.rfft(arr_w) freqs = np.fft.rfftfreq(len(arr_w), dt_local) amp = np.abs(fft_vals) if len(amp) < 3: continue idx = np.argmax(amp[1:]) + 1 freqs_out[i] = float(freqs[idx]) return freqs_out def radial_mask(shape, dx, r_max): N = shape[0] x = np.arange(N) * dx X, Y = np.meshgrid(x, x) center = (N * dx) / 2.0 R = np.sqrt((X - center)**2 + (Y - center)**2) return (R < r_max) def compute_core_norm(field, dx, r_max=2.0): """ Approximate L2 norm inside radius r_max. """ N = field.shape[0] x = np.arange(N) * dx X, Y = np.meshgrid(x, x) center = (N * dx) / 2.0 R = np.sqrt((X - center)**2 + (Y - center)**2) mask = (R < r_max) return float(np.sum(np.abs(field[mask])**2) * dx * dx) def run_segment(psi_init, pi_init, kappa, t_max, save_prefix, out_dir): total_steps = int(t_max / DT) startup_audit(save_prefix, kappa, psi_init) psi_file, pi_file, hist_file = run_test10_core_solver( psi_init=psi_init, pi_init=pi_init, dx=DX, dt=DT, total_steps=total_steps, v=V, mu=MU, lam=LAM, kappa=kappa, sponge_width=40, save_prefix=save_prefix ) with open(hist_file, "r") as f: history = json.load(f) for key in REQUIRED_HISTORY_FIELDS: if key not in history: raise KeyError(f"Required history field missing: {key}") save_history_block(out_dir, save_prefix, history) psi_final = np.load(psi_file) pi_final = np.load(pi_file) return psi_final, pi_final, history def fit_relaxation_tau(t, deviation_series, t0=None, min_points=30): """ Fit deviation(t) ~ exp(-(t - t0)/tau), where deviation_series is already |observable - equilibrium|. """ t = np.array(t) deviation_series = np.array(deviation_series) if t0 is None: t0 = t[0] mask = ( (t >= t0) & np.isfinite(t) & np.isfinite(deviation_series) ) if mask.sum() < min_points: return None x = t[mask] dy = np.maximum(deviation_series[mask], 1e-20) y = np.log(dy) finite = np.isfinite(x) & np.isfinite(y) if finite.sum() < min_points: return None x = x[finite] y = y[finite] a, b = np.polyfit(x, y, 1) if a >= 0: return None tau = -1.0 / a return float(tau) # ------------------------------------------------------------ # TEST 10.8A — Critical Transition Verification # ------------------------------------------------------------ def test_10_8A(): print("\n=== TEST 10.8A — Critical Transition Verification ===") psi0, pi0 = load_initial_state() kappas = np.arange(0.08, 0.201, 0.01) t_max = 500.0 out_dir = os.path.join(RESULTS_DIR, "10_8A") os.makedirs(out_dir, exist_ok=True) summary = [] for kappa in kappas: kappa = float(kappa) label = f"10_8A_kappa_{kappa:.3f}" psi_f, pi_f, hist = run_segment( psi_init=psi0, pi_init=pi0, kappa=kappa, t_max=t_max, save_prefix=label, out_dir=out_dir ) t = np.array(hist["t"]) core_norm = np.array(hist["norm_core_r<2"]) shell_norm = np.array(hist["norm_shell_10_20"]) mask_late = (t >= 0.5 * t.max()) gamma_fit = fit_log_gamma(t, core_norm, mask=mask_late, min_points=30) gamma_roll = rolling_gamma(t, core_norm, window_T=50.0, min_points=20) np.save(os.path.join(out_dir, f"gamma_roll_kappa_{kappa:.3f}.npy"), gamma_roll) freq_core = dominant_frequency(t, core_norm) freq_halo = dominant_frequency(t, shell_norm) delta_f = None if freq_core is not None and freq_halo is not None: delta_f = float(freq_core - freq_halo) core_change = 100.0 * (core_norm[-1] - core_norm[0]) / max(core_norm[0], 1e-20) halo_change = 100.0 * (shell_norm[-1] - shell_norm[0]) / max(shell_norm[0], 1e-20) entry = { "kappa": float(kappa), "gamma_core": gamma_fit["gamma"], "gamma_err": gamma_fit["gamma_err"], "gamma_R2": gamma_fit["R2"], "gamma_n": gamma_fit["n_points"], "core_norm_change_pct": float(core_change), "halo_norm_change_pct": float(halo_change), "freq_core": freq_core, "freq_halo": freq_halo, "delta_f": delta_f } summary.append(entry) with open(os.path.join(out_dir, "summary_10_8A.json"), "w") as f: json.dump(summary, f, indent=2) # ------------------------------------------------------------ # TEST 10.8B — Breather Survival Test # ------------------------------------------------------------ def test_10_8B(): print("\n=== TEST 10.8B — Breather Survival Test ===") psi0, pi0 = load_initial_state() kappa = 0.15 t_max = 1000.0 out_dir = os.path.join(RESULTS_DIR, "10_8B") os.makedirs(out_dir, exist_ok=True) label = f"10_8B_kappa_{kappa:.2f}" psi_f, pi_f, hist = run_segment( psi_init=psi0, pi_init=pi0, kappa=kappa, t_max=t_max, save_prefix=label, out_dir=out_dir ) t = np.array(hist["t"]) core_norm = np.array(hist["norm_core_r<2"]) shell_norm = np.array(hist["norm_shell_10_20"]) center_amp2 = np.array(hist["center_amp2"]) energy = np.array(hist["energy"]) gamma_roll = rolling_gamma(t, core_norm, window_T=50.0, min_points=20) np.save(os.path.join(out_dir, "gamma_roll_10_8B.npy"), gamma_roll) freq_core = dominant_frequency(t, core_norm) freq_halo = dominant_frequency(t, shell_norm) energy_core = hist.get("energy_core", None) energy_halo = hist.get("energy_halo", None) summary = { "kappa": kappa, "freq_core": freq_core, "freq_halo": freq_halo, "core_norm_initial": float(core_norm[0]), "core_norm_final": float(core_norm[-1]), "amp_max": float(np.max(np.sqrt(np.maximum(center_amp2, 0.0)))), "energy_initial": float(energy[0]), "energy_final": float(energy[-1]), "has_energy_core": energy_core is not None, "has_energy_halo": energy_halo is not None } with open(os.path.join(out_dir, "summary_10_8B.json"), "w") as f: json.dump(summary, f, indent=2) # ------------------------------------------------------------ # TEST 10.8C — Recovery / Attractor Test # ------------------------------------------------------------ def test_10_8C(): print("\n=== TEST 10.8C — Recovery / Attractor Test ===") psi0, pi0 = load_initial_state() kappa = 0.15 out_dir = os.path.join(RESULTS_DIR, "10_8C") os.makedirs(out_dir, exist_ok=True) # Pre-kick evolution t1 = 100.0 label1 = "10_8C_kappa_0.15_pre_kick" psi_mid, pi_mid, hist1 = run_segment( psi_init=psi0, pi_init=pi0, kappa=kappa, t_max=t1, save_prefix=label1, out_dir=out_dir ) t_pre = np.array(hist1["t"]) core_pre = np.array(hist1["norm_core_r<2"]) amp2_pre = np.array(hist1["center_amp2"]) freq_pre = dominant_frequency(t_pre, core_pre) ref_core = float(core_pre[-1]) ref_amp = float(np.sqrt(max(amp2_pre[-1], 0.0))) ref_freq = freq_pre # Kick rng = np.random.default_rng() psi_kicked = psi_mid * (1.0 + 0.02 * rng.standard_normal(size=psi_mid.shape)) pi_kicked = pi_mid * (1.0 + 0.02 * rng.standard_normal(size=pi_mid.shape)) # Post-kick evolution t2 = 400.0 label2 = "10_8C_kappa_0.15_post_kick" psi_f, pi_f, hist2 = run_segment( psi_init=psi_kicked, pi_init=pi_kicked, kappa=kappa, t_max=t2, save_prefix=label2, out_dir=out_dir ) t_post_local = np.array(hist2["t"]) t_post = t_post_local + t1 core_post = np.array(hist2["norm_core_r<2"]) amp2_post = np.array(hist2["center_amp2"]) freq_roll = rolling_frequency(t_post, core_post, window_T=50.0, min_points=40) np.save(os.path.join(out_dir, "freq_roll_10_8C.npy"), freq_roll) tol = 0.05 recovered_time = None if ref_freq is not None: freq_res = 1.0 / 50.0 freq_tol = max(tol * abs(ref_freq), freq_res) else: freq_tol = None for ti, cn, a2, f_now in zip(t_post, core_post, amp2_post, freq_roll): amp = float(np.sqrt(max(a2, 0.0))) freq_ok = True if ref_freq is not None: if not np.isfinite(f_now): freq_ok = False else: freq_ok = abs(f_now - ref_freq) <= freq_tol if (abs(cn - ref_core) <= tol * abs(ref_core) and abs(amp - ref_amp) <= tol * abs(ref_amp) and freq_ok): recovered_time = float(ti) break summary = { "kappa": kappa, "ref_core_norm": ref_core, "ref_amp": ref_amp, "ref_freq": ref_freq, "recovery_time": recovered_time } with open(os.path.join(out_dir, "summary_10_8C.json"), "w") as f: json.dump(summary, f, indent=2) # ------------------------------------------------------------ # TEST 10.8D — Deliberate Damage Test # ------------------------------------------------------------ def test_10_8D(): print("\n=== TEST 10.8D — Deliberate Damage Test ===") psi0, pi0 = load_initial_state() kappa = 0.15 out_dir = os.path.join(RESULTS_DIR, "10_8D") os.makedirs(out_dir, exist_ok=True) damage_levels = [0.10, 0.20, 0.40, 0.60] all_entries = [] for dmg in damage_levels: print(f" Damage level: {int(dmg*100)}%") t1 = 100.0 label1 = f"10_8D_kappa_0.15_pre_damage_{int(dmg*100)}" psi_mid, pi_mid, hist1 = run_segment( psi_init=psi0, pi_init=pi0, kappa=kappa, t_max=t1, save_prefix=label1, out_dir=out_dir ) core_pre = np.array(hist1["norm_core_r<2"]) pre_core = float(core_pre[-1]) mask_core = radial_mask(psi_mid.shape, DX, r_max=1.0) psi_damaged = psi_mid.copy() pi_damaged = pi_mid.copy() psi_damaged[mask_core] *= (1.0 - dmg) pi_damaged[mask_core] *= (1.0 - dmg) damaged_core = compute_core_norm(psi_damaged, DX, r_max=2.0) t2 = 400.0 label2 = f"10_8D_kappa_0.15_post_damage_{int(dmg*100)}" psi_f, pi_f, hist2 = run_segment( psi_init=psi_damaged, pi_init=pi_damaged, kappa=kappa, t_max=t2, save_prefix=label2, out_dir=out_dir ) core_post = np.array(hist2["norm_core_r<2"]) final_core = float(core_post[-1]) denom = pre_core - damaged_core if abs(denom) > 1e-20: healing_fraction = (final_core - damaged_core) / denom else: healing_fraction = None entry = { "kappa": kappa, "damage_level": float(dmg), "pre_core_norm": pre_core, "damaged_core_norm": damaged_core, "final_core_norm": final_core, "healing_fraction": healing_fraction } all_entries.append(entry) with open(os.path.join(out_dir, "summary_10_8D.json"), "w") as f: json.dump(all_entries, f, indent=2) # ------------------------------------------------------------ # TEST 10.8E — Critical Slowing Down # ------------------------------------------------------------ def test_10_8E(): print("\n=== TEST 10.8E — Critical Slowing Down ===") psi0, pi0 = load_initial_state() kappas = [0.08, 0.10, 0.12, 0.14, 0.16, 0.18, 0.20] out_dir = os.path.join(RESULTS_DIR, "10_8E") os.makedirs(out_dir, exist_ok=True) tau_data = [] for kappa in kappas: print(f" κ = {kappa:.2f}") t1 = 100.0 label1 = f"10_8E_kappa_{kappa:.2f}_pre_kick" psi_mid, pi_mid, hist1 = run_segment( psi_init=psi0, pi_init=pi0, kappa=kappa, t_max=t1, save_prefix=label1, out_dir=out_dir ) t_pre = np.array(hist1["t"]) core_pre = np.array(hist1["norm_core_r<2"]) ref_core = float(core_pre[-1]) rng = np.random.default_rng(seed=12345) psi_kicked = psi_mid * (1.0 + 0.02 * rng.standard_normal(size=psi_mid.shape)) pi_kicked = pi_mid * (1.0 + 0.02 * rng.standard_normal(size=pi_mid.shape)) t2 = 400.0 label2 = f"10_8E_kappa_{kappa:.2f}_post_kick" psi_f, pi_f, hist2 = run_segment( psi_init=psi_kicked, pi_init=pi_kicked, kappa=kappa, t_max=t2, save_prefix=label2, out_dir=out_dir ) t_post_local = np.array(hist2["t"]) t_post = t_post_local + t1 core_post = np.array(hist2["norm_core_r<2"]) delta_core = np.abs(core_post - ref_core) tau_core = fit_relaxation_tau(t_post, delta_core, t0=t1, min_points=40) tau_data.append({ "kappa": float(kappa), "tau_core": tau_core }) with open(os.path.join(out_dir, "tau_recover_10_8E.json"), "w") as f: json.dump(tau_data, f, indent=2) # ------------------------------------------------------------ # TEST 10.8F — Hysteresis Probe # ------------------------------------------------------------ def test_10_8F(): print("\n=== TEST 10.8F — Hysteresis Probe ===") psi0, pi0 = load_initial_state() out_dir = os.path.join(RESULTS_DIR, "10_8F") os.makedirs(out_dir, exist_ok=True) sequence = [0.05, 0.15, 0.25, 0.15, 0.05] t_segment = 200.0 psi_curr, pi_curr = psi0.copy(), pi0.copy() entries = [] for idx, kappa in enumerate(sequence): label = f"10_8F_step_{idx}_kappa_{kappa:.2f}" psi_curr, pi_curr, hist = run_segment( psi_init=psi_curr, pi_init=pi_curr, kappa=kappa, t_max=t_segment, save_prefix=label, out_dir=out_dir ) t = np.array(hist["t"]) core_norm = np.array(hist["norm_core_r<2"]) shell_norm = np.array(hist["norm_shell_10_20"]) mask_late = (t >= 0.5 * t.max()) gamma_fit = fit_log_gamma(t, core_norm, mask=mask_late, min_points=20) freq_core = dominant_frequency(t, core_norm) freq_halo = dominant_frequency(t, shell_norm) delta_f = None if freq_core is not None and freq_halo is not None: delta_f = float(freq_core - freq_halo) entries.append({ "step_index": idx, "kappa": float(kappa), "gamma_core": gamma_fit["gamma"], "gamma_err": gamma_fit["gamma_err"], "gamma_R2": gamma_fit["R2"], "freq_core": freq_core, "freq_halo": freq_halo, "delta_f": delta_f, "core_norm_final": float(core_norm[-1]) }) with open(os.path.join(out_dir, "summary_10_8F.json"), "w") as f: json.dump(entries, f, indent=2) # ------------------------------------------------------------ # TEST 10.8G — Basin of Attraction Mapping (optional) # ------------------------------------------------------------ def test_10_8G(): print("\n=== TEST 10.8G — Basin of Attraction Mapping ===") psi0, pi0 = load_initial_state() kappa = 0.15 out_dir = os.path.join(RESULTS_DIR, "10_8G") os.makedirs(out_dir, exist_ok=True) kick_levels = [0.01, 0.05, 0.10, 0.20, 0.40, 0.60] t1 = 100.0 t2 = 400.0 all_entries = [] for lvl in kick_levels: print(f" Kick level: {int(lvl*100)}%") label1 = f"10_8G_kappa_0.15_pre_kick_{int(lvl*100)}" psi_mid, pi_mid, hist1 = run_segment( psi_init=psi0, pi_init=pi0, kappa=kappa, t_max=t1, save_prefix=label1, out_dir=out_dir ) t_pre = np.array(hist1["t"]) core_pre = np.array(hist1["norm_core_r<2"]) amp2_pre = np.array(hist1["center_amp2"]) ref_core = float(core_pre[-1]) ref_amp = float(np.sqrt(max(amp2_pre[-1], 0.0))) rng = np.random.default_rng(seed=123 + int(lvl*1000)) psi_kicked = psi_mid * (1.0 + lvl * rng.standard_normal(size=psi_mid.shape)) pi_kicked = pi_mid * (1.0 + lvl * rng.standard_normal(size=pi_mid.shape)) label2 = f"10_8G_kappa_0.15_post_kick_{int(lvl*100)}" psi_f, pi_f, hist2 = run_segment( psi_init=psi_kicked, pi_init=pi_kicked, kappa=kappa, t_max=t2, save_prefix=label2, out_dir=out_dir ) core_post = np.array(hist2["norm_core_r<2"]) amp2_post = np.array(hist2["center_amp2"]) final_core = float(core_post[-1]) final_amp = float(np.sqrt(max(amp2_post[-1], 0.0))) ratio = final_core / max(ref_core, 1e-20) if ratio > 0.9: state_label = "recovered" elif ratio > 0.3: state_label = "modified" else: state_label = "collapsed" all_entries.append({ "kappa": kappa, "kick_level": float(lvl), "ref_core_norm": ref_core, "ref_amp": ref_amp, "final_core_norm": final_core, "final_amp": final_amp, "state_label": state_label }) with open(os.path.join(out_dir, "summary_10_8G.json"), "w") as f: json.dump(all_entries, f, indent=2) # ------------------------------------------------------------ # MAIN # ------------------------------------------------------------ def main(): print("====================================================") print(" Test 10.8 — Critical Transition, Breather & Memory") print("====================================================") print(f"DT : {DT}") print(f"DX : {DX}") print(f"RESULTS : {RESULTS_DIR}") print("====================================================") if RUN_10_8A: test_10_8A() if RUN_10_8B: test_10_8B() if RUN_10_8C: test_10_8C() if RUN_10_8D: test_10_8D() if RUN_10_8E: test_10_8E() if RUN_10_8F: test_10_8F() if RUN_10_8G: test_10_8G() if IN_COLAB: import shutil as _shutil zip_name = "Test10_8_results.zip" if os.path.exists(zip_name): os.remove(zip_name) _shutil.make_archive("Test10_8_results", "zip", RESULTS_DIR) files.download(zip_name) print("Triggered download of:", zip_name) if __name__ == "__main__": main()

Popular posts from this blog

THE GOLDEN BALLROOM/BUNKER

Conceptual Summary #2: (∂t2​S−c2∇2S+βS3)=σ(x,t)⋅FR​(C[Ψ])

ICE PROUDLY ANNOUNCES NEW “ELITE” TASK FORCE COMMANDER JEREMY DEWITTE