# Pipeline (Decorator-Only)
This guide introduces Alpenstock's lightweight pipeline utility for stage-based execution with disk cache and resume support.
## Design Summary

- No base class is required.
- You define an `attrs` class and decorate it with `@define_pipeline(...)`.
- Stage methods are decorated with `@stage_func(id="...", order=<int>)` and may be sync or async.
- Cache files are saved under a user field bound via `save_path_field`.
- Stage payload uses pickle by default, with optional `__saver`/`__loader` hooks.
## Field Kinds

Use the helper fields to declare semantics: `spec()`, `input()`, `state()`, `output()`, `transient()`.

- All kinds can appear multiple times, including `spec`.
- Unmarked fields are treated as transient.
- `spec()` fields are frozen after construction, and `on_setattr` cannot be overridden.
## Minimal Example

```python
from pathlib import Path

from alpenstock.pipeline import (
    define_pipeline,
    stage_func,
    spec,
    input,
    state,
    output,
    transient,
)


@define_pipeline(save_path_field="save_to", kw_only=True)
class ToyPipeline:
    spec_lr: float = spec()
    spec_steps: int = spec(default=10)
    x: float = input()
    y: float = input()
    w: float = state(default=0.0)
    loss: float = output(default=0.0)
    save_to: str | Path | None = transient(default=None)

    def run(self) -> None:
        self.init_stage()
        self.train_stage()

    @stage_func(id="init", order=0)
    def init_stage(self) -> None:
        self.w = self.x + self.y

    @stage_func(id="train", order=1)
    def train_stage(self) -> None:
        self.w = self.w - self.spec_lr * self.spec_steps
        self.loss = self.w * self.w
```
## Async Example

```python
import asyncio
from pathlib import Path

from alpenstock.pipeline import (
    define_pipeline,
    stage_func,
    spec,
    input,
    state,
    output,
    transient,
)


@define_pipeline(save_path_field="save_to", kw_only=True)
class AsyncToyPipeline:
    spec_lr: float = spec()
    x: float = input()
    y: float = input()
    w: float = state(default=0.0)
    loss: float = output(default=0.0)
    save_to: str | Path | None = transient(default=None)

    async def run(self) -> None:
        await self.init_stage()
        await self.train_stage()

    @stage_func(id="init", order=0)
    async def init_stage(self) -> None:
        await asyncio.sleep(0)
        self.w = self.x + self.y

    @stage_func(id="train", order=1)
    async def train_stage(self) -> None:
        await asyncio.sleep(0)
        self.w = self.w - self.spec_lr
        self.loss = self.w * self.w
```
## Runtime Behavior

- If `save_to` is `None`, no cache is used.
- Stage calls are strictly ordered by ascending `order`; only the next stage can be called.
- Async stages must be awaited from async entry methods.
- Same-instance concurrent stage execution is unsupported and may fail fast.
- Sync stages remain sync; if you call them from async code, they may block the event loop.
- If `save_to` is set, bootstrap happens on the first stage call (not before the entry method):
  - validate `spec.yaml` (`spec_fields` + `field_schema`)
  - if `spec.yaml` is missing while any `*.pkl` snapshot exists, raise an error and require manual cleanup
  - validate stage snapshot continuity by stage `order`
  - write `spec.yaml` only for a fresh cache (no stage snapshots yet)
- Each completed stage writes `<stage_id>.pkl`.
- On stage cache hit:
  - the stage body is skipped when its finished marker exists
  - `state` and `output` are restored from the snapshot
  - `input` and `transient` keep current instance values
- In read-only mode, after the instance has been constructed:
  - bootstrap does not create cache directories or write `spec.yaml`
  - stage calls only restore from existing snapshots and do not execute the stage body
  - missing or incomplete stage snapshots raise an error immediately
## Validation Rules

- `save_path_field` must point to a `transient` field.
- The `save_path_field` annotation must be compatible with `str | Path` (optionally `None`).
- `spec` and `input` fields must be constructor-initialized (`init=False` is not allowed).
- Stage IDs must be unique in one class.
- Stage IDs must match `^[A-Za-z0-9_]+$`.
- Stage `order` must be an integer (`bool` is rejected), `>= 0`, and unique in one class hierarchy.
- Stage methods must take only `self`.
- Sync stage methods must return `None`.
- Async stage methods must resolve to `None` when awaited.
- Stage calls must be strictly sequential by `order`; calling a future or already-finished stage raises an error.
- On resume, `spec_fields` and the field schema (name -> kind) must match `spec.yaml`.
- If `spec.yaml` is missing and any `*.pkl` exists, bootstrap treats the cache as corrupted and raises.
- If an earlier stage snapshot is missing but a later snapshot exists, bootstrap raises an error and asks users to clean the cache manually.
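A minimal sketch of the stage ID and `order` checks described above (a hypothetical helper with illustrative names, not the library's actual validator):

```python
import re

_STAGE_ID_RE = re.compile(r"^[A-Za-z0-9_]+$")


def validate_stages(stages):
    """stages: list of (stage_id, order) pairs declared on one class."""
    ids, orders = set(), set()
    for stage_id, order in stages:
        if not _STAGE_ID_RE.match(stage_id):
            raise ValueError(f"invalid stage id: {stage_id!r}")
        # bool is a subclass of int, so it must be rejected explicitly
        if isinstance(order, bool) or not isinstance(order, int) or order < 0:
            raise ValueError(f"order must be a non-negative int: {order!r}")
        if stage_id in ids or order in orders:
            raise ValueError("stage ids and orders must be unique")
        ids.add(stage_id)
        orders.add(order)


validate_stages([("init", 0), ("train", 1)])  # passes
```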
## Persistence Model

`spec.yaml` stores:

- `spec_fields`: normalized values of all `spec` fields
- `field_schema`: all fields with kind labels

Each stage snapshot stores:

- full `state` + `output`
- stage completion markers `__stage_finished_<id>`

Writes use atomic replace semantics.
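Atomic replace semantics are typically achieved by writing to a temporary file and renaming it over the target. A sketch of the general technique (not the library's actual code; the function name is illustrative):

```python
import os
import pickle
import tempfile
from pathlib import Path


def atomic_write_pickle(path, payload):
    """Write payload so readers never observe a half-written file."""
    path = Path(path)
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as f:
            pickle.dump(payload, f)
        os.replace(tmp, path)  # atomic on both POSIX and Windows
    except BaseException:
        os.unlink(tmp)  # never leave a partial temp file behind
        raise
```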
## Serialization Hooks

- Default stage payload serialization uses pickle.
- You can override stage payload I/O by implementing `__saver(path, payload)` and `__loader(path)`.
- Hook lookup supports Python name mangling, so `def __saver(...)` and `def __loader(...)` work as expected.
- In async stage execution, the sync hooks are invoked in worker threads so they do not block the event loop.
- A custom loader must return a `dict`.
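For example, JSON-based hooks could look like the following sketch. The functions are shown standalone for readability; inside a pipeline class they would be methods named `__saver` and `__loader` (name mangling is handled, per the note above). Assuming JSON here means the `state`/`output` values must be JSON-serializable:

```python
import json
from pathlib import Path


def saver(path, payload):
    """Matches the documented __saver(path, payload) signature."""
    Path(path).write_text(json.dumps(payload), encoding="utf-8")


def loader(path):
    """Matches the documented __loader(path) signature; returns a dict."""
    return json.loads(Path(path).read_text(encoding="utf-8"))
```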
## Helper Functions

The module also exposes three global helper functions:

- `get_state_dict(ins, *, spec=False, input=False, state=True, transient=False, output=True, include_finished_markers=False)`
- `load_pipeline(*, cls, cache_dir, read_only=True)(**overrides)`
- `load_spec(cls, save_to, *, include_field_schema=False)`
`get_state_dict` returns grouped runtime data by kind. Example:

```python
from pathlib import Path

from alpenstock.pipeline import get_state_dict

p = ToyPipeline(
    spec_lr=0.1,
    spec_steps=10,
    x=1.0,
    y=2.0,
    save_to=Path("./cache"),
)
p.run()

snapshot = get_state_dict(
    p,
    spec=True,
    input=True,
    state=True,
    output=True,
    include_finished_markers=True,
)
# {
#     "spec": {...}, "input": {...}, "state": {...}, "output": {...},
#     "finished_markers": {"init": True, "train": True}
# }
```
When `include_finished_markers=True`, the markers come from the current instance's runtime memory only.
The function does not bootstrap or read cache files from disk.
`load_pipeline` is a convenience helper for reopening an existing cache:

```python
from pathlib import Path

from alpenstock.pipeline import load_pipeline

# Read-only reopen (the default): restores cached stages only.
p = load_pipeline(
    cls=ToyPipeline,
    cache_dir=Path("./cache"),
)()
p.run()  # only restores cached stages; does not execute stage bodies
print(p.loss)

# Read-write reopen: normal cache/resume behavior, so pass real inputs.
p_rw = load_pipeline(
    cls=ToyPipeline,
    cache_dir=Path("./cache"),
    read_only=False,
)(x=1.0, y=2.0)
```
Behavior:
- The outer call controls loader behavior (`cls`, `cache_dir`, `read_only`).
- The inner call forwards keyword overrides to the pipeline constructor.
- Saved `spec` fields are always loaded from `spec.yaml`; constructor overrides cannot replace them.
- The configured `save_path_field` is always bound from the outer `cache_dir` argument; do not pass it again in the inner overrides.
- In read-only mode, missing required `input` fields are auto-filled with `None` if the constructor still needs them.
- In read-only mode, the returned instance reuses the existing cache on stage calls; a missing `spec.yaml`, a missing stage snapshot, or an incomplete stage snapshot raises a clear error instead of rebuilding the cache through normal stage execution.
- In read-write mode, normal cache/resume behavior is preserved, so callers should pass real input values.
- `load_pipeline` does not backfill `init=False` fields after construction. If `save_path_field` is `init=False`, the constructor itself must initialize it to the provided `cache_dir`.
- `load_pipeline` type hints are intentionally approximate. Runtime checks and this guide define the exact calling constraints.
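Conceptually, the constructor keyword arguments in read-only mode are assembled roughly like this (an illustrative model with hypothetical names, not the actual implementation):

```python
def build_init_kwargs(spec_from_yaml, overrides, required_inputs,
                      save_path_field, cache_dir):
    """Hypothetical sketch of read-only kwargs assembly."""
    kwargs = dict(overrides)
    kwargs.update(spec_from_yaml)        # saved spec always wins over overrides
    kwargs[save_path_field] = cache_dir  # bound from the outer cache_dir arg
    for name in required_inputs:         # read-only: fill missing inputs
        kwargs.setdefault(name, None)
    return kwargs
```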
`load_spec` reads `<save_to>/spec.yaml` for a pipeline class:

```python
spec_fields = load_spec(ToyPipeline, Path("./cache"))
full_spec = load_spec(ToyPipeline, Path("./cache"), include_field_schema=True)
```

If `spec.yaml` does not exist, `load_spec` returns `None`.
If `spec.yaml` exists, `load_spec` validates the cached `field_schema` against the provided pipeline class.
For supported annotations, `load_spec` also reconstructs spec values back into Python objects:

- container types such as `list[...]`, `tuple[...]`, and `dict[str, ...]` are rebuilt recursively
- Python `dataclass` types are rebuilt recursively from their field annotations
- `attrs.define` classes and pydantic models receive the cached mapping and handle their own construction for constructor-initialized fields
- unsupported or unconstrained annotations fall back to the raw YAML-loaded value
- `spec(init=False)` and `input(init=False)` are unsupported by design and rejected when defining the pipeline
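The recursive dataclass reconstruction can be sketched as follows (a simplified, hypothetical `rebuild` helper; the real logic also covers containers, `attrs` classes, and pydantic models):

```python
from dataclasses import dataclass, fields, is_dataclass


def rebuild(annotation, value):
    """Rebuild a cached mapping into a dataclass instance, recursively."""
    if is_dataclass(annotation) and isinstance(value, dict):
        kwargs = {
            f.name: rebuild(f.type, value[f.name])
            for f in fields(annotation)
            if f.name in value
        }
        return annotation(**kwargs)
    return value  # unsupported annotations fall back to the raw value


@dataclass
class Optim:
    lr: float


@dataclass
class Config:
    optim: Optim
    steps: int


cfg = rebuild(Config, {"optim": {"lr": 0.1}, "steps": 10})
# cfg == Config(optim=Optim(lr=0.1), steps=10)
```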
## Best Practices

- Keep `__attrs_post_init__` lightweight. It is fine for setup work such as allocating buffers or preparing lightweight runtime objects.
- Put real data loading, expensive initialization, and any logic that depends on `input` into the first stage whenever possible.
- `load_pipeline(..., read_only=True)` is a best-effort reopen mode. Constructor hooks such as `__attrs_post_init__` still run during instance creation, so strict read-only behavior only applies once the stage wrappers take over.
- This keeps cache, resume, and read-only reopening behavior aligned: `load_pipeline(..., read_only=True)` works best when constructor hooks do not perform stage-like work.
## Notes and Limitations

- The cache key is effectively `save_to + stage_id`.
- No automatic cache invalidation is performed for code/input changes.
- Different pipelines should not share the same `save_to` directory.
- No concurrency guarantees are provided for sharing one cache path across processes.
- The framework rejects direct calls to non-next stages; users must call stages in strict `order`.
- In async pipelines, `__saver`/`__loader` stay sync hooks and are run in worker threads; do not rely on them being awaitable.
- `spec` freezing does not prevent in-place mutation of nested mutable objects (for example, `dict`/`list`).
- `load_pipeline` is best suited for inspecting or reopening an existing cache. If a class validates or consumes `input` during construction, callers may still need to provide those fields explicitly even in read-only mode.