Source code for prismo.monitors.flux

"""
Flux monitors for calculating power flow through surfaces.

This module implements monitors for computing electromagnetic power flow
(Poynting vector) through specified surfaces in the simulation domain.
"""

from typing import Literal, Optional, Union

import numpy as np

from prismo.backends import Backend, get_backend
from prismo.core.fields import ElectromagneticFields
from prismo.core.grid import YeeGrid
from prismo.monitors.base import Monitor


[docs] class FluxMonitor(Monitor): """ Flux monitor for computing power flow through a surface. Computes the Poynting vector S = E × H and integrates over a surface to determine power flow. Can compute both time-domain and frequency-domain flux. Parameters ---------- center : Tuple[float, float, float] Physical coordinates of monitor center. size : Tuple[float, float, float] Physical dimensions of monitor surface. direction : str Normal direction of flux surface ('x', 'y', or 'z'). name : str, optional Monitor name. frequencies : List[float], optional Frequencies for frequency-domain flux computation. backend : Backend, optional Computational backend. """
[docs] def __init__( self, center: tuple[float, float, float], size: tuple[float, float, float], direction: Literal["x", "y", "z"], name: Optional[str] = None, frequencies: Optional[list[float]] = None, backend: Optional[Union[str, Backend]] = None, ): super().__init__(center, size, name) self.direction = direction.lower() if self.direction not in ["x", "y", "z"]: raise ValueError("direction must be 'x', 'y', or 'z'") # Initialize backend if isinstance(backend, str): self.backend = get_backend(backend) elif isinstance(backend, Backend): self.backend = backend elif backend is None: self.backend = get_backend() else: raise TypeError("backend must be a Backend instance or string name") # Time-domain power flow storage self._power_flow_history: list[float] = [] self._time_history: list[float] = [] # Frequency-domain setup self.frequencies = frequencies if frequencies is not None: self.omega = 2 * np.pi * np.array(frequencies) self._dft_ex = None # Will be initialized self._dft_ey = None self._dft_ez = None self._dft_hx = None self._dft_hy = None self._dft_hz = None # Physical constants self.eps0 = 8.854187817e-12 self.mu0 = 4 * self.backend.pi * 1e-7
[docs] def initialize(self, grid: YeeGrid) -> None: """Initialize the flux monitor on the grid.""" super().initialize(grid) # Initialize frequency-domain DFT arrays if needed if self.frequencies is not None: shape = self._get_surface_shape() n_freq = len(self.frequencies) # Initialize complex DFT arrays for all field components self._dft_ex = np.zeros((n_freq,) + shape, dtype=np.complex128) self._dft_ey = np.zeros((n_freq,) + shape, dtype=np.complex128) self._dft_ez = np.zeros((n_freq,) + shape, dtype=np.complex128) self._dft_hx = np.zeros((n_freq,) + shape, dtype=np.complex128) self._dft_hy = np.zeros((n_freq,) + shape, dtype=np.complex128) self._dft_hz = np.zeros((n_freq,) + shape, dtype=np.complex128)
def _get_surface_shape(self) -> tuple[int, ...]: """Get the shape of the flux surface.""" # Placeholder - should extract from monitor region return (10, 10)
[docs] def update(self, fields: ElectromagneticFields, time: float, dt: float) -> None: """ Update flux monitor with current fields. Computes instantaneous power flow and updates DFT if frequencies are specified. Parameters ---------- fields : ElectromagneticFields Current electromagnetic fields. time : float Current simulation time. dt : float Time step. """ # Compute instantaneous power flow power = self._compute_power_flow(fields) self._power_flow_history.append(power) self._time_history.append(time) # Update frequency-domain DFT if needed if self.frequencies is not None: self._update_dft(fields, time, dt)
def _compute_power_flow(self, fields: ElectromagneticFields) -> float: """ Compute instantaneous power flow through the surface. Power = ∫∫ S·n dA where S = E × H is the Poynting vector. """ # Extract field components at the flux surface # For simplicity, assume fields are at the same spatial points # In practice, need to interpolate to common points Ex = self._extract_field_component(fields, "Ex") Ey = self._extract_field_component(fields, "Ey") Ez = self._extract_field_component(fields, "Ez") Hx = self._extract_field_component(fields, "Hx") Hy = self._extract_field_component(fields, "Hy") Hz = self._extract_field_component(fields, "Hz") # Compute Poynting vector components: S = E × H Sx = Ey * Hz - Ez * Hy Sy = Ez * Hx - Ex * Hz Sz = Ex * Hy - Ey * Hx # Select component normal to surface if self.direction == "x": S_normal = Sx elif self.direction == "y": S_normal = Sy else: # z S_normal = Sz # Integrate over surface # Get grid spacing for surface area element dx, dy, dz = self._grid.spacing if self.direction == "x": dA = dy * dz elif self.direction == "y": dA = dx * dz else: # z dA = dx * dy # Sum power flow power = np.sum(S_normal) * dA return float(power) def _extract_field_component( self, fields: ElectromagneticFields, component: str ) -> np.ndarray: """Extract field component from monitor region.""" field = fields[component] # Convert to numpy if needed if hasattr(fields, "backend"): field_np = fields.backend.to_numpy(field) else: field_np = np.asarray(field) # Extract from monitor region (placeholder) if field_np.ndim >= 2: return field_np[:10, :10] return field_np[:10] def _update_dft( self, fields: ElectromagneticFields, time: float, dt: float ) -> None: """Update frequency-domain DFT accumulation.""" # Extract field components Ex = self._extract_field_component(fields, "Ex") Ey = self._extract_field_component(fields, "Ey") Ez = self._extract_field_component(fields, "Ez") Hx = self._extract_field_component(fields, "Hx") Hy = self._extract_field_component(fields, "Hy") Hz = self._extract_field_component(fields, "Hz") # Update DFT for each frequency for i, omega in enumerate(self.omega): phase = np.exp(-1j * omega * time) self._dft_ex[i] += Ex * phase * dt self._dft_ey[i] += Ey * phase * dt self._dft_ez[i] += Ez * phase * dt self._dft_hx[i] += Hx * phase * dt self._dft_hy[i] += Hy * phase * dt self._dft_hz[i] += Hz * phase * dt
[docs] def get_time_domain_power(self) -> tuple[np.ndarray, np.ndarray]: """ Get time-domain power flow history. Returns ------- Tuple[ndarray, ndarray] (time_array, power_array) """ return np.array(self._time_history), np.array(self._power_flow_history)
[docs] def get_frequency_domain_power( self, frequency_index: Optional[int] = None ) -> Union[float, np.ndarray]: """ Get frequency-domain power flow. Computes power from frequency-domain fields: P(ω) = (1/2) Re[∫∫ (E × H*) · n dA] Parameters ---------- frequency_index : int, optional Frequency index. If None, returns array for all frequencies. Returns ------- float or ndarray Power flow at specified frequency(ies). """ if self.frequencies is None: raise RuntimeError("No frequencies specified for this monitor") dx, dy, dz = self._grid.spacing if self.direction == "x": dA = dy * dz elif self.direction == "y": dA = dx * dz else: dA = dx * dy def compute_power_at_freq(idx: int) -> float: """Compute power for a single frequency.""" # Get field DFTs Ex = self._dft_ex[idx] Ey = self._dft_ey[idx] Ez = self._dft_ez[idx] Hx = self._dft_hx[idx] Hy = self._dft_hy[idx] Hz = self._dft_hz[idx] # Compute Poynting vector: S = (1/2) * Re[E × H*] Sx = 0.5 * np.real(Ey * np.conj(Hz) - Ez * np.conj(Hy)) Sy = 0.5 * np.real(Ez * np.conj(Hx) - Ex * np.conj(Hz)) Sz = 0.5 * np.real(Ex * np.conj(Hy) - Ey * np.conj(Hx)) # Select component normal to surface if self.direction == "x": S_normal = Sx elif self.direction == "y": S_normal = Sy else: S_normal = Sz # Integrate power = np.sum(S_normal) * dA return float(power) if frequency_index is None: # Return array for all frequencies return np.array( [compute_power_at_freq(i) for i in range(len(self.frequencies))] ) else: return compute_power_at_freq(frequency_index)
[docs] def get_transmission( self, reference_power: Optional[Union[float, np.ndarray]] = None ) -> Union[float, np.ndarray]: """ Calculate transmission (normalized power). Parameters ---------- reference_power : float or ndarray, optional Reference power for normalization. Returns ------- float or ndarray Transmission coefficient(s). """ if self.frequencies is not None: power = self.get_frequency_domain_power() else: _, power_history = self.get_time_domain_power() power = np.mean(power_history) if len(power_history) > 0 else 0.0 if reference_power is not None: return power / reference_power else: return power