from enum import Enum
import numpy as np
from mtflib import MultivariateTaylorFunction
from mtflib.backends.c import mtf_c_backend # type: ignore
from mtflib.backends.cpp import mtf_cpp # type: ignore
from .vector_fields import FieldVector, VectorField
[docs]
class Backend(str, Enum):
PYTHON = "python"
CPP = "cpp"
C = "c"
CPP_V2 = "cpp_v2"
MPI = "mpi"
[docs]
def calculate_b_field(coil_instance, field_points, backend=Backend.PYTHON):
"""
Calculates the magnetic field generated by a coil at a set of
field points using the Biot-Savart law.
Args:
coil_instance (Coil): An instance of a Coil subclass.
field_points (np.ndarray or np.ndarray of
mtf.MultivariateTaylorFunction):
The points (x, y, z) where the magnetic field should be
calculated. Can be a (M, 3) NumPy array of numbers or MTF objects.
backend (Backend or str, optional): The backend to use for the calculation.
Defaults to Backend.PYTHON.
Returns:
VectorField: A VectorField object containing the field points and the
calculated FieldVector objects.
"""
# Normalize backend to Enum
if isinstance(backend, str):
try:
backend = Backend(backend)
except ValueError:
raise ValueError(f"Backend '{backend}' is not a valid option.")
# Input validation for field_points
if not isinstance(field_points, np.ndarray) or (
field_points.ndim == 2 and field_points.shape[1] != 3
):
raise TypeError("field_points must be a NumPy array of shape (N, 3).")
# Check if segments have been generated
if coil_instance.segment_centers is None:
raise RuntimeError("Coil segments have not been generated.")
if coil_instance.use_mtf_for_segments:
element_centers_np = np.array([
[v.x, v.y, v.z] for v in coil_instance.segment_centers
])
element_directions_np = np.array([
[v.x, v.y, v.z] for v in coil_instance.segment_directions
])
else:
element_centers_np = np.array([
c.to_numpy_array() for c in coil_instance.segment_centers
])
element_directions_np = np.array([
d.to_numpy_array() for d in coil_instance.segment_directions
])
if backend == Backend.MPI:
b_field_vectors = mpi_biot_savart(
element_centers=element_centers_np,
element_lengths=coil_instance.segment_lengths,
element_directions=element_directions_np,
field_points=field_points,
)
else:
b_field_vectors = serial_biot_savart(
element_centers=element_centers_np,
element_lengths=coil_instance.segment_lengths,
element_directions=element_directions_np,
field_points=field_points,
backend=backend,
)
# Apply the current scaling and create FieldVector objects
vec_objects = []
for vec in b_field_vectors:
vec_objects.append(
FieldVector(
coil_instance.current * vec[0],
coil_instance.current * vec[1],
coil_instance.current * vec[2],
)
)
b_field_vectors = np.array(vec_objects, dtype=object)
# Apply post-processing based on use_mtf_for_segments flag for all coils
for i, vec in enumerate(b_field_vectors):
if coil_instance.use_mtf_for_segments:
# When using MTF, a proper numerical integration is performed
# over the segment, which is represented by a variable `u` in
# the range [-1, 1].
vec.x = vec.x.integrate(4, -1, 1)
vec.y = vec.y.integrate(4, -1, 1)
vec.z = vec.z.integrate(4, -1, 1)
else:
# When not using MTF, the field is calculated at a single point
# (the center of the segment). The result is then multiplied by 2
# as a numerical approximation to account for the contribution
# of the segment's length.
vec = 2 * vec
b_field_vectors[i] = vec
return VectorField(field_points=field_points, vectors=b_field_vectors)
mu_0_4pi = 1e-7 # Define mu_0_4pi if it's not already globally defined
def _python_biot_savart_core(source_points, dl_vectors, field_points, order=None):
"""
Core vectorized Biot-Savart calculation in pure Python.
"""
source_points_reshaped = source_points[:, np.newaxis, :]
field_points_reshaped = field_points[np.newaxis, :, :]
r_vectors = field_points_reshaped - source_points_reshaped
r_squared = np.sum(r_vectors**2, axis=2)
# Avoid division by zero at the source point location
# Avoid division by zero at the source point location
# Optimization: Use vectorized masking for numeric arrays
if r_squared.size > 0:
is_mtf = isinstance(r_squared.flat[0], MultivariateTaylorFunction)
if is_mtf:
# Fallback to loops for MTF objects (unless we implement vectorized
# extraction)
for i in range(r_squared.shape[0]):
for j in range(r_squared.shape[1]):
val = r_squared[i, j]
# Extract constant term safely
const_term = val.extract_coefficient(tuple([0] * val.dimension))
# Check magnitude
if abs(const_term) < 1e-18:
r_squared[i, j] += 1e-18
else:
# Vectorized masking for numeric arrays
mask = np.abs(r_squared) < 1e-18
r_squared[mask] = 1e-18
dl_vectors_reshaped = dl_vectors[:, np.newaxis, :]
cross_products = np.cross(dl_vectors_reshaped, r_vectors, axis=2)
# Calculate 1/r^3 for magnitude scaling.
# This is done as 1/r^2 * 1/r to work with MTF ufunc overrides.
inv_r_cubed = np.reciprocal(r_squared) * r_squared ** (-0.5)
inv_r_cubed_expanded = np.expand_dims(inv_r_cubed, axis=2)
dB_contributions = (mu_0_4pi * cross_products) * inv_r_cubed_expanded
B_field = np.sum(dB_contributions, axis=0)
# If an order is specified, truncate each MTF in the result
if order is not None:
for i in range(B_field.shape[0]):
for j in range(B_field.shape[1]):
if isinstance(B_field[i, j], MultivariateTaylorFunction):
B_field[i, j] = B_field[i, j].truncate(order)
return B_field
def _cpp_biot_savart_core(source_points, dl_vectors, field_points, order=None):
"""
Core vectorized Biot-Savart calculation using C++ backend.
"""
all_exponents = []
all_coeffs = []
shapes = []
def process_points(points):
shapes.append(len(points))
point_shapes = []
for p in points:
for item in p:
if not isinstance(item, MultivariateTaylorFunction):
item = MultivariateTaylorFunction.from_constant(item)
all_exponents.append(item.exponents)
all_coeffs.append(item.coeffs)
point_shapes.append(item.exponents.shape[0])
return point_shapes
sp_shapes = process_points(source_points)
dl_shapes = process_points(dl_vectors)
fp_shapes = process_points(field_points)
dimension = MultivariateTaylorFunction.get_max_dimension()
shapes = np.array(
[len(source_points), len(dl_vectors), len(field_points), dimension]
+ sp_shapes
+ dl_shapes
+ fp_shapes,
dtype=np.int32,
)
all_exponents = np.vstack(all_exponents)
all_coeffs = np.concatenate(all_coeffs)
b_field_dicts = mtf_cpp.biot_savart_from_flat_numpy(
all_exponents, all_coeffs, shapes
)
b_field_py = []
for b_vec_dicts in b_field_dicts:
b_field_py.append([
MultivariateTaylorFunction(coefficients=(d["exponents"], d["coeffs"]))
for d in b_vec_dicts
])
return np.array(b_field_py)
[docs]
def numpy_biot_savart(
element_centers, element_lengths, element_directions, field_points, order=None
):
"""
NumPy vectorized Biot-Savart calculation using element inputs.
Calculates the magnetic field at specified field points due to a set of
current elements, using NumPy vectorization for efficiency. This function
is designed for serial (single-processor) execution and takes element-based
descriptions of the current source.
Args:
element_centers (numpy.ndarray): (N, 3) array of center coordinates of
current elements. Each row represents the (x, y, z) coordinates of
the center of a current element.
element_lengths (numpy.ndarray): (N,) array of lengths of current
elements (dl). Each element represents the length of the
corresponding current element.
element_directions (numpy.ndarray): (N, 3) array of unit vectors
representing the direction of current flow for each element. Each
row is a unit vector.
field_points (numpy.ndarray): (M, 3) array of field point coordinates.
Each row represents the (x, y, z) coordinates where the magnetic
field is to be calculated.
Returns:
numpy.ndarray: (M, 3) array of magnetic field vectors at each field
point. Each row is a vector representing the (Bx, By, Bz)
components of the magnetic field at the corresponding field point.
Raises:
ValueError: If input arrays do not have the expected dimensions or
shapes.
Example:
>>> element_centers = np.array([[0, 0, 0], [1, 0, 0]])
>>> element_lengths = np.array([0.1, 0.1])
>>> element_directions = np.array([[1, 0, 0], [0, 1, 0]])
>>> field_points = np.array([[0, 1, 0], [1, 1, 0]])
>>> B_field = numpy_biot_savart(
... element_centers,
... element_lengths,
... element_directions,
... field_points,
... )
>>> print(B_field)
[[ 0.00000000e+00 0.00000000e+00 1.00000000e-08]
[ 0.00000000e+00 0.00000000e+00 5.00000000e-09]]
"""
return serial_biot_savart(
element_centers, element_lengths, element_directions, field_points, order=order
)
[docs]
def mpi_biot_savart(
element_centers, element_lengths, element_directions, field_points, order=None
):
"""
Parallel Biot-Savart calculation using mpi4py with element inputs.
Computes the magnetic field in parallel using MPI (mpi4py library). This
function distributes the calculation of the magnetic field at different
field points across multiple MPI processes to speed up computation.
Note:
- Requires `mpi4py` to be installed. If not installed, it will raise an
ImportError.
- Must be run in an MPI environment (e.g., using `mpiexec` or
`mpirun`).
- Input arrays `element_centers`, `element_lengths`, and
`element_directions` are assumed to be the complete datasets and are
broadcasted to all MPI processes.
- `field_points` are distributed among processes. The result is
gathered on rank 0.
Args:
element_centers (numpy.ndarray): (N, 3) array of center coordinates of
current elements. (Broadcasted to all MPI processes).
element_lengths (numpy.ndarray): (N,) array of lengths of current
elements (dl). (Broadcasted to all MPI processes).
element_directions (numpy.ndarray): (N, 3) array of unit vectors for
current directions. (Broadcasted to all MPI processes).
field_points (numpy.ndarray): (M, 3) array of field point coordinates.
(Distributed across MPI processes).
Returns:
numpy.ndarray or None:
On MPI rank 0: (M, 3) array of magnetic field vectors at each
field point. On MPI ranks > 0: None.
Raises:
ImportError: if mpi4py is not installed.
Example (Run in MPI environment, e.g.,
`mpiexec -n 4 python your_script.py`):
>>> import numpy as np
>>> from applications.em.biot_savart import mpi_biot_savart
>>> element_centers = np.array([[0, 0, 0], [1, 0, 0]])
>>> element_lengths = np.array([0.1, 0.1])
>>> element_directions = np.array([[1, 0, 0], [0, 1, 0]])
>>> # More field points for MPI to distribute
>>> field_points = np.array(
... [[0, 1, 0], [1, 1, 0], [2, 1, 0], [3, 1, 0]]
... )
>>> B_field = mpi_biot_savart(
... element_centers,
... element_lengths,
... element_directions,
... field_points,
... )
>>> if MPI.COMM_WORLD.Get_rank() == 0: # Rank 0 has the full result
>>> print(B_field)
"""
try:
from mpi4py import MPI
except ImportError:
raise ImportError("mpi4py is not installed, cannot run in MPI mode.")
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
element_centers = np.array(element_centers)
element_lengths = np.array(element_lengths)
element_directions = np.array(element_directions)
field_points = np.array(field_points)
num_field_points = field_points.shape[0]
chunk_size = num_field_points // size
remainder = num_field_points % size
start_index = rank * chunk_size + min(rank, remainder)
end_index = start_index + chunk_size + (1 if rank < remainder else 0)
local_field_points = field_points[start_index:end_index]
local_B_field = serial_biot_savart(
element_centers,
element_lengths,
element_directions,
local_field_points,
order=order,
)
all_B_field_chunks = comm.gather(local_B_field, root=0)
if rank == 0:
B_field = np.concatenate(all_B_field_chunks, axis=0)
return B_field
else:
return None
def _c_biot_savart_core(source_points, dl_vectors, field_points, order=None):
"""
Core vectorized Biot-Savart calculation using C backend.
"""
all_exponents = []
all_coeffs = []
shapes = []
def process_points(points):
shapes.append(len(points))
point_shapes = []
for p in points:
for item in p:
if not isinstance(item, MultivariateTaylorFunction):
item = MultivariateTaylorFunction.from_constant(item)
all_exponents.append(item.exponents)
all_coeffs.append(item.coeffs)
point_shapes.append(item.exponents.shape[0])
return point_shapes
sp_shapes = process_points(source_points)
dl_shapes = process_points(dl_vectors)
fp_shapes = process_points(field_points)
dimension = MultivariateTaylorFunction.get_max_dimension()
shapes = np.array(
[len(source_points), len(dl_vectors), len(field_points), dimension]
+ sp_shapes
+ dl_shapes
+ fp_shapes,
dtype=np.int32,
)
all_exponents = np.vstack(all_exponents)
all_coeffs = np.concatenate(all_coeffs)
result_dict = mtf_c_backend.biot_savart_c_from_flat_numpy(
all_exponents, all_coeffs, shapes
)
# Reconstruct MTF objects from the result
res_exps = result_dict["exponents"]
res_coeffs = result_dict["coeffs"]
res_shapes = result_dict["shapes"]
n_field_points = res_shapes[0]
b_field_py = []
current_offset = 0
shape_idx = 1
for i in range(n_field_points):
vec = []
for j in range(3):
n_terms = res_shapes[shape_idx]
exps = res_exps[current_offset : current_offset + n_terms]
coeffs = res_coeffs[current_offset : current_offset + n_terms]
vec.append(MultivariateTaylorFunction(coefficients=(exps, coeffs)))
current_offset += n_terms
shape_idx += 1
b_field_py.append(vec)
return np.array(b_field_py)
[docs]
def serial_biot_savart(
element_centers,
element_lengths,
element_directions,
field_points,
order=None,
backend=Backend.PYTHON,
):
"""
Serial Biot-Savart calculation with element inputs.
Computes the magnetic field serially (non-parallel), taking current element
center points, lengths, and directions as input. This function is suitable
for single-processor execution and serves as a serial counterpart to the
`mpi_biot_savart` function.
Args:
element_centers (numpy.ndarray): (N, 3) array of center coordinates of
current elements.
element_lengths (numpy.ndarray): (N,) array of lengths of current
elements (dl).
element_directions (numpy.ndarray): (N, 3) array of unit vectors for
current directions.
field_points (numpy.ndarray): (M, 3) array of field point coordinates.
order (int, optional): The maximum order of the Taylor series to
compute. If None, the global max order is used. Defaults to None.
Returns:
numpy.ndarray: (M, 3) array of magnetic field vectors at each field
point.
Example:
>>> element_centers = np.array([[0, 0, 0], [1, 0, 0]])
>>> element_lengths = np.array([0.1, 0.1])
>>> element_directions = np.array([[1, 0, 0], [0, 1, 0]])
>>> field_points = np.array([[0, 1, 0], [1, 1, 0]])
>>> B_field = serial_biot_savart(
... element_centers,
... element_lengths,
... element_directions,
... field_points,
... order=0,
... )
>>> print(B_field)
[[ 0.00000000e+00 0.00000000e+00 1.00000000e-08]
[ 0.00000000e+00 0.00000000e+00 5.00000000e-09]]
"""
element_centers = np.array(element_centers)
element_lengths = np.array(element_lengths)
element_directions = np.array(element_directions)
field_points = np.array(field_points)
if element_centers.ndim != 2 or element_centers.shape[1] != 3:
raise ValueError("element_centers must be a NumPy array of shape (N, 3)")
if (
element_lengths.ndim != 1
or element_lengths.shape[0] != element_centers.shape[0]
):
raise ValueError(
"element_lengths must be a NumPy array of shape (N,) and have the "
"same length as element_centers"
)
if (
element_directions.ndim != 2
or element_directions.shape[1] != 3
or element_directions.shape[0] != element_centers.shape[0]
):
raise ValueError(
"element_directions must be a NumPy array of shape (N, 3) and "
"have the same length as element_centers"
)
if field_points.ndim != 2 or field_points.shape[1] != 3:
raise ValueError("field_points must be a NumPy array of shape (M, 3)")
source_points = element_centers
dl_vectors = 0.5 * element_lengths[:, np.newaxis] * element_directions
if backend is None:
backend = MultivariateTaylorFunction._IMPLEMENTATION
# Normalize backend if needed (though it should be passed as Enum from
# calculate_b_field)
if isinstance(backend, str):
backend = Backend(backend)
if backend == Backend.CPP:
return _cpp_biot_savart_core(source_points, dl_vectors, field_points, order)
elif backend == Backend.C:
return _c_biot_savart_core(source_points, dl_vectors, field_points, order)
elif backend == Backend.CPP_V2:
return _cpp_biot_savart_core(source_points, dl_vectors, field_points, order)
else: # python
return _python_biot_savart_core(source_points, dl_vectors, field_points, order)