tidal wave script 100 hours runtime sim
# ============================================================
# TEST 10.8 — Critical Transition, Breather & Memory Suite (v4)
# Uses BIG_GIANT_SOLVER_FILE_Test10_core_solver.py
# ============================================================
import os
import json
import numpy as np
import matplotlib.pyplot as plt
from BIG_GIANT_SOLVER_FILE_Test10_core_solver import run_test10_core_solver
try:
from google.colab import files
IN_COLAB = True
except ImportError:
IN_COLAB = False
# ------------------------------------------------------------
# GLOBAL CONFIG
# ------------------------------------------------------------
DX = 0.1
DT = 0.001
V = 1.0
MU = -1.0
LAM = 1.0
RESULTS_DIR = "results_10_8"
os.makedirs(RESULTS_DIR, exist_ok=True)
# Which tests to run (staged)
RUN_10_8A = True # Critical transition verification
RUN_10_8B = True # Breather survival
RUN_10_8C = True # Recovery / attractor
RUN_10_8D = False # Deliberate damage
RUN_10_8E = False # Critical slowing down
RUN_10_8F = False # Hysteresis probe
RUN_10_8G = False # Basin of attraction mapping
REQUIRED_HISTORY_FIELDS = [
"t",
"norm_core_r<2",
"norm_shell_10_20",
"center_amp2",
"energy"
]
# ------------------------------------------------------------
# UTILITIES
# ------------------------------------------------------------
def load_initial_state():
psi = np.load("Psi_test10_final.npy")
pi = np.load("Pi_test10_final.npy")
return psi, pi
def startup_audit(label, kappa, psi_init):
N = psi_init.shape[0]
x = np.arange(N) * DX
X, Y = np.meshgrid(x, x)
center_coord = (N * DX) / 2.0
S = np.exp(-((X - center_coord)**2 + (Y - center_coord)**2) / 4.0)
print("====================================================")
print(f" STARTUP AUDIT — {label}")
print("====================================================")
print(f"dt : {DT}")
print(f"dx : {DX}")
print(f"dtype : {psi_init.dtype}")
print(f"mu : {MU}")
print(f"lambda : {LAM}")
print(f"kappa : {kappa}")
print(f"max(S) : {float(S.max()):.6e}")
print(f"min(S) : {float(S.min()):.6e}")
print("kappa*S*psi term sign: + kappa * S * psi")
print("====================================================")
def save_history_block(base_dir, tag, history):
os.makedirs(base_dir, exist_ok=True)
with open(os.path.join(base_dir, f"history_{tag}.json"), "w") as f:
json.dump(history, f, indent=2)
def fit_log_gamma(t, series, mask=None, min_points=20):
t = np.array(t)
series = np.array(series)
if mask is None:
mask = np.ones_like(t, dtype=bool)
mask = mask & np.isfinite(t) & np.isfinite(series)
if mask.sum() < min_points:
return {"gamma": 0.0, "gamma_err": None, "R2": None, "n_points": int(mask.sum())}
x = t[mask]
y = np.log(np.maximum(series[mask], 1e-20))
finite = np.isfinite(x) & np.isfinite(y)
if finite.sum() < min_points:
return {"gamma": 0.0, "gamma_err": None, "R2": None, "n_points": int(finite.sum())}
x = x[finite]
y = y[finite]
coeffs = np.polyfit(x, y, 1)
a, b = coeffs
y_fit = a * x + b
ss_res = np.sum((y - y_fit)**2)
ss_tot = np.sum((y - np.mean(y))**2)
R2 = 1.0 - ss_res / ss_tot if ss_tot > 0 else None
n = len(x)
if n > 2:
sigma2 = ss_res / (n - 2)
Sxx = np.sum((x - np.mean(x))**2)
gamma_err = np.sqrt(sigma2 / Sxx) if Sxx > 0 else None
else:
gamma_err = None
return {
"gamma": float(a),
"gamma_err": None if gamma_err is None else float(gamma_err),
"R2": None if R2 is None else float(R2),
"n_points": int(n)
}
def rolling_gamma(t, core_norm, window_T=50.0, min_points=20):
t = np.array(t)
core_norm = np.array(core_norm)
gammas = np.full_like(t, np.nan, dtype=float)
for i in range(len(t)):
t_max = t[i]
t_min = t_max - window_T
if t_min < t[0]:
continue
mask = (t >= t_min) & (t <= t_max)
res = fit_log_gamma(t, core_norm, mask=mask, min_points=min_points)
if res["n_points"] >= min_points and res["R2"] is not None and res["R2"] > 0.9:
gammas[i] = res["gamma"]
return gammas
def dominant_frequency(t, series):
t = np.array(t)
series = np.array(series)
if len(t) < 8:
return None
dts = np.diff(t)
if len(dts) == 0:
return None
dt_local = float(np.median(dts))
if dt_local <= 0:
return None
arr = series - np.mean(series)
window = np.hanning(len(arr))
arr_w = arr * window
fft_vals = np.fft.rfft(arr_w)
freqs = np.fft.rfftfreq(len(arr_w), dt_local)
amp = np.abs(fft_vals)
if len(amp) < 3:
return None
idx = np.argmax(amp[1:]) + 1
return float(freqs[idx])
def rolling_frequency(t, series, window_T=50.0, min_points=40):
t = np.array(t)
series = np.array(series)
freqs_out = np.full_like(t, np.nan, dtype=float)
for i in range(len(t)):
t_max = t[i]
t_min = t_max - window_T
if t_min < t[0]:
continue
mask = (t >= t_min) & (t <= t_max)
if mask.sum() < min_points:
continue
tt = t[mask]
ss = series[mask]
dts = np.diff(tt)
if len(dts) == 0:
continue
dt_local = float(np.median(dts))
if dt_local <= 0:
continue
arr = ss - np.mean(ss)
window = np.hanning(len(arr))
arr_w = arr * window
fft_vals = np.fft.rfft(arr_w)
freqs = np.fft.rfftfreq(len(arr_w), dt_local)
amp = np.abs(fft_vals)
if len(amp) < 3:
continue
idx = np.argmax(amp[1:]) + 1
freqs_out[i] = float(freqs[idx])
return freqs_out
def radial_mask(shape, dx, r_max):
N = shape[0]
x = np.arange(N) * dx
X, Y = np.meshgrid(x, x)
center = (N * dx) / 2.0
R = np.sqrt((X - center)**2 + (Y - center)**2)
return (R < r_max)
def compute_core_norm(field, dx, r_max=2.0):
"""
Approximate L2 norm inside radius r_max.
"""
N = field.shape[0]
x = np.arange(N) * dx
X, Y = np.meshgrid(x, x)
center = (N * dx) / 2.0
R = np.sqrt((X - center)**2 + (Y - center)**2)
mask = (R < r_max)
return float(np.sum(np.abs(field[mask])**2) * dx * dx)
def run_segment(psi_init, pi_init, kappa, t_max, save_prefix, out_dir):
total_steps = int(t_max / DT)
startup_audit(save_prefix, kappa, psi_init)
psi_file, pi_file, hist_file = run_test10_core_solver(
psi_init=psi_init,
pi_init=pi_init,
dx=DX,
dt=DT,
total_steps=total_steps,
v=V,
mu=MU,
lam=LAM,
kappa=kappa,
sponge_width=40,
save_prefix=save_prefix
)
with open(hist_file, "r") as f:
history = json.load(f)
for key in REQUIRED_HISTORY_FIELDS:
if key not in history:
raise KeyError(f"Required history field missing: {key}")
save_history_block(out_dir, save_prefix, history)
psi_final = np.load(psi_file)
pi_final = np.load(pi_file)
return psi_final, pi_final, history
def fit_relaxation_tau(t, deviation_series, t0=None, min_points=30):
"""
Fit deviation(t) ~ exp(-(t - t0)/tau),
where deviation_series is already |observable - equilibrium|.
"""
t = np.array(t)
deviation_series = np.array(deviation_series)
if t0 is None:
t0 = t[0]
mask = (
(t >= t0)
& np.isfinite(t)
& np.isfinite(deviation_series)
)
if mask.sum() < min_points:
return None
x = t[mask]
dy = np.maximum(deviation_series[mask], 1e-20)
y = np.log(dy)
finite = np.isfinite(x) & np.isfinite(y)
if finite.sum() < min_points:
return None
x = x[finite]
y = y[finite]
a, b = np.polyfit(x, y, 1)
if a >= 0:
return None
tau = -1.0 / a
return float(tau)
# ------------------------------------------------------------
# TEST 10.8A — Critical Transition Verification
# ------------------------------------------------------------
def test_10_8A():
print("\n=== TEST 10.8A — Critical Transition Verification ===")
psi0, pi0 = load_initial_state()
kappas = np.arange(0.08, 0.201, 0.01)
t_max = 500.0
out_dir = os.path.join(RESULTS_DIR, "10_8A")
os.makedirs(out_dir, exist_ok=True)
summary = []
for kappa in kappas:
kappa = float(kappa)
label = f"10_8A_kappa_{kappa:.3f}"
psi_f, pi_f, hist = run_segment(
psi_init=psi0,
pi_init=pi0,
kappa=kappa,
t_max=t_max,
save_prefix=label,
out_dir=out_dir
)
t = np.array(hist["t"])
core_norm = np.array(hist["norm_core_r<2"])
shell_norm = np.array(hist["norm_shell_10_20"])
mask_late = (t >= 0.5 * t.max())
gamma_fit = fit_log_gamma(t, core_norm, mask=mask_late, min_points=30)
gamma_roll = rolling_gamma(t, core_norm, window_T=50.0, min_points=20)
np.save(os.path.join(out_dir, f"gamma_roll_kappa_{kappa:.3f}.npy"), gamma_roll)
freq_core = dominant_frequency(t, core_norm)
freq_halo = dominant_frequency(t, shell_norm)
delta_f = None
if freq_core is not None and freq_halo is not None:
delta_f = float(freq_core - freq_halo)
core_change = 100.0 * (core_norm[-1] - core_norm[0]) / max(core_norm[0], 1e-20)
halo_change = 100.0 * (shell_norm[-1] - shell_norm[0]) / max(shell_norm[0], 1e-20)
entry = {
"kappa": float(kappa),
"gamma_core": gamma_fit["gamma"],
"gamma_err": gamma_fit["gamma_err"],
"gamma_R2": gamma_fit["R2"],
"gamma_n": gamma_fit["n_points"],
"core_norm_change_pct": float(core_change),
"halo_norm_change_pct": float(halo_change),
"freq_core": freq_core,
"freq_halo": freq_halo,
"delta_f": delta_f
}
summary.append(entry)
with open(os.path.join(out_dir, "summary_10_8A.json"), "w") as f:
json.dump(summary, f, indent=2)
# ------------------------------------------------------------
# TEST 10.8B — Breather Survival Test
# ------------------------------------------------------------
def test_10_8B():
print("\n=== TEST 10.8B — Breather Survival Test ===")
psi0, pi0 = load_initial_state()
kappa = 0.15
t_max = 1000.0
out_dir = os.path.join(RESULTS_DIR, "10_8B")
os.makedirs(out_dir, exist_ok=True)
label = f"10_8B_kappa_{kappa:.2f}"
psi_f, pi_f, hist = run_segment(
psi_init=psi0,
pi_init=pi0,
kappa=kappa,
t_max=t_max,
save_prefix=label,
out_dir=out_dir
)
t = np.array(hist["t"])
core_norm = np.array(hist["norm_core_r<2"])
shell_norm = np.array(hist["norm_shell_10_20"])
center_amp2 = np.array(hist["center_amp2"])
energy = np.array(hist["energy"])
gamma_roll = rolling_gamma(t, core_norm, window_T=50.0, min_points=20)
np.save(os.path.join(out_dir, "gamma_roll_10_8B.npy"), gamma_roll)
freq_core = dominant_frequency(t, core_norm)
freq_halo = dominant_frequency(t, shell_norm)
energy_core = hist.get("energy_core", None)
energy_halo = hist.get("energy_halo", None)
summary = {
"kappa": kappa,
"freq_core": freq_core,
"freq_halo": freq_halo,
"core_norm_initial": float(core_norm[0]),
"core_norm_final": float(core_norm[-1]),
"amp_max": float(np.max(np.sqrt(np.maximum(center_amp2, 0.0)))),
"energy_initial": float(energy[0]),
"energy_final": float(energy[-1]),
"has_energy_core": energy_core is not None,
"has_energy_halo": energy_halo is not None
}
with open(os.path.join(out_dir, "summary_10_8B.json"), "w") as f:
json.dump(summary, f, indent=2)
# ------------------------------------------------------------
# TEST 10.8C — Recovery / Attractor Test
# ------------------------------------------------------------
def test_10_8C():
print("\n=== TEST 10.8C — Recovery / Attractor Test ===")
psi0, pi0 = load_initial_state()
kappa = 0.15
out_dir = os.path.join(RESULTS_DIR, "10_8C")
os.makedirs(out_dir, exist_ok=True)
# Pre-kick evolution
t1 = 100.0
label1 = "10_8C_kappa_0.15_pre_kick"
psi_mid, pi_mid, hist1 = run_segment(
psi_init=psi0,
pi_init=pi0,
kappa=kappa,
t_max=t1,
save_prefix=label1,
out_dir=out_dir
)
t_pre = np.array(hist1["t"])
core_pre = np.array(hist1["norm_core_r<2"])
amp2_pre = np.array(hist1["center_amp2"])
freq_pre = dominant_frequency(t_pre, core_pre)
ref_core = float(core_pre[-1])
ref_amp = float(np.sqrt(max(amp2_pre[-1], 0.0)))
ref_freq = freq_pre
# Kick
rng = np.random.default_rng()
psi_kicked = psi_mid * (1.0 + 0.02 * rng.standard_normal(size=psi_mid.shape))
pi_kicked = pi_mid * (1.0 + 0.02 * rng.standard_normal(size=pi_mid.shape))
# Post-kick evolution
t2 = 400.0
label2 = "10_8C_kappa_0.15_post_kick"
psi_f, pi_f, hist2 = run_segment(
psi_init=psi_kicked,
pi_init=pi_kicked,
kappa=kappa,
t_max=t2,
save_prefix=label2,
out_dir=out_dir
)
t_post_local = np.array(hist2["t"])
t_post = t_post_local + t1
core_post = np.array(hist2["norm_core_r<2"])
amp2_post = np.array(hist2["center_amp2"])
freq_roll = rolling_frequency(t_post, core_post, window_T=50.0, min_points=40)
np.save(os.path.join(out_dir, "freq_roll_10_8C.npy"), freq_roll)
tol = 0.05
recovered_time = None
if ref_freq is not None:
freq_res = 1.0 / 50.0
freq_tol = max(tol * abs(ref_freq), freq_res)
else:
freq_tol = None
for ti, cn, a2, f_now in zip(t_post, core_post, amp2_post, freq_roll):
amp = float(np.sqrt(max(a2, 0.0)))
freq_ok = True
if ref_freq is not None:
if not np.isfinite(f_now):
freq_ok = False
else:
freq_ok = abs(f_now - ref_freq) <= freq_tol
if (abs(cn - ref_core) <= tol * abs(ref_core) and
abs(amp - ref_amp) <= tol * abs(ref_amp) and
freq_ok):
recovered_time = float(ti)
break
summary = {
"kappa": kappa,
"ref_core_norm": ref_core,
"ref_amp": ref_amp,
"ref_freq": ref_freq,
"recovery_time": recovered_time
}
with open(os.path.join(out_dir, "summary_10_8C.json"), "w") as f:
json.dump(summary, f, indent=2)
# ------------------------------------------------------------
# TEST 10.8D — Deliberate Damage Test
# ------------------------------------------------------------
def test_10_8D():
print("\n=== TEST 10.8D — Deliberate Damage Test ===")
psi0, pi0 = load_initial_state()
kappa = 0.15
out_dir = os.path.join(RESULTS_DIR, "10_8D")
os.makedirs(out_dir, exist_ok=True)
damage_levels = [0.10, 0.20, 0.40, 0.60]
all_entries = []
for dmg in damage_levels:
print(f" Damage level: {int(dmg*100)}%")
t1 = 100.0
label1 = f"10_8D_kappa_0.15_pre_damage_{int(dmg*100)}"
psi_mid, pi_mid, hist1 = run_segment(
psi_init=psi0,
pi_init=pi0,
kappa=kappa,
t_max=t1,
save_prefix=label1,
out_dir=out_dir
)
core_pre = np.array(hist1["norm_core_r<2"])
pre_core = float(core_pre[-1])
mask_core = radial_mask(psi_mid.shape, DX, r_max=1.0)
psi_damaged = psi_mid.copy()
pi_damaged = pi_mid.copy()
psi_damaged[mask_core] *= (1.0 - dmg)
pi_damaged[mask_core] *= (1.0 - dmg)
damaged_core = compute_core_norm(psi_damaged, DX, r_max=2.0)
t2 = 400.0
label2 = f"10_8D_kappa_0.15_post_damage_{int(dmg*100)}"
psi_f, pi_f, hist2 = run_segment(
psi_init=psi_damaged,
pi_init=pi_damaged,
kappa=kappa,
t_max=t2,
save_prefix=label2,
out_dir=out_dir
)
core_post = np.array(hist2["norm_core_r<2"])
final_core = float(core_post[-1])
denom = pre_core - damaged_core
if abs(denom) > 1e-20:
healing_fraction = (final_core - damaged_core) / denom
else:
healing_fraction = None
entry = {
"kappa": kappa,
"damage_level": float(dmg),
"pre_core_norm": pre_core,
"damaged_core_norm": damaged_core,
"final_core_norm": final_core,
"healing_fraction": healing_fraction
}
all_entries.append(entry)
with open(os.path.join(out_dir, "summary_10_8D.json"), "w") as f:
json.dump(all_entries, f, indent=2)
# ------------------------------------------------------------
# TEST 10.8E — Critical Slowing Down
# ------------------------------------------------------------
def test_10_8E():
print("\n=== TEST 10.8E — Critical Slowing Down ===")
psi0, pi0 = load_initial_state()
kappas = [0.08, 0.10, 0.12, 0.14, 0.16, 0.18, 0.20]
out_dir = os.path.join(RESULTS_DIR, "10_8E")
os.makedirs(out_dir, exist_ok=True)
tau_data = []
for kappa in kappas:
print(f" κ = {kappa:.2f}")
t1 = 100.0
label1 = f"10_8E_kappa_{kappa:.2f}_pre_kick"
psi_mid, pi_mid, hist1 = run_segment(
psi_init=psi0,
pi_init=pi0,
kappa=kappa,
t_max=t1,
save_prefix=label1,
out_dir=out_dir
)
t_pre = np.array(hist1["t"])
core_pre = np.array(hist1["norm_core_r<2"])
ref_core = float(core_pre[-1])
rng = np.random.default_rng(seed=12345)
psi_kicked = psi_mid * (1.0 + 0.02 * rng.standard_normal(size=psi_mid.shape))
pi_kicked = pi_mid * (1.0 + 0.02 * rng.standard_normal(size=pi_mid.shape))
t2 = 400.0
label2 = f"10_8E_kappa_{kappa:.2f}_post_kick"
psi_f, pi_f, hist2 = run_segment(
psi_init=psi_kicked,
pi_init=pi_kicked,
kappa=kappa,
t_max=t2,
save_prefix=label2,
out_dir=out_dir
)
t_post_local = np.array(hist2["t"])
t_post = t_post_local + t1
core_post = np.array(hist2["norm_core_r<2"])
delta_core = np.abs(core_post - ref_core)
tau_core = fit_relaxation_tau(t_post, delta_core, t0=t1, min_points=40)
tau_data.append({
"kappa": float(kappa),
"tau_core": tau_core
})
with open(os.path.join(out_dir, "tau_recover_10_8E.json"), "w") as f:
json.dump(tau_data, f, indent=2)
# ------------------------------------------------------------
# TEST 10.8F — Hysteresis Probe
# ------------------------------------------------------------
def test_10_8F():
print("\n=== TEST 10.8F — Hysteresis Probe ===")
psi0, pi0 = load_initial_state()
out_dir = os.path.join(RESULTS_DIR, "10_8F")
os.makedirs(out_dir, exist_ok=True)
sequence = [0.05, 0.15, 0.25, 0.15, 0.05]
t_segment = 200.0
psi_curr, pi_curr = psi0.copy(), pi0.copy()
entries = []
for idx, kappa in enumerate(sequence):
label = f"10_8F_step_{idx}_kappa_{kappa:.2f}"
psi_curr, pi_curr, hist = run_segment(
psi_init=psi_curr,
pi_init=pi_curr,
kappa=kappa,
t_max=t_segment,
save_prefix=label,
out_dir=out_dir
)
t = np.array(hist["t"])
core_norm = np.array(hist["norm_core_r<2"])
shell_norm = np.array(hist["norm_shell_10_20"])
mask_late = (t >= 0.5 * t.max())
gamma_fit = fit_log_gamma(t, core_norm, mask=mask_late, min_points=20)
freq_core = dominant_frequency(t, core_norm)
freq_halo = dominant_frequency(t, shell_norm)
delta_f = None
if freq_core is not None and freq_halo is not None:
delta_f = float(freq_core - freq_halo)
entries.append({
"step_index": idx,
"kappa": float(kappa),
"gamma_core": gamma_fit["gamma"],
"gamma_err": gamma_fit["gamma_err"],
"gamma_R2": gamma_fit["R2"],
"freq_core": freq_core,
"freq_halo": freq_halo,
"delta_f": delta_f,
"core_norm_final": float(core_norm[-1])
})
with open(os.path.join(out_dir, "summary_10_8F.json"), "w") as f:
json.dump(entries, f, indent=2)
# ------------------------------------------------------------
# TEST 10.8G — Basin of Attraction Mapping (optional)
# ------------------------------------------------------------
def test_10_8G():
print("\n=== TEST 10.8G — Basin of Attraction Mapping ===")
psi0, pi0 = load_initial_state()
kappa = 0.15
out_dir = os.path.join(RESULTS_DIR, "10_8G")
os.makedirs(out_dir, exist_ok=True)
kick_levels = [0.01, 0.05, 0.10, 0.20, 0.40, 0.60]
t1 = 100.0
t2 = 400.0
all_entries = []
for lvl in kick_levels:
print(f" Kick level: {int(lvl*100)}%")
label1 = f"10_8G_kappa_0.15_pre_kick_{int(lvl*100)}"
psi_mid, pi_mid, hist1 = run_segment(
psi_init=psi0,
pi_init=pi0,
kappa=kappa,
t_max=t1,
save_prefix=label1,
out_dir=out_dir
)
t_pre = np.array(hist1["t"])
core_pre = np.array(hist1["norm_core_r<2"])
amp2_pre = np.array(hist1["center_amp2"])
ref_core = float(core_pre[-1])
ref_amp = float(np.sqrt(max(amp2_pre[-1], 0.0)))
rng = np.random.default_rng(seed=123 + int(lvl*1000))
psi_kicked = psi_mid * (1.0 + lvl * rng.standard_normal(size=psi_mid.shape))
pi_kicked = pi_mid * (1.0 + lvl * rng.standard_normal(size=pi_mid.shape))
label2 = f"10_8G_kappa_0.15_post_kick_{int(lvl*100)}"
psi_f, pi_f, hist2 = run_segment(
psi_init=psi_kicked,
pi_init=pi_kicked,
kappa=kappa,
t_max=t2,
save_prefix=label2,
out_dir=out_dir
)
core_post = np.array(hist2["norm_core_r<2"])
amp2_post = np.array(hist2["center_amp2"])
final_core = float(core_post[-1])
final_amp = float(np.sqrt(max(amp2_post[-1], 0.0)))
ratio = final_core / max(ref_core, 1e-20)
if ratio > 0.9:
state_label = "recovered"
elif ratio > 0.3:
state_label = "modified"
else:
state_label = "collapsed"
all_entries.append({
"kappa": kappa,
"kick_level": float(lvl),
"ref_core_norm": ref_core,
"ref_amp": ref_amp,
"final_core_norm": final_core,
"final_amp": final_amp,
"state_label": state_label
})
with open(os.path.join(out_dir, "summary_10_8G.json"), "w") as f:
json.dump(all_entries, f, indent=2)
# ------------------------------------------------------------
# MAIN
# ------------------------------------------------------------
def main():
print("====================================================")
print(" Test 10.8 — Critical Transition, Breather & Memory")
print("====================================================")
print(f"DT : {DT}")
print(f"DX : {DX}")
print(f"RESULTS : {RESULTS_DIR}")
print("====================================================")
if RUN_10_8A:
test_10_8A()
if RUN_10_8B:
test_10_8B()
if RUN_10_8C:
test_10_8C()
if RUN_10_8D:
test_10_8D()
if RUN_10_8E:
test_10_8E()
if RUN_10_8F:
test_10_8F()
if RUN_10_8G:
test_10_8G()
if IN_COLAB:
import shutil as _shutil
zip_name = "Test10_8_results.zip"
if os.path.exists(zip_name):
os.remove(zip_name)
_shutil.make_archive("Test10_8_results", "zip", RESULTS_DIR)
files.download(zip_name)
print("Triggered download of:", zip_name)
if __name__ == "__main__":
main()