Result Storage & References — current DSL
This document merges and supersedes the older result/TempRef docs and updates them to the current DSL rules:
- No sink concept: “sink” is a pattern, not a tool kind.
- No eval:/expr:. Outcome handling uses spec.policy.rules with when clauses.
- Events and step context pass reference envelopes only (status + optional reference + optional bounded context), never raw output bodies.
- Storage backends: NATS KV, S3-compatible object storage, Google Cloud Storage, Postgres.
See also:
- current DSL runtime results model: documentation/docs/reference/dsl/runtime_results.md
- Legacy v2 doc (pre-policy DSL): documentation/docs/reference/result_storage_v2.md
0) Goals
- Event-sourced correctness: event log remains the source of truth for what happened.
- Efficiency: large payloads MUST NOT bloat the event log.
- Composable access: downstream steps can reference:
  - latest result
  - per-attempt (retry) results
  - per-page (pagination) results
  - per-iteration (loop) results
  - combined results via manifests (streamable)
- Pluggable storage: store results in Postgres / NATS KV / S3-compatible object store / GCS, and keep only refs in events.
- Streaming-friendly: combine via manifests; avoid giant merged arrays.
1) Concepts
1.1 Tool output vs step result
- Tool output: output of a single task invocation (HTTP call, DB command, Python run).
- Step result: the logical outcome of a step; may include multiple tool outputs from pagination/loop/retry.
1.2 Reference-first rule
A task output is either:
- inline (only if its serialized size is at most inline_max_bytes), OR
- stored externally with a ResultRef (preferred).
The event log stores: metadata + ResultRef + extracted fields + preview.
2) ResultRef (Standard pointer)
2.1 Structure
```json
{
  "kind": "result_ref",
  "ref": "noetl://execution/123/step/fetch/task/fetch_page/run/abc123",
  "store": "memory|kv|disk|s3|gcs|db|eventlog",
  "scope": "step|execution|workflow|permanent",
  "expires_at": "2026-02-01T13:00:00Z",
  "meta": {
    "content_type": "application/json",
    "bytes": 52480,
    "sha256": "abc123...",
    "compression": "gzip"
  },
  "ipc": {
    "kind": "arrow_ipc",
    "shm_name": "noetl_frame_e2606e14_8918ba44",
    "schema_digest": "ca41f6...",
    "byte_length": 448,
    "row_count": 2,
    "producer": "noetl-worker-...",
    "node_id": "noetl-control-plane",
    "lease_expires_at": "2026-05-21T05:08:09Z",
    "media_type": "application/vnd.apache.arrow.stream"
  },
  "extracted": {
    "has_more": true,
    "page": 2,
    "next_cursor": "c_123"
  },
  "preview": {
    "truncated": true,
    "bytes": 1024,
    "sample": [{"id": 1}, {"id": 2}]
  }
}
```
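The meta block can be filled in mechanically from the serialized payload. The sketch below is illustrative only: build_meta is a hypothetical helper, and it assumes (the document does not say) that bytes and sha256 both describe the stored body after optional gzip compression, and that JSON is serialized with sorted keys so identical payloads hash identically.

```python
import gzip
import hashlib
import json


def build_meta(payload, compress=True):
    """Serialize a payload stably and compute ResultRef meta fields (sketch)."""
    # Sorted keys and compact separators give a stable byte representation.
    raw = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
    # mtime=0 keeps the gzip header constant, so the hash is reproducible.
    body = gzip.compress(raw, mtime=0) if compress else raw
    meta = {
        "content_type": "application/json",
        "bytes": len(body),                          # assumption: stored size
        "sha256": hashlib.sha256(body).hexdigest(),  # assumption: hash of stored body
    }
    if compress:
        meta["compression"] = "gzip"
    return body, meta
```

Hashing the stored body lets a reader verify the bytes it fetched before decompressing; hashing the uncompressed JSON instead would be an equally valid choice.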
2.2 Logical vs physical addressing
- ref is a logical URI (noetl://...) stable across backends.
- Physical details (bucket/key/object/table range) live in meta and/or a server mapping.
- ipc is an optional same-node acceleration hint. It is never the source of truth; the durable ref must remain resolvable without it.
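Because ref is a logical URI, a consumer can read its correlation segments without knowing which backend holds the bytes. A minimal sketch, assuming the path alternates segment names and values as in the example ref in 2.1; parse_result_ref is a hypothetical helper and the document does not define a formal grammar for these URIs.

```python
from urllib.parse import urlsplit


def parse_result_ref(ref):
    """Split a logical noetl:// URI into name/value segments (sketch)."""
    parts = urlsplit(ref)
    if parts.scheme != "noetl":
        raise ValueError(f"not a logical noetl ref: {ref!r}")
    # The first segment lands in netloc ("execution"); the rest are in the path.
    segments = [parts.netloc] + [s for s in parts.path.split("/") if s]
    if len(segments) % 2:
        raise ValueError(f"odd number of segments in {ref!r}")
    # Pair names with values: execution/123 -> {"execution": "123"}, ...
    return dict(zip(segments[0::2], segments[1::2]))
```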
3) Storage backends (Standard)
3.1 NATS KV (small)
Use for:
- cursors, tokens, small JSON
- small page payloads (within practical limits)
Recommended ResultRef meta:
meta.bucket, meta.key
3.2 Arrow IPC shared memory (same-node accelerator)
Use for:
- cursor frame rows
- tabular payloads that a co-located worker may consume immediately
- producer-side map/reduce style fan-out where durable fallback is still required
This is Tier 1.5 in the event-store design. The payload is serialized as
Apache Arrow streaming IPC and may be written into a lease-bound shared-memory
segment. The ResultRef carries an optional ipc hint with the shared-memory
name, Arrow schema digest, byte length, row count, producer identity, node
identity, lease expiration, and media type.
Resolution policy:
- Validate the hint is present, unexpired, and local to the current node.
- Attempt to attach and read Arrow IPC bytes from shared memory.
- If the IPC read fails, increment a miss counter and fall back to the durable ref.
- Decode Arrow IPC bytes into rows for consumers that request materialized payloads.
The implementation records metrics for admission attempts/successes/failures, read attempts/hits/misses, fallback reads, and hit ratio. A live Phase 3 proof validated an IPC read, forced eviction of the hint, and then confirmed that the durable fallback returned identical rows.
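The resolution policy above can be sketched as follows. This is not the runtime's code: resolve, IpcMetrics, and the injected read_durable/decode_arrow callables are hypothetical, and the durable reader is assumed to return already-materialized rows. With pyarrow installed, decode_arrow could be something like lambda b: pyarrow.ipc.open_stream(b).read_all().to_pylist(). The default IPC reader uses Python's multiprocessing.shared_memory to attach to the named segment.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from multiprocessing import shared_memory


@dataclass
class IpcMetrics:
    read_attempts: int = 0
    hits: int = 0
    misses: int = 0
    fallback_reads: int = 0

    @property
    def hit_ratio(self):
        return self.hits / self.read_attempts if self.read_attempts else 0.0


def read_shared_memory(hint):
    """Attach to an existing segment and copy out byte_length bytes."""
    shm = shared_memory.SharedMemory(name=hint["shm_name"], create=False)
    try:
        with shm.buf[: hint["byte_length"]] as view:  # release view before close
            return bytes(view)
    finally:
        shm.close()


def hint_is_usable(hint, node_id, now):
    """Hint must be present, local to this node, and within its lease."""
    if not hint or hint.get("node_id") != node_id:
        return False
    lease = datetime.fromisoformat(hint["lease_expires_at"].replace("Z", "+00:00"))
    return lease > now


def resolve(result_ref, node_id, read_durable, decode_arrow, metrics,
            read_ipc=read_shared_memory, now=None):
    now = now or datetime.now(timezone.utc)
    hint = result_ref.get("ipc")
    if hint_is_usable(hint, node_id, now):
        metrics.read_attempts += 1
        try:
            payload = read_ipc(hint)
        except (OSError, ValueError):
            metrics.misses += 1  # segment evicted or unreadable
        else:
            metrics.hits += 1
            return decode_arrow(payload)
    if hint:
        metrics.fallback_reads += 1  # a hint existed but was not served
    # The durable ref is always the source of truth.
    return read_durable(result_ref["ref"])
```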
3.3 S3-compatible object storage (medium and large)
Use for:
- multi-MB payloads
- page payloads and streamed chunks that should not live in KV
- local development parity with cloud object storage
Recommended ResultRef meta:
meta.bucket, meta.key, optional meta.etag
Guidance:
- Prefer S3-compatible object store / S3 / GCS once payloads exceed about 1 MB.
- The NATS Object Store tier (store: "object") was removed in the phase 0 RisingWave-alignment release; payloads that referenced it are auto-remapped to store: "disk" (local SSD cache + async cloud spill). See Storage and Streaming Alignment with RisingWave.
3.4 Google Cloud Storage (large / durable)
Use for:
- large payloads
- longer retention
- cross-system access
Recommended ResultRef meta:
meta.bucket, meta.object (or meta.uri)
3.5 Postgres (queryable)
Use for:
- queryable intermediate tables (facts, audit rows)
- projections and indices for refs
Recommended ResultRef meta:
meta.schema, meta.table, meta.range (or meta.pk)
Standard: Postgres serves as both the event store (with projections) and, optionally, a result store (tables).
4) Auto-selection (recommended)
A runtime SHOULD support store.kind: auto with size-aware selection:
- <= inline_max_bytes → inline
- <= kv_max_bytes → NATS KV
- <= object_store_threshold_bytes → S3-compatible object store
- else → GCS (or Postgres table when queryability is required)
Thresholds are runtime config, but the tier model is stable.
Arrow IPC shared memory is not chosen as the durable store in store.kind.
It is attached opportunistically to a durable reference by runtime features such
as cursor frame capture.
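The size-aware tiers translate directly into a threshold ladder. In this sketch the threshold values are hypothetical defaults (the document leaves them to runtime config; 64 KiB matches the inline_max_bytes in the example config, and 1 MiB echoes the "about 1 MB" guidance in 3.3), and the returned names follow the store.kind values listed in section 5.1, with "inline" added as a hypothetical label.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Thresholds:
    # Hypothetical defaults; real values come from runtime config.
    inline_max_bytes: int = 64 * 1024
    kv_max_bytes: int = 1024 * 1024
    object_store_threshold_bytes: int = 100 * 1024 * 1024


def select_store(size, t=Thresholds(), queryable=False):
    """Pick a durable tier for a payload of `size` bytes (store.kind: auto)."""
    if size <= t.inline_max_bytes:
        return "inline"
    if size <= t.kv_max_bytes:
        return "nats_kv"
    if size <= t.object_store_threshold_bytes:
        return "object-store"
    # Largest payloads: GCS, unless the data must be queryable as a table.
    return "postgres" if queryable else "gcs"
```

Arrow IPC does not appear in the ladder because it is only ever attached as a hint on top of whichever durable tier is chosen.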
5) DSL configuration
5.1 Where config lives
Result storage config is per-task under task.spec.result.
```yaml
- fetch_page:
    kind: http
    method: GET
    url: "{{ workload.api_url }}/items"
    spec:
      result:
        inline_max_bytes: 65536
        preview_max_bytes: 2048
        store:
          kind: auto          # auto|nats_kv|object-store|gcs|postgres
          scope: execution    # step|execution|workflow|permanent
          ttl: "1h"
          compression: gzip
        select:
          - path: "$.paging.hasMore"
            as: has_more
          - path: "$.paging.page"
            as: page
          - path: "$.paging.nextCursor"
            as: next_cursor
```
5.2 Extracted fields
select fields become ResultRef.extracted.*.
They are small and safe to use for:
- routing decisions
- pagination state
- UI/observability
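Turning select rules into extracted fields only needs a small path walker. This sketch assumes the simple dotted "$.a.b" form used in the example config (no JSONPath filters, wildcards, or array indexes); extract_path and build_extracted are hypothetical helpers, and a missing key yields None rather than an error.

```python
def extract_path(doc, path):
    """Resolve a dotted '$.a.b' path; return None if any key is missing."""
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    node = doc
    for key in path[2:].split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


def build_extracted(doc, select):
    """Map each select rule's 'as' name to the value found at its 'path'."""
    return {rule["as"]: extract_path(doc, rule["path"]) for rule in select}
```

Applied to the select rules from 5.1 and a response carrying paging metadata, this produces the same shape as ResultRef.extracted in the 2.1 example.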
6) Manifests (combined result without bloat)
6.1 Manifest object
```json
{
  "kind": "manifest",
  "strategy": "append|concat|merge|replace",
  "merge_path": "$.data.items",
  "parts": [
    { "ref": "noetl://.../page/1/..." },
    { "ref": "noetl://.../page/2/..." }
  ],
  "total_parts": 2,
  "total_bytes": 30270
}
```
6.2 Storage
Manifests are stored reference-first like any other result:
- inline if tiny
- else S3-compatible object store / GCS / Postgres
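A manifest lets a consumer stream the combined result part by part instead of materializing one merged array. The sketch handles only an "append" strategy, under the assumption that it means "emit the items at merge_path from each part, in part order"; iter_manifest_items and the load_part resolver are hypothetical, and merge_path is assumed to use the simple dotted form.

```python
def iter_manifest_items(manifest, load_part):
    """Yield items part by part; only one part is held in memory at a time."""
    if manifest["strategy"] != "append":
        raise NotImplementedError(manifest["strategy"])
    keys = manifest["merge_path"][2:].split(".")  # "$.data.items" -> ["data", "items"]
    for part in manifest["parts"]:
        node = load_part(part["ref"])  # hypothetical resolver for one part
        for key in keys:
            node = node[key]
        yield from node
```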
7) Correlation keys (MUST for indexing)
Each task.done event SHOULD include when applicable:
- iteration, iteration_id (loop)
- page (pagination)
- attempt (retry)
- step_run_id, task_run_id
This supports retrieval patterns:
- page 3 of iteration 7
- last successful attempt for page 2
- all parts for step X
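With correlation keys on every task.done event, the retrieval patterns above reduce to filters over index rows. The row shape here (a flat dict with step, iteration, page, attempt, status, ref) and the status value "ok" are hypothetical; find and last_successful_attempt are illustrative helpers, not part of the runtime.

```python
from typing import Optional


def find(index, **keys):
    """Return index rows whose correlation keys match every given value."""
    return [row for row in index if all(row.get(k) == v for k, v in keys.items())]


def last_successful_attempt(index, step, page) -> Optional[dict]:
    """Highest-numbered successful attempt for one page of one step."""
    rows = [r for r in find(index, step=step, page=page) if r["status"] == "ok"]
    return max(rows, key=lambda r: r["attempt"], default=None)
```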
8) Passing results to next steps (reference-only)
Standard rule: the server binds only the latest task envelopes, not full bodies.
Recommended runtime binding in tool: []:
- key is the task label; if the label is omitted, the key is task_<index>
- value is the latest envelope for that key (status + optional reference + optional context)
- goto/jump/retry overwrite the same key with the latest result
- _prev points to the latest executed task envelope
- pointer scope is local to that step pipeline only
Downstream steps:
- use context fields directly
- explicitly resolve full payload if needed via reference
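The binding rules can be modeled as a per-step dict that is overwritten on every task run. In this sketch bind_envelope is hypothetical, the bindings dict stands in for the step-local scope, and the task index is assumed to be zero-based; only the envelope fields (status, reference, context) are kept, so raw output bodies never reach downstream tasks.

```python
def bind_envelope(bindings, index, envelope, label=None):
    """Record the latest envelope for a task in one step's tool: [] pipeline."""
    key = label or f"task_{index}"  # unlabeled tasks fall back to task_<index>
    # Keep only reference-first fields; drop any raw output body.
    slim = {k: envelope[k] for k in ("status", "reference", "context") if k in envelope}
    bindings[key] = slim      # goto/jump/retry overwrite the same key
    bindings["_prev"] = slim  # _prev always tracks the latest executed task
    return bindings
```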
Recovery rule:
- if a reference cannot be resolved (for example, the worker restarted before persisting the payload), the runtime returns outcome.error.code = REFERENCE_NOT_AVAILABLE
- policy can replay by jumping to the producer task, including to: previous for immediate predecessor replay inside tool: [].
9) Resolving refs (explicit)
Full payload resolution MUST be explicit:
- server API: GET /results/resolve?ref=...
- or tool: kind: artifact / kind: result with action: get
Example:
```yaml
- load_full_page:
    kind: artifact
    action: get
    args:
      ref: "{{ fetch_page.__ref__ }}"
```
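When calling the server API directly, the logical ref must be percent-encoded because it contains ":" and "/". A minimal sketch using the standard library; resolve_request is hypothetical, the base URL is a placeholder, and the response format is not specified here.

```python
from urllib.parse import urlencode
from urllib.request import Request


def resolve_request(base_url, ref):
    """Build an explicit GET /results/resolve request for one ref (sketch)."""
    # urlencode quotes with safe='', so '/' and ':' in the ref are escaped.
    query = urlencode({"ref": ref})
    return Request(f"{base_url.rstrip('/')}/results/resolve?{query}", method="GET")
```

Sending it with urllib.request.urlopen(resolve_request(...)) would perform the explicit resolution the rule above requires.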
10) Standard pagination + streaming pattern (single worker logical thread)
Use loop for outer fan-out (endpoints/cities/hotels), but inside each iteration run a sequential stream:
- fetch page
- transform
- route store by response code (200 vs 404 etc.)
- decide continue (jump fetch) or finish (break)
All control is via task policy: spec.policy.rules.
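The per-iteration stream can be pictured as a plain loop. In the DSL the routing and continue/break decisions are expressed as spec.policy.rules, so this Python version only illustrates the control flow; run_iteration and its callables are hypothetical stand-ins for tasks, and the paging field names follow the example select config.

```python
def run_iteration(fetch_page, transform, store_ok, store_missing, max_pages=1000):
    """Sequential stream for one loop iteration: fetch, transform, route, decide."""
    cursor = None
    for _ in range(max_pages):  # guard against a runaway jump loop
        status, body = fetch_page(cursor)
        if status == 404:
            store_missing(cursor)  # route: 404 goes to a separate store
            break
        if status != 200:
            # Other codes are left to retry policy in the real DSL.
            raise RuntimeError(f"unexpected status {status}")
        store_ok(transform(body))  # route: 200 -> transformed rows
        paging = body.get("paging", {})
        if not paging.get("hasMore"):
            break  # decide: finish
        cursor = paging.get("nextCursor")  # decide: continue (jump to fetch)
```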
11) Implementation requirements (what to build)
11.1 Worker-side (data plane)
Implement a result handler that runs after each task:
- stable serialize (JSON)
- apply task.spec.result:
  - preview (optional)
  - select/extract (optional)
  - choose store backend
  - write payload externally (unless inline)
- return final outcome with:
  - outcome.result containing small fields
  - outcome.result.__ref__ containing ResultRef (if externalized)
Emit task.done with:
- outcome.status, outcome.error, outcome.meta
- inline small result or ResultRef (+ extracted + preview)
- correlation keys
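The worker-side steps can be strung together in one handler. This is a sketch under several assumptions: handle_result is hypothetical, write_external(body, store_cfg) stands in for a backend writer that returns a ResultRef dict, "ok" and the inline "data" key are illustrative names, select paths use the simple "$.a.b" form, and the preview records only truncation metadata (no sample).

```python
import json


def handle_result(output, spec, write_external):
    """Post-task result handler: serialize, preview, extract, store, outcome."""
    body = json.dumps(output, sort_keys=True, separators=(",", ":")).encode()
    preview_max = spec.get("preview_max_bytes", 1024)
    preview = {"truncated": len(body) > preview_max, "bytes": min(len(body), preview_max)}
    extracted = {}
    for rule in spec.get("select", []):
        node = output
        for key in rule["path"][2:].split("."):  # "$.a.b" -> ["a", "b"]
            node = node.get(key) if isinstance(node, dict) else None
        extracted[rule["as"]] = node
    if len(body) <= spec.get("inline_max_bytes", 65536):
        # Small enough: carry the payload inline next to the extracted fields.
        return {"status": "ok", "result": {**extracted, "data": output}}
    ref = write_external(body, spec.get("store", {}))
    ref.update(extracted=extracted, preview=preview)
    return {"status": "ok", "result": {**extracted, "__ref__": ref}}
```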
11.2 Server-side (control plane)
On event ingest:
- persist event
- upsert projections:
  - result_index keyed by execution/step/task/iteration/page/attempt
  - step_state keyed by execution/step with latest refs and status
Context binding:
- inject extracted fields and ResultRefs only
12) Garbage collection
Use scope + ttl:
- step → cleanup at step finalize
- execution → cleanup at execution finalize
- workflow → cleanup at workflow finalize
- permanent → manual only
Backend-specific deletes:
- NATS KV: delete key
- S3-compatible object storage: delete object
- GCS: delete object
- Postgres: delete rows (or partition retention)
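Choosing what to delete combines the scope rule with TTL expiry. A minimal sketch, assuming each ref carries scope and an optional ISO-8601 expires_at as in the ResultRef example, and that the caller has already filtered refs to the execution or step being finalized; refs_to_delete is hypothetical, and permanent refs are never selected.

```python
from datetime import datetime, timezone


def refs_to_delete(refs, finalized_scope, now):
    """Pick refs whose scope ends at this finalize event or whose TTL lapsed."""
    doomed = []
    for r in refs:
        if r["scope"] == "permanent":
            continue  # manual deletion only
        expires = r.get("expires_at")
        expired = (expires is not None and
                   datetime.fromisoformat(expires.replace("Z", "+00:00")) <= now)
        if r["scope"] == finalized_scope or expired:
            doomed.append(r)
    return doomed
```

The selected refs would then be passed to the backend-specific delete for their store.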
13) Terminology: TempRef vs ResultRef
Legacy docs used TempRef (kind: temp_ref) for step-to-step pointers.
current DSL uses ResultRef (kind: result_ref) everywhere.
If reading legacy:
- treat temp_ref as alias of result_ref at resolution time
- emit only result_ref going forward
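The alias rule is a read-time normalization step applied before resolution. normalize_ref is a hypothetical helper; it rewrites only the kind field and rejects anything that is not a result reference.

```python
def normalize_ref(ref):
    """Treat legacy temp_ref as result_ref when reading; never emit temp_ref."""
    kind = ref.get("kind")
    if kind == "temp_ref":
        return {**ref, "kind": "result_ref"}  # copy; leave the stored event untouched
    if kind != "result_ref":
        raise ValueError(f"not a result reference: {kind!r}")
    return ref
```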