Note
Go to the end to download the full example code.
Example: 5D Operating Window Exploration for VedEf Parameters.
Warning
Equations are AI generated, not ready for production.
This example explores the 5-dimensional parameter space: - V (kV): High voltage - e (mm): Electrode gap distance - d_t (mm): Torch diameter - f (kHz): Pulse frequency - E_p (mJ): Energy per pulse
With 10 physics-based constraints to identify the feasible operating window.
Usage:
python example_operating_window_5d.py
The script will:
Set up the VedEfDesignGroup with all models and constraints
Run a parameter sweep in parallel
Save results to SQLite database
Display summary statistics
import importlib.util
import os
from concurrent.futures import ProcessPoolExecutor
from itertools import product
from pathlib import Path
import numpy as np
# Import setup_vedef_problem from the numbered file
spec = importlib.util.spec_from_file_location(
"problem_module", Path(__file__).parent / "1_setup_vedef_problem.py"
)
problem_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(problem_module)
setup_vedef_problem = problem_module.setup_vedef_problem
def evaluate_single_case(case_params):
"""Evaluate a single parameter case.
Parameters
----------
case_params : tuple
(V, e, d_t, f, E_p) parameter values
Returns
-------
dict or None
Results dictionary if successful, None if failed
"""
V, e, d_t, f, E_p = case_params
try:
# Create a fresh problem for this case
prob = setup_vedef_problem(target_power=25000.0, max_energy_density=1e9, as_subsystem=False)
# Set fixed parameters
prob.set_val("pulse_duration", 1e-6, units="s") # 1 μs
prob.set_val("TP_QM", 160.0 / 86400.0, units="kg/s") # 160 kg/day
prob.set_val("ThermalDiameterModel.gas_density", 0.717, units="kg/m**3") # CH4 at STP
prob.set_val("ThermalDiameterModel.gas_heat_capacity", 2200.0, units="J/(kg*K)")
prob.set_val("ThermalDiameterModel.thermal_conductivity", 0.08, units="W/(m*K)")
prob.set_val("InitialTemperatureModel.gas_density", 0.717, units="kg/m**3")
prob.set_val("BreakdownModel.gas_properties_pressure", 101325.0, units="Pa")
# Set design variables for this case (using ARC002 naming convention)
prob.set_val("G_UMAX_OUT", V, units="V")
prob.set_val("G_e", e, units="m")
prob.set_val("TP_D_OUT", d_t, units="m")
prob.set_val("G_F", f, units="Hz")
prob.set_val("G_Ep", E_p, units="J")
# Run the model
prob.run_model()
# Extract results
results = {
"V": V,
"e": e,
"d_t": d_t,
"f": f,
"E_p": E_p,
"T_0": prob.get_val("T_0", units="K")[0],
"T_max": prob.get_val("T_max", units="K")[0],
"breakdown_voltage": prob.get_val("breakdown_voltage", units="V")[0],
"breakdown_margin": prob.get_val("breakdown_margin", units="V")[0],
"coverage_fraction": prob.get_val("coverage_fraction")[0],
"coverage_margin": prob.get_val("coverage_margin")[0],
"energy_density": prob.get_val("energy_density", units="J/m**3")[0],
"density_margin": prob.get_val("density_margin", units="J/m**3")[0],
"flow_velocity": prob.get_val("flow_velocity", units="m/s")[0],
"G_PW_OUT": prob.get_val("G_PW_OUT", units="W")[0],
"power_error": prob.get_val("PowerConstraint.power_error")[0],
}
# Check for NaN or infinite values
for key, value in results.items():
if key in ["V", "e", "d_t", "f", "E_p"]:
continue # Skip input parameters
if not np.isfinite(value):
return None
return results
except Exception:
return None
def run_parameter_sweep(n_procs=None):
"""Run 5D parameter space exploration.
Parameters
----------
n_procs : int, optional
Number of processors to use. If None, uses half of available cores.
Returns
-------
results : dict
Dictionary containing sweep results
"""
print("\n" + "=" * 70)
print(" VedEf Operating Window - 5D Parameter Sweep")
print("=" * 70)
# Define parameter ranges (coarse grid for demonstration)
V_range = np.linspace(10e3, 60e3, 6) # 10-60 kV
e_range = np.linspace(0.005, 0.025, 5) # 5-25 mm
d_t_range = np.linspace(0.015, 0.035, 4) # 15-35 mm
f_range = np.logspace(4, 5.3, 5) # 10-200 kHz
E_p_range = np.linspace(1e-3, 30e-3, 6) # 1-30 mJ
print("\nParameter ranges:")
print(
f" V: {V_range.min() / 1e3:.0f} - {V_range.max() / 1e3:.0f} kV ({len(V_range)} levels)"
)
print(
f" e: {e_range.min() * 1e3:.1f} - {e_range.max() * 1e3:.1f} mm ({len(e_range)} levels)"
)
print(
f" d_t: {d_t_range.min() * 1e3:.1f} - {d_t_range.max() * 1e3:.1f} mm ({len(d_t_range)} levels)"
)
print(
f" f: {f_range.min() / 1e3:.0f} - {f_range.max() / 1e3:.0f} kHz ({len(f_range)} levels)"
)
print(
f" E_p: {E_p_range.min() * 1e3:.1f} - {E_p_range.max() * 1e3:.1f} mJ ({len(E_p_range)} levels)"
)
# Generate all parameter combinations
param_cases = list(product(V_range, e_range, d_t_range, f_range, E_p_range))
total_evals = len(param_cases)
print(f"\nTotal evaluations: {total_evals}")
# Determine number of processors
if n_procs is None:
total_cores = os.cpu_count() if os.cpu_count() is not None else 1
n_procs = max(1, total_cores // 2)
print(f"Using {n_procs} processors for parallel execution")
# Initialize results dictionary
results = {
"V": [],
"e": [],
"d_t": [],
"f": [],
"E_p": [],
"T_0": [],
"T_max": [],
"breakdown_voltage": [],
"breakdown_margin": [],
"coverage_fraction": [],
"coverage_margin": [],
"energy_density": [],
"density_margin": [],
"flow_velocity": [],
"G_PW_OUT": [],
"power_error": [],
"converged": [],
}
successful_count = 0
failed_count = 0
print("\nRunning parallel parameter sweep...")
# Run cases in parallel (or sequentially for debugging)
if n_procs > 1:
with ProcessPoolExecutor(max_workers=n_procs) as executor:
case_results = list(executor.map(evaluate_single_case, param_cases))
else:
# Sequential execution for debugging
case_results = [evaluate_single_case(case) for case in param_cases]
# Process results
for i, result in enumerate(case_results):
V, e, d_t, f, E_p = param_cases[i]
if result is not None:
# Converged case
for key, value in result.items():
results[key].append(value)
results["converged"].append(True)
successful_count += 1
else:
# Non-converged case
results["V"].append(V)
results["e"].append(e)
results["d_t"].append(d_t)
results["f"].append(f)
results["E_p"].append(E_p)
for key in [
"T_0",
"T_max",
"breakdown_voltage",
"breakdown_margin",
"coverage_fraction",
"coverage_margin",
"energy_density",
"density_margin",
"flow_velocity",
"G_PW_OUT",
"power_error",
]:
results[key].append(np.nan)
results["converged"].append(False)
failed_count += 1
print(f"\nCompleted {successful_count + failed_count} evaluations")
print(f" Successful: {successful_count}")
print(f" Failed: {failed_count}")
# Convert to numpy arrays
for key in results:
results[key] = np.array(results[key])
return results
def analyze_feasibility(results):
"""Analyze and display feasibility statistics.
Parameters
----------
results : dict
Results from parameter sweep
"""
print("\n" + "-" * 70)
print(" Feasibility Analysis")
print("-" * 70)
# Create masks
converged_mask = results["converged"]
n_converged = np.sum(converged_mask)
if n_converged == 0:
print("No converged solutions found!")
return
# Define constraint satisfaction (for converged points only)
breakdown_ok = results["breakdown_margin"][converged_mask] > 0
coverage_ok = results["coverage_margin"][converged_mask] > 0
density_ok = results["density_margin"][converged_mask] > 0
power_ok = results["power_error"][converged_mask] < 0.1 # 10% tolerance
# Combined feasibility
all_constraints = breakdown_ok & coverage_ok & density_ok & power_ok
n_feasible = np.sum(all_constraints)
print(f"\nTotal points: {len(results['V'])}")
print(f"Converged: {n_converged} ({n_converged / len(results['V']) * 100:.1f}%)")
print(f"Feasible: {n_feasible} ({n_feasible / len(results['V']) * 100:.1f}%)")
if n_feasible > 0:
print("\nFeasible parameter ranges:")
feasible_idx = np.where(converged_mask)[0][all_constraints]
print(
f" V: {results['V'][feasible_idx].min() / 1e3:.1f} - "
f"{results['V'][feasible_idx].max() / 1e3:.1f} kV"
)
print(
f" e: {results['e'][feasible_idx].min() * 1e3:.1f} - "
f"{results['e'][feasible_idx].max() * 1e3:.1f} mm"
)
print(
f" d_t: {results['d_t'][feasible_idx].min() * 1e3:.1f} - "
f"{results['d_t'][feasible_idx].max() * 1e3:.1f} mm"
)
print(
f" f: {results['f'][feasible_idx].min() / 1e3:.0f} - "
f"{results['f'][feasible_idx].max() / 1e3:.0f} kHz"
)
print(
f" E_p: {results['E_p'][feasible_idx].min() * 1e3:.1f} - "
f"{results['E_p'][feasible_idx].max() * 1e3:.1f} mJ"
)
print("\nConstraint violation breakdown (converged points):")
print(f" Breakdown violated: {np.sum(~breakdown_ok)}")
print(f" Coverage violated: {np.sum(~coverage_ok)}")
print(f" Density violated: {np.sum(~density_ok)}")
print(f" Power violated: {np.sum(~power_ok)}")
def main():
"""Execute parameter sweep and save results."""
# Run parameter sweep (use n_procs=1 for debugging)
results = run_parameter_sweep(n_procs=1)
# Analyze feasibility
analyze_feasibility(results)
# Create output directory if it doesn't exist
output_dir = "sweep_output"
os.makedirs(output_dir, exist_ok=True)
# Save results to NumPy file (SQLite save requires different key structure)
output_file = os.path.join(output_dir, "operating_window_results.npz")
print(f"\nSaving results to: {output_file}")
np.savez_compressed(output_file, **results)
print("\n" + "=" * 70)
print(" Sweep complete!")
print("=" * 70)
print("\nNext steps:")
print(" 1. Run visualize_operating_window.py to see 2D slices")
print(" 2. Run dashboard_operating_window.py for interactive exploration")
print(" 3. Run generate_vedef_diagram.py to see model connectivity")
print()
if __name__ == "__main__":
main()