Tutorial: Data-Parallel Training
netcl's distributed module is a host-based minimum-viable-product for data-parallel training on a single host. Replicas of the model run in subprocesses, each processes a batch slice, and gradients are mean-reduced at the end of the step with all_reduce. There is no MPI, no NCCL, no RDMA — the transport is shared memory plus, where helpful, an OpenCL device-side pairwise sum.
This is the right pattern when the model fits on a single device but the training job would benefit from multiple devices running in parallel. For models that do not fit on a single device, netcl does not currently provide model-parallelism (sharded parameters, pipeline parallelism, or ZeRO-style optimizer sharding) — see the When to use data-parallel callout below.
Prerequisites
- Quickstart — running a single kernel.
- Tensor — the value type the replicas produce and consume.
- Tensor Backend — the OpenCL command queue, the BufferPool, and the fork-safety of the OpenCLBackend.
- Understanding Autograd — the Tape / Node model, since each replica runs its own Tape.
- MNIST with an MLP — a single-process training script you can adapt.
What You'll Build
A 4-replica data-parallel training loop for an MLP on MNIST. Each replica gets one quarter of the batch, runs forward + backward on its own device, gradients are mean-reduced across replicas with sync_grads, and the optimizer step runs on every replica. The same recipe scales from 1 GPU to N GPUs on a single host.
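The mean-reduce in this recipe can be pictured as a pairwise (tree) sum across replicas followed by a division by the replica count. The sketch below is a host-side illustration on plain Python lists; the names pairwise_sum and mean_reduce are hypothetical and this is not how netcl's sync_grads or all_reduce is implemented.

```python
def pairwise_sum(vectors):
    """Sum equally sized vectors by repeatedly adding neighbouring pairs."""
    level = [list(v) for v in vectors]            # copy so the inputs stay untouched
    while len(level) > 1:
        nxt = []
        for i in range(0, len(level) - 1, 2):
            nxt.append([a + b for a, b in zip(level[i], level[i + 1])])
        if len(level) % 2:                        # odd one out moves up unchanged
            nxt.append(level[-1])
        level = nxt
    return level[0]


def mean_reduce(vectors):
    """Element-wise mean across replicas: pairwise sum, then divide by the count."""
    total = pairwise_sum(vectors)
    return [t / len(vectors) for t in total]


# One flattened gradient per replica (illustrative values).
grads = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0]]
print(mean_reduce(grads))
```

With four replicas the tree needs two rounds of additions instead of three sequential ones, which is the usual motivation for a pairwise scheme.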
When to Use Data-Parallel
Decision callout. Data-parallel is the right answer when:
- The model fits in the memory of a single device but you want to train on multiple devices in parallel (1–4 GPUs on a developer box, or 1 GPU + several CPU devices).
- The host topology is PCIe or NVLink — i.e. the same machine. There is no network hop in the data path.
- You are bottlenecked by per-batch compute, not by model size. A 2× speedup with 2 devices is realistic; a 4× speedup with 4 devices is the upper bound.
Data-parallel is the wrong answer when the model does not fit on a single device (no model-parallel / pipeline / ZeRO in netcl today), when the host topology spans multiple nodes (no RDMA / NCCL / MPI), or when you only have one device. For the single-device case, see MNIST with an MLP.
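The speedup figures in this callout can be made concrete with a back-of-the-envelope model. The sketch below assumes that per-step compute divides evenly across devices and that the gradient sync adds a fixed cost as soon as more than one device is used; the timings are illustrative assumptions, not netcl measurements.

```python
def step_time(compute_s: float, sync_s: float, n_devices: int) -> float:
    """Modelled wall time of one data-parallel step.

    Assumption: compute divides evenly across devices, and the gradient
    sync costs a fixed time whenever more than one device takes part.
    """
    sync = sync_s if n_devices > 1 else 0.0
    return compute_s / n_devices + sync


def speedup(compute_s: float, sync_s: float, n_devices: int) -> float:
    """Speedup relative to the single-device step."""
    return step_time(compute_s, sync_s, 1) / step_time(compute_s, sync_s, n_devices)


# Illustrative numbers only: 100 ms of compute, 0 ms or 40 ms of sync per step.
for n in (1, 2, 4):
    print(n, round(speedup(0.100, 0.0, n), 2), round(speedup(0.100, 0.040, n), 2))
```

With zero sync cost the model gives exactly n×, which is why 4× is the ceiling for four devices. With 40 ms of sync per step it gives roughly 1.5× on four devices, the kind of plateau listed under Common Failure Modes.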
Step-by-Step
1. Imports
The complete import block for a 4-replica run, with all the symbols we will use:
from netcl.distributed import (
    prepare_replicas, data_parallel_step,
    all_reduce, broadcast, broadcast_params,
    shard_batch, sync_grads,
)
from netcl.core.device import manager
from netcl.nn import Linear, ReLU, Dropout, Sequential
from netcl.optim import SGD
import netcl.autograd as ag
A few notes on the imports:
- shard_batch lives in netcl.distributed.data_parallel and is re-exported from the netcl.distributed package root. The German original omitted it from the import block; without it, the call site in §4 raises NameError: name 'shard_batch' is not defined.
- sync_grads is the host-based mean-reduce of every param.grad across replicas. It is also re-exported from netcl.distributed; the import list above pulls it in correctly.
- DeviceManager is the same class as netcl.core.device.DeviceManager, re-exported under the distributed namespace for ergonomics. The DeviceManager per-replica context is the per-thread device binding used inside a replica's forward.
2. Build the Model, the Optimizer, and the Device Manager
dev = manager.default("auto")
model = Sequential(
    Linear(dev.queue, 784, 256), ReLU(), Dropout(p=0.1),
    Linear(dev.queue, 256, 128), ReLU(), Dropout(p=0.1),
    Linear(dev.queue, 128, 10),
)
opt = SGD(model.parameters(), lr=1e-2, momentum=0.9)
from netcl.core.device import DeviceManager as CoreDeviceManager
mgr = CoreDeviceManager() # one cl.Context + one cl.CommandQueue per device
print("devices:", mgr.num_devices())
print("queues :", [q.device.name for q in mgr.get_queues()])
DeviceManager defaults to the discovered default context's devices. Pass an explicit list to pin it to a subset — for example, "the two discrete GPUs" on a box that also has an integrated GPU. The class is the same as the DeviceManager from the core API; use whichever import path feels more natural in the surrounding code.
3. Build the Replicas
prepare_replicas clones the model parameters onto each device's
queue, returning a List[List[Tensor]] — one parameter list per replica. The clone is a
real H2D copy, so each replica is fully independent after this call.
replicas = prepare_replicas(
    model.parameters(),
    queues=mgr.get_queues(),  # one queue per replica
)
# One optimizer per replica (each optimizer mutates its own parameter list).
opt_replicas = [SGD(r, lr=1e-2, momentum=0.9) for r in replicas]
The first argument is the source parameter iterator; the second is the list of target queues. Each replica now has its own copy of the parameters on its own device, ready to run forward + backward in parallel.
Why one optimizer per replica? The Optimizer is a thin wrapper around a parameter list: it mutates the parameters in place during step(). A single SGD instance cannot drive four independent parameter lists. You therefore build N optimizers, one per replica, and call step() on every one. The gradients have been mean-reduced by sync_grads before the step, so every replica computes the same update in principle; the broadcast_params call in §4 removes any drift that remains.
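The lockstep argument can be checked on the host with a toy optimizer. The sketch below uses a hypothetical ToySGD class over plain Python floats as a stand-in for netcl.optim.SGD; it only shows that independent copies fed identical gradients stay identical, and says nothing about device-level numerics.

```python
import copy


class ToySGD:
    """Minimal SGD with momentum over a list of floats (stand-in, not netcl.optim.SGD)."""

    def __init__(self, params, lr=1e-2, momentum=0.9):
        self.params = params                      # mutated in place by step()
        self.lr = lr
        self.momentum = momentum
        self.velocity = [0.0] * len(params)       # per-optimizer momentum buffer

    def step(self, grads):
        for i, g in enumerate(grads):
            self.velocity[i] = self.momentum * self.velocity[i] + g
            self.params[i] -= self.lr * self.velocity[i]


source = [0.5, -1.0, 2.0]
replicas = [copy.deepcopy(source) for _ in range(4)]   # independent copies
optimizers = [ToySGD(r) for r in replicas]             # one optimizer per replica

# Two training steps; every replica receives the same (already synced) gradients.
for synced_grads in ([0.1, 0.2, -0.3], [0.0, -0.1, 0.4]):
    for opt in optimizers:
        opt.step(synced_grads)

print(all(r == replicas[0] for r in replicas))
```

Each optimizer owns its momentum buffer as well as a reference to its parameter list, which is why one shared instance could not serve four lists.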
4. The Training Loop (Manual Pattern)
The manual pattern is the most explicit form; the data_parallel_step
shortcut in §5 wraps it.
import netcl.autograd as ag

for x, y in loader:  # x: (B, 784), y: (B,)
    # 1) Shard the batch along axis 0: one sub-batch per replica.
    shards = shard_batch((x, y), num_shards=len(replicas))
    # 2) Forward + backward on each replica.
    losses = []
    for (params_r, opt_r), (xs, ys) in zip(zip(replicas, opt_replicas), shards):
        with ag.Tape() as tape:
            # Rebuild a tiny model view over this replica's parameter list.
            # (Use the same MLP class; parameters are bound by identity.)
            logits = model_forward(params_r, xs)  # see helper below
            loss = ag.cross_entropy(logits, ys)
            tape.backward(loss)
        losses.append(loss)
    # 3) Mean-reduce every parameter's grad across replicas.
    sync_grads(replicas)
    # 4) Optimizer step on every replica.
    for opt_r in opt_replicas:
        opt_r.step()
        opt_r.zero_grad()
    # 5) Broadcast the updated parameters from replica 0 to every other replica.
    broadcast_params(src_params=replicas[0], dst_param_groups=replicas, root=0)
The model_forward helper above is a thin closure that runs the same MLP computation
over a per-replica parameter list. The easiest way to write it is:
def model_forward(params, x):
    # NOTE: `transpose` is not part of the import block in §1. Import it from
    # wherever your netcl version exposes a transpose op before using this helper.
    # The Dropout layers from §2 hold no parameters and are omitted here.
    # params[0] = layer 0 weight (256, 784), params[1] = layer 0 bias (256,)
    # params[2] = layer 1 weight (128, 256), params[3] = layer 1 bias (128,)
    # params[4] = layer 2 weight (10, 128), params[5] = layer 2 bias (10,)
    h = ag.add(ag.matmul(x, transpose(params[0])), params[1])
    h = ag.relu(h)
    h = ag.add(ag.matmul(h, transpose(params[2])), params[3])
    h = ag.relu(h)
    return ag.add(ag.matmul(h, transpose(params[4])), params[5])
Steps 1 to 4 above are exactly what data_parallel_step wraps; step 5, the broadcast, stays a separate call. The
manual form is the right starting point when you need a custom hook (logging, NaN guards,
learning-rate schedule ticks).
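The reason the shard, backward, mean-reduce sequence gives the same gradient as a single full-batch pass can be verified on a toy problem. The sketch below uses a scalar linear model with a mean-squared-error loss in plain Python; grad_mse and shard are hypothetical helpers, and the equality holds as written only when all shards have the same size.

```python
def grad_mse(w, xs, ys):
    """Gradient of mean((w*x - y)^2) with respect to the scalar weight w."""
    return sum(2.0 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)


def shard(seq, num_shards):
    """Split a sequence into contiguous equal pieces (length must divide evenly)."""
    size = len(seq) // num_shards
    return [seq[i * size:(i + 1) * size] for i in range(num_shards)]


w = 0.5
xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
ys = [2.0, 4.1, 5.9, 8.2, 9.8, 12.1, 14.0, 15.9]

# Steps 1-2: shard the batch and compute one gradient per replica.
per_replica = [grad_mse(w, sx, sy) for sx, sy in zip(shard(xs, 4), shard(ys, 4))]

# Step 3: mean-reduce across replicas (the role sync_grads plays).
synced = sum(per_replica) / len(per_replica)

# Reference: the gradient of the same loss over the whole batch at once.
full = grad_mse(w, xs, ys)
print(abs(synced - full) < 1e-9)
```

The two values agree up to floating-point summation order, so a data-parallel step with equal shards optimizes the same objective as the single-device loop.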
5. The Same Loop with data_parallel_step
data_parallel_step does steps 1–4 in one call. It takes a
forward_fn(queue, xb, yb, params) callable so the user controls the forward and
backward path, and three optional hooks (pre_shard_hook, post_sync_hook,
post_step_hook) for extra logic.
def forward_fn(queue, xb, yb, params):
    tape = ag.Tape()
    tape.__enter__()
    logits = model_forward(params, xb)
    loss = ag.cross_entropy(logits, yb)
    return loss, tape

for x, y in loader:
    loss = data_parallel_step(
        forward_fn=forward_fn,
        param_replicas=replicas,
        optimizers=opt_replicas,
        batch=(x, y),
        post_sync_hook=lambda: print("grads synced"),
    )
    broadcast_params(src_params=replicas[0], dst_param_groups=replicas, root=0)
data_parallel_step does not call broadcast_params
internally (the gradients are equal across replicas after the sync, but the parameters
are about to diverge during step()). If your model uses running statistics (BatchNorm
buffers, etc.) or any non-parameter buffer that the head replica should own, call
broadcast_params explicitly after the step, as shown above.
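What the explicit broadcast achieves can be sketched on the host. The function below, broadcast_in_place, is a hypothetical stand-in for the role broadcast_params plays, with plain Python lists in place of device tensors; it is not netcl's implementation.

```python
def broadcast_in_place(param_groups, root=0):
    """Overwrite every replica's parameters with the root replica's values."""
    src = param_groups[root]
    for rank, dst in enumerate(param_groups):
        if rank != root:
            dst[:] = src          # copy values, keep each replica's own list object


# Two replicas that have drifted apart by a rounding-sized amount.
groups = [[1.0, 2.0, 3.0], [1.0, 2.0000001, 3.0]]
ids_before = [id(g) for g in groups]

broadcast_in_place(groups, root=0)

print(groups[1] == groups[0], [id(g) for g in groups] == ids_before)
```

Copying values into the existing containers, rather than rebinding them, matters in this sketch because each per-replica optimizer holds a reference to its own parameter list.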
6. Save and Load
Because every replica carries the same model after broadcast_params, saving one is enough. Use save_model on the first replica's model view:
from netcl.io import save_model
# After the last `broadcast_params` of the run:
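# build_model_for is not defined in this tutorial: it stands for your own helper
# that wraps a replica's parameter list in a model object.
save_model(build_model_for(replicas[0]), "parallel_mlp.netcl")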
save_model writes a single self-contained .netcl file (a NumPy .npz) holding every
parameter and a sidecar JSON describing the model architecture. To resume, use
load_model in a fresh process and re-run the prepare_replicas
shuffle. For exact-resume training (continuing from a specific step), reach for
save_checkpoint / load_checkpoint from netcl.io.checkpoint — they also carry the
optimizer state, the scheduler state, and the GradScaler state.
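Why exact resume needs the optimizer state and not only the weights can be shown with a scalar example. The sketch below serializes a toy SGD-with-momentum state to JSON purely for illustration; the dictionary layout is an assumption and has nothing to do with the .netcl file format or with what save_checkpoint writes.

```python
import json


def sgd_momentum_step(w, v, grad, lr=0.1, momentum=0.9):
    """One SGD-with-momentum update on a scalar; returns (new_w, new_v)."""
    v = momentum * v + grad
    return w - lr * v, v


def grad(w):
    """Gradient of the toy loss (w - 3)^2."""
    return 2.0 * (w - 3.0)


# Train three steps, then serialize both the parameter and the optimizer state.
w, v = 0.0, 0.0
for _ in range(3):
    w, v = sgd_momentum_step(w, v, grad(w))
blob = json.dumps({"w": w, "velocity": v})

# Reference: keep training without interruption.
w_ref, _ = sgd_momentum_step(w, v, grad(w))

# Exact resume: restore the parameter and the velocity.
state = json.loads(blob)
w_exact, _ = sgd_momentum_step(state["w"], state["velocity"], grad(state["w"]))

# Weights-only resume: the velocity restarts at zero, so the next step differs.
w_cold, _ = sgd_momentum_step(state["w"], 0.0, grad(state["w"]))

print(w_exact == w_ref, w_cold == w_ref)
```

The weights-only path still trains, but its trajectory no longer matches the uninterrupted run, which is the distinction between load_model and load_checkpoint drawn above.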
Common Failure Modes
Common-failures callout. A short list of the data-parallel failure modes that bite most often:
- NameError: name 'shard_batch' is not defined. You forgot the import. The symbol lives in netcl.distributed.data_parallel and is re-exported from netcl.distributed. Add shard_batch to the import block at the top of the file.
- NameError: name 'sync_grads' is not defined. Same fix: sync_grads lives in the same module and is re-exported from netcl.distributed. The German original forgot to import it; the corrected import block is in §1.
- sync_grads raises ValueError: sync_grads expects grad populated. A replica's forward + backward did not produce a gradient on one of the parameters in the prepare_replicas list. The most common cause is a model that is built from model.parameters() after the replicas were prepared, so the new parameters are not in the replica list. Build the model first, prepare the replicas, then build the optimizer.
- Replicas diverge after a few hundred steps. Almost always a missed broadcast_params call. The parameters start identical, drift apart after the first step(), and never converge. Add the broadcast back.
- The first batch is fine, every subsequent batch hits a queue-full stall. The OpenCL command queue is not being drained between replicas. Make sure each replica is using its own cl.CommandQueue (this is the default for DeviceManager) rather than a shared queue from a single context.
- Speedup plateaus at ~1.5× even with 4 devices. The host-side all_reduce is the bottleneck. Try all_reduce(..., overlap=True) for the all_reduce / gather calls; the parallel H2D copy is often a measurable win.
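For the missing-gradient failure in the list above, a small pre-sync check can point at the offending replica and parameter. The sketch below is hypothetical: ToyParam stands in for a tensor, and it assumes an unpopulated gradient shows up as grad being None, which the source does not confirm for netcl tensors.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class ToyParam:
    """Stand-in for a tensor: only the attributes the check reads."""
    name: str
    grad: Optional[float] = None      # assumption: None means "no gradient yet"


def find_missing_grads(replicas: List[List[ToyParam]]) -> List[Tuple[int, str]]:
    """Return (replica_index, param_name) for every parameter without a grad."""
    return [
        (rank, p.name)
        for rank, params in enumerate(replicas)
        for p in params
        if p.grad is None
    ]


replicas = [
    [ToyParam("w0", 0.1), ToyParam("b0", 0.2)],
    [ToyParam("w0", 0.3), ToyParam("b0")],      # backward never reached b0 here
]
missing = find_missing_grads(replicas)
print(missing)
```

Running a check like this just before the sync turns an opaque ValueError into a list of concrete (replica, parameter) pairs to investigate.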
Troubleshooting
The narrative version:
- GPU out-of-memory in a single replica. prepare_replicas does not shard the model; it copies the whole model onto each device. If a single replica runs out of memory, the model is too big for that device. The right answer is to reduce n_replicas (so fewer devices try to fit the whole model) or the per-replica batch size. If the model does not fit on any single device, you need a model-parallel framework, which netcl does not provide today.
- Synchronization is the bottleneck. Many small all_reduce calls add up. Two practical mitigations: (a) overlap the H2D copy with the compute by passing overlap=True to the all_reduce call inside sync_grads, and (b) accumulate gradients for k micro-batches before syncing, simulating a larger global batch.
- Inconsistent weights after a step. You forgot the broadcast_params call. The replicas are about to drift apart the moment their respective optimizers run. Always broadcast after step().
- A shard from shard_batch has one fewer element than expected. The batch size is not divisible by num_shards. The function uses np.array_split, which does not pad: the first B % num_shards shards each carry one extra sample and the remaining shards are one sample shorter. Confirm with len(shards) and the length of the arrays in the last shard.
- subprocess worker hangs on exit. The OpenCL command queue still has pending kernels. Install a SIGINT handler that calls queue.finish() and a multiprocessing.active_children() cleanup in the parent's atexit.
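The uneven-shard case from the list above has a second consequence worth seeing in numbers: a plain mean over per-shard means no longer equals the full-batch mean. The sketch below reproduces the np.array_split sizing rule in pure Python; array_split_sizes and weighted_mean are hypothetical helpers, and whether sync_grads weights by shard size is not stated in this tutorial.

```python
def array_split_sizes(batch_size: int, num_shards: int) -> list:
    """Shard lengths under the np.array_split rule: the first
    batch_size % num_shards shards get one extra element."""
    base, extra = divmod(batch_size, num_shards)
    return [base + 1 if i < extra else base for i in range(num_shards)]


def weighted_mean(values, weights):
    """Mean of per-shard values weighted by shard size."""
    return sum(v * w for v, w in zip(values, weights)) / sum(weights)


sizes = array_split_sizes(10, 4)
print(sizes)

# Per-sample losses for a batch of 10, sharded with the sizes above.
losses = [1.0, 1.0, 1.0, 4.0, 4.0, 4.0, 2.0, 2.0, 8.0, 8.0]
shard_means, start = [], 0
for n in sizes:
    shard_means.append(sum(losses[start:start + n]) / n)
    start += n

plain = sum(shard_means) / len(shard_means)   # unweighted mean over shards
exact = weighted_mean(shard_means, sizes)     # matches the full-batch mean
print(plain, exact, sum(losses) / len(losses))
```

Choosing a batch size that is a multiple of the replica count sidesteps the question entirely and keeps every replica equally loaded.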
See also
- MNIST with an MLP — the single-device version of this tutorial, which is the right starting point before going multi-device.
- Understanding Autograd — the Tape / Node model that each replica runs independently.
- Distributed API — the full reference for DeviceManager, prepare_replicas, data_parallel_step, all_reduce, broadcast, broadcast_params, shard_batch, and sync_grads.
- Architecture: Distributed — the design rationale, the host-topology picture, and the subprocess replica pattern.
- Tensor API — the value type sync_grads and broadcast_params operate on.
- Optimizer API — the SGD / Adam / AdamW family that drives the per-replica step.
- io API — the save_model / load_model format and the
save_checkpoint / load_checkpoint training-state helpers used at the end of a run.