"""
Discrete Fourier Transform (DFT) monitors for frequency-domain field analysis.
This module implements on-the-fly DFT computation during time-domain simulations,
allowing efficient extraction of frequency-domain fields without storing all time steps.
"""
from typing import List, Tuple, Dict, Optional, Union
import numpy as np
from prismo.core.grid import YeeGrid
from prismo.core.fields import ElectromagneticFields, FieldComponent
from prismo.monitors.base import Monitor
from prismo.backends import Backend, get_backend
[docs]
class DFTMonitor(Monitor):
"""
Discrete Fourier Transform monitor for frequency-domain analysis.
This monitor computes frequency-domain fields on-the-fly during time-domain
simulation using:
F(ω) = ∫ f(t) * exp(-jωt) dt
Discretized as:
F(ω) ≈ Σ f(n*dt) * exp(-jω*n*dt) * dt
Parameters
----------
center : Tuple[float, float, float]
Physical coordinates of monitor center (x, y, z) in meters.
size : Tuple[float, float, float]
Physical dimensions of monitor (Lx, Ly, Lz) in meters.
frequencies : List[float]
List of frequencies to monitor (Hz).
components : List[str], optional
Field components to record. Default is all E components.
name : str, optional
Monitor name.
backend : Backend, optional
Computational backend to use.
"""
[docs]
def __init__(
self,
center: Tuple[float, float, float],
size: Tuple[float, float, float],
frequencies: List[float],
components: Optional[List[str]] = None,
name: Optional[str] = None,
backend: Optional[Union[str, Backend]] = None,
):
super().__init__(center, size, name)
self.frequencies = np.array(frequencies)
self.omega = 2 * np.pi * self.frequencies # Angular frequencies
if components is None:
self.components = ["Ex", "Ey", "Ez"]
else:
self.components = components
# Initialize backend
if isinstance(backend, str):
self.backend = get_backend(backend)
elif isinstance(backend, Backend):
self.backend = backend
elif backend is None:
self.backend = get_backend()
else:
raise TypeError("backend must be a Backend instance or string name")
# Storage for DFT accumulation (complex-valued)
self._dft_data: Dict[str, np.ndarray] = {}
self._time_steps = 0
self._dt = None
[docs]
def initialize(self, grid: YeeGrid) -> None:
"""Initialize the DFT monitor on the grid."""
super().initialize(grid)
# Initialize DFT accumulation arrays
for component in self.components:
# Get shape for this component in the monitor region
shape = self._get_component_shape(component)
# Create complex array: (n_frequencies, *spatial_dims)
dft_shape = (len(self.frequencies),) + shape
self._dft_data[component] = np.zeros(dft_shape, dtype=np.complex128)
def _get_component_shape(self, component: str) -> Tuple[int, ...]:
"""Get the spatial shape for a field component in the monitor region."""
# This returns the shape of the field in the monitored region
# For simplicity, we'll use the grid shape
# In practice, this would be determined by the monitor region slices
if self._grid is None:
raise RuntimeError("Monitor must be initialized first")
# Get the component indices
indices = self._monitor_region[component]
# For now, assume a simple point or line monitor
# Full implementation would extract proper shapes from indices
return (10, 10) # Placeholder
[docs]
def update(self, fields: ElectromagneticFields, time: float, dt: float) -> None:
"""
Update DFT accumulation with current field values.
Parameters
----------
fields : ElectromagneticFields
Current electromagnetic fields.
time : float
Current simulation time (s).
dt : float
Time step (s).
"""
if self._dt is None:
self._dt = dt
# For each frequency, accumulate: F(ω) += f(t) * exp(-jωt) * dt
for component in self.components:
# Get current field values in monitor region
field_data = self._extract_field_data(fields, component)
# Update DFT for each frequency
for freq_idx, omega in enumerate(self.omega):
# Compute phase factor: exp(-j * omega * time)
phase = np.exp(-1j * omega * time)
# Accumulate: DFT += field * phase * dt
self._dft_data[component][freq_idx] += field_data * phase * dt
self._time_steps += 1
def _extract_field_data(
self, fields: ElectromagneticFields, component: str
) -> np.ndarray:
"""Extract field data from the monitor region."""
# Get field component
field = fields[component]
# Extract data from monitor region
# For now, return a simplified extraction
# Full implementation would use proper slicing from _monitor_region
# Convert to numpy if using GPU backend
if hasattr(fields, "backend"):
field_np = fields.backend.to_numpy(field)
else:
field_np = np.asarray(field)
# Return a subset (placeholder - should use monitor region)
if field_np.ndim >= 2:
return field_np[:10, :10]
else:
return field_np[:10]
[docs]
def get_frequency_data(
self, component: str, frequency_index: Optional[int] = None
) -> Union[np.ndarray, Dict[float, np.ndarray]]:
"""
Get frequency-domain field data.
Parameters
----------
component : str
Field component ('Ex', 'Ey', 'Ez', etc.).
frequency_index : int, optional
Index of frequency to retrieve. If None, returns all frequencies.
Returns
-------
ndarray or dict
Complex frequency-domain field data.
If frequency_index is None, returns dict mapping frequency to data.
"""
if component not in self._dft_data:
raise ValueError(f"Component {component} not monitored")
if frequency_index is None:
# Return dict of all frequencies
result = {}
for i, freq in enumerate(self.frequencies):
result[freq] = self._dft_data[component][i]
return result
else:
return self._dft_data[component][frequency_index]
[docs]
def get_intensity(self, component: str, frequency_index: int) -> np.ndarray:
"""
Get intensity (|E|²) at a specific frequency.
Parameters
----------
component : str
Field component.
frequency_index : int
Frequency index.
Returns
-------
ndarray
Field intensity.
"""
field = self.get_frequency_data(component, frequency_index)
return np.abs(field) ** 2
[docs]
def get_power_spectrum(self, component: str, normalize: bool = True) -> np.ndarray:
"""
Get power spectrum vs frequency.
Parameters
----------
component : str
Field component.
normalize : bool
Whether to normalize to maximum value.
Returns
-------
ndarray
Power spectrum with shape (n_frequencies,).
"""
spectrum = np.zeros(len(self.frequencies))
for i in range(len(self.frequencies)):
intensity = self.get_intensity(component, i)
spectrum[i] = np.sum(intensity)
if normalize and np.max(spectrum) > 0:
spectrum = spectrum / np.max(spectrum)
return spectrum
[docs]
def get_transmission_spectrum(
self, reference_power: Optional[np.ndarray] = None
) -> np.ndarray:
"""
Calculate transmission spectrum (normalized power flow).
Parameters
----------
reference_power : ndarray, optional
Reference power for normalization. If None, normalizes to max.
Returns
-------
ndarray
Transmission spectrum.
"""
# Calculate total power for each frequency
power = np.zeros(len(self.frequencies))
for i in range(len(self.frequencies)):
# Sum power from all E-field components
for comp in ["Ex", "Ey", "Ez"]:
if comp in self._dft_data:
intensity = self.get_intensity(comp, i)
power[i] += np.sum(intensity)
# Normalize
if reference_power is not None:
transmission = power / reference_power
elif np.max(power) > 0:
transmission = power / np.max(power)
else:
transmission = power
return transmission