FODO Cell, Programmable Element

This implements the same FODO cell as the stable FODO cell example. However, in the example here we define additional user-defined, custom elements (impactx.elements.Programmable) from the ImpactX Python APIs.

More generally, ImpactX exposes all data structures through pyAMReX for adding additional computation, enabling rapid prototyping of new elements on both CPU and GPU.

Run

This example can be run as a Python script: python3 fodo_programmable.py. For MPI-parallel runs, prefix this line with mpiexec -n 4 ... or srun -n 4 ..., depending on the system.

Listing 94 You can copy this file from examples/fodo_programmable/run_fodo_programmable.py.
#!/usr/bin/env python3
#
# Copyright 2022-2023 ImpactX contributors
# Authors: Axel Huebl, Chad Mitchell
# License: BSD-3-Clause-LBNL
#
# -*- coding: utf-8 -*-


from impactx import ImpactX, distribution, elements

sim = ImpactX()

# set numerical parameters and IO control
sim.space_charge = False
# sim.diagnostics = False  # benchmarking
sim.slice_step_diagnostics = True

# domain decomposition & space charge mesh
sim.init_grids()

# load a 2 GeV electron beam with an initial
# unnormalized rms emittance of 2 nm
kin_energy_MeV = 2.0e3  # reference energy
bunch_charge_C = 1.0e-9  # used with space charge
npart = 10000  # number of macro particles

#   reference particle
ref = sim.particle_container().ref_particle()
ref.set_charge_qe(-1.0).set_mass_MeV(0.510998950).set_kin_energy_MeV(kin_energy_MeV)

#   particle bunch
distr = distribution.Waterbag(
    lambdaX=3.9984884770e-5,
    lambdaY=3.9984884770e-5,
    lambdaT=1.0e-3,
    lambdaPx=2.6623538760e-5,
    lambdaPy=2.6623538760e-5,
    lambdaPt=2.0e-3,
    muxpx=-0.846574929020762,
    muypy=0.846574929020762,
    mutpt=0.0,
)
sim.add_particles(bunch_charge_C, distr, npart)

# number of slices per ds in each lattice element
ns = 25


# build a custom, Pythonic beam optical element
def my_drift(pge, pti, refpart):
    """This pushes the beam particles as a drift.

    Relative to the reference particle.

    :param pti: particle iterator for the current tile or box
    :param refpart: the reference particle
    """
    # access particle attributes
    soa = pti.soa().to_xp()  # automatic: NumPy (CPU) or CuPy (GPU)

    x = soa.real["position_x"]
    y = soa.real["position_y"]
    t = soa.real["position_t"]
    px = soa.real["momentum_x"]
    py = soa.real["momentum_y"]
    pt = soa.real["momentum_t"]

    # length of the current slice
    slice_ds = pge.ds / pge.nslice

    # access reference particle values to find beta*gamma^2
    pt_ref = refpart.pt
    betgam2 = pt_ref**2 - 1.0

    # advance position and momentum (drift)
    x[:] += slice_ds * px[:]
    y[:] += slice_ds * py[:]
    t[:] += (slice_ds / betgam2) * pt[:]


def my_ref_drift(pge, refpart):
    """This pushes the reference particle.

    :param refpart: reference particle
    """
    #  assign input reference particle values
    x = refpart.x
    px = refpart.px
    y = refpart.y
    py = refpart.py
    z = refpart.z
    pz = refpart.pz
    t = refpart.t
    pt = refpart.pt
    s = refpart.s

    # length of the current slice
    slice_ds = pge.ds / pge.nslice

    # assign intermediate parameter
    step = slice_ds / (pt**2 - 1.0) ** 0.5

    # advance position and momentum (drift)
    refpart.x = x + step * px
    refpart.y = y + step * py
    refpart.z = z + step * pz
    refpart.t = t - step * pt

    # advance integrated path length
    refpart.s = s + slice_ds


