NoETL Distributed Processing Enhancement Plan

Partially superseded

Phase 1 ("Schema Enhancements") — specifically P1.2 ("noetl.execution.state loop progress via trigger") — is superseded by noetl_async_sharded_architecture.md. The trg_execution_state_upsert trigger is removed; an async ProjectionWorker computes execution.state per shard. The unique indexes from P1.1 (uidx_event_command_issued_command_id, uidx_event_loop_done_loop_id, idx_event_loop_id_type) remain authoritative. Phase 0, Phase 2 (storage), Phase 3 (fan-out), Phase 4 (NATS transport), and Phase 5 (observability) remain authoritative; see the async/sharded doc for how they interact with the new projection and shard model.

Context and Motivation

This plan addresses a set of confirmed bugs and architectural gaps in NoETL's distributed server-worker execution model discovered during the test_pft_flow regression (10 facilities × 1000 patients × 5 data types). The workload exposed:

  1. Premature loop.done emission — the race between concurrent call.done events causes the server to fire loop.done before all iterations have saved their results, silently dropping patients.
  2. Over-dispatch — duplicate command.issued events generated by concurrent handle_event() coroutines cause the same step to execute more times than intended.
  3. Loop state loss — NATS KV is not a durable store; server restart or NATS outage loses loop progress and prevents recovery.
  4. Stale claim reclaim is in the wrong layer — playbooks embed FOR UPDATE ... WHERE claimed_at < NOW() - INTERVAL '5 minutes' SQL as a workaround for missing orchestration-level claim expiry.
  5. No distributed fan-out — large parallel workloads (10⁵+ items) are limited to max_in_flight within a single loop on a single server coroutine; true cross-worker shard distribution is specced but not implemented.
  6. HTTP event emission overhead — every worker result requires a new HTTP connection to the server; high-frequency events (heartbeats, shard completions) add unnecessary latency.

The noetl.execution table is a projection table — it maintains a live snapshot of each playbook execution instance and is updated by a PostgreSQL trigger on every noetl.event INSERT. The same projection pattern is used for loop progress tracking.

All phases below are independently deliverable. Phases 0 and 1 are prerequisites for all later phases.


Architecture Invariants (must not be violated by any phase)

  • Server is the sole routing authority. Workers never synthesize routing decisions. Workers emit events; the server evaluates next.arcs[].when conditions and issues commands.
  • Event table is the authoritative source of truth. NATS KV and in-memory state are caches. Any state that must survive server restart must be in noetl.event or a projection table.
  • Workers do not communicate with each other directly. All coordination flows through the server (via NATS subject or HTTP API).
  • Command dispatch is idempotent. A command.issued event for a given (execution_id, command_id) must exist at most once. The unique index enforces this at the DB level.
  • Loop collection is never stored in volatile memory as the only copy. Large collections go to object storage (GCS/S3/MinIO); a reference is stored in the loop.started event.

Phase 0: Correctness Fixes

Goal: Eliminate the confirmed race conditions causing patient data loss and over-dispatch. These bugs block all meaningful regression testing of later phases.

P0.1 — Atomic command.issued deduplication

Problem: Two concurrent handle_event() coroutines both pass the read-check-write deduplication guard and both insert command.issued events for the same (execution_id, step, attempt), causing double dispatch.

Fix: Add a unique partial index on noetl.event:

```sql
CREATE UNIQUE INDEX IF NOT EXISTS uidx_event_command_issued_command_id
ON noetl.event (execution_id, (meta->>'command_id'))
WHERE event_type = 'command.issued' AND meta ? 'command_id';
```

In handle_event() (server-side events.py), replace the read-check-write pattern with a try/except around the INSERT:

```python
try:
    await db.execute("INSERT INTO noetl.event ... VALUES (...)")
    # succeeded → sole issuer → publish to NATS
    await nats.publish(command_notification)
except UniqueViolationError:
    logger.info("[DEDUP] command.issued already exists for command_id=%s — skipping", command_id)
    return []
```

Acceptance criteria:

  • Unique index exists in schema DDL and applied to all partitions.
  • handle_event() catches UniqueViolationError on command.issued INSERT and returns [] without publishing to NATS.
  • Unit test: two concurrent coroutines processing the same call.done event → exactly one command.issued row in DB, exactly one NATS publish.
  • tooling_non_blocking fixture: each core tool step shows issued_count == 5, terminal_count == 5 after full run.
  • test_pft_flow regression: no over-dispatch observed on any data-type batch loop.
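The concurrency scenario in the unit-test bullet can be sketched without a database: a fake event table whose insert is atomic plays the role of the partial unique index. All names here (FakeEventTable, handle_event, published) are illustrative, not the actual server code.

```python
import asyncio

class UniqueViolationError(Exception):
    pass

class FakeEventTable:
    """Stands in for noetl.event plus the partial unique index."""
    def __init__(self):
        self.rows = set()

    async def insert_command_issued(self, execution_id, command_id):
        key = (execution_id, command_id)
        if key in self.rows:  # the unique index rejects the second insert
            raise UniqueViolationError(key)
        self.rows.add(key)

published = []

async def handle_event(db, execution_id, command_id):
    try:
        await db.insert_command_issued(execution_id, command_id)
        published.append(command_id)  # sole issuer → would publish to NATS
        return [command_id]
    except UniqueViolationError:
        return []  # duplicate → skip publish

async def race():
    db = FakeEventTable()
    await asyncio.gather(*[handle_event(db, "exec-1", "cmd-1") for _ in range(2)])
    return db

db = asyncio.run(race())
print(len(db.rows), len(published))  # → 1 1: one row, one publish
```

The real test asserts the same two counts against Postgres and a captured NATS publish list.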

P0.2 — Atomic loop.done emission

Problem: The server computes new_count >= collection_size to decide whether to fire loop.done. Two concurrent coroutines both read new_count = N-1, both increment to N = collection_size, and both attempt to claim loop.done via NATS KV try_claim_loop_done. Under NATS KV failure or race, neither or both fire loop.done, terminating the batch loop prematurely.

Fix: Replace the NATS KV try_claim_loop_done claim with an event-table-based atomic claim:

```sql
-- Insert loop.done event atomically; only the first succeeds
INSERT INTO noetl.event (execution_id, event_id, event_type, node_name, meta, catalog_id, created_at)
VALUES ($1, $2, 'loop.done', $step, $meta::jsonb, $catalog_id, NOW())
ON CONFLICT (execution_id, (meta->>'loop_id'))
    WHERE event_type = 'loop.done' AND meta ? 'loop_id'  -- matches uidx_event_loop_done_loop_id below
DO NOTHING
RETURNING event_id;
-- Only the coroutine that gets a RETURNING row proceeds to generate next routing commands.
```

Add partial unique index:

```sql
CREATE UNIQUE INDEX IF NOT EXISTS uidx_event_loop_done_loop_id
ON noetl.event (execution_id, (meta->>'loop_id'))
WHERE event_type = 'loop.done' AND meta ? 'loop_id';
```

Acceptance criteria:

  • Unique index for loop.done per (execution_id, loop_id) exists.
  • events.py loop-done path: only one coroutine per loop_id generates routing commands; all others detect DO NOTHING and return empty.
  • Unit test: 20 concurrent call.done events for a 20-item loop → exactly one loop.done event in DB, exactly one next-step command.issued.
  • test_pft_flow regression: validate_facility_results shows 1000/1000 for assessments (not ~100 as under the race bug). assessments_done == assessments_queue_done for all facilities.

P0.3 — Loop state persistence: event table as authority

Problem: Loop collection and counters live in NATS KV. On server restart or NATS outage, loop progress is lost and recovery emits duplicate commands or misses loop.done.

Fix: Write a loop.started event at loop initialization time carrying the collection reference and size. Counters are derived from event table counts rather than NATS KV counters. NATS KV remains a write-through cache for hot-path reads.

New event types to introduce (see schema doc for full field specification):

| event_type | Trigger | Key meta fields | context fields |
|---|---|---|---|
| loop.started | Server, on first loop command | loop_id, collection_size | collection_ref (GCS/S3/MinIO URI) |
| loop.item | Server, per iteration | loop_id, iter_index | |
| loop.done | Server, atomic | loop_id, collection_size | |

Recovery on server startup or NATS KV miss:

```python
async def recover_loop_state_from_events(execution_id, step_name, loop_id, db):
    return await db.fetchrow("""
        SELECT
            COALESCE(MAX((meta->>'iter_index')::int), -1) AS current_index,
            COUNT(*) FILTER (WHERE event_type = 'command.completed') AS done_count,
            COUNT(*) FILTER (WHERE event_type = 'command.failed') AS failed_count,
            MAX(CASE WHEN event_type = 'loop.started'
                     THEN context->>'collection_ref' END) AS collection_ref,
            MAX(CASE WHEN event_type = 'loop.started'
                     THEN (meta->>'collection_size')::int END) AS collection_size
        FROM noetl.event
        WHERE execution_id = $1
          AND node_name = $2
          AND meta->>'loop_id' = $3
    """, execution_id, step_name, loop_id)
```

Acceptance criteria:

  • loop.started event written to noetl.event whenever a loop is initialized.
  • loop.started event context->>'collection_ref' points to a valid object storage URI when collection size > configurable threshold (default 1000 items).
  • Server startup auto-resume rebuilds loop state from event table for all RUNNING executions with active loops (does not require NATS KV).
  • Integration test: kill server mid-loop (50% complete), restart, verify loop resumes from correct index without re-processing already-done iterations.
  • NATS KV remains a cache; its absence does not cause data loss or incorrect loop.done.

P0.4 — Orchestration-level claim expiry (reaper enhancement)

Problem: When a worker crashes mid loop-iteration, the command.heartbeat stops. The work-queue row stays claimed indefinitely. Playbooks work around this by embedding WHERE claimed_at < NOW() - INTERVAL '5 minutes' SQL, coupling business logic to infrastructure recovery.

Fix: Extend the server reaper to detect commands with no heartbeat beyond NOETL_COMMAND_HEARTBEAT_TIMEOUT_SECONDS (default: 300s) and re-enqueue them:

```python
# In reaper / recovery.py:
stale_commands = await db.fetch("""
    SELECT e.execution_id, e.meta->>'command_id' AS command_id, e.node_name, e.event_id
    FROM noetl.event e
    WHERE e.event_type = 'command.claimed'
      AND e.created_at < NOW() - make_interval(secs => $1)
      AND NOT EXISTS (
          SELECT 1 FROM noetl.event h
          WHERE h.execution_id = e.execution_id
            AND h.meta->>'command_id' = e.meta->>'command_id'
            AND (h.event_type IN ('command.completed', 'command.failed')
                 OR (h.event_type = 'command.heartbeat'
                     AND h.created_at > NOW() - make_interval(secs => $1)))
      )
""", heartbeat_timeout)
# Re-enqueue each stale command via NATS
```

Note: a terminal event (command.completed/failed) excludes the command regardless of age; only the heartbeat check is time-windowed.
# Re-enqueue each stale command via NATS

Acceptance criteria:

  • Reaper detects command.claimed events with no heartbeat within the timeout window.
  • Re-enqueued commands respect max_attempts; exhausted commands emit command.failed.
  • Workers receiving re-enqueued commands use the advisory lock (pg_try_advisory_xact_lock) on command_id to avoid double execution if the original worker recovers.
  • Integration test: suspend a worker mid-loop-iteration for > timeout, verify the reaper re-enqueues and a different worker completes the iteration.
  • test_pft_flow: remove the reclaim_stale CTE from all load_patients_for_* steps after this is validated.
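The advisory-lock bullet can be illustrated with a small sketch: pg_try_advisory_xact_lock takes a bigint, so the string command_id must be hashed to a stable 63-bit key. The helper name and the asyncpg-style call in the comment are hypothetical, not existing NoETL code.

```python
import hashlib

def advisory_lock_key(command_id: str) -> int:
    """Stable 63-bit bigint key for pg_try_advisory_xact_lock."""
    digest = hashlib.sha256(command_id.encode()).digest()
    return int.from_bytes(digest[:8], "big") & 0x7FFFFFFFFFFFFFFF

# Worker-side claim guard (asyncpg-style, illustrative):
# locked = await conn.fetchval(
#     "SELECT pg_try_advisory_xact_lock($1)", advisory_lock_key(command_id))
# if not locked:
#     return  # another worker is already executing this command

print(advisory_lock_key("cmd-1"))
```

Because the lock is transaction-scoped, it releases automatically if the holding worker crashes, which is exactly the reaper scenario above.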

Phase 1: Schema Enhancements

Goal: Extend noetl.event, add projection tables, and update the noetl.execution trigger to support loop progress tracking, fan-in tracking, and the new minio/pvc storage tiers — all without breaking existing event consumers.

See companion document noetl_schema_enhancements.md for full DDL.

P1.1 — New loop event types and indexes

Add indexes supporting loop progress queries and fan-in counting:

```sql
-- Fast fan-in counting: COUNT(*) WHERE execution_id=$1 AND meta->>'loop_id'=$2 AND event_type IN (...)
CREATE INDEX IF NOT EXISTS idx_event_loop_id_type
ON noetl.event (execution_id, (meta->>'loop_id'), event_type)
WHERE meta ? 'loop_id';

-- Atomic loop.done claim (see P0.2)
CREATE UNIQUE INDEX IF NOT EXISTS uidx_event_loop_done_loop_id
ON noetl.event (execution_id, (meta->>'loop_id'))
WHERE event_type = 'loop.done' AND meta ? 'loop_id';

-- Atomic command.issued dedup (see P0.1)
CREATE UNIQUE INDEX IF NOT EXISTS uidx_event_command_issued_command_id
ON noetl.event (execution_id, (meta->>'command_id'))
WHERE event_type = 'command.issued' AND meta ? 'command_id';
```

Acceptance criteria:

  • All three indexes created idempotently in schema_ddl.sql and applied to all existing partitions.
  • EXPLAIN on fan-in count query shows index scan on idx_event_loop_id_type.
  • Existing event writer code produces no new constraint violations on normal (non-duplicate) workloads.

P1.2 — noetl.execution.state loop progress via trigger

Extend trg_execution_state_upsert to maintain execution.state->'loop' JSON for each active loop step. The noetl.execution table is a projection table — it must always be reconstructable from noetl.event.

execution.state schema:

```json
{
  "loop": {
    "{step_name}": {
      "loop_id": "...",
      "total": 1000,
      "done": 456,
      "failed": 2,
      "collection_ref": "s3://noetl-bucket/exec/.../loop/step_name/collection.json",
      "completed": false
    }
  },
  "fanout": {
    "{step_name}": {
      "loop_id": "...",
      "total_shards": 20,
      "done": 15,
      "failed": 1,
      "status": "running"
    }
  }
}
```

Trigger handles: loop.started, loop.item, loop.done, loop.fanout.started, loop.shard.done, loop.shard.failed, loop.fanin.completed.

Acceptance criteria:

  • Trigger updated in schema_ddl.sql; existing trigger dropped and recreated.
  • After loop.started event: execution.state->'loop'->'step_name'->>'total' matches meta->>'collection_size'.
  • After each call.done within loop: execution.state->'loop'->'step_name'->>'done' increments correctly.
  • After loop.done: execution.state->'loop'->'step_name'->>'completed' = true.
  • GET /api/executions/{id} can return loop progress from execution.state without scanning event table.
  • Reconstruction test: delete noetl.execution row for a completed execution, replay its events through the trigger, verify state matches expected.

P1.3 — result_ref.store_tier extension for MinIO/PVC

```sql
ALTER TABLE noetl.result_ref
DROP CONSTRAINT IF EXISTS result_ref_store_tier_check;

ALTER TABLE noetl.result_ref
ADD CONSTRAINT result_ref_store_tier_check
CHECK (store_tier IN (
    'memory', 'kv', 'object', 's3', 'gcs', 'db', 'duckdb', 'eventlog', 'minio', 'pvc'
));
```

  • minio: MinIO endpoint (S3-compatible); physical_uri = s3://bucket/key with MinIO endpoint configured via env.
  • pvc: Kubernetes PVC or FUSE-mounted volume; physical_uri = absolute local path (e.g., /data/exec/{eid}/step/name.parquet).

Acceptance criteria:

  • Constraint updated; existing rows unaffected.
  • StoreTier Python enum updated with MINIO = "minio" and PVC = "pvc".
  • result_store.py dispatches StoreTier.MINIO to MinIO S3-compatible client.
  • result_store.py dispatches StoreTier.PVC to local filesystem read/write.
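For reference, the enum extension named in the criteria might look like the following; the existing member names are inferred from the check constraint above and may differ in the actual StoreTier definition.

```python
from enum import Enum

class StoreTier(str, Enum):
    """Result storage tiers; values match the result_ref_store_tier_check constraint."""
    MEMORY = "memory"
    KV = "kv"
    OBJECT = "object"
    S3 = "s3"
    GCS = "gcs"
    DB = "db"
    DUCKDB = "duckdb"
    EVENTLOG = "eventlog"
    MINIO = "minio"  # new: S3-compatible MinIO endpoint
    PVC = "pvc"      # new: PVC/FUSE-mounted local path

print(StoreTier("minio") is StoreTier.MINIO)  # → True
```

Subclassing str keeps the enum JSON-serializable and comparable with raw tier strings from the database.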

Phase 2: Storage Layer — MinIO and PVC

Goal: Add MinIO as an S3-compatible local storage backend for kind/local Kubernetes deployments, eliminating the need for GCS/S3 credentials in development. Add PVC/FUSE-mounted volume support for large file exchange between workers.

P2.1 — MinIO Kubernetes deployment for local kind

Deploy MinIO in the kind cluster as part of the NoETL stack:

```yaml
# ops/helm/noetl/templates/minio.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: noetl-minio
  namespace: noetl
spec:
  replicas: 1
  selector:
    matchLabels:
      app: noetl-minio
  template:
    metadata:
      labels:
        app: noetl-minio
    spec:
      containers:
        - name: minio
          image: minio/minio:latest
          args: ["server", "/data", "--console-address", ":9001"]
          env:
            - name: MINIO_ROOT_USER
              value: noetl
            - name: MINIO_ROOT_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: noetl-minio-secret
                  key: password
          volumeMounts:
            - name: minio-data
              mountPath: /data
      volumes:
        - name: minio-data
          persistentVolumeClaim:
            claimName: noetl-minio-pvc
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: noetl-minio-pvc
  namespace: noetl
spec:
  accessModes: [ReadWriteOnce]
  resources:
    requests:
      storage: 20Gi
---
apiVersion: v1
kind: Service
metadata:
  name: noetl-minio
  namespace: noetl
spec:
  ports:
    - name: api
      port: 9000
    - name: console
      port: 9001
  selector:
    app: noetl-minio
```

Worker/server environment variables for MinIO:

```shell
NOETL_MINIO_ENDPOINT=http://noetl-minio.noetl.svc.cluster.local:9000
NOETL_MINIO_ACCESS_KEY=noetl
NOETL_MINIO_SECRET_KEY=<secret>
NOETL_MINIO_BUCKET=noetl-results
NOETL_MINIO_REGION=us-east-1      # MinIO ignores this but boto3 requires it
NOETL_DEFAULT_STORE_TIER=minio    # Override default from gcs/s3 for kind
```

Acceptance criteria:

  • noetl Helm chart includes optional MinIO deployment controlled by minio.enabled: true.
  • MinIO deploys successfully on local kind cluster.
  • MinIO bucket noetl-results created automatically at startup (via init container or SDK).
  • noetl run playbooks in kind cluster successfully externalize results to MinIO when NOETL_DEFAULT_STORE_TIER=minio.
  • MinIO console accessible via kubectl port-forward svc/noetl-minio 9001:9001.

P2.2 — MinIO backend in result_store.py

```python
elif store == StoreTier.MINIO:
    endpoint = os.getenv("NOETL_MINIO_ENDPOINT")
    bucket = os.getenv("NOETL_MINIO_BUCKET", "noetl-results")
    key = f"exec/{temp_ref.execution_id}/{temp_ref.source_step}/{temp_ref.name}"
    client = _get_minio_client()  # cached boto3 client with endpoint_url
    await asyncio.to_thread(
        client.put_object, Bucket=bucket, Key=key, Body=data_bytes
    )
    temp_ref.physical_uri = f"s3://{bucket}/{key}"
    temp_ref.store = StoreTier.MINIO
    return temp_ref.ref
```

Resolve:

```python
elif store == StoreTier.MINIO:
    bucket, key = _parse_s3_uri(ref.physical_uri)
    response = await asyncio.to_thread(
        _get_minio_client().get_object, Bucket=bucket, Key=key
    )
    return json.loads(response["Body"].read())
```

Acceptance criteria:

  • StoreTier.MINIO store/resolve round-trip works for JSON and binary payloads.
  • Fallback chain: MINIO → KV → OBJECT when MinIO is unavailable.
  • Unit test with MinIO testcontainer or mock: store 15MB dataset, resolve, verify content.
  • DuckDB tool can read Parquet from MinIO via s3_url with MinIO endpoint override.

P2.3 — PVC/FUSE-mounted volume store tier

For kind (ReadWriteMany NFS or hostPath):

```yaml
# Add shared data volume to worker and server pods
volumes:
  - name: noetl-shared-data
    persistentVolumeClaim:
      claimName: noetl-shared-data-pvc  # ReadWriteMany for multi-worker
containers:
  - name: noetl-worker
    volumeMounts:
      - name: noetl-shared-data
        mountPath: /data
    env:
      - name: NOETL_PVC_MOUNT_PATH
        value: /data
```

For GCP (gcsfuse-csi driver):

```yaml
volumes:
  - name: noetl-gcs-fuse
    csi:
      driver: gcsfuse.csi.storage.gke.io
      volumeAttributes:
        bucketName: noetl-demo-data
        mountOptions: "implicit-dirs,file-cache:max-size-mb:2048"
```

result_store.py PVC tier:

```python
elif store == StoreTier.PVC:
    mount_root = os.getenv("NOETL_PVC_MOUNT_PATH", "/data")
    path = f"{mount_root}/exec/{temp_ref.execution_id}/{temp_ref.source_step}/{temp_ref.name}"
    os.makedirs(os.path.dirname(path), exist_ok=True)
    async with aiofiles.open(path, "wb") as f:
        await f.write(data_bytes)
    temp_ref.physical_uri = path
    return temp_ref.ref
```

DuckDB steps access PVC-stored Parquet directly:

```yaml
- name: analyze
  kind: duckdb
  spec:
    query: |
      SELECT COUNT(*) FROM read_parquet('{{ input.data_path }}')
```

Acceptance criteria:

  • StoreTier.PVC store/resolve works for files up to 1GB.
  • Two workers in the same kind cluster can read a PVC file written by a third worker.
  • DuckDB tool accepts physical_uri from a result_ref with store_tier='pvc' and reads without downloading.
  • Integration test: step A writes 500MB Parquet to PVC, step B reads with DuckDB, asserts row count.
  • Tier selection: files > NOETL_PVC_TIER_THRESHOLD_BYTES (default 10MB) route to PVC when NOETL_PVC_MOUNT_PATH is set.
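The threshold rule in the last bullet can be sketched as follows; the env var names are taken from this plan, while the function itself is hypothetical.

```python
import os

def select_store_tier(size_bytes: int) -> str:
    """Route large payloads to PVC when a mount is configured; else use the default tier."""
    threshold = int(os.getenv("NOETL_PVC_TIER_THRESHOLD_BYTES", str(10 * 1024 * 1024)))
    if os.getenv("NOETL_PVC_MOUNT_PATH") and size_bytes > threshold:
        return "pvc"
    return os.getenv("NOETL_DEFAULT_STORE_TIER", "kv")

os.environ["NOETL_PVC_MOUNT_PATH"] = "/data"
print(select_store_tier(50 * 1024 * 1024))  # large payload with mount set → "pvc"
print(select_store_tier(1024))              # small payload → default tier
```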

Phase 3: Distributed Fan-out

Goal: Implement the loop_mode: fanout execution profile specified in distributed_fanout_mode_spec.md. Each loop iteration becomes an independent command dispatched to any available worker, with server-managed fan-in tracking using the event table.

P3.1 — Fan-out server-side command generation

When a step has spec.loop_mode: fanout, the server's handle_event() on the step entry event:

  1. Evaluates loop.in to get the collection.
  2. Stores collection in MinIO/GCS/PVC via result_ref and writes a loop.fanout.started event with meta = {loop_id, total_shards: N}.
  3. Generates N independent command.issued events, each carrying meta = {loop_id, shard_id, iter_index, iter_key} and a single-item input (the iteration element or a reference to it).
  4. Publishes N NATS notifications.

No single worker is responsible for the loop; the server splits it before dispatch.

New event types:

| event_type | Author | meta fields | Notes |
|---|---|---|---|
| loop.fanout.started | Server | loop_id, total_shards | Written once per fan-out activation |
| loop.shard.enqueued | Server | loop_id, shard_id, iter_index, iter_key | One per shard |
| loop.shard.done | Worker | loop_id, shard_id, command_id | Replaces command.completed for shard commands |
| loop.shard.failed | Worker | loop_id, shard_id, command_id, error | Replaces command.failed for shard commands |
| loop.fanin.completed | Server | loop_id | When done+failed == total_shards |

Acceptance criteria:

  • spec.loop_mode: fanout in a step spec activates fan-out path.
  • loop.fanout.started event written with correct total_shards.
  • N command.issued events generated, each with unique shard_id and iter_index.
  • Unit test: 100-item fan-out → 100 command.issued rows in DB, 100 NATS publishes.
  • All 100 commands carry distinct (loop_id, shard_id, iter_index) tuples.
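Steps 1–4 above amount to a pure fan-out split. A simplified sketch (dict-shaped commands instead of the real Command type; the sequential iter_key is an assumption):

```python
def build_shard_commands(execution_id, step, loop_id, collection):
    """One independent command per iteration element (step 3 above)."""
    return [
        {
            "execution_id": execution_id,
            "step": step,
            "input": {"item": item},
            "meta": {
                "loop_id": loop_id,
                "shard_id": i,
                "iter_index": i,
                "iter_key": str(i),
            },
        }
        for i, item in enumerate(collection)
    ]

cmds = build_shard_commands("exec-1", "load_patients", "loop-1", list(range(100)))
distinct = {(c["meta"]["loop_id"], c["meta"]["shard_id"], c["meta"]["iter_index"]) for c in cmds}
print(len(cmds), len(distinct))  # → 100 100
```

Each generated command then flows through the P0.1 unique index, so re-running the split cannot double-dispatch a shard.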

P3.2 — Fan-in tracking and completion detection

On each loop.shard.done or loop.shard.failed event, the server queries:

```sql
SELECT
    MAX((meta->>'total_shards')::int) AS total_shards,  -- carried only by loop.fanout.started
    COUNT(*) FILTER (WHERE event_type = 'loop.shard.done') AS done,
    COUNT(*) FILTER (WHERE event_type = 'loop.shard.failed') AS failed
FROM noetl.event
WHERE execution_id = $1
  AND node_name = $2
  AND meta->>'loop_id' = $3
  AND event_type IN ('loop.fanout.started', 'loop.shard.done', 'loop.shard.failed');
```

When done + failed = total_shards, emit loop.fanin.completed (atomic via unique index on (execution_id, meta->>'loop_id') WHERE event_type='loop.fanin.completed') and evaluate the parent step's next.arcs.

fanin.* variables available in next.arcs[].when expressions:

```yaml
next:
  arcs:
    - step: aggregate_results
      when: '{{ fanin.status == "complete" }}'
    - step: handle_partial
      when: '{{ fanin.status == "partial" }}'
```

Acceptance criteria:

  • Fan-in count query uses idx_event_loop_id_type index (verify with EXPLAIN).
  • loop.fanin.completed emitted exactly once per loop_id (unique index enforced).
  • fanin.status, fanin.done, fanin.failed, fanin.total available in when expressions after fan-in.
  • execution.state->'fanout'->'step_name' updated by trigger on each shard event.
  • Integration test: 50-shard fan-out where 48 succeed and 2 fail, fanin.status = "partial", routing to correct arc.
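The fanin.status value used in the when expressions can be derived from the three counts returned by the fan-in query. This derivation is an assumption consistent with the acceptance criteria (partial whenever any shard failed):

```python
def fanin_status(done: int, failed: int, total_shards: int) -> str:
    """Status exposed to next.arcs[].when after each shard event."""
    if done + failed < total_shards:
        return "running"
    return "complete" if failed == 0 else "partial"

print(fanin_status(48, 2, 50))  # → partial
print(fanin_status(50, 0, 50))  # → complete
```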

P3.3 — Shard-level retry

Failed shards are retried as independent commands (not full loop restart):

```python
# In handle_event() on loop.shard.failed:
if shard_attempts < max_shard_retries:
    retry_command = Command(
        execution_id=execution_id,
        step=step,
        tool=original_tool,
        input=original_shard_input,
        meta={
            "loop_id": loop_id,
            "shard_id": shard_id,
            "iter_index": iter_index,
            "iter_key": iter_key,
            "shard_attempt": shard_attempts + 1,
        },
    )
    await issue_command(retry_command)
else:
    ...  # count as failed, then check fan-in completion
```

Acceptance criteria:

  • spec.fanout.max_shard_retries configurable per step (default 2).
  • Retry uses same (loop_id, shard_id, iter_key) for idempotency.
  • Retry command goes through standard command.issued unique index (prevents duplicate retries).
  • Integration test: 10-shard fan-out where 3 fail on attempt 1, succeed on attempt 2 → fanin.status = "complete".

Phase 4: Communication Architecture

Goal: Reduce HTTP overhead for high-frequency worker→server event emission by routing events through NATS JetStream. Keep HTTP for synchronous operations that require a response. No worker-to-worker direct communication.

See companion document noetl_worker_communication.md for the full analysis and rationale.

P4.1 — NATS bidirectional event transport

Add a NATS JetStream consumer on the server for worker-emitted events:

NATS subject: noetl.events.{execution_id}
Stream: NOETL_WORKER_EVENTS
Retention: WorkQueuePolicy (consumed once by server)
Workers publish:
- command.completed
- command.failed
- command.heartbeat
- loop.shard.done
- loop.shard.failed
Server subscribes via push consumer with ack.
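A minimal sketch of the per-execution subject naming convention; the helper itself is illustrative, while the prefix env var is the one defined in this section.

```python
import os

def event_subject(execution_id: str) -> str:
    """Subject a worker publishes this execution's events to."""
    prefix = os.getenv("NOETL_EVENT_NATS_SUBJECT_PREFIX", "noetl.events")
    return f"{prefix}.{execution_id}"

print(event_subject("exec-42"))  # → noetl.events.exec-42
```

The server-side consumer would then subscribe to the wildcard `noetl.events.>` on the NOETL_WORKER_EVENTS stream.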

Workers detect transport mode from environment:

```shell
NOETL_EVENT_TRANSPORT=nats          # nats | http (default: http for backwards compat)
NOETL_EVENT_NATS_SUBJECT_PREFIX=noetl.events
```

Operations that remain on HTTP (require synchronous response):

  • command.claimed — synchronous advisory lock result needed
  • Management: registration, health, status
  • Command context fetch: GET /api/commands/{event_id}

Acceptance criteria:

  • Workers with NOETL_EVENT_TRANSPORT=nats publish command.completed/failed/heartbeat to NATS.
  • Server NATS consumer processes these events with the same handle_event() pipeline as HTTP path.
  • NATS-emitted events are persisted to noetl.event table identically to HTTP-emitted events.
  • command.claimed remains HTTP.
  • Backwards compatibility: NOETL_EVENT_TRANSPORT=http uses existing path unchanged.
  • Load test: 1000 concurrent loop iterations, NATS transport, p99 latency for event ingestion < 50ms.
  • test_pft_flow passes end-to-end with NOETL_EVENT_TRANSPORT=nats.

P4.2 — Worker capacity registry via heartbeat

Workers include available capacity in their heartbeat payload:

```python
# In worker heartbeat (every 15s):
await emit_event(Event(
    name="worker.capacity",
    payload={
        "worker_id": self.worker_id,
        "available_slots": self.max_concurrent - len(self.active_tasks),
        "active_tasks": len(self.active_tasks),
        "tool_pools": {
            "postgres": pg_pool.available,
            "duckdb": duckdb_available,
        },
    },
))
```

Server maintains in-memory capacity registry (not persisted; rebuilt from heartbeats):

```python
worker_capacity: dict[str, WorkerCapacity] = {}
# Updated on each worker.capacity event
```

Acceptance criteria:

  • runtime table capacity column updated from heartbeat payload.
  • Server in-memory capacity registry populated within 15s of worker startup.
  • GET /api/workers returns current capacity per worker.
  • No breaking change to existing workers without capacity payload.
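The in-memory registry can be as simple as a dict keyed by worker_id, refreshed on every worker.capacity event. The WorkerCapacity fields mirror the heartbeat payload above; the class and handler are illustrative.

```python
from dataclasses import dataclass

@dataclass
class WorkerCapacity:
    worker_id: str
    available_slots: int
    active_tasks: int

worker_capacity: dict[str, WorkerCapacity] = {}

def on_capacity_event(payload: dict) -> None:
    """Overwrite the registry entry for the reporting worker."""
    worker_capacity[payload["worker_id"]] = WorkerCapacity(
        worker_id=payload["worker_id"],
        available_slots=payload["available_slots"],
        active_tasks=payload["active_tasks"],
    )

on_capacity_event({"worker_id": "w1", "available_slots": 3, "active_tasks": 5})
print(worker_capacity["w1"].available_slots)  # → 3
```

Because entries are overwritten rather than merged, a registry lost on server restart is fully repopulated within one heartbeat interval.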

P4.3 — Capacity-aware NATS dispatch (targeted subjects)

When capacity registry is populated, server publishes to worker-specific NATS subject instead of broadcast:

```
Broadcast: noetl.commands             (current, all workers)
Targeted:  noetl.commands.{worker_id} (new, specific worker)
```

Worker subscribes to both subjects; targeted messages take priority.

Acceptance criteria:

  • Server selects worker with available_slots > 0 for tool kind required.
  • Broadcast subject retained as fallback when no targeted worker available.
  • Integration test: 3 workers with different capacities → commands route to workers with capacity.
  • No message loss when targeted worker goes offline before receiving (redelivery via broadcast).
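The selection rule can be sketched as: prefer any worker with free slots, otherwise fall back to the broadcast subject. This policy sketch is an assumption; the plan only requires that broadcast remains the fallback.

```python
def pick_subject(available_slots_by_worker: dict[str, int],
                 base: str = "noetl.commands") -> str:
    """Targeted subject for a worker with capacity, else the broadcast subject."""
    for worker_id, available_slots in sorted(available_slots_by_worker.items()):
        if available_slots > 0:
            return f"{base}.{worker_id}"
    return base  # broadcast fallback

print(pick_subject({"w1": 0, "w2": 3}))  # → noetl.commands.w2
print(pick_subject({"w1": 0}))           # → noetl.commands
```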

Phase 5: Observability and Streaming

Goal: Expose live execution progress from projection tables, add partial/streaming result support for large tool outputs.

P5.1 — Execution status API from projection

GET /api/executions/{id} reads from noetl.execution (projection) without scanning noetl.event:

# In execution.py API handler:
row = await db.fetchrow("""
SELECT execution_id, status, last_event_type, last_node_name,
start_time, end_time, state, error
FROM noetl.execution
WHERE execution_id = $1
""", execution_id)

Loop progress available as state.loop.{step_name} without additional queries.

Acceptance criteria:

  • GET /api/executions/{id} response includes loop_progress field derived from execution.state.
  • Response time < 10ms for any execution (single-row lookup, no event scan).
  • noetl status --json CLI uses this endpoint; output includes per-step loop counts.
  • Reconstruction: truncate noetl.execution, replay all events, verify state matches.
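Deriving the loop_progress field from execution.state is a single JSON transform over the state schema shown in P1.2; the response shape here is illustrative, not the final API contract.

```python
import json

def loop_progress(state_json: str) -> dict:
    """Map execution.state->'loop' to the API's loop_progress field."""
    state = json.loads(state_json or "{}")
    return {
        step: {
            "done": s.get("done", 0),
            "total": s.get("total", 0),
            "completed": s.get("completed", False),
        }
        for step, s in state.get("loop", {}).items()
    }

print(loop_progress('{"loop": {"load": {"done": 456, "total": 1000, "completed": false}}}'))
```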

P5.2 — Partial result streaming (call.partial event type)

Workers emitting large sequential results (postgres cursor, DuckDB scan) emit call.partial events per chunk before the final call.done:

```python
chunk_idx = 0
while True:
    chunk = await postgres_cursor.fetch(chunk_size)  # asyncpg cursor; empty when exhausted
    if not chunk:
        break
    await emit_event(Event(
        name="call.partial",
        payload={
            "chunk_index": chunk_idx,
            "rows": chunk,
            "has_more": True,
        },
        meta={"command_id": command_id},
    ))
    chunk_idx += 1

await emit_event(Event(name="call.done", ...))
```

DSL: steps can react to call.partial via arc condition on: call.partial.

Acceptance criteria:

  • call.partial events stored in noetl.event with meta->>'chunk_index'.
  • Next step wired with on: call.partial receives each chunk as it arrives.
  • call.done still required as terminal event; call.partial without call.done does not route.
  • Integration test: postgres step streaming 10,000 rows in 10 chunks → downstream step processes each chunk independently.
  • Memory bound: worker heap does not exceed 2× chunk size regardless of total dataset size.

Test Fixture Requirements per Phase

| Phase | New/Modified Fixtures | Pass Criterion |
|---|---|---|
| P0 | test_pft_flow (existing) | 1000/1000 per facility, all 5 types |
| P0 | tooling_non_blocking (existing) | issued == terminal == 5 per tool |
| P0.3 | New: loop_state_recovery | Loop resumes after server kill at 50% |
| P1 | test_pft_flow with state query | execution.state.loop matches actual counts |
| P2 | New: minio_large_result | 500MB round-trip store/resolve in kind |
| P2 | New: pvc_worker_exchange | Worker A writes Parquet, Worker B reads DuckDB |
| P3 | New: fanout_basic (100 shards) | All 100 shards complete, fan-in fires once |
| P3 | New: fanout_partial_failure | 2/10 shards fail, fanin.status=partial |
| P4 | test_pft_flow with NATS transport | Same pass criterion as P0 |
| P5 | New: streaming_chunks | 10K rows, 10 chunks, downstream asserts each |

Dependency Graph

```
P0.1 (dedup) ──────┐
P0.2 (loop.done) ──┼──→ P1.1 (indexes) ──→ P1.2 (trigger) ──→ P5.1 (status API)
P0.3 (loop state) ─┘
P0.3 (loop state) ────→ P3.1 (fan-out)

P0.4 (reaper) ──┬──→ P3.2 (fan-in)
                └──→ P3.3 (retry)

P1.3 (store_tier) ──┬──→ P2.1 (MinIO k8s) ──→ P2.2 (backend)
                    └──→ P2.3 (PVC)

P4.1 (NATS transport) ──→ P4.2 (capacity) ──→ P4.3 (targeted dispatch)

P5.2 (call.partial) — independent
```

File References

| Document | Location |
|---|---|
| This plan | docs/features/noetl_distributed_processing_plan.md |
| Schema DDL details | docs/features/noetl_schema_enhancements.md |
| Worker communication analysis | docs/features/noetl_worker_communication.md |
| Distributed fan-out spec | docs/features/distributed_fanout_mode_spec.md |
| Rust worker pool | docs/features/worker-pool-rust.md |
| DSL assignment/reference spec | docs/features/noetl_dsl_assignment_and_reference_spec.md |
| DSL refactoring spec | docs/features/noetl_dsl_refactoring_spec.md |