Architecture: Distributed

netcl's distributed subpackage is not a cluster framework. It solves the much narrower problem of running a single training job across multiple devices / multiple processes on a single host, with the smallest possible overhead and zero external dependencies (no MPI, no NCCL, no RDMA). Every replica is a normal Python process; collectives happen through multiprocessing shared memory or, for larger payloads, a Unix socket.

This page is the design rationale and the host-topology picture. For the step-by-step recipe, see the Tutorial: Data-Parallel Training.

Host topology

The intended deployment is one or more OpenCL devices visible to a single host process. Each device is owned by one worker process; the trainer runs in the main process and talks to the workers through multiprocessing queues plus a shared-memory ring for small tensors.

Figure caption: one host, N worker processes, each holding one DeviceManager handle and one OpenCL context/queue. The trainer (top) dispatches batches, gathers gradients, and broadcasts parameters. The transport is shared memory for small tensors and Unix sockets for large ones; there is no InfiniBand and no network hop.

This is sometimes called a "1-node N-device" model. If you need multi-node training, use a different framework; this module is deliberately small so that it is easy to wrap inside one.
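The transport split described in the caption above (shared memory for small tensors, a Unix socket for large ones) can be sketched with the standard library's multiprocessing.shared_memory. This is a minimal illustration, not netcl's code: the 1 MiB threshold and the helper names pick_transport, put_in_shm and get_from_shm are hypothetical, and both ends run in one process so the example is self-contained. In a real setup only the small header tuple would travel over a multiprocessing queue, and the receiving worker would attach to the block by name.

```python
import numpy as np
from multiprocessing import shared_memory

# Hypothetical cut-off between the two transports; netcl's actual value
# is not given on this page.
SMALL_TENSOR_BYTES = 1 << 20  # 1 MiB


def pick_transport(nbytes, threshold=SMALL_TENSOR_BYTES):
    """Shared memory for small payloads, a Unix socket for large ones."""
    return "shm" if nbytes <= threshold else "socket"


def put_in_shm(arr):
    """Copy arr into a new shared-memory block.

    Returns the block (the sender must close and unlink it later) and a
    small header (name, shape, dtype) that is cheap to send over a queue.
    """
    shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
    view = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
    view[:] = arr
    del view  # drop our reference to shm.buf before handing the block out
    return shm, (shm.name, arr.shape, arr.dtype.str)


def get_from_shm(header):
    """Attach to the block named in header and copy the tensor out."""
    name, shape, dtype = header
    shm = shared_memory.SharedMemory(name=name)
    view = np.ndarray(shape, dtype=np.dtype(dtype), buffer=shm.buf)
    out = view.copy()
    del view  # release the buffer export so close() succeeds
    shm.close()
    return out


payload = np.arange(6, dtype=np.float32).reshape(2, 3)
assert pick_transport(payload.nbytes) == "shm"
block, header = put_in_shm(payload)
received = get_from_shm(header)
block.close()
block.unlink()  # the creator removes the block once every reader is done
```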

The subprocess replica pattern

The recommended way to spawn replicas is one Python process per device, started with subprocess.Popen. The child process opens its own OpenCL context (via DeviceManager.default("gpu") or DeviceManager.default("cpu")), creates its own cl.CommandQueue, and waits for work over a pipe.

A typical launcher looks like this (paraphrased from distributed/trainer.py and the Tutorial: Data-Parallel Training):

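import os, subprocess, sys

workers = []
for dev in ["gpu:0", "gpu:1", "cpu"]:
    # One fresh interpreter per device; NETCL_DEVICE tells the replica
    # which device to bind. stdin is the pipe the trainer sends work over.
    p = subprocess.Popen(
        [sys.executable, "-m", "netcl.distributed.replica"],
        env={**os.environ, "NETCL_DEVICE": dev},
        stdin=subprocess.PIPE,
    )
    workers.append(p)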

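Each worker uses the DeviceManager to discover and bind to its assigned device. Because subprocess.Popen starts a fresh interpreter, the child builds its OpenCL objects from scratch rather than inheriting them. The DeviceManager is also fork-safe in the sense described in the Tensor Backend page (the OpenCL objects are snapshotted before fork() and re-initialised in the child), which matters if you create replicas with a plain fork() instead.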
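On the worker side, the replica only needs to read its NETCL_DEVICE assignment and then loop over work items arriving on its pipe. The sketch below shows that shape using only the standard library. The "kind:index" format is inferred from the launcher above, while the JSON-lines command protocol, the "stop" op and the helper names are hypothetical. The OpenCL setup via DeviceManager is left as a comment because it needs a real OpenCL runtime.

```python
import json
import os
import sys
from typing import Iterable, Iterator, Tuple


def parse_device_spec(spec: str) -> Tuple[str, int]:
    """Split a NETCL_DEVICE value like "gpu:1" or "cpu" into (kind, index).

    A bare kind means index 0. The format is inferred from the launcher
    example, not taken from netcl's source.
    """
    kind, _, index = spec.partition(":")
    kind = kind.strip().lower()
    if kind not in ("gpu", "cpu"):
        raise ValueError(f"unknown device kind in {spec!r}")
    return kind, (int(index) if index else 0)


def serve(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one decoded command per non-empty line until a "stop" op."""
    for line in lines:
        if not line.strip():
            continue
        cmd = json.loads(line)
        if cmd.get("op") == "stop":
            return
        yield cmd


def main() -> None:
    kind, index = parse_device_spec(os.environ.get("NETCL_DEVICE", "cpu"))
    # A real replica would bind here, e.g. via DeviceManager.default(kind),
    # and create its own cl.CommandQueue before accepting work.
    for cmd in serve(sys.stdin):
        # Placeholder for running one step on this replica's device.
        print(json.dumps({"device": f"{kind}:{index}", "done": cmd.get("op")}), flush=True)
```

Calling main() from the module's entry point would turn this into a replica that acknowledges one command per line until it receives a stop.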

prepare_replicas and data_parallel_step

distributed/trainer.py provides two convenience functions that glue the rest of the stack together.

from netcl.distributed import (
    prepare_replicas, data_parallel_step, sync_grads, broadcast_params,
)

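# model, opt, loader and the autograd module ag are defined elsewhere in the
# training script (see the Tutorial: Data-Parallel Training).
replicas = prepare_replicas(model, opt, n_replicas=4, devices=[...])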

for x, y in loader:
    loss = data_parallel_step(
        replicas, (x, y),
        loss_fn=lambda logits, y: ag.cross_entropy(logits, y),
        tape_factory=ag.Tape,
    )
  • prepare_replicas takes the source model parameters and copies them onto each device's queue. The result is List[List[Tensor]]: one list of parameter tensors per replica.
  • data_parallel_step does the work of one training step across replicas:
      1. Shard the batch along axis 0 (shard_batch in distributed/data_parallel.py).
      2. For each replica, run the user-supplied forward_fn(queue, xb, yb, params) to produce a (loss_node, tape) pair, then call tape.backward(loss_node).
      3. Call sync_grads to all-reduce (mean) the per-replica gradients.
      4. Run each replica's Optimizer step() and zero_grad().
      5. Optionally run a post_step_hook (e.g. to log per-replica loss).

This is the function the trainer loop calls once per step. The Tutorial: Data-Parallel Training demonstrates wiring it into a real loader.
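The steps of data_parallel_step can be mimicked in plain NumPy to show why averaging the per-replica gradients keeps the replicas in lockstep. This is a toy stand-in, not netcl's implementation. A linear model with mean-squared-error loss replaces forward_fn and tape.backward, np.ndarray replaces Tensor, and shard_batch, grad_mse and data_parallel_step_sketch are hypothetical names. With equal-sized shards, the averaged gradient equals the full-batch gradient, so the update matches single-device training.

```python
import numpy as np


def shard_batch(x, y, n):
    """Step 1: split the batch along axis 0, one (xb, yb) pair per replica."""
    return list(zip(np.array_split(x, n, axis=0), np.array_split(y, n, axis=0)))


def grad_mse(w, xb, yb):
    """Step 2 stand-in: gradient of mean((xb @ w - yb) ** 2) with respect to w."""
    residual = xb @ w - yb
    return 2.0 * xb.T @ residual / len(xb)


def data_parallel_step_sketch(params, x, y, lr=0.1):
    """One step over replicas; params holds one weight vector per replica."""
    shards = shard_batch(x, y, len(params))
    grads = [grad_mse(w, xb, yb) for w, (xb, yb) in zip(params, shards)]
    mean_grad = np.mean(np.stack(grads), axis=0)   # step 3: all-reduce (mean)
    return [w - lr * mean_grad for w in params]    # step 4: per-replica SGD step


rng = np.random.default_rng(0)
x = rng.normal(size=(8, 3))
y = rng.normal(size=8)
w0 = rng.normal(size=3)
replicas = [w0.copy() for _ in range(4)]           # identical starting copies
replicas = data_parallel_step_sketch(replicas, x, y)
```

If the batch size is not a multiple of the replica count, np.array_split yields unequal shards, and the plain mean over replicas gives the samples in the smaller shards more weight, so the update no longer matches the full-batch one exactly.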

Collective primitives

The collective primitives live in distributed/collectives.py and are implemented as host-side reductions over a list of Tensors. Apart from the optional all_reduce_p2p variant described below, there is no peer-to-peer GPU path, and the implementation is small enough to read in a few minutes.

| Collective | Implementation |
| --- | --- |
| all_reduce(tensors, op="sum" or "mean") | Call to_host() on every tensor, reduce with np.sum / np.mean along axis 0, and scatter the result back with cl.enqueue_copy. With overlap=True, the scatter is done from one thread per tensor. |
| broadcast(tensors, root=0) | Pull the root tensor to the host, then copy the host array into every other tensor. |
| scatter(tensor, chunks) | np.array_split(tensor.to_host(), chunks, axis=0). |
| gather(tensors) | np.concatenate([t.to_host() for t in tensors], axis=0). |
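The table translates almost line for line into NumPy. In this sketch, which is an illustration rather than netcl's source, plain np.ndarray objects stand in for Tensors: np.asarray plays the role of to_host(), and in-place assignment (t[...] = ...) plays the role of the cl.enqueue_copy that writes the result back to each device. The overlap=True threading is omitted, and the _np suffix on each name marks these as hypothetical stand-ins.

```python
import numpy as np


def all_reduce_np(tensors, op="sum"):
    """Stack host copies, reduce along axis 0, write the result into each tensor."""
    stacked = np.stack([np.asarray(t) for t in tensors])
    if op == "sum":
        reduced = stacked.sum(axis=0)
    elif op == "mean":
        reduced = stacked.mean(axis=0)
    else:
        raise ValueError(f"unsupported op {op!r}")
    for t in tensors:
        t[...] = reduced  # stands in for cl.enqueue_copy back to the device
    return tensors


def broadcast_np(tensors, root=0):
    """Copy the root tensor's values into every other tensor."""
    src = np.asarray(tensors[root]).copy()
    for i, t in enumerate(tensors):
        if i != root:
            t[...] = src
    return tensors


def scatter_np(tensor, chunks):
    return np.array_split(np.asarray(tensor), chunks, axis=0)


def gather_np(tensors):
    return np.concatenate([np.asarray(t) for t in tensors], axis=0)


grads = [np.full(3, float(k)) for k in range(4)]  # one gradient per replica
all_reduce_np(grads, op="mean")  # every replica now holds [1.5, 1.5, 1.5]
```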

A small P2P variant, all_reduce_p2p, is provided for the case where all tensors share the same cl.Context. It JIT-compiles an ar_sum kernel and reduces pairwise. For typical 1-host 4-device training it is no faster than the host path because the to_host() round-trip already amortises the launch overhead.
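One plausible reading of "reduces pairwise" is a tree reduction: in each round, buffer i absorbs buffer i + stride, and the stride doubles until buffer 0 holds the total. The NumPy sketch below illustrates that pattern and counts one simulated kernel launch per pairwise add. It is not the ar_sum kernel, and pairwise_sum is a hypothetical name.

```python
import numpy as np


def pairwise_sum(buffers):
    """Tree-reduce a list of equal-shaped arrays.

    Returns (total, launches), where launches counts the pairwise adds a
    device kernel would perform. Inputs are copied, not modified.
    """
    bufs = [np.array(b, dtype=np.float64) for b in buffers]
    n = len(bufs)
    launches = 0
    stride = 1
    while stride < n:
        # Pair i with i + stride; each pair would be one kernel launch.
        for i in range(0, n - stride, 2 * stride):
            bufs[i] += bufs[i + stride]
            launches += 1
        stride *= 2
    return bufs[0], launches


total, launches = pairwise_sum([np.full(3, float(k)) for k in range(4)])
```

For four replicas this takes two rounds and three adds; in general n replicas need n - 1 adds spread over ceil(log2(n)) rounds.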

Transport selection

all_reduce and broadcast always go through the host (a NumPy intermediate) because each device has its own OpenCL context, and peer-to-peer access between contexts is not portable across OpenCL implementations. Host staging means there is no zero-copy shortcut between devices: each tensor costs one D2H copy (to_host) and one H2D copy (cl.enqueue_copy) per step. In practice this is fast enough for parameter sync on ResNet/MNIST-class workloads; for large embedding tables the transfer cost dominates and you should switch to a cluster framework.
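The one-D2H-plus-one-H2D rule gives a quick back-of-the-envelope estimate of host-staging traffic. The helper names, the 25-million-parameter model and the 10 GB/s host-device bandwidth below are illustrative assumptions, not measurements. The estimate also treats all copies as serial, which overstates the cost when transfers overlap.

```python
def host_staging_bytes(n_params, n_replicas, bytes_per_elem=4):
    """Bytes moved per all_reduce step with host staging.

    Each replica's gradient goes device-to-host once (to_host) and the
    reduced result comes back host-to-device once, hence the factor 2.
    """
    return 2 * n_params * bytes_per_elem * n_replicas


def staging_seconds(n_bytes, bandwidth_gb_per_s):
    """Transfer time if all copies run one after another at a fixed bandwidth."""
    return n_bytes / (bandwidth_gb_per_s * 1e9)


moved = host_staging_bytes(25_000_000, n_replicas=4)   # float32 gradients
seconds = staging_seconds(moved, bandwidth_gb_per_s=10.0)
```

With these assumed numbers that is 800 MB and about 80 ms of copying per step. The traffic grows linearly with both parameter count and replica count, which is why it dominates for large embedding tables.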

Data-parallel flow

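The end-to-end flow of one step follows the data_parallel_step sequence above: shard the batch, run forward and backward on each replica, all-reduce the gradients with sync_grads, then step each replica's optimizer.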

A more detailed look at the parameter-sync portion:

# After per-replica backward:
sync_grads(param_replicas)              # mean across replicas
for opt in optimizers:
    opt.step()                          # per-replica step
    opt.zero_grad()
broadcast_params(replicas)              # optional: re-broadcast weights

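The broadcast_params step is mostly a safety net. Every replica starts from the same parameters (prepare_replicas copies them from one source model) and applies the same averaged gradient from sync_grads, so deterministic optimizer updates keep the replicas identical. Replicas on different device types can still drift apart through floating-point rounding differences. If the optimizer is stateful (Adam, AdamW, RMSProp) and you re-shard between steps, broadcast_params is what keeps the optimizer state consistent.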
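If you would rather re-broadcast only when replicas have actually drifted, the check is easy to sketch. The helpers below are hypothetical and operate on lists of NumPy arrays rather than netcl Tensors, and the tolerance is an assumption you would tune. The replicas argument uses the same List[List[...]] layout that prepare_replicas returns.

```python
import numpy as np


def max_divergence(replicas, root=0):
    """Largest absolute parameter difference between any replica and root."""
    ref = replicas[root]
    return max(
        (float(np.max(np.abs(p - r)))
         for rep in replicas for p, r in zip(rep, ref)),
        default=0.0,
    )


def rebroadcast_if_drifted(replicas, tol=0.0, root=0):
    """Copy root's parameters into every other replica if they drifted apart.

    Returns True if a copy was made.
    """
    if max_divergence(replicas, root) <= tol:
        return False
    for i, rep in enumerate(replicas):
        if i != root:
            for dst, src in zip(rep, replicas[root]):
                dst[...] = src
    return True


replicas = [[np.ones(3), np.zeros(2)] for _ in range(3)]
replicas[2][0] += 1e-6  # simulate rounding drift on one replica
fixed = rebroadcast_if_drifted(replicas, tol=1e-9)
```

With device-resident tensors, the check itself has to read every parameter to the host, which is comparable host traffic to the broadcast it might save. It is therefore most useful as a periodic debugging assertion rather than a per-step optimisation.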

What this module is not

  • Not NCCL and not a ring all-reduce: there is no topology-aware reduction and no InfiniBand path. Two-GPU training on one host is fast; sixteen GPUs across two hosts is not what this module is for.
  • Not multi-node: every replica is a subprocess of one host. If you need to span hosts, wrap data_parallel_step in an outer driver (e.g. torch.distributed with PyOpenCL worker hooks) and use this module as the per-host reducer.
  • Not gradient-compressed: gradients are averaged uncompressed, in their original dtype. For models with very large embedding tables, switch to a framework that supports FP16/BF16 gradient compression.
  • Not fault-tolerant: if one replica crashes, the host has to restart all of them. There is no elastic re-spawn.
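Because there is no elastic re-spawn, the host process has to notice a crashed replica and restart the whole set itself. A minimal supervisor over the Popen handles from the launcher might look like the following. supervise_once, its polling interval and the restart-all policy are illustrative assumptions, not part of netcl.

```python
import subprocess
import sys
import time


def supervise_once(procs, poll_interval=0.1, timeout=None):
    """Wait until every worker exits cleanly, or until one fails.

    On the first non-zero exit, terminate the remaining workers and return
    False so the caller can respawn the whole set. Returns True if all
    workers exited with status 0.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        codes = [p.poll() for p in procs]
        if any(c not in (None, 0) for c in codes):
            for p in procs:
                if p.poll() is None:
                    p.terminate()
            for p in procs:
                p.wait()  # reap every child so none is left as a zombie
            return False
        if all(c == 0 for c in codes):
            return True
        if deadline is not None and time.monotonic() > deadline:
            raise TimeoutError("workers still running")
        time.sleep(poll_interval)


procs = [
    subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"]),
    subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(3)"]),
]
all_ok = supervise_once(procs, timeout=20)  # False: the second worker failed
```

A caller would wrap this in a loop that re-runs the launcher whenever supervise_once returns False.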

For cluster training, the recommended path is one of:

  • torch.distributed with PyOpenCL worker hooks.
  • An MPI or NCCL wrapper that delegates the per-host reduction to netcl's all_reduce and uses the cluster framework only for the cross-host step.
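The second option is a two-level reduction: netcl averages within each host, and the cluster framework combines the per-host results. The NumPy sketch below simulates both levels in one process to show one detail that is easy to get wrong. If hosts run different numbers of replicas, the cross-host step must weight each host's mean by its replica count to reproduce the flat global mean. The function names are hypothetical, and the cross-host step is plain NumPy standing in for the cluster framework's collective.

```python
import numpy as np


def per_host_mean(grads):
    """Level 1: the mean that all_reduce(op="mean") would give on one host."""
    return np.mean(np.stack(grads), axis=0)


def hierarchical_mean(hosts):
    """Level 2: replica-count-weighted mean of the per-host means.

    hosts holds one list of per-replica gradients per host.
    """
    local = np.stack([per_host_mean(g) for g in hosts])
    counts = [len(g) for g in hosts]
    return np.average(local, axis=0, weights=counts)


hosts = [
    [np.array([1.0, 1.0]), np.array([3.0, 3.0])],  # host A: 2 replicas
    [np.array([8.0, 8.0])],                        # host B: 1 replica
]
global_mean = hierarchical_mean(hosts)  # [4.0, 4.0], same as the flat mean
naive = np.mean(np.stack([per_host_mean(g) for g in hosts]), axis=0)  # [5.0, 5.0]
```

The unweighted average of host means over-weights the smaller host, which is why the weighted form is the safer default for the cross-host step.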

See also