This notebook is an exploration of Deep Learning and problem formalization for volumetric phase retrieval applied to phased arrays with a target plane parallel to the emission plane.
In [1]:
# Libraries
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import matplotlib.animation as animation
import math
from skimage.transform import resize
import plotly.graph_objs as go
import plotly.offline as pyo
from IPython.display import HTML
from JSAnimation import IPython_display
from scipy.ndimage import gaussian_filter
from tensorflow.keras.layers import Input, Dense, Reshape, Concatenate, Lambda
from dataclasses import dataclass
from PIL import Image
try:
from scipy.ndimage import zoom as ndi_zoom
_HAS_SCIPY = True
except Exception:
_HAS_SCIPY = False
/Users/ngollay/Desktop/Wavefront/Code/DeepGerchberg/.venv/lib/python3.9/site-packages/urllib3/__init__.py:35: NotOpenSSLWarning: urllib3 v2 only supports OpenSSL 1.1.1+, currently the 'ssl' module is compiled with 'LibreSSL 2.8.3'. See: https://github.com/urllib3/urllib3/issues/3020 warnings.warn(
In [2]:
# Parameters of System
emission_resolution = 64 # Size of the acoustic field matrix
num_emitters_x = num_emitters_y = 16 # 16x16 transducer array
num_emitters = num_emitters_x * num_emitters_y
plane_size = 10.0 #cm
plane_distance = 1.0 # Distance between the planes
speed_of_sound = 34300.0 #cm/s
frequency = 40000.0 #hz
period = 1/frequency
wavelength = speed_of_sound / frequency
k = 2 * np.pi / wavelength
angular_frequency = 2 * np.pi * frequency
dx = dy = plane_size / (emission_resolution - 1)
print("Speed of Sound: ", speed_of_sound, "cm/s")
print("Frequency: ", frequency, "Hz")
print("Period: ", period, "s")
print("Wavelength: ", wavelength, "cm")
print("Wavenumber: ", k, "rad/cm")
Speed of Sound: 34300.0 cm/s Frequency: 40000.0 Hz Period: 2.5e-05 s Wavelength: 0.8575 cm Wavenumber: 7.327329804291062 rad/cm
In [3]:
# Creating Smiley Face
# Creating a 64x64 binary image
image_size = emission_resolution
image = np.zeros((image_size, image_size))
# Defining the smiley face parameters
center = (image_size // 2, image_size // 2)
radius = 14*(emission_resolution/32)
eye_radius = 4*(emission_resolution/32)
eye_offset_x = 7*(emission_resolution/32)
eye_offset_y = 4*(emission_resolution/32)
mouth_radius = 6*(emission_resolution/32)
# Drawing the face
y, x = np.ogrid[-center[0]:image_size-center[0], -center[1]:image_size-center[1]]
mask = x*x + y*y <= radius*radius
image[mask] = 1
# Drawing the eyes
left_eye = (center[0] - eye_offset_y, center[1] - eye_offset_x)
right_eye = (center[0] - eye_offset_y, center[1] + eye_offset_x)
y, x = np.ogrid[-left_eye[0]:image_size-left_eye[0], -left_eye[1]:image_size-left_eye[1]]
mask = x*x + y*y <= eye_radius*eye_radius
image[mask] = 0
y, x = np.ogrid[-right_eye[0]:image_size-right_eye[0], -right_eye[1]:image_size-right_eye[1]]
mask = x*x + y*y <= eye_radius*eye_radius
image[mask] = 0
# Drawing the mouth
mouth_top = (center[0] + mouth_radius // 2, center[1])
y, x = np.ogrid[-mouth_top[0]:image_size-mouth_top[0], -mouth_top[1]:image_size-mouth_top[1]]
mask = x*x + y*y <= mouth_radius*mouth_radius
mouth_mask = np.bitwise_and(mask, y > 0)
image[mouth_mask] = 0
smiley_image = image
smiley = [image for _ in range(2)]
# Displaying the image
plt.imshow(image, cmap='gray_r')
plt.axis('off')
plt.show()
In [4]:
# Generate Locations of Individual Transducers
emitter_locations_matrix = np.zeros((num_emitters_x, num_emitters_y, 3))
for i in range(num_emitters_x):
for j in range(num_emitters_y):
emitter_locations_matrix[i, j, 0] = i
emitter_locations_matrix[i, j, 1] = j
emitter_locations_matrix = (emitter_locations_matrix / (num_emitters_x - 1)) * plane_size
emitter_locations = np.reshape(emitter_locations_matrix, (num_emitters, 3))
In [5]:
# Generate Discrete Representation of Target Plane Location
target_plane_location_matrix = np.zeros((emission_resolution, emission_resolution, 3))
for i in range(emission_resolution):
for j in range(emission_resolution):
target_plane_location_matrix[i, j, 0] = i
target_plane_location_matrix[i, j, 1] = j
target_plane_location_matrix = (target_plane_location_matrix / (emission_resolution - 1)) * plane_size
target_plane_location_matrix[:, :, 2] += plane_distance
target_plane_locations = np.reshape(target_plane_location_matrix, (emission_resolution**2, 3))
target_plane_locations.min()
Out[5]:
0.0
In [6]:
import plotly.graph_objects as go
import plotly.io as pio
pio.renderers.default = "notebook"
# Extract x, y, z coordinates
emitter_x = emitter_locations[:, 0]
emitter_y = emitter_locations[:, 1]
emitter_z = emitter_locations[:, 2]
emitter_scatter = go.Scatter3d(
x=emitter_x, y=emitter_y, z=emitter_z,
mode='markers',
name='Emitters',
marker=dict(size=5, color='black', opacity=0.6)
)
target_plane_x = target_plane_locations[:, 0]
target_plane_y = target_plane_locations[:, 1]
target_plane_z = target_plane_locations[:, 2]
target_plane_scatter = go.Scatter3d(
x=target_plane_x, y=target_plane_y, z=target_plane_z,
mode='markers',
name='Plane Samples',
marker=dict(size=5, color='blue', opacity=0.4)
)
layout = go.Layout(
title='Ultrasound Sources and Plane',
scene=dict()
)
fig = go.Figure(data=[emitter_scatter, target_plane_scatter], layout=layout)
fig.show()
In [7]:
def vectorizedDistance(point_arr_1, point_arr_2):
matrix1_exp = point_arr_1[:, np.newaxis, :]
matrix2_exp = point_arr_2[np.newaxis, :, :]
# Euclidean Distance
differences = matrix1_exp - matrix2_exp
squared_differences = np.sum(differences**2, axis=-1)
distances = np.sqrt(squared_differences)
return distances
distances = vectorizedDistance(emitter_locations, target_plane_locations)
distances.shape
Out[7]:
(256, 4096)
In [8]:
def vectorizedSuperposition(distances, phases, time, angular_frequency, wavenumber, plane_distance, gaussian_sigma=None):
# Calculates pressures from distances p(d,t)=A sin(kd−ωt+φ)
distances_with_phases = wavenumber * distances - angular_frequency * time + phases[:, np.newaxis]
sine_distances = np.sin(distances_with_phases)
if gaussian_sigma is not None:
# Gaussian directivity based on lateral (x,y) distance from each emitter
lateral_squared = np.maximum(distances**2 - plane_distance**2, 0.0)
weights = np.exp(-lateral_squared / (2.0 * gaussian_sigma**2))
sine_distances *= weights
summed_sines = np.sum(sine_distances, axis=0)
return summed_sines
def reshapeToPlane(amplitudes):
# Reshapes so can be seen as a plane
return np.reshape(amplitudes, (emission_resolution, emission_resolution))
In [9]:
def animate_single_emitter(emitter_index=0, frames=60, interval=100, time_step=None, gaussian_sigma=None):
if time_step is None:
time_step = period / 30
emitter_distances = distances[emitter_index:emitter_index + 1]
emitter_phases = np.zeros((1,))
def get_frame(frame):
field = vectorizedSuperposition(
emitter_distances,
emitter_phases,
frame * time_step,
angular_frequency,
k,
plane_distance,
gaussian_sigma,
)
return reshapeToPlane(field)
fig, ax = plt.subplots()
img = ax.imshow(get_frame(0), cmap='gray_r', animated=True)
ax.set_title('Single-Emitter Field (Time Animation)')
ax.axis('off')
def update(frame):
img.set_array(get_frame(frame))
return img,
ani = FuncAnimation(fig, update, frames=frames, interval=interval, blit=True)
plt.close(fig)
return ani
gaussian_sigma_value = 5.0
ani = animate_single_emitter(emitter_index=0, gaussian_sigma=gaussian_sigma_value)
HTML(ani.to_jshtml())
Out[9]:
In [10]:
# Test
phases = np.zeros((num_emitters))
gaussian_sigma = 3 # cm
out = vectorizedSuperposition(distances, phases, 1.0, angular_frequency, k, plane_distance, gaussian_sigma)
out = reshapeToPlane(out / num_emitters)
plt.imshow(out, cmap='gray_r')
plt.title('Emission Plane (With Directivity)')
plt.axis('off')
plt.show()
# Directivity of a single transducer (visualized as a beam)
x = np.linspace(-plane_size / 2, plane_size / 2, emission_resolution)
z = np.linspace(0, plane_distance, emission_resolution)
X, Z = np.meshgrid(x, z)
theta = np.arctan2(np.abs(X), np.maximum(Z, 1e-9))
theta_sigma = np.arctan2(gaussian_sigma, plane_distance)
directivity_beam = np.exp(-(theta**2) / (2.0 * theta_sigma**2))
plt.imshow(
directivity_beam,
cmap='gray_r',
origin='lower',
extent=[-plane_size / 2, plane_size / 2, 0, plane_distance],
)
plt.scatter([0], [0], color='red', s=20)
plt.title('Single-Emitter Directivity (Beam View)')
plt.xlabel('Lateral offset (cm)')
plt.ylabel('Axial distance (cm)')
plt.show()
In [11]:
# Sanity Check
sane = False
if not sane:
time = 1.0
phases = np.zeros((num_emitters))
pressures = np.zeros((emission_resolution, emission_resolution))
for i in range(emission_resolution):
for j in range(emission_resolution):
for k_i in range(num_emitters_x):
for k_j in range(num_emitters_y):
distance = math.sqrt(
(emitter_locations_matrix[k_i, k_j, 0] - target_plane_location_matrix[i, j, 0])**2 +
(emitter_locations_matrix[k_i, k_j, 1] - target_plane_location_matrix[i, j, 1])**2 +
(emitter_locations_matrix[k_i, k_j, 2] - target_plane_location_matrix[i, j, 2])**2
)
pressures[i, j] += math.sin(k * distance - angular_frequency * time + 0)
pressures /= num_emitters
out = vectorizedSuperposition(distances, phases, time, angular_frequency, k, None)
out = out / num_emitters
assert np.allclose(out.reshape(emission_resolution, emission_resolution), pressures)
plt.imshow(pressures, cmap='gray_r')
plt.show()
In [12]:
%matplotlib inline
def animateTargetPlaneSuperposition(phases, period, angular_frequency, wavenumber, return_mag = False):
time_step = period / 30
frames = []
max_frame = np.zeros((emission_resolution, emission_resolution))
for i in range(30):
amplitude_frame = vectorizedSuperposition(distances, phases, i * time_step, angular_frequency, wavenumber, plane_distance, 10)
reshaped_frame = np.reshape(amplitude_frame, (emission_resolution, emission_resolution))
frames.append(reshaped_frame)
max_frame = np.maximum(max_frame, reshaped_frame)
print(f"\rGenerating Frame: {i}", end="")
fig, ax = plt.subplots()
im = ax.imshow(frames[0], cmap='gray_r')
def update(frame):
im.set_array(frame)
return [im]
ani = animation.FuncAnimation(fig, update, frames=frames, interval=100, blit=True, repeat=True)
plt.close(fig)
if return_mag:
return ani, max_frame
return ani
phases = np.zeros((num_emitters))
ani = animateTargetPlaneSuperposition(phases, period, angular_frequency, k)
HTML(ani.to_jshtml())
Generating Frame: 29
Out[12]:
In [13]:
def imresize(arr, out_shape, mode="bilinear"):
"""
Rough MATLAB imresize replacement for 2D arrays:
- bilinear: order=1
- nearest: order=0
"""
arr = np.asarray(arr)
if arr.ndim != 2:
raise ValueError("imresize expects a 2D array")
if not _HAS_SCIPY:
pil_mode = Image.BILINEAR if mode == "bilinear" else Image.NEAREST
a = arr.astype(np.float32)
im = Image.fromarray(a)
im = im.resize((out_shape[1], out_shape[0]), resample=pil_mode)
return np.array(im, dtype=arr.dtype)
order = 1 if mode == "bilinear" else 0
zoom_factors = (out_shape[0] / arr.shape[0], out_shape[1] / arr.shape[1])
return ndi_zoom(arr, zoom_factors, order=order)
In [14]:
@dataclass
class Medium:
soundspeed: float # m/s
attenuationdBcmMHz: float = 0.0 # dB / (cm * MHz)
def fftasa(p0, z, medium: Medium, N, delta, f0):
"""
Angular spectrum propagation (ported from your MATLAB).
p0: complex field at z=0 plane, shape (nx, ny) but assumed square here like MATLAB usage
z: propagation distance (m). Positive forward, negative backward.
N: FFT grid size (int)
delta: spatial sampling interval (m)
f0: frequency (Hz)
Returns: complex field at distance z, shape (N, N)
"""
p0 = np.asarray(p0, dtype=np.complex128)
wavelen = medium.soundspeed / f0
dBperNeper = 20.0 * np.log10(np.e)
attenuation_nepers_per_meter = (medium.attenuationdBcmMHz / dBperNeper) * 100.0 * (f0 / 1e6)
# fft2(p0, N, N)
fftpressz0 = np.fft.fft2(p0, s=(N, N))
wavenum = 2.0 * np.pi / wavelen
if N % 2 == 1: # odd
k = np.arange(-N/2 - 0.5, N/2 - 0.5, 1.0) # length N
else: # even
k = np.arange(-N/2, N/2, 1.0) # length N
# kx = k * wavelen/(N*delta)
kx = k * (wavelen / (N * delta))
ky = k * (wavelen / (N * delta))
kxspace, kyspace = np.meshgrid(kx, ky, indexing="xy")
kxsq_ysq = np.fft.fftshift(kxspace**2 + kyspace**2)
# kz = k0 * sqrt(1 - kx^2 - ky^2) (kx,ky are dimensionless here)
# Complex sqrt is fine; we’ll keep it complex to avoid nan on evanescent components.
kzspace = wavenum * np.sqrt(1.0 - kxsq_ysq + 0j)
# Basic spectral propagator
if z > 0:
H = np.conj(np.exp(1j * z * kzspace))
else:
H = np.exp(-1j * z * kzspace) * (kxsq_ysq <= 1.0)
# Attenuation (only for propagating modes)
if attenuation_nepers_per_meter > 0:
propagating = (np.sqrt(kxsq_ysq) < 1.0)
# exp(-att * delz / cos(asin(sqrt(kxsq_ysq))) ) * propagating
# cos(asin(a)) = sqrt(1 - a^2)
a = np.sqrt(np.clip(kxsq_ysq, 0.0, 1.0))
cos_theta = np.sqrt(np.clip(1.0 - a**2, 1e-12, 1.0))
H = H * np.exp(-(attenuation_nepers_per_meter * abs(z) / cos_theta) * propagating) * propagating
# Angular threshold filter
D = (N - 1) * delta
thres = np.sqrt(0.5 * D**2 / (0.5 * D**2 + z**2)) if z != 0 else 1.0
filt = (np.sqrt(kxsq_ysq) <= thres)
H = H * filt
# Back to space
fftpress = np.fft.ifft2(fftpressz0 * H, s=(N, N))
return fftpress
In [15]:
def calc_emission_for_target_amp_slice(
target_amp_slice,
dist,
iters,
slice_size,
freq,
sound_speed,
emitter_size,
amp_res,
phase_res,
record_mse=True,
eps=1e-12,
):
target_amp_slice = np.asarray(target_amp_slice, dtype=np.float64)
w, h = target_amp_slice.shape
assert w == h, "target_amp_slice must be square"
assert (w & (w - 1)) == 0, "target_amp_slice width must be power of 2"
target = np.zeros((w, h), dtype=np.complex128)
emission = np.zeros((w, h), dtype=np.complex128)
n_emitters_per_side = int(np.floor(slice_size / emitter_size))
n_emitters = n_emitters_per_side * n_emitters_per_side
emitter_px = w / n_emitters_per_side # float
# amplitude mask: grid of circles (vectorized)
yy, xx = np.meshgrid(np.arange(h), np.arange(w), indexing="xy")
x_in_cell = (xx % emitter_px) - (emitter_px / 2.0)
y_in_cell = (yy % emitter_px) - (emitter_px / 2.0)
em_px2 = (emitter_px * emitter_px) / 4.0
mask = ((x_in_cell**2 + y_in_cell**2) < em_px2).astype(np.float64)
medium = Medium(soundspeed=sound_speed, attenuationdBcmMHz=0.0)
# Precompute normalized target amplitude used for MSE
target_norm = target_amp_slice / (np.max(target_amp_slice) + eps)
mse_history = []
for _ in range(iters):
# stamp target amplitude, retain phases
target = target_amp_slice * np.exp(1j * np.angle(target))
# backpropagate target -> emission
emission = fftasa(target, -dist, medium, w, slice_size / w, freq)
# apply constraints on emission slice
amp = np.abs(emission)
phase = np.angle(emission)
# downscale to per-emitter grid
down_amp = imresize(amp, (n_emitters_per_side, n_emitters_per_side), mode="bilinear")
down_phase = imresize(phase, (n_emitters_per_side, n_emitters_per_side), mode="bilinear")
# discretize amplitude
if amp_res == 0:
down_amp = np.ones_like(down_amp)
else:
down_amp = np.fix(down_amp * amp_res) / amp_res
# discretize phase
if phase_res == 0:
down_phase = np.zeros_like(down_phase)
else:
down_phase = np.fix((down_phase / np.pi) * phase_res) * (np.pi / phase_res)
# upscale so each emitter region is constant
amp = imresize(down_amp, (w, h), mode="nearest")
phase = imresize(down_phase, (w, h), mode="nearest")
amp = amp * mask
# combine amp/phase into complex emission
emission = amp * np.exp(1j * phase)
# propagate emission -> target
target = fftasa(emission, dist, medium, w, slice_size / w, freq)
if record_mse:
amp_slice_iter = np.abs(target)
amp_slice_iter = amp_slice_iter / (np.max(amp_slice_iter) + eps)
mse_history.append(np.mean((target_norm - amp_slice_iter) ** 2))
amp_slice = np.abs(target)
# extract amps/phases for each emitter at its center pixel (MATLAB-style)
amps = np.zeros((n_emitters,), dtype=np.float64)
phases = np.zeros((n_emitters,), dtype=np.float64)
idx = 0
for ix in range(1, n_emitters_per_side + 1):
for iy in range(1, n_emitters_per_side + 1):
center_x = int(np.round(ix * emitter_px - emitter_px / 2.0))
center_y = int(np.round(iy * emitter_px - emitter_px / 2.0))
center_x = np.clip(center_x, 0, w - 1)
center_y = np.clip(center_y, 0, h - 1)
em = emission[center_x, center_y]
amps[idx] = np.abs(em)
phases[idx] = np.angle(em)
idx += 1
return amps, phases, amp_slice, np.array(mse_history, dtype=np.float64), target
In [16]:
targetfile = "smiley.png" # put in same folder as notebook, or give full path
img = Image.open(targetfile).convert("L") # grayscale
target_amp = np.array(img, dtype=np.float64)
target_amp = target_amp / np.max(target_amp) # normalize
amps, phases, amp_slice, mse, target_field = calc_emission_for_target_amp_slice(
target_amp_slice=target_amp,
dist=0.16,
iters=50,
slice_size=0.16,
freq=40000,
sound_speed=340,
emitter_size=0.01,
amp_res=0,
phase_res=32,
record_mse=True,
)
plt.figure(figsize=(6,4))
plt.plot(mse)
plt.yscale("log")
plt.xlabel("Iteration")
plt.ylabel("MSE (normalized)")
plt.title("GS convergence")
plt.grid(True, which="both")
plt.tight_layout()
plt.show()
plt.figure(figsize=(8, 8))
plt.subplot(2, 1, 1)
plt.imshow(target_amp, aspect="equal", cmap="gray_r")
plt.title("target")
plt.axis("off")
plt.subplot(2, 1, 2)
plt.imshow(amp_slice, aspect="equal", cmap="gray_r")
plt.title("obtained")
plt.axis("off")
plt.tight_layout()
plt.show()
# MSE (match MATLAB normalization)
target_n = target_amp / np.max(target_amp)
slice_n = amp_slice / np.max(amp_slice)
mse = np.mean((target_n - slice_n) ** 2)
print("MSE:", mse)
MSE: 0.10190947886666264
In [17]:
from matplotlib.animation import FuncAnimation
from IPython.display import HTML
def animate_one_period(target_field, freq_hz, n_frames=30, interval_ms=50, normalize=True):
"""
target_field: complex 2D array (phasor at target plane)
freq_hz: oscillation frequency
n_frames: frames across one period
"""
target_field = np.asarray(target_field, dtype=np.complex128)
omega = 2 * np.pi * freq_hz
T = 1.0 / freq_hz
times = np.linspace(0, T, n_frames, endpoint=False)
# Precompute frames (fast + stable scaling)
frames = []
for t in times:
p = np.real(target_field * np.exp(-1j * omega * t))
frames.append(p)
frames = np.stack(frames, axis=0)
if normalize:
scale = np.max(np.abs(frames)) + 1e-12
frames = frames / scale
vmin, vmax = frames.min(), frames.max()
fig, ax = plt.subplots(figsize=(6, 6))
im = ax.imshow(frames[0], vmin=vmin, vmax=vmax, animated=True)
ax.set_title("Target plane pressure over one period")
ax.axis("off")
plt.close(fig)
def update(i):
im.set_array(frames[i])
ax.set_title(f"t = {times[i]*1e6:.2f} µs ({i+1}/{n_frames})")
return (im,)
anim = FuncAnimation(fig, update, frames=n_frames, interval=interval_ms, blit=True)
return HTML(anim.to_jshtml())
In [18]:
amps, phases, amp_slice, mse, target_field = calc_emission_for_target_amp_slice(
target_amp_slice=target_amp,
dist=0.16,
iters=50,
slice_size=0.16,
freq=40000,
sound_speed=340,
emitter_size=0.005,
amp_res=0,
phase_res=32,
record_mse=True,
)
animate_one_period(target_field, freq_hz=40000, n_frames=30)
Out[18]: