Pipelines
Transform and persist volumes using eager and lazy pipelines on VolumeCollections. For the full API, see Query API.
Eager vs Lazy
| Mode | Entry Point | Execution | Best For |
|---|---|---|---|
| Eager | vc.map(fn) |
Immediate — all volumes loaded and transformed | Small datasets, chaining, inspection |
| Lazy | vc.lazy().map(fn) |
Deferred — single-pass on .write() |
Large datasets, ETL pipelines |
Transform Signature
All transforms receive (volume, obs) — the volume array and its obs metadata row:
def normalize(volume: np.ndarray, obs: pd.Series) -> np.ndarray:
return (volume - volume.mean()) / volume.std()
def resample(order):
def fn(volume, obs):
spacing = eval(obs["voxel_spacing"])
factors = tuple(s / 2.0 for s in spacing)
return zoom(volume, factors, order=order)
return fn
Returning obs Updates
Transforms can return a second element to annotate obs metadata:
Updates accumulate through chained .map() calls and are written to obs on .write().
Eager Pipelines
vc.map(fn) applies the transform immediately and returns an EagerQuery:
# Chain transforms, then extract results
results = radi.CT.map(normalize).map(crop).to_list()
# Write transformed volumes to storage
ct_norm = radi.CT.map(normalize).write("s3://bucket/ct_norm")
# Extract non-volume results (e.g. statistics)
stats = radi.CT.map(lambda v, obs: {"mean": v.mean()}).to_list()
Lazy Pipelines
vc.lazy() returns a LazyQuery — transforms are deferred until .write():
Filter Before Transform
Lazy queries support filtering to reduce the number of volumes processed:
ct_subset = (
radi.CT.lazy()
.filter("voxel_spacing == '(1.0, 1.0, 1.0)'")
.head(100)
.map(normalize)
.write("./output")
)
Streaming
Stream volumes one at a time from a LazyQuery:
for vol in radi.CT.lazy().filter("split == 'train'").iter_volumes():
data = vol.to_numpy()
process(data)
Stack to Numpy
Load all matching volumes as a stacked array (N, X, Y, Z):
End-to-End Example
Resample and assemble a new RadiObject:
from radiobject import RadiObject
URI = "s3://bucket/study_2mm"
radi = RadiObject("s3://bucket/study_raw")
ct_2mm = radi.CT.lazy().map(resample(order=1)).write(f"{URI}/collections/CT", name="CT")
seg_2mm = radi.seg.lazy().map(resample(order=0)).write(f"{URI}/collections/seg", name="seg")
radi.add_collection(name="CT", vc=ct_2mm)
radi.add_collection(name="seg", vc=seg_2mm)