Python Bindings Guide
This guide covers DTL’s Python bindings, providing a Pythonic interface with seamless NumPy integration for distributed computing.
Table of Contents
Overview
The Python bindings provide:
Pythonic API: Follows Python conventions (PEP 8, context managers)
Zero-Copy NumPy Views:
local_view() shares memory with DTL containers
mpi4py Compatibility: Pass mpi4py communicators directly
Full Type Annotations: PEP 484 type hints for IDE support
Collective Operations:
allreduce, broadcast, gather, scatter, etc.
Requirements
Python 3.8+
NumPy 1.20+
(Optional) mpi4py 3.0+ for MPI parallelism
Installation
From Source (in DTL Build Directory)
cd dtl/build
cmake .. -DDTL_BUILD_PYTHON=ON
make _dtl
make python_install
Verify Installation
>>> import dtl
>>> dtl.__version__
'0.1.0a1'
>>> dtl.has_mpi()
True
Quick Start
import dtl
import numpy as np
# Create execution context
with dtl.Context() as ctx:
print(f"Rank {ctx.rank} of {ctx.size}")
# Create distributed vector (1000 elements, float64)
vec = dtl.DistributedVector(ctx, size=1000, dtype=np.float64)
# Access local partition as NumPy array (zero-copy)
local = vec.local_view()
local[:] = np.arange(len(local)) + vec.local_offset
# Collective reduce
local_sum = np.sum(local)
global_sum = dtl.allreduce(ctx, local_sum, op=dtl.SUM)
if ctx.is_root:
print(f"Global sum: {global_sum}")
Run with MPI:
mpirun -np 4 python my_script.py
Environment API
The Environment class manages backend lifecycle (MPI, CUDA, HIP, NCCL, SHMEM) using reference-counted RAII semantics. This is the recommended way to initialize DTL.
Creation
# Create environment (initializes backends on first call)
env = dtl.Environment()
# As context manager (recommended)
with dtl.Environment() as env:
ctx = env.make_world_context()
print(f"Rank {ctx.rank} of {ctx.size}")
Context Factory Methods
# Create context spanning all MPI ranks (CPU)
ctx = env.make_world_context()
# Create GPU-enabled context
ctx = env.make_world_context(device_id=0)
# Create single-process CPU-only context (no MPI)
ctx = env.make_cpu_context()
Backend Queries
env.has_mpi # bool: MPI available
env.has_cuda # bool: CUDA available
env.has_hip # bool: HIP available
env.has_nccl # bool: NCCL available
env.has_shmem # bool: SHMEM available
Static Queries
dtl.Environment.is_initialized() # True if any environment handle exists
dtl.Environment.ref_count() # Number of active environment handles
Context API
The Context class encapsulates the execution environment. Contexts can be created directly or via Environment factory methods (preferred).
Creation
# Default: uses MPI_COMM_WORLD, CPU only
ctx = dtl.Context()
# With GPU device
ctx = dtl.Context(device_id=0)
# With mpi4py communicator
from mpi4py import MPI
ctx = dtl.Context(comm=MPI.COMM_WORLD)
Properties
ctx.rank # int: Current rank (0 to size-1)
ctx.size # int: Total number of ranks
ctx.is_root # bool: True if rank 0
ctx.device_id # int: GPU device ID (-1 for CPU only)
ctx.has_device # bool: True if GPU enabled
ctx.has_mpi # bool: MPI domain present
ctx.has_cuda # bool: CUDA domain present
ctx.has_nccl # bool: NCCL domain present
ctx.nccl_mode # int: DTL_NCCL_MODE_* (or -1 if no NCCL)
Methods
ctx.barrier() # Synchronize all ranks
ctx.fence() # Local memory fence
ctx.dup() # Duplicate context communicator
ctx.split(color, key=0)
ctx.with_cuda(device_id)
ctx.with_nccl(device_id, mode=dtl.DTL_NCCL_MODE_HYBRID_PARITY)
ctx.split_nccl(color, key=0, device_id=None, mode=dtl.DTL_NCCL_MODE_HYBRID_PARITY)
ctx.nccl_supports_native(dtl.DTL_NCCL_OP_ALLREDUCE)
ctx.nccl_supports_hybrid(dtl.DTL_NCCL_OP_SCAN)
NCCL mode constants:
dtl.DTL_NCCL_MODE_NATIVE_ONLY
dtl.DTL_NCCL_MODE_HYBRID_PARITY
Note: explicit C ABI NCCL device-collective entry points (dtl_nccl_*_device
and *_device_ex) are currently exposed directly in C/Fortran. Python
currently uses the generic collective API surface plus explicit context/domain
selection.
Context Manager
The recommended pattern is using with:
with dtl.Context() as ctx:
# Context is valid here
vec = dtl.DistributedVector(ctx, 1000)
# ...
# Context cleaned up automatically
Containers
DistributedVector
A 1D distributed array partitioned across ranks.
# Create vector
vec = dtl.DistributedVector(ctx, size=10000, dtype=np.float64)
# With fill value
vec = dtl.DistributedVector(ctx, size=10000, dtype=np.float32, fill=0.0)
# Properties
vec.global_size # Total elements across all ranks
vec.local_size # Elements on this rank
vec.local_offset # Global index of first local element
vec.dtype # NumPy dtype
# Zero-copy NumPy view
local = vec.local_view()
local[:] = np.random.randn(len(local))
# Index queries
vec.is_local(global_idx) # bool: Is index on this rank?
vec.owner(global_idx) # int: Which rank owns index?
Supported dtypes

