FODO Cell, Programmable Element
This implements the same FODO cell as the stable FODO cell example.
However, in the example here we define additional user-defined, custom elements (impactx.elements.Programmable) from the ImpactX Python APIs.
More generally, ImpactX exposes all data structures through pyAMReX for adding additional computation, enabling rapid prototyping of new elements on both CPU and GPU.
Run
This example can be run as a Python script: python3 fodo_programmable.py.
For MPI-parallel runs, prefix this line with mpiexec -n 4 ... or srun -n 4 ..., depending on the system.
#!/usr/bin/env python3
#
# Copyright 2022-2023 ImpactX contributors
# Authors: Axel Huebl, Chad Mitchell
# License: BSD-3-Clause-LBNL
#
# -*- coding: utf-8 -*-
from impactx import ImpactX, distribution, elements
sim = ImpactX()
# set numerical parameters and IO control
sim.space_charge = False
# sim.diagnostics = False # benchmarking
sim.slice_step_diagnostics = True
# domain decomposition & space charge mesh
sim.init_grids()
# load a 2 GeV electron beam with an initial
# unnormalized rms emittance of 2 nm
kin_energy_MeV = 2.0e3 # reference energy
bunch_charge_C = 1.0e-9 # used with space charge
npart = 10000 # number of macro particles
# reference particle
ref = sim.particle_container().ref_particle()
ref.set_charge_qe(-1.0).set_mass_MeV(0.510998950).set_kin_energy_MeV(kin_energy_MeV)
# particle bunch
distr = distribution.Waterbag(
lambdaX=3.9984884770e-5,
lambdaY=3.9984884770e-5,
lambdaT=1.0e-3,
lambdaPx=2.6623538760e-5,
lambdaPy=2.6623538760e-5,
lambdaPt=2.0e-3,
muxpx=-0.846574929020762,
muypy=0.846574929020762,
mutpt=0.0,
)
sim.add_particles(bunch_charge_C, distr, npart)
# number of slices per ds in each lattice element
ns = 25
# build a custom, Pythonic beam optical element
def my_drift(pge, pti, refpart):
"""This pushes the beam particles as a drift.
Relative to the reference particle.
:param pti: particle iterator for the current tile or box
:param refpart: the reference particle
"""
# access particle attributes
soa = pti.soa().to_xp() # automatic: NumPy (CPU) or CuPy (GPU)
x = soa.real["position_x"]
y = soa.real["position_y"]
t = soa.real["position_t"]
px = soa.real["momentum_x"]
py = soa.real["momentum_y"]
pt = soa.real["momentum_t"]
# length of the current slice
slice_ds = pge.ds / pge.nslice
# access reference particle values to find beta*gamma^2
pt_ref = refpart.pt
betgam2 = pt_ref**2 - 1.0
# advance position and momentum (drift)
x[:] += slice_ds * px[:]
y[:] += slice_ds * py[:]
t[:] += (slice_ds / betgam2) * pt[:]
def my_ref_drift(pge, refpart):
"""This pushes the reference particle.
:param refpart: reference particle
"""
# assign input reference particle values
x = refpart.x
px = refpart.px
y = refpart.y
py = refpart.py
z = refpart.z
pz = refpart.pz
t = refpart.t
pt = refpart.pt
s = refpart.s
# length of the current slice
slice_ds = pge.ds / pge.nslice
# assign intermediate parameter
step = slice_ds / (pt**2 - 1.0) ** 0.5
# advance position and momentum (drift)
refpart.x = x + step * px
refpart.y = y + step * py
refpart.z = z + step * pz
refpart.t = t - step * pt
# advance integrated path length
refpart.s = s + slice_ds
pge1 = elements.Programmable(name="d1")
pge1.nslice = ns
pge1.beam_particles = lambda pti, refpart: my_drift(pge1, pti, refpart)
pge1.ref_particle = lambda refpart: my_ref_drift(pge1, refpart)
pge1.ds = 0.25
# attention: assignment is a reference for pge2 = pge1
pge2 = elements.Programmable(name="d2")
pge2.nslice = ns
pge2.beam_particles = lambda pti, refpart: my_drift(pge2, pti, refpart)
pge2.ref_particle = lambda refpart: my_ref_drift(pge2, refpart)
pge2.ds = 0.5
# add beam diagnostics
monitor = elements.BeamMonitor("monitor", backend="h5")
# design the accelerator lattice
fodo = [
monitor,
pge1, # equivalent to elements.Drift("d1", ds=0.25, nslice=ns)
monitor,
elements.Quad(name="q1", ds=1.0, k=1.0, nslice=ns),
monitor,
pge2, # equivalent to elements.Drift("d2", ds=0.5, nslice=ns)
monitor,
elements.Quad(name="q2", ds=1.0, k=-1.0, nslice=ns),
monitor,
pge1, # equivalent to elements.Drift("d1", ds=0.25, nslice=ns)
monitor,
]
# assign a fodo segment
sim.lattice.extend(fodo)
# run simulation
sim.track_particles()
# clean shutdown
sim.finalize()
Analyze
We run the following script to analyze correctness:
Script analysis_fodo.py
#!/usr/bin/env python3
#
# Copyright 2022-2023 ImpactX contributors
# Authors: Axel Huebl, Chad Mitchell
# License: BSD-3-Clause-LBNL
#
import numpy as np
import openpmd_api as io
from scipy.stats import moment
def get_moments(beam):
"""Calculate standard deviations of beam position & momenta
and emittance values
Returns
-------
sigx, sigy, sigt, emittance_x, emittance_y, emittance_t
"""
sigx = moment(beam["position_x"], moment=2) ** 0.5 # variance -> std dev.
sigpx = moment(beam["momentum_x"], moment=2) ** 0.5
sigy = moment(beam["position_y"], moment=2) ** 0.5
sigpy = moment(beam["momentum_y"], moment=2) ** 0.5
sigt = moment(beam["position_t"], moment=2) ** 0.5
sigpt = moment(beam["momentum_t"], moment=2) ** 0.5
epstrms = beam.cov(ddof=0)
emittance_x = (sigx**2 * sigpx**2 - epstrms["position_x"]["momentum_x"] ** 2) ** 0.5
emittance_y = (sigy**2 * sigpy**2 - epstrms["position_y"]["momentum_y"] ** 2) ** 0.5
emittance_t = (sigt**2 * sigpt**2 - epstrms["position_t"]["momentum_t"] ** 2) ** 0.5
return (sigx, sigy, sigt, emittance_x, emittance_y, emittance_t)
# initial/final beam
series = io.Series("diags/openPMD/monitor.h5", io.Access.read_only)
last_step = list(series.iterations)[-1]
initial = series.iterations[1].particles["beam"].to_df()
beam_final = series.iterations[last_step].particles["beam"]
final = beam_final.to_df()
# compare number of particles
num_particles = 10000
assert num_particles == len(initial)
assert num_particles == len(final)
print("Initial Beam:")
sigx, sigy, sigt, emittance_x, emittance_y, emittance_t = get_moments(initial)
print(f" sigx={sigx:e} sigy={sigy:e} sigt={sigt:e}")
print(
f" emittance_x={emittance_x:e} emittance_y={emittance_y:e} emittance_t={emittance_t:e}"
)
atol = 0.0 # ignored
rtol = 2.2 * num_particles**-0.5 # from random sampling of a smooth distribution
print(f" rtol={rtol} (ignored: atol~={atol})")
assert np.allclose(
[sigx, sigy, sigt, emittance_x, emittance_y, emittance_t],
[
7.5451170454175073e-005,
7.5441588239210947e-005,
9.9775878164077539e-004,
1.9959540393751392e-009,
2.0175015289132990e-009,
2.0013820193294972e-006,
],
rtol=rtol,
atol=atol,
)
print("")
print("Final Beam:")
sigx, sigy, sigt, emittance_x, emittance_y, emittance_t = get_moments(final)
s_ref = beam_final.get_attribute("s_ref")
gamma_ref = beam_final.get_attribute("gamma_ref")
print(f" sigx={sigx:e} sigy={sigy:e} sigt={sigt:e}")
print(
f" emittance_x={emittance_x:e} emittance_y={emittance_y:e} emittance_t={emittance_t:e}\n"
f" s_ref={s_ref:e} gamma_ref={gamma_ref:e}"
)
atol = 0.0 # ignored
rtol = 2.2 * num_particles**-0.5 # from random sampling of a smooth distribution
print(f" rtol={rtol} (ignored: atol~={atol})")
assert np.allclose(
[sigx, sigy, sigt, emittance_x, emittance_y, emittance_t, s_ref, gamma_ref],
[
7.4790118496224206e-005,
7.5357525169680140e-005,
9.9775879288128088e-004,
1.9959539836392703e-009,
2.0175014668882125e-009,
2.0013820380883801e-006,
3.000000,
3.914902e003,
],
rtol=rtol,
atol=atol,
)
Visualize
You can run the following script to visualize the beam evolution over time:
Script plot_fodo.py
#!/usr/bin/env python3
#
# Copyright 2022-2023 ImpactX contributors
# Authors: Axel Huebl, Chad Mitchell
# License: BSD-3-Clause-LBNL
#
import argparse
import glob
import re
import matplotlib.pyplot as plt
import openpmd_api as io
import pandas as pd
from matplotlib.ticker import MaxNLocator
def read_file(file_pattern):
for filename in glob.glob(file_pattern):
df = pd.read_csv(filename, delimiter=r"\s+")
if "step" not in df.columns:
step = int(re.findall(r"[0-9]+", filename)[0])
df["step"] = step
yield df
def read_time_series(file_pattern):
"""Read in all CSV files from each MPI rank (and potentially OpenMP
thread). Concatenate into one Pandas dataframe.
Returns
-------
pandas.DataFrame
"""
return pd.concat(
read_file(file_pattern),
axis=0,
ignore_index=True,
) # .set_index('id')
# options to run this script
parser = argparse.ArgumentParser(description="Plot the FODO benchmark.")
parser.add_argument(
"--save-png", action="store_true", help="non-interactive run: save to PNGs"
)
args = parser.parse_args()
# initial/final beam
series = io.Series("diags/openPMD/monitor.h5", io.Access.read_only)
last_step = list(series.iterations)[-1]
initial = series.iterations[1].particles["beam"].to_df()
final = series.iterations[last_step].particles["beam"].to_df()
ref_particle = read_time_series("diags/ref_particle.*")
# steps & corresponding z
steps = list(series.iterations)
z = list(
map(lambda step: ref_particle[ref_particle["step"] == step].z.values[0], steps)
)
# scaling to units
millimeter = 1.0e3 # m->mm
mrad = 1.0e3 # ImpactX uses "static units": momenta are normalized by the magnitude of the momentum of the reference particle p0: px/p0 (rad)
# mm_mrad = 1.e6
nm_rad = 1.0e9
# read reduced diagnostics
rbc = read_time_series("diags/reduced_beam_characteristics.*")
s = rbc["s"]
sigma_x = rbc["sigma_x"] * millimeter
sigma_y = rbc["sigma_y"] * millimeter
sigma_t = rbc["sigma_t"] * millimeter
emittance_x = rbc["emittance_x"] * nm_rad
emittance_y = rbc["emittance_y"] * nm_rad
emittance_t = rbc["emittance_t"] * nm_rad
length = len(s) - 1
# select a single particle by id
# particle_42 = beam[beam["id"] == 42]
# print(particle_42)
# steps & corresponding z
steps = list(series.iterations)
# print beam transverse size over steps
f = plt.figure(figsize=(9, 4.8))
ax1 = f.gca()
im_sigx = ax1.plot(s, sigma_x, label=r"$\sigma_x$")
im_sigy = ax1.plot(s, sigma_y, label=r"$\sigma_y$")
ax2 = ax1.twinx()
ax2.set_prop_cycle(None) # reset color cycle
im_emittance_x = ax2.plot(s, emittance_x, ":", label=r"$\epsilon_x$")
im_emittance_y = ax2.plot(s, emittance_y, ":", label=r"$\epsilon_y$")
ax1.legend(
handles=im_sigx + im_sigy + im_emittance_x + im_emittance_y, loc="lower center"
)
ax1.set_xlabel(r"$z$ [m]")
ax1.set_ylabel(r"$\sigma_{x,y}$ [mm]")
ax2.set_ylabel(r"$\epsilon_{x,y}$ [nm]")
ax2.set_ylim([1.5, 2.5])
ax1.xaxis.set_major_locator(MaxNLocator(integer=True))
plt.tight_layout()
if args.save_png:
plt.savefig("fodo_sigma.png")
else:
plt.show()
# beam transverse scatter plot over steps
num_plots_per_row = len(steps)
fig, axs = plt.subplots(
3, num_plots_per_row, figsize=(9, 4.8), sharex="row", sharey="row"
)
ncol_ax = -1
for step in steps:
# plot initial distribution & at exit of each element
ncol_ax += 1
# x-y
ax = axs[(0, ncol_ax)]
beam_at_step = series.iterations[step].particles["beam"].to_df()
ax.scatter(
beam_at_step.position_x.multiply(millimeter),
beam_at_step.position_y.multiply(millimeter),
s=0.01,
)
ax.set_title(f"$z={z[ncol_ax]:.2f}$ [m]")
ax.set_xlabel(r"$x$ [mm]")
# x-px
ax = axs[(1, ncol_ax)]
beam_at_step = series.iterations[step].particles["beam"].to_df()
ax.scatter(
beam_at_step.position_x.multiply(millimeter),
beam_at_step.momentum_x.multiply(mrad),
s=0.01,
)
ax.set_xlabel(r"$x$ [mm]")
# y-py
ax = axs[(2, ncol_ax)]
beam_at_step = series.iterations[step].particles["beam"].to_df()
ax.scatter(
beam_at_step.position_y.multiply(millimeter),
beam_at_step.momentum_y.multiply(mrad),
s=0.01,
)
ax.set_xlabel(r"$y$ [mm]")
axs[(0, 0)].set_ylabel(r"$y$ [mm]")
axs[(1, 0)].set_ylabel(r"$p_x$ [mrad]")
axs[(2, 0)].set_ylabel(r"$p_y$ [mrad]")
plt.tight_layout()
if args.save_png:
plt.savefig("fodo_scatter.png")
else:
plt.show()
Fig. 7 FODO transversal beam width and emittance evolution
Fig. 8 FODO transversal beam width and phase space evolution