netcl.data — DataLoader, Augment, SHM-Ring
The data API is the training-data ingestion layer. It builds on Python's multiprocessing to give you a DataLoader with a worker pool, a sliding-window prefetch ring buffer, and an optional lock-free shared-memory fast path that moves batches from the workers to the main process ahead of the H2D copy on OpenCL devices. It also ships a small set of CPU and GPU augmentation primitives, balanced-sampling and label filters, and a fork-vs-spawn-aware pool initializer that keeps PyOpenCL safe to import inside a worker.
Note: Long-form imports. The `netcl/data` package does not have a flat `__init__.py` re-export. Reach the public symbols by their full submodule path:
from netcl.data.dataloader import DataLoader
from netcl.data.shared_batch import SharedBatchRingBuffer, worker_write_to_shm
from netcl.data.augment import random_erase, cutout, color_jitter
# augment_gpu also defines `cutout`; alias it so it does not shadow the CPU version
from netcl.data.augment_gpu import flip_horizontal, apply_color_jitter, cutout as cutout_gpu
from netcl.data.filters import (
    apply_filters, to_float, normalize, horizontal_flip, random_crop,
    default_cifar10_filters, basic_cifar10_filters,
)
Module Map
| Symbol | Path | Purpose |
|---|---|---|
| `DataLoader` | `data/dataloader.py` | High-level loader: workers + prefetch ring + optional SHM fast path |
| `SharedBatchRingBuffer` | `data/shared_batch.py` | Lock-free shared-memory ring of pre-allocated batch slots |
| `worker_write_to_shm` | `data/shared_batch.py` | Worker-side protocol: copy a stack of samples into a ring slot |
| `random_erase`, `cutout`, `color_jitter` | `data/augment.py` | CPU-side NCHW augmentations (return a fresh array, do not mutate) |
| `flip_horizontal`, `apply_color_jitter`, `cutout` | `data/augment_gpu.py` | OpenCL-side NCHW augmentations (masks / factors come from the host) |
| `apply_filters`, `to_float`, `normalize`, `horizontal_flip`, `random_crop`, `default_cifar10_filters`, `basic_cifar10_filters` | `data/filters.py` | Dataset-level filter and balanced-sampling helpers |
| `random_split` | `utils/data.py` | Convenience helper: `np.array_split` of indices into N stratified shards |
DataLoader
DataLoader is the only thing most user code should instantiate. It
combines a worker pool, an asynchronous prefetch ring, and a thin post-processing hook
chain. It is lazy: the worker pool is only created on the first call to __iter__,
and it is persistent: the same pool is reused across epochs as long as the dataset
identity is unchanged.
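The lazy, persistent behavior can be pictured with a minimal sketch. `LazyPersistentPool` is a hypothetical helper, not part of netcl, and it uses `multiprocessing.pool.ThreadPool` only so the example runs without spawning processes; the real loader uses a process pool. The pool is built on the first `get()` and reused as long as the same dataset object (compared with `is`) comes back.

```python
from multiprocessing.pool import ThreadPool


class LazyPersistentPool:
    """Hypothetical sketch: build a pool on first use, reuse it across epochs
    while the same dataset object is passed in, rebuild it otherwise."""

    def __init__(self, num_workers, pool_factory=ThreadPool):
        self.num_workers = num_workers
        self.pool_factory = pool_factory  # ThreadPool here; a process pool in practice
        self._pool = None
        self._dataset = None
        self.builds = 0  # how many times a pool has been created

    def get(self, dataset):
        if self._pool is None or self._dataset is not dataset:
            self.close()  # dataset identity changed (or first call): drop any old pool
            self._pool = self.pool_factory(self.num_workers)
            self._dataset = dataset
            self.builds += 1
        return self._pool

    def close(self):
        if self._pool is not None:
            self._pool.terminate()
            self._pool = None
            self._dataset = None
```

Comparing identity rather than contents keeps the check cheap: another epoch over the same object costs nothing, while swapping in a different dataset tears the old pool down.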
from netcl.data.dataloader import DataLoader
loader = DataLoader(
    dataset,
    batch_size=128,
    shuffle=True,
    drop_last=False,
    num_workers=2,
    prefetch=4,            # ring buffer depth (defaults to 2 * num_workers)
    seed=0,
    transforms=None,       # optional (xb, yb) -> (xb, yb) callable
    use_shared_mem=False,  # set True for the SHM fast path (num_workers must be > 0)
)
for xb, yb in loader:
    ...
| Argument | Default | Purpose |
|---|---|---|
| `dataset` | required | Any object with `__len__` and `__getitem__(int) -> tuple[np.ndarray, ...]`. |
| `batch_size` | `32` | Samples per batch. |
| `shuffle` | `True` | Shuffle sample order each epoch. |
| `drop_last` | `False` | Drop the trailing partial batch. |
| `num_workers` | `4` | Worker processes; `0` runs single-threaded (useful for debugging). |
| `prefetch` | `2 * num_workers` | Sliding-window ring size: batches in flight ahead of the consumer. |
| `seed` | `None` | RNG seed for shuffling. |
| `transforms` | `None` | A single callable `(xb, yb) -> (xb, yb)` or a list of such callables, applied on the main thread after the batch arrives from a worker. |
| `use_shared_mem` | `False` | Use the `SharedBatchRingBuffer` zero-copy path. Requires `num_workers > 0`. |
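The `prefetch` row describes a sliding window: a bounded number of batch jobs is submitted ahead of the consumer, and results come back in submission order. The sketch below shows that pattern with `apply_async`; `prefetch_iter` and `load_batch` are hypothetical names, and `ThreadPool` stands in for the worker pool so the example runs anywhere.

```python
from collections import deque
from multiprocessing.pool import ThreadPool


def prefetch_iter(pool, load_batch, batches, prefetch):
    """Hypothetical sketch: yield load_batch(b) for each b in order,
    keeping up to `prefetch` jobs submitted ahead of the consumer."""
    if prefetch < 1:
        raise ValueError("prefetch must be >= 1")
    window = deque()
    for b in batches:
        window.append(pool.apply_async(load_batch, (b,)))
        if len(window) >= prefetch:
            # .get() blocks until the oldest job finishes, preserving order
            yield window.popleft().get()
    while window:  # drain the jobs that are still in flight
        yield window.popleft().get()
```

In the loader, `transforms` would then be applied to each yielded batch on the main thread.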
DataLoader exposes three methods beyond `__iter__`:
- `close()`: terminates the worker pool and `unlink()`s the SHM ring buffer. It is called automatically from `__del__`, but explicit calls are encouraged in long-running processes to free the SHM segments deterministically.
- `__len__()`: the number of batches the loader will yield this epoch (respects `drop_last`).
- `__del__()`: calls `close()`. Safe to rely on for short-lived scripts.
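How `__len__` relates to `drop_last`, and how shuffled batches of indices can be produced from `seed`, is sketched below. `num_batches` and `batch_indices` are hypothetical helpers, and deriving a per-epoch RNG from `(seed, epoch)` is an assumption for illustration, not a description of netcl's shuffling.

```python
import random


def num_batches(n, batch_size, drop_last):
    # drop_last keeps only full batches; otherwise the trailing partial batch counts too
    return n // batch_size if drop_last else (n + batch_size - 1) // batch_size


def batch_indices(n, batch_size, shuffle=True, drop_last=False, seed=None, epoch=0):
    """Hypothetical sketch: yield one list of sample indices per batch."""
    order = list(range(n))
    if shuffle:
        # Assumption: a new but reproducible order per epoch, derived from (seed, epoch)
        rng = random.Random(None if seed is None else seed * 1_000_003 + epoch)
        rng.shuffle(order)
    for i in range(num_batches(n, batch_size, drop_last)):
        yield order[i * batch_size:(i + 1) * batch_size]
```

Because `close()` is a plain method, `contextlib.closing(DataLoader(...))` is one way to get deterministic cleanup in a `with` block.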
Fork-vs-Spawn Caveat
PyOpenCL is fork-unsafe: a fork() after PyOpenCL has imported its ICD / bound a
driver leaves the child process with a half-initialized runtime that crashes on the first
kernel launch. The DataLoader handles this automatically:
- On Linux, where `fork` is available and cheap (it lets workers inherit the parent's loaded datasets via copy-on-write), `_get_mp_context()` returns `multiprocessing.get_context("fork")`. PyOpenCL is not yet imported in the main process when the workers start, so the fork happens before any driver binding.
- On macOS and Windows, the loader uses `"spawn"`, which is the only start method on Windows and the default on macOS since Python 3.8. The dataset is sent to each worker once via the pool initializer; the first iteration may take a second or two for a large in-memory dataset, but every subsequent iteration is fast because the pool persists across epochs.
If you need to use fork explicitly (e.g. for a one-shot diagnostic) and PyOpenCL is
already imported, do it before the DataLoader ever constructs its pool. Otherwise stick
with the platform default.
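The platform rule above fits in a few lines. `get_mp_context_sketch` is a hypothetical mirror of the described behavior, not netcl's `_get_mp_context()`; it relies only on `sys.platform` and `multiprocessing.get_context`.

```python
import multiprocessing as mp
import sys


def get_mp_context_sketch():
    """Hypothetical mirror of the fork-vs-spawn rule (not netcl's own code)."""
    if sys.platform.startswith("linux"):
        # Cheap copy-on-write workers; only safe while PyOpenCL has not bound a driver.
        return mp.get_context("fork")
    # macOS and Windows: every worker starts a fresh interpreter.
    return mp.get_context("spawn")
```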
Shared-Memory Ring Buffer
When use_shared_mem=True and num_workers > 0, DataLoader takes the
zero-copy path. The main process creates a SharedBatchRingBuffer sized
K = prefetch + 2 and asks each worker to write its batch directly into a slot instead
of returning a pickled NumPy tuple.
loader = DataLoader(ds, batch_size=64, num_workers=2, prefetch=8, use_shared_mem=True)
The flow is:
- The first batch is processed on the main thread (so the loader can learn the field shapes and dtypes) and yielded to the consumer immediately.
- The main process attaches to the SHM ring and starts `apply_async` jobs on the worker pool, each call passing the slot ID together with a serialized `attach_info` dict (name, slot size, shape, dtype).
- Workers call `worker_write_to_shm`, which `np.copyto`s the stacked arrays into the slot.
- The main process copies the data out with `slot_copy(slot_id)`: the slot's view is `np.array(...)`-copied into a fresh allocation, so the slot is freed for the next worker immediately, without a race.
- The last (incomplete) batch falls back to the regular pickle path because the per-worker shape discovery happens only once.
The full shared-memory contract — including the per-worker cache that avoids re-attaching on every call — is described in Distributed Architecture.
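The slot protocol can be condensed into a single-process sketch built on the standard library's `multiprocessing.shared_memory` and NumPy. `ShmRingSketch` and its methods are hypothetical stand-ins, not the `SharedBatchRingBuffer` API (only the name `slot_copy` is borrowed from the flow above). It takes no locks: slot ownership is assumed to be coordinated by the main process handing out slot IDs. A real worker would re-attach from another process with `SharedMemory(name=...)` using the `attach_info` dict, and the loader would size the ring as `num_slots = prefetch + 2`.

```python
import numpy as np
from multiprocessing import shared_memory


class ShmRingSketch:
    """Hypothetical single-process sketch of fixed-shape batch slots in shared memory."""

    def __init__(self, num_slots, slot_shape, dtype=np.float32):
        self.slot_shape = tuple(slot_shape)
        self.dtype = np.dtype(dtype)
        nbytes = num_slots * int(np.prod(self.slot_shape)) * self.dtype.itemsize
        self.shm = shared_memory.SharedMemory(create=True, size=nbytes)
        # One NumPy view over the whole segment; slot k is self._slots[k].
        self._slots = np.ndarray((num_slots, *self.slot_shape), dtype=self.dtype,
                                 buffer=self.shm.buf)

    def attach_info(self):
        # What another process would need to re-attach with SharedMemory(name=...).
        return {"name": self.shm.name, "num_slots": len(self._slots),
                "shape": self.slot_shape, "dtype": self.dtype.str}

    def write(self, slot_id, samples):
        # Worker side: stack the samples and copy them into the slot in place.
        np.copyto(self._slots[slot_id], np.stack(samples))

    def slot_copy(self, slot_id):
        # Main side: copy out into a fresh array so the slot can be reused at once.
        return np.array(self._slots[slot_id])

    def close(self):
        del self._slots  # release the buffer export before closing the mapping
        self.shm.close()
        self.shm.unlink()
```

Overwriting a slot after `slot_copy` does not affect the copied batch, which is the property that lets the slot go back to a worker immediately.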
pin_memory (H2D Pre-Pinning)
The DataLoader does not currently expose a pin_memory=True flag in the constructor
itself. The pre-pinning path is implemented inside the
OpenCLBackend via the PinnedBufferPool: a Tensor.from_host(...)
with use_pinned=True (the default when NETCL_PINNED_H2D=1) routes the H2D copy
through a page-locked host buffer. The end result is equivalent to passing pin_memory=True to PyTorch's DataLoader: the host side of the H2D copy is page-locked, so the driver can DMA directly from it without an intermediate staging copy, which typically makes the PCIe transfer noticeably faster on discrete GPUs.
To opt out, set NETCL_PINNED_H2D=0 in the environment, or pass use_pinned=False to
Tensor.from_host(...) at the call site.
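How the per-call flag and the environment variable combine can be summarized in a few lines. `resolve_use_pinned` is a hypothetical helper, not netcl's code, and it assumes that an unset `NETCL_PINNED_H2D` behaves like `1`; the text above only states the behavior for the values `1` and `0`.

```python
import os


def resolve_use_pinned(use_pinned=None, env=None):
    """Hypothetical sketch: decide whether an H2D copy should go through pinned memory."""
    if use_pinned is not None:
        return bool(use_pinned)  # an explicit call-site argument wins
    env = os.environ if env is None else env
    # Assumption: an unset variable means pinning stays on.
    return env.get("NETCL_PINNED_H2D", "1") != "0"
```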
Augmentations
CPU Augmentations
from netcl.data.augment import random_erase, cutout, color_jitter
# All functions take NCHW float32 (or uint8) numpy arrays and return a fresh array.
x_aug = random_erase(x, p=0.5, scale=(0.02, 0.2), ratio=(0.3, 3.3))
x_aug = cutout(x, size=8)
x_aug = color_jitter(x, brightness=0.2, contrast=0.2, saturation=0.0)
| Function | Shape | Purpose |
|---|---|---|
| `random_erase` | (N, C, H, W) | Random Erasing: zero a random rectangle per sample |
| `cutout` | (N, C, H, W) | Fixed-size square cutout centered on a random position |
| `color_jitter` | (N, C, H, W) | Per-batch brightness + contrast jitter (no saturation) |
The CPU augmentations are intentionally tiny — they exist to make single-process debug runs easy. In a real training loop you should reach for the GPU variants, or for the filter pipeline below.
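To make the "return a fresh array" contract concrete, here is a NumPy sketch of a cutout with the same shape convention. `cutout_sketch` is hypothetical and not netcl's implementation; it picks one random center per sample and clips the square at the image border.

```python
import numpy as np


def cutout_sketch(x, size, rng=None):
    """Hypothetical NumPy cutout: zero one size x size square per sample,
    centered on a random pixel and clipped at the border. Returns a new array."""
    rng = np.random.default_rng() if rng is None else rng
    n, _, h, w = x.shape
    out = x.copy()  # never mutate the caller's batch
    half = size // 2
    cy = rng.integers(0, h, size=n)
    cx = rng.integers(0, w, size=n)
    for i in range(n):
        y0 = int(cy[i]) - half
        x0 = int(cx[i]) - half
        # Clip the square to the image; it always covers at least the center pixel.
        out[i, :, max(y0, 0):min(y0 + size, h), max(x0, 0):min(x0 + size, w)] = 0
    return out
```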
GPU Augmentations
import numpy as np
from netcl.data.augment_gpu import flip_horizontal, apply_color_jitter, cutout
# All functions take an OpenCL Tensor x of shape (N, C, H, W) plus per-sample
# masks/factors drawn on the host; N is the batch size.
mask = (np.random.rand(N) < 0.5).astype(np.uint8)  # 1 = flip this sample
x_aug = flip_horizontal(x, mask)
b = np.random.uniform(-0.2, 0.2, size=(N,)).astype(np.float32)        # brightness offsets
c = 1.0 + np.random.uniform(-0.2, 0.2, size=(N,)).astype(np.float32)  # contrast factors
x_aug = apply_color_jitter(x, b, c)
centers = np.random.randint(0, 32, size=(N, 2)).astype(np.int32)  # patch centers for 32x32 images
x_aug = cutout(x, centers, size=8)
| Function | Shape | Per-sample input | Purpose |
|---|---|---|---|
| `flip_horizontal` | (N, C, H, W) | `mask`: (N,) uint8 | Flip images where `mask[n] == 1` |
| `apply_color_jitter` | (N, C, H, W) | `brightness`: (N,), `contrast`: (N,) float32 | `x * c[n] + b[n]` per sample |
| `cutout` | (N, C, H, W) | `centers`: (N, 2) int | Zero out a square patch per sample |
Note: Randomness is host-side. Every GPU augment function takes its randomness (masks, factors, centers) as a host NumPy array. The OpenCL kernel stays a pure deterministic function of (x, randomness), which is what makes these kernels trivially testable and re-buildable across devices.
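Because each kernel is a pure function of `(x, randomness)`, a NumPy reference with the same contract can serve as a test oracle: feed both the same host arrays and compare the device result, once copied back to the host, with `np.allclose`. `flip_horizontal_ref` and `color_jitter_ref` are hypothetical helpers written from the table above, not part of netcl.

```python
import numpy as np


def flip_horizontal_ref(x, mask):
    """Hypothetical NumPy oracle: flip sample n along W where mask[n] == 1."""
    flip = mask.astype(bool)[:, None, None, None]  # broadcast over C, H, W
    return np.where(flip, x[..., ::-1], x)


def color_jitter_ref(x, b, c):
    """Hypothetical NumPy oracle for x * c[n] + b[n] per sample."""
    return x * c[:, None, None, None] + b[:, None, None, None]
```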
Filter Pipeline
The data/filters.py module provides a composable filter pipeline. Each filter takes and returns an (xb, yb) batch, the same signature as a DataLoader transform, so a filter list can be passed directly as the DataLoader transforms= argument.
from netcl.data.dataloader import DataLoader
from netcl.data.filters import (
    random_crop, horizontal_flip, to_float, normalize,
    default_cifar10_filters, basic_cifar10_filters,
)
# Filters run left to right: crop and flip the raw pixels, then scale and normalize.
pipeline = [
    random_crop(padding=4, crop_size=32, pad_mode="reflect"),
    horizontal_flip(p=0.5),
    to_float(scale=255.0),
    normalize(mean=(0.4914, 0.4822, 0.4465), std=(0.2023, 0.1994, 0.2010)),  # CIFAR-10 per-channel stats
]
loader = DataLoader(ds, batch_size=128, transforms=pipeline)
| Filter / Helper | Purpose |
|---|---|
| `apply_filters(batch, filters)` | Run a single `FilterFn` or a list over an `(xb, yb)` batch |
| `to_float(scale=255.0)` | Cast to float32 and divide by `scale` (if the input is integer-typed) |
| `normalize(mean, std)` | `(x - mean) / std` per channel |
| `horizontal_flip(p=0.5)` | Per-sample horizontal flip with probability `p` |
| `random_crop(padding=4, crop_size=32)` | Reflect-pad, then per-sample random crop |
| `default_cifar10_filters()` | The standard CIFAR-10 augmentation recipe |
| `basic_cifar10_filters()` | No-augmentation baseline (just `to_float` + `normalize`) |
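The "reflect-pad, then per-sample random crop" recipe can be written as a plain NumPy filter factory. `random_crop_sketch` is a hypothetical stand-in that only mirrors the table row; it is not netcl's `random_crop`, and it assumes NCHW input with `padding` smaller than H and W (a requirement of reflect padding).

```python
import numpy as np


def random_crop_sketch(padding=4, crop_size=32, rng=None):
    """Hypothetical FilterFn factory: reflect-pad H and W, then crop every
    sample at its own random offset. Labels pass through unchanged."""
    rng = np.random.default_rng() if rng is None else rng

    def _filter(xb, yb):
        n, c = xb.shape[:2]
        pad = ((0, 0), (0, 0), (padding, padding), (padding, padding))
        padded = np.pad(xb, pad, mode="reflect")
        oy = rng.integers(0, padded.shape[2] - crop_size + 1, size=n)
        ox = rng.integers(0, padded.shape[3] - crop_size + 1, size=n)
        out = np.empty((n, c, crop_size, crop_size), dtype=xb.dtype)
        for i in range(n):
            out[i] = padded[i, :, oy[i]:oy[i] + crop_size, ox[i]:ox[i] + crop_size]
        return out, yb

    return _filter
```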
A FilterFn is just Callable[[np.ndarray, np.ndarray], tuple[np.ndarray, np.ndarray]].
You can write your own — they compose left-to-right in the order they appear in the
list.
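Left-to-right composition, and the fact that order matters, can be shown with a small re-implementation. `apply_filters_sketch`, `channel_scale`, and `add_one` are hypothetical illustrations of the `FilterFn` shape described above, not netcl's `apply_filters`.

```python
from typing import Callable, Sequence, Tuple, Union

import numpy as np

Batch = Tuple[np.ndarray, np.ndarray]
FilterFn = Callable[[np.ndarray, np.ndarray], Batch]


def apply_filters_sketch(batch: Batch, filters: Union[FilterFn, Sequence[FilterFn]]) -> Batch:
    """Hypothetical re-implementation: run one filter or a list, left to right."""
    if callable(filters):
        filters = [filters]
    xb, yb = batch
    for f in filters:
        xb, yb = f(xb, yb)
    return xb, yb


def channel_scale(factors) -> FilterFn:
    """Hypothetical custom FilterFn factory: multiply channel c by factors[c]."""
    scale = np.asarray(factors, dtype=np.float32)[None, :, None, None]

    def _filter(xb, yb):
        return xb * scale, yb

    return _filter


def add_one(xb, yb):
    # Another hypothetical FilterFn: shift every pixel by 1.
    return xb + 1, yb
```

Running `[add_one, channel_scale(...)]` and `[channel_scale(...), add_one]` on the same batch gives different results, which is why the list order should match the intended recipe.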
Custom Datasets
Any object that supports __len__ and __getitem__ works. A sample is a tuple of
NumPy arrays; a batch is what you get when the loader calls np.stack along a new
leading axis:
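import numpy as np

class MyDataset:
    def __len__(self):
        return 1000

    def __getitem__(self, i):
        x = np.zeros((3, 32, 32), dtype=np.float32)  # placeholder sample, e.g. (C, H, W)
        y = np.array(i % 10, dtype=np.int64)         # placeholder label, shape () (scalar)
        return x, y                                  # tuple of ndarrays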
For multi-field samples, return a longer tuple (or a dict with the same keys in every item). The loader will np.stack each field along a new leading axis and yield the fields in the same order (or under the same keys):
return x_img, x_mask, y_label
# loader yields (xb_img, xb_mask, yb_label) with shapes ((B, C, H, W), (B, H, W), (B,))
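The stacking rule can be expressed as a small collate function. `collate_sketch` is a hypothetical illustration of the behavior described above (one `np.stack` per field, tuples keep their order, dicts keep their keys), not the loader's internal code.

```python
import numpy as np


def collate_sketch(samples):
    """Hypothetical collate: np.stack each field along a new leading axis."""
    first = samples[0]
    if isinstance(first, dict):
        # Dict samples: stack per key; every sample must share the same keys.
        return {k: np.stack([s[k] for s in samples]) for k in first}
    # Tuple samples: zip(*samples) groups the i-th field of every sample.
    return tuple(np.stack(field) for field in zip(*samples))
```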
See also
- MNIST with MLP: the worked example that uses `DataLoader` end-to-end.
- Tensor: the device-side value type the H2D copy produces.
- OpenCLBackend: the `PinnedBufferPool` that backs the `pin_memory`-equivalent H2D path.
- Memory Pool: why the H2D path is `pin_memory`-equivalent and how to size the pool.
- Distributed Architecture: the cross-process IPC contract that the `SharedBatchRingBuffer` implements.