| NumPy dtype | Status |
|---|---|
| np.float32 | Supported |
| np.float64 | Supported |
| np.int32 | Supported |
| np.int64 | Supported |
| np.uint32 | Supported |
| np.uint64 | Supported |
| np.int8 | Supported |
| np.uint8 | Supported |

<!-- NOTE(review): the dtype names in the first column were lost during extraction;
     the eight listed above are a plausible set — verify against the bindings' actual dtype support. -->
DistributedArray
A fixed-size 1D distributed array. Unlike DistributedVector, the size is fixed at creation time and cannot be resized.
# Create fixed-size array (1000 elements, float64)
arr = dtl.DistributedArray(ctx, size=1000, dtype=np.float64)
# With fill value
arr = dtl.DistributedArray(ctx, size=1000, dtype=np.float64, fill=0.0)
# With policy selection
arr = dtl.DistributedArray(
ctx, size=1000, dtype=np.float64,
partition=dtl.PARTITION_CYCLIC
)
# Properties (same as DistributedVector)
arr.global_size # Total elements (always 1000)
arr.local_size # Elements on this rank
arr.local_offset # Global index of first local element
arr.dtype # NumPy dtype
# Zero-copy NumPy view
local = arr.local_view()
local[:] = np.arange(len(local))
# Index queries
arr.is_local(global_idx) # bool: Is index on this rank?
arr.owner(global_idx) # int: Which rank owns index?
# NO resize() method - arrays are fixed size
# arr.resize(2000) # AttributeError!
Key Difference from DistributedVector: Arrays are fixed-size containers. Use DistributedArray when:
Size is known at creation time and won’t change
You want compile-time size guarantees
Slightly more memory-efficient than vector
DistributedSpan
Python exposes a first-class DistributedSpan factory that creates typed non-owning distributed spans from distributed containers:
vec = dtl.DistributedVector(ctx, size=1000, dtype=np.float64)
span = dtl.DistributedSpan(vec)
local = span.local_view() # zero-copy local NumPy view
copy = span.to_numpy() # explicit copy
sub = span.subspan(10, 20) # non-owning subspan
Key semantics:
non-owning: span lifetime is tied to the source container lifetime
local contiguous semantics: indexing/slicing through NumPy local views
distributed metadata:
global_size, local_size, rank, num_ranks
DistributedTensor
An N-dimensional distributed array, partitioned along the first dimension.
# Create 3D tensor (100x64x64)
tensor = dtl.DistributedTensor(ctx, shape=(100, 64, 64), dtype=np.float32)
# Properties
tensor.shape # Tuple: Global shape
tensor.local_shape # Tuple: Local shape (differs in dim 0)
tensor.ndim # int: Number of dimensions
tensor.dtype # NumPy dtype
# Zero-copy NumPy view
local = tensor.local_view() # 3D NumPy array
local[:] = np.random.randn(*local.shape)
Collective Operations
DTL provides distributed collective operations that work with NumPy arrays.
Available Operations

