MPI Backend Guide
Status: Production-Ready | Since: DTL 0.1.0-alpha.1 | Last Updated: 2026-02-07
Overview
The MPI (Message Passing Interface) backend provides distributed communication for DTL. It enables containers and algorithms to span multiple processes (ranks), communicating via point-to-point and collective operations. MPI is the primary communication backend and the foundation for multi-node DTL programs.
Key capabilities:
- Point-to-point communication: `send`, `recv`, `isend`, `irecv`, `sendrecv`
- Collective operations: `broadcast`, `gather`, `scatter`, `allreduce`, `alltoall`, `barrier`
- Reduction operations: `reduce`, `allreduce`, `scan`, `exscan` with standard ops
- Communicator management: splitting, duplicating, and sub-group creation
- Non-blocking operations: request-based async communication with `wait`/`test`
- Thread safety: configurable via `thread_support_level`
Requirements
MPI Implementation: OpenMPI 4.0+, MPICH 3.3+, Intel MPI, or Cray MPICH
C++20 compiler with MPI support
CMake 3.18+
Installing MPI
# Ubuntu/Debian
sudo apt install libopenmpi-dev openmpi-bin
# or MPICH
sudo apt install libmpich-dev mpich
# macOS (via Homebrew)
brew install open-mpi
# Verify installation
mpirun --version
CMake Configuration
Enable the MPI backend:
cmake -DDTL_ENABLE_MPI=ON ..
CMake will auto-detect the MPI installation via find_package(MPI).
Common CMake Flags
| Flag | Default | Description |
|---|---|---|
| `DTL_ENABLE_MPI` | `OFF` | Enable MPI backend |
| `MPI_CXX_COMPILER` | Auto | Path to MPI C++ compiler wrapper |
Initialization
Using dtl::environment
The recommended way to initialize MPI is via DTL’s environment RAII guard:
#include <dtl/dtl.hpp>
#include <iostream>

int main(int argc, char** argv) {
    // Initializes MPI (and other backends) on first construction
    dtl::environment env(argc, argv);

    // Create a context spanning all ranks
    auto ctx = env.make_world_context();
    std::cout << "Rank " << ctx.rank() << " of " << ctx.size() << "\n";

    // For explicit MPI operations, extract the communicator:
    auto& comm = ctx.get<dtl::mpi_domain>().communicator();

    // MPI is finalized when env goes out of scope
    return 0;
}
Environment Options
Configure MPI thread support and ownership:
#include <dtl/core/environment_options.hpp>
auto opts = dtl::environment_options::defaults();
// Request multi-threaded MPI support
opts.mpi.thread_level = dtl::thread_support_level::multiple;
// Or adopt externally initialized MPI
opts.mpi.ownership = dtl::backend_ownership::adopt_external;
dtl::environment env(argc, argv, opts);
Backend Ownership Modes
| Mode | Description |
|---|---|
| | DTL calls `MPI_Init` and `MPI_Finalize` itself |
| `adopt_external` | User has already called `MPI_Init`; DTL skips initialization |
| | Initialize if MPI is available, skip silently if not |
| | Do not use MPI regardless of availability |
Communicator Management
World Communicator
Get the default world communicator (wraps MPI_COMM_WORLD):
#include <dtl/communication/default_communicator.hpp>
auto comm = dtl::world_comm();
int rank = comm.rank();
int size = comm.size();
MPI Communicator Wrapper
The mpi_communicator class wraps MPI_Comm with RAII semantics:
#include <backends/mpi/mpi_communicator.hpp>
// Wrap MPI_COMM_WORLD (non-owning)
dtl::mpi::mpi_communicator world(MPI_COMM_WORLD, false);
// Query properties
rank_t my_rank = world.rank();
rank_t world_size = world.size();
bool valid = world.valid();
Splitting Communicators
Create sub-groups for collective operations on subsets of ranks:
// Split into even/odd groups
int color = comm.rank() % 2;
auto sub_comm = comm.split(color);
// Now sub_comm contains only ranks with the same color
// Collectives on sub_comm only involve those ranks
Duplicating Communicators
Create an independent copy for isolated communication:
auto dup_comm = comm.dup();
// dup_comm is independent — collectives on it won't interfere with comm
Point-to-Point Communication
Blocking Send/Receive
#include <dtl/communication/point_to_point.hpp>
auto comm = dtl::world_comm();
if (comm.rank() == 0) {
    // Send data to rank 1
    std::vector<double> data = {1.0, 2.0, 3.0};
    dtl::send(comm, data.data(), data.size(), /*dest=*/1, /*tag=*/0);
} else if (comm.rank() == 1) {
    // Receive data from rank 0
    std::vector<double> data(3);
    dtl::recv(comm, data.data(), data.size(), /*source=*/0, /*tag=*/0);
}
Non-Blocking Communication
auto comm = dtl::world_comm();
std::vector<double> send_buf(100, 42.0);
std::vector<double> recv_buf(100);
// Initiate non-blocking operations
auto send_req = dtl::isend(comm, send_buf.data(), 100, /*dest=*/1, /*tag=*/0);
auto recv_req = dtl::irecv(comm, recv_buf.data(), 100, /*source=*/1, /*tag=*/0);
// ... do other work while communication proceeds ...
// Wait for completion
dtl::wait(comm, send_req);
dtl::wait(comm, recv_req);
Send-Receive
Combined send and receive for exchanging data between pairs:
std::vector<double> send_buf(100);
std::vector<double> recv_buf(100);
int partner = (comm.rank() + 1) % comm.size();
dtl::sendrecv(comm,
send_buf.data(), 100, partner, /*send_tag=*/0,
recv_buf.data(), 100, partner, /*recv_tag=*/0);
Collective Operations
Barrier
Synchronize all ranks:
dtl::barrier(comm); // Blocks until all ranks reach this point
Broadcast
Send data from one rank to all:
std::vector<double> data(100);
if (comm.rank() == 0) {
    // Root initializes data
    std::fill(data.begin(), data.end(), 3.14);
}
// All ranks receive root's data
dtl::broadcast(comm, data.data(), data.size(), /*root=*/0);
Gather and Scatter
// Gather: collect from all ranks to root
std::vector<double> local_data(10);
std::vector<double> all_data; // Only meaningful on root
if (comm.rank() == 0) {
    all_data.resize(10 * comm.size());
}
dtl::gather(comm, local_data.data(), 10,
            all_data.data(), 10, /*root=*/0);

