Tutorial: Data-Parallel Training

netcl's distributed module is a host-based minimum-viable-product for data-parallel training on a single host. Replicas of the model run in subprocesses, each processes a batch slice, and gradients are mean-reduced at the end of the step with all_reduce. There is no MPI, no NCCL, no RDMA — the transport is shared memory plus, where helpful, an OpenCL device-side pairwise sum.

This is the right pattern when the model fits on a single device but the training job would benefit from multiple devices running in parallel. For models that do not fit on a single device, netcl does not currently provide model-parallelism (sharded parameters, pipeline parallelism, or ZeRO-style optimizer sharding) — see the When to use data-parallel callout below.

Prerequisites

What You'll Build

A 4-replica data-parallel training loop for an MLP on MNIST. Each replica gets one quarter of the batch, runs forward + backward on its own device, gradients are mean-reduced across replicas with sync_grads, and the optimizer step runs on every replica. The same recipe scales from 1 GPU to N GPUs on a single host.
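The core idea can be checked without any device code. The following is a minimal NumPy sketch, not netcl code: it assumes a linear model with a mean-squared-error loss (both chosen only for illustration) and shows that averaging the gradients of four equal-sized shards reproduces the full-batch gradient.

```python
import numpy as np

rng = np.random.default_rng(0)
B, D, N_REPLICAS = 64, 5, 4          # batch size, features, replicas

x = rng.normal(size=(B, D))
y = rng.normal(size=(B,))
w = rng.normal(size=(D,))            # shared starting weights


def mse_grad(w, x, y):
    """Gradient of mean((x @ w - y)**2) with respect to w."""
    residual = x @ w - y
    return 2.0 * x.T @ residual / len(y)


# Full-batch gradient: what a single device would compute.
full_grad = mse_grad(w, x, y)

# Data-parallel: each replica sees one quarter of the batch ...
x_shards = np.array_split(x, N_REPLICAS)
y_shards = np.array_split(y, N_REPLICAS)
replica_grads = [mse_grad(w, xs, ys) for xs, ys in zip(x_shards, y_shards)]

# ... and the mean-reduce averages the per-replica gradients.
synced_grad = np.mean(replica_grads, axis=0)

max_diff = float(np.max(np.abs(full_grad - synced_grad)))
print("max |full - synced| =", max_diff)
```

The equality relies on the shards having equal size; with uneven shards a plain mean over replicas weights the smaller shards too heavily.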

When to Use Data-Parallel

Decision callout. Data-parallel is the right answer when:

  • The model fits in the memory of a single device but you want to train on multiple devices in parallel (1–4 GPUs on a developer box, or 1 GPU + several CPU devices).
  • The host topology is PCIe or NVLink — i.e. the same machine. There is no network hop in the data path.
  • You are bottlenecked by per-batch compute, not by model size. Linear scaling (2× with 2 devices, 4× with 4 devices) is the upper bound; expect less once the host-side gradient sync is counted.

Data-parallel is the wrong answer when the model does not fit on a single device (no model-parallel / pipeline / ZeRO in netcl today), when the host topology spans multiple nodes (no RDMA / NCCL / MPI), or when you only have one device. For the single-device case, see MNIST with an MLP.
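To get a feel for when extra devices pay off, here is a back-of-the-envelope cost model. It is an assumption for illustration, not a netcl measurement: compute is taken to split evenly across devices, and the gradient sync is taken to add a fixed cost per step. The function name and the millisecond figures are made up.

```python
def estimated_speedup(n_devices, compute_ms, sync_ms):
    """Speedup over one device under a simple cost model.

    Assumes compute splits evenly across devices and the gradient sync
    adds a fixed cost per step whenever more than one device is used.
    """
    overhead = sync_ms if n_devices > 1 else 0.0
    parallel_ms = compute_ms / n_devices + overhead
    return compute_ms / parallel_ms


for n in (1, 2, 4):
    speedup = estimated_speedup(n, compute_ms=100.0, sync_ms=10.0)
    print(n, "devices:", round(speedup, 2))
```

With these made-up numbers the model gives about 1.67× on 2 devices and 2.86× on 4, and it only reaches N× when the sync cost is zero.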

Step-by-Step

1. Imports

The complete import block for a 4-replica run, with all the symbols we will use:

from netcl.distributed import (
    prepare_replicas, data_parallel_step,
    all_reduce, broadcast, broadcast_params,
    shard_batch, sync_grads,
)
from netcl.core.device import manager
from netcl.nn import Linear, ReLU, Dropout, Sequential
from netcl.optim import SGD
import netcl.autograd as ag

A few notes on the imports:

  • shard_batch lives in netcl.distributed.data_parallel and is re-exported from the netcl.distributed package root. If it is missing from the import block, the call site in §4 raises NameError: name 'shard_batch' is not defined.
  • sync_grads is the host-based mean-reduce of every param.grad across replicas. It is also re-exported from netcl.distributed, and the import block above includes it.
  • DeviceManager is not part of the block above; §2 imports it from netcl.core.device. The same class is re-exported under the distributed namespace for ergonomics. Its per-replica context is the per-thread device binding used inside a replica's forward pass.
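A quick way to catch a missing re-export before the training loop starts is to check the names up front. The helper below is a hypothetical convenience built on the standard library's importlib, not part of netcl; if netcl is not installed it simply reports every name as missing.

```python
import importlib


def missing_symbols(module_name, names):
    """Return the names that cannot be resolved on the given module.

    If the module itself cannot be imported, every name counts as missing.
    """
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        return list(names)
    return [name for name in names if not hasattr(module, name)]


needed = [
    "prepare_replicas", "data_parallel_step", "all_reduce",
    "broadcast", "broadcast_params", "shard_batch", "sync_grads",
]
print("missing from netcl.distributed:",
      missing_symbols("netcl.distributed", needed))
```

An empty list means the import block in this section should succeed as written.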

2. Build the Model, the Optimizer, and the Device Manager

dev = manager.default("auto")
model = Sequential(
    Linear(dev.queue, 784, 256), ReLU(), Dropout(p=0.1),
    Linear(dev.queue, 256, 128), ReLU(), Dropout(p=0.1),
    Linear(dev.queue, 128, 10),
)
opt = SGD(model.parameters(), lr=1e-2, momentum=0.9)

from netcl.core.device import DeviceManager as CoreDeviceManager
mgr = CoreDeviceManager()          # one cl.Context + one cl.CommandQueue per device
print("devices:", mgr.num_devices())
print("queues :", [q.device.name for q in mgr.get_queues()])

DeviceManager defaults to the discovered default context's devices. Pass an explicit list to pin it to a subset — for example, "the two discrete GPUs" on a box that also has an integrated GPU. The class is the same as the DeviceManager from the core API; use whichever import path feels more natural in the surrounding code.

3. Build the Replicas

prepare_replicas clones the model parameters onto each device's queue, returning a List[List[Tensor]] — one parameter list per replica. The clone is a real H2D copy, so each replica is fully independent after this call.

replicas = prepare_replicas(
    model.parameters(),
    queues=mgr.get_queues(),       # one queue per replica
)
# One optimizer per replica (each optimizer mutates its own parameter list).
opt_replicas = [SGD(r, lr=1e-2, momentum=0.9) for r in replicas]

The first argument is the source parameter iterator; the second is the list of target queues. Each replica now has its own copy of the parameters on its own device, ready to run forward + backward in parallel.

Why one optimizer per replica? The Optimizer is a thin wrapper around a parameter list and mutates those parameters in place during step(). A single SGD instance cannot drive four independent parameter lists, so you build N optimizers, one per replica, and call step() on every one. Because sync_grads has mean-reduced the gradients before the step, every replica applies the same update in exact arithmetic. Small floating-point differences between devices can still accumulate, which is why the loop in §4 ends with broadcast_params.
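The lockstep argument can be illustrated with a toy. ToySGD below is a stand-in written for this sketch (it is not netcl's SGD), and the "synced" gradients are random arrays shared by all replicas. Each optimizer owns its own momentum buffers and its own parameter copies, yet all four end up identical.

```python
import numpy as np


class ToySGD:
    """Stand-in SGD with momentum; NOT netcl's optimizer."""

    def __init__(self, params, lr=1e-2, momentum=0.9):
        self.params = params
        self.lr = lr
        self.momentum = momentum
        self.velocity = [np.zeros_like(p) for p in params]

    def step(self, grads):
        for p, v, g in zip(self.params, self.velocity, grads):
            v *= self.momentum
            v += g
            p -= self.lr * v          # in-place update of this replica's copy


rng = np.random.default_rng(0)
source = [rng.normal(size=(3, 2)), rng.normal(size=(3,))]

# Four independent parameter lists, one optimizer each.
replicas = [[p.copy() for p in source] for _ in range(4)]
optimizers = [ToySGD(r) for r in replicas]

for _ in range(10):
    # Pretend the gradients were already mean-reduced:
    # every replica sees the same values.
    synced = [rng.normal(size=p.shape) for p in source]
    for opt in optimizers:
        opt.step(synced)

in_lockstep = all(
    np.array_equal(a, b)
    for r in replicas[1:]
    for a, b in zip(replicas[0], r)
)
print("replicas identical after 10 steps:", in_lockstep)
```

On a single CPU the copies stay bit-identical. Across different devices small rounding differences are possible, which this toy does not model.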

4. The Training Loop (Manual Pattern)

The manual pattern is the most explicit form; the data_parallel_step shortcut in §5 wraps it.

import netcl.autograd as ag

for x, y in loader:                                 # x: (B, 784), y: (B,)
    # 1) Shard the batch along axis 0 — one sub-batch per replica.
    shards = shard_batch((x, y), num_shards=len(replicas))

    # 2) Forward + backward on each replica.
    losses = []
    for params_r, (xs, ys) in zip(replicas, shards):
        with ag.Tape() as tape:
            # Rebuild a tiny model view over this replica's parameter list.
            # (Use the same MLP class; parameters are bound by identity.)
            logits = model_forward(params_r, xs)    # see helper below
            loss   = ag.cross_entropy(logits, ys)
        tape.backward(loss)
        losses.append(loss)

    # 3) Mean-reduce every parameter's grad across replicas.
    sync_grads(replicas)

    # 4) Optimizer step on every replica.
    for opt_r in opt_replicas:
        opt_r.step()
        opt_r.zero_grad()

    # 5) Broadcast the updated parameters from replica 0 to every other replica.
    broadcast_params(src_params=replicas[0], dst_param_groups=replicas, root=0)

The model_forward helper above is a thin closure that runs the same MLP computation over a per-replica parameter list. The easiest way to write it is:

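def model_forward(params, x):
    # Weights are stored as (out_features, in_features), hence the transposes.
    # params[0] = layer 0 weight (256, 784), params[1] = layer 0 bias (256,)
    # params[2] = layer 1 weight (128, 256), params[3] = layer 1 bias (128,)
    # params[4] = layer 2 weight (10,  128), params[5] = layer 2 bias (10,)
    # NOTE: `transpose` is not imported in §1; bring your tensor transpose
    # helper into scope before calling this function.
    # NOTE: the Dropout layers from §2 are not applied here.
    h = ag.add(ag.matmul(x, transpose(params[0])), params[1])
    h = ag.relu(h)
    h = ag.add(ag.matmul(h, transpose(params[2])), params[3])
    h = ag.relu(h)
    return ag.add(ag.matmul(h, transpose(params[4])), params[5])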
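As a shape check, the same computation can be mirrored in NumPy. This is a reference sketch only: it uses random stand-in weights with the shapes listed in the comments above, and it says nothing about how netcl tensors behave.

```python
import numpy as np

rng = np.random.default_rng(0)
shapes = [(256, 784), (256,), (128, 256), (128,), (10, 128), (10,)]
params = [rng.normal(scale=0.01, size=s) for s in shapes]


def numpy_forward(params, x):
    """NumPy mirror of the three-layer MLP forward (no dropout)."""
    h = np.maximum(x @ params[0].T + params[1], 0.0)   # (B, 256)
    h = np.maximum(h @ params[2].T + params[3], 0.0)   # (B, 128)
    return h @ params[4].T + params[5]                 # (B, 10)


x = rng.normal(size=(16, 784))          # one shard of 16 flattened images
logits = numpy_forward(params, x)
print(logits.shape)
```

Each weight is stored as (out_features, in_features), so the input of shape (B, in_features) is multiplied by the transposed weight.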

Steps 1–4 above are what data_parallel_step wraps; step 5, the broadcast, stays an explicit call. The manual form is the right starting point when you need a custom hook (logging, NaN guards, learning-rate schedule ticks).
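As an example of a custom hook, a NaN guard could sit between the backward pass and the gradient sync. The sketch below uses plain Python floats as a stand-in for gradient tensors. How to read a gradient back to the host in netcl is not covered in this tutorial, so that part is left as an assumption.

```python
import math


def grads_are_finite(grad_lists):
    """True if every value in every replica's gradient list is finite.

    `grad_lists` holds one list of floats per replica, a plain-Python
    stand-in for the per-parameter gradient tensors.
    """
    return all(math.isfinite(g) for grads in grad_lists for g in grads)


good = [[0.1, -0.2], [0.3, 0.0]]
bad = [[0.1, float("nan")], [0.3, 0.0]]

for name, grads in (("good", good), ("bad", bad)):
    if grads_are_finite(grads):
        print(name, "-> sync and step")
    else:
        print(name, "-> skip this batch and zero the gradients")
```

Skipping the sync for a bad batch keeps a single non-finite value on one replica from being averaged into every other replica.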

5. The Same Loop with data_parallel_step

data_parallel_step does steps 1–4 in one call. It takes a forward_fn(queue, xb, yb, params) callable so the user controls the forward and backward path, and three optional hooks (pre_shard_hook, post_sync_hook, post_step_hook) for extra logic.

def forward_fn(queue, xb, yb, params):
    # Record the forward pass on a tape, as in §4, and return the tape
    # alongside the loss.
    with ag.Tape() as tape:
        logits = model_forward(params, xb)
        loss   = ag.cross_entropy(logits, yb)
    return loss, tape

for x, y in loader:
    loss = data_parallel_step(
        forward_fn=forward_fn,
        param_replicas=replicas,
        optimizers=opt_replicas,
        batch=(x, y),
        post_sync_hook=lambda: print("grads synced"),
    )
    broadcast_params(src_params=replicas[0], dst_param_groups=replicas, root=0)

data_parallel_step does not call broadcast_params internally (the gradients are equal across replicas after the sync, but the parameters are about to diverge during step()). If your model uses running statistics (BatchNorm buffers, etc.) or any non-parameter buffer that the head replica should own, call broadcast_params explicitly after the step, as shown above.
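Hooks do not have to be lambdas. The sketch below is a small callable that acts on every n-th call, which suits periodic logging. It assumes hooks are invoked with no arguments, as the lambda passed to post_sync_hook above suggests; the exact hook signature is not documented here, and EveryN is a hypothetical helper.

```python
class EveryN:
    """Callable that runs `action` on every n-th call."""

    def __init__(self, n, action):
        self.n = n
        self.action = action
        self.calls = 0

    def __call__(self):
        self.calls += 1
        if self.calls % self.n == 0:
            self.action(self.calls)


logged = []
hook = EveryN(100, lambda step: logged.append(step))

for _ in range(250):       # stand-in for 250 training steps
    hook()

print("logged at steps:", logged)
```

An instance like `hook` could then be passed wherever a zero-argument hook is accepted.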

6. Save and Load

Because every replica carries the same model after broadcast_params, saving one is enough. Use save_model on the first replica's model view:

from netcl.io import save_model

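# After the last `broadcast_params` of the run.
# `build_model_for` is not defined in this tutorial: it stands for a helper
# of your own that wraps a replica's parameter list in a model object.
save_model(build_model_for(replicas[0]), "parallel_mlp.netcl")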

save_model writes a single self-contained .netcl file (a NumPy .npz) holding every parameter and a sidecar JSON describing the model architecture. To resume, call load_model in a fresh process and run prepare_replicas again to rebuild the replicas. For exact-resume training (continuing from a specific step), use save_checkpoint / load_checkpoint from netcl.io.checkpoint, which also carry the optimizer state, the scheduler state, and the GradScaler state.
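If the file is a plain .npz archive as described, NumPy alone should be able to list its arrays. The sketch below does not call save_model. It writes a stand-in archive with made-up array names and reads it back, only to show that np.load does not depend on the file extension.

```python
import os
import tempfile

import numpy as np

# Stand-in parameters with the first layer's shapes; the names are made up.
params = {
    "layer0_weight": np.zeros((256, 784), dtype=np.float32),
    "layer0_bias": np.zeros((256,), dtype=np.float32),
}

path = os.path.join(tempfile.mkdtemp(), "stand_in.netcl")
with open(path, "wb") as f:      # a file object keeps the .netcl extension
    np.savez(f, **params)

# np.load recognises the archive by its contents, not by the extension.
with np.load(path) as archive:
    shapes = {name: archive[name].shape for name in archive.files}

for name, shape in shapes.items():
    print(name, shape)
```

The actual key names and the layout of the architecture JSON inside a real .netcl file are not specified in this tutorial, so inspect a real file before relying on them.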

Common Failure Modes

Common-failures callout. A short list of the data-parallel failure modes that bite most often:

  1. NameError: name 'shard_batch' is not defined. You forgot the import. The symbol lives in netcl.distributed.data_parallel and is re-exported from netcl.distributed. Add shard_batch to the import block at the top of the file.
  2. NameError: name 'sync_grads' is not defined. Same fix: sync_grads lives in the same module and is re-exported from netcl.distributed. The import block in §1 includes it.
  3. sync_grads raises ValueError: sync_grads expects grad populated. A replica's forward + backward did not produce a gradient on one of the parameters in the prepare_replicas list. The most common cause is rebuilding the model after the replicas were prepared, so the forward pass runs on new parameters that are not in the replica list. Build the model first, prepare the replicas, then build the optimizers.
  4. Replicas diverge after a few hundred steps. Almost always a missed broadcast_params call. The parameters start identical, drift apart after the first step(), and never converge. Add the broadcast back.
  5. The first batch is fine, every subsequent batch raises a queue-full stall. The OpenCL command queue is not being drained between replicas. Make sure each replica is using its own cl.CommandQueue (this is the default for DeviceManager) rather than a shared queue from a single context.
  6. Speedup plateaus at ~1.5× even with 4 devices. The host-side all_reduce is the bottleneck. Try passing overlap=True to the all_reduce / gather calls; overlapping the H2D copies with compute is often a measurable win.
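When the sync cost in item 6 dominates, another option is to sync less often: accumulate gradients over k micro-batches and sync once. The following pure-Python sketch assumes a scalar weight and a squared-error loss, both chosen only for illustration. It shows that the averaged accumulated gradient equals the gradient of the combined batch when the micro-batches have equal size.

```python
def grad(w, xs, ys):
    """d/dw of mean((w*x - y)**2) for a scalar weight w."""
    return sum(2.0 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)


xs = [0.5, -1.0, 2.0, 1.5, -0.5, 3.0, 0.0, 1.0]
ys = [1.0, 0.0, 3.5, 2.0, -1.0, 5.0, 0.5, 1.5]
w = 0.3
K = 4                                   # micro-batches per sync
size = len(xs) // K                     # equal-sized micro-batches

accumulated = 0.0
for i in range(K):
    lo, hi = i * size, (i + 1) * size
    accumulated += grad(w, xs[lo:hi], ys[lo:hi])   # no sync here
accumulated /= K                        # average once, then sync once

big_batch = grad(w, xs, ys)
print(accumulated, big_batch)
```

In a real loop the optimizer's zero_grad() would be called only after the synced step, not after every micro-batch.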

Troubleshooting

The narrative version:

  • GPU out-of-memory in a single replica. prepare_replicas does not shard the model; it copies the whole model onto each device. If a single replica runs out of memory, first reduce the per-replica batch size to shrink the activations. Reducing the number of replicas helps only if it drops the device with the least memory, because every remaining replica still holds the whole model. If the model does not fit on any single device, you need a model-parallel framework, which netcl does not provide today.
  • Synchronization is the bottleneck. Many small all_reduce calls add up. Two practical mitigations: (a) overlap the H2D copy with the compute by passing overlap=True to the all_reduce call inside sync_grads, and (b) accumulate gradients for k micro-batches before syncing, simulating a larger global batch.
  • Inconsistent weights after a step. You forgot the broadcast_params call. The replicas are about to drift apart the moment their respective optimizers run. Always broadcast after step().
  • shard_batch returns one fewer element than expected. The batch size is not divisible by num_shards. The function uses np.array_split, which does not pad: the first B mod num_shards shards get one extra row, so the trailing shards are one row smaller. Confirm with len(shards) and len(shards[-1]).
  • subprocess worker hangs on exit. The OpenCL command queue still has pending kernels. Install a SIGINT handler that calls queue.finish() and a multiprocessing.active_children() cleanup in the parent's atexit.
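The uneven-shard behaviour described above is easy to reproduce with NumPy directly. This sketch calls np.array_split itself rather than shard_batch, and the trimming step at the end is one possible workaround, not something netcl does for you.

```python
import numpy as np

x = np.arange(10 * 3).reshape(10, 3)    # batch of 10 rows
y = np.arange(10)

x_shards = np.array_split(x, 4)         # 10 is not divisible by 4
y_shards = np.array_split(y, 4)

sizes = [len(s) for s in x_shards]
print("shard sizes:", sizes)            # [3, 3, 2, 2]

# One way to avoid uneven shards: trim the batch to a multiple of the
# shard count before splitting.
usable = (len(x) // 4) * 4
even_sizes = [len(s) for s in np.array_split(x[:usable], 4)]
print("after trimming:", even_sizes)    # [2, 2, 2, 2]
```

Trimming drops the last few samples of the batch, and a data loader that discards incomplete batches achieves the same effect upstream.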

See also