
NoETL Distributed Runtime + Event-Sourced Shared Memory Spec

Status: design proposal, ready for staged refactoring. Owner: NoETL core (repos/noetl). Companion specs that this revision builds on (do not re-spec):

  • noetl_async_sharded_architecture.md — sharded projection workers, epoch barriers.
  • noetl_cursor_loop_design.md — cursor-driven worker loops (already in production for PFT v2).
  • noetl_data_plane_architecture.md — control/data plane split, reference-only event envelopes, TempStore tiers.
  • noetl_distributed_processing_plan.md — phased rollout: P0 shipped, P1–P3 partial.
  • noetl_pft_performance_optimization.md — PFT-specific tuning.
  • distributed_fanout_mode_spec.md — designed but not shipped; this spec absorbs it.
  • nats_kv_distributed_cache.md — NATS K/V scope.

Not in scope here: DSL surface changes or the playbook authoring guide. Event-sourcing invariants are in scope because they are the contract that lets the distributed runtime reproduce system state at a requested point in time.


1. Why this revision exists

The PFT v2 workload (fixtures/playbooks/pft_flow_test/test_pft_flow_v2.yaml) is the canonical stress test for the distributed runtime. It exercises:

  • 10 facilities × 1000 patients × (assessments × 4 pages + conditions × 3 + medications × 3 + vital signs + demographics + MDS detail fan-out) ≈ 120k HTTP requests + ~26k claim cycles per execution.
  • A successful 2026-05-15 local-kind run completed in 3h 54m 21s, wrote 26,199 commands to noetl.event with event_type='command.*', and required two automatic recoveries by the in-process command reaper.

Even after the cursor-loop refactor (noetl_cursor_loop_design.md) eliminated per-iteration collection materialization, three structural costs remain:

| Cost | Where it shows up | Order of magnitude in PFT v2 |
|---|---|---|
| Server-side per-claim coordination | claim_next_loop_indices and _issue_cursor_loop_commands in repos/noetl/noetl/core/dsl/engine/executor/ | one HTTP claim round-trip per cursor row |
| Per-fragment event amplification | each worker claim emits command.issued / claimed / started / call.done / step.exit / command.completed | 6× events × ~26k fragments ≈ 150k event rows |
| Server-centric projection | only the server folds events into projection state | single writer to projection store; saturates Postgres pool under MDS bursts |

The 2026-05-15 GKE amber run (memory/archive/2026/05/20260515-193100-gke-runtime-reaper-pft-v2-amber.md) saw both the NoETL API DB pool and NATS come under pressure during facility-1 MDS, confirming that the central path is the bottleneck.

This spec proposes the next reduction step, but with one correction to the mental model: the event log is not an implementation detail or a backend adapter concern. It is the temporal kernel of the NoETL business operating system. Frames, Arrow IPC, distributed caches, local indexes, streaming materialized views, analytical projections, and object storage are accelerators around that kernel. They must never become the only source of state.

That means every durable state transition must be representable as an append-only event with a deterministic serialization contract. Projections, caches, materialized views, local shared memory regions, analytical tables, streaming state, and distributed indexes are all rebuildable derived state. Given an execution id, tenant id, and timestamp or event position, NoETL must be able to replay the canonical event stream and reconstruct the system's command state, frame state, loop progress, projection outputs, and payload references as they existed then.

The implementation priorities are:

  1. Event-sourced state reproduction. noetl.event remains the canonical append-only ledger. Every frame, lease, cursor, payload reference, projection checkpoint, and tenant-visible business state transition is replayable from event data plus immutable payload objects.
  2. Shared memory as a core advantage. The runtime uses a tiered shared-state fabric: in-process memory, node-local Arrow IPC, local disk/NVMe, NATS K/V for small distributed coordination state, and durable object storage. This fabric makes colocated execution fast without weakening replay.
  3. Worker-side batched loop execution. Workers claim and execute frames (multi-row windows) instead of single cursor rows. Per-fragment events drop by a factor equal to the frame size.
  4. Decentralized projection. Projection workers become pluggable, sharded, and horizontally scalable, with NATS consumer groups for fan-out and a stable identity scheme so they can re-attach to their shard after restart.
  5. Cloud-agnostic event / payload / projection store abstractions. Port/adapter architecture for NATS JetStream / Kafka / Pub/Sub / Event Hubs / Kinesis; for S3 / GCS / Azure Blob / SeaweedFS; and for operational, analytical, search, vector, and streaming projection backends.

The end state is a distributed business operating system in the NoETL sense: every tenant, organization, execution, compute actor, queue, stream, cache object, projection, and payload is addressable via a unified resource locator; workers scale on real backlog signals; loops collapse to map-reduce style stages whose data plane uses shared memory whenever colocated and durable storage otherwise; and auditors/operators can reproduce the state of the organization at a given event position.


2. Non-goals

  • This is not a rewrite of the NoETL DSL. Existing next.arcs[], set:, mode: semantics stay.
  • This does not replace Postgres as the default projection store. Postgres remains the reference; new backends are opt-in.
  • This is not a new Arrow distribution mechanism. We use pyarrow and arrow-rs as-is.
  • This is not a green-field event sourcing platform. We keep noetl.event as the canonical source of truth; new layers wrap, distribute, cache, mirror, and project it, but they do not replace it.

3. Architecture overview

┌─────────────────────────────────────────────┐
│ Server (planner)                            │
│ - schedules stages, not iterations          │
│ - mints frame leases on each loop           │
│ - exposes claim/heartbeat/commit endpoints  │
└────────────┬──────────────────┬─────────────┘
             │ control events   │ frame leases
             ▼                  ▼
┌─────────────────────────┐ ┌──────────────────────────┐
│ Event Store (Tier A)    │ │ Projection Store (B)     │
│ NATS JS / Kafka /       │ │ Postgres / DynamoDB /    │
│ Pub/Sub / Event Hubs /  │ │ Firestore / Cassandra /  │
│ Kinesis                 │ │ analytical / search /    │
│                         │ │ operational backends     │
│ (append-only ledger +   │ │ (derived, replayable     │
│  stream adapters)       │ │  state)                  │
└──────────┬──────────────┘ └────────────┬─────────────┘
           │ subscribe                   │ project
           ▼                             ▼
┌─────────────────────────────────────────────┐
│ Workers (runners)                           │
│ - claim a frame (N rows / N seconds /       │
│   bounded memory)                           │
│ - run inner DSL block in-process            │
│ - accumulate results in Arrow RecordBatch   │
│ - flush to IPC + Tier 3 + emit one event    │
│ - heartbeat lease; on crash, frame is       │
│   handed off via cursor checkpoint          │
└────────────┬──────────────────┬─────────────┘
             │                  │
             ▼                  ▼
┌─────────────────────────┐ ┌──────────────────────────┐
│ Tier 1.5 — Arrow IPC    │ │ Tier 3 — Payload Store   │
│ shared memory           │ │ S3 / GCS / Azure Blob /  │
│ (same host only)        │ │ SeaweedFS (durable,      │
│                         │ │ content-addressed)       │
└─────────────────────────┘ └──────────────────────────┘

Four core moves vs today:

  • Event stream as the kernel. Append-only events plus immutable payload objects are the durable operating-system journal. Everything else is an index, cache, materialized view, or delivery stream derived from that journal.
  • Stage-shaped scheduling. The server no longer thinks in cursor rows; it thinks in frames. A frame is a unit of work a worker can execute end-to-end, of bounded duration and bounded result size, and is independently recoverable.
  • Worker as the loop interpreter. The inner DSL block of a loop step is interpreted inside the worker that owns the frame, not via N round-trips to the server.
  • Data plane separate from control plane. Frame outputs flow as Arrow record batches over shared memory when possible and as content-addressed objects in Tier 3 always. The event store only ever carries lightweight manifests.

3.1 Event-sourced state contract

NoETL has one canonical state rule:

If it must be reproduced, audited, routed, billed, retried, or explained later, it must be represented by an event and any referenced immutable payload objects.

The event stream is the durable timeline for a multitenant organization. Projections are allowed to be fast, denormalized, and backend-specific, but they must be disposable. A projection rebuild from (tenant_id, organization_id, execution_id, from_position, to_position) must produce the same state as the original run, modulo explicitly declared non-deterministic external side effects.

Required event envelope fields:

{
  "event_id": "snowflake-or-ulid",
  "tenant_id": "tenant-123",
  "organization_id": "org-456",
  "execution_id": "987654321",
  "stream_id": "execution/987654321/stage/fetch_patients",
  "aggregate_id": "frame/123",
  "aggregate_type": "frame",
  "event_type": "frame.committed",
  "schema_name": "noetl.frame.committed",
  "schema_version": 1,
  "event_time": "2026-05-16T12:34:56.789Z",
  "ingest_time": "2026-05-16T12:34:56.812Z",
  "producer": "noetl://tenant/tenant-123/org/org-456/cluster/prod-1/.../worker/cpu-01",
  "causation_id": "event-that-caused-this-event",
  "correlation_id": "execution-or-business-flow-id",
  "idempotency_key": "tenant/org/execution/frame/event-kind/attempt",
  "payload_ref": {
    "uri": "noetl://tenant/tenant-123/org/org-456/payloads/sha256/...",
    "sha256": "...",
    "media_type": "application/vnd.apache.arrow.stream"
  },
  "meta": {}
}

Envelope invariants:

  • tenant_id and organization_id are mandatory on every event and payload reference. They are the isolation boundary for routing, replay, retention, encryption, and billing.
  • event_id is globally unique and monotonic enough for local ordering, but replay correctness depends on (stream_id, stream_version) or backend position, not wall-clock time.
  • expected_version is enforced per aggregate/stream where the backend supports it. Where the transport cannot enforce it directly, a side index records stream versions.
  • idempotency_key is mandatory for externally retried transitions. Duplicate delivery may occur; duplicate durable effects must not. A minimal key and checksum sketch follows this list.
  • payload_ref points to immutable content. Events never point to mutable cache paths as their only copy.
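
A minimal sketch, assuming the envelope fields shown above, of how a writer could satisfy the idempotency and deterministic-serialization invariants; the function names are illustrative, not the shipped implementation.

import hashlib
import json

def canonical_json(envelope: dict) -> bytes:
    # Stable key order and no insignificant whitespace, so two writers serializing
    # the same envelope produce identical bytes for checksumming.
    return json.dumps(envelope, sort_keys=True, separators=(",", ":"),
                      ensure_ascii=False).encode("utf-8")

def envelope_checksum(envelope: dict) -> str:
    return hashlib.sha256(canonical_json(envelope)).hexdigest()

def idempotency_key(tenant: str, org: str, execution: str,
                    frame: str, event_kind: str, attempt: int) -> str:
    # tenant/org/execution/frame/event-kind/attempt, matching the envelope example.
    return f"{tenant}/{org}/{execution}/{frame}/{event_kind}/{attempt}"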

3.2 Time-travel and replay

The replay API is a first-class implementation target, not a diagnostic afterthought:

GET /api/replay/state
  query:
    tenant_id
    organization_id
    execution_id
    as_of_event_id | as_of_position | as_of_time
    projection = execution | frame | loop | business_object | all

Replay reconstructs state by:

  1. Loading the latest validated snapshot at or before the requested position.
  2. Reading canonical events from that snapshot position through the requested cutoff.
  3. Resolving immutable payload references through the payload store.
  4. Applying schema upcasters in deterministic order.
  5. Folding events with the same projection code used by live projectors (a fold-loop sketch follows this list).
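
A sketch of the fold behind these steps, written against the port protocols in §§8–9. It treats the envelope as a mapping and assumes a stream_version field, a single upcast callable, and a fold_event callable that is the same projection code the live projectors run; all three are stand-ins, not the shipped API.

from typing import Any, Callable, Mapping

async def replay_state(event_store, payload_store, stream_id: str,
                       from_position: int, to_position: int,
                       upcast: Callable[[Mapping], Mapping],
                       fold_event: Callable[[Any, Mapping], Any],
                       initial_state: Any) -> Any:
    state = initial_state                                             # step 1: snapshot state or empty fold
    async for envelope in event_store.read(stream_id, from_version=from_position):   # step 2
        if envelope["stream_version"] > to_position:
            break
        envelope = await payload_store.resolve_envelope(envelope)     # step 3: immutable payload refs
        envelope = upcast(envelope)                                    # step 4: upcasters in deterministic order
        state = fold_event(state, envelope)                            # step 5: same fold as the live projector
    return state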

Snapshots are performance accelerators. They are never the authority. A snapshot must record:

  • event position covered;
  • projection code version;
  • schema upcaster versions;
  • payload digest set or Merkle root;
  • tenant encryption context;
  • deterministic fold checksum.

Replay verification becomes a release gate for every phase after Phase 0: run live execution, rebuild projections from events, and compare checksums for execution state, frame state, loop progress, and configured business projections.

3.3 Visual component diagram

The diagram below maps all named components to their roles, storage tiers, and primary data flows. Solid arrows are runtime data paths; dashed arrows are control or fallback paths.

graph TD
subgraph TENANT["Tenant / Organization Boundary"]

subgraph CP["Control Plane"]
SRV["Server / Planner\nStage scheduling · Frame lease minting\nClaim / Heartbeat / Commit endpoints\nFrame reaper"]
end

subgraph KERNEL["Event Kernel — Append-Only Temporal Authority"]
PG[("Canonical Log\nPostgres noetl.event\npartitioned by tenant / time / execution")]
DIST[("Distribution Stream\nNATS JetStream · Kafka · Pub/Sub\nEvent Hubs · Kinesis\nfan-out transport — not canonical authority")]
end

subgraph OBJSTORE["Immutable Payload Store — Port/Adapter"]
PAY[("Content-Addressed Objects\nS3 · GCS · Azure Blob · SeaweedFS\nsha256-keyed · immutable on write\nstreaming upload / download")]
end

subgraph FABRIC["Shared-State Fabric — Rebuildable from Events"]
T1["Tier 1\nIn-process LRU\nper process"]
T15["Tier 1.5\nApache Arrow IPC shm / memfd\nsame-node zero-copy"]
T2["Tier 2\nNVMe disk cache\nper node"]
NKV["NATS K/V\nLeases · counters · coordination"]
end

subgraph EXEC["Execution Layer — StatefulSet"]
W["Worker / Frame Executor\nClaims N-row frames from stage\nRuns inner DSL block in-process\nArrow RecordBatch to Tier 1.5 + Tier 3\n1 canonical event per frame committed"]
PROJ["Projector — N shards\nDurable NATS pull consumer per shard\nPayloadRef resolved via cache hierarchy\nHorizontally scalable · shard-stable across restarts"]
end

subgraph PSTORES["Projection Stores — Derived · Replayable — Port/Adapter"]
OP[("Operational\nPostgres\nExecution status · API reads")]
AN[("Columnar Analytical\nEvent lake · billing facts · org audit timeline")]
MV[("Streaming MV\nBarrier-aligned incremental aggregations\nWork-queue derived state")]
SV[("Search / Vector\nRetrieval shapes only\nRebuilt from event / payload source")]
end

PG -->|mirror| DIST
SRV -->|stage.opened · frame lease events| PG
SRV -->|frame claim dispatch| W
DIST -->|durable pull consumer| W
DIST -->|durable pull consumer| PROJ

W -->|Tier 3 — always durable| PAY
W -->|Tier 1.5 — best-effort fast path| T15
W -->|1 canonical event per frame| PG

T1 -. miss .-> T15
T15 -. miss .-> T2
T2 -. miss .-> PAY

PROJ -->|resolve PayloadRef| T1
PROJ -->|write| OP
PROJ -->|write| AN
PROJ -->|write| MV
PROJ -->|write| SV

SRV -. Replay API as_of_position .-> PG
SRV -. Replay API resolve payload refs .-> PAY

end

Key observations from the diagram:

  • Event Kernel is the single append authority. The canonical log is the only target for durable writes. The distribution stream mirrors it for low-latency fan-out but carries no replay authority.
  • Payload Store and Canonical Log are the correctness pair. Losing either breaks replay. Every other tier and store can be dropped and rebuilt from these two.
  • Shared-state fabric follows a strict miss chain. Tier 1 → 1.5 → 2 → Tier 3 (Payload Store). Writers always write Tier 3 first; higher tiers are best-effort and admission-controlled.
  • Projectors are the sole writers to projection stores. After Phase 2, the server never writes projection state directly.
  • Port/Adapter boundaries (labeled on subgraphs) mark where backend substitutions happen at deploy time; the runtime image does not change.

4. Workload baseline (instrument before refactor)

Before any code change ships, baseline the PFT v2 run with telemetry the team can reason from. This is the metric set every later phase is judged against.

Per execution, capture:

| Metric | How | Today's value (PFT v2 GKE 2026-05-15) |
|---|---|---|
| total command.* event count | SELECT count(*) FROM noetl.event WHERE execution_id = $1 AND event_type LIKE 'command.%' | ≈ 26k × 6 ≈ 150k |
| frame count | sum of cursor claims that returned > 0 rows | ≈ 26k |
| mean rows per frame | total rows / frame count | 1.0 (cursor today claims one row) |
| server CPU on /claim hot path | request-log percentiles from gateway | dominated by GKE pool pressure |
| Postgres pool depth high-watermark | pg_stat_activity poll | hit 50 waiters |
| NATS reschedule events | kubectl get events -n nats | 1 during facility-1 MDS |
| payload bytes written to Tier 3 | TempStore counter | not currently instrumented |
| projection store write rate | counter on mark_step_completed | single writer |
| execution wall time | noetl.execution.end_time - start_time | 3h 54m |

Target after Phases 1–3:

| Metric | Target | Mechanism |
|---|---|---|
| total command.* event count | ÷10 | frame-shaped claims, mean rows/frame ≥ 50 |
| server /claim requests per execution | ÷50 | one claim per frame |
| Postgres pool depth high-watermark | < 20 sustained | claim path narrower + projection sharded |
| Tier 3 bytes written | unchanged | data goes to Tier 3 either way |
| Tier 1.5 cache hit ratio | > 60% on colocated consumers | new metric, see Phase 3 |
| execution wall time | ÷2 | parallelism + reduced coordination |

A separate dashboard tile per metric is mandatory before merging any phase that claims an improvement.


5. Control plane: stage and frame model

5.1 New tables (additive, no breaking change)

-- Stage describes a unit of orchestration the planner cares about.
-- One stage per loop step or per fan-out step. Tool steps keep using
-- the existing noetl.command path.
CREATE TABLE IF NOT EXISTS noetl.stage (
    stage_id        BIGINT PRIMARY KEY,                 -- snowflake id
    execution_id    BIGINT NOT NULL REFERENCES noetl.execution(execution_id),
    parent_event_id BIGINT REFERENCES noetl.event(event_id),
    kind            TEXT NOT NULL CHECK (kind IN ('loop','fanout','reduce')),
    step_name       TEXT NOT NULL,
    dsl_ref         TEXT NOT NULL,                      -- pointer to playbook step
    status          TEXT NOT NULL DEFAULT 'OPEN',
    frame_policy    JSONB NOT NULL,                     -- size, time, memory bounds
    started_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    ended_at        TIMESTAMPTZ
);

-- Frame is a worker-claimable window of work inside a stage.
CREATE TABLE IF NOT EXISTS noetl.frame (
    frame_id        BIGINT PRIMARY KEY,                 -- snowflake id
    stage_id        BIGINT NOT NULL REFERENCES noetl.stage(stage_id),
    cursor          JSONB NOT NULL,                     -- driver-specific resume hint
    row_count       INTEGER NOT NULL DEFAULT 0,
    status          TEXT NOT NULL DEFAULT 'PENDING',
    owner_worker    TEXT,
    lease_until     TIMESTAMPTZ,
    output_ref      JSONB,                              -- {tier3_sha, ipc_handle?}
    events_emitted  INTEGER NOT NULL DEFAULT 0,
    attempts        INTEGER NOT NULL DEFAULT 0,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at    TIMESTAMPTZ
);

CREATE INDEX IF NOT EXISTS frame_open_idx
    ON noetl.frame (stage_id, status, lease_until)
    WHERE status IN ('PENDING','CLAIMED','RUNNING');

These coexist with noetl.event and noetl.command. Legacy tool steps are not migrated.

5.2 Frame policy

frame_policy is the configurable bound that decides how much work a single frame contains. The planner picks one of these strategies based on the step's DSL:

frame_policy:
  size: 100               # max rows per frame, optional
  duration_ms: 5000       # max wall-time per frame, optional
  memory_bytes: 67108864  # max in-flight Arrow buffer per frame
  parallelism: 4          # how many frames can be in-flight per worker

For PFT v2, a sensible default is { size: 50, duration_ms: 30000, memory_bytes: 64MB, parallelism: 1 }. That collapses today's 26k frames (size=1) to ~520 frames, a 50× drop, while keeping each frame under half a minute, so a worker crash loses at most half a minute of work.
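
The arithmetic behind that claim, as a quick illustrative check:

import math

rows_per_execution = 26_000          # cursor rows in the PFT v2 baseline
frame_size = 50                      # frame_policy.size default for loop stages

frames = math.ceil(rows_per_execution / frame_size)
print(frames)                        # 520 frames, a 50x reduction from size=1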

5.3 Claim / heartbeat / commit API

New endpoints on the NoETL server, additive to the existing /api/commands/*:

POST /api/stages/{stage_id}/frames/claim
  body:    { worker_id, want, max_inflight }
  returns: [ { frame_id, cursor, lease_until, dsl_ref, frame_policy } ... ]

POST /api/frames/{frame_id}/heartbeat
  body:    { worker_id, cursor }
  returns: { lease_until }

POST /api/frames/{frame_id}/commit
  body:    { worker_id, cursor, output_ref, row_count, status }
  returns: { ok, next_action }       # may immediately hand the worker the next frame

The HTTP surface is the operational fallback. The primary path uses NATS JetStream pull consumers (see §6) for lower-latency claim and built-in lease semantics.
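
A sketch of the worker-side loop over this HTTP fallback surface, assuming httpx; run_inner_block, the base URL, and the COMPLETED status value are placeholders rather than the shipped names.

import httpx

async def frame_loop(base_url: str, stage_id: int, worker_id: str, run_inner_block):
    async with httpx.AsyncClient(base_url=base_url, timeout=30.0) as client:
        while True:
            resp = await client.post(f"/api/stages/{stage_id}/frames/claim",
                                     json={"worker_id": worker_id, "want": 1,
                                           "max_inflight": 1})
            frames = resp.json()
            if not frames:
                break                                  # stage drained, nothing left to claim
            frame = frames[0]
            cursor = frame["cursor"]
            # Heartbeats would run concurrently against /api/frames/{id}/heartbeat; omitted here.
            output_ref, row_count = await run_inner_block(frame, cursor)
            await client.post(f"/api/frames/{frame['frame_id']}/commit",
                              json={"worker_id": worker_id, "cursor": cursor,
                                    "output_ref": output_ref, "row_count": row_count,
                                    "status": "COMPLETED"})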

5.4 Server's narrower role

After the refactor:

  • For a loop step the server emits one stage.opened event, mints frames lazily as workers ask, and emits one stage.closed when all frames commit.
  • The server does not touch per-row state. Cursors are opaque; the server only records the latest committed cursor per frame.
  • The server does not issue per-row command.issued events. Frame claims are observable via stage and frame rows plus a single frame.dispatched event per claim.

The current command_reaper is repurposed as a frame reaper: it scans noetl.frame for lease_until < now() AND status IN ('CLAIMED','RUNNING'), marks the lease abandoned, and republishes the frame. Same correctness guarantees; smaller scan surface.
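
A sketch of that scan, assuming asyncpg and the noetl.frame columns from §5.1; the reset-to-PENDING convention is illustrative.

import asyncpg

REAP_SQL = """
UPDATE noetl.frame
   SET status = 'PENDING', owner_worker = NULL, lease_until = NULL,
       attempts = attempts + 1
 WHERE lease_until < now()
   AND status IN ('CLAIMED', 'RUNNING')
 RETURNING frame_id, stage_id;
"""

async def reap_stale_frames(pool: asyncpg.Pool) -> list[asyncpg.Record]:
    async with pool.acquire() as conn:
        # Returned frame ids are republished to the claim path by the caller.
        return await conn.fetch(REAP_SQL)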


6. Data plane: Arrow IPC zero-copy (Tier 1.5)

Tier 1.5 is part of a broader shared-state fabric, not a one-off optimization. The design target is a proven distributed-data pattern: keep durable state in an append-only log and immutable payload objects; keep hot execution state in rebuildable shared caches, indexes, and materialized views.

| Layer | Backends | Purpose | Rebuildable from events? |
|---|---|---|---|
| Canonical log | noetl.event Postgres partitions; optional mirrored JetStream/Kafka/Pub/Sub/Event Hubs/Kinesis streams | Durable timeline and replay authority | Source |
| Immutable payloads | S3 / GCS / Azure Blob / SeaweedFS / local durable store | Large event data, Arrow batches, files | Referenced by log |
| Hot shared memory | in-process LRU + Arrow IPC shm/memfd | Same-process and same-node zero-copy reads | Yes |
| Warm node cache | local NVMe/PVC disk cache | Reuse payload blocks after restart or reschedule | Yes |
| Small distributed cache | NATS K/V | Lease hints, loop counters, small coordination state | Yes |
| Streaming materialization | source/table/materialized-view/sink engine with barriers | Incremental state over event streams and work queues | Yes |
| Analytical materialization | columnar analytical projection store | High-volume queryable facts, metrics, audit/event lake views | Yes |
Only the first two layers are required for correctness. The rest are latency, throughput, and serving-shape advantages. This is the core product advantage: NoETL can run like a low-latency distributed shared-memory system while remaining reproducible from an append-only event log.

6.1 Where it fits in TempStore

The existing repos/noetl/noetl/core/storage/result_store.py already tiers payloads as MEMORY → KV → DISK → S3/GCS/DB. We insert a new tier between MEMORY and DISK:

| Tier | Mechanism | Scope | Lifetime |
|---|---|---|---|
| 1 | in-process LRU (existing) | per process | configurable bytes |
| 1.5 | Apache Arrow IPC over POSIX shm / memfd | per host (all processes on the node) | frame lease + 30s grace |
| 2 | local NVMe disk cache (existing) | per node | configurable GB |
| 3 | S3 / GCS / Azure Blob / SeaweedFS (existing) | global, content-addressed | retention policy |

Tier 1.5 is the zero-copy hop for colocated workers. Tier 3 is always written; Tier 1.5 is best-effort fast path.

6.2 Format

Workers materialize loop results as pyarrow.RecordBatch (Python) / arrow_array::RecordBatch (Rust). The on-disk and on-wire format is Arrow IPC stream. The shared memory carrier is one of:

  • POSIX shm_open + mmap (Linux/macOS) — simplest, ubiquitous.
  • memfd_create (Linux only) — preferred where available; no path collisions, anonymous file descriptor passed via SCM_RIGHTS over a Unix domain socket from a node-local broker.

For cross-runtime parity (Python ↔ Rust), the SHM region carries:

  1. A 20-byte header: magic NOETLIPC (8 bytes), format version u32, payload length u64.
  2. The Arrow IPC stream bytes.

This is intentionally simpler than the Plasma object store: NoETL workers are co-tenants of a single Kubernetes pod (one process per container today; if we ever go multi-process per pod we add a small node-local broker — see §6.5). No Plasma client/server fan-out, no shared catalog.
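
A producer-side sketch of this carrier format, using pyarrow and the stdlib multiprocessing.shared_memory module as the shm mechanism; naming and lifetime management are simplified relative to the real writer.

import struct
from multiprocessing import shared_memory

import pyarrow as pa

HEADER = struct.Struct("<8sIQ")        # magic, format version u32, payload length u64 (20 bytes)

def write_tier15(batch: pa.RecordBatch, shm_name: str) -> shared_memory.SharedMemory:
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, batch.schema) as writer:
        writer.write_batch(batch)
    ipc_bytes = sink.getvalue().to_pybytes()

    # shm_name is the IpcHint name without the leading '/'; the stdlib manages the POSIX path.
    shm = shared_memory.SharedMemory(name=shm_name.lstrip("/"), create=True,
                                     size=HEADER.size + len(ipc_bytes))
    shm.buf[:HEADER.size] = HEADER.pack(b"NOETLIPC", 1, len(ipc_bytes))
    shm.buf[HEADER.size:HEADER.size + len(ipc_bytes)] = ipc_bytes   # the single memcpy
    return shm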

6.3 Reference shape

PayloadReference (existing in result_store.py) gains optional IPC metadata:

@dataclass
class PayloadReference:
    tier3_uri: str           # noetl://tenant/<tenant>/org/<org>/payloads/sha256/<sha256>
    sha256: str
    media_type: str          # "application/vnd.apache.arrow.stream"
    rows: int
    bytes: int

    # Tier 1.5 fast-path hint, may be None or stale
    ipc: Optional[IpcHint] = None

@dataclass
class IpcHint:
    node_id: str             # noetl://tenant/<tenant>/org/<org>/cluster/<id>/node/<id>
    shm_name: str            # /noetl-<execution>-<frame>-<seq>
    schema_digest: str       # quick sanity check before attach
    valid_until: datetime    # writer-promised minimum lifetime

6.4 Producer / consumer protocol

Producer (any worker that emits a record batch):

  1. Serialize batch to Arrow IPC stream bytes.
  2. Write to Tier 3 keyed by sha256 (idempotent, exists-first check).
  3. Attempt Tier 1.5 write: create / open shm, copy buffer (one memcpy from the IPC stream), set valid_until to the frame's lease_until plus the 30s grace.
  4. Emit one event whose envelope carries PayloadReference with both Tier 3 URI and the optional IpcHint.

Consumer (frame commit handler, reducer, projection worker):

  1. Read PayloadReference from event envelope.
  2. Try IpcHint if present and node_id == self.node_id and valid_until > now(): open shm, mmap, wrap as pyarrow.RecordBatchStreamReader. Zero-copy.
  3. If hint is missing, stale, or another node: read from Tier 1 cache; on miss, Tier 2; on miss, Tier 3.

The consumer never trusts the IPC hint blindly. It validates schema_digest (cheap), bumps a tier metric, and on any error falls through to the durable read path. The durable read is the source of truth.
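
A consumer-side sketch of that fallback order, using the PayloadReference / IpcHint shapes from §6.3; read_durable stands in for the Tier 1 → 2 → 3 path and the schema_digest check is omitted for brevity.

import struct
from datetime import datetime, timezone
from multiprocessing import shared_memory

import pyarrow as pa

HEADER = struct.Struct("<8sIQ")        # same 20-byte header as the producer side

def read_payload(ref, node_id: str, read_durable):
    hint = ref.ipc
    if hint and hint.node_id == node_id and hint.valid_until > datetime.now(timezone.utc):
        try:
            shm = shared_memory.SharedMemory(name=hint.shm_name.lstrip("/"))
            magic, _version, length = HEADER.unpack_from(shm.buf, 0)
            if magic == b"NOETLIPC":
                reader = pa.ipc.open_stream(shm.buf[HEADER.size:HEADER.size + length])
                # Keep shm referenced while the table is in use; its buffers back the mapping.
                return reader.read_all(), shm
        except (FileNotFoundError, pa.ArrowInvalid):
            pass                        # stale, unlinked, or corrupt hint; fall through
    return read_durable(ref), None      # durable path is the source of truth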

6.5 Garbage collection and back-pressure

  • Each shm region is owned by the producer worker for the duration of its frame lease + 30s grace. After grace, the worker shm_unlinks the region (an orphan-sweep sketch follows this list).
  • Workers track a per-node tier15_bytes_in_use counter. New shm writes are admission-controlled by a configurable budget (default 1 GB per node). On budget exhaustion the writer skips Tier 1.5 and emits only the durable Tier 3 reference. No data plane stall.
  • For multi-process per pod (future): introduce a noetl-node-broker sidecar that owns the shm namespace and brokers lifetimes via Unix-socket RPC. Not required for the current one-process-per-pod deployment shape.
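
A sketch of the start-up sweep referenced in §13, assuming shm regions appear under /dev/shm with the noetl- prefix; the threshold and naming are illustrative.

import time
from multiprocessing import shared_memory
from pathlib import Path

def sweep_orphaned_shm(grace_seconds: int = 30) -> None:
    cutoff = time.time() - grace_seconds
    for path in Path("/dev/shm").glob("noetl-*"):
        if path.stat().st_mtime < cutoff:
            try:
                region = shared_memory.SharedMemory(name=path.name)
                region.close()
                region.unlink()          # removes the orphaned region left by a crashed producer
            except FileNotFoundError:
                pass                     # already unlinked by its owner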

6.6 Serialization contract

Serialization is part of the replay contract. If two runtimes read the same event stream and payload objects, they must fold the same state.

Rules:

  • Event envelopes: canonical JSON for the persisted envelope metadata. Writers must emit stable field names, UTC timestamps, explicit nulls only where schema allows them, and deterministic map key ordering before signing/checksumming.
  • Large tabular payloads: Apache Arrow IPC stream, media type application/vnd.apache.arrow.stream. Every payload records schema_digest, row_count, byte_length, compression codec, and content SHA-256.
  • Nested business payloads: JSON Schema / OpenAPI-compatible payloads for small objects; Arrow struct, list, and map types for large repeated data. Avoid pickle, language-native binary serialization, or runtime-specific object graphs.
  • Decimals and time: decimals carry precision/scale; timestamps are UTC with explicit unit; local time zones are data fields, not implicit runtime settings.
  • Schema evolution: every event has schema_name and schema_version. Upcasters are pure functions registered by (schema_name, from_version, to_version) and covered by replay tests.
  • Cross-language parity: Python and Rust compliance tests read the same golden event/payload corpus and compare fold checksums.

The durable payload digest is computed over the exact serialized bytes. The projection checksum is computed over a canonical projection serialization, not over backend-specific storage bytes.
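
A sketch of the upcaster registry implied by the schema-evolution rule: pure functions keyed by (schema_name, from_version, to_version), applied one version step at a time. The example upcaster and its field backfill are hypothetical.

from typing import Callable, Dict, Tuple

Upcaster = Callable[[dict], dict]
_REGISTRY: Dict[Tuple[str, int, int], Upcaster] = {}

def register_upcaster(schema_name: str, from_version: int, to_version: int):
    def decorator(fn: Upcaster) -> Upcaster:
        _REGISTRY[(schema_name, from_version, to_version)] = fn
        return fn
    return decorator

def upcast(envelope: dict, target_version: int) -> dict:
    name, version = envelope["schema_name"], envelope["schema_version"]
    while version < target_version:
        fn = _REGISTRY[(name, version, version + 1)]   # a missing step is a hard replay error
        envelope = fn(envelope)
        version += 1
        envelope["schema_version"] = version
    return envelope

@register_upcaster("noetl.frame.committed", 1, 2)
def _frame_committed_v1_to_v2(envelope: dict) -> dict:
    envelope.setdefault("meta", {})                    # hypothetical field backfill for v2
    return envelope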

6.7 Why Arrow, not raw bytes

  • Columnar layout is the right shape for the heavy paths (Postgres bulk reads, DuckDB joins, fanout reducers, projection writes to analytical stores).
  • Zero-copy between Python and Rust workers via the C Data Interface comes for free with Arrow. Important once we run a mix of pyarrow-using Python workers and arrow-rs Rust workers on the same pod.
  • The wire schema is self-describing enough for payload reads, while the event envelope still records schema identity/version for governance and replay.

7. Decentralized projection

7.1 Current state

noetl_async_sharded_architecture.md already specifies async projection workers and epoch barriers. The projection worker skeleton exists. What is not yet decentralized: the projection still runs inside the server process, and the projection write rate caps at the server's single-writer Postgres connection.

7.2 Refactor

  • Extract projection into its own deployable: noetl-projector. Same image, different entrypoint.
  • Each projector instance owns one or more shards via NATS JetStream pull consumer group (noetl.projection.shard.<n>).
  • Shard assignment is sticky: a projector keeps a shard as long as it heartbeats. On stop, NATS reassigns. This is the standard JetStream durable consumer pattern.
  • Each shard has its own Postgres connection (or its own backend entirely, see §8 for the projection store abstraction). Total projection throughput scales linearly with shard count.
  • Projector reads events from NATS, resolves PayloadReference via the cache hierarchy (Tier 1 → 1.5 → 2 → 3), and writes projection state. Tier 1.5 is the hot path when the projector and the producing worker are colocated.

7.3 Stable identity

To get StatefulSet-style stable identity (required for Tier 2 disk cache continuity and for NATS durable consumer affinity), projectors and workers run as StatefulSet not Deployment:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: noetl-projector
spec:
  serviceName: noetl-projector
  replicas: 4
  template:
    spec:
      containers:
        - name: projector
          env:
            - name: NOETL_SHARD_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: NOETL_NODE_ID
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP

NOETL_SHARD_ID = noetl-projector-0..3 is stable across restarts and maps directly to a NATS durable consumer name.
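
A sketch of how a projector could derive its durable consumer from the StatefulSet ordinal, using nats-py; the subject and durable naming follow §7.2 but are not the finalized scheme.

import os

import nats

async def bind_projection_shard():
    shard_id = os.environ["NOETL_SHARD_ID"]             # e.g. noetl-projector-2
    shard_n = shard_id.rsplit("-", 1)[-1]
    nc = await nats.connect("nats://nats.nats.svc.cluster.local:4222")
    js = nc.jetstream()
    sub = await js.pull_subscribe(
        subject=f"noetl.projection.shard.{shard_n}",
        durable=f"noetl-projector-shard-{shard_n}",      # stable name, so the consumer re-attaches after restart
    )
    return nc, sub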

7.4 Projection store abstraction (port/adapter)

class ProjectionStorePort(Protocol):
    async def save_projection(
        self, projection_id: str, state: bytes, version: int
    ) -> None: ...

    async def load_projection(
        self, projection_id: str
    ) -> Optional[Tuple[bytes, int]]: ...

    async def save_snapshot(
        self, aggregate_id: str, aggregate_type: str,
        snapshot: bytes, version: int
    ) -> None: ...

    async def load_snapshot(
        self, aggregate_id: str
    ) -> Optional[Tuple[bytes, int]]: ...

    async def query(
        self, projection_type: str, filters: Mapping[str, Any],
        pagination: Pagination
    ) -> AsyncIterator[Mapping[str, Any]]: ...

Adapters: Postgres (reference), cloud/serverless document and key-value stores, wide-column stores, columnar analytical stores, search stores, vector stores, and streaming materialized-view engines. Specific products are deployment choices, not architectural dependencies.
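
A minimal Postgres sketch for the save/load half of the port, using asyncpg; the noetl.projection table and its columns are illustrative rather than the reference adapter's actual schema.

from typing import Optional, Tuple

import asyncpg

class PostgresProjectionStore:
    def __init__(self, pool: asyncpg.Pool):
        self._pool = pool

    async def save_projection(self, projection_id: str, state: bytes, version: int) -> None:
        await self._pool.execute(
            """
            INSERT INTO noetl.projection (projection_id, state, version)
            VALUES ($1, $2, $3)
            ON CONFLICT (projection_id) DO UPDATE
               SET state = EXCLUDED.state, version = EXCLUDED.version
             WHERE projection.version < EXCLUDED.version   -- idempotent under replay
            """,
            projection_id, state, version,
        )

    async def load_projection(self, projection_id: str) -> Optional[Tuple[bytes, int]]:
        row = await self._pool.fetchrow(
            "SELECT state, version FROM noetl.projection WHERE projection_id = $1",
            projection_id,
        )
        return (row["state"], row["version"]) if row else None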

Projection backend roles:

  • Operational projection stores serve execution status, current frame state, user-facing API reads, and transactional admin surfaces.
  • Columnar analytical projection stores are the high-volume analytical memory of the system: event lake tables, tenant/org audit timelines, PFT metrics, cost/billing facts, and queryable business histories. They are append-friendly and rebuildable from the event stream; they must not own unreplayable state.
  • Streaming materialized-view engines maintain source/table/materialized-view/sink pipelines, incremental aggregations, work-queue state, and barrier-aligned derived state. They consume canonical events or table sources and emit derived updates, never replacing the canonical log.
  • Search/vector stores serve retrieval shapes only. They are rebuilt from event/payload/projected source data and may lag without compromising correctness.

Every projection row should include enough lineage to support audit and replay comparison: tenant_id, organization_id, execution_id where applicable, source event_id or event-position range, projection version, and checksum.

Each projection type can target a different backend. The projector dispatches by the projection's configured backend (see §11 for the YAML).


8. Event store abstraction (port/adapter)

class EventStorePort(Protocol):
    async def append(
        self, stream_id: StreamId, events: Sequence[EventEnvelope],
        expected_version: Optional[int]
    ) -> int: ...

    async def read(
        self, stream_id: StreamId, from_version: int = 0
    ) -> AsyncIterator[EventEnvelope]: ...

    async def subscribe(
        self, stream_pattern: str, consumer_group: str,
        from_position: ConsumerPosition
    ) -> Subscription: ...

Adapters (priority order):

  1. Postgres noetl.event — canonical ledger and replay source by default. Partitioned by tenant/time/execution as volumes grow.
  2. NATS JetStream — reference distribution stream. Subject = noetl.events.<tenant>.<execution>.<shard>. Durable consumers per projector / per worker. Events are persisted to the canonical ledger and mirrored to JetStream for low-latency fan-out.
  3. Apache Kafka / Confluent / MSK — partition key = aggregate id; offset checks for expected_version.
  4. Google Pub/Sub — topic per category, ordering key per aggregate, side store (Spanner / Firestore) for expected_version.
  5. Azure Event Hubs — Kafka-compat mode reuses Kafka adapter; native mode uses Event Hubs SDK + Blob checkpoint store.
  6. Amazon Kinesis Data Streams — partition key per aggregate, DynamoDB for version tracking, KCL for consumer coordination.

Adapter design constraints:

  • Idempotent handlers as baseline; assume at-least-once delivery. Do not try to abstract exactly-once differences.
  • Per-aggregate ordering only. No global ordering guarantee.
  • Backend-specific retention / compaction / DLQ / monitoring stays out of the abstraction.
  • Event schema evolution: every event carries schema_version. Adapters do not transform; upcasting lives in the projection layer.
  • Configuration selects the backend at deploy time; same image runs anywhere.
  • The adapter boundary separates canonical append from distribution. A deployment may use Postgres as the canonical append log and JetStream/Kafka as the fan-out transport, or a cloud-native log plus a compacted replay archive, but it must expose the same replay semantics.

9. Payload store abstraction (port/adapter)

class PayloadStorePort(Protocol):
    async def store(
        self, data: bytes | AsyncIterator[bytes],
        content_type: str, metadata: Mapping[str, str]
    ) -> PayloadReference: ...

    async def fetch(
        self, reference: PayloadReference
    ) -> bytes | AsyncIterator[bytes]: ...

    async def exists(self, reference: PayloadReference) -> bool: ...

    async def delete(self, reference: PayloadReference) -> None: ...

    async def resolve_envelope(
        self, envelope: EventEnvelope
    ) -> EventEnvelope: ...

    async def externalize_envelope(
        self, envelope: EventEnvelope, threshold: int
    ) -> EventEnvelope: ...

Adapters: S3 (reference, covers SeaweedFS via S3-compat), GCS, Azure Blob, local filesystem.

Constraints:

  • Content-addressed by SHA-256. Two events with identical large payloads share a reference. No duplicate uploads.
  • Immutable on write. Versioning happens through new references, not in-place updates.
  • Transparent to handlers via resolve_envelope / externalize_envelope middleware.
  • Retention is reference-counted against event store retention, with a configurable TTL as a safety net.
  • Encryption at rest is per adapter.
  • Streaming upload / download. No buffering full payloads in memory.

The cache hierarchy described in §6 (Tier 1 → 1.5 → 2 → 3) sits on top of this interface, transparent to user code.
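
A sketch of the content-addressing rule for a local filesystem adapter; streaming, metadata, and the full PayloadReference are omitted, and the directory layout is illustrative.

import hashlib
from pathlib import Path

def store_payload(root: Path, data: bytes, tenant: str, org: str) -> str:
    digest = hashlib.sha256(data).hexdigest()
    path = root / tenant / org / "payloads" / "sha256" / digest
    if not path.exists():                        # exists-first: identical payloads share one object
        path.parent.mkdir(parents=True, exist_ok=True)
        tmp = path.with_suffix(".tmp")
        tmp.write_bytes(data)
        tmp.replace(path)                        # atomic publish; the object is immutable from here on
    return f"noetl://tenant/{tenant}/org/{org}/payloads/sha256/{digest}"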


10. Cloud distributed OS surfaces

This layer is what lets NoETL behave like a distributed business operating system for multitenant organizations. It provides identity, addressability, locality, replay, and resource accounting across clusters without coupling application logic to a specific cloud.

10.1 Unified resource locator

Every addressable thing in the system gets a stable URI:

noetl://tenant/<tenant>/org/<org>/cluster/<cluster>/region/<region>/zone/<zone>/node/<node>/process/<pid>/<kind>/<id>

Examples:
noetl://tenant/acme/org/care-network/cluster/prod-1/region/us-central1/zone/us-central1-a/
node/gke-noetl-pool-a-7b/process/1/worker/cpu-01

noetl://tenant/acme/org/care-network/cluster/prod-1/region/global/none/none/none/
stream/events/<execution>/<shard>

noetl://tenant/acme/org/care-network/payloads/sha256/<sha256>

Tenants, organizations, workers, projectors, MCP servers, JetStream streams, Tier 3 payloads, cache entries, projection shards, and frame leases are all referenceable. The locator is the join key across the event store, projection store, and observability layer.
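
An illustrative parser for the locator, treating the path as alternating key/value segments ending in kind/id; this mirrors the examples above, not a finalized grammar.

from dataclasses import dataclass

@dataclass
class ResourceLocator:
    tenant: str
    org: str
    segments: dict                 # all key/value pairs, e.g. {"cluster": "prod-1", ...}
    kind: str
    resource_id: str

def parse_locator(uri: str) -> ResourceLocator:
    parts = uri.removeprefix("noetl://").split("/")
    pairs = dict(zip(parts[0::2], parts[1::2]))
    return ResourceLocator(tenant=pairs["tenant"], org=pairs["org"], segments=pairs,
                           kind=parts[-2], resource_id=parts[-1])

loc = parse_locator("noetl://tenant/acme/org/care-network/cluster/prod-1/region/us-central1/"
                    "zone/us-central1-a/node/gke-noetl-pool-a-7b/process/1/worker/cpu-01")
assert loc.tenant == "acme" and loc.kind == "worker" and loc.resource_id == "cpu-01"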

10.1.1 Multitenant isolation

Multitenancy is enforced at the event/payload boundary first, then projected outward:

  • Event partitions include tenant_id, organization_id, and time/execution keys.
  • Payload object keys include tenant/org prefixes and content digests; encryption context is tenant-scoped.
  • Cache keys include tenant/org/execution. Shared cache layers may share infrastructure, not key space.
  • Projection stores use row-level security or physical database/schema separation depending on the tenant isolation tier.
  • Replay APIs require tenant/org scope and never scan a global stream without an explicit operator permission.
  • Cross-tenant materialized analytics are allowed only through curated, governed projections with redaction rules encoded as projection code.

10.2 Topology-aware scheduling

Frame claim has an optional locality preference:

POST /api/stages/{stage_id}/frames/claim
  body:
    worker_id: noetl://tenant/.../org/.../cluster/.../worker/cpu-01
    locality:
      prefer_node: <node_id>         # for Tier 1.5 colocation
      prefer_zone: us-central1-a     # for Tier 2 cache locality
      max_distance: zone | region | any

The server scheduler tries the closest match. Frames produced by a worker prefer to be reduced by a worker on the same node (Tier 1.5 hit) or in the same zone (Tier 2 hit). Cross-region only when local capacity is exhausted.

10.3 Autoscaling

The KEDA scaler reads two signals:

  • frame_backlog_total{stage_kind=...} — pending frames waiting for a worker.
  • frame_p95_lease_duration — moving p95 of how long frames take.

Autoscale formula: desired_workers = max(1, ceil(frame_backlog_total / target_concurrent_frames_per_worker)), clamped by a per-cluster maximum. There is no static provisioned-capacity setting.
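
The same rule as a function, for illustration; the real scaler is KEDA configuration, not application code.

import math

def desired_workers(frame_backlog_total: int,
                    target_concurrent_frames_per_worker: int,
                    cluster_max: int) -> int:
    want = max(1, math.ceil(frame_backlog_total / target_concurrent_frames_per_worker))
    return min(want, cluster_max)

assert desired_workers(520, 4, 64) == 64     # a PFT v2-sized backlog, clamped by the cluster max
assert desired_workers(0, 4, 64) == 1        # idle stages keep one worker, never zero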

For multi-cluster, the NATS JetStream supercluster routes the same noetl.events.* subjects across clusters. The scheduler can claim frames from peer clusters when local backlog has cleared. This is opt-in and gated by an egress cost budget.

10.4 Resilience

  • Frame lease + cursor checkpoint = recovery primitive. Worker crash within a frame loses at most one frame's worth of work; the next worker resumes from the last committed cursor.
  • Tier 3 is the durable read. Tier 1.5 / 2 are optimizations; their failure modes are graceful degradation, not data loss.
  • Projection store writes are idempotent by (stage_id, frame_id, partition_id). Replays are safe.
  • Frame reaper (rebranded command reaper) republishes stale leases.

10.5 Quantum-cloud posture

"Quantum cloud" here means cloud infrastructure that can place work across heterogeneous compute and storage fabrics while preserving one event-sourced timeline per tenant organization. The runtime should assume future worker classes beyond CPU/GPU, including confidential compute, specialized accelerators, and quantum-provider task queues.

The contract stays the same:

  • Specialized workers receive frames with typed input payload references.
  • Provider submissions, callbacks, measurements, and result ingests are events with immutable payload references.
  • External provider state is modeled as side effects with correlation/idempotency keys, not as hidden runtime state.
  • Replay can reconstruct what was submitted, what was observed, and what state NoETL derived from it, even when the physical computation cannot be rerun deterministically.

11. Configuration

YAML, shared between Python and Rust runtimes:

runtime:
  tenant_id: ${NOETL_TENANT_ID}
  organization_id: ${NOETL_ORGANIZATION_ID}
  node_id: ${POD_IP}                    # mandatory, used in locator + IPC hint
  cluster_id: ${NOETL_CLUSTER_ID}
  shard_id: ${NOETL_SHARD_ID}           # for StatefulSet projectors / workers
  scheduler:
    locality_preference: zone           # node | zone | region | any
    max_inflight_frames_per_worker: 4

event_store:
  canonical_backend: postgres           # append-only replay ledger
  distribution_backend: nats-jetstream  # kafka | google-pubsub | azure-event-hubs | aws-kinesis | aws-msk
  connection:
    url: nats://nats.nats.svc.cluster.local:4222
    stream_prefix: noetl.events
  replay:
    snapshot_interval_events: 10000
    checksum_algorithm: blake3
    require_projection_parity: true

serialization:
  envelope_format: canonical-json
  tabular_payload_format: arrow-ipc-stream
  schema_registry:
    backend: file                       # file | db | external
    path: /etc/noetl/schemas
  compression: zstd
  timezone: UTC

payload_store:
  backend: s3                           # gcs | azure-blob | seaweedfs | local
  connection:
    endpoint: https://storage.googleapis.com
    bucket: noetl-payloads-prod
  threshold_bytes: 262144               # 256 KB inline cap
  content_addressing: sha256
  encryption_at_rest: true
  cache:
    tier_1_memory_mb: 512
    tier_15_node_budget_mb: 1024        # Arrow IPC budget per node
    tier_15_grace_seconds: 30
    tier_2_disk_gb: 10
    tier_2_disk_path: /var/cache/noetl/payloads
  gc:
    strategy: reference-count           # | ttl
    ttl_days: 90

projection_stores:
  default:
    backend: postgres
    connection:
      dsn: postgresql://noetl:***@pg.noetl.svc:5432/noetl
  search:
    backend: search-store
    connection:
      hosts: ["http://search.search.svc:9200"]
  analytics:
    backend: columnar-analytics
    connection:
      dsn: ${NOETL_ANALYTICS_DSN}
  streaming:
    backend: streaming-materialized-view
    connection:
      dsn: ${NOETL_STREAMING_MV_DSN}

materializations:
  event_lake:
    backend: columnar-analytics
    source: event_store
    include_payload_refs: true
  operational_mvs:
    backend: streaming-materialized-view
    source: event_store
    barrier_interval_ms: 1000

snapshots:
  backend: postgres                     # defaults to projection_stores.default

frame_policy_defaults:
  loop:
    size: 50
    duration_ms: 30000
    memory_bytes: 67108864
    parallelism: 1
  fanout:
    size: 1
    duration_ms: 5000
    memory_bytes: 16777216
    parallelism: 8

The same image runs anywhere. Backend changes are config-only.


12. Refactor plan (phased, additive)

Phase 0 — Instrumentation (1 week)

  • Add the metrics in §4 to Grafana / VictoriaMetrics dashboards (already deployed).
  • Baseline a fresh PFT v2 run on GKE. Capture the metric values and pin to memory.
  • Add noetl.stage and noetl.frame tables via Alembic migration (empty initially).
  • Add canonical event-envelope schema validation, including tenant/org scope, schema_name, schema_version, idempotency_key, payload digest, and deterministic checksum fields.
  • Add replay harness: rebuild execution/frame/loop projections from noetl.event + payload store and compare checksums against live projection rows.

Deliverable: dashboard URL + memory entry with baseline numbers, plus a replay parity report for the baseline run. No code change to hot paths.

Phase 1 — Frame-shaped cursor loops (2 weeks)

Goal: collapse N single-row cursor claims into N/50 multi-row frame claims, with no other architectural change.

  • Extend cursor_worker.py to accept a frame_policy payload alongside the existing cursor spec.
  • New POST /api/stages/{stage_id}/frames/claim endpoint; under the hood it calls existing claim_next_loop_indices with LIMIT = frame_policy.size.
  • Worker iterates the returned rows in-process, accumulating results to a local list (Arrow IPC comes in Phase 3; for now use canonical JSON with payload digests).
  • Worker commits the frame with one event per frame instead of one per row.
  • Migrate test_pft_flow_v2.yaml to opt in via frame_policy: on each mode: cursor step.

Verification:

  • Total command.* count drops from ~150k to < 20k on PFT v2.
  • Wall time should drop modestly (less server CPU on /claim) but will not yet hit the headline target.
  • Replay parity stays green for frame state, loop progress, and execution status.

Phase 2 — Decentralized projection (2 weeks)

Goal: extract projection from server, run as a StatefulSet, scale independently.

  • New noetl-projector binary entrypoint reusing the existing projection worker code.
  • Helm chart adds the StatefulSet, NATS durable consumer per replica, projection-store-only DB user.
  • Remove the in-process projection loop from the server. Server now appends canonical events and mirrors them to the distribution stream.
  • Add per-shard projection lag metric.

Verification:

  • Postgres pool depth high-watermark drops by ~3× on PFT v2 (writer fan-out).
  • Server CPU stops spiking during MDS bursts.

Phase 3 — Arrow IPC Tier 1.5 (3 weeks)

Goal: add zero-copy data plane for colocated workers and projectors.

  • Add pyarrow and arrow-rs (arrow, arrow-ipc) to dependencies. Mark as required for cursor/frame paths.
  • Implement Tier 1.5 in result_store.py (Python) and equivalent in repos/noetl/crates/noetl-core/src/storage/ (Rust).
  • Extend PayloadReference with optional IpcHint.
  • Producer worker writes RecordBatch to shm + Tier 3; emits hint in envelope.
  • Consumer (projector, reducer, downstream stage) checks hint, attaches, falls back.
  • Per-node IPC budget + grace + reaper.
  • Metric: tier15_hit_ratio per consumer.

Verification:

  • tier15_hit_ratio > 60% when projector is colocated with worker.
  • End-to-end PFT v2 wall time hits the ÷2 target vs the Phase 0 baseline.

Phase 4 — Cloud OS surfaces (3 weeks)

Goal: lift the runtime from "well-behaved on one cluster" to "addressable, schedulable, autoscalable across clusters."

  • Implement unified resource locator across all subsystems.
  • StatefulSet identity for workers (not just projectors).
  • KEDA scaler with frame backlog signal.
  • Multi-cluster supercluster docs + an ops playbook to provision two GKE regions feeding the same NATS supercluster.
  • Topology-aware scheduling via locality hint on claim.

Phase 5 — Pluggable event store / payload store / projection store (rolling, separate PRs per adapter)

Goal: every backend class in §§ 8–9 has a working adapter, language-paired, behind a feature flag. Product choices stay deployment-specific; the compliance contract is architectural.

Order (mirroring the existing distributed plan):

  1. NATS JetStream event store adapter (refactor existing to fit the new port). Python + Rust.
  2. S3 payload store adapter (refactor existing to fit the new port). Python + Rust.
  3. Postgres projection store adapter (refactor existing). Python + Rust.
  4. Kafka event store adapter.
  5. GCS payload store adapter.
  6. Cloud key-value/document projection adapter.
  7. Columnar analytical projection adapter.
  8. Streaming materialized-view adapter.
  9. … then the remainder in cloud-priority order.

Every adapter ships with:

  • Compliance test suite (language-agnostic spec, run against both implementations).
  • Replay parity suite for canonical events, payload references, snapshots, and projection checksums.
  • Docker-compose entry for local development.
  • Cloud-provisioning ops playbook in repos/ops/automation/.

Phase 6 — Stage planner for fan-out / reduce (4 weeks)

Goal: extend the stage/frame model from loops to fan-out and reduce, completing the map-reduce shape.

  • Stage kind='fanout': explodes a single input into N partitions, each handed to a frame.
  • Stage kind='reduce': consumes M partitions, emits one output. Reduce frame waits on partition availability events instead of cursor rows.
  • Replace the design in distributed_fanout_mode_spec.md with this materialized version.

13. Risks and open questions

  • Tier 1.5 GC under crash. A producer worker that crashes after writing shm but before committing the frame leaves shm regions that no one will unlink. Mitigation: per-node shm_unlink sweep on worker start (scans /dev/shm/noetl-* and unlinks regions older than tier_15_grace_seconds).
  • NATS supercluster cost. Cross-region replication of every event stream is not free. Mitigation: opt-in per execution; default is single-region.
  • Frame size tuning. A frame too big means more work lost on crash; too small means coordination dominates. Mitigation: start with the §5.2 default, expose per-step override, measure with the §4 dashboard.
  • Reduce-side back-pressure. A reduce stage that is slower than its upstream fanout fills the projection store inbox. Mitigation: reuse the existing max_inflight concept at the stage level, not the worker level.
  • Multi-process per pod. Some MCPs prefer multiple processes. Tier 1.5 then needs a node-local broker. Out of scope for v1; revisit if a real MCP demands it.

14. Source code anchors (current state)

For implementers. Existing code that this spec extends, not replaces:

| Concern | File | Key symbol |
|---|---|---|
| Loop expansion (parallel mode) | noetl/core/dsl/engine/executor/commands.py | _create_command_for_step |
| Cursor dispatch | noetl/core/dsl/engine/executor/transitions.py | _issue_cursor_loop_commands |
| Cursor worker | noetl/worker/cursor_worker.py | execute_cursor_worker |
| Worker tool dispatch | noetl/worker/nats_worker.py | _execute_tool |
| TempStore tiers | noetl/core/storage/result_store.py | ResultStore.put / resolve |
| TempStore tier enum | noetl/core/storage/models.py | StoreTier |
| NATS client | noetl/core/messaging/nats_client.py | NatsClient.connect |
| NATS K/V cache | noetl/core/cache/nats_kv.py | NatsKv |
| Claim API | noetl/server/api/core/commands.py | claim_command |
| Command reaper | noetl/server/command_reaper.py | CommandReaper |
| Schema DDL | noetl/database/ddl/postgres/schema_ddl.sql | noetl.event, noetl.command |

PFT v2 driver: repos/e2e/fixtures/playbooks/pft_flow_test/test_pft_flow_v2.yaml — six cursor steps, ~120k HTTP calls per execution, the canonical benchmark for every phase above.


15. Out of scope (deferred to future specs)

  • Replacing the DSL with a different language. The DSL is fine; this spec rewires the execution layer underneath it.
  • Replacing noetl.event with a different source of truth. Event log stays Postgres-backed by default; the event store port lets other backends mirror it.
  • Replacing NoETL's existing MCP server architecture. Workers and MCPs are orthogonal.
  • A new container orchestrator. We assume Kubernetes; the locator scheme is friendly to other orchestrators but they are not a v1 target.

16. Decision log (what changed vs the original event-store-design-prompt)

The original event-store-design-prompt.md (now archived) framed the problem as "add an event store abstraction layer." That framing is necessary but not sufficient. This revision changes the framing:

  1. Event sourcing becomes the system kernel. The goal is not just to swap storage backends; it is to reproduce tenant/org system state at a requested time from canonical events plus immutable payloads.
  2. Shared memory becomes a product advantage, not just a cache. The runtime uses Arrow IPC, local disk, distributed K/V, materialized views, and distributed indexes as a rebuildable shared-state fabric around the event log.
  3. Worker-side loop interpretation remains the dominant immediate cost reduction. The PFT v2 baseline shows that even a perfect event store cannot save us from server-side per-row coordination overhead.
  4. A specific GC + admission story for Tier 1.5 (budget, grace, unlink sweep). The original handwaved this; we have to make it concrete or the shm region count will run away.
  5. A staged additive rollout with frame-shaped cursor loops in Phase 1, before Arrow IPC and before pluggable backends. This lets us ship a measurable performance win in two weeks rather than waiting for a multi-quarter abstraction overhaul.

Other elements (three-layer model, backend adapters, content-addressed payloads, configuration schema) carry over from the original with edits to match the existing TempStore and projection-worker code that has shipped since the original was drafted. Product-specific analytical or streaming engines are intentionally not named as dependencies; NoETL adopts the underlying algorithms and contracts.


17. Implementation Roadmap

This section complements §12 (the phased refactor plan) with a phase dependency graph, per-phase acceptance checklist, replay parity gate, and operational readiness criteria. Section 12 defines what each phase builds; this section defines when each phase is ready to ship and what to do if it needs to be rolled back.

17.1 Phase dependency graph

Phases 1 and 2 can run in parallel once the Phase 0 baseline is captured. Phase 3 requires both because it depends on frame-shaped Arrow output (Phase 1) and colocated projectors (Phase 2).

graph LR
P0["Phase 0\nInstrumentation\n1 week"] --> P1 & P2
P1["Phase 1\nFrame-shaped\ncursor loops\n2 weeks"] --> P3
P2["Phase 2\nDecentralized\nprojection\n2 weeks"] --> P3
P3["Phase 3\nArrow IPC\nTier 1.5\n3 weeks"] --> P4
P4["Phase 4\nCloud OS\nsurfaces\n3 weeks"] --> P5
P5["Phase 5\nPluggable\nadapters\nrolling PRs"] --> P6
P6["Phase 6\nFan-out / Reduce\n4 weeks"]

17.2 Pre-flight checklist (before Phase 0 begins)

These conditions must be true before work begins on Phase 0:

  • noetl.stage and noetl.frame DDL reviewed and approved by database owner.
  • Grafana / VictoriaMetrics dashboards provisioned with the §4 metric tile set (empty panels are acceptable at this stage).
  • A reproducible trigger for a full PFT v2 run exists as an ops runbook entry and has been tested by a team member who did not write it.
  • NOETL_TENANT_ID, NOETL_ORGANIZATION_ID, NOETL_CLUSTER_ID env vars injectable via Helm values in the target cluster.
  • Replay harness skeleton (even a no-op stub) runs to completion without error against the current noetl.event table.
  • Schema registry mount path (/etc/noetl/schemas) is provisionable from a ConfigMap.
  • On-call alert routing confirmed for frame_reaper_republish_rate and pg_pool_depth_highwatermark.

17.3 Per-phase acceptance criteria

| Phase | Ship gate | Replay parity required | Rollback procedure |
|---|---|---|---|
| 0 — Instrumentation | All §4 metrics non-null on dashboard from a complete PFT v2 run; baseline values pinned to memory | Replay harness exits 0; checksum report attached to memory entry | Drop noetl.stage / noetl.frame via inverse Alembic migration; zero runtime impact |
| 1 — Frame loops | Total command.* count < 20k on PFT v2; wall time not regressed vs Phase 0 baseline | Frame state, loop progress, and execution status checksums match live projection | Set frame_policy: null on PFT v2 steps; disable frame claim endpoint; legacy command path unchanged |
| 2 — Decentralized projection | Postgres pool depth < 30 sustained during MDS burst; per-shard projection lag metric visible on dashboard | Projection checksums match single-writer baseline | Re-enable in-process server projection loop; scale noetl-projector StatefulSet to 0 |
| 3 — Arrow IPC Tier 1.5 | tier15_hit_ratio > 60% on colocated consumer; PFT v2 wall time ≤ Phase 0 baseline ÷ 2 | Payload digest and projection checksums unchanged vs Phase 2 | Set tier_15_node_budget_mb: 0; admission control skips Tier 1.5 with no data-path change |
| 4 — Cloud OS surfaces | Unified resource locator present on all event envelopes; KEDA scaler responds to frame backlog signal within 30 s | Locator fields do not alter event content digest; replay checksum unchanged | Disable KEDA scaler; revert locator injection to no-op middleware; locator fields are additive |
| 5 — Pluggable adapters | Each adapter passes compliance + replay parity suite; Docker-compose local dev environment validated end-to-end | Python and Rust parity corpus checksums identical for each adapter under test | Revert backend config values to previous setting; no code removal required |
| 6 — Fan-out / reduce | Stage kind='fanout' and kind='reduce' pass all PFT v2 fan-out phases; distributed_fanout_mode_spec.md superseded | Reduce-frame output checksums match single-partition baseline | Revert fan-out DSL steps to mode: parallel; stage/frame tables remain intact |

17.4 Replay parity release gate

Every phase after Phase 0 must pass all of the following before any merge to main:

[ ] Event envelope validation passes for 100% of events emitted in the phase's PFT v2 run.
[ ] Every snapshot records: event position, projection code version, upcaster versions,
    payload digest set, tenant encryption context, and deterministic fold checksum.
[ ] Replay harness runs start-to-finish against the phase's full event log and payload refs.
[ ] Execution state checksum (frame rows committed, loop progress) matches live projection.
[ ] At least one configured business projection type checksum matches live projection.
[ ] Python and Rust replay paths produce identical fold checksums on the golden corpus.
[ ] No event references an IpcHint path as its only payload copy
    (grep for events missing a Tier 3 payload_ref.uri).
[ ] Idempotency key present on 100% of externally retried transitions in the phase's events.

Replay parity failures block the phase merge. They are not deferred to the next phase.

17.5 Operational readiness (per phase shipped to production)

Before enabling any phase for a production tenant:

  • Runbook exists in repos/ops/automation/ covering: how to trigger a replay, how to compare checksums, how to roll back the phase, and how to scale the projector StatefulSet.
  • On-call alerts live for frame_reaper_republish_rate > 0.1/s and tier15_budget_exhaustion_total > 0.
  • Frame budget defaults reviewed against the tenant's actual execution profile (not only PFT v2).
  • Tenant encryption context set and verified for all payload refs in the phase's event schema version.
  • Retention and TTL policy for Tier 3 objects confirmed with legal / compliance for the tenant.
  • Cross-tenant projection isolation verified: no query returns rows from a different (tenant_id, organization_id) pair without explicit operator permission.