Python Bindings Guide

This guide covers DTL’s Python bindings, providing a Pythonic interface with seamless NumPy integration for distributed computing.


Table of Contents


Overview

The Python bindings provide:

  • Pythonic API: Follows Python conventions (PEP 8, context managers)

  • Zero-Copy NumPy Views: local_view() shares memory with DTL containers

  • mpi4py Compatibility: Pass mpi4py communicators directly

  • Full Type Annotations: PEP 484 type hints for IDE support

  • Collective Operations: allreduce, broadcast, gather, scatter, etc.

Requirements

  • Python 3.8+

  • NumPy 1.20+

  • (Optional) mpi4py 3.0+ for MPI parallelism


Installation

From Source (in DTL Build Directory)

cd dtl/build
cmake .. -DDTL_BUILD_PYTHON=ON
make _dtl
make python_install

Verify Installation

>>> import dtl
>>> dtl.__version__
'0.1.0a1'
>>> dtl.has_mpi()
True

Quick Start

import dtl
import numpy as np

# Create execution context
with dtl.Context() as ctx:
    print(f"Rank {ctx.rank} of {ctx.size}")

    # Create distributed vector (1000 elements, float64)
    vec = dtl.DistributedVector(ctx, size=1000, dtype=np.float64)

    # Access local partition as NumPy array (zero-copy)
    local = vec.local_view()
    local[:] = np.arange(len(local)) + vec.local_offset

    # Collective reduce
    local_sum = np.sum(local)
    global_sum = dtl.allreduce(ctx, local_sum, op=dtl.SUM)

    if ctx.is_root:
        print(f"Global sum: {global_sum}")

Run with MPI:

mpirun -np 4 python my_script.py

Environment API

The Environment class manages backend lifecycle (MPI, CUDA, HIP, NCCL, SHMEM) using reference-counted RAII semantics. This is the recommended way to initialize DTL.

Creation

# Create environment (initializes backends on first call)
env = dtl.Environment()

# As context manager (recommended)
with dtl.Environment() as env:
    ctx = env.make_world_context()
    print(f"Rank {ctx.rank} of {ctx.size}")

Context Factory Methods

# Create context spanning all MPI ranks (CPU)
ctx = env.make_world_context()

# Create GPU-enabled context
ctx = env.make_world_context(device_id=0)

# Create single-process CPU-only context (no MPI)
ctx = env.make_cpu_context()

Backend Queries

env.has_mpi     # bool: MPI available
env.has_cuda    # bool: CUDA available
env.has_hip     # bool: HIP available
env.has_nccl    # bool: NCCL available
env.has_shmem   # bool: SHMEM available

Static Queries

dtl.Environment.is_initialized()  # True if any environment handle exists
dtl.Environment.ref_count()       # Number of active environment handles

Context API

The Context class encapsulates the execution environment. Contexts can be created directly or via Environment factory methods (preferred).

Creation

# Default: uses MPI_COMM_WORLD, CPU only
ctx = dtl.Context()

# With GPU device
ctx = dtl.Context(device_id=0)

# With mpi4py communicator
from mpi4py import MPI
ctx = dtl.Context(comm=MPI.COMM_WORLD)

Properties

ctx.rank            # int: Current rank (0 to size-1)
ctx.size            # int: Total number of ranks
ctx.is_root         # bool: True if rank 0
ctx.device_id       # int: GPU device ID (-1 for CPU only)
ctx.has_device      # bool: True if GPU enabled
ctx.has_mpi         # bool: MPI domain present
ctx.has_cuda        # bool: CUDA domain present
ctx.has_nccl        # bool: NCCL domain present
ctx.nccl_mode       # int: DTL_NCCL_MODE_* (or -1 if no NCCL)

Methods

ctx.barrier()  # Synchronize all ranks
ctx.fence()    # Local memory fence
ctx.dup()      # Duplicate context communicator
ctx.split(color, key=0)
ctx.with_cuda(device_id)
ctx.with_nccl(device_id, mode=dtl.DTL_NCCL_MODE_HYBRID_PARITY)
ctx.split_nccl(color, key=0, device_id=None, mode=dtl.DTL_NCCL_MODE_HYBRID_PARITY)
ctx.nccl_supports_native(dtl.DTL_NCCL_OP_ALLREDUCE)
ctx.nccl_supports_hybrid(dtl.DTL_NCCL_OP_SCAN)

NCCL mode constants:

dtl.DTL_NCCL_MODE_NATIVE_ONLY
dtl.DTL_NCCL_MODE_HYBRID_PARITY

