netcl.data — DataLoader, Augment, SHM-Ring

The data API is the training-data ingestion layer. It builds on top of Python's multiprocessing to give you a DataLoader with a worker pool, a sliding-window prefetch ring buffer, and an optional lock-free shared-memory fast path for the inner H2D copy on OpenCL devices. It also ships a small set of CPU and GPU augmentation primitives, balanced-sampling / label filters, and a fork-vs-spawn aware pool initializer that keeps PyOpenCL safe to import inside a worker.

Note — Long-form imports. The netcl/data package does not have a flat __init__.py re-export. Reach the public symbols by their full submodule path:

from netcl.data.dataloader import DataLoader
from netcl.data.shared_batch import SharedBatchRingBuffer, worker_write_to_shm
from netcl.data.augment import random_erase, cutout, color_jitter
# cutout exists in both augment and augment_gpu; importing both into one
# namespace rebinds the name to the GPU version.
from netcl.data.augment_gpu import flip_horizontal, apply_color_jitter, cutout
from netcl.data.filters import (
    apply_filters, to_float, normalize, horizontal_flip, random_crop,
    default_cifar10_filters, basic_cifar10_filters,
)

Module Map

| Symbol | Path | Purpose |
| --- | --- | --- |
| DataLoader | data/dataloader.py | High-level loader: workers + prefetch ring + optional SHM fast path |
| SharedBatchRingBuffer | data/shared_batch.py | Lock-free shared-memory ring of pre-allocated batch slots |
| worker_write_to_shm | data/shared_batch.py | Worker-side protocol: copy a stack of samples into a ring slot |
| random_erase, cutout, color_jitter | data/augment.py | CPU-side NCHW augmentations (return a fresh array, do not mutate) |
| flip_horizontal, apply_color_jitter, cutout | data/augment_gpu.py | OpenCL-side NCHW augmentations (masks / factors come from host) |
| apply_filters, to_float, normalize, horizontal_flip, random_crop, default_cifar10_filters, basic_cifar10_filters | data/filters.py | Dataset-level filter and balanced-sampling helpers |
| random_split | utils/data.py | Convenience helper: np.array_split of indices into N stratified shards |

DataLoader

DataLoader is the only thing most user code should instantiate. It combines a worker pool, an asynchronous prefetch ring, and a thin post-processing hook chain. It is lazy: the worker pool is only created on the first call to __iter__, and it is persistent: the same pool is reused across epochs as long as the dataset identity is unchanged.

from netcl.data.dataloader import DataLoader

loader = DataLoader(
    dataset,
    batch_size=128,
    shuffle=True,
    drop_last=False,
    num_workers=2,
    prefetch=4,            # ring buffer depth (defaults to 2 * num_workers)
    seed=0,
    transforms=None,       # optional (xb, yb) -> (xb, yb) callable
    use_shared_mem=False,  # set True for the SHM fast path (num_workers must be > 0)
)
for xb, yb in loader:
    ...

| Argument | Default | Purpose |
| --- | --- | --- |
| dataset | required | Any object with __len__ and __getitem__(int) -> tuple[np.ndarray, ...]. |
| batch_size | 32 | Samples per batch. |
| shuffle | True | Shuffle sample order each epoch. |
| drop_last | False | Drop the trailing partial batch. |
| num_workers | 4 | Worker processes; 0 runs single-threaded (useful for debugging). |
| prefetch | 2 * num_workers | Sliding-window ring size: batches in flight ahead of the consumer. |
| seed | None | RNG seed for shuffling. |
| transforms | None | A single callable (xb, yb) -> (xb, yb) or a list of such callables, applied on the main thread after the batch arrives from a worker. |
| use_shared_mem | False | Use the SharedBatchRingBuffer zero-copy path. Requires num_workers > 0. |
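
To make the prefetch window concrete, here is a minimal sketch of a sliding-window prefetcher. It is not netcl's implementation: it uses threads from the standard library as a stand-in for the worker processes, and load_batch, prefetch_iter, and the sentinel are hypothetical names introduced only for this illustration.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor

_DONE = object()  # sentinel: no more batch ids left to submit


def load_batch(batch_id):
    # Hypothetical stand-in for the per-batch work a worker would do.
    return [batch_id * 10 + i for i in range(3)]


def prefetch_iter(batch_ids, num_workers=2, prefetch=None):
    """Yield batches in order with at most `prefetch` jobs in flight."""
    depth = max(1, 2 * num_workers if prefetch is None else prefetch)
    ids = iter(batch_ids)
    pending = deque()
    with ThreadPoolExecutor(max_workers=num_workers) as pool:

        def submit_next():
            batch_id = next(ids, _DONE)
            if batch_id is not _DONE:
                pending.append(pool.submit(load_batch, batch_id))

        for _ in range(depth):                  # fill the window once
            submit_next()
        while pending:
            batch = pending.popleft().result()  # oldest job first keeps order
            submit_next()                       # slide the window by one
            yield batch


batches = list(prefetch_iter(range(5), num_workers=2, prefetch=4))
print(len(batches))
```

A larger prefetch value keeps more finished batches waiting ahead of the consumer, at the cost of holding that many batches in memory.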

Beyond __iter__, DataLoader exposes three further methods:

  • close() — terminate the worker pool and unlink() the SHM ring buffer. Called automatically from __del__, but explicit calls are encouraged in long-running processes to free the SHM segments deterministically.
  • __len__() — the number of batches the loader will yield this epoch (respects drop_last).
  • __del__() — calls close(). Safe to rely on for short-lived scripts.
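
The relationship between batch_size, drop_last, and the value returned by __len__() can be sketched in plain Python. This is an illustration under assumptions, not the library's code: num_batches and epoch_batches are hypothetical helpers, and the real loader may use a different RNG or derive per-epoch seeds differently.

```python
import math
import random


def num_batches(n_samples, batch_size, drop_last=False):
    # With drop_last the trailing partial batch is not counted.
    if drop_last:
        return n_samples // batch_size
    return math.ceil(n_samples / batch_size)


def epoch_batches(n_samples, batch_size, shuffle=True, drop_last=False, seed=None):
    """Return one epoch's sample indices as a list of batches."""
    order = list(range(n_samples))
    if shuffle:
        random.Random(seed).shuffle(order)
    batches = [order[i:i + batch_size] for i in range(0, n_samples, batch_size)]
    if drop_last and batches and len(batches[-1]) < batch_size:
        batches.pop()
    return batches


plan = epoch_batches(1000, 128, shuffle=True, drop_last=False, seed=0)
print(len(plan), len(plan[-1]))                 # 8 104
print(num_batches(1000, 128, drop_last=True))   # 7
```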

Fork-vs-Spawn Caveat

PyOpenCL is fork-unsafe: a fork() after PyOpenCL has loaded its ICD or bound a driver leaves the child process with a half-initialized runtime that crashes on the first kernel launch. The DataLoader handles this automatically:

  • On Linux, where fork has long been the default start method (Python 3.14 switched the default to forkserver) and is cheap (it lets workers inherit the parent's loaded datasets via copy-on-write), _get_mp_context() returns multiprocessing.get_context("fork"). PyOpenCL is not yet imported in the main process when the workers start, so the fork happens before any driver binding.
  • On macOS and Windows, the default start method is "spawn" (on Windows it is the only one available). The dataset is sent to each worker once via the pool initializer; the first iteration may take a second or two for a large in-memory dataset, but every subsequent iteration is fast.

If you need to use fork explicitly (e.g. for a one-shot diagnostic) and PyOpenCL is already imported, do it before the DataLoader ever constructs its pool. Otherwise stick with the platform default.
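
The platform logic described above might look roughly like the following sketch. pick_mp_context is a hypothetical stand-in for the internal _get_mp_context() helper, whose actual body is not shown on this page; the check for "pyopencl" in sys.modules is likewise only an assumed way to detect whether forking is still safe.

```python
import multiprocessing as mp
import sys


def pick_mp_context():
    """Hypothetical stand-in for _get_mp_context(): fork on Linux, spawn elsewhere."""
    if sys.platform.startswith("linux") and "fork" in mp.get_all_start_methods():
        return mp.get_context("fork")
    return mp.get_context("spawn")


def opencl_already_imported():
    # Forking is only safe while PyOpenCL has not been imported in this process.
    return "pyopencl" in sys.modules


ctx = pick_mp_context()
print(ctx.get_start_method(), opencl_already_imported())
```

Requesting the context explicitly, rather than relying on the interpreter default, keeps the behavior stable across Python versions.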

Shared-Memory Ring Buffer

When use_shared_mem=True and num_workers > 0, DataLoader takes the zero-copy path. The main process creates a SharedBatchRingBuffer sized K = prefetch + 2 and asks each worker to write its batch directly into a slot instead of returning a pickled NumPy tuple.

loader = DataLoader(ds, batch_size=64, num_workers=2, prefetch=8, use_shared_mem=True)

The flow is:

  1. The first batch is processed on the main thread (so the loader can learn the field shapes and dtypes), and yielded to the consumer immediately.
  2. The main process attaches to the SHM ring and starts apply_async jobs on the worker pool, passing each job its slot ID together with a serialized attach_info dict (name, slot size, shape, dtype).
  3. Each worker calls worker_write_to_shm, which uses np.copyto to copy the stacked arrays into its slot.
  4. The main process calls slot_copy(slot_id) to read the data out. The slot's view is copied into a fresh allocation with np.array(...), so the SHM slot can be handed to the next worker immediately without a race.
  5. The last (incomplete) batch falls back to the regular pickle path because the per-worker shape discovery happens only once.

The full shared-memory contract — including the per-worker cache that avoids re-attaching on every call — is described in Distributed Architecture.
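
The write-then-copy-out cycle from the flow above can be sketched with the standard library's multiprocessing.shared_memory and NumPy. This is not SharedBatchRingBuffer itself: the contiguous fixed-size slot layout, the names slot_view, write_slot, and demo, and running both sides in one process are all assumptions made to keep the example short.

```python
from multiprocessing import shared_memory

import numpy as np

K = 4                       # number of slots (the loader uses prefetch + 2)
SHAPE = (2, 3)              # per-slot batch shape, learned from the first batch
DTYPE = np.float32
SLOT_BYTES = int(np.prod(SHAPE)) * np.dtype(DTYPE).itemsize


def slot_view(buf, slot_id):
    # NumPy view onto one slot of the segment; no data is copied here.
    return np.ndarray(SHAPE, dtype=DTYPE, buffer=buf, offset=slot_id * SLOT_BYTES)


def write_slot(name, slot_id, samples):
    # Worker side: attach by name, then copy the stacked samples into the slot.
    worker_shm = shared_memory.SharedMemory(name=name)
    try:
        view = slot_view(worker_shm.buf, slot_id)
        np.copyto(view, np.stack(samples))
        del view            # drop the buffer export before close()
    finally:
        worker_shm.close()


def demo():
    shm = shared_memory.SharedMemory(create=True, size=K * SLOT_BYTES)
    try:
        samples = [np.arange(3, dtype=DTYPE), np.arange(3, dtype=DTYPE) + 10]
        write_slot(shm.name, 1, samples)
        view = slot_view(shm.buf, 1)
        batch = np.array(view)   # fresh copy, so the slot can be reused at once
        del view
        return batch
    finally:
        shm.close()
        shm.unlink()             # free the segment deterministically


batch = demo()
print(batch.shape)
```

Copying out of the slot before reuse trades one extra host copy for freedom from read/write races on the slot.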

pin_memory (H2D Pre-Pinning)

The DataLoader does not currently expose a pin_memory=True flag in the constructor itself. The pre-pinning path is implemented inside the OpenCLBackend via the PinnedBufferPool: a Tensor.from_host(...) with use_pinned=True (the default when NETCL_PINNED_H2D=1) routes the H2D copy through a page-locked host buffer. The result is the same end-to-end behavior as a pin_memory=True argument on PyTorch's DataLoader — the host side of the H2D copy is page-locked, which makes the PCIe transfer significantly faster on discrete GPUs.

To opt out, set NETCL_PINNED_H2D=0 in the environment, or pass use_pinned=False to Tensor.from_host(...) at the call site.

Augmentations

CPU Augmentations

from netcl.data.augment import random_erase, cutout, color_jitter

# All functions take NCHW float32 (or uint8) numpy arrays and return a fresh array.
x_aug = random_erase(x, p=0.5, scale=(0.02, 0.2), ratio=(0.3, 3.3))
x_aug = cutout(x, size=8)
x_aug = color_jitter(x, brightness=0.2, contrast=0.2, saturation=0.0)

| Function | Shape | Purpose |
| --- | --- | --- |
| random_erase | (N, C, H, W) | Random Erase: zero a random rectangle per sample |
| cutout | (N, C, H, W) | Fixed-size square cutout centered on a random position |
| color_jitter | (N, C, H, W) | Per-batch brightness + contrast jitter (no saturation) |

The CPU augmentations are intentionally tiny — they exist to make single-process debug runs easy. In a real training loop you should reach for the GPU variants, or for the filter pipeline below.
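
As a reference for what a CPU cutout does, here is a small NumPy sketch. It is not the netcl implementation: cutout_reference is a hypothetical name, and details such as clipping the patch at the image border and drawing one center per sample are assumptions.

```python
import numpy as np


def cutout_reference(x, size=8, rng=None):
    """Zero a square of side `size` around a random center in each NCHW sample."""
    rng = np.random.default_rng() if rng is None else rng
    n, _, h, w = x.shape
    out = x.copy()                      # fresh array; the input is not mutated
    cy = rng.integers(0, h, size=n)     # one center row per sample
    cx = rng.integers(0, w, size=n)     # one center column per sample
    half = size // 2
    for i in range(n):
        y0, y1 = max(int(cy[i]) - half, 0), min(int(cy[i]) + half, h)
        x0, x1 = max(int(cx[i]) - half, 0), min(int(cx[i]) + half, w)
        out[i, :, y0:y1, x0:x1] = 0     # patch is clipped at the borders
    return out


x = np.ones((4, 3, 32, 32), dtype=np.float32)
x_aug = cutout_reference(x, size=8, rng=np.random.default_rng(0))
print(x_aug.shape, bool((x == 1).all()))
```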

GPU Augmentations

import numpy as np

from netcl.data.augment_gpu import flip_horizontal, apply_color_jitter, cutout

# All functions take an OpenCL Tensor and per-sample masks/factors from the host.
mask = (np.random.rand(N) < 0.5).astype(np.uint8)
x_aug = flip_horizontal(x, mask)

b = np.random.uniform(-0.2, 0.2, size=(N,)).astype(np.float32)
c = 1.0 + np.random.uniform(-0.2, 0.2, size=(N,)).astype(np.float32)
x_aug = apply_color_jitter(x, b, c)

centers = np.random.randint(0, 32, size=(N, 2)).astype(np.int32)
x_aug = cutout(x, centers, size=8)

| Function | Shape | Per-sample input | Purpose |
| --- | --- | --- | --- |
| flip_horizontal | (N, C, H, W) | mask: (N,) uint8 | Flip images where mask[n] == 1 |
| apply_color_jitter | (N, C, H, W) | brightness: (N,), contrast: (N,) float32 | x * c[n] + b[n] per sample |
| cutout | (N, C, H, W) | centers: (N, 2) int | Zero out a square patch per sample |

Note — Randomness is host-side. Every GPU augment function takes its randomness (masks, factors, centers) as a host NumPy array. The OpenCL kernel stays a pure deterministic function of (x, randomness), which is what makes these kernels trivially testable and re-buildable across devices.
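
Because the randomness arrives as plain host arrays, each kernel can be checked against a NumPy reference that takes the same inputs. The sketch below shows what such references could look like for the flip and the color jitter; flip_horizontal_ref and color_jitter_ref are hypothetical names, and the formulas follow the table above rather than the actual OpenCL source.

```python
import numpy as np


def flip_horizontal_ref(x, mask):
    # Reverse the W axis for samples whose mask entry is 1.
    out = x.copy()
    sel = mask.astype(bool)
    out[sel] = out[sel, :, :, ::-1]
    return out


def color_jitter_ref(x, b, c):
    # x * c[n] + b[n], broadcast over C, H, W for each sample n.
    return x * c[:, None, None, None] + b[:, None, None, None]


rng = np.random.default_rng(0)
N = 4
x = rng.random((N, 3, 8, 8), dtype=np.float32)
mask = (rng.random(N) < 0.5).astype(np.uint8)
b = rng.uniform(-0.2, 0.2, size=N).astype(np.float32)
c = (1.0 + rng.uniform(-0.2, 0.2, size=N)).astype(np.float32)

# Same (x, randomness) in, same result out: the function itself draws nothing.
y1 = color_jitter_ref(flip_horizontal_ref(x, mask), b, c)
y2 = color_jitter_ref(flip_horizontal_ref(x, mask), b, c)
print(np.array_equal(y1, y2))
```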

Filter Pipeline

The data/filters.py module provides a composable filter pipeline. Each filter maps a (xb, yb) batch to a new (xb, yb) batch, so a list of filters can be passed directly as the DataLoader transforms= argument.

from netcl.data.dataloader import DataLoader
from netcl.data.filters import (
    random_crop, horizontal_flip, to_float, normalize,
    default_cifar10_filters, basic_cifar10_filters,
)

pipeline = [
    random_crop(padding=4, crop_size=32, pad_mode="reflect"),
    horizontal_flip(p=0.5),
    to_float(scale=255.0),
    normalize(mean=(0.4914, 0.4822, 0.4465), std=(0.2023, 0.1994, 0.2010)),
]

loader = DataLoader(ds, batch_size=128, transforms=pipeline)

| Filter / Helper | Purpose |
| --- | --- |
| apply_filters(batch, filters) | Run a single FilterFn or a list over a (xb, yb) batch |
| to_float(scale=255.0) | Cast to float32 and divide by scale (if the input is integer-typed) |
| normalize(mean, std) | (x - mean) / std per channel |
| horizontal_flip(p=0.5) | Per-sample horizontal flip with probability p |
| random_crop(padding=4, crop_size=32) | Reflect-pad then per-sample random crop |
| default_cifar10_filters() | The standard CIFAR-10 augmentation recipe |
| basic_cifar10_filters() | No-augmentation baseline (just to_float + normalize) |

A FilterFn is just Callable[[np.ndarray, np.ndarray], tuple[np.ndarray, np.ndarray]]. You can write your own — they compose left-to-right in the order they appear in the list.
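
A short sketch of two custom filters shows why the order in the list matters. The factories scale_by and add_offset and the runner run_filters are hypothetical; run_filters only imitates the left-to-right behavior described for apply_filters and is not its actual code.

```python
import numpy as np


def scale_by(k):
    # Factory returning a FilterFn, in the style of to_float(scale=...).
    def _filter(xb, yb):
        return xb * k, yb
    return _filter


def add_offset(d):
    def _filter(xb, yb):
        return xb + d, yb
    return _filter


def run_filters(batch, filters):
    # Accept a single callable or a list; apply left to right.
    if callable(filters):
        filters = [filters]
    xb, yb = batch
    for f in filters:
        xb, yb = f(xb, yb)
    return xb, yb


xb = np.ones((2, 3, 4, 4), dtype=np.float32)
yb = np.array([0, 1])

a, _ = run_filters((xb, yb), [scale_by(2.0), add_offset(1.0)])   # 1 * 2 + 1
b, _ = run_filters((xb, yb), [add_offset(1.0), scale_by(2.0)])   # (1 + 1) * 2
print(float(a[0, 0, 0, 0]), float(b[0, 0, 0, 0]))                # 3.0 4.0
```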

Custom Datasets

Any object that supports __len__ and __getitem__ works. A sample is a tuple of NumPy arrays; a batch is what you get when the loader calls np.stack along a new leading axis:

class MyDataset:
    def __len__(self):
        return 1000

    def __getitem__(self, i):
        x = ...           # np.ndarray, e.g. (C, H, W)
        y = ...           # np.ndarray, e.g. ()  (scalar)
        return x, y       # tuple of ndarrays

For multi-field samples, return a longer tuple (or a dict with the same keys in every item). The loader will np.stack each field along a new leading axis and yield the fields in the same order and structure as the sample:

return x_img, x_mask, y_label
# loader yields (xb_img, xb_mask, yb_label) with shapes ((B, C, H, W), (B, H, W), (B,))
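
The stacking step can be reproduced in a few lines of NumPy. The collate function below is a hypothetical helper written for this page; it only illustrates the field-by-field np.stack described above and makes no claim about how the loader handles edge cases such as ragged shapes.

```python
import numpy as np


def collate(samples):
    """Stack a list of same-structure samples field by field."""
    first = samples[0]
    if isinstance(first, dict):
        return {k: np.stack([s[k] for s in samples]) for k in first}
    return tuple(np.stack(field) for field in zip(*samples))


samples = [
    (np.zeros((3, 8, 8), np.float32), np.zeros((8, 8), np.uint8), np.array(i))
    for i in range(4)
]
xb_img, xb_mask, yb_label = collate(samples)
print(xb_img.shape, xb_mask.shape, yb_label.shape)   # (4, 3, 8, 8) (4, 8, 8) (4,)
```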

See also