// Scatter: distribute from root to all ranks
dtl::scatter(comm, all_data.data(), 10,
             local_data.data(), 10, /*root=*/0);
Allreduce
Reduce and distribute result to all ranks:
double local_sum = compute_local_sum();
double global_sum;
dtl::allreduce(comm, &local_sum, &global_sum, dtl::reduce_sum<double>{});
// global_sum is the same on all ranks
All-to-All
Exchange data between all pairs of ranks:
std::vector<double> send_buf(comm.size()); // One element per rank
std::vector<double> recv_buf(comm.size());
dtl::alltoall(comm, send_buf.data(), 1, recv_buf.data(), 1);
Collective Best Practices
- All ranks must participate in collective operations. If one rank skips a collective, the program will deadlock.
- Match arguments across ranks: all ranks must pass the same `count`, `dtype`, `root`, and `op` to collective calls.
- Use in-place variants when possible to avoid extra buffer allocations:
  dtl::allreduce_inplace(comm, &value, dtl::reduce_sum<double>{});
- Prefer `allreduce` over `reduce` + `broadcast` when all ranks need the result.
- Batch small messages into fewer, larger messages to reduce latency overhead.
Deadlock Avoidance
Common Deadlock Patterns
Mismatched collectives:
// DEADLOCK: rank 0 calls barrier, rank 1 calls reduce
if (comm.rank() == 0) {
    dtl::barrier(comm);
} else {
    dtl::reduce(comm, &data, &result, op, 0);
}
Circular send/recv:
// DEADLOCK: both ranks block on send before posting recv
dtl::send(comm, data, count, partner, tag); // Blocks!
dtl::recv(comm, data, count, partner, tag);
Safe Patterns
Use sendrecv for exchanges:
// SAFE: combined send/receive
dtl::sendrecv(comm,
send_buf, count, partner, tag,
recv_buf, count, partner, tag);
Use non-blocking operations:
// SAFE: non-blocking send before blocking recv
auto req = dtl::isend(comm, data, count, partner, tag);
dtl::recv(comm, recv_data, count, partner, tag);
dtl::wait(comm, req);
Ensure all ranks follow the same control flow for collectives:
// SAFE: all ranks execute the same collective
dtl::barrier(comm);
dtl::allreduce(comm, &local, &global, op);
DTL Algorithm Integration
DTL algorithms use the MPI communicator for distributed phases:
auto comm = dtl::world_comm();
dtl::distributed_vector<int> vec(100000, comm.size(), comm.rank());
// Distributed reduce (local reduce + MPI allreduce)
auto sum = dtl::reduce(dtl::par{}, vec, 0, std::plus<>{}, comm);
// Distributed sort (local sort + MPI sample exchange)
dtl::sort(dtl::par{}, vec, std::less<>{}, comm);
// Distributed scan (local scan + MPI prefix scan)
dtl::distributed_vector<int> output(100000, comm.size(), comm.rank());
dtl::inclusive_scan(dtl::par{}, vec, output, 0, std::plus<>{});
Performance Tips
Reduce Communication Volume
- Use `allreduce` instead of `reduce` + `broadcast`
- Batch multiple small messages into a single larger message
- Use variable-count variants (`gatherv`, `scatterv`) to avoid padding
Overlap Communication and Computation
// Post non-blocking receive
auto req = dtl::irecv(comm, recv_buf, count, source, tag);
// Do local computation while data arrives
compute_local_work();
// Wait for communication to finish
dtl::wait(comm, req);
// Process received data
process_received(recv_buf);
Choose the Right MPI Implementation
| Implementation | Strengths |
|---|---|
| OpenMPI | General-purpose, good defaults, wide platform support |
| MPICH | Excellent standards compliance, good for development |
| Intel MPI | Optimized for Intel hardware, fabric-aware |
| Cray MPICH | Optimized for Cray/HPE interconnects |
Thread Safety
For multi-threaded MPI usage, request thread_support_level::multiple:
auto opts = dtl::environment_options::defaults();
opts.mpi.thread_level = dtl::thread_support_level::multiple;
dtl::environment env(argc, argv, opts);
Launching MPI Programs
# OpenMPI
mpirun -np 4 ./my_dtl_app
# MPICH
mpiexec -n 4 ./my_dtl_app
# SLURM
srun -n 4 ./my_dtl_app
See Also
CPU Backend Guide — Local execution policies
NCCL Backend — GPU-to-GPU collectives
OpenSHMEM Backend — PGAS one-sided communication
Backend Comparison — Feature comparison across backends