"""
Python multiprocessing domain-decomposed finite Voronoi simulator.
"""
from __future__ import annotations
# Only import typing modules when type checking, e.g., in VS Code or IDEs.
from typing import TYPE_CHECKING
if TYPE_CHECKING: # pragma: no cover
from concurrent.futures import ProcessPoolExecutor
import typing
from dataclasses import dataclass
import os
import numpy as np
from .finite_voronoi import FiniteVoronoiSimulator
from .physical_params import PhysicalParams
[docs]
@dataclass(frozen=True)
class DomainDecomposition:
"""Point-only data for one spatial domain plus its halo.
This object stores the index bookkeeping needed to relate a local domain
calculation back to the original global point array. It contains no finite
Voronoi or AFV-specific data.
Args:
domain_id: Integer id of this spatial domain.
grid_ix: Domain index in the x direction.
grid_iy: Domain index in the y direction.
x_range: Owned-domain x interval.
y_range: Owned-domain y interval.
halo_x_range: Local-domain x interval after expanding by the halo width.
halo_y_range: Local-domain y interval after expanding by the halo width.
local_global_ids: Global point ids included in this domain's local box,
including halo points.
owned_local_ids: Local indices of points uniquely owned by this domain.
local_pts: Local point coordinates, equivalent to
``points[local_global_ids]``.
"""
domain_id: int # Integer id of this spatial domain.
# Grid coordinates of this owned domain.
grid_ix: int
grid_iy: int
# Original owned-domain box. Points inside this box belong to this domain
# as their unique owner.
x_range: tuple[float, float]
y_range: tuple[float, float]
# Expanded local box used to collect halo/ghost points.
halo_x_range: tuple[float, float]
halo_y_range: tuple[float, float]
# Original/global point ids included in this domain's local box,
# including halo points.
local_global_ids: np.ndarray
# Positions inside local_global_ids/local_pts for points uniquely owned
# by this domain.
owned_local_ids: np.ndarray
# Point coordinates for local_global_ids:
# local_pts == points[local_global_ids].
local_pts: np.ndarray
def _validate_grid_shape(grid_shape: tuple[int, int]) -> tuple[int, int]:
nx, ny = grid_shape
nx = int(nx)
ny = int(ny)
if nx <= 0 or ny <= 0: # pragma: no cover
raise ValueError("grid_shape entries must be positive")
return nx, ny
def _validate_domain_bounds(
domain_bounds: tuple[tuple[float, float], tuple[float, float]] | None,
) -> tuple[tuple[float, float], tuple[float, float]] | None:
if domain_bounds is None:
return None
(xmin, xmax), (ymin, ymax) = domain_bounds
xmin = float(xmin)
xmax = float(xmax)
ymin = float(ymin)
ymax = float(ymax)
if not (xmin < xmax and ymin < ymax): # pragma: no cover
raise ValueError("domain_bounds must be ((xmin, xmax), (ymin, ymax)) with nonzero spans")
return (xmin, xmax), (ymin, ymax)
def _domain_edges(
points: np.ndarray,
grid_shape: tuple[int, int],
domain_bounds: tuple[tuple[float, float], tuple[float, float]] | None,
) -> tuple[np.ndarray, np.ndarray]:
nx, ny = grid_shape
if domain_bounds is None:
xmin = float(np.min(points[:, 0]))
xmax = float(np.max(points[:, 0]))
ymin = float(np.min(points[:, 1]))
ymax = float(np.max(points[:, 1]))
if xmin == xmax or ymin == ymax: # pragma: no cover
raise ValueError("points must span a nonzero range in both x and y")
else:
(xmin, xmax), (ymin, ymax) = domain_bounds
if (
np.any(points[:, 0] < xmin)
or np.any(points[:, 0] > xmax)
or np.any(points[:, 1] < ymin)
or np.any(points[:, 1] > ymax)
): # pragma: no cover
raise ValueError("points must lie inside domain_bounds")
x_edges = np.linspace(xmin, xmax, nx + 1)
y_edges = np.linspace(ymin, ymax, ny + 1)
return x_edges, y_edges
[docs]
def decompose_points(
points: np.ndarray,
grid_shape: tuple[int, int] = (2, 2),
halo_width: float = 0.0,
*,
domain_bounds: tuple[tuple[float, float], tuple[float, float]] | None = None,
method: typing.Literal["dense", "sorted_x"] = "dense",
) -> list[DomainDecomposition]:
"""Decompose points into owned grid domains plus halo/local points.
This function is independent of the finite Voronoi model. It only computes
global/local point-index bookkeeping.
Points on internal owned-domain boundaries are assigned to exactly one
domain by using right-sided binning and clipping at the outermost boundary.
Halo/local domains include points on all halo box edges.
Args:
points (numpy.ndarray): (N,2) array of point coordinates.
grid_shape: Number of owned domains in the x and y directions.
halo_width: Width added to each side of every owned domain to collect
local halo points.
domain_bounds: Optional domain bounds as ``((xmin, xmax), (ymin, ymax))``.
If *None*, bounds are inferred from *points*.
method: Method used to collect halo points. ``"dense"`` builds a
dense domain-by-point mask and is usually faster for moderate
systems. ``"sorted_x"`` uses less temporary memory.
Returns:
list[DomainDecomposition]: One point-only decomposition object for
each domain in row-major order.
Raises:
ValueError: If *points* does not have shape (N,2).
ValueError: If *points* contains non-finite values.
ValueError: If *grid_shape*, *halo_width*, *domain_bounds*, or
*method* is invalid.
"""
points = np.asarray(points, dtype=float)
if points.ndim != 2 or points.shape[1] != 2: # pragma: no cover
raise ValueError("points must have shape (N,2)")
if not np.all(np.isfinite(points)): # pragma: no cover
raise ValueError("points must be finite")
halo_width = float(halo_width)
if halo_width < 0.0: # pragma: no cover
raise ValueError("halo_width must be non-negative")
if method not in ("dense", "sorted_x"): # pragma: no cover
raise ValueError("method must be 'dense' or 'sorted_x'")
nx, ny = _validate_grid_shape(grid_shape)
n_domains = nx * ny
checked_bounds = _validate_domain_bounds(domain_bounds)
if n_domains == 1:
if checked_bounds is None:
xmin = float(np.min(points[:, 0]))
xmax = float(np.max(points[:, 0]))
ymin = float(np.min(points[:, 1]))
ymax = float(np.max(points[:, 1]))
else:
(xmin, xmax), (ymin, ymax) = checked_bounds
if (
np.any(points[:, 0] < xmin)
or np.any(points[:, 0] > xmax)
or np.any(points[:, 1] < ymin)
or np.any(points[:, 1] > ymax)
): # pragma: no cover
raise ValueError("points must lie inside domain_bounds")
local_global_ids = np.arange(points.shape[0], dtype=int)
owned_local_ids = local_global_ids.copy()
return [
DomainDecomposition(
domain_id=0,
grid_ix=0,
grid_iy=0,
x_range=(xmin, xmax),
y_range=(ymin, ymax),
halo_x_range=(
xmin - halo_width,
xmax + halo_width,
),
halo_y_range=(
ymin - halo_width,
ymax + halo_width,
),
local_global_ids=local_global_ids,
owned_local_ids=owned_local_ids,
local_pts=points[local_global_ids],
)
]
x_edges, y_edges = _domain_edges(points, (nx, ny), checked_bounds)
ix = np.searchsorted(x_edges, points[:, 0], side="right") - 1
iy = np.searchsorted(y_edges, points[:, 1], side="right") - 1
ix = np.clip(ix, 0, nx - 1)
iy = np.clip(iy, 0, ny - 1)
domain_ids = iy * nx + ix
owned_order = np.argsort(domain_ids, kind="stable")
owned_domain_ids = domain_ids[owned_order]
owned_counts = np.bincount(owned_domain_ids, minlength=n_domains)
owned_starts = np.concatenate(([0], np.cumsum(owned_counts)))
domain_grid_ix = np.arange(n_domains) % nx
domain_grid_iy = np.arange(n_domains) // nx
x0 = x_edges[domain_grid_ix]
x1 = x_edges[domain_grid_ix + 1]
y0 = y_edges[domain_grid_iy]
y1 = y_edges[domain_grid_iy + 1]
halo_x0 = x0 - halo_width
halo_x1 = x1 + halo_width
halo_y0 = y0 - halo_width
halo_y1 = y1 + halo_width
if method == "dense":
px = points[:, 0]
py = points[:, 1]
local_mask = (
(px[None, :] >= halo_x0[:, None])
& (px[None, :] <= halo_x1[:, None])
& (py[None, :] >= halo_y0[:, None])
& (py[None, :] <= halo_y1[:, None])
)
local_domain_ids, local_global_ids_all = np.nonzero(local_mask)
local_counts = np.bincount(local_domain_ids, minlength=n_domains)
local_starts = np.concatenate(([0], np.cumsum(local_counts)))
else:
py = points[:, 1]
x_order = np.argsort(points[:, 0], kind="stable")
x_sorted = points[x_order, 0]
halo_x_left = np.searchsorted(x_sorted, halo_x0, side="left")
halo_x_right = np.searchsorted(x_sorted, halo_x1, side="right")
domains = []
for domain_id in range(n_domains):
grid_iy, grid_ix = divmod(domain_id, nx)
owned_global_ids = owned_order[
owned_starts[domain_id] : owned_starts[domain_id + 1]
]
if method == "dense":
local_global_ids = local_global_ids_all[
local_starts[domain_id] : local_starts[domain_id + 1]
]
else:
x_candidates = x_order[
halo_x_left[domain_id] : halo_x_right[domain_id]
]
y_mask = (
(py[x_candidates] >= halo_y0[domain_id])
& (py[x_candidates] <= halo_y1[domain_id])
)
local_global_ids = np.sort(x_candidates[y_mask])
owned_local_ids = np.searchsorted(local_global_ids, owned_global_ids)
valid_owned = owned_local_ids < local_global_ids.size
if (
not np.all(valid_owned)
or not np.array_equal(local_global_ids[owned_local_ids], owned_global_ids)
): # pragma: no cover
raise RuntimeError(f"Owned/local index mapping failed for domain {domain_id}")
domains.append(
DomainDecomposition(
domain_id=domain_id,
grid_ix=grid_ix,
grid_iy=grid_iy,
x_range=(float(x_edges[grid_ix]), float(x_edges[grid_ix + 1])),
y_range=(float(y_edges[grid_iy]), float(y_edges[grid_iy + 1])),
halo_x_range=(float(halo_x0[domain_id]), float(halo_x1[domain_id])),
halo_y_range=(float(halo_y0[domain_id]), float(halo_y1[domain_id])),
local_global_ids=local_global_ids,
owned_local_ids=owned_local_ids,
local_pts=points[local_global_ids],
)
)
return domains
@dataclass(frozen=True)
class _DomainTask:
domain: DomainDecomposition
local_preferred_areas: np.ndarray
phys: PhysicalParams
backend: typing.Literal["cython", "python"] | None
connect: bool
plot_mode: bool
def _build_domain(task: _DomainTask) -> dict[str, object]:
domain = task.domain
pid = os.getpid()
owned_count = int(domain.owned_local_ids.size)
owned_global_ids = domain.local_global_ids[domain.owned_local_ids]
empty_float = np.empty(0, dtype=float)
empty_int = np.empty(0, dtype=int)
empty_forces = np.empty((0, 2), dtype=float)
empty_connections = np.empty((0, 2), dtype=int)
if owned_count == 0: # pragma: no cover
result = {
"domain_id": domain.domain_id,
"pid": pid,
"owned_global_ids": empty_int,
"forces": empty_forces,
"areas": empty_float,
"perimeters": empty_float,
"arclens": empty_float,
"coord_nums": empty_int,
"connections": empty_connections,
}
if task.plot_mode:
result["diag_plot"] = {
"vertices": np.empty((0, 2), dtype=float),
"regions": [],
"edges_type": [],
}
return result
sim = FiniteVoronoiSimulator(domain.local_pts, task.phys, backend=task.backend)
sim.update_preferred_areas(task.local_preferred_areas)
diag = sim.build(connect=task.connect)
connections = empty_connections
if task.connect and diag["connections"].size:
local_connections = np.asarray(diag["connections"], dtype=int)
owned_local_mask = np.zeros(domain.local_global_ids.size, dtype=bool)
owned_local_mask[domain.owned_local_ids] = True
touches_owned = (
owned_local_mask[local_connections[:, 0]]
| owned_local_mask[local_connections[:, 1]]
)
connections = np.sort(
domain.local_global_ids[local_connections[touches_owned]],
axis=1,
)
result = {
"domain_id": domain.domain_id,
"pid": pid,
"owned_global_ids": owned_global_ids,
"forces": diag["forces"][domain.owned_local_ids],
"areas": diag["areas"][domain.owned_local_ids],
"perimeters": diag["perimeters"][domain.owned_local_ids],
"arclens": diag["arclens"][domain.owned_local_ids],
"coord_nums": diag["coord_nums"][domain.owned_local_ids],
"connections": connections,
}
if task.plot_mode:
diag_plot = {
"vertices": diag["vertices"],
"regions": [diag["regions"][ix] for ix in domain.owned_local_ids],
"edges_type": [diag["edges_type"][ix] for ix in domain.owned_local_ids],
}
result["diag_plot"] = diag_plot
return result
[docs]
class ParallelFiniteVoronoiSimulator:
"""Python multiprocessing domain-decomposed simulator for the AFV model.
This class decomposes the full point set into rectangular domains with
halo regions, then composes :py:class:`FiniteVoronoiSimulator` on each
local subdomain. The returned diagnostics are merged back into global
cell indexing for owned cells. When ``n_workers > 1``, local subdomain
builds use Python worker processes.
Args:
pts (numpy.ndarray): (N,2) array of initial cell center positions.
phys: Physical parameters used within this simulator.
grid_shape: Number of domains in the x and y directions.
n_workers: Number of worker processes. If *None*, use ``os.cpu_count()``.
halo_width: Width of the halo region added to each domain. If *None*,
use ``4.01 * phys.r``.
backend: Optional, specify "python" to force the use of the pure Python
fallback implementation inside each local finite Voronoi simulator.
Otherwise, the "cython" backend is used.
domain_bounds: Optional domain bounds as ``((xmin, xmax), (ymin, ymax))``.
If *None*, bounds are inferred from the current point positions.
decomposition_method: Method used to collect halo points. ``"dense"``
is usually faster for moderate systems, while ``"sorted_x"`` uses
less temporary memory.
Raises:
ValueError: If *pts* does not have shape (N,2).
ValueError: If *grid_shape*, *halo_width*, *domain_bounds*, or
*decomposition_method* is invalid.
TypeError: If *phys* is not an instance of :py:class:`PhysicalParams`.
.. note::
For repeated calls with ``n_workers > 1``, put the loop inside the
context manager so worker processes are created once and reused across
build steps::
with sim: # sim is an instance of ParallelFiniteVoronoiSimulator
for step in range(num_steps):
diag = sim.build()
"""
def __init__(
self,
pts: np.ndarray,
phys: PhysicalParams,
grid_shape: tuple[int, int] = (2, 2),
n_workers: int | None = None,
*,
halo_width: float | None = None,
backend: typing.Literal["cython", "python"] | None = None,
domain_bounds: tuple[tuple[float, float], tuple[float, float]] | None = None,
decomposition_method: typing.Literal["dense", "sorted_x"] = "dense",
):
pts = np.asarray(pts, dtype=float)
if pts.ndim != 2 or pts.shape[1] != 2: # pragma: no cover
raise ValueError("pts must have shape (N,2)")
if not isinstance(phys, PhysicalParams): # pragma: no cover
raise TypeError("phys must be an instance of PhysicalParams")
self.pts = pts.copy()
self.N = pts.shape[0]
self.phys = phys
self.grid_shape = _validate_grid_shape(grid_shape)
self._auto_halo_width = halo_width is None
self.halo_width = float(4.01 * phys.r if halo_width is None else halo_width)
if self.halo_width < 0.0: # pragma: no cover
raise ValueError("halo_width must be non-negative")
self.n_workers = int(
n_workers if n_workers is not None else (os.cpu_count() or 1)
)
if self.n_workers <= 0: # pragma: no cover
raise ValueError("n_workers must be positive")
self.backend = backend
self.domain_bounds = _validate_domain_bounds(domain_bounds)
if decomposition_method not in ("dense", "sorted_x"): # pragma: no cover
raise ValueError("decomposition_method must be 'dense' or 'sorted_x'")
self.decomposition_method = decomposition_method
self._preferred_areas = np.full(self.N, phys.A0, dtype=float)
self._executor: ProcessPoolExecutor | None = None
def __enter__(self) -> ParallelFiniteVoronoiSimulator: # pragma: no cover
if self.n_workers > 1 and self._executor is None:
from concurrent.futures import ProcessPoolExecutor
self._executor = ProcessPoolExecutor(max_workers=self.n_workers)
return self
def __exit__(self, exc_type, exc, tb) -> None: # pragma: no cover
self.close()
def close(self) -> None: # pragma: no cover
if self._executor is not None:
self._executor.shutdown(wait=True)
self._executor = None
def _make_domain_tasks(self, connect: bool, plot_mode: bool) -> list[_DomainTask]:
domains = decompose_points(
self.pts,
self.grid_shape,
self.halo_width,
domain_bounds=self.domain_bounds,
method=self.decomposition_method,
)
return [
_DomainTask(
domain=domain,
local_preferred_areas=self._preferred_areas[domain.local_global_ids],
phys=self.phys,
backend=self.backend,
connect=connect,
plot_mode=plot_mode,
)
for domain in domains
]
[docs]
def build(self, connect: bool = False, plot_mode: bool = False) -> dict[str, object]:
""" Build local finite Voronoi structures and merge global diagnostics.
Do the following:
- Decompose cell centers into owned domains plus halo points
- Build a finite Voronoi diagram in each local subdomain
- Extract diagnostics for owned cells
- Merge forces and geometric quantities back into global indexing
- Optionally collect global cell connectivity and per-domain plot data
Args:
connect: Whether to compute cell connectivity information.
Setting this to ``False`` saves some computation time when
connectivity is not needed.
Note that the default is ``False``, unlike
:py:meth:`FiniteVoronoiSimulator.build`, where the default is
``True``.
plot_mode: Whether to include per-domain plotting diagnostics.
If ``True``, use :py:func:`visualize_2d_parallel` for
visualization.
Returns:
dict[str, object]: A dictionary containing merged forces and
geometric properties with keys:
- **forces**: (N,2) array of forces on cell centers
- **areas**: (N,) array of cell areas
- **perimeters**: (N,) array of cell perimeters
- **arclens**: (N,) array of non-contacting edge (arc) lengths per cell
- **coord_nums**: (N,) integer array of coordination numbers per cell
- **connections**: (K,2) array of connected global cell index pairs
- **pids**: (D,) integer array of process ids used for each domain
- **plot_mode**: Whether per-domain plot diagnostics are included
If *plot_mode* is ``True``, the dictionary also contains:
- **owned_global_ids**: List of global cell ids owned by each domain
- **diag_plot**: List of local plotting dictionaries, each with
``vertices``, ``edges_type``, and ``regions`` for owned cells
"""
tasks = self._make_domain_tasks(connect=connect, plot_mode=plot_mode)
if self.n_workers == 1:
results = [_build_domain(task) for task in tasks]
elif self._executor is not None: # pragma: no cover
results = list(self._executor.map(_build_domain, tasks))
else: # pragma: no cover
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=self.n_workers) as pool:
results = list(pool.map(_build_domain, tasks))
return self._merge_results(results, connect=connect, plot_mode=plot_mode)
def _merge_results(
self,
results: list[dict[str, object]],
connect: bool,
plot_mode: bool,
) -> dict[str, object]:
forces = np.zeros((self.N, 2), dtype=float)
areas = np.zeros(self.N, dtype=float)
perimeters = np.zeros(self.N, dtype=float)
arclens = np.zeros(self.N, dtype=float)
coord_nums = np.zeros(self.N, dtype=int)
connection_chunks = []
for result in results:
owned_global_ids = np.asarray(result["owned_global_ids"], dtype=int)
if owned_global_ids.size == 0: # pragma: no cover
continue
forces[owned_global_ids] = result["forces"]
areas[owned_global_ids] = result["areas"]
perimeters[owned_global_ids] = result["perimeters"]
arclens[owned_global_ids] = result["arclens"]
coord_nums[owned_global_ids] = result["coord_nums"]
if connect and np.asarray(result["connections"]).size:
connection_chunks.append(np.asarray(result["connections"], dtype=int))
if connect and connection_chunks:
connections = np.unique(np.concatenate(connection_chunks, axis=0), axis=0)
else:
connections = np.empty((0, 2), dtype=int)
diag = {
"forces": forces,
"areas": areas,
"perimeters": perimeters,
"arclens": arclens,
"coord_nums": coord_nums,
"connections": connections,
"plot_mode": plot_mode,
"pids": np.asarray([result["pid"] for result in results], dtype=int),
}
if plot_mode:
diag["owned_global_ids"] = [result["owned_global_ids"] for result in results]
diag["diag_plot"] = [result["diag_plot"] for result in results]
return diag
[docs]
def update_positions(self, pts: np.ndarray, A0: float | np.ndarray | None = None) -> None: # pragma: no cover
"""
Update cell center positions.
.. note::
If the number of cells changes, the preferred areas for all cells
are reset to the default value---defined either at simulator
instantiation or by :py:meth:`update_params`---unless *A0* is
explicitly specified.
Args:
pts : New cell center positions.
A0: Optional, set new preferred area(s).
Raises:
ValueError: If *pts* does not have shape (N,2).
ValueError: If *A0* is an array and does not have shape (N,).
"""
pts = np.asarray(pts, dtype=float)
if pts.ndim != 2 or pts.shape[1] != 2:
raise ValueError("pts must have shape (N,2)")
N = pts.shape[0]
self.pts = pts.copy()
if N != self.N:
self.N = N
if A0 is None:
self._preferred_areas = np.full(N, self.phys.A0, dtype=float)
else:
self.update_preferred_areas(A0)
elif A0 is not None:
self.update_preferred_areas(A0)
[docs]
def update_params(self, phys: PhysicalParams) -> None: # pragma: no cover
"""
Update physical parameters.
Args:
phys: New :py:class:`PhysicalParams` object.
Raises:
TypeError: If *phys* is not an instance of :py:class:`PhysicalParams`.
.. warning::
This also resets all preferred cell areas to the new value of *A0*.
If *halo_width* was not explicitly specified at instantiation, it
also updates the halo width to ``4.01 * phys.r``.
"""
if not isinstance(phys, PhysicalParams):
raise TypeError("phys must be an instance of PhysicalParams")
self.phys = phys
if self._auto_halo_width:
self.halo_width = 4.01 * phys.r
self.update_preferred_areas(phys.A0)
[docs]
def update_preferred_areas(self, A0: float | np.ndarray) -> None: # pragma: no cover
"""
Update the preferred areas for all cells.
Args:
A0: New preferred area(s) for all cells, either as a scalar or
as an array of shape (N,).
Raises:
ValueError: If *A0* does not match cell number.
"""
arr = np.asarray(A0, dtype=float)
if arr.ndim == 0:
arr = np.full(self.N, float(arr), dtype=float)
elif arr.shape == (1,):
arr = np.full(self.N, float(arr[0]), dtype=float)
elif arr.shape != (self.N,):
raise ValueError(f"A0 must be scalar or have shape ({self.N},)")
self._preferred_areas = arr
@property
def preferred_areas(self) -> np.ndarray: # pragma: no cover
"""
Return a copy of the preferred area array.
Returns:
numpy.ndarray: (N,) array of preferred cell areas.
"""
return self._preferred_areas.copy()
__all__ = ["DomainDecomposition", "decompose_points", "ParallelFiniteVoronoiSimulator"]