pge1 = elements.Programmable(name="d1")
pge1.nslice = ns
pge1.beam_particles = lambda pti, refpart: my_drift(pge1, pti, refpart)
pge1.ref_particle = lambda refpart: my_ref_drift(pge1, refpart)
pge1.ds = 0.25

# attention: assignment is a reference for pge2 = pge1

pge2 = elements.Programmable(name="d2")
pge2.nslice = ns
pge2.beam_particles = lambda pti, refpart: my_drift(pge2, pti, refpart)
pge2.ref_particle = lambda refpart: my_ref_drift(pge2, refpart)
pge2.ds = 0.5

# add beam diagnostics
monitor = elements.BeamMonitor("monitor", backend="h5")

# design the accelerator lattice
fodo = [
    monitor,
    pge1,  # equivalent to elements.Drift("d1", ds=0.25, nslice=ns)
    monitor,
    elements.Quad(name="q1", ds=1.0, k=1.0, nslice=ns),
    monitor,
    pge2,  # equivalent to elements.Drift("d2", ds=0.5, nslice=ns)
    monitor,
    elements.Quad(name="q2", ds=1.0, k=-1.0, nslice=ns),
    monitor,
    pge1,  # equivalent to elements.Drift("d1", ds=0.25, nslice=ns)
    monitor,
]
# assign a fodo segment
sim.lattice.extend(fodo)

# run simulation
sim.track_particles()

# clean shutdown
sim.finalize()

Analyze

We run the following script to analyze correctness:

Script analysis_fodo.py
Listing 95 You can copy this file from examples/fodo_programmable/analysis_fodo.py.
#!/usr/bin/env python3
#
# Copyright 2022-2023 ImpactX contributors
# Authors: Axel Huebl, Chad Mitchell
# License: BSD-3-Clause-LBNL
#


import numpy as np
import openpmd_api as io
from scipy.stats import moment


def get_moments(beam):
    """Calculate standard deviations of beam position & momenta
    and emittance values

    Returns
    -------
    sigx, sigy, sigt, emittance_x, emittance_y, emittance_t
    """
    sigx = moment(beam["position_x"], moment=2) ** 0.5  # variance -> std dev.
    sigpx = moment(beam["momentum_x"], moment=2) ** 0.5
    sigy = moment(beam["position_y"], moment=2) ** 0.5
    sigpy = moment(beam["momentum_y"], moment=2) ** 0.5
    sigt = moment(beam["position_t"], moment=2) ** 0.5
    sigpt = moment(beam["momentum_t"], moment=2) ** 0.5

    epstrms = beam.cov(ddof=0)
    emittance_x = (sigx**2 * sigpx**2 - epstrms["position_x"]["momentum_x"] ** 2) ** 0.5
    emittance_y = (sigy**2 * sigpy**2 - epstrms["position_y"]["momentum_y"] ** 2) ** 0.5
    emittance_t = (sigt**2 * sigpt**2 - epstrms["position_t"]["momentum_t"] ** 2) ** 0.5

    return (sigx, sigy, sigt, emittance_x, emittance_y, emittance_t)


# initial/final beam
series = io.Series("diags/openPMD/monitor.h5", io.Access.read_only)
last_step = list(series.iterations)[-1]
initial = series.iterations[1].particles["beam"].to_df()
beam_final = series.iterations[last_step].particles["beam"]
final = beam_final.to_df()

# compare number of particles
num_particles = 10000
assert num_particles == len(initial)
assert num_particles == len(final)

print("Initial Beam:")
sigx, sigy, sigt, emittance_x, emittance_y, emittance_t = get_moments(initial)
print(f"  sigx={sigx:e} sigy={sigy:e} sigt={sigt:e}")
print(
    f"  emittance_x={emittance_x:e} emittance_y={emittance_y:e} emittance_t={emittance_t:e}"
)