Note: explicit C ABI NCCL device-collective entry points (dtl_nccl_*_device and *_device_ex) are currently exposed directly in C/Fortran. Python currently uses the generic collective API surface plus explicit context/domain selection.

Context Manager

The recommended pattern is using with:

with dtl.Context() as ctx:
    # Context is valid here
    vec = dtl.DistributedVector(ctx, 1000)
    # ...
# Context cleaned up automatically

Containers

DistributedVector

A 1D distributed array partitioned across ranks.

# Create vector
vec = dtl.DistributedVector(ctx, size=10000, dtype=np.float64)

# With fill value
vec = dtl.DistributedVector(ctx, size=10000, dtype=np.float32, fill=0.0)

# Properties
vec.global_size   # Total elements across all ranks
vec.local_size    # Elements on this rank
vec.local_offset  # Global index of first local element
vec.dtype         # NumPy dtype

# Zero-copy NumPy view
local = vec.local_view()
local[:] = np.random.randn(len(local))

# Index queries
vec.is_local(global_idx)  # bool: Is index on this rank?
vec.owner(global_idx)     # int: Which rank owns index?

Supported dtypes

NumPy dtype

Status

np.float64

Supported

np.float32

Supported

np.int64

Supported

np.int32

Supported

np.uint64

Supported

np.uint32

Supported

np.int8

Supported

np.uint8

Supported

DistributedArray

A fixed-size 1D distributed array. Unlike DistributedVector, the size is fixed at creation time and cannot be resized.

# Create fixed-size array (1000 elements, float64)
arr = dtl.DistributedArray(ctx, size=1000, dtype=np.float64)

# With fill value
arr = dtl.DistributedArray(ctx, size=1000, dtype=np.float64, fill=0.0)

# With policy selection
arr = dtl.DistributedArray(
    ctx, size=1000, dtype=np.float64,
    partition=dtl.PARTITION_CYCLIC
)

# Properties (same as DistributedVector)
arr.global_size   # Total elements (always 1000)
arr.local_size    # Elements on this rank
arr.local_offset  # Global index of first local element
arr.dtype         # NumPy dtype

# Zero-copy NumPy view
local = arr.local_view()
local[:] = np.arange(len(local))

# Index queries
arr.is_local(global_idx)  # bool: Is index on this rank?
arr.owner(global_idx)     # int: Which rank owns index?

# NO resize() method - arrays are fixed size
# arr.resize(2000)  # AttributeError!

Key Difference from DistributedVector: Arrays are fixed-size containers. Use DistributedArray when:

  • Size is known at creation time and won’t change

  • You want compile-time size guarantees

  • Slightly more memory-efficient than vector

DistributedSpan

Python exposes a first-class DistributedSpan factory that creates typed non-owning distributed spans from distributed containers:

vec = dtl.DistributedVector(ctx, size=1000, dtype=np.float64)
span = dtl.DistributedSpan(vec)

local = span.local_view()     # zero-copy local NumPy view
copy = span.to_numpy()        # explicit copy
sub = span.subspan(10, 20)    # non-owning subspan

Key semantics:

  • non-owning: span lifetime is tied to the source container lifetime

  • local contiguous semantics: indexing/slicing through NumPy local views

  • distributed metadata: global_size, local_size, rank, num_ranks

DistributedTensor

An N-dimensional distributed array, partitioned along the first dimension.

# Create 3D tensor (100x64x64)
tensor = dtl.DistributedTensor(ctx, shape=(100, 64, 64), dtype=np.float32)

# Properties
tensor.shape        # Tuple: Global shape
tensor.local_shape  # Tuple: Local shape (differs in dim 0)
tensor.ndim         # int: Number of dimensions
tensor.dtype        # NumPy dtype

# Zero-copy NumPy view
local = tensor.local_view()  # 3D NumPy array
local[:] = np.random.randn(*local.shape)

Collective Operations

DTL provides distributed collective operations that work with NumPy arrays.

Available Operations

Function

Description

dtl.allreduce(ctx, data, op)

Reduce across ranks, result to all

dtl.reduce(ctx, data, op, root)

Reduce across ranks, result to root

dtl.broadcast(ctx, data, root)

Send from root to all ranks

dtl.gather(ctx, data, root)

Collect data at root

dtl.scatter(ctx, data, root)

Distribute data from root

dtl.allgather(ctx, data)

Gather to all ranks

Reduction Operations

dtl.SUM   # Sum
dtl.PROD  # Product
dtl.MIN   # Minimum
dtl.MAX   # Maximum

Examples

import dtl
import numpy as np

