.. DO NOT EDIT. .. THIS FILE WAS AUTOMATICALLY GENERATED BY SPHINX-GALLERY. .. TO MAKE CHANGES, EDIT THE SOURCE PYTHON FILE: .. "auto_examples/VDEF_optim/example_vdef_mdao.py" .. LINE NUMBERS ARE GIVEN BELOW. .. only:: html .. note:: :class: sphx-glr-download-link-note :ref:`Go to the end ` to download the full example code. .. rst-class:: sphx-glr-example-title .. _sphx_glr_auto_examples_VDEF_optim_example_vdef_mdao.py: Example: Direct MDAO problem setup for plasma torch parameter space exploration. .. warning:: Equations are AI generated, not ready for production. This example demonstrates how to: 1. Set up the problem directly in Python (no STONE config) 2. Use RepetitivePulseBreakdownModel with: - Pulse duration effects (ionization avalanche kinetics: e + M → e + e + M+) - Memory effects from repeated pulsing - Field enhancement factors 3. Explore feasible parameter space with power and breakdown constraints 4. Visualize the solution space 5. **Run parameter sweeps in parallel using OpenMDAO's DOEDriver** Optimization variables: - Frequency: 5 - 200 kHz - HV voltage (breakdown initiation): 5 - 80 kV - Gap distance: 6 - 30 mm Constraints: - Total power deposited = 25 kW - HV voltage ≥ Breakdown voltage (pulse duration & frequency dependent) - Generator HV operating window: f * V^2 / (Z * t_pulse) <= P_max (Z=50Ω, t=20ns, P_max=1kW) Fixed parameters: - Mass flow = 160 kg CH4/d - HV pulse duration = 1 µs (affects breakdown voltage) - Sustainer voltage = 500 V (independent of HV voltage) - Sustainer pulse duration = computed to match 25 kW target power Outputs: - Energy per pulse (calculated) - Breakdown voltage (pulse duration & frequency-dependent) - Feasible parameter space visualization Physics: - Short pulse durations require higher voltage for avalanche development - High pulse rates (many pulses/molecule) reduce breakdown voltage via memory effects - Transition from single-pulse to DC breakdown as frequency increases **Usage**: Standard execution (uses Python multiprocessing by default):: python example_vdef_mdao.py MPI-based parallel execution (Linux/Unix only, requires mpi4py and petsc4py):: mpiexec -n $($env:NUMBER_OF_PROCESSORS - 1) python example_vdef_mdao.py **Note**: On Windows, parallel execution uses Python's multiprocessing module by default. MPI with PETSc is not easily available on Windows. .. GENERATED FROM PYTHON SOURCE LINES 57-1166 .. code-block:: Python import numpy as np import openmdao.api as om from paroto.mdao.group import TorchDesignGroup from paroto.models.ablation import SimpleAblationModel from paroto.models.breakdown_voltage import RepetitivePulseBreakdownModel from paroto.models.gas_properties import GasHeatCapacityModel from paroto.utils.io_utils import write_results_to_sqlite def setup_problem_with_power_constraint(): r"""Set up OpenMDAO problem with 25 kW power constraint. .. warning:: Equations are AI generated, not ready for production. Uses RepetitivePulseBreakdownModel which accounts for: - Pulse duration effects (ionization avalanche time) - Memory effects from repeated pulsing - Field enhancement factors The problem includes the following key equations: Energy per pulse: .. math:: E_{pulse} = \\frac{P_{gas}}{f_{pulse}} Power constraint error: .. math:: \\epsilon_{power} = \\frac{P_{gas} - P_{target}}{P_{target}} where: - \\(E_{pulse}\\) is the energy deposited per pulse (J) - \\(P_{gas}\\) is the power transferred to gas (W) - \\(f_{pulse}\\) is the pulse frequency (Hz) - \\(P_{target}\\) = 25,000 W is the target power Returns ------- prob : om.Problem Configured problem ready to run """ # Create problem with torch design group prob = om.Problem() # Configure models breakdown_model = RepetitivePulseBreakdownModel( model_parameters={ "A_paschen": 24.4, "B_paschen": 6.73, "field_enhancement_factor": 1.0, "ionization_time": 1e-7, "pulse_amplitude_factor": 4.0, "decay_constant": 1.0, } ) ablation_model = SimpleAblationModel( model_parameters={"threshold_fraction": 0.8, "ablation_coefficient": 1.0e-5} ) gas_heat_capacity = GasHeatCapacityModel( model_parameters={"cp_coeff_a": 1700.0, "cp_coeff_b": 9.08, "cp_coeff_c": -0.002} ) # Create torch design group with models prob.model = TorchDesignGroup( max_current_density=1.0e6, breakdown_model=breakdown_model, ablation_model=ablation_model, gas_density=0.717, gas_viscosity=1.1e-5, gas_heat_capacity=gas_heat_capacity, ) # Add required energy per pulse calculation (from target power and frequency) prob.model.add_subsystem( "required_energy", om.ExecComp( "required_energy_per_pulse = target_power / (pulse_frequency + 1e-10)", required_energy_per_pulse={"units": "J"}, target_power={"units": "W", "val": 25000.0}, pulse_frequency={"units": "Hz"}, ), promotes=["pulse_frequency"], ) # Add actual energy per pulse calculation prob.model.add_subsystem( "actual_energy", om.ExecComp( "energy_per_pulse = arc_voltage * arc_current * sustainer_pulse_duration", energy_per_pulse={"units": "J"}, arc_voltage={"units": "V"}, arc_current={"units": "A"}, sustainer_pulse_duration={"units": "s"}, ), promotes=["arc_voltage", "arc_current"], ) # Use BalanceComp to iteratively solve for sustainer_pulse_duration # Balance: actual_energy = required_energy by adjusting sustainer_pulse_duration # Improved initial guess and scaling based on expected range: # At 5 kHz: E_pulse ≈ 5 J, with I≈4500A, V≈300V → t ≈ 3.7 µs # At 200 kHz: E_pulse ≈ 0.125 J, with I≈4500A, V≈300V → t ≈ 0.093 µs # Using 10 µs as middle-ground initial guess balance = prob.model.add_subsystem("balance", om.BalanceComp()) balance.add_balance( "sustainer_pulse_duration", units="s", eq_units="J", lhs_name="energy_per_pulse", rhs_name="required_energy_per_pulse", val=10e-6, # Better guess: 10 µs is middle of expected range lower=0.01e-6, # 0.01 µs (10 ns) minimum - allow very short pulses at high freq upper=1e-3, # 1 ms maximum ref=10e-6, # Reference for normalization (typical mid-range value) ref0=1e-6, # Lower reference bound ) # Connect balance output to system prob.model.connect("balance.sustainer_pulse_duration", "sustainer_pulse_duration") prob.model.connect("balance.sustainer_pulse_duration", "actual_energy.sustainer_pulse_duration") # Connect energies to balance prob.model.connect("actual_energy.energy_per_pulse", "balance.energy_per_pulse") prob.model.connect( "required_energy.required_energy_per_pulse", "balance.required_energy_per_pulse" ) # Add power check (should equal target power) prob.model.add_subsystem( "power_check", om.ExecComp( "power_error = (power_to_gas - target_power) / target_power", power_error={"val": 0.0}, power_to_gas={"units": "W"}, target_power={"units": "W", "val": 25000.0}, ), promotes=["power_to_gas"], ) # Set input defaults to resolve promotion ambiguities (after all subsystems added) prob.model.set_input_defaults("pulse_frequency", val=1000.0, units="Hz") prob.model.set_input_defaults("pulse_duration", val=1e-6, units="s") prob.model.set_input_defaults("hv_voltage", val=10000.0, units="V") prob.model.set_input_defaults("sustainer_voltage", val=500.0, units="V") prob.model.set_input_defaults("gap_distance", val=0.01, units="m") # sustainer_pulse_duration is set by the balance component # Add nonlinear solver to solve for sustainer_pulse_duration via balance # Use Newton solver with actual derivatives (now that declare_partials are fixed) prob.model.nonlinear_solver = om.NewtonSolver(maxiter=20, solve_subsystems=True) prob.model.nonlinear_solver.options["err_on_non_converge"] = False prob.model.nonlinear_solver.options["atol"] = 1e-6 prob.model.nonlinear_solver.options["rtol"] = 1e-6 prob.model.nonlinear_solver.linesearch = om.BoundsEnforceLS() prob.model.linear_solver = om.DirectSolver() return prob def evaluate_single_case(case_params): """Evaluate a single parameter case. Parameters ---------- case_params : tuple (frequency, hv_voltage, gap_distance) parameter values Returns ------- dict or None Results dictionary if successful, None if failed """ freq, hv_volt, gap = case_params try: # Create a fresh problem for this case prob = setup_problem_with_power_constraint() prob.setup() # Set fixed parameters mass_flow = 160 / 86400 # kg/d to kg/s prob.set_val("pulse_duration", 1e-6) prob.set_val("sustainer_voltage", 300.0) prob.set_val("mass_flow", mass_flow) prob.set_val("gas_properties_pressure", 101325.0) prob.set_val("preheat_temperature", 300.0) prob.set_val("electrode_radius", 0.005) prob.set_val("graphite_sublimation_temp", 3900.0) prob.set_val("graphite_heat_capacity", 710.0) prob.set_val("torch_length", 0.1) prob.set_val("torch_diameter", 0.02) # Set design variables for this case prob.set_val("pulse_frequency", freq) prob.set_val("hv_voltage", hv_volt) prob.set_val("gap_distance", gap) # Run the model prob.run_model() # Check if nonlinear solver converged solver = prob.model.nonlinear_solver if hasattr(solver, "_iter_count") and hasattr(solver, "options"): if solver._iter_count >= solver.options["maxiter"]: # Solver reached max iterations without converging return None # Extract results and check for NaN/invalid values results = { "frequency": freq, "hv_voltage": hv_volt, "gap": gap, "energy_per_pulse": prob.get_val("actual_energy.energy_per_pulse")[0], "power_to_gas": prob.get_val("power_to_gas")[0], "power_error": abs(prob.get_val("power_check.power_error")[0]), "breakdown_voltage": prob.get_val("breakdown_voltage")[0], "breakdown_margin": prob.get_val("breakdown_margin")[0], "arc_current": prob.get_val("arc_current")[0], "sustainer_pulse_duration": prob.get_val("balance.sustainer_pulse_duration")[0], "operating_window_power": prob.get_val("operating_window_power")[0], "hv_operating_window_satisfied": prob.get_val("hv_operating_window_satisfied")[0], } # Check for NaN or infinite values (sign of convergence failure) for key, value in results.items(): if key in ["frequency", "hv_voltage", "gap"]: continue # Skip input parameters if not np.isfinite(value): return None return results except Exception: # Return None for failed cases (exceptions, NaN, etc.) return None def run_parameter_sweep(use_multiprocessing=True, n_procs=None): r"""Run parameter space exploration using parallel DOEDriver. .. warning:: Equations are AI generated, not ready for production. Evaluates the model over a grid of parameters using OpenMDAO's DOEDriver with parallel execution support: .. math:: \\mathcal{P} = \\{(f, V_{HV}, d) : f \\in [5, 200]\\text{ kHz}, V_{HV} \\in [5, 80]\\text{ kV}, d \\in [6, 30]\\text{ mm}\\} where: - :math:`f` is the pulse frequency - :math:`V_{HV}` is the HV voltage (breakdown initiation) - :math:`d` is the gap distance Feasibility criterion: .. math:: |\\epsilon_{power}| < 0.10 Parameters ---------- use_multiprocessing : bool, optional Use Python multiprocessing for parallel execution (default: True). If False, uses MPI (requires mpiexec and petsc4py). n_procs : int, optional Number of processors to use. If None, uses all available cores minus 1. Returns ------- results : dict Dictionary containing sweep results """ import os print("\n" + "=" * 70) print(" VDEF MDAO Example - Parameter Space Exploration") if use_multiprocessing: print(" (Parallel execution via multiprocessing)") else: print(" (Parallel execution via MPI)") print("=" * 70) # Convert mass flow rate mass_flow = 160 / 86400 # kg/d to kg/s # Define parameter ranges (adjusted for 25 kW target) frequency_range = np.linspace(5000, 200000, 10) # Hz (5-200 kHz) hv_voltage_range = np.linspace(5000, 80000, 15) # V (5-80 kV HV voltage) gap_range = np.linspace(0.006, 0.030, 8) # m (6-30 mm) # Fixed parameters sustainer_voltage_fixed = 300.0 # V (constant, independent of HV voltage) hv_pulse_duration = 1e-6 # 1 µs (HV pulse duration) print("\nParameter ranges:") print( f" Frequency: {frequency_range.min() / 1000:.0f} - {frequency_range.max() / 1000:.0f} kHz" ) print( f" HV Voltage: {hv_voltage_range.min() / 1000:.0f} - " f"{hv_voltage_range.max() / 1000:.0f} kV" ) print(f" Gap: {gap_range.min() * 1000:.1f} - {gap_range.max() * 1000:.1f} mm") print(f" Sustainer Voltage: {sustainer_voltage_fixed:.0f} V (fixed)") print(f" HV Pulse Duration: {hv_pulse_duration * 1e6:.1f} µs (fixed)") print(" Target power: 25.0 kW") print(f" Mass flow: {mass_flow:.6f} kg/s (fixed)") # Create problem with DOEDriver prob = setup_problem_with_power_constraint() # Add design variables for DOE prob.model.add_design_var( "pulse_frequency", lower=frequency_range.min(), upper=frequency_range.max() ) prob.model.add_design_var( "hv_voltage", lower=hv_voltage_range.min(), upper=hv_voltage_range.max() ) prob.model.add_design_var("gap_distance", lower=gap_range.min(), upper=gap_range.max()) # Add objectives to record (not actual optimization objectives) prob.model.add_objective("power_to_gas") # Determine number of processors # Use half of available cores to avoid memory exhaustion (each worker loads full OpenMDAO model) if n_procs is None: total_cores = os.cpu_count() if os.cpu_count() is not None else 1 n_procs = max(1, total_cores // 2) # Use half of available cores if use_multiprocessing: print(f"\nUsing {n_procs} processors for parallel execution via multiprocessing") else: print("\nUsing MPI for parallel execution") # Create DOE driver with full factorial design prob.driver = om.DOEDriver( om.FullFactorialGenerator( levels={ "pulse_frequency": len(frequency_range), "hv_voltage": len(hv_voltage_range), "gap_distance": len(gap_range), } ) ) # Only enable OpenMDAO's parallel execution for MPI mode if not use_multiprocessing: prob.driver.options["run_parallel"] = True prob.driver.options["procs_per_model"] = 1 # Create persistent SQLite database for case recording recorder_file = "parameter_sweep_cases.sql" # Add recorder to capture all cases recorder = om.SqliteRecorder(recorder_file) prob.driver.add_recorder(recorder) prob.driver.recording_options["includes"] = [ "pulse_frequency", "hv_voltage", "gap_distance", "power_to_gas", "power_check.power_error", "actual_energy.energy_per_pulse", "breakdown_voltage", "breakdown_margin", "arc_current", "balance.sustainer_pulse_duration", "operating_window_power", "hv_operating_window_satisfied", ] prob.driver.recording_options["record_objectives"] = True prob.driver.recording_options["record_constraints"] = False prob.driver.recording_options["record_desvars"] = True # Setup and set fixed parameters prob.setup() prob.set_val("pulse_duration", hv_pulse_duration) prob.set_val("sustainer_voltage", sustainer_voltage_fixed) prob.set_val("mass_flow", mass_flow) prob.set_val("gas_properties_pressure", 101325.0) prob.set_val("preheat_temperature", 300.0) prob.set_val("electrode_radius", 0.005) prob.set_val("graphite_sublimation_temp", 3900.0) prob.set_val("graphite_heat_capacity", 710.0) prob.set_val("torch_length", 0.1) prob.set_val("torch_diameter", 0.02) total_evals = len(frequency_range) * len(hv_voltage_range) * len(gap_range) print("\nRunning parallel parameter sweep...") print(f" Total evaluations: {total_evals}") # Initialize results dictionary with convergence tracking results = { "frequency": [], "hv_voltage": [], "gap": [], "energy_per_pulse": [], "power_to_gas": [], "power_error": [], "breakdown_voltage": [], "breakdown_margin": [], "arc_current": [], "sustainer_pulse_duration": [], "operating_window_power": [], "hv_operating_window_satisfied": [], "converged": [], # Track convergence status } successful_count = 0 failed_count = 0 if use_multiprocessing: # Use multiprocessing for Windows from concurrent.futures import ProcessPoolExecutor from itertools import product # Generate all parameter combinations param_cases = list(product(frequency_range, hv_voltage_range, gap_range)) print(f" Using ProcessPoolExecutor with {n_procs} workers") # Run cases in parallel with ProcessPoolExecutor(max_workers=n_procs) as executor: case_results = list(executor.map(evaluate_single_case, param_cases)) # Process results - store all cases including failed ones for i, result in enumerate(case_results): freq, hv_volt, gap = param_cases[i] if result is not None: # Converged case for key, value in result.items(): results[key].append(value) results["converged"].append(True) successful_count += 1 else: # Non-converged case - store parameters but mark as failed results["frequency"].append(freq) results["hv_voltage"].append(hv_volt) results["gap"].append(gap) results["energy_per_pulse"].append(np.nan) results["power_to_gas"].append(np.nan) results["power_error"].append(np.nan) results["breakdown_voltage"].append(np.nan) results["breakdown_margin"].append(np.nan) results["arc_current"].append(np.nan) results["sustainer_pulse_duration"].append(np.nan) results["operating_window_power"].append(np.nan) results["hv_operating_window_satisfied"].append(np.nan) results["converged"].append(False) failed_count += 1 else: # Use MPI-based DOE driver (Linux/Unix with PETSc) print(f" Recording to: {recorder_file}") prob.run_driver() print("\nExtracting results from case recorder...") # Extract results from recorder cr = om.CaseReader(recorder_file) cases = cr.list_cases("driver") for case_id in cases: try: case = cr.get_case(case_id) # Extract design variables freq = case.get_val("pulse_frequency")[0] hv_volt = case.get_val("hv_voltage")[0] gap = case.get_val("gap_distance")[0] # Extract outputs power_to_gas = case.get_val("power_to_gas")[0] power_error = case.get_val("power_check.power_error")[0] energy_per_pulse = case.get_val("actual_energy.energy_per_pulse")[0] breakdown_voltage_val = case.get_val("breakdown_voltage")[0] breakdown_margin_val = case.get_val("breakdown_margin")[0] arc_current = case.get_val("arc_current")[0] sustainer_pulse_duration = case.get_val("balance.sustainer_pulse_duration")[0] operating_window_power = case.get_val("operating_window_power")[0] hv_operating_window_satisfied = case.get_val("hv_operating_window_satisfied")[0] # Store results results["frequency"].append(freq) results["hv_voltage"].append(hv_volt) results["gap"].append(gap) results["energy_per_pulse"].append(energy_per_pulse) results["power_to_gas"].append(power_to_gas) results["power_error"].append(abs(power_error)) results["breakdown_voltage"].append(breakdown_voltage_val) results["breakdown_margin"].append(breakdown_margin_val) results["arc_current"].append(arc_current) results["sustainer_pulse_duration"].append(sustainer_pulse_duration) results["operating_window_power"].append(operating_window_power) results["hv_operating_window_satisfied"].append(hv_operating_window_satisfied) results["converged"].append(True) successful_count += 1 except Exception as e: failed_count += 1 if failed_count <= 5: print(f" Warning: Failed to extract case {case_id}: {str(e)}") print(f"\nCompleted {successful_count + failed_count} evaluations") print(f" Successful: {successful_count}") print(f" Failed: {failed_count}") # Convert to numpy arrays for key in results: results[key] = np.array(results[key]) # For multiprocessing mode (Windows), manually write results to SQLite # This is necessary because ProcessPoolExecutor bypasses OpenMDAO's recorder if use_multiprocessing: print(f"\nWriting results to SQLite database: {recorder_file}") write_results_to_sqlite(results, recorder_file) print(f"Results saved to SQLite database: {recorder_file}") return results def visualize_parameter_space(results): """Visualize the feasible parameter space. .. warning:: Equations are AI generated, not ready for production. Parameters ---------- results : dict Results from parameter sweep """ print("\n" + "-" * 70) print(" Generating visualizations...") print("-" * 70) # Create three-way classification: feasible, converged-infeasible, non-converged tolerance = 0.10 converged_mask = np.array(results["converged"], dtype=bool) # For converged points, check feasibility power_feasible = np.array(results["power_error"]) < tolerance breakdown_feasible = np.array(results["breakdown_margin"]) > 0 hv_feasible = np.array(results["hv_operating_window_satisfied"]) >= 1.0 constraints_met = power_feasible & breakdown_feasible & hv_feasible # Three categories feasible_mask = converged_mask & constraints_met converged_infeasible_mask = converged_mask & ~constraints_met non_converged_mask = ~converged_mask n_total = len(results["frequency"]) n_feasible = np.sum(feasible_mask) n_converged_infeasible = np.sum(converged_infeasible_mask) n_non_converged = np.sum(non_converged_mask) n_converged = n_feasible + n_converged_infeasible print("\nConvergence and Feasibility analysis:") print(f" Total points evaluated: {n_total}") print(f" Converged: {n_converged} ({n_converged / n_total * 100:.1f}%)") print(f" - Feasible (meets constraints): {n_feasible} ({n_feasible / n_total * 100:.1f}%)") print(f" - Infeasible (violates constraints): {n_converged_infeasible}") print(f" Non-converged: {n_non_converged} ({n_non_converged / n_total * 100:.1f}%)") # Detailed constraint violation analysis (for converged points) if n_converged > 0: print("\n Constraint violation breakdown (converged points only):") n_power_violated = np.sum(converged_mask & ~power_feasible) n_breakdown_violated = np.sum(converged_mask & ~breakdown_feasible) n_hv_violated = np.sum(converged_mask & ~hv_feasible) print(f" - Power constraint violated: {n_power_violated}") print(f" - Breakdown constraint violated: {n_breakdown_violated}") print(f" - HV operating window violated: {n_hv_violated}") if n_feasible == 0: print("\n WARNING: No feasible points found.") # Check if we have any results at all if n_total == 0: print("\n ERROR: No successful evaluations. Check parameter ranges and model setup.") print("\n" + "=" * 70) return print("\n" + "-" * 70) print(" Closest Points Analysis") print("-" * 70) # Analyze the distribution of power errors power_errors = results["power_error"] power_actual = results["power_to_gas"] / 1000 # kW target_power = 25.0 # kW if len(power_errors) > 0: print("\nPower error statistics:") print(f" Min error: {power_errors.min() * 100:.1f}%") print(f" Median error: {np.median(power_errors) * 100:.1f}%") print(f" Max error: {power_errors.max() * 100:.1f}%") print("\nActual power range:") print(f" Min power: {power_actual.min():.2f} kW (target: {target_power} kW)") print(f" Max power: {power_actual.max():.2f} kW (target: {target_power} kW)") else: print("\n No power error data available - all evaluations failed.") print("\n" + "=" * 70) return # Find the N closest points n_closest = min(10, n_total) closest_indices = np.argsort(power_errors)[:n_closest] print(f"\n{n_closest} Closest points to target power:") print("-" * 125) print( f"{'#':<3} {'Freq (kHz)':<12} {'HV (kV)':<10} {'Gap (mm)':<10} " f"{'Power (kW)':<12} {'Error (%)':<10} {'V_bd (V)':<10} {'Margin (V)':<12}" ) print("-" * 125) for i, idx in enumerate(closest_indices, 1): freq = results["frequency"][idx] / 1000 # Convert to kHz volt = results["hv_voltage"][idx] / 1000 # Convert to kV gap = results["gap"][idx] * 1000 # Convert to mm power = results["power_to_gas"][idx] / 1000 # Convert to kW error = results["power_error"][idx] * 100 # Convert to % v_bd = results["breakdown_voltage"][idx] margin = results["breakdown_margin"][idx] print( f"{i:<3} {freq:<12.1f} {volt:<10.1f} {gap:<10.2f} " f"{power:<12.2f} {error:<10.1f} {v_bd:<10.0f} {margin:<12.0f}" ) print("-" * 125) # Provide diagnostic suggestions print("\nDiagnostic suggestions:") avg_power = power_actual.mean() if avg_power < target_power * 0.5: print( " -> Power is too low. Consider: increasing voltage, pulse duration, " "decreasing frequency, or increasing gap" ) elif avg_power > target_power * 1.5: print( " -> Power is too high. Consider: decreasing voltage, pulse duration, " "increasing frequency, or decreasing gap" ) else: print(" -> Power range overlaps target. Consider: refining grid resolution") best_idx = closest_indices[0] best_error = power_errors[best_idx] * 100 if best_error < 20: print(f" -> Best point is within {best_error:.1f}% - close to feasible!") print(" Try refining the search around the closest points shown above.") print("\n" + "=" * 70) return # Convert arrays for convenience freq_array = np.array(results["frequency"]) volt_array = np.array(results["hv_voltage"]) gap_array = np.array(results["gap"]) energy_array = np.array(results["energy_per_pulse"]) current_array = np.array(results["arc_current"]) # Extract feasible points freq_feas = freq_array[feasible_mask] / 1000 # Convert to kHz volt_feas = volt_array[feasible_mask] / 1000 # Convert to kV gap_feas = gap_array[feasible_mask] * 1000 # Convert to mm energy_feas = energy_array[feasible_mask] current_feas = current_array[feasible_mask] # Extract converged but infeasible points freq_conv_infeas = freq_array[converged_infeasible_mask] / 1000 volt_conv_infeas = volt_array[converged_infeasible_mask] / 1000 gap_conv_infeas = gap_array[converged_infeasible_mask] * 1000 energy_conv_infeas = energy_array[converged_infeasible_mask] current_conv_infeas = current_array[converged_infeasible_mask] # Extract non-converged points freq_non_conv = freq_array[non_converged_mask] / 1000 volt_non_conv = volt_array[non_converged_mask] / 1000 gap_non_conv = gap_array[non_converged_mask] * 1000 # Non-converged points have NaN for outputs, so we don't use them for coloring print("\nFeasible parameter ranges:") print(f" Frequency: {freq_feas.min():.1f} - {freq_feas.max():.1f} kHz") print(f" HV Voltage: {volt_feas.min():.1f} - {volt_feas.max():.1f} kV") print(f" Gap: {gap_feas.min():.1f} - {gap_feas.max():.1f} mm") print(f" Energy/pulse: {energy_feas.min():.1f} - {energy_feas.max():.1f} J") print(f" Arc current: {current_feas.min():.1f} - {current_feas.max():.1f} A") # Import matplotlib only in main process for visualization (avoid memory issues in workers) import matplotlib.pyplot as plt # Create figure with subplots fig = plt.figure(figsize=(16, 10)) fig.suptitle( f"Plasma Torch Parameter Space (25 kW Constraint) - " f"Converged: {n_converged}/{n_total}, Feasible: {n_feasible}/{n_total} " f"({n_feasible / n_total * 100:.1f}%)", fontsize=16, fontweight="bold", ) # Plot 1: Frequency vs Voltage, colored by Energy per pulse ax1 = fig.add_subplot(2, 3, 1) # Plot non-converged points (crosses) - gray, no color mapping if len(freq_non_conv) > 0: ax1.scatter( freq_non_conv, volt_non_conv, c="dimgray", marker="x", s=100, alpha=0.7, linewidth=2.5, label=f"Non-converged ({len(freq_non_conv)})", ) # Plot converged but infeasible points (triangles with red edge) if len(freq_conv_infeas) > 0: ax1.scatter( freq_conv_infeas, volt_conv_infeas, c=energy_conv_infeas, cmap="viridis", marker="^", s=80, alpha=0.8, edgecolors="red", linewidth=1.2, label=f"Conv. Infeasible ({len(freq_conv_infeas)})", ) # Plot feasible points (circles with green edge) scatter1 = ax1.scatter( freq_feas, volt_feas, c=energy_feas, cmap="viridis", marker="o", s=80, alpha=0.9, edgecolors="green", linewidth=1.2, label=f"Feasible ({len(freq_feas)})", ) ax1.set_xlabel("Frequency (kHz)", fontsize=11) ax1.set_ylabel("HV Voltage (kV)", fontsize=11) ax1.set_title("Frequency vs HV Voltage", fontweight="bold") ax1.grid(True, alpha=0.3) ax1.legend(loc="best", fontsize=8) cbar1 = plt.colorbar(scatter1, ax=ax1) cbar1.set_label("Energy/pulse (J)", fontsize=10) # Plot 2: Frequency vs Gap, colored by Energy per pulse ax2 = fig.add_subplot(2, 3, 2) # Plot non-converged points if len(freq_non_conv) > 0: ax2.scatter( freq_non_conv, gap_non_conv, c="dimgray", marker="x", s=100, alpha=0.7, linewidth=2.5, label="Non-converged", ) # Plot converged but infeasible points if len(freq_conv_infeas) > 0: ax2.scatter( freq_conv_infeas, gap_conv_infeas, c=energy_conv_infeas, cmap="viridis", marker="^", s=80, alpha=0.8, edgecolors="red", linewidth=1.2, label="Conv. Infeasible", ) # Plot feasible points scatter2 = ax2.scatter( freq_feas, gap_feas, c=energy_feas, cmap="viridis", marker="o", s=80, alpha=0.9, edgecolors="green", linewidth=1.2, label="Feasible", ) ax2.set_xlabel("Frequency (kHz)", fontsize=11) ax2.set_ylabel("Gap Distance (mm)", fontsize=11) ax2.set_title("Frequency vs Gap", fontweight="bold") ax2.grid(True, alpha=0.3) ax2.legend(loc="best", fontsize=8) cbar2 = plt.colorbar(scatter2, ax=ax2) cbar2.set_label("Energy/pulse (J)", fontsize=10) # Plot 3: Voltage vs Gap, colored by Energy per pulse ax3 = fig.add_subplot(2, 3, 3) # Plot non-converged points if len(volt_non_conv) > 0: ax3.scatter( volt_non_conv, gap_non_conv, c="dimgray", marker="x", s=100, alpha=0.7, linewidth=2.5, label="Non-converged", ) # Plot converged but infeasible points if len(volt_conv_infeas) > 0: ax3.scatter( volt_conv_infeas, gap_conv_infeas, c=energy_conv_infeas, cmap="viridis", marker="^", s=80, alpha=0.8, edgecolors="red", linewidth=1.2, label="Conv. Infeasible", ) # Plot feasible points scatter3 = ax3.scatter( volt_feas, gap_feas, c=energy_feas, cmap="viridis", marker="o", s=80, alpha=0.9, edgecolors="green", linewidth=1.2, label="Feasible", ) ax3.set_xlabel("HV Voltage (kV)", fontsize=11) ax3.set_ylabel("Gap Distance (mm)", fontsize=11) ax3.set_title("HV Voltage vs Gap", fontweight="bold") ax3.grid(True, alpha=0.3) ax3.legend(loc="best", fontsize=8) cbar3 = plt.colorbar(scatter3, ax=ax3) cbar3.set_label("Energy/pulse (J)", fontsize=10) # Plot 4: Frequency vs Voltage, colored by Arc Current ax4 = fig.add_subplot(2, 3, 4) # Plot non-converged points if len(freq_non_conv) > 0: ax4.scatter( freq_non_conv, volt_non_conv, c="dimgray", marker="x", s=100, alpha=0.7, linewidth=2.5, label="Non-converged", ) # Plot converged but infeasible points if len(freq_conv_infeas) > 0: ax4.scatter( freq_conv_infeas, volt_conv_infeas, c=current_conv_infeas, cmap="plasma", marker="^", s=80, alpha=0.8, edgecolors="red", linewidth=1.2, label="Conv. Infeasible", ) # Plot feasible points scatter4 = ax4.scatter( freq_feas, volt_feas, c=current_feas, cmap="plasma", marker="o", s=80, alpha=0.9, edgecolors="green", linewidth=1.2, label="Feasible", ) ax4.set_xlabel("Frequency (kHz)", fontsize=11) ax4.set_ylabel("HV Voltage (kV)", fontsize=11) ax4.set_title("Frequency vs HV Voltage (Arc Current)", fontweight="bold") ax4.grid(True, alpha=0.3) ax4.legend(loc="best", fontsize=8) cbar4 = plt.colorbar(scatter4, ax=ax4) cbar4.set_label("Arc Current (A)", fontsize=10) # Plot 5: Energy per pulse distribution - show all three categories ax5 = fig.add_subplot(2, 3, 5) # Plot converged infeasible distribution if len(energy_conv_infeas) > 0: ax5.hist( energy_conv_infeas, bins=30, color="orange", alpha=0.5, edgecolor="darkred", label=f"Conv. Infeasible ({len(energy_conv_infeas)})", linewidth=1.5, ) # Plot feasible distribution if len(energy_feas) > 0: ax5.hist( energy_feas, bins=30, color="steelblue", alpha=0.7, edgecolor="black", label=f"Feasible ({len(energy_feas)})", linewidth=1.5, ) ax5.set_xlabel("Energy per Pulse (J)", fontsize=11) ax5.set_ylabel("Frequency Count", fontsize=11) ax5.set_title("Energy per Pulse Distribution", fontweight="bold") ax5.grid(True, alpha=0.3, axis="y") # Add mean line for feasible points if len(energy_feas) > 0: ax5.axvline( energy_feas.mean(), color="darkblue", linestyle="--", linewidth=2, label=f"Feasible Mean: {energy_feas.mean():.1f} J", ) ax5.legend(fontsize=8) # Plot 6: 3D scatter (Frequency, Voltage, Gap) ax6 = fig.add_subplot(2, 3, 6, projection="3d") # Plot non-converged points if len(freq_non_conv) > 0: ax6.scatter( freq_non_conv, volt_non_conv, gap_non_conv, c="dimgray", marker="x", s=80, alpha=0.7, linewidth=2.5, label="Non-converged", ) # Plot converged but infeasible points if len(freq_conv_infeas) > 0: ax6.scatter( freq_conv_infeas, volt_conv_infeas, gap_conv_infeas, c=energy_conv_infeas, cmap="viridis", marker="^", s=50, alpha=0.8, edgecolors="red", linewidth=1.2, label="Conv. Infeasible", ) # Plot feasible points scatter6 = ax6.scatter( freq_feas, volt_feas, gap_feas, c=energy_feas, cmap="viridis", marker="o", s=50, alpha=0.9, edgecolors="green", linewidth=1.2, label="Feasible", ) ax6.set_xlabel("Frequency (kHz)", fontsize=10) ax6.set_ylabel("HV Voltage (kV)", fontsize=10) ax6.set_zlabel("Gap (mm)", fontsize=10) ax6.set_title("3D Parameter Space", fontweight="bold") ax6.legend(loc="best", fontsize=8) cbar6 = plt.colorbar(scatter6, ax=ax6, shrink=0.7) cbar6.set_label("Energy/pulse (J)", fontsize=9) plt.tight_layout() # Save figure output_path = "parameter_space_visualization.png" plt.savefig(output_path, dpi=150, bbox_inches="tight") print(f"\nVisualization saved to: {output_path}") # Show plot plt.show() print("\n" + "=" * 70) print(" Analysis complete!") print("=" * 70) def generate_n2_diagram(): """Generate N2 diagram for the MDAO problem. Creates an interactive HTML N2 diagram showing the connections between all subsystems and components in the torch design problem. The diagram is configured to show detailed node information by default. """ print("\n" + "=" * 70) print(" Generating N2 Diagram") print("=" * 70) # Set up the problem prob = setup_problem_with_power_constraint() prob.setup() # Run the model once to populate values for better visualization print("\n Running model to populate values...") prob.set_val("pulse_duration", 1e-6) # s (1 µs HV pulse) prob.set_val("sustainer_voltage", 500.0) # V prob.set_val("mass_flow", 160 / 86400) # kg/s (converted from 160 kg/d) prob.set_val("gas_properties_pressure", 101325.0) # Pa (atmospheric) prob.set_val("preheat_temperature", 300.0) # K prob.set_val("electrode_radius", 0.005) # m (5 mm) prob.set_val("graphite_sublimation_temp", 3900.0) # K prob.set_val("graphite_heat_capacity", 710.0) # J/(kg·K) prob.set_val("torch_length", 0.1) # m (100 mm) prob.set_val("torch_diameter", 0.02) # m (20 mm) prob.set_val("pulse_frequency", 50000.0) # Hz (50 kHz) prob.set_val("hv_voltage", 20000.0) # V (20 kV) prob.set_val("gap_distance", 0.01) # m (10 mm) prob.run_model() # Generate N2 diagram with detailed information # The N2 diagram will show: # - Component hierarchy and connections # - Variable values and units # - Solver information om.n2( prob, outfile="n2_diagram.html", show_browser=True, title="Plasma Torch MDAO System - Detailed N2 Diagram", ) print("\nN2 diagram generated successfully!") print(" Output file: n2_diagram.html") print(" The diagram should open in your default web browser.") print("\nInteractive features:") print(" - Click on any component to see detailed information") print(" - Hover over connections to see variable names and values") print(" - Use the toolbar to toggle between different views") print(" - Click the info icon in toolbar for help") print("\n" + "=" * 70) def main(n_procs=None): """Execute parameter sweep using parallel DOEDriver.""" # Run parameter sweep results = run_parameter_sweep(n_procs=n_procs) # Visualize results visualize_parameter_space(results) print("\nNext steps:") print(" 1. Examine the feasible parameter space visualizations") print(" 2. Select optimal operating point based on constraints") print(" 3. Consider adding arc mobility model for refined analysis") print(" 4. Validate with experimental data if available") print("\nParallel execution notes:") print(" - Windows: Uses Python multiprocessing automatically (current default)") print(" - Linux/Unix: Can use MPI with 'mpiexec -n python example_vdef_mdao.py'") print(" - DOEDriver automatically distributes cases across available processors") print() if __name__ == "__main__": import sys try: # Check for command-line argument to generate N2 diagram only if len(sys.argv) > 1 and sys.argv[1] == "--n2": generate_n2_diagram() else: main(n_procs=4) except Exception as e: print(f"\nError: {e}") import traceback traceback.print_exc() .. _sphx_glr_download_auto_examples_VDEF_optim_example_vdef_mdao.py: .. only:: html .. container:: sphx-glr-footer sphx-glr-footer-example .. container:: sphx-glr-download sphx-glr-download-jupyter :download:`Download Jupyter notebook: example_vdef_mdao.ipynb ` .. container:: sphx-glr-download sphx-glr-download-python :download:`Download Python source code: example_vdef_mdao.py ` .. container:: sphx-glr-download sphx-glr-download-zip :download:`Download zipped: example_vdef_mdao.zip `