| Function | Description |
|---|---|
| allreduce | Reduce across ranks, result to all |
| reduce | Reduce across ranks, result to root |
| broadcast | Send from root to all ranks |
| gather | Collect data at root |
| scatter | Distribute data from root |
| allgather | Gather to all ranks |
Reduction Operations
dtl.SUM # Sum
dtl.PROD # Product
dtl.MIN # Minimum
dtl.MAX # Maximum
Examples
import dtl
import numpy as np
with dtl.Context() as ctx:
# Allreduce: global sum
local_value = np.array([ctx.rank * 10.0])
global_sum = dtl.allreduce(ctx, local_value, op=dtl.SUM)
# Reduce to root
global_max = dtl.reduce(ctx, local_value, op=dtl.MAX, root=0)
# Broadcast from root
if ctx.is_root:
data = np.array([1.0, 2.0, 3.0])
else:
data = np.zeros(3)
data = dtl.broadcast(ctx, data, root=0)
# Gather: collect arrays at root
my_data = np.array([ctx.rank])
gathered = dtl.gather(ctx, my_data, root=0)
# On root: gathered.shape = (size, 1)
# Scatter: distribute from root
if ctx.is_root:
all_data = np.arange(ctx.size * 3).reshape(ctx.size, 3)
else:
all_data = np.empty((ctx.size, 3))
my_chunk = dtl.scatter(ctx, all_data, root=0)
# my_chunk.shape = (3,)
# Allgather: gather to all
all_gathered = dtl.allgather(ctx, my_data)
# all_gathered.shape = (size, 1)
Working with Scalars
Collective operations handle scalars automatically:
# Scalar input
local_sum = 42.0
global_sum = dtl.allreduce(ctx, local_sum, op=dtl.SUM)
# global_sum is a scalar (float)
# Array input
local_arr = np.array([1.0, 2.0, 3.0])
global_arr = dtl.allreduce(ctx, local_arr, op=dtl.SUM)
# global_arr is np.ndarray
Algorithm Operations
DTL provides distributed algorithm operations that work on containers.
Container-Level Algorithms
| Function | Description |
|---|---|
| for_each | Apply function to each element |
| transform | Transform elements from src to dst |
| fill | Fill container with value |
| copy | Copy elements from src to dst |
| find | Find first occurrence of value |
| count | Count occurrences of value |
| reduce | Reduce all elements |
| sort | Sort elements |

<!-- NOTE(review): function names were lost during extraction; fill/transform/reduce/sort
     are confirmed by the examples below — verify for_each/copy/find/count against the API. -->