with dtl.Context() as ctx:
    # Allreduce: global sum
    local_value = np.array([ctx.rank * 10.0])
    global_sum = dtl.allreduce(ctx, local_value, op=dtl.SUM)

    # Reduce to root
    global_max = dtl.reduce(ctx, local_value, op=dtl.MAX, root=0)

    # Broadcast from root
    if ctx.is_root:
        data = np.array([1.0, 2.0, 3.0])
    else:
        data = np.zeros(3)
    data = dtl.broadcast(ctx, data, root=0)

    # Gather: collect arrays at root
    my_data = np.array([ctx.rank])
    gathered = dtl.gather(ctx, my_data, root=0)
    # On root: gathered.shape = (size, 1)

    # Scatter: distribute from root
    if ctx.is_root:
        all_data = np.arange(ctx.size * 3).reshape(ctx.size, 3)
    else:
        all_data = np.empty((ctx.size, 3))
    my_chunk = dtl.scatter(ctx, all_data, root=0)
    # my_chunk.shape = (3,)

    # Allgather: gather to all
    all_gathered = dtl.allgather(ctx, my_data)
    # all_gathered.shape = (size, 1)

Working with Scalars

Collective operations handle scalars automatically:

# Scalar input
local_sum = 42.0
global_sum = dtl.allreduce(ctx, local_sum, op=dtl.SUM)
# global_sum is a scalar (float)

# Array input
local_arr = np.array([1.0, 2.0, 3.0])
global_arr = dtl.allreduce(ctx, local_arr, op=dtl.SUM)
# global_arr is np.ndarray

Algorithm Operations

DTL provides distributed algorithm operations that work on containers.

Container-Level Algorithms

Function

Description

dtl.for_each(container, func)

Apply function to each element

dtl.transform(src, dst, func)

Transform elements from src to dst

dtl.fill(container, value)

Fill container with value

dtl.copy(src, dst)

Copy elements from src to dst

dtl.find(container, value)

Find first occurrence of value

dtl.count(container, value)

Count occurrences of value

dtl.reduce(container, op)

Reduce all elements

dtl.sort(container)

Sort elements

Examples

import dtl
import numpy as np

with dtl.Context() as ctx:
    vec = dtl.DistributedVector(ctx, size=1000, dtype=np.float64)
    local = vec.local_view()
    local[:] = np.random.randn(len(local))

    # Fill with value
    dtl.fill(vec, 0.0)

    # Transform with NumPy ufunc
    src = dtl.DistributedVector(ctx, size=1000, dtype=np.float64)
    dst = dtl.DistributedVector(ctx, size=1000, dtype=np.float64)
    src.local_view()[:] = np.arange(src.local_size)
    dtl.transform(src, dst, np.square)

    # Distributed reduce
    total = dtl.reduce(vec, op=dtl.SUM)
    maximum = dtl.reduce(vec, op=dtl.MAX)

    # Sort (distributed)
    dtl.sort(vec)
    dtl.sort(vec, reverse=True)

Policy Selection

Policies control how data is distributed and where it resides.

Partition Policies

Control how elements are distributed across ranks:

dtl.PARTITION_BLOCK   # Contiguous chunks (default)
dtl.PARTITION_CYCLIC  # Round-robin distribution
dtl.PARTITION_HASH    # Hash-based distribution
# Block partition: ranks get contiguous chunks
# [0,1,2,3,4,5,6,7] with 2 ranks -> rank0:[0,1,2,3], rank1:[4,5,6,7]
vec_block = dtl.DistributedVector(
    ctx, size=1000, dtype=np.float64,
    partition=dtl.PARTITION_BLOCK
)

# Cyclic partition: round-robin
# [0,1,2,3,4,5,6,7] with 2 ranks -> rank0:[0,2,4,6], rank1:[1,3,5,7]
vec_cyclic = dtl.DistributedVector(
    ctx, size=1000, dtype=np.float64,
    partition=dtl.PARTITION_CYCLIC
)

Placement Policies

Control where data resides in memory:

dtl.PLACEMENT_HOST     # CPU memory (default)
dtl.PLACEMENT_DEVICE   # GPU memory (requires CUDA)
dtl.PLACEMENT_UNIFIED  # CUDA unified memory
# CPU-only vector
cpu_vec = dtl.DistributedVector(
    ctx, size=1000, dtype=np.float64,
    placement=dtl.PLACEMENT_HOST
)

# GPU vector (if CUDA available)
if dtl.has_cuda():
    gpu_vec = dtl.DistributedVector(
        ctx, size=1000, dtype=np.float64,
        placement=dtl.PLACEMENT_DEVICE
    )

Querying Policies

vec.partition_policy  # Returns current partition policy
vec.placement_policy  # Returns current placement policy

