Pagination Handling — Canonical v10
This document describes the Canonical v10 pagination patterns:
- Outer fan-out via step.loop (often parallel/distributed)
- Inner ordered pagination stream within one iteration lease, using task policy (jump/break)
Task format: tasks use the canonical format with an explicit name: field (e.g., { name: "task_name", kind: "http", ... }). See Step Specification for details.
Conventions used throughout:
- No eval: blocks
- No expr: keyword
- No step.when
- No step.spec.next_mode (the routing mode belongs to next.spec.mode)
- Pagination is implemented as an ordered stream inside one iteration, using task policy (retry | jump | break | fail | continue)
- “Sink” is not a tool kind: storage is just a tool task that returns a reference
1) Mental model
You usually want two layers:
- Outer fan-out over items (endpoints/cities/hotels) via step.loop
  - can be parallel and even distributed
- Inner stream per item (pages/rooms), processed sequentially on a single worker lease
  - implemented by a pipeline that jumps back to fetch_page until break
This gives you:
- parallelism where safe (across independent items)
- determinism where required (within one item’s pagination stream)
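The two layers can be sketched in Python as a minimal local analogy. The sketch makes two assumptions: a thread pool capped at max_in_flight stands in for parallel or distributed workers, and fake_fetch / FAKE_PAGES are a hypothetical in-memory API, not NoETL components. Each endpoint runs in parallel, but its pages are walked strictly in order inside one call.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical in-memory API: endpoint -> list of pages (each page is a list of items).
FAKE_PAGES = {
    "/a": [["a1", "a2"], ["a3"]],
    "/b": [["b1"]],
    "/c": [["c1"], ["c2"], ["c3"]],
}


def fake_fetch(endpoint: str, page: int) -> dict:
    """Return one page plus a has_more flag, like a paged HTTP API might."""
    pages = FAKE_PAGES[endpoint]
    return {"items": pages[page - 1], "has_more": page < len(pages)}


def run_iteration(endpoint: str) -> list:
    """Inner layer: one worker, pages fetched strictly in order."""
    collected, page, has_more = [], 1, True
    while has_more:
        body = fake_fetch(endpoint, page)
        collected.extend(body["items"])
        has_more = body["has_more"]
        page += 1
    return collected


def fetch_all_endpoints(endpoints: list, max_in_flight: int = 10) -> dict:
    """Outer layer: independent endpoints run in parallel, at most max_in_flight at once."""
    with ThreadPoolExecutor(max_workers=max_in_flight) as pool:
        # pool.map yields results in input order, so zip pairs each endpoint with its pages.
        return dict(zip(endpoints, pool.map(run_iteration, endpoints)))


result = fetch_all_endpoints(["/a", "/b", "/c"], max_in_flight=2)
```

Parallelism never crosses into a single endpoint's page sequence, which is the determinism property described above.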
2) Text diagram (two layers)
2.1 Outer fan-out (loop parallel / distributed)
SERVER (control plane)
   |
   | token(step=fetch_all_endpoints, args={...})
   v
SCHEDULE step.run (and loop plan) ─────────────────────────────┐
   |                                                           |
   v                                                           |
WORKERS (data plane) execute iterations in parallel            |
   |                                                           |
   |-- iteration(endpoint=A)  [single lease, sequential pages] |
   |-- iteration(endpoint=B)  [single lease, sequential pages] |
   |-- iteration(endpoint=C)  ...                              |
   |                                                           |
   +--> worker emits loop.done ────────────────────────────────┘
   |
   v
SERVER evaluates next.arcs on event=loop.done
   |
   +--> validate_results (next token(s))
2.2 One iteration = sequential pagination stream (worker-local)
(init_iter)
    |
(fetch_page) --policy retry--> (fetch_page)
    |
(route_by_status) --jump--> store_200
    |                       |
    |                       +--jump--> paginate
    |
    +--jump--> store_404
               |
               +--jump--> paginate
(paginate)
    |
    +--jump--> fetch_page   [if iter.has_more]
    |
    break                   [if not iter.has_more]
Key property: no fall-through. Router and store tasks use jump so you never “accidentally” execute both store branches.
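To see why jump prevents fall-through, here is a toy Python interpreter for the worker-local stream. It assumes three rules: continue advances to the next task in list order, jump moves to the named task, and break ends the iteration. These rules are our reading of the diagram, not NoETL's engine. The task bodies are stubs driven by a list of HTTP statuses, one per page.

```python
from typing import Callable, Dict, List, Tuple

# A task body returns (action, target); target is only used by "jump".
Task = Tuple[str, Callable[[dict], Tuple[str, str]]]


def run_stream(tasks: List[Task], state: dict) -> List[str]:
    """Execute tasks in list order, honouring continue / jump / break."""
    index: Dict[str, int] = {name: i for i, (name, _) in enumerate(tasks)}
    trace, pos = [], 0
    while pos < len(tasks):
        name, body = tasks[pos]
        trace.append(name)
        action, target = body(state)
        if action == "break":
            break
        pos = index[target] if action == "jump" else pos + 1  # continue = next in order
    return trace


def make_tasks(statuses: List[int]) -> List[Task]:
    """Stub tasks mirroring the 2.2 diagram; statuses[i] is the HTTP status of page i+1."""
    def init_iter(s):
        s["page"] = 1
        return "continue", ""

    def fetch_page(s):
        s["http_status"] = statuses[s["page"] - 1]
        s["has_more"] = s["page"] < len(statuses)
        return "continue", ""

    def route_by_status(s):
        return "jump", ("store_404" if s["http_status"] == 404 else "store_200")

    def store(s):
        # Never "continue" here: store_200 would fall through into store_404.
        return "jump", "paginate"

    def paginate(s):
        if s["has_more"]:
            s["page"] += 1
            return "jump", "fetch_page"
        return "break", ""

    return [("init_iter", init_iter), ("fetch_page", fetch_page),
            ("route_by_status", route_by_status), ("store_200", store),
            ("store_404", store), ("paginate", paginate)]


trace = run_stream(make_tasks([200, 404]), {})
```

Each page visits exactly one store task before paginate, which is the no-fall-through property.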
3) Canonical variables & wrappers
3.1 Canonical tool outcome
Every tool task produces a final outcome object:
- outcome.status: ok | error
- outcome.result: success output (small payload or reference)
- outcome.error: error object (kind, retryable, message, details)
- kind helpers (HTTP, Postgres, etc.): outcome.http.status, outcome.pg.code, ...
Task policy rules evaluate over outcome.
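Below is a Python sketch of the outcome object and of ordered rule evaluation. The field names follow the list above. Outcome, Rule and evaluate are hypothetical names. The "first match wins, trailing else" semantics and the continue default are assumptions inferred from this document's examples.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List, Optional, Tuple


@dataclass
class Outcome:
    status: str                                # "ok" | "error"
    result: Any = None                         # success output (small payload or reference)
    error: Optional[dict] = None               # kind, retryable, message, details
    http: dict = field(default_factory=dict)   # kind helper, e.g. {"status": 503}


# (predicate, action); a None predicate plays the role of the trailing `else`.
Rule = Tuple[Optional[Callable[[Outcome], bool]], dict]


def evaluate(rules: List[Rule], outcome: Outcome) -> dict:
    """Return the action of the first matching rule (assumed first-match-wins)."""
    for predicate, action in rules:
        if predicate is None or predicate(outcome):
            return action
    return {"do": "continue"}  # assumption: no matching rule means continue


TRANSIENT = [429, 500, 502, 503, 504]

# Mirrors the fetch_page policy from section 4.
fetch_page_rules: List[Rule] = [
    (lambda o: o.status == "error" and o.http.get("status") in TRANSIENT,
     {"do": "retry", "attempts": 10, "backoff": "exponential", "delay": 2.0}),
    (lambda o: o.status == "error" and o.http.get("status") in [401, 403],
     {"do": "fail"}),
    (None, {"do": "continue"}),
]
```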
3.2 HTTP wrapper note (practical)
Many HTTP executors wrap API payloads (example from older docs): the API body sits under .data.
In canonical form, your policy rules generally read:
- outcome.http.status
- outcome.result.data (if your HTTP kind wraps the body under result.data)
- or outcome.result (if your HTTP kind returns the body directly)
Canonical recommendation: standardize the HTTP kind to return:
- outcome.result.data = API body (object)
- outcome.http.status = HTTP status code
With that convention, templates can use the same paths for every endpoint.
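The recommended shape can be sketched as a small normalization step in Python. to_outcome is a hypothetical helper, not part of NoETL. It assumes the raw response arrives as a status code plus a JSON text body, and it treats any status >= 400 as an error.

```python
import json


def to_outcome(status_code: int, raw_body: str) -> dict:
    """Wrap a raw HTTP response in the canonical outcome shape."""
    body = json.loads(raw_body) if raw_body else {}
    outcome = {
        "status": "ok" if status_code < 400 else "error",
        "result": {"data": body},          # API body always under result.data
        "http": {"status": status_code},   # status code always under http.status
    }
    if status_code >= 400:
        outcome["error"] = {
            "kind": "http",
            "retryable": status_code in (429, 500, 502, 503, 504),
            "message": f"HTTP {status_code}",
            "details": body,
        }
    return outcome


ok = to_outcome(200, '{"data": [1, 2], "paging": {"page": 1, "hasMore": true}}')
```

With this shape, a template such as outcome.result.data.paging.hasMore resolves the same way for every endpoint.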
4) Canonical pattern: parallel outer loop + sequential stream per iteration
This example:
- loops over endpoints in parallel
- per endpoint, fetches pages sequentially
- retries transient HTTP and DB errors
- routes storage by HTTP status (200 vs 404)
- uses iter for paging state
- ends the iteration via break
- transitions to the next step via next.arcs on loop.done / step.failed
- step: fetch_all_endpoints
  loop:
    spec:
      mode: parallel
      max_in_flight: 10
      policy:
        exec: distributed   # optional placement intent
    in: "{{ workload.endpoints }}"
    iterator: endpoint
  tool:
    - name: init_iter
      kind: noop
      spec:
        policy:
          rules:
            - else:
                then:
                  do: continue
                  set_iter:
                    page: 1
                    has_more: true
    - name: fetch_page
      kind: http
      method: GET
      url: "{{ workload.api_url }}{{ iter.endpoint.path }}"
      params:
        page: "{{ iter.page }}"
        pageSize: "{{ iter.endpoint.page_size }}"
      spec:
        timeout: { connect: 5, read: 15 }
        policy:
          rules:
            # transient retry (rate limit / gateway / service unavailable)
            - when: "{{ outcome.status == 'error' and outcome.http.status in [429,500,502,503,504] }}"
              then: { do: retry, attempts: 10, backoff: exponential, delay: 2.0 }
            # permanent fail (auth)
            - when: "{{ outcome.status == 'error' and outcome.http.status in [401,403] }}"
              then: { do: fail }
            # everything else (success, 404, other non-retried statuses):
            # capture status + paging fields for routing and pagination
            - else:
                then:
                  do: continue
                  set_iter:
                    http_status: "{{ outcome.http.status | default(200) }}"
                    has_more: "{{ outcome.result.data.paging.hasMore | default(false) }}"
                    page: "{{ outcome.result.data.paging.page | default(iter.page) }}"
                    items: "{{ outcome.result.data.data | default([]) }}"
    - name: route_by_status
      kind: noop
      spec:
        policy:
          rules:
            - when: "{{ iter.http_status == 404 }}"
              then: { do: jump, to: store_404 }
            - else:
                then: { do: jump, to: store_200 }
    - name: store_200
      kind: postgres
      auth: pg_k8s
      command: |
        INSERT INTO results_ok (execution_id, endpoint, page, items, items_count)
        VALUES (...)
      spec:
        policy:
          rules:
            - when: "{{ outcome.status == 'error' and outcome.pg.code in ['40001','40P01'] }}"
              then: { do: retry, attempts: 5, backoff: exponential, delay: 2.0 }
            - when: "{{ outcome.status == 'error' }}"
              then: { do: fail }
            - else:
                then: { do: jump, to: paginate }
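    - name: store_404
      kind: postgres
      auth: pg_k8s
      command: |
        INSERT INTO results_not_found (execution_id, endpoint, page)
        VALUES (...)
      spec:
        policy:
          rules:
            - when: "{{ outcome.status == 'error' and outcome.pg.code in ['40001','40P01'] }}"
              then: { do: retry, attempts: 5, backoff: exponential, delay: 2.0 }
            - when: "{{ outcome.status == 'error' }}"
              then: { do: fail }
            - else:
                then: { do: jump, to: paginate }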
    - name: paginate
      kind: noop
      spec:
        policy:
          rules:
            - when: "{{ iter.has_more == true }}"
              then:
                do: jump
                to: fetch_page
                set_iter:
                  page: "{{ (iter.page | int) + 1 }}"
            - else:
                then: { do: break }
  next:
    spec: { mode: exclusive }
    arcs:
      - step: validate_results
        when: "{{ event.name == 'loop.done' }}"
      - step: cleanup
        when: "{{ event.name == 'step.failed' }}"
Why this is canonical
- No fall-through: the router jumps to exactly one store task; store tasks jump to paginate.
- Sequential pages per iteration: paginate loops via jump within one iteration lease.
- Parallelism only across iterations: controlled by loop.spec.
5) Routing storage by status code (more cases)
You can extend route_by_status for multiple stores:
- name: route_by_status
  kind: noop
  spec:
    policy:
      rules:
        - when: "{{ iter.http_status == 404 }}"
          then: { do: jump, to: store_404 }
        - when: "{{ iter.http_status == 409 }}"
          then: { do: jump, to: store_conflict }
        - when: "{{ iter.http_status in [500,502,503,504] }}"
          then: { do: jump, to: store_server_error }
        - else:
            then: { do: jump, to: store_200 }
This keeps “if/else” logic inside policy, expressed as ordered when rules with a final else fallback.
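The same routing table can be written as a plain Python function, which makes it easy to unit-test before encoding it in policy. The function is only an illustration and assumes first-match semantics. store_conflict and store_server_error are the task names from the YAML above.

```python
def route_by_status(http_status: int) -> str:
    """Mirror of the route_by_status rules: the first matching branch wins."""
    if http_status == 404:
        return "store_404"
    if http_status == 409:
        return "store_conflict"
    if http_status in (500, 502, 503, 504):
        return "store_server_error"
    return "store_200"  # the `else` branch
```

The else branch also catches statuses such as 400 or 422. Add explicit branches if those must not be stored as successes.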
6) Retry in pagination streams
6.1 Retry the fetch itself
Retry transient errors in fetch_page policy (as shown above).
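The following Python sketch shows what { do: retry, attempts: 10, backoff: exponential, delay: 2.0 } could mean operationally. It makes two assumptions that you should check against your executor: attempts counts total tries, and the wait doubles from delay (2.0, 4.0, 8.0, ...). call and is_transient are hypothetical callables. sleep is injectable so the schedule can be tested without waiting.

```python
import time
from typing import Callable


def retry_exponential(call: Callable[[], dict],
                      is_transient: Callable[[dict], bool],
                      attempts: int = 10,
                      delay: float = 2.0,
                      sleep: Callable[[float], None] = time.sleep) -> dict:
    """Re-run `call` while it returns a transient error, doubling the wait each time."""
    outcome = call()
    for attempt in range(1, attempts):       # at most attempts - 1 retries
        if not is_transient(outcome):
            break
        sleep(delay * 2 ** (attempt - 1))     # 2.0, 4.0, 8.0, ... for delay=2.0
        outcome = call()
    return outcome                            # last outcome, transient or not
```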
6.2 Retry storage
Retry retryable DB errors in store_* policy.
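6.3 Avoid duplicates
A retried store can replay a write that already committed (for example, when the connection drops after the commit). If your store is not idempotent, prefer:
- upserts
- unique keys (execution_id, endpoint, page)
- transaction retries with safe replays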
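Here is a runnable illustration of the unique-key plus upsert approach. Python's built-in sqlite3 stands in for Postgres. Both accept INSERT ... ON CONFLICT ... DO UPDATE (SQLite since 3.24), though Postgres drivers use different placeholders. The results_ok columns mirror the section 4 example. Storing items as JSON text is an assumption of this sketch.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE results_ok (
        execution_id TEXT NOT NULL,
        endpoint     TEXT NOT NULL,
        page         INTEGER NOT NULL,
        items        TEXT NOT NULL,
        items_count  INTEGER NOT NULL,
        PRIMARY KEY (execution_id, endpoint, page)   -- the idempotency key
    )
""")

UPSERT = """
    INSERT INTO results_ok (execution_id, endpoint, page, items, items_count)
    VALUES (?, ?, ?, ?, ?)
    ON CONFLICT (execution_id, endpoint, page)
    DO UPDATE SET items = excluded.items, items_count = excluded.items_count
"""


def store_page(execution_id: str, endpoint: str, page: int, items: list) -> None:
    """Write one page; replaying the same key overwrites instead of duplicating."""
    with conn:  # commits on success, rolls back on exception
        conn.execute(UPSERT, (execution_id, endpoint, page, json.dumps(items), len(items)))


# A retried store with the same key replays safely: still one row.
store_page("exec-1", "/a", 1, ["a1", "a2"])
store_page("exec-1", "/a", 1, ["a1", "a2"])
```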
7) While / Until (canonical guidance)
Canonical v10 implements looping with policy + jump/break; it has no separate while/until construct.
You can still model a “while/until” concept by convention inside the paginate task:
- while: keep jumping while the condition is true
- until: keep jumping until the condition becomes true
Example (until):
- name: paginate
  kind: noop
  spec:
    policy:
      rules:
        - when: "{{ iter.has_more != true }}"
          then: { do: break }
        - else:
            then:
              do: jump
              to: fetch_page
              set_iter:
                page: "{{ (iter.page | int) + 1 }}"
(If you later add first-class while/until, compile it down to this policy form.)
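That compile-down step can be sketched in Python as functions that emit the policy rules as plain dicts, the shape a YAML loader would produce for the paginate examples. compile_until and compile_while are hypothetical names. The output shape copies this document's examples, including the else: then: nesting, and is not a published schema.

```python
def compile_until(condition: str, target: str, set_iter: dict) -> list:
    """until <condition>: break once it holds, otherwise jump back to <target>."""
    return [
        {"when": "{{ " + condition + " }}", "then": {"do": "break"}},
        {"else": {"then": {"do": "jump", "to": target, "set_iter": set_iter}}},
    ]


def compile_while(condition: str, target: str, set_iter: dict) -> list:
    """while <condition>: jump back to <target> while it holds, otherwise break."""
    return [
        {"when": "{{ " + condition + " }}",
         "then": {"do": "jump", "to": target, "set_iter": set_iter}},
        {"else": {"then": {"do": "break"}}},
    ]


next_page = {"page": "{{ (iter.page | int) + 1 }}"}
until_rules = compile_until("iter.has_more != true", "fetch_page", next_page)
while_rules = compile_while("iter.has_more == true", "fetch_page", next_page)
```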
8) Quick reference: expressions you can use
8.1 Variables available in policy rules
Inside task.spec.policy.rules[].when you can reference:
- outcome.* (status/result/error/meta + kind helpers)
- iter.* (if a loop is present)
- ctx.*
- args.*
- _prev, _task, _attempt
- workload.*
8.2 Common pagination guards
- page-number: iter.has_more == true
- cursor: iter.next_cursor is not none
- offset: iter.offset < iter.total
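The three guard styles differ only in which state the fetch step writes back into iter. This Python sketch runs each loop to completion against a hypothetical in-memory dataset (ITEMS, SIZE). It has two simplifications. First, the guard is checked before each fetch, with the initial values playing the role of init_iter. Second, the first cursor is assumed to be 0.

```python
ITEMS = list(range(7))  # hypothetical dataset behind a paged API
SIZE = 3                # hypothetical page size


def by_page_number() -> list:
    it, seen = {"page": 1, "has_more": True}, []
    while it["has_more"]:                       # iter.has_more == true
        start = (it["page"] - 1) * SIZE
        seen += ITEMS[start:start + SIZE]
        it["has_more"] = start + SIZE < len(ITEMS)
        it["page"] += 1
    return seen


def by_cursor() -> list:
    it, seen = {"next_cursor": 0}, []
    while it["next_cursor"] is not None:        # iter.next_cursor is not none
        start = it["next_cursor"]
        seen += ITEMS[start:start + SIZE]
        it["next_cursor"] = start + SIZE if start + SIZE < len(ITEMS) else None
    return seen


def by_offset() -> list:
    it, seen = {"offset": 0, "total": len(ITEMS)}, []
    while it["offset"] < it["total"]:           # iter.offset < iter.total
        seen += ITEMS[it["offset"]:it["offset"] + SIZE]
        it["offset"] += SIZE
    return seen
```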
8.3 Common transient error checks
- HTTP transient: outcome.http.status in [429,500,502,503,504]
- DB deadlock/serialization: outcome.pg.code in ['40001','40P01']
9) Legacy note (non-canonical)
Older NoETL docs included a pagination: block with continue_while, next_page, and merge_path fields.
Canonical v10 replaces that with:
- iteration state (iter) + ordered tasks
- task policy + jump/break
- explicit storage tasks returning references
If you keep the legacy paginator for backward compatibility, treat it as a compiler into the canonical streaming form.