Examples
import dtl
import numpy as np
with dtl.Context() as ctx:
vec = dtl.DistributedVector(ctx, size=1000, dtype=np.float64)
local = vec.local_view()
local[:] = np.random.randn(len(local))
# Fill with value
dtl.fill(vec, 0.0)
# Transform with NumPy ufunc
src = dtl.DistributedVector(ctx, size=1000, dtype=np.float64)
dst = dtl.DistributedVector(ctx, size=1000, dtype=np.float64)
src.local_view()[:] = np.arange(src.local_size)
dtl.transform(src, dst, np.square)
# Distributed reduce
total = dtl.reduce(vec, op=dtl.SUM)
maximum = dtl.reduce(vec, op=dtl.MAX)
# Sort (distributed)
dtl.sort(vec)
dtl.sort(vec, reverse=True)
Policy Selection
Policies control how data is distributed and where it resides.
Partition Policies
Control how elements are distributed across ranks:
dtl.PARTITION_BLOCK # Contiguous chunks (default)
dtl.PARTITION_CYCLIC # Round-robin distribution
dtl.PARTITION_HASH # Hash-based distribution
# Block partition: ranks get contiguous chunks
# [0,1,2,3,4,5,6,7] with 2 ranks -> rank0:[0,1,2,3], rank1:[4,5,6,7]
vec_block = dtl.DistributedVector(
ctx, size=1000, dtype=np.float64,
partition=dtl.PARTITION_BLOCK
)
# Cyclic partition: round-robin
# [0,1,2,3,4,5,6,7] with 2 ranks -> rank0:[0,2,4,6], rank1:[1,3,5,7]
vec_cyclic = dtl.DistributedVector(
ctx, size=1000, dtype=np.float64,
partition=dtl.PARTITION_CYCLIC
)
Placement Policies
Control where data resides in memory:
dtl.PLACEMENT_HOST # CPU memory (default)
dtl.PLACEMENT_DEVICE # GPU memory (requires CUDA)
dtl.PLACEMENT_UNIFIED # CUDA unified memory
# CPU-only vector
cpu_vec = dtl.DistributedVector(
ctx, size=1000, dtype=np.float64,
placement=dtl.PLACEMENT_HOST
)
# GPU vector (if CUDA available)
if dtl.has_cuda():
gpu_vec = dtl.DistributedVector(
ctx, size=1000, dtype=np.float64,
placement=dtl.PLACEMENT_DEVICE
)
Querying Policies
vec.partition_policy # Returns current partition policy
vec.placement_policy # Returns current placement policy
Availability Checks
dtl.has_cuda() # bool: CUDA backend available
dtl.has_mpi() # bool: MPI backend available
RMA Operations
Remote Memory Access (RMA) enables one-sided communication where one rank can directly access another rank’s memory.
Window Creation
A Window exposes memory for remote access:
import dtl
import numpy as np
with dtl.Context() as ctx:
# Allocate window (DTL allocates memory)
win = dtl.Window(ctx, size=1024)
# Create window from existing array
data = np.zeros(100, dtype=np.float64)
win = dtl.Window(ctx, base=data)
Window Properties
win.size # Size in bytes
win.base # Base pointer (as integer)
win.is_valid # Whether window handle is valid
Synchronization
RMA operations require synchronization epochs:
# Active-target synchronization (fence)
win.fence()
# ... RMA operations ...
win.fence()
# Passive-target synchronization (lock/unlock)
win.lock(target=1, mode="exclusive")
# ... RMA operations to target 1 ...
win.unlock(target=1)
# Lock all ranks
win.lock_all()
# ... RMA operations ...
win.unlock_all()
# Flush operations
win.flush(target=1) # Complete operations to target
win.flush_all() # Complete operations to all targets
win.flush_local(target=1) # Local completion only
win.flush_local_all() # Local completion for all
Data Transfer
import dtl
import numpy as np
with dtl.Context() as ctx:
win = dtl.Window(ctx, size=1024)
# Put data to remote window
data = np.array([1.0, 2.0, 3.0], dtype=np.float64)
win.fence()
dtl.rma_put(win, target=0, offset=0, data=data)
win.fence()
# Get data from remote window
win.fence()
result = dtl.rma_get(win, target=0, offset=0, size=24, dtype=np.float64)
win.fence()
# result is np.array([1.0, 2.0, 3.0])
Atomic Operations
# Accumulate: atomically add to remote memory
dtl.rma_accumulate(win, target=0, offset=0, data=np.array([1.0]), op=dtl.SUM)
# Fetch and add: get old value, add new
old_value = dtl.rma_fetch_and_add(
win, target=0, offset=0,
addend=1.0, dtype=np.float64
)
# Compare and swap: atomic conditional update
old_value = dtl.rma_compare_and_swap(
win, target=0, offset=0,
compare=0.0, swap=1.0, dtype=np.float64
)
# If remote value was 0.0, it's now 1.0
# old_value contains the value that was there
RMA Example: Distributed Counter
"""Atomic distributed counter using RMA."""
import dtl
import numpy as np
with dtl.Context() as ctx:
# Each rank has a counter window
counter = np.zeros(1, dtype=np.int64)
win = dtl.Window(ctx, base=counter)
# All ranks increment rank 0's counter
win.lock(target=0, mode="exclusive")
old = dtl.rma_fetch_and_add(
win, target=0, offset=0,
addend=1, dtype=np.int64
)
win.unlock(target=0)
ctx.barrier()
if ctx.is_root:
print(f"Final counter value: {counter[0]}")
# Should equal ctx.size
NumPy Integration
Zero-Copy Views
local_view() returns a NumPy array that shares memory with the DTL container:
vec = dtl.DistributedVector(ctx, 1000)
local = vec.local_view()
# Modifications affect the container
local[:] = np.sin(np.arange(len(local)))
# NumPy operations work directly
mean = np.mean(local)
local_sorted = np.sort(local) # Creates copy
np.multiply(local, 2, out=local) # In-place, zero-copy
View Lifetime
Views are valid as long as the container exists:
# CORRECT: Container outlives view
vec = dtl.DistributedVector(ctx, 1000)
local = vec.local_view()
local[:] = 0 # Safe
# INCORRECT: Container destroyed before view used
local = dtl.DistributedVector(ctx, 1000).local_view() # vec destroyed!
local[:] = 0 # UNDEFINED BEHAVIOR
Performance Tips
# GOOD: Vectorized NumPy operations
local = vec.local_view()
local[:] = np.exp(local) # Fast, uses SIMD
# BAD: Python loops
for i in range(len(local)):
local[i] = np.exp(local[i]) # Slow, Python overhead
mpi4py Interoperability
DTL integrates with mpi4py for advanced MPI usage:
from mpi4py import MPI
import dtl
# Use existing communicator
world = MPI.COMM_WORLD
ctx = dtl.Context(comm=world)
# Split communicator
color = ctx.rank % 2
sub_comm = world.Split(color, ctx.rank)
sub_ctx = dtl.Context(comm=sub_comm)
# Operations within sub-communicator
vec = dtl.DistributedVector(sub_ctx, 1000)
# Collectives only involve ranks in sub_comm
Without mpi4py
If mpi4py is not installed, Context() uses MPI_COMM_WORLD internally:
# Works without mpi4py
ctx = dtl.Context() # Uses MPI_COMM_WORLD
Exception Handling
DTL exceptions map C status codes to Python:
class DTLError(Exception):
"""Base class for DTL exceptions."""
class CommunicationError(DTLError):
"""MPI/communication failure."""
class MemoryError(DTLError, MemoryError):
"""Memory allocation failure."""
class BoundsError(DTLError, IndexError):
"""Index out of bounds."""
class InvalidArgumentError(DTLError, ValueError):
"""Invalid argument."""
class BackendError(DTLError):
"""Backend (MPI/CUDA) failure."""
Handling Exceptions
try:
vec = dtl.DistributedVector(ctx, -1) # Invalid size
except dtl.InvalidArgumentError as e:
print(f"Invalid argument: {e}")
except dtl.DTLError as e:
print(f"DTL error: {e}")
Type Annotations
DTL includes PEP 484 type hints and a .pyi stub file:
# IDE will show types
def allreduce(
ctx: Context,
data: Union[np.ndarray, float, int],
op: str = "sum"
) -> Union[np.ndarray, float, int]: ...
Type Checking
mypy your_script.py
Running with MPI
Basic Execution
# Run with 4 MPI ranks
mpirun -np 4 python my_script.py
# With OpenMPI hostfile
mpirun -np 8 --hostfile hosts python my_script.py
pytest-mpi
For testing:
pip install pytest-mpi
mpirun -np 4 python -m pytest tests/ --with-mpi
Complete Examples
Example 1: Distributed Mean
"""Compute mean of distributed data."""
import dtl
import numpy as np
with dtl.Context() as ctx:
# Create vector with 1M elements
vec = dtl.DistributedVector(ctx, size=1_000_000, dtype=np.float64)
# Fill with random data
local = vec.local_view()
np.random.seed(42 + ctx.rank)
local[:] = np.random.randn(len(local))
# Local sum and count
local_sum = np.sum(local)
local_count = len(local)
# Global reduction
global_sum = dtl.allreduce(ctx, local_sum, op=dtl.SUM)
global_count = dtl.allreduce(ctx, float(local_count), op=dtl.SUM)
global_mean = global_sum / global_count
if ctx.is_root:
print(f"Global mean: {global_mean:.6f}")
Example 2: Distributed Dot Product
"""Compute dot product of two distributed vectors."""
import dtl
import numpy as np
with dtl.Context() as ctx:
N = 100_000
# Create two vectors
a = dtl.DistributedVector(ctx, size=N, dtype=np.float64)
b = dtl.DistributedVector(ctx, size=N, dtype=np.float64)
# Initialize
local_a = a.local_view()
local_b = b.local_view()
local_a[:] = np.arange(len(local_a)) + a.local_offset
local_b[:] = 2.0
# Local dot product
local_dot = np.dot(local_a, local_b)
# Global sum
global_dot = dtl.allreduce(ctx, local_dot, op=dtl.SUM)
if ctx.is_root:
# Expected: 2 * sum(0..N-1) = 2 * N*(N-1)/2 = N*(N-1)
expected = N * (N - 1)
print(f"Dot product: {global_dot:.0f}")
print(f"Expected: {expected}")
Example 3: Stencil Computation with Halo Exchange
"""1D stencil with neighbor communication."""
import dtl
import numpy as np
with dtl.Context() as ctx:
N = 1000
vec = dtl.DistributedVector(ctx, size=N, dtype=np.float64)
local = vec.local_view()
local[:] = np.sin(np.linspace(0, 2*np.pi, vec.global_size))[vec.local_offset:vec.local_offset+len(local)]
# Simple 3-point stencil: new[i] = 0.25*old[i-1] + 0.5*old[i] + 0.25*old[i+1]
# (Boundary handling omitted for simplicity)
result = np.zeros_like(local)
for i in range(1, len(local) - 1):
result[i] = 0.25 * local[i-1] + 0.5 * local[i] + 0.25 * local[i+1]
# Copy boundaries
result[0] = local[0]
result[-1] = local[-1]
local[:] = result
ctx.barrier()
if ctx.is_root:
print("Stencil computation complete")
References
C Bindings Guide (underlying C ABI)