"""
Discrete Fourier Transform (DFT) monitors for frequency-domain field analysis.
This module implements on-the-fly DFT computation during time-domain simulations,
allowing efficient extraction of frequency-domain fields without storing all time steps.
"""
from typing import Optional, Union
import numpy as np
from prismo.backends import Backend, get_backend
from prismo.core.fields import ElectromagneticFields
from prismo.core.grid import YeeGrid
from prismo.monitors.base import Monitor
[docs]
class DFTMonitor(Monitor):
"""
Discrete Fourier Transform monitor for frequency-domain analysis.
This monitor computes frequency-domain fields on-the-fly during time-domain
simulation using:
F(ω) = ∫ f(t) * exp(-jωt) dt
Discretized as:
F(ω) ≈ Σ f(n*dt) * exp(-jω*n*dt) * dt
Parameters
----------
center : Tuple[float, float, float]
Physical coordinates of monitor center (x, y, z) in meters.
size : Tuple[float, float, float]
Physical dimensions of monitor (Lx, Ly, Lz) in meters.
frequencies : List[float]
List of frequencies to monitor (Hz).
components : List[str], optional
Field components to record. Default is all E components.
name : str, optional
Monitor name.
backend : Backend, optional
Computational backend to use.
"""
[docs]
def __init__(
self,
center: tuple[float, float, float],
size: tuple[float, float, float],
frequencies: list[float],
components: Optional[list[str]] = None,
name: Optional[str] = None,
backend: Optional[Union[str, Backend]] = None,
):
super().__init__(center, size, name)
self.frequencies = np.array(frequencies)
self.omega = 2 * np.pi * self.frequencies # Angular frequencies
if components is None:
self.components = ["Ex", "Ey", "Ez"]
else:
self.components = components
# Initialize backend
if isinstance(backend, str):
self.backend = get_backend(backend)
elif isinstance(backend, Backend):
self.backend = backend
elif backend is None:
self.backend = get_backend()
else:
raise TypeError("backend must be a Backend instance or string name")
# Storage for DFT accumulation (complex-valued)
self._dft_data: dict[str, np.ndarray] = {}
self._time_steps = 0
self._dt = None
[docs]
def initialize(self, grid: YeeGrid) -> None:
"""Initialize the DFT monitor on the grid."""
super().initialize(grid)
# Initialize DFT accumulation arrays
for component in self.components:
# Get shape for this component in the monitor region
shape = self._get_component_shape(component)
# Create complex array: (n_frequencies, *spatial_dims)
dft_shape = (len(self.frequencies),) + shape
self._dft_data[component] = np.zeros(dft_shape, dtype=np.complex128)
def _get_component_shape(self, component: str) -> tuple[int, ...]:
"""Get the spatial shape for a field component in the monitor region."""
# This returns the shape of the field in the monitored region
# For simplicity, we'll use the grid shape
# In practice, this would be determined by the monitor region slices
if self._grid is None:
raise RuntimeError("Monitor must be initialized first")
# Get the component indices
self._monitor_region[component]
# For now, assume a simple point or line monitor
# Full implementation would extract proper shapes from indices
return (10, 10) # Placeholder
[docs]
def update(self, fields: ElectromagneticFields, time: float, dt: float) -> None:
"""
Update DFT accumulation with current field values.
Parameters
----------
fields : ElectromagneticFields
Current electromagnetic fields.
time : float
Current simulation time (s).
dt : float
Time step (s).
"""
if self._dt is None:
self._dt = dt
# For each frequency, accumulate: F(ω) += f(t) * exp(-jωt) * dt
for component in self.components:
# Get current field values in monitor region
field_data = self._extract_field_data(fields, component)
# Update DFT for each frequency
for freq_idx, omega in enumerate(self.omega):
# Compute phase factor: exp(-j * omega * time)
phase = np.exp(-1j * omega * time)
# Accumulate: DFT += field * phase * dt
self._dft_data[component][freq_idx] += field_data * phase * dt
self._time_steps += 1
def _extract_field_data(
self, fields: ElectromagneticFields, component: str
) -> np.ndarray:
"""Extract field data from the monitor region."""
# Get field component
field = fields[component]
# Extract data from monitor region
# For now, return a simplified extraction
# Full implementation would use proper slicing from _monitor_region
# Convert to numpy if using GPU backend
if hasattr(fields, "backend"):
field_np = fields.backend.to_numpy(field)
else:
field_np = np.asarray(field)
# Return a subset (placeholder - should use monitor region)
if field_np.ndim >= 2:
return field_np[:10, :10]
else:
return field_np[:10]
[docs]
def get_frequency_data(
self, component: str, frequency_index: Optional[int] = None
) -> Union[np.ndarray, dict[float, np.ndarray]]:
"""
Get frequency-domain field data.
Parameters
----------
component : str
Field component ('Ex', 'Ey', 'Ez', etc.).
frequency_index : int, optional
Index of frequency to retrieve. If None, returns all frequencies.
Returns
-------
ndarray or dict
Complex frequency-domain field data.
If frequency_index is None, returns dict mapping frequency to data.
"""
if component not in self._dft_data:
raise ValueError(f"Component {component} not monitored")
if frequency_index is None:
# Return dict of all frequencies
result = {}
for i, freq in enumerate(self.frequencies):
result[freq] = self._dft_data[component][i]
return result
else:
return self._dft_data[component][frequency_index]
[docs]
def get_intensity(self, component: str, frequency_index: int) -> np.ndarray:
"""
Get intensity (|E|²) at a specific frequency.
Parameters
----------
component : str
Field component.
frequency_index : int
Frequency index.
Returns
-------
ndarray
Field intensity.
"""
field = self.get_frequency_data(component, frequency_index)
return np.abs(field) ** 2
[docs]
def get_power_spectrum(self, component: str, normalize: bool = True) -> np.ndarray:
"""
Get power spectrum vs frequency.
Parameters
----------
component : str
Field component.
normalize : bool
Whether to normalize to maximum value.
Returns
-------
ndarray
Power spectrum with shape (n_frequencies,).
"""
spectrum = np.zeros(len(self.frequencies))
for i in range(len(self.frequencies)):
intensity = self.get_intensity(component, i)
spectrum[i] = np.sum(intensity)
if normalize and np.max(spectrum) > 0:
spectrum = spectrum / np.max(spectrum)
return spectrum
[docs]
def get_transmission_spectrum(
self, reference_power: Optional[np.ndarray] = None
) -> np.ndarray:
"""
Calculate transmission spectrum (normalized power flow).
Parameters
----------
reference_power : ndarray, optional
Reference power for normalization. If None, normalizes to max.
Returns
-------
ndarray
Transmission spectrum.
"""
# Calculate total power for each frequency
power = np.zeros(len(self.frequencies))
for i in range(len(self.frequencies)):
# Sum power from all E-field components
for comp in ["Ex", "Ey", "Ez"]:
if comp in self._dft_data:
intensity = self.get_intensity(comp, i)
power[i] += np.sum(intensity)
# Normalize
if reference_power is not None:
transmission = power / reference_power
elif np.max(power) > 0:
transmission = power / np.max(power)
else:
transmission = power
return transmission