Availability Checks

dtl.has_cuda()     # bool: CUDA backend available
dtl.has_mpi()      # bool: MPI backend available

RMA Operations

Remote Memory Access (RMA) enables one-sided communication where one rank can directly access another rank’s memory.

Window Creation

A Window exposes memory for remote access:

import dtl
import numpy as np

with dtl.Context() as ctx:
    # Allocate window (DTL allocates memory)
    win = dtl.Window(ctx, size=1024)

    # Create window from existing array
    data = np.zeros(100, dtype=np.float64)
    win = dtl.Window(ctx, base=data)

Window Properties

win.size      # Size in bytes
win.base      # Base pointer (as integer)
win.is_valid  # Whether window handle is valid

Synchronization

RMA operations require synchronization epochs:

# Active-target synchronization (fence)
win.fence()
# ... RMA operations ...
win.fence()

# Passive-target synchronization (lock/unlock)
win.lock(target=1, mode="exclusive")
# ... RMA operations to target 1 ...
win.unlock(target=1)

# Lock all ranks
win.lock_all()
# ... RMA operations ...
win.unlock_all()

# Flush operations
win.flush(target=1)      # Complete operations to target
win.flush_all()          # Complete operations to all targets
win.flush_local(target=1)     # Local completion only
win.flush_local_all()         # Local completion for all

Data Transfer

import dtl
import numpy as np

with dtl.Context() as ctx:
    win = dtl.Window(ctx, size=1024)

    # Put data to remote window
    data = np.array([1.0, 2.0, 3.0], dtype=np.float64)
    win.fence()
    dtl.rma_put(win, target=0, offset=0, data=data)
    win.fence()

    # Get data from remote window
    win.fence()
    result = dtl.rma_get(win, target=0, offset=0, size=24, dtype=np.float64)
    win.fence()
    # result is np.array([1.0, 2.0, 3.0])

Atomic Operations

# Accumulate: atomically add to remote memory
dtl.rma_accumulate(win, target=0, offset=0, data=np.array([1.0]), op=dtl.SUM)

# Fetch and add: get old value, add new
old_value = dtl.rma_fetch_and_add(
    win, target=0, offset=0,
    addend=1.0, dtype=np.float64
)

# Compare and swap: atomic conditional update
old_value = dtl.rma_compare_and_swap(
    win, target=0, offset=0,
    compare=0.0, swap=1.0, dtype=np.float64
)
# If remote value was 0.0, it's now 1.0
# old_value contains the value that was there

RMA Example: Distributed Counter

"""Atomic distributed counter using RMA."""
import dtl
import numpy as np

with dtl.Context() as ctx:
    # Each rank has a counter window
    counter = np.zeros(1, dtype=np.int64)
    win = dtl.Window(ctx, base=counter)

    # All ranks increment rank 0's counter
    win.lock(target=0, mode="exclusive")
    old = dtl.rma_fetch_and_add(
        win, target=0, offset=0,
        addend=1, dtype=np.int64
    )
    win.unlock(target=0)

    ctx.barrier()

    if ctx.is_root:
        print(f"Final counter value: {counter[0]}")
        # Should equal ctx.size

NumPy Integration

Zero-Copy Views

local_view() returns a NumPy array that shares memory with the DTL container:

vec = dtl.DistributedVector(ctx, 1000)
local = vec.local_view()

# Modifications affect the container
local[:] = np.sin(np.arange(len(local)))

# NumPy operations work directly
mean = np.mean(local)
local_sorted = np.sort(local)  # Creates copy
np.multiply(local, 2, out=local)  # In-place, zero-copy

View Lifetime

Views are valid as long as the container exists:

# CORRECT: Container outlives view
vec = dtl.DistributedVector(ctx, 1000)
local = vec.local_view()
local[:] = 0  # Safe

# INCORRECT: Container destroyed before view used
local = dtl.DistributedVector(ctx, 1000).local_view()  # vec destroyed!
local[:] = 0  # UNDEFINED BEHAVIOR

Performance Tips

# GOOD: Vectorized NumPy operations
local = vec.local_view()
local[:] = np.exp(local)  # Fast, uses SIMD

# BAD: Python loops
for i in range(len(local)):
    local[i] = np.exp(local[i])  # Slow, Python overhead

mpi4py Interoperability

DTL integrates with mpi4py for advanced MPI usage:

from mpi4py import MPI
import dtl

# Use existing communicator
world = MPI.COMM_WORLD
ctx = dtl.Context(comm=world)

# Split communicator
color = ctx.rank % 2
sub_comm = world.Split(color, ctx.rank)
sub_ctx = dtl.Context(comm=sub_comm)