atol = 0.0  # ignored
rtol = 2.2 * num_particles**-0.5  # from random sampling of a smooth distribution
print(f"  rtol={rtol} (ignored: atol~={atol})")

assert np.allclose(
    [sigx, sigy, sigt, emittance_x, emittance_y, emittance_t],
    [
        7.5451170454175073e-005,
        7.5441588239210947e-005,
        9.9775878164077539e-004,
        1.9959540393751392e-009,
        2.0175015289132990e-009,
        2.0013820193294972e-006,
    ],
    rtol=rtol,
    atol=atol,
)


print("")
print("Final Beam:")
sigx, sigy, sigt, emittance_x, emittance_y, emittance_t = get_moments(final)
s_ref = beam_final.get_attribute("s_ref")
gamma_ref = beam_final.get_attribute("gamma_ref")
print(f"  sigx={sigx:e} sigy={sigy:e} sigt={sigt:e}")
print(
    f"  emittance_x={emittance_x:e} emittance_y={emittance_y:e} emittance_t={emittance_t:e}\n"
    f"  s_ref={s_ref:e} gamma_ref={gamma_ref:e}"
)

atol = 0.0  # ignored
rtol = 2.2 * num_particles**-0.5  # from random sampling of a smooth distribution
print(f"  rtol={rtol} (ignored: atol~={atol})")

assert np.allclose(
    [sigx, sigy, sigt, emittance_x, emittance_y, emittance_t, s_ref, gamma_ref],
    [
        7.4790118496224206e-005,
        7.5357525169680140e-005,
        9.9775879288128088e-004,
        1.9959539836392703e-009,
        2.0175014668882125e-009,
        2.0013820380883801e-006,
        3.000000,
        3.914902e003,
    ],
    rtol=rtol,
    atol=atol,
)

Visualize

You can run the following script to visualize the beam evolution over time:

Script plot_fodo.py
Listing 96 You can copy this file from examples/fodo_programmable/plot_fodo.py.
#!/usr/bin/env python3
#
# Copyright 2022-2023 ImpactX contributors
# Authors: Axel Huebl, Chad Mitchell
# License: BSD-3-Clause-LBNL
#

import argparse
import glob
import re

import matplotlib.pyplot as plt
import openpmd_api as io
import pandas as pd
from matplotlib.ticker import MaxNLocator


def read_file(file_pattern):
    for filename in glob.glob(file_pattern):
        df = pd.read_csv(filename, delimiter=r"\s+")
        if "step" not in df.columns:
            step = int(re.findall(r"[0-9]+", filename)[0])
            df["step"] = step
        yield df


def read_time_series(file_pattern):
    """Read in all CSV files from each MPI rank (and potentially OpenMP
    thread). Concatenate into one Pandas dataframe.

    Returns
    -------
    pandas.DataFrame
    """
    return pd.concat(
        read_file(file_pattern),
        axis=0,
        ignore_index=True,
    )  # .set_index('id')


# options to run this script
parser = argparse.ArgumentParser(description="Plot the FODO benchmark.")
parser.add_argument(
    "--save-png", action="store_true", help="non-interactive run: save to PNGs"
)
args = parser.parse_args()


# initial/final beam
series = io.Series("diags/openPMD/monitor.h5", io.Access.read_only)
last_step = list(series.iterations)[-1]
initial = series.iterations[1].particles["beam"].to_df()
final = series.iterations[last_step].particles["beam"].to_df()
ref_particle = read_time_series("diags/ref_particle.*")

# steps & corresponding z
steps = list(series.iterations)

z = list(
    map(lambda step: ref_particle[ref_particle["step"] == step].z.values[0], steps)
)

# scaling to units
millimeter = 1.0e3  # m->mm
mrad = 1.0e3  # ImpactX uses "static units": momenta are normalized by the magnitude of the momentum of the reference particle p0: px/p0 (rad)
# mm_mrad = 1.e6
nm_rad = 1.0e9

