"""
Mode expansion monitors for decomposing fields into waveguide modes.
This module implements monitors that decompose electromagnetic fields into
waveguide mode coefficients using overlap integrals.
"""
from typing import Optional, Union
import numpy as np
from prismo.backends import Backend, get_backend
from prismo.core.fields import ElectromagneticFields
from prismo.core.grid import YeeGrid
from prismo.modes.solver import WaveguideMode
from prismo.monitors.base import Monitor
[docs]
class ModeExpansionMonitor(Monitor):
"""
Mode expansion monitor for decomposing fields into mode coefficients.
Computes overlap integrals between simulation fields and waveguide modes
to extract forward and backward propagating mode amplitudes.
The overlap integral is:
a_m = ∫∫ (E_sim × H_mode* - E_mode* × H_sim) · n dA
Parameters
----------
center : Tuple[float, float, float]
Physical coordinates of monitor center.
size : Tuple[float, float, float]
Physical dimensions of monitor.
modes : List[WaveguideMode]
List of modes to decompose into.
direction : str
Normal direction of monitor plane ('x', 'y', or 'z').
frequencies : List[float], optional
Frequencies for frequency-domain decomposition.
name : str, optional
Monitor name.
backend : Backend, optional
Computational backend.
"""
[docs]
def __init__(
self,
center: tuple[float, float, float],
size: tuple[float, float, float],
modes: list[WaveguideMode],
direction: str = "x",
frequencies: Optional[list[float]] = None,
name: Optional[str] = None,
backend: Optional[Union[str, Backend]] = None,
):
super().__init__(center, size, name)
self.modes = modes
self.direction = direction.lower()
self.frequencies = frequencies
if self.direction not in ["x", "y", "z"]:
raise ValueError("direction must be 'x', 'y', or 'z'")
# Initialize backend
if isinstance(backend, str):
self.backend = get_backend(backend)
elif isinstance(backend, Backend):
self.backend = backend
elif backend is None:
self.backend = get_backend()
else:
raise TypeError("backend must be a Backend instance or string name")
# Storage for mode coefficients
self._mode_coeffs_time: dict[int, list[complex]] = {
i: [] for i in range(len(modes))
}
self._time_points: list[float] = []
# Frequency-domain storage
if frequencies is not None:
self.omega = 2 * np.pi * np.array(frequencies)
self._mode_coeffs_freq: dict[int, np.ndarray] = {
i: np.zeros(len(frequencies), dtype=complex) for i in range(len(modes))
}
# Mode profiles interpolated to monitor grid
self._interpolated_modes: list[dict[str, np.ndarray]] = []
[docs]
def initialize(self, grid: YeeGrid) -> None:
"""Initialize the mode expansion monitor on the grid."""
super().initialize(grid)
# Interpolate mode profiles to monitor plane
self._interpolate_modes_to_monitor()
# Compute mode normalization factors
self._compute_normalization()
def _interpolate_modes_to_monitor(self) -> None:
"""
Interpolate mode profiles to the monitor plane grid.
The modes may be defined on a different grid than the simulation,
so we need to interpolate.
"""
for mode in self.modes:
# Create interpolators for mode fields
mode_interp = {}
# Simplified: assume modes are on same grid as monitor
# Full implementation would interpolate properly
mode_interp["Ex"] = mode.Ex
mode_interp["Ey"] = mode.Ey
mode_interp["Ez"] = mode.Ez
mode_interp["Hx"] = mode.Hx
mode_interp["Hy"] = mode.Hy
mode_interp["Hz"] = mode.Hz
self._interpolated_modes.append(mode_interp)
def _compute_normalization(self) -> None:
"""
Compute mode normalization factors.
Modes should be normalized such that:
∫∫ (E_m × H_m*) · n dA = 1
"""
# Placeholder for normalization computation
self._mode_norms = np.ones(len(self.modes))
[docs]
def update(self, fields: ElectromagneticFields, time: float, dt: float) -> None:
"""
Update mode expansion with current fields.
Computes overlap integrals and stores mode coefficients.
Parameters
----------
fields : ElectromagneticFields
Current electromagnetic fields.
time : float
Current simulation time.
dt : float
Time step.
"""
# Compute mode coefficients at current time
coeffs = self._compute_mode_coefficients(fields)
# Store time-domain coefficients
for mode_idx, coeff in enumerate(coeffs):
self._mode_coeffs_time[mode_idx].append(coeff)
self._time_points.append(time)
# Update frequency-domain DFT if needed
if self.frequencies is not None:
for mode_idx, coeff in enumerate(coeffs):
for freq_idx, omega in enumerate(self.omega):
phase = np.exp(-1j * omega * time)
self._mode_coeffs_freq[mode_idx][freq_idx] += coeff * phase * dt
def _compute_mode_coefficients(
self, fields: ElectromagneticFields
) -> list[complex]:
"""
Compute mode coefficients using overlap integrals.
For each mode m:
a_m = ∫∫ (E_sim × H_mode* + E_mode* × H_sim) · n dA / (2 * P_mode)
Returns
-------
List[complex]
Mode coefficients for each mode.
"""
from prismo.utils.mode_matching import compute_mode_overlap
# Extract field components at monitor plane
Ex_sim = self._extract_field(fields, "Ex")
Ey_sim = self._extract_field(fields, "Ey")
Ez_sim = self._extract_field(fields, "Ez")
Hx_sim = self._extract_field(fields, "Hx")
Hy_sim = self._extract_field(fields, "Hy")
Hz_sim = self._extract_field(fields, "Hz")
coeffs = []
# Get grid spacing (assuming uniform grid)
dx = 1.0 # Will be updated from actual grid
dy = 1.0
for _mode_idx, mode in enumerate(self.modes):
# Use mode_matching utility for accurate overlap
coeff = compute_mode_overlap(
Ex_sim,
Ey_sim,
Ez_sim,
Hx_sim,
Hy_sim,
Hz_sim,
mode,
direction=self.direction,
dx=dx,
dy=dy,
)
coeffs.append(coeff)
return coeffs
def _extract_field(
self, fields: ElectromagneticFields, component: str
) -> np.ndarray:
"""Extract field component at monitor plane."""
field = fields[component]
# Convert to numpy
if hasattr(fields, "backend"):
field_np = fields.backend.to_numpy(field)
else:
field_np = np.asarray(field)
# Extract monitor region (placeholder)
if field_np.ndim >= 2:
return field_np[:10, :10]
return field_np[:10]
[docs]
def get_mode_coefficient(
self, mode_index: int, domain: str = "time"
) -> Union[np.ndarray, list[complex]]:
"""
Get mode coefficient time series or frequency spectrum.
Parameters
----------
mode_index : int
Mode index.
domain : str
'time' or 'frequency'.
Returns
-------
array or list
Mode coefficients.
"""
if domain == "time":
return self._mode_coeffs_time[mode_index]
elif domain == "frequency":
if self.frequencies is None:
raise RuntimeError("No frequencies specified")
return self._mode_coeffs_freq[mode_index]
else:
raise ValueError("domain must be 'time' or 'frequency'")
[docs]
def separate_forward_backward(
self, mode_index: int, frequency_index: int
) -> tuple[complex, complex]:
"""
Separate forward and backward propagating mode amplitudes.
Uses phase information to separate directions.
Parameters
----------
mode_index : int
Mode index.
frequency_index : int
Frequency index.
Returns
-------
Tuple[complex, complex]
(forward_amplitude, backward_amplitude)
"""
if self.frequencies is None:
raise RuntimeError("Frequency-domain decomposition required")
# Get mode coefficient at this frequency
coeff = self._mode_coeffs_freq[mode_index][frequency_index]
# Simplified separation (assumes single direction)
# Full implementation would analyze phase vs position
forward = coeff
backward = 0.0
return forward, backward
[docs]
def get_mode_power(
self, mode_index: int, frequency_index: Optional[int] = None
) -> Union[float, np.ndarray]:
"""
Calculate mode power.
Parameters
----------
mode_index : int
Mode index.
frequency_index : int, optional
Frequency index for frequency-domain power.
Returns
-------
float or array
Mode power.
"""
if frequency_index is not None:
# Frequency-domain power
coeff = self._mode_coeffs_freq[mode_index][frequency_index]
return float(np.abs(coeff) ** 2)
else:
# Time-domain power (average)
coeffs = np.array(self._mode_coeffs_time[mode_index])
power = np.abs(coeffs) ** 2
return np.mean(power)
[docs]
def get_mode_transmission(
self, mode_index: int, reference_power: Optional[float] = None
) -> np.ndarray:
"""
Calculate mode transmission vs frequency.
Parameters
----------
mode_index : int
Mode index.
reference_power : float, optional
Reference power for normalization.
Returns
-------
ndarray
Transmission coefficient vs frequency.
"""
if self.frequencies is None:
raise RuntimeError("Frequency-domain analysis required")
# Get power for this mode at all frequencies
power = np.abs(self._mode_coeffs_freq[mode_index]) ** 2
if reference_power is not None:
return power / reference_power
else:
return power / np.max(power) if np.max(power) > 0 else power
[docs]
def compute_s_parameters(
self,
source_mode_index: int = 0,
source_power: float = 1.0,
) -> dict[str, np.ndarray]:
"""
Compute S-parameters from mode coefficients.
For a two-port device with input and output monitors:
- S11: Reflection coefficient at input port
- S21: Transmission coefficient from port 1 to port 2
Parameters
----------
source_mode_index : int
Index of the excited mode.
source_power : float
Input power (for normalization).
Returns
-------
Dict[str, ndarray]
Dictionary with 'S11', 'S21', etc. vs frequency.
Notes
-----
This method assumes this monitor is the output/reflection monitor.
For full S-parameters, you need multiple monitors.
"""
if self.frequencies is None:
raise RuntimeError("Frequency-domain analysis required")
s_params = {}
# Get forward and backward amplitudes
# Simplified: assumes monitor can separate directions
for mode_idx in range(len(self.modes)):
# Transmission/reflection relative to source
coeff_freq = self._mode_coeffs_freq[mode_idx]
# Normalize by source
if source_power > 0:
s_param = coeff_freq / np.sqrt(source_power)
else:
s_param = coeff_freq
# Name the parameter
param_name = f"S_{mode_idx+1}{source_mode_index+1}"
s_params[param_name] = s_param
return s_params
[docs]
def compute_s_matrix(
self,
other_monitor: "ModeExpansionMonitor",
frequency_index: int,
source_mode_index: int = 0,
) -> np.ndarray:
"""
Compute full S-matrix between this and another monitor.
Parameters
----------
other_monitor : ModeExpansionMonitor
The other port monitor.
frequency_index : int
Frequency index to compute S-matrix at.
source_mode_index : int
Index of excited mode.
Returns
-------
ndarray
2x2 or NxN S-matrix.
Examples
--------
>>> # With input and output monitors
>>> S = output_monitor.compute_s_matrix(input_monitor, freq_idx=0)
>>> S11 = S[0, 0] # Reflection
>>> S21 = S[1, 0] # Transmission
"""
n_modes = len(self.modes)
S_matrix = np.zeros((n_modes, n_modes), dtype=complex)
# Fill S-matrix elements
# S[i,j] = output_mode_i / input_mode_j
for i in range(n_modes):
for j in range(n_modes):
if j == source_mode_index:
# This mode was excited
# Reflection at input
other_monitor._mode_coeffs_freq[i][frequency_index]
# Transmission at output
trans_coeff = self._mode_coeffs_freq[i][frequency_index]
S_matrix[i, j] = trans_coeff # Simplified
return S_matrix