Pipeline Execution (Canonical v10)
Pipeline execution in NoETL is an atomic, ordered sequence of tasks executed within a single step-run on a single worker lease.
Canonical v10 properties:
- Data threading via
_prev(previous task result) - Per-task flow control via
task.spec.policy.rules(when→then.do) - Deterministic replay via the event log (no hidden control flow)
- No legacy
eval/expr, no step-levelcase, and nostep.when - Server owns step admission + routing; worker owns pipeline execution + task policy control
Overview (control plane vs data plane)
High-level execution sequence:
token arrives at step
↓
SERVER: admission gate (step.spec.policy.admit.rules)
↓
SERVER: step.scheduled → dispatch step-run to worker
↓
WORKER: executes step.tool tasks in order
task1 → policy → task2 → policy → ... → terminal
↓
WORKER: emits step.done / step.failed / loop.done
↓
SERVER: evaluates step.next.arcs[] on boundary event → schedules next step(s)
Hard rule: worker task policy MUST NOT start steps. Only server routing (step.next.arcs[]) starts steps.
Canonical step shape (pipeline + router)
- step: fetch_transform_store
spec:
policy:
admit:
rules:
- else:
then: { allow: true }
tool:
- fetch:
kind: http
method: GET
url: "{{ workload.api_url }}/data"
spec:
policy:
rules:
- when: "{{ outcome.status == 'error' and outcome.http.status in [429,500,502,503,504] }}"
then: { do: retry, attempts: 5, backoff: exponential, delay: 2 }
- when: "{{ outcome.status == 'error' }}"
then: { do: fail }
- else:
then: { do: continue }
- transform:
kind: python
args: { data: "{{ _prev }}" }
code: |
result = transform(data)
- store:
kind: postgres
auth: pg_k8s
command: "INSERT INTO ..."
spec:
policy:
rules:
- when: "{{ outcome.status == 'error' and outcome.pg.code in ['40001','40P01'] }}"
then: { do: retry, attempts: 5, backoff: exponential, delay: 2 }
- when: "{{ outcome.status == 'error' }}"
then: { do: fail }
- else:
then:
do: break
set_ctx: { store_ok: true }
next:
spec: { mode: exclusive }
arcs:
- step: continue
when: "{{ event.name == 'step.done' and ctx.store_ok == true }}"
- step: failed
when: "{{ event.name == 'step.done' and ctx.store_ok != true }}"
- step: failed
when: "{{ event.name == 'step.failed' }}"
Notes:
step.toolis always an ordered pipeline (nopipe:block).- Task labels (
fetch,transform,store) are stable identifiers forjumptargets and event correlation. step.nextis a router object:next.spec+next.arcs[].
Runtime variables (templates + policies)
Available during pipeline execution:
| Name | Scope | Meaning |
|---|---|---|
workload | execution | immutable merged inputs |
keychain | execution | resolved credentials (read-only) |
ctx | execution | mutable execution context (event-sourced patches) |
iter | iteration | mutable iteration context (loop only; isolated per iteration) |
args | step-run | token payload / arc inscription |
_task | pipeline | current task label |
_prev | pipeline | previous task result (outcome.result of previous task) |
_attempt | pipeline | attempt number for current task (1-based) |
outcome | pipeline | current task outcome envelope (policy evaluation only) |
event | routing | boundary event (server-side next.arcs[].when evaluation) |
There is no canonical
varsscope in v10. Useiter(within loops) andctx(cross-step) via policy patches.
Outcome envelope (outcome)
Each tool invocation produces exactly one final outcome:
outcome.status:"ok"or"error"outcome.result: success output (small inline payload or reference)outcome.error: structured error object (MUST includekind; SHOULD includeretryable)outcome.meta: attempt, duration, timestamps, trace ids
Tool helpers MAY be present (examples):
- HTTP:
outcome.http.status,outcome.http.headers - Postgres:
outcome.pg.code,outcome.pg.sqlstate - Python:
outcome.py.exception_type
Task policy rules (control actions)
Canonical placement: task.spec.policy.rules.
Shape:
spec:
policy:
rules:
- when: "{{ ... }}"
then:
do: continue|retry|jump|break|fail
attempts: 5
backoff: none|linear|exponential
delay: 1.0
to: some_task_label
set_iter: { ... }
set_ctx: { ... }
- else:
then: { do: continue }
Semantics:
- Evaluate rules top-to-bottom; first match wins;
elseis fallback. - If
spec.policyis omitted:- ok →
continue - error →
fail
- ok →
- If policy is present but no rule matches and no
elseis provided:- default →
continue(canonical v10 default)
- default →
Streaming pagination inside a pipeline (canonical pattern)
Pagination and polling are modeled as pipeline control flow:
- keep counters/cursors in
iter.* - use a
paginatedecision task thatjumps back tofetch_pageorbreaks
For a full canonical example, see documentation/docs/reference/dsl/pagination.md.
Best practices
- Prefer policy-based retry (
do: retry) over tool-internal retry so events stay accurate and replayable. - Use
iterfor per-item paging/cursors; use external storage + ResultRefs for large page bodies. - Avoid writing conflicting keys into
ctxfrom parallel iterations until reducers/atomics exist.