Architecture: Distributed
netcl's distributed subpackage is not a cluster framework. It
solves the much narrower problem of running a single training job
across multiple devices / multiple processes on a single host,
with the smallest possible overhead and zero external dependencies
(no MPI, no NCCL, no RDMA). Every replica is a normal Python
process; collectives happen through multiprocessing shared memory
or, for larger payloads, a Unix socket.
This page is the design rationale and the host-topology picture. For the step-by-step recipe, see the Tutorial: Data-Parallel Training.
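To make the shared-memory transport concrete, here is a minimal sketch of passing one array through a named shared-memory block using only the standard library and NumPy. The helper names `publish` and `receive` are hypothetical and are not part of netcl; the actual ring layout used by the subpackage is not shown on this page.

```python
import numpy as np
from multiprocessing import shared_memory

def publish(array):
    """Copy `array` into a new shared-memory block; return the block and its metadata."""
    shm = shared_memory.SharedMemory(create=True, size=array.nbytes)
    view = np.ndarray(array.shape, dtype=array.dtype, buffer=shm.buf)
    view[...] = array
    # Only the name, shape and dtype need to travel over a queue or pipe.
    return shm, (shm.name, array.shape, array.dtype.str)

def receive(meta):
    """Attach to an existing block by name and copy the payload out."""
    name, shape, dtype = meta
    shm = shared_memory.SharedMemory(name=name)
    try:
        return np.ndarray(shape, dtype=dtype, buffer=shm.buf).copy()
    finally:
        shm.close()

grads = np.arange(6, dtype=np.float32).reshape(2, 3)
shm, meta = publish(grads)
try:
    # In a real job this call would run in another process.
    received = receive(meta)
finally:
    shm.close()
    shm.unlink()
```

The payload itself never passes through a pipe; only the small metadata tuple does, which is why this route suits small, frequent tensors.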
Host topology
The intended deployment is one or more OpenCL devices visible to a
single host. Each device is owned by one worker process;
the trainer runs in the main process and talks to the workers
through multiprocessing queues plus a shared-memory ring for
small tensors.
Figure: one host, N worker processes, each holding one DeviceManager handle and one OpenCL context/queue. The trainer (top) dispatches batches, gathers gradients, and broadcasts parameters. The transport is shared memory for small tensors and Unix sockets for large ones. There is no InfiniBand and no network hop.
This is sometimes called a "1-node N-device" model. If you want multi-node, you need a different framework; the distributed subpackage is deliberately small and easy to wrap.
The subprocess replica pattern
The recommended way to spawn replicas is one Python process per
device, started with subprocess.Popen. The child process opens
its own OpenCL context (via DeviceManager.default("gpu") or
"cpu"), creates its own cl.CommandQueue, and waits for
work over a pipe.
A typical launcher looks like this (paraphrased from
distributed/trainer.py and the
Tutorial: Data-Parallel Training):
import os
import subprocess
import sys

workers = []
for dev in ["gpu:0", "gpu:1", "cpu"]:
    # One replica process per device; the device is passed via the environment.
    p = subprocess.Popen(
        [sys.executable, "-m", "netcl.distributed.replica"],
        env={**os.environ, "NETCL_DEVICE": dev},
    )
    workers.append(p)
Each worker uses the DeviceManager to discover and bind
to its assigned device. The DeviceManager is
fork-safe in the sense described in the
Tensor Backend page: the OpenCL
objects are snapshotted before fork() and re-initialised in the
child.
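The launcher pattern above can be tried without netcl installed. The sketch below replaces `netcl.distributed.replica` with a stand-in child that only echoes its `NETCL_DEVICE` value, then waits for every child and collects its exit code. The `launch` and `join` helpers are hypothetical names introduced for illustration.

```python
import os
import subprocess
import sys

# Stand-in for `python -m netcl.distributed.replica`: the child only echoes
# the device it was assigned, so the sketch runs without netcl installed.
CHILD = "import os; print(os.environ['NETCL_DEVICE'])"

def launch(devices):
    """Start one child process per device and return (device, process) pairs."""
    procs = []
    for dev in devices:
        p = subprocess.Popen(
            [sys.executable, "-c", CHILD],
            env={**os.environ, "NETCL_DEVICE": dev},
            stdout=subprocess.PIPE,
            text=True,
        )
        procs.append((dev, p))
    return procs

def join(procs):
    """Wait for every child; return {device: (exit_code, stdout)}."""
    results = {}
    for dev, p in procs:
        out, _ = p.communicate()
        results[dev] = (p.returncode, out.strip())
    return results

results = join(launch(["gpu:0", "gpu:1", "cpu"]))
```

Passing the device through the environment keeps the child's command line identical for every replica, which is convenient when the same module is started N times.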
prepare_replicas and data_parallel_step
distributed/trainer.py provides two convenience functions that
glue the rest of the stack together.
from netcl.distributed import (
    prepare_replicas, data_parallel_step, sync_grads, broadcast_params,
)

replicas = prepare_replicas(model, opt, n_replicas=4, devices=[...])
for x, y in loader:
    loss = data_parallel_step(
        replicas, (x, y),
        loss_fn=lambda logits, y: ag.cross_entropy(logits, y),
        tape_factory=ag.Tape,
    )
prepare_replicas takes the source model parameters and copies them onto each device's queue. The result is List[List[Tensor]]: one list of parameter tensors per replica.

data_parallel_step does the work of one training step across replicas:

1. Shard the batch along axis 0 (shard_batch in distributed/data_parallel.py).
2. For each replica, run the user-supplied forward_fn(queue, xb, yb, params) to produce a (loss_node, tape) pair, then call tape.backward(loss_node).
3. Call sync_grads to all-reduce (mean) the per-replica gradients.
4. Run each replica's Optimizer step() and zero_grad().
5. Optionally run a post_step_hook (e.g. to log per-replica loss).
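The sharding and gradient-averaging steps can be checked numerically with plain NumPy. This is a sketch under stated assumptions, not netcl's code: `shard_batch` here is a hypothetical stand-in built on `np.array_split`, and a linear least-squares model replaces the real forward pass. With equal-sized shards, the mean of the per-shard gradients equals the full-batch gradient.

```python
import numpy as np

def shard_batch(x, y, n):
    """Split a batch along axis 0 into n shards (hypothetical stand-in)."""
    return list(zip(np.array_split(x, n, axis=0), np.array_split(y, n, axis=0)))

def grad_mse(w, x, y):
    """Gradient of mean((x @ w - y) ** 2) with respect to w."""
    return 2.0 * x.T @ (x @ w - y) / len(x)

rng = np.random.default_rng(0)
x = rng.normal(size=(8, 3))
y = rng.normal(size=8)
w = np.zeros(3)

shards = shard_batch(x, y, 4)                        # step 1: shard along axis 0
per_replica = [grad_mse(w, xb, yb) for xb, yb in shards]  # step 2: per-replica backward
synced = np.mean(per_replica, axis=0)                # step 3: all-reduce (mean)
full = grad_mse(w, x, y)                             # single-device reference
```

If the batch size is not divisible by the replica count, the shards differ in size and the plain mean is no longer exactly the full-batch gradient; a size-weighted mean would be needed for exact equivalence.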
This is the function the trainer loop calls once per step. The Tutorial: Data-Parallel Training demonstrates wiring it into a real loader.
Collective primitives
The collective primitives live in distributed/collectives.py and
are implemented as host-side reductions over a list of
Tensors. The default collectives have no peer-to-peer GPU path; the
implementation is small enough that you can read it in a few
minutes.
| Collective | Implementation |
|---|---|
| all_reduce(tensors, op="sum"\|"mean") | to_host() on every tensor, np.sum / np.mean along axis 0, scatter back with cl.enqueue_copy. With overlap=True, the scatter is done from one thread per tensor. |
| broadcast(tensors, root=0) | Pull the root tensor to host, copy the host array into every other tensor. |
| scatter(tensor, chunks) | np.array_split(tensor.to_host(), chunks, axis=0). |
| gather(tensors) | np.concatenate([t.to_host() for t in tensors], axis=0). |
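The four collectives in the table reduce to a few NumPy calls once the data is on the host. The sketch below mirrors them on plain arrays, which stand in for device tensors; the `to_host()` and `cl.enqueue_copy` transfers are replaced by in-place array writes, so it illustrates the reduction logic only and is not the code in distributed/collectives.py.

```python
import numpy as np

def all_reduce(buffers, op="mean"):
    """Reduce across buffers on the host, then write the result back in place."""
    stacked = np.stack(buffers, axis=0)
    if op == "sum":
        reduced = stacked.sum(axis=0)
    elif op == "mean":
        reduced = stacked.mean(axis=0)
    else:
        raise ValueError(f"unsupported op: {op!r}")
    for buf in buffers:
        buf[...] = reduced  # stands in for the copy back to each device
    return reduced

def broadcast(buffers, root=0):
    """Copy the root buffer's contents into every other buffer."""
    src = buffers[root].copy()
    for i, buf in enumerate(buffers):
        if i != root:
            buf[...] = src

def scatter(array, chunks):
    """Split one host array into `chunks` pieces along axis 0."""
    return np.array_split(array, chunks, axis=0)

def gather(buffers):
    """Concatenate per-replica arrays along axis 0."""
    return np.concatenate(buffers, axis=0)

grads = [np.full(3, 1.0), np.full(3, 3.0)]
all_reduce(grads, op="mean")          # both buffers now hold [2., 2., 2.]

params = [np.arange(4.0), np.zeros(4)]
broadcast(params, root=0)             # second buffer now matches the first

pieces = scatter(np.arange(10), 3)    # sizes 4, 3, 3
restored = gather(pieces)
```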
A small P2P variant, all_reduce_p2p, is provided for the case
where all tensors share the same cl.Context. It JIT-compiles an
ar_sum kernel and reduces pairwise. For typical 1-host 4-device
training it is no faster than the host path because the
to_host() round-trip already amortises the launch overhead.
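The page only says that all_reduce_p2p "reduces pairwise", so the exact order is not documented here. One common reading is a reduction tree, where neighbouring buffers are added level by level. The sketch below shows that order on host arrays; it is an assumption about the scheme, and the `ar_sum` kernel itself is not reproduced.

```python
import numpy as np

def pairwise_sum(buffers):
    """Sum a list of arrays by repeatedly adding neighbours (a reduction tree)."""
    level = [b.copy() for b in buffers]
    while len(level) > 1:
        nxt = []
        # Add buffers two at a time: (0, 1), (2, 3), ...
        for i in range(0, len(level) - 1, 2):
            nxt.append(level[i] + level[i + 1])
        # An odd buffer out is carried to the next level unchanged.
        if len(level) % 2:
            nxt.append(level[-1])
        level = nxt
    return level[0]

buffers = [np.full(4, float(i)) for i in range(5)]  # values 0, 1, 2, 3, 4
total = pairwise_sum(buffers)
```

For N buffers this takes about log2(N) levels instead of N - 1 sequential additions, which matters little at four devices and is consistent with the observation that the P2P variant is no faster there.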
Transport selection
all_reduce and broadcast always go through the host (a NumPy
intermediate) because the OpenCL contexts are different on each
device — peer-to-peer access is not portable across OpenCL
implementations. The host staging means there is no zero-copy
shortcut between devices; the cost is one H2D and one D2H per
tensor per step. In practice this is fast enough for
parameter-sync on ResNet/MNIST-class workloads; for large
embedding tables, the cost dominates and you should switch to a
cluster framework.
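A rough way to see where host staging stops being cheap is to count the bytes moved per step. The helper below is a back-of-the-envelope sketch: it assumes each replica's copy of every parameter gradient makes one D2H and one H2D trip per step, and the parameter counts are illustrative (about 11.7 million for a ResNet-18-sized model, and a hypothetical one-billion-entry embedding table).

```python
def staged_bytes_per_step(n_params, n_replicas, itemsize=4):
    """Bytes crossing the host boundary in one step.

    Assumes one device-to-host and one host-to-device copy per replica,
    with `itemsize` bytes per element (4 for float32).
    """
    return 2 * n_params * itemsize * n_replicas

small = staged_bytes_per_step(11_700_000, n_replicas=4)     # 374_400_000 bytes
large = staged_bytes_per_step(1_000_000_000, n_replicas=4)  # 32_000_000_000 bytes
```

Roughly 0.37 GB per step is tolerable over a typical host bus, while 32 GB per step is not, which is the regime where the text recommends a cluster framework.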
Data-parallel flow
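The end-to-end flow of one step is: shard the batch, run forward and backward on each replica, all-reduce (mean) the gradients, then step each replica's optimizer.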
A more detailed look at the parameter-sync portion:
# After per-replica backward:
sync_grads(param_replicas)      # mean across replicas
for opt in optimizers:
    opt.step()                  # per-replica step
    opt.zero_grad()
broadcast_params(replicas)      # optional: re-broadcast weights
The broadcast_params step is mostly a safety net: replicas that
start from identical parameters and apply the same averaged
gradients from sync_grads stay identical after every step. If the optimizer is
stateful (Adam, AdamW, RMSProp) and you re-shard between steps,
broadcast_params is what keeps the optimizer state consistent.
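The "safety net" argument can be illustrated with two simulated replicas. In this sketch the parameters are plain NumPy arrays, the optimizer is a bare SGD update, and `broadcast_params` is a hypothetical stand-in that copies replica 0 into the others; none of it is netcl's implementation.

```python
import numpy as np

def broadcast_params(params, root=0):
    """Overwrite every replica's parameters with the root replica's values."""
    for i, p in enumerate(params):
        if i != root:
            p[...] = params[root]

# Two replicas that start from identical parameters.
params = [np.ones(3), np.ones(3)]
grads = [np.array([1.0, 2.0, 3.0]), np.array([3.0, 2.0, 1.0])]

mean_grad = np.mean(grads, axis=0)      # what sync_grads would leave on each replica
for p in params:
    p -= 0.1 * mean_grad                # identical SGD update on every replica
in_sync_after_step = np.array_equal(params[0], params[1])

# Simulate drift on one replica (for example, a non-deterministic kernel).
params[1] += 1e-3
drifted = not np.array_equal(params[0], params[1])

broadcast_params(params)                # the safety net restores agreement
in_sync_after_broadcast = np.array_equal(params[0], params[1])
```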
What this module is not
- Not NCCL / not a ring all-reduce: there is no topology-aware reduction and no InfiniBand path. Two GPUs on one host is fast; sixteen GPUs across two hosts is not what this module is for.
- Not multi-node: every replica is a subprocess of one host. If you need to span hosts, wrap data_parallel_step in an outer driver (e.g. torch.distributed with PyOpenCL worker hooks) and use this module as the per-host reducer.
- Not gradient-compressed: gradients are averaged at their stored precision, with no compression. For models with very large embedding tables, switch to a framework that supports FP16/BF16 gradient compression.
- Not fault-tolerant: if one replica crashes, the host has to restart all of them. There is no elastic re-spawn.
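Because there is no elastic re-spawn, an outer driver usually has to notice a crashed replica and tear down the rest before restarting the job. The sketch below shows one way to do that with `subprocess` polling. The function name `run_until_failure` and the stand-in child commands are hypothetical, and netcl's own trainer may handle this differently.

```python
import subprocess
import sys
import time

def run_until_failure(commands, poll_interval=0.05):
    """Run all commands; return True if every one exits 0, False on the first failure.

    On failure the remaining children are terminated, mirroring the
    "restart all of them" policy.
    """
    procs = [subprocess.Popen(cmd) for cmd in commands]
    try:
        while True:
            codes = [p.poll() for p in procs]
            if any(c not in (None, 0) for c in codes):
                return False
            if all(c == 0 for c in codes):
                return True
            time.sleep(poll_interval)
    finally:
        # Stop anything still running, then reap every child.
        for p in procs:
            if p.poll() is None:
                p.terminate()
        for p in procs:
            p.wait()

healthy = [sys.executable, "-c", "pass"]
long_running = [sys.executable, "-c", "import time; time.sleep(30)"]
crashing = [sys.executable, "-c", "raise SystemExit(3)"]

all_ok = run_until_failure([healthy, healthy])
one_crashed = run_until_failure([long_running, crashing])
```

A caller can loop on the return value to relaunch the whole set of replicas from the last checkpoint.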
For cluster training, the recommended path is one of:
- torch.distributed with PyOpenCL worker hooks.
- An MPI/NCCL wrapper that delegates the per-host reduction to netcl's all_reduce and uses the cluster framework for the cross-host step.
See also
- Distributed Architecture: this page.
- distributed API: the full symbol list (all_reduce, broadcast, scatter, gather, DeviceManager, prepare_replicas, data_parallel_step, sync_grads, broadcast_params, shard_batch, replicate_params).
- Tutorial: Data-Parallel Training: end-to-end walk-through of a 1-host 4-device training job.
- Tensor API: the Tensor type that the collectives exchange.
- Optimizer API: the Optimizer that consumes the synced gradients.
- AMP API: the AMP loss-scaler that unscales gradients before the optimizer.
- Data API: the DataLoader that data_parallel_step shards.
- JIT Compiler: caches fused kernels per replica, so each replica's hot loop is fast after the first step.