# read reduced diagnostics
rbc = read_time_series("diags/reduced_beam_characteristics.*")

s = rbc["s"]
sigma_x = rbc["sigma_x"] * millimeter
sigma_y = rbc["sigma_y"] * millimeter
sigma_t = rbc["sigma_t"] * millimeter
emittance_x = rbc["emittance_x"] * nm_rad
emittance_y = rbc["emittance_y"] * nm_rad
emittance_t = rbc["emittance_t"] * nm_rad

length = len(s) - 1

# select a single particle by id
# particle_42 = beam[beam["id"] == 42]
# print(particle_42)

# steps & corresponding z
steps = list(series.iterations)

# print beam transverse size over steps
f = plt.figure(figsize=(9, 4.8))
ax1 = f.gca()
im_sigx = ax1.plot(s, sigma_x, label=r"$\sigma_x$")
im_sigy = ax1.plot(s, sigma_y, label=r"$\sigma_y$")
ax2 = ax1.twinx()
ax2.set_prop_cycle(None)  # reset color cycle
im_emittance_x = ax2.plot(s, emittance_x, ":", label=r"$\epsilon_x$")
im_emittance_y = ax2.plot(s, emittance_y, ":", label=r"$\epsilon_y$")

ax1.legend(
    handles=im_sigx + im_sigy + im_emittance_x + im_emittance_y, loc="lower center"
)
ax1.set_xlabel(r"$z$ [m]")
ax1.set_ylabel(r"$\sigma_{x,y}$ [mm]")
ax2.set_ylabel(r"$\epsilon_{x,y}$ [nm]")
ax2.set_ylim([1.5, 2.5])
ax1.xaxis.set_major_locator(MaxNLocator(integer=True))
plt.tight_layout()
if args.save_png:
    plt.savefig("fodo_sigma.png")
else:
    plt.show()


# beam transverse scatter plot over steps
num_plots_per_row = len(steps)
fig, axs = plt.subplots(
    3, num_plots_per_row, figsize=(9, 4.8), sharex="row", sharey="row"
)

ncol_ax = -1
for step in steps:
    # plot initial distribution & at exit of each element
    ncol_ax += 1

    # x-y
    ax = axs[(0, ncol_ax)]
    beam_at_step = series.iterations[step].particles["beam"].to_df()
    ax.scatter(
        beam_at_step.position_x.multiply(millimeter),
        beam_at_step.position_y.multiply(millimeter),
        s=0.01,
    )

    ax.set_title(f"$z={z[ncol_ax]:.2f}$ [m]")
    ax.set_xlabel(r"$x$ [mm]")

    # x-px
    ax = axs[(1, ncol_ax)]
    beam_at_step = series.iterations[step].particles["beam"].to_df()
    ax.scatter(
        beam_at_step.position_x.multiply(millimeter),
        beam_at_step.momentum_x.multiply(mrad),
        s=0.01,
    )
    ax.set_xlabel(r"$x$ [mm]")

    # y-py
    ax = axs[(2, ncol_ax)]
    beam_at_step = series.iterations[step].particles["beam"].to_df()
    ax.scatter(
        beam_at_step.position_y.multiply(millimeter),
        beam_at_step.momentum_y.multiply(mrad),
        s=0.01,
    )
    ax.set_xlabel(r"$y$ [mm]")

axs[(0, 0)].set_ylabel(r"$y$ [mm]")
axs[(1, 0)].set_ylabel(r"$p_x$ [mrad]")
axs[(2, 0)].set_ylabel(r"$p_y$ [mrad]")
plt.tight_layout()
if args.save_png:
    plt.savefig("fodo_scatter.png")
else:
    plt.show()
focusing, defocusing and preserved emittance in our FODO cell benchmark.

Fig. 7 FODO transversal beam width and emittance evolution

focusing, defocusing and phase space rotation in our FODO cell benchmark.

Fig. 8 FODO transversal beam width and phase space evolution