# Operations within sub-communicator
vec = dtl.DistributedVector(sub_ctx, 1000)
# Collectives only involve ranks in sub_comm

Without mpi4py

If mpi4py is not installed, Context() uses MPI_COMM_WORLD internally:

# Works without mpi4py
ctx = dtl.Context()  # Uses MPI_COMM_WORLD

Exception Handling

DTL exceptions map C status codes to Python:

class DTLError(Exception):
    """Base class for DTL exceptions."""

class CommunicationError(DTLError):
    """MPI/communication failure."""

class MemoryError(DTLError, MemoryError):
    """Memory allocation failure."""

class BoundsError(DTLError, IndexError):
    """Index out of bounds."""

class InvalidArgumentError(DTLError, ValueError):
    """Invalid argument."""

class BackendError(DTLError):
    """Backend (MPI/CUDA) failure."""

Handling Exceptions

try:
    vec = dtl.DistributedVector(ctx, -1)  # Invalid size
except dtl.InvalidArgumentError as e:
    print(f"Invalid argument: {e}")
except dtl.DTLError as e:
    print(f"DTL error: {e}")

Type Annotations

DTL includes PEP 484 type hints and a .pyi stub file:

# IDE will show types
def allreduce(
    ctx: Context,
    data: Union[np.ndarray, float, int],
    op: str = "sum"
) -> Union[np.ndarray, float, int]: ...

Type Checking

mypy your_script.py

Running with MPI

Basic Execution

# Run with 4 MPI ranks
mpirun -np 4 python my_script.py

# With OpenMPI hostfile
mpirun -np 8 --hostfile hosts python my_script.py

pytest-mpi

For testing:

pip install pytest-mpi
mpirun -np 4 python -m pytest tests/ --with-mpi

Complete Examples

Example 1: Distributed Mean

"""Compute mean of distributed data."""
import dtl
import numpy as np

with dtl.Context() as ctx:
    # Create vector with 1M elements
    vec = dtl.DistributedVector(ctx, size=1_000_000, dtype=np.float64)

    # Fill with random data
    local = vec.local_view()
    np.random.seed(42 + ctx.rank)
    local[:] = np.random.randn(len(local))

    # Local sum and count
    local_sum = np.sum(local)
    local_count = len(local)

    # Global reduction
    global_sum = dtl.allreduce(ctx, local_sum, op=dtl.SUM)
    global_count = dtl.allreduce(ctx, float(local_count), op=dtl.SUM)

    global_mean = global_sum / global_count

    if ctx.is_root:
        print(f"Global mean: {global_mean:.6f}")

Example 2: Distributed Dot Product

"""Compute dot product of two distributed vectors."""
import dtl
import numpy as np

with dtl.Context() as ctx:
    N = 100_000

    # Create two vectors
    a = dtl.DistributedVector(ctx, size=N, dtype=np.float64)
    b = dtl.DistributedVector(ctx, size=N, dtype=np.float64)

    # Initialize
    local_a = a.local_view()
    local_b = b.local_view()

    local_a[:] = np.arange(len(local_a)) + a.local_offset
    local_b[:] = 2.0

    # Local dot product
    local_dot = np.dot(local_a, local_b)

    # Global sum
    global_dot = dtl.allreduce(ctx, local_dot, op=dtl.SUM)

    if ctx.is_root:
        # Expected: 2 * sum(0..N-1) = 2 * N*(N-1)/2 = N*(N-1)
        expected = N * (N - 1)
        print(f"Dot product: {global_dot:.0f}")
        print(f"Expected:    {expected}")

Example 3: Stencil Computation with Halo Exchange

"""1D stencil with neighbor communication."""
import dtl
import numpy as np

with dtl.Context() as ctx:
    N = 1000
    vec = dtl.DistributedVector(ctx, size=N, dtype=np.float64)

    local = vec.local_view()
    local[:] = np.sin(np.linspace(0, 2*np.pi, vec.global_size))[vec.local_offset:vec.local_offset+len(local)]

    # Simple 3-point stencil: new[i] = 0.25*old[i-1] + 0.5*old[i] + 0.25*old[i+1]
    # (Boundary handling omitted for simplicity)

    result = np.zeros_like(local)

    for i in range(1, len(local) - 1):
        result[i] = 0.25 * local[i-1] + 0.5 * local[i] + 0.25 * local[i+1]

    # Copy boundaries
    result[0] = local[0]
    result[-1] = local[-1]

    local[:] = result

    ctx.barrier()

    if ctx.is_root:
        print("Stencil computation complete")

References