Source code for prismo.monitors.dft

"""
Discrete Fourier Transform (DFT) monitors for frequency-domain field analysis.

This module implements on-the-fly DFT computation during time-domain simulations,
allowing efficient extraction of frequency-domain fields without storing all time steps.
"""

from typing import Optional, Union

import numpy as np

from prismo.backends import Backend, get_backend
from prismo.core.fields import ElectromagneticFields
from prismo.core.grid import YeeGrid
from prismo.monitors.base import Monitor


[docs] class DFTMonitor(Monitor): """ Discrete Fourier Transform monitor for frequency-domain analysis. This monitor computes frequency-domain fields on-the-fly during time-domain simulation using: F(ω) = ∫ f(t) * exp(-jωt) dt Discretized as: F(ω) ≈ Σ f(n*dt) * exp(-jω*n*dt) * dt Parameters ---------- center : Tuple[float, float, float] Physical coordinates of monitor center (x, y, z) in meters. size : Tuple[float, float, float] Physical dimensions of monitor (Lx, Ly, Lz) in meters. frequencies : List[float] List of frequencies to monitor (Hz). components : List[str], optional Field components to record. Default is all E components. name : str, optional Monitor name. backend : Backend, optional Computational backend to use. """
[docs] def __init__( self, center: tuple[float, float, float], size: tuple[float, float, float], frequencies: list[float], components: Optional[list[str]] = None, name: Optional[str] = None, backend: Optional[Union[str, Backend]] = None, ): super().__init__(center, size, name) self.frequencies = np.array(frequencies) self.omega = 2 * np.pi * self.frequencies # Angular frequencies if components is None: self.components = ["Ex", "Ey", "Ez"] else: self.components = components # Initialize backend if isinstance(backend, str): self.backend = get_backend(backend) elif isinstance(backend, Backend): self.backend = backend elif backend is None: self.backend = get_backend() else: raise TypeError("backend must be a Backend instance or string name") # Storage for DFT accumulation (complex-valued) self._dft_data: dict[str, np.ndarray] = {} self._time_steps = 0 self._dt = None
[docs] def initialize(self, grid: YeeGrid) -> None: """Initialize the DFT monitor on the grid.""" super().initialize(grid) # Initialize DFT accumulation arrays for component in self.components: # Get shape for this component in the monitor region shape = self._get_component_shape(component) # Create complex array: (n_frequencies, *spatial_dims) dft_shape = (len(self.frequencies),) + shape self._dft_data[component] = np.zeros(dft_shape, dtype=np.complex128)
def _get_component_shape(self, component: str) -> tuple[int, ...]: """Get the spatial shape for a field component in the monitor region.""" # This returns the shape of the field in the monitored region # For simplicity, we'll use the grid shape # In practice, this would be determined by the monitor region slices if self._grid is None: raise RuntimeError("Monitor must be initialized first") # Get the component indices self._monitor_region[component] # For now, assume a simple point or line monitor # Full implementation would extract proper shapes from indices return (10, 10) # Placeholder
[docs] def update(self, fields: ElectromagneticFields, time: float, dt: float) -> None: """ Update DFT accumulation with current field values. Parameters ---------- fields : ElectromagneticFields Current electromagnetic fields. time : float Current simulation time (s). dt : float Time step (s). """ if self._dt is None: self._dt = dt # For each frequency, accumulate: F(ω) += f(t) * exp(-jωt) * dt for component in self.components: # Get current field values in monitor region field_data = self._extract_field_data(fields, component) # Update DFT for each frequency for freq_idx, omega in enumerate(self.omega): # Compute phase factor: exp(-j * omega * time) phase = np.exp(-1j * omega * time) # Accumulate: DFT += field * phase * dt self._dft_data[component][freq_idx] += field_data * phase * dt self._time_steps += 1
def _extract_field_data( self, fields: ElectromagneticFields, component: str ) -> np.ndarray: """Extract field data from the monitor region.""" # Get field component field = fields[component] # Extract data from monitor region # For now, return a simplified extraction # Full implementation would use proper slicing from _monitor_region # Convert to numpy if using GPU backend if hasattr(fields, "backend"): field_np = fields.backend.to_numpy(field) else: field_np = np.asarray(field) # Return a subset (placeholder - should use monitor region) if field_np.ndim >= 2: return field_np[:10, :10] else: return field_np[:10]
[docs] def get_frequency_data( self, component: str, frequency_index: Optional[int] = None ) -> Union[np.ndarray, dict[float, np.ndarray]]: """ Get frequency-domain field data. Parameters ---------- component : str Field component ('Ex', 'Ey', 'Ez', etc.). frequency_index : int, optional Index of frequency to retrieve. If None, returns all frequencies. Returns ------- ndarray or dict Complex frequency-domain field data. If frequency_index is None, returns dict mapping frequency to data. """ if component not in self._dft_data: raise ValueError(f"Component {component} not monitored") if frequency_index is None: # Return dict of all frequencies result = {} for i, freq in enumerate(self.frequencies): result[freq] = self._dft_data[component][i] return result else: return self._dft_data[component][frequency_index]
[docs] def get_intensity(self, component: str, frequency_index: int) -> np.ndarray: """ Get intensity (|E|²) at a specific frequency. Parameters ---------- component : str Field component. frequency_index : int Frequency index. Returns ------- ndarray Field intensity. """ field = self.get_frequency_data(component, frequency_index) return np.abs(field) ** 2
[docs] def get_power_spectrum(self, component: str, normalize: bool = True) -> np.ndarray: """ Get power spectrum vs frequency. Parameters ---------- component : str Field component. normalize : bool Whether to normalize to maximum value. Returns ------- ndarray Power spectrum with shape (n_frequencies,). """ spectrum = np.zeros(len(self.frequencies)) for i in range(len(self.frequencies)): intensity = self.get_intensity(component, i) spectrum[i] = np.sum(intensity) if normalize and np.max(spectrum) > 0: spectrum = spectrum / np.max(spectrum) return spectrum
[docs] def get_transmission_spectrum( self, reference_power: Optional[np.ndarray] = None ) -> np.ndarray: """ Calculate transmission spectrum (normalized power flow). Parameters ---------- reference_power : ndarray, optional Reference power for normalization. If None, normalizes to max. Returns ------- ndarray Transmission spectrum. """ # Calculate total power for each frequency power = np.zeros(len(self.frequencies)) for i in range(len(self.frequencies)): # Sum power from all E-field components for comp in ["Ex", "Ey", "Ez"]: if comp in self._dft_data: intensity = self.get_intensity(comp, i) power[i] += np.sum(intensity) # Normalize if reference_power is not None: transmission = power / reference_power elif np.max(power) > 0: transmission = power / np.max(power) else: transmission = power return transmission