NoETL Distributed Processing Enhancement Plan
Phase 1 ("Schema Enhancements") — specifically P1.2 ("noetl.execution.state loop progress via trigger") — is superseded by noetl_async_sharded_architecture.md. The trg_execution_state_upsert trigger is removed; an async ProjectionWorker computes execution.state per shard. The unique indexes from P1.1 (uidx_event_command_issued_command_id, uidx_event_loop_done_loop_id, idx_event_loop_id_type) remain authoritative. Phase 0, Phase 2 (storage), Phase 3 (fan-out), Phase 4 (NATS transport), and Phase 5 (observability) remain authoritative; see the async/sharded doc for how they interact with the new projection and shard model.
Context and Motivation
This plan addresses a set of confirmed bugs and architectural gaps in NoETL's distributed server-worker execution model discovered during the test_pft_flow regression (10 facilities × 1000 patients × 5 data types). The workload exposed:
- Premature `loop.done` emission — the race between concurrent `call.done` events causes the server to fire `loop.done` before all iterations have saved their results, silently dropping patients.
- Over-dispatch — duplicate `command.issued` events generated by concurrent `handle_event()` coroutines cause the same step to execute more times than intended.
- Loop state loss — NATS KV is not a durable store; a server restart or NATS outage loses loop progress and prevents recovery.
- Stale claim reclaim is in the wrong layer — playbooks embed `FOR UPDATE ... WHERE claimed_at < NOW() - INTERVAL '5 minutes'` SQL as a workaround for missing orchestration-level claim expiry.
- No distributed fan-out — large parallel workloads (10⁵+ items) are limited to `max_in_flight` within a single loop on a single server coroutine; true cross-worker shard distribution is specced but not implemented.
- HTTP event emission overhead — every worker result requires a new HTTP connection to the server; high-frequency events (heartbeats, shard completions) add unnecessary latency.
The noetl.execution table is a projection table — it maintains a live snapshot of each playbook execution instance and is updated by a PostgreSQL trigger on every noetl.event INSERT. The same projection pattern is used for loop progress tracking.
All phases below are independently deliverable. Phases 0 and 1 are prerequisites for all later phases.
Architecture Invariants (must not be violated by any phase)
- Server is the sole routing authority. Workers never synthesize routing decisions. Workers emit events; the server evaluates `next.arcs[].when` conditions and issues commands.
- Event table is the authoritative source of truth. NATS KV and in-memory state are caches. Any state that must survive server restart must be in `noetl.event` or a projection table.
- Workers do not communicate with each other directly. All coordination flows through the server (via NATS subject or HTTP API).
- Command dispatch is idempotent. A `command.issued` event for a given `(execution_id, command_id)` must exist at most once. The unique index enforces this at the DB level.
- Loop collection is never stored in volatile memory as the only copy. Large collections go to object storage (GCS/S3/MinIO); a reference is stored in the `loop.started` event.
Phase 0: Correctness Fixes
Goal: Eliminate the confirmed race conditions causing patient data loss and over-dispatch. These bugs block all meaningful regression testing of later phases.
P0.1 — Atomic command.issued deduplication
Problem: Two concurrent handle_event() coroutines both pass the read-check-write deduplication guard and both insert command.issued events for the same (execution_id, step, attempt), causing double dispatch.
Fix: Add a unique partial index on noetl.event:
CREATE UNIQUE INDEX IF NOT EXISTS uidx_event_command_issued_command_id
ON noetl.event (execution_id, (meta->>'command_id'))
WHERE event_type = 'command.issued' AND meta ? 'command_id';
In handle_event() (server-side events.py), replace the read-check-write pattern with try/catch on the INSERT:
try:
    await db.execute("INSERT INTO noetl.event ... VALUES (...)")
    # succeeded → sole issuer → publish to NATS
    await nats.publish(command_notification)
except UniqueViolationError:
    logger.info("[DEDUP] command.issued already exists for command_id=%s — skipping", command_id)
    return []
Acceptance criteria:
- Unique index exists in schema DDL and applied to all partitions.
- `handle_event()` catches `UniqueViolationError` on `command.issued` INSERT and returns `[]` without publishing to NATS.
- Unit test: two concurrent coroutines processing the same `call.done` event → exactly one `command.issued` row in DB, exactly one NATS publish.
- `tooling_non_blocking` fixture: each core tool step shows `issued_count == 5`, `terminal_count == 5` after full run.
- `test_pft_flow` regression: no over-dispatch observed on any data-type batch loop.
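The unique index deduplicates on `meta->>'command_id'`, which only works if every concurrent issuer derives the same `command_id` for the same logical dispatch. A minimal sketch of one deterministic derivation from the `(execution_id, step, attempt)` key mentioned above (the helper name and hashing scheme are illustrative, not the actual NoETL implementation):

```python
import hashlib

def make_command_id(execution_id: str, step: str, attempt: int) -> str:
    """Derive a stable command_id from the logical dispatch key so that
    concurrent handle_event() coroutines collide on the unique index
    instead of issuing two distinct commands."""
    raw = f"{execution_id}:{step}:{attempt}".encode()
    return hashlib.sha256(raw).hexdigest()[:32]
```

Any two coroutines racing on the same `(execution_id, step, attempt)` then INSERT the same `meta->>'command_id'`, and exactly one INSERT survives the partial unique index.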
P0.2 — Atomic loop.done emission
Problem: The server computes `new_count >= collection_size` to decide whether to fire `loop.done`. Two concurrent coroutines both read `new_count = N-1`, both increment to `N = collection_size`, and both attempt to claim `loop.done` via the NATS KV `try_claim_loop_done`. Under NATS KV failure or race, both may fire `loop.done` (terminating the batch loop prematurely) or neither may (stalling it).
Fix: Replace the NATS KV `try_claim_loop_done` claim with an event-table-based atomic claim:
-- Insert loop.done event atomically; only the first succeeds
INSERT INTO noetl.event (execution_id, event_id, event_type, node_name, meta, catalog_id, created_at)
VALUES ($1, $2, 'loop.done', $3, $4::jsonb, $5, NOW())
ON CONFLICT (execution_id, (meta->>'loop_id'))
  WHERE event_type = 'loop.done' AND meta ? 'loop_id'  -- must match the partial unique index
DO NOTHING
RETURNING event_id;
-- Only the coroutine that gets a RETURNING row proceeds to generate next routing commands.
Add partial unique index:
CREATE UNIQUE INDEX IF NOT EXISTS uidx_event_loop_done_loop_id
ON noetl.event (execution_id, (meta->>'loop_id'))
WHERE event_type = 'loop.done' AND meta ? 'loop_id';
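The winner-takes-all contract the index provides can be illustrated with an in-memory stand-in. This toy sketch (pure asyncio, no database; not NoETL code) mirrors the `RETURNING`-row behavior, where only one of many racing coroutines proceeds:

```python
import asyncio

class LoopDoneClaims:
    """In-memory stand-in for the partial unique index: the first claim wins."""

    def __init__(self) -> None:
        self._claimed: set[tuple[str, str]] = set()
        self._lock = asyncio.Lock()

    async def try_claim(self, execution_id: str, loop_id: str) -> bool:
        async with self._lock:
            key = (execution_id, loop_id)
            if key in self._claimed:
                return False  # ON CONFLICT DO NOTHING → no RETURNING row
            self._claimed.add(key)
            return True  # got the RETURNING row → sole emitter of loop.done

async def race(n: int = 20) -> int:
    claims = LoopDoneClaims()
    wins = await asyncio.gather(*(claims.try_claim("exec-1", "loop-1") for _ in range(n)))
    return sum(wins)
```

Running `asyncio.run(race())` yields exactly one winner no matter how many coroutines race, which is the property the unit test below asserts against the real database.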
Acceptance criteria:
- Unique index for `loop.done` per `(execution_id, loop_id)` exists.
- `events.py` loop-done path: only one coroutine per `loop_id` generates routing commands; all others detect `DO NOTHING` and return empty.
- Unit test: 20 concurrent `call.done` events for a 20-item loop → exactly one `loop.done` event in DB, exactly one next-step `command.issued`.
- `test_pft_flow` regression: `validate_facility_results` shows 1000/1000 for assessments (not ~100 as under the race bug). `assessments_done == assessments_queue_done` for all facilities.
P0.3 — Loop state persistence: event table as authority
Problem: Loop collection and counters live in NATS KV. On server restart or NATS outage, loop progress is lost and recovery emits duplicate commands or misses loop.done.
Fix: Write a loop.started event at loop initialization time carrying the collection reference and size. Counters are derived from event table counts rather than NATS KV counters. NATS KV remains a write-through cache for hot-path reads.
New event types to introduce (see schema doc for full field specification):
| event_type | Trigger | Key meta fields | context fields |
|---|---|---|---|
| `loop.started` | Server, on first loop command | `loop_id`, `collection_size` | `collection_ref` (GCS/S3/MinIO URI) |
| `loop.item` | Server, per iteration | `loop_id`, `iter_index` | — |
| `loop.done` | Server, atomic | `loop_id`, `collection_size` | — |
Recovery on server startup or NATS KV miss:
async def recover_loop_state_from_events(execution_id, step_name, loop_id, db):
    return await db.fetchrow("""
        SELECT
            COALESCE(MAX((meta->>'iter_index')::int), -1) AS current_index,
            COUNT(*) FILTER (WHERE event_type = 'command.completed') AS done_count,
            COUNT(*) FILTER (WHERE event_type = 'command.failed') AS failed_count,
            MAX(CASE WHEN event_type = 'loop.started'
                     THEN context->>'collection_ref' END) AS collection_ref,
            MAX(CASE WHEN event_type = 'loop.started'
                     THEN (meta->>'collection_size')::int END) AS collection_size
        FROM noetl.event
        WHERE execution_id = $1
          AND node_name = $2
          AND meta->>'loop_id' = $3
    """, execution_id, step_name, loop_id)
Acceptance criteria:
- `loop.started` event written to `noetl.event` whenever a loop is initialized.
- `loop.started` event `context->>'collection_ref'` points to a valid object storage URI when collection size > configurable threshold (default 1000 items).
- Server startup auto-resume rebuilds loop state from event table for all `RUNNING` executions with active loops (does not require NATS KV).
- Integration test: kill server mid-loop (50% complete), restart, verify loop resumes from correct index without re-processing already-done iterations.
- NATS KV remains a cache; its absence does not cause data loss or incorrect `loop.done`.
P0.4 — Orchestration-level claim expiry (reaper enhancement)
Problem: When a worker crashes mid-iteration, its `command.heartbeat` stops and the work-queue row stays claimed indefinitely. Playbooks work around this by embedding `WHERE claimed_at < NOW() - INTERVAL '5 minutes'` SQL, coupling business logic to infrastructure recovery.
Fix: Extend the server reaper to detect commands with no heartbeat beyond NOETL_COMMAND_HEARTBEAT_TIMEOUT_SECONDS (default: 300s) and re-enqueue them:
# In reaper / recovery.py:
stale_commands = await db.fetch("""
    SELECT e.execution_id, e.meta->>'command_id' AS command_id, e.node_name, e.event_id
    FROM noetl.event e
    WHERE e.event_type = 'command.claimed'
      AND NOT EXISTS (
          SELECT 1 FROM noetl.event h
          WHERE h.execution_id = e.execution_id
            AND h.meta->>'command_id' = e.meta->>'command_id'
            AND h.event_type IN ('command.completed', 'command.failed', 'command.heartbeat')
            AND h.created_at > NOW() - make_interval(secs => $1)
      )
      AND e.created_at < NOW() - make_interval(secs => $1)
""", heartbeat_timeout)
# Re-enqueue each stale command via NATS
Acceptance criteria:
- Reaper detects `command.claimed` events with no heartbeat within the timeout window.
- Re-enqueued commands respect `max_attempts`; exhausted commands emit `command.failed`.
- Workers receiving re-enqueued commands use the advisory lock (`pg_try_advisory_xact_lock`) on `command_id` to avoid double execution if the original worker recovers.
- Integration test: suspend a worker mid-loop-iteration for > timeout, verify the reaper re-enqueues and a different worker completes the iteration.
- `test_pft_flow`: remove the `reclaim_stale` CTE from all `load_patients_for_*` steps after this is validated.
Phase 1: Schema Enhancements
Goal: Extend noetl.event, add projection tables, and update noetl.execution trigger to support loop progress tracking, fan-in tracking, and the new pvc/minio storage tier — all without breaking existing event consumers.
See companion document noetl_schema_enhancements.md for full DDL.
P1.1 — New loop event types and indexes
Add indexes supporting loop progress queries and fan-in counting:
-- Fast fan-in counting: COUNT(*) WHERE execution_id=$1 AND meta->>'loop_id'=$2 AND event_type IN (...)
CREATE INDEX IF NOT EXISTS idx_event_loop_id_type
ON noetl.event (execution_id, (meta->>'loop_id'), event_type)
WHERE meta ? 'loop_id';
-- Atomic loop.done claim (see P0.2)
CREATE UNIQUE INDEX IF NOT EXISTS uidx_event_loop_done_loop_id
ON noetl.event (execution_id, (meta->>'loop_id'))
WHERE event_type = 'loop.done' AND meta ? 'loop_id';
-- Atomic command.issued dedup (see P0.1)
CREATE UNIQUE INDEX IF NOT EXISTS uidx_event_command_issued_command_id
ON noetl.event (execution_id, (meta->>'command_id'))
WHERE event_type = 'command.issued' AND meta ? 'command_id';
Acceptance criteria:
- All three indexes created idempotently in `schema_ddl.sql` and applied to all existing partitions.
- `EXPLAIN` on fan-in count query shows index scan on `idx_event_loop_id_type`.
- Existing event writer code produces no new constraint violations on normal (non-duplicate) workloads.
P1.2 — noetl.execution.state loop progress via trigger
Extend trg_execution_state_upsert to maintain execution.state->'loop' JSON for each active loop step. The noetl.execution table is a projection table — it must always be reconstructable from noetl.event.
execution.state schema:
{
  "loop": {
    "{step_name}": {
      "loop_id": "...",
      "total": 1000,
      "done": 456,
      "failed": 2,
      "collection_ref": "s3://noetl-bucket/exec/.../loop/step_name/collection.json",
      "completed": false
    }
  },
  "fanout": {
    "{step_name}": {
      "loop_id": "...",
      "total_shards": 20,
      "done": 15,
      "failed": 1,
      "status": "running"
    }
  }
}
Trigger handles: loop.started, loop.item, loop.done, loop.fanout.started, loop.shard.done, loop.shard.failed, loop.fanin.completed.
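Because the projection must always be reconstructable from `noetl.event`, the trigger's logic is a pure fold over the event log. A Python sketch of that fold for the `loop` branch (illustrative only; the real implementation is PL/pgSQL here, or the async ProjectionWorker per the supersession note at the top, and the exact event fields are assumptions based on this section):

```python
def project_loop_state(events: list[dict]) -> dict:
    """Fold loop events into the execution.state->'loop' shape shown above."""
    state: dict = {}
    for ev in events:
        step = ev["node_name"]
        meta = ev.get("meta", {})
        if ev["event_type"] == "loop.started":
            state[step] = {
                "loop_id": meta["loop_id"],
                "total": meta["collection_size"],
                "done": 0,
                "failed": 0,
                "completed": False,
            }
        elif ev["event_type"] == "call.done" and step in state:
            state[step]["done"] += 1
        elif ev["event_type"] == "command.failed" and step in state:
            state[step]["failed"] += 1
        elif ev["event_type"] == "loop.done" and step in state:
            state[step]["completed"] = True
    return state
```

The reconstruction test in the acceptance criteria is exactly this property: replaying the events through the fold must reproduce the stored `state`.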
Acceptance criteria:
- Trigger updated in `schema_ddl.sql`; existing trigger dropped and recreated.
- After `loop.started` event: `execution.state->'loop'->'step_name'->>'total'` matches `meta->>'collection_size'`.
- After each `call.done` within loop: `execution.state->'loop'->'step_name'->>'done'` increments correctly.
- After `loop.done`: `execution.state->'loop'->'step_name'->>'completed'` = `true`.
- `GET /api/executions/{id}` can return loop progress from `execution.state` without scanning event table.
- Reconstruction test: delete `noetl.execution` row for a completed execution, replay its events through the trigger, verify `state` matches expected.
P1.3 — result_ref.store_tier extension for MinIO/PVC
ALTER TABLE noetl.result_ref
DROP CONSTRAINT IF EXISTS result_ref_store_tier_check;
ALTER TABLE noetl.result_ref
ADD CONSTRAINT result_ref_store_tier_check
CHECK (store_tier IN (
'memory', 'kv', 'object', 's3', 'gcs', 'db', 'duckdb', 'eventlog', 'minio', 'pvc'
));
- `minio`: MinIO endpoint (S3-compatible); `physical_uri = s3://bucket/key` with the MinIO endpoint configured via env.
- `pvc`: Kubernetes PVC or FUSE-mounted volume; `physical_uri` = absolute local path (e.g., `/data/exec/{eid}/step/name.parquet`).
Acceptance criteria:
- Constraint updated; existing rows unaffected.
- `StoreTier` Python enum updated with `MINIO = "minio"` and `PVC = "pvc"`.
- `result_store.py` dispatches `StoreTier.MINIO` to the MinIO S3-compatible client.
- `result_store.py` dispatches `StoreTier.PVC` to local filesystem read/write.
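A sketch of the corresponding enum change, assuming the existing `StoreTier` is a string-valued enum whose members mirror the check constraint above (only `MINIO` and `PVC` are the additions):

```python
from enum import Enum

class StoreTier(str, Enum):
    MEMORY = "memory"
    KV = "kv"
    OBJECT = "object"
    S3 = "s3"
    GCS = "gcs"
    DB = "db"
    DUCKDB = "duckdb"
    EVENTLOG = "eventlog"
    MINIO = "minio"   # new: S3-compatible MinIO endpoint
    PVC = "pvc"       # new: PVC/FUSE-mounted local path
```

Subclassing `str` keeps the members directly comparable to the `store_tier` values stored in `noetl.result_ref`.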
Phase 2: Storage Layer — MinIO and PVC
Goal: Add MinIO as an S3-compatible local storage backend for kind/local Kubernetes deployments, eliminating the need for GCS/S3 credentials in development. Add PVC/FUSE-mounted volume support for large file exchange between workers.
P2.1 — MinIO Kubernetes deployment for local kind
Deploy MinIO in the kind cluster as part of the NoETL stack:
# ops/helm/noetl/templates/minio.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: noetl-minio
  namespace: noetl
spec:
  replicas: 1
  selector:
    matchLabels:
      app: noetl-minio
  template:
    metadata:
      labels:
        app: noetl-minio
    spec:
      containers:
        - name: minio
          image: minio/minio:latest
          args: ["server", "/data", "--console-address", ":9001"]
          env:
            - name: MINIO_ROOT_USER
              value: noetl
            - name: MINIO_ROOT_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: noetl-minio-secret
                  key: password
          volumeMounts:
            - name: minio-data
              mountPath: /data
      volumes:
        - name: minio-data
          persistentVolumeClaim:
            claimName: noetl-minio-pvc
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: noetl-minio-pvc
  namespace: noetl
spec:
  accessModes: [ReadWriteOnce]
  resources:
    requests:
      storage: 20Gi
---
apiVersion: v1
kind: Service
metadata:
  name: noetl-minio
  namespace: noetl
spec:
  ports:
    - name: api
      port: 9000
    - name: console
      port: 9001
  selector:
    app: noetl-minio
Worker/server environment variables for MinIO:
NOETL_MINIO_ENDPOINT=http://noetl-minio.noetl.svc.cluster.local:9000
NOETL_MINIO_ACCESS_KEY=noetl
NOETL_MINIO_SECRET_KEY=<secret>
NOETL_MINIO_BUCKET=noetl-results
NOETL_MINIO_REGION=us-east-1 # MinIO ignores this but boto3 requires it
NOETL_DEFAULT_STORE_TIER=minio # Override default from gcs/s3 for kind
Acceptance criteria:
- `noetl` Helm chart includes optional MinIO deployment controlled by `minio.enabled: true`.
- MinIO deploys successfully on local kind cluster.
- MinIO bucket `noetl-results` created automatically at startup (via init container or SDK).
- `noetl run` playbooks in kind cluster successfully externalize results to MinIO when `NOETL_DEFAULT_STORE_TIER=minio`.
- MinIO console accessible via `kubectl port-forward svc/noetl-minio 9001:9001`.
P2.2 — MinIO backend in result_store.py
elif store == StoreTier.MINIO:
    endpoint = os.getenv("NOETL_MINIO_ENDPOINT")
    bucket = os.getenv("NOETL_MINIO_BUCKET", "noetl-results")
    key = f"exec/{temp_ref.execution_id}/{temp_ref.source_step}/{temp_ref.name}"
    client = _get_minio_client()  # cached boto3 client with endpoint_url
    await asyncio.to_thread(
        client.put_object, Bucket=bucket, Key=key, Body=data_bytes
    )
    temp_ref.physical_uri = f"s3://{bucket}/{key}"
    temp_ref.store = StoreTier.MINIO
    return temp_ref.ref
Resolve:
elif store == StoreTier.MINIO:
    bucket, key = _parse_s3_uri(ref.physical_uri)
    response = await asyncio.to_thread(
        _get_minio_client().get_object, Bucket=bucket, Key=key
    )
    return json.loads(response["Body"].read())
Acceptance criteria:
- `StoreTier.MINIO` store/resolve round-trip works for JSON and binary payloads.
- Fallback chain: `MINIO → KV → OBJECT` when MinIO is unavailable.
- Unit test with MinIO testcontainer or mock: store 15MB dataset, resolve, verify content.
- DuckDB tool can read Parquet from MinIO via `s3_url` with MinIO endpoint override.
P2.3 — PVC/FUSE-mounted volume store tier
For kind (ReadWriteMany NFS or hostPath):
# Add shared data volume to worker and server pods
volumes:
  - name: noetl-shared-data
    persistentVolumeClaim:
      claimName: noetl-shared-data-pvc  # ReadWriteMany for multi-worker
containers:
  - name: noetl-worker
    volumeMounts:
      - name: noetl-shared-data
        mountPath: /data
    env:
      - name: NOETL_PVC_MOUNT_PATH
        value: /data
For GCP (gcsfuse-csi driver):
volumes:
  - name: noetl-gcs-fuse
    csi:
      driver: gcsfuse.csi.storage.gke.io
      volumeAttributes:
        bucketName: noetl-demo-data
        mountOptions: "implicit-dirs,file-cache:max-size-mb:2048"
result_store.py PVC tier:
elif store == StoreTier.PVC:
    mount_root = os.getenv("NOETL_PVC_MOUNT_PATH", "/data")
    path = f"{mount_root}/exec/{temp_ref.execution_id}/{temp_ref.source_step}/{temp_ref.name}"
    os.makedirs(os.path.dirname(path), exist_ok=True)
    async with aiofiles.open(path, "wb") as f:
        await f.write(data_bytes)
    temp_ref.physical_uri = path
    return temp_ref.ref
DuckDB steps access PVC-stored Parquet directly:
- name: analyze
  kind: duckdb
  spec:
    query: |
      SELECT COUNT(*) FROM read_parquet('{{ input.data_path }}')
Acceptance criteria:
- `StoreTier.PVC` store/resolve works for files up to 1GB.
- Two workers in the same kind cluster can read a PVC file written by a third worker.
- DuckDB tool accepts `physical_uri` from a `result_ref` with `store_tier='pvc'` and reads without downloading.
- Integration test: step A writes 500MB Parquet to PVC, step B reads with DuckDB, asserts row count.
- Tier selection: files > `NOETL_PVC_TIER_THRESHOLD_BYTES` (default 10MB) route to PVC when `NOETL_PVC_MOUNT_PATH` is set.
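The tier-selection criterion above can be sketched as a small helper. The env variable names come from this plan; the function name and the `kv` fallback default are assumptions:

```python
import os

def select_store_tier(size_bytes: int) -> str:
    """Route large payloads to PVC when a mount is configured, else use the default tier."""
    threshold = int(os.getenv("NOETL_PVC_TIER_THRESHOLD_BYTES", str(10 * 1024 * 1024)))
    if os.getenv("NOETL_PVC_MOUNT_PATH") and size_bytes > threshold:
        return "pvc"
    return os.getenv("NOETL_DEFAULT_STORE_TIER", "kv")
```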
Phase 3: Distributed Fan-out
Goal: Implement the loop_mode: fanout execution profile specified in distributed_fanout_mode_spec.md. Each loop iteration becomes an independent command dispatched to any available worker, with server-managed fan-in tracking using the event table.
P3.1 — Fan-out server-side command generation
When a step has spec.loop_mode: fanout, the server's handle_event() on the step entry event:
- Evaluates `loop.in` to get the collection.
- Stores the collection in MinIO/GCS/PVC via `result_ref` and writes a `loop.fanout.started` event with `meta = {loop_id, total_shards: N}`.
- Generates N independent `command.issued` events, each carrying `meta = {loop_id, shard_id, iter_index, iter_key}` and a single-item `input` (the iteration element or a reference to it).
- Publishes N NATS notifications.
No single worker is responsible for the loop; the server splits it before dispatch.
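The server-side split can be sketched as a pure function from collection to commands (illustrative only; the `meta` field names follow the event-type table in this section, and the function itself is not NoETL code):

```python
def make_shard_commands(execution_id: str, step: str, loop_id: str, collection: list) -> list[dict]:
    """One independent command per iteration element; every command carries a
    distinct (loop_id, shard_id, iter_index) tuple for idempotent dispatch."""
    return [
        {
            "execution_id": execution_id,
            "step": step,
            "input": item,
            "meta": {
                "loop_id": loop_id,
                "shard_id": i,
                "iter_index": i,
                "iter_key": str(i),
            },
        }
        for i, item in enumerate(collection)
    ]
```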
New event types:
| event_type | Author | meta fields | Notes |
|---|---|---|---|
| `loop.fanout.started` | Server | `loop_id`, `total_shards` | Written once per fan-out activation |
| `loop.shard.enqueued` | Server | `loop_id`, `shard_id`, `iter_index`, `iter_key` | One per shard |
| `loop.shard.done` | Worker | `loop_id`, `shard_id`, `command_id` | Replaces `command.completed` for shard commands |
| `loop.shard.failed` | Worker | `loop_id`, `shard_id`, `command_id`, `error` | Replaces `command.failed` for shard commands |
| `loop.fanin.completed` | Server | `loop_id` | When `done + failed == total_shards` |
Acceptance criteria:
- `spec.loop_mode: fanout` in a step spec activates the fan-out path.
- `loop.fanout.started` event written with correct `total_shards`.
- N `command.issued` events generated, each with unique `shard_id` and `iter_index`.
- Unit test: 100-item fan-out → 100 `command.issued` rows in DB, 100 NATS publishes.
- All 100 commands carry distinct `(loop_id, shard_id, iter_index)` tuples.
P3.2 — Fan-in tracking and completion detection
On each loop.shard.done or loop.shard.failed event, the server queries:
SELECT
(meta->>'total_shards')::int AS total_shards,
COUNT(*) FILTER (WHERE event_type = 'loop.shard.done') AS done,
COUNT(*) FILTER (WHERE event_type = 'loop.shard.failed') AS failed
FROM noetl.event
WHERE execution_id = $1
AND node_name = $2
AND meta->>'loop_id' = $3
AND event_type IN ('loop.fanout.started', 'loop.shard.done', 'loop.shard.failed')
GROUP BY 1;
When `done + failed == total_shards`, emit `loop.fanin.completed` (atomic via a unique index on `(execution_id, meta->>'loop_id') WHERE event_type = 'loop.fanin.completed'`) and evaluate the parent step's `next.arcs`.
fanin.* variables available in next.arcs[].when expressions:
next:
  arcs:
    - step: aggregate_results
      when: '{{ fanin.status == "complete" }}'
    - step: handle_partial
      when: '{{ fanin.status == "partial" }}'
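The `fanin.status` value routed on above can be derived from the three counters. A sketch (status names taken from the arc example; the helper itself is hypothetical, and whether an in-flight fan-in exposes a `running` status is an assumption):

```python
def fanin_status(total_shards: int, done: int, failed: int) -> str:
    """Map fan-in counters to the status used in next.arcs[].when expressions."""
    if done + failed < total_shards:
        return "running"
    return "complete" if failed == 0 else "partial"
```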
Acceptance criteria:
- Fan-in count query uses `idx_event_loop_id_type` index (verify with `EXPLAIN`).
- `loop.fanin.completed` emitted exactly once per `loop_id` (unique index enforced).
- `fanin.status`, `fanin.done`, `fanin.failed`, `fanin.total` available in `when` expressions after fan-in.
- `execution.state->'fanout'->'step_name'` updated by trigger on each shard event.
- Integration test: 50-shard fan-out where 48 succeed and 2 fail, `fanin.status = "partial"`, routing to correct arc.
P3.3 — Shard-level retry
Failed shards are retried as independent commands (not full loop restart):
# In handle_event() on loop.shard.failed:
if shard_attempts < max_shard_retries:
    retry_command = Command(
        execution_id=execution_id,
        step=step,
        tool=original_tool,
        input=original_shard_input,
        meta={
            "loop_id": loop_id,
            "shard_id": shard_id,
            "iter_index": iter_index,
            "iter_key": iter_key,
            "shard_attempt": shard_attempts + 1,
        },
    )
    await issue_command(retry_command)
else:
    # Count as failed, check fan-in completion
    ...
Acceptance criteria:
- `spec.fanout.max_shard_retries` configurable per step (default 2).
- Retry uses the same `(loop_id, shard_id, iter_key)` for idempotency.
- Retry command goes through the standard `command.issued` unique index (prevents duplicate retries).
- Integration test: 10-shard fan-out where 3 fail on attempt 1, succeed on attempt 2 → `fanin.status = "complete"`.
Phase 4: Communication Architecture
Goal: Reduce HTTP overhead for high-frequency worker→server event emission by routing events through NATS JetStream. Keep HTTP for synchronous operations that require a response. No worker-to-worker direct communication.
See companion document noetl_worker_communication.md for the full analysis and rationale.
P4.1 — NATS bidirectional event transport
Add a NATS JetStream consumer on the server for worker-emitted events:
NATS subject: noetl.events.{execution_id}
Stream: NOETL_WORKER_EVENTS
Retention: WorkQueuePolicy (consumed once by server)
Workers publish:
- command.completed
- command.failed
- command.heartbeat
- loop.shard.done
- loop.shard.failed
Server subscribes via push consumer with ack.
Workers detect transport mode from environment:
NOETL_EVENT_TRANSPORT=nats # nats | http (default: http for backwards compat)
NOETL_EVENT_NATS_SUBJECT_PREFIX=noetl.events
Operations that remain on HTTP (require synchronous response):
- `command.claimed` — synchronous advisory lock result needed
- Management: registration, health, status
- Command context fetch: `GET /api/commands/{event_id}`
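A sketch of how a worker might resolve transport mode and subject from these variables (helper names are assumptions; defaults follow the backwards-compatibility note above):

```python
import os

def event_transport() -> str:
    """Resolve the worker event transport; defaults to http for backwards compatibility."""
    mode = os.getenv("NOETL_EVENT_TRANSPORT", "http").lower()
    if mode not in ("nats", "http"):
        raise ValueError(f"unknown NOETL_EVENT_TRANSPORT: {mode}")
    return mode

def event_subject(execution_id: str) -> str:
    """Per-execution NATS subject, following the noetl.events.{execution_id} layout."""
    prefix = os.getenv("NOETL_EVENT_NATS_SUBJECT_PREFIX", "noetl.events")
    return f"{prefix}.{execution_id}"
```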
Acceptance criteria:
- Workers with `NOETL_EVENT_TRANSPORT=nats` publish `command.completed`/`failed`/`heartbeat` to NATS.
- Server NATS consumer processes these events with the same `handle_event()` pipeline as the HTTP path.
- NATS-emitted events are persisted to the `noetl.event` table identically to HTTP-emitted events.
- `command.claimed` remains HTTP.
- Backwards compatibility: `NOETL_EVENT_TRANSPORT=http` uses the existing path unchanged.
- Load test: 1000 concurrent loop iterations, NATS transport, p99 latency for event ingestion < 50ms.
- `test_pft_flow` passes end-to-end with `NOETL_EVENT_TRANSPORT=nats`.
P4.2 — Worker capacity registry via heartbeat
Workers include available capacity in their heartbeat payload:
# In worker heartbeat (every 15s):
await emit_event(Event(
    name="worker.capacity",
    payload={
        "worker_id": self.worker_id,
        "available_slots": self.max_concurrent - len(self.active_tasks),
        "active_tasks": len(self.active_tasks),
        "tool_pools": {
            "postgres": pg_pool.available,
            "duckdb": duckdb_available,
        },
    },
))
Server maintains in-memory capacity registry (not persisted; rebuilt from heartbeats):
worker_capacity: dict[str, WorkerCapacity] = {}
# Updated on each worker.capacity event
Acceptance criteria:
- `runtime` table `capacity` column updated from heartbeat payload.
- Server in-memory capacity registry populated within 15s of worker startup.
- `GET /api/workers` returns current capacity per worker.
- No breaking change to existing workers without capacity payload.
P4.3 — Capacity-aware NATS dispatch (targeted subjects)
When capacity registry is populated, server publishes to worker-specific NATS subject instead of broadcast:
Broadcast: noetl.commands (current, all workers)
Targeted: noetl.commands.{worker_id} (new, specific worker)
Worker subscribes to both subjects; targeted messages take priority.
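Subject selection can be sketched as follows (a minimal heuristic picking the worker with the most free slots from the in-memory registry; the real scheduler would also filter by tool kind, and the function name is hypothetical):

```python
def dispatch_subject(capacity: dict[str, int]) -> str:
    """Return a targeted subject when some worker reports free slots, else broadcast."""
    candidates = {w: slots for w, slots in capacity.items() if slots > 0}
    if not candidates:
        return "noetl.commands"  # broadcast fallback
    worker_id = max(candidates, key=candidates.get)
    return f"noetl.commands.{worker_id}"
```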
Acceptance criteria:
- Server selects a worker with `available_slots > 0` for the required tool kind.
- Broadcast subject retained as fallback when no targeted worker available.
- Integration test: 3 workers with different capacities → commands route to workers with capacity.
- No message loss when targeted worker goes offline before receiving (redelivery via broadcast).
Phase 5: Observability and Streaming
Goal: Expose live execution progress from projection tables, add partial/streaming result support for large tool outputs.
P5.1 — Execution status API from projection
GET /api/executions/{id} reads from noetl.execution (projection) without scanning noetl.event:
# In execution.py API handler:
row = await db.fetchrow("""
    SELECT execution_id, status, last_event_type, last_node_name,
           start_time, end_time, state, error
    FROM noetl.execution
    WHERE execution_id = $1
""", execution_id)
Loop progress available as state.loop.{step_name} without additional queries.
Acceptance criteria:
- `GET /api/executions/{id}` response includes a `loop_progress` field derived from `execution.state`.
- Response time < 10ms for any execution (single-row lookup, no event scan).
- `noetl status --json` CLI uses this endpoint; output includes per-step loop counts.
- Reconstruction: truncate `noetl.execution`, replay all events, verify state matches.
P5.2 — Partial result streaming (call.partial event type)
Workers emitting large sequential results (postgres cursor, DuckDB scan) emit call.partial events per chunk before the final call.done:
chunk_idx = 0
while True:
    chunk = await postgres_cursor.fetch(chunk_size)  # next batch of rows
    if not chunk:
        break
    await emit_event(Event(
        name="call.partial",
        payload={
            "chunk_index": chunk_idx,
            "rows": chunk,
            "has_more": True,
        },
        meta={"command_id": command_id},
    ))
    chunk_idx += 1
await emit_event(Event(name="call.done", ...))
DSL: steps can react to `call.partial` via an arc condition `on: call.partial`.
Acceptance criteria:
- `call.partial` events stored in `noetl.event` with `meta->>'chunk_index'`.
- Next step wired with `on: call.partial` receives each chunk as it arrives.
- `call.done` still required as terminal event; `call.partial` without `call.done` does not route.
- Integration test: postgres step streaming 10,000 rows in 10 chunks → downstream step processes each chunk independently.
- Memory bound: worker heap does not exceed 2× chunk size regardless of total dataset size.
Test Fixture Requirements per Phase
| Phase | New/Modified Fixtures | Pass Criterion |
|---|---|---|
| P0 | test_pft_flow (existing) | 1000/1000 per facility, all 5 types |
| P0 | tooling_non_blocking (existing) | issued == terminal == 5 per tool |
| P0.3 | New: loop_state_recovery | Loop resumes after server kill at 50% |
| P1 | test_pft_flow with state query | execution.state.loop matches actual counts |
| P2 | New: minio_large_result | 500MB round-trip store/resolve in kind |
| P2 | New: pvc_worker_exchange | Worker A writes Parquet, Worker B reads DuckDB |
| P3 | New: fanout_basic (100 shards) | All 100 shards complete, fan-in fires once |
| P3 | New: fanout_partial_failure | 2/10 shards fail, fanin.status=partial |
| P4 | test_pft_flow with NATS transport | Same pass criterion as P0 |
| P5 | New: streaming_chunks | 10K rows, 10 chunks, downstream asserts each |
Dependency Graph
P0.1 (dedup) ─────┐
P0.2 (loop.done) ─┼──→ P1.1 (indexes) ──→ P1.2 (trigger) ──→ P5.1 (status API)
P0.3 (loop state)─┘              │
P0.4 (reaper)                    ├──→ P3.1 (fan-out)
                                 ├──→ P3.2 (fan-in)
                                 └──→ P3.3 (retry)

P1.3 (store_tier) ──→ P2.1 (MinIO k8s) ──→ P2.2 (backend)
                  └──→ P2.3 (PVC)

P4.1 (NATS transport) ──→ P4.2 (capacity) ──→ P4.3 (targeted dispatch)

P5.2 (call.partial) — independent
File References
| Document | Location |
|---|---|
| This plan | docs/features/noetl_distributed_processing_plan.md |
| Schema DDL details | docs/features/noetl_schema_enhancements.md |
| Worker communication analysis | docs/features/noetl_worker_communication.md |
| Distributed fan-out spec | docs/features/distributed_fanout_mode_spec.md |
| Rust worker pool | docs/features/worker-pool-rust.md |
| DSL assignment/reference spec | docs/features/noetl_dsl_assignment_and_reference_spec.md |
| DSL refactoring spec | docs/features/noetl_dsl_refactoring_spec.md |