# Runtime Changelog - NATS Backpressure, Throttling, and PgBouncer Pooling

## Scope
This changelog documents the runtime hardening work for distributed batch execution, with focus on:
- `task_sequence` loop completion correctness across multiple server instances.
- NATS-driven backpressure and bounded worker concurrency.
- Worker-side throttling based on PostgreSQL pool pressure.
- Reduced payload/context logging and tighter log severity behavior.
- PgBouncer deployment and configuration path for Cloud SQL / in-cluster PostgreSQL traffic.
## Why this change was needed
### 1) Premature `loop.done` in distributed `task_sequence`

In multi-server deployments, a `call.done` event can be processed by a different server instance than the one that initialized the loop state in memory. When the local `loop_state.collection` was empty on that instance, completion checks could pass prematurely.
### 2) Unbounded queue pressure and burst behavior

The worker could accept large numbers of pending NATS messages (`max_ack_pending` was too high on the existing durable consumer), causing bursty load against HTTP and Postgres during high-volume loops.
### 3) Database pressure amplification

Large loop pipelines (especially `http -> postgres` patterns) could overrun DB pools when fanout increased, causing retries, longer execution times, and unstable latency.
### 4) Excessive log payload visibility and noise
Request/response and context payloads were too verbose for normal operation and risked exposing sensitive values in non-debug logs.
## Implementation summary
### A) Distributed-safe loop completion for `task_sequence`

Area: `noetl/core/dsl/v2/engine.py`
`call.done` handling for `:task_sequence` loop steps now resolves the collection size with a distributed fallback order:
- Local loop cache (`loop_state.collection`) when available.
- NATS KV loop state (`collection_size`) keyed by execution/loop event.
- Re-render the loop collection from the template when both caches are unavailable.
- Persist the re-rendered size back to NATS KV for subsequent distributed checks.
This prevents a false-positive `loop.done` when the local server's memory is missing the loop collection.
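The fallback order can be sketched as follows. All names here (`local_cache`, `kv_get`, `kv_put`, `render_collection`) are illustrative stand-ins for the real engine and NATS KV interfaces, which are not shown in this changelog:

```python
# Hypothetical sketch of the distributed collection-size resolution order.
def resolve_collection_size(loop_key, local_cache, kv_get, kv_put, render_collection):
    # 1) Local loop cache, when this server instance initialized the loop.
    state = local_cache.get(loop_key)
    if state and state.get("collection"):
        return len(state["collection"])
    # 2) NATS KV loop state written by whichever server started the loop.
    size = kv_get(loop_key)
    if size is not None:
        return size
    # 3) Re-render the collection from the template, then persist the size
    #    so later distributed checks can take the KV fast path.
    size = len(render_collection())
    kv_put(loop_key, size)
    return size
```

Completion checks then compare the resolved size against observed `call.done` counts instead of trusting a possibly empty local cache.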
### B) NATS consumer backpressure and reconciliation

Area: `noetl/core/messaging/nats_client.py`
#### New controls

- `max_inflight` (process-local in-flight semaphore on the worker subscriber).
- `max_ack_pending` (JetStream durable consumer cap).
- `fetch_timeout` and `fetch_heartbeat` (pull-consumer fetch behavior).
#### Durable consumer config reconciliation

Worker startup now enforces the configured `max_ack_pending` against existing durable consumers:
- If the consumer is missing: create it with the configured `max_ack_pending`.
- If the consumer exists with a mismatched `max_ack_pending`: delete and recreate the durable.
- If a concurrent worker startup races consumer creation: validate the existing config and continue if it matches.
This closes the gap where runtime logs warned about a mismatch (1000 vs 64) but the effective queue cap remained unchanged.
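The reconciliation decision reduces to a small pure function, sketched below with illustrative names (the real code in `nats_client.py` would apply the chosen action via the JetStream management API, e.g. consumer info/delete/add calls):

```python
# Hypothetical sketch of the durable-consumer reconciliation decision.
def reconcile_consumer(existing_max_ack_pending, desired_max_ack_pending):
    """Decide what to do with a durable consumer at worker startup.

    existing_max_ack_pending is None when the consumer does not exist yet.
    """
    if existing_max_ack_pending is None:
        return "create"    # consumer missing: create with the configured cap
    if existing_max_ack_pending != desired_max_ack_pending:
        return "recreate"  # mismatch: delete + recreate the durable
    return "keep"          # matched (e.g. a concurrent worker already created it)
```

Recreating rather than updating is what makes the configured cap actually take effect on an already-provisioned durable.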
### C) Worker throttling and DB-aware gating

Area: `noetl/worker/v2_worker_nats.py`
Worker now applies two layers of throttling:
- Global in-flight cap for all commands (
NOETL_WORKER_MAX_INFLIGHT_COMMANDS). - DB-heavy in-flight cap for tools likely to hit DB (
postgres,transfer,snowflake,snowflake_transfer).
For DB-heavy commands, worker checks plugin pool stats and waits when saturated:
- Polls `get_plugin_pool_stats()`.
- Uses the threshold `NOETL_WORKER_POSTGRES_POOL_WAITING_THRESHOLD`.
- Delays execution by `NOETL_WORKER_THROTTLE_POLL_INTERVAL_SECONDS` until headroom is available.
This avoids thundering-herd behavior against Postgres/PgBouncer under loop bursts.
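A minimal sketch of the wait loop, assuming the stats call returns a dict with a `"waiting"` count (the real stats shape and the worker's internals are not shown in this changelog):

```python
import asyncio

# Hypothetical sketch of DB-aware gating before a DB-heavy command runs.
async def wait_for_db_headroom(get_stats, waiting_threshold=2,
                               poll_interval=0.2, max_polls=1000):
    """Wait until the pool's waiting count drops below the threshold.

    get_stats stands in for get_plugin_pool_stats(); waiting_threshold and
    poll_interval mirror the two env vars described above.
    """
    for _ in range(max_polls):
        if get_stats().get("waiting", 0) < waiting_threshold:
            return True  # headroom available: proceed with the command
        await asyncio.sleep(poll_interval)  # back off before re-polling
    return False  # gave up; the caller decides how to handle saturation
```

Because the check happens per command, bursts drain gradually instead of all hitting the pool at once.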
### D) HTTP and Postgres executor pressure controls

#### HTTP executor

Area: `noetl/tools/http/executor.py`
- Added a shared keep-alive `httpx.Client` reused per worker process.
- Added configurable connection limits and keepalive expiry.
- Logging now records request shape and sanitized metadata instead of full payload dumps.
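The per-process reuse pattern can be sketched like this. In the real executor the factory would presumably build an `httpx.Client` with `httpx.Limits(...)`; a generic factory is used here so the sharing pattern is visible without a network dependency:

```python
import threading

# Hypothetical sketch of one-client-per-worker-process reuse.
_client = None
_client_lock = threading.Lock()

def get_shared_client(factory):
    """Create the client once per worker process, then reuse it.

    factory stands in for e.g. lambda: httpx.Client(limits=httpx.Limits(...)).
    """
    global _client
    if _client is None:
        with _client_lock:        # double-checked locking for thread safety
            if _client is None:
                _client = factory()
    return _client
```

Reuse keeps TCP/TLS connections warm across commands, which is where the keep-alive savings come from.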
#### Postgres executor + pool

Areas:
- `noetl/tools/postgres/executor.py`
- `noetl/tools/postgres/execution.py`
- `noetl/tools/postgres/pool.py`

- Pooled mode is preferred by default (`NOETL_POSTGRES_USE_POOL_DEFAULT=true`).
- Pool parameters are bounded via env (`min`/`max` size, `max_waiting`, `timeout`, lifetime/idle).
- Direct connection mode has bounded concurrency and retry with backoff.
- SQL logs use operation/length summaries instead of raw SQL statements.
### E) Logging safety and severity tightening

Areas:
- `noetl/core/logger.py`
- `noetl/server/middleware.py`
- `noetl/core/runtime/events.py`
#### Logger behavior

- The default effective level now follows `NOETL_LOG_LEVEL` (fallback: `LOG_LEVEL`, default `INFO`).
- Log messages and `extra` fields are sanitized via `sanitize_sensitive_data`/`sanitize_for_logging`.
- Strings and complex objects are truncated to `NOETL_LOG_VALUE_MAX_CHARS` (default `400`).
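A sketch of the truncation behavior; the actual helper names in `logger.py` are not shown here, so treat this as an illustrative shape:

```python
# Hypothetical sketch of value truncation, mirroring NOETL_LOG_VALUE_MAX_CHARS.
def truncate_for_log(value, max_chars=400):
    """Cap a logged value at max_chars, noting how much was cut."""
    text = value if isinstance(value, str) else repr(value)
    if len(text) <= max_chars:
        return text
    return text[:max_chars] + f"... [truncated {len(text) - max_chars} chars]"
```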
#### API middleware behavior

- The normal path logs only metadata at debug level (`request_meta`, `response_meta`).
- Request payload previews on errors are opt-in only: `NOETL_LOG_INCLUDE_PAYLOAD_ON_ERROR=true`.
- Timeout and exception logs include payload size/type metadata, not full raw bodies, by default.
#### Event reporting behavior

- On `422` event API failures, logs now include a metadata summary only (keys + length), not a raw payload dump.
### F) Large result handling preview hygiene

Area: `noetl/core/storage/result_store.py`

- Preview generation moved to a byte-capped extractor (`create_preview`) to avoid oversized previews in event/UI paths.
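A byte-capped extractor might look like the following. The real `create_preview` signature in `result_store.py` is not shown in this changelog, so this shape is an assumption:

```python
# Hypothetical sketch of a byte-capped preview extractor.
def create_preview(data: bytes, max_bytes: int = 1024) -> str:
    """Decode at most max_bytes of a result for event/UI previews."""
    clipped = data[:max_bytes].decode("utf-8", errors="replace")
    if len(data) <= max_bytes:
        return clipped
    return clipped + f"... (+{len(data) - max_bytes} bytes)"
```

Capping on bytes before decoding keeps the preview cost bounded even for very large results.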
## New/updated runtime configuration

### Worker NATS/backpressure and throttling

Defined in:
- `noetl/core/config.py` (`WorkerSettings`)
- `automation/helm/noetl/values.yaml` (`config.worker.*`)
| Env var | Default | Purpose |
|---|---|---|
| `NOETL_WORKER_NATS_FETCH_TIMEOUT_SECONDS` | 30 | Pull fetch timeout for the JetStream consumer |
| `NOETL_WORKER_NATS_FETCH_HEARTBEAT_SECONDS` | 5 | Fetch heartbeat to keep the long poll alive |
| `NOETL_WORKER_NATS_MAX_ACK_PENDING` | 64 | Broker-side cap on unacked messages per consumer |
| `NOETL_WORKER_MAX_INFLIGHT_COMMANDS` | 8 | Process-local cap on concurrently executing commands |
| `NOETL_WORKER_MAX_INFLIGHT_DB_COMMANDS` | 4 | Extra cap for DB-heavy command kinds |
| `NOETL_WORKER_THROTTLE_POLL_INTERVAL_SECONDS` | 0.2 | Sleep interval while waiting for DB pool headroom |
| `NOETL_WORKER_POSTGRES_POOL_WAITING_THRESHOLD` | 2 | Max tolerated waiting requests before throttling |
### Postgres plugin pool settings

Defined in the Helm worker/server config map defaults (`automation/helm/noetl/values.yaml`) and consumed by the Postgres executors:
| Env var | Default |
|---|---|
| `NOETL_POSTGRES_POOL_MIN_SIZE` | 1 |
| `NOETL_POSTGRES_POOL_MAX_SIZE` | 12 |
| `NOETL_POSTGRES_POOL_MAX_WAITING` | 200 |
| `NOETL_POSTGRES_POOL_TIMEOUT_SECONDS` | 60 |
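These env vars map naturally onto pool keyword arguments. The sketch below is illustrative: the parameter names on the left assume a `psycopg_pool`-style constructor, and the defaults mirror the table above:

```python
import os

# Hypothetical helper mapping the documented env vars onto pool kwargs.
def pool_kwargs_from_env(env=None):
    """Read bounded pool settings from the environment with table defaults."""
    env = os.environ if env is None else env
    return {
        "min_size": int(env.get("NOETL_POSTGRES_POOL_MIN_SIZE", "1")),
        "max_size": int(env.get("NOETL_POSTGRES_POOL_MAX_SIZE", "12")),
        "max_waiting": int(env.get("NOETL_POSTGRES_POOL_MAX_WAITING", "200")),
        "timeout": float(env.get("NOETL_POSTGRES_POOL_TIMEOUT_SECONDS", "60")),
    }
```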
## PgBouncer: where config is stored and how NoETL connects

### 1) Source of truth for PgBouncer deployment settings

File: `automation/gcp_gke/noetl_gke_fresh_stack.yaml`
The workload defaults define PgBouncer runtime knobs, including:
- `pgbouncer_enabled`
- `pgbouncer_namespace`
- `pgbouncer_service_name`
- `pgbouncer_max_client_conn`
- `pgbouncer_default_pool_size`
- `pgbouncer_min_pool_size`
- `pgbouncer_reserve_pool_size`
- `pgbouncer_reserve_pool_timeout`
- `pgbouncer_max_db_connections`
- `pgbouncer_server_lifetime`
- `pgbouncer_server_idle_timeout`
The deployment step `deploy_pgbouncer` renders a Kubernetes Deployment + Service and passes these values as PgBouncer container environment variables (`MAX_CLIENT_CONN`, `DEFAULT_POOL_SIZE`, etc.).
### 2) How NoETL server/worker are pointed to PgBouncer

#### Deploy-time wiring

File: `automation/gcp_gke/noetl_gke_fresh_stack.yaml` (`deploy_noetl` step)
- `DB_HOST` is set to `workload.postgres_host`.
- In Cloud SQL + PgBouncer mode, this host is typically `pgbouncer.postgres.svc.cluster.local`.
- Helm applies this into the NoETL server config:
  - `config.server.POSTGRES_HOST=$DB_HOST`
  - `config.server.POSTGRES_PORT=$DB_PORT`
#### Helm config map storage

Files:
- `automation/helm/noetl/values.yaml`
- `automation/helm/noetl/templates/configmap-server.yaml`
- `automation/helm/noetl/templates/configmap-worker.yaml`

These templates generate the env vars injected into server/worker pods via `envFrom`.
### 3) Connection path at runtime

#### Server -> NoETL system database

- The server reads `POSTGRES_HOST`/`POSTGRES_PORT` from the config map env.
- `noetl/core/config.py` builds the NoETL connection string.
- `noetl/core/db/pool.py` initializes the server `AsyncConnectionPool`.
- The host points to the PgBouncer service when enabled.
#### Worker -> user/playbook Postgres targets
- Postgres tool resolves connection details from auth/task config.
- Executor uses pooled mode by default.
- Pool manager reuses connections by connection-string hash.
- If credentials host points to PgBouncer service, worker traffic also goes through PgBouncer.
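The connection-string-hash reuse can be sketched as follows; `make_pool` stands in for the real pool constructor in `noetl/tools/postgres/pool.py`:

```python
import hashlib

# Hypothetical sketch of pool reuse keyed by connection-string hash.
_pools = {}

def get_pool(conninfo, make_pool):
    """Return one shared pool per distinct connection string."""
    key = hashlib.sha256(conninfo.encode()).hexdigest()
    if key not in _pools:
        _pools[key] = make_pool(conninfo)  # first use: build the pool
    return _pools[key]                     # later uses: shared instance
```

Hashing the connection string (rather than storing it as the key) also keeps credentials out of any structure that might get logged or introspected.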
### 4) Why PgBouncer does not open a backend for every request

PgBouncer is configured with `POOL_MODE=transaction`, so client sessions are multiplexed over a reusable backend pool. Combined with NoETL-side connection pooling:
- NoETL avoids per-command connect/disconnect overhead.
- PgBouncer avoids backend churn to PostgreSQL.
- Queue-level throttling prevents sudden spikes from saturating backend capacity.
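For orientation, a transaction-pooling PgBouncer config shaped by the knobs listed in section 1 might look like the fragment below. The values here are illustrative only, not the deployed defaults; the actual rendered config comes from the workload settings and the `deploy_pgbouncer` step:

```ini
[pgbouncer]
pool_mode = transaction          ; release the backend at transaction end
max_client_conn = 1000           ; pgbouncer_max_client_conn
default_pool_size = 20           ; pgbouncer_default_pool_size
min_pool_size = 0                ; pgbouncer_min_pool_size
reserve_pool_size = 5            ; pgbouncer_reserve_pool_size
reserve_pool_timeout = 3         ; pgbouncer_reserve_pool_timeout
max_db_connections = 50          ; pgbouncer_max_db_connections
server_lifetime = 3600           ; pgbouncer_server_lifetime
server_idle_timeout = 600        ; pgbouncer_server_idle_timeout
```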
## NATS backpressure: `max_ack_pending=64` vs `1000`

Operationally:
- `64`: tighter broker-side pressure, smaller burst windows, faster recovery under degradation, less DB shock.
- `1000`: allows much larger queued unacked bursts and larger replay spikes after a slowdown or restart.

In this runtime, the primary throughput control is still the worker in-flight semaphores; `max_ack_pending` acts as a broker-side guardrail.
## Verification checklist
Use these commands after deploy:
```shell
# 1) Confirm worker config values in Kubernetes
kubectl get configmap noetl-worker-config -n noetl -o yaml | rg "NOETL_WORKER_NATS_MAX_ACK_PENDING|NOETL_WORKER_MAX_INFLIGHT_COMMANDS|NOETL_WORKER_MAX_INFLIGHT_DB_COMMANDS"

# 2) Confirm worker startup logs include the effective throttling config
kubectl logs deployment/noetl-worker -n noetl --since=15m | rg "starting \(NATS:|THROTTLE|max_ack_pending"

# 3) Confirm PgBouncer is deployed and healthy
kubectl get deploy,svc -n postgres | rg pgbouncer
kubectl logs deployment/pgbouncer -n postgres --since=15m | tail -n 100

# 4) Run a heavy batch smoke execution
noetl --host localhost --port 8082 exec \
  catalog://tests/fixtures/playbooks/batch_execution/heavy_payload_pipeline_in_step \
  -r distributed
```
## Affected files (implementation)

- `noetl/core/dsl/v2/engine.py`
- `tests/unit/dsl/v2/test_task_sequence_loop_completion.py`
- `noetl/core/messaging/nats_client.py`
- `tests/core/test_nats_command_subscriber.py`
- `noetl/core/config.py`
- `noetl/worker/v2_worker_nats.py`
- `noetl/tools/http/executor.py`
- `noetl/tools/postgres/executor.py`
- `noetl/tools/postgres/execution.py`
- `noetl/tools/postgres/pool.py`
- `noetl/core/logger.py`
- `noetl/server/middleware.py`
- `noetl/core/runtime/events.py`
- `noetl/core/storage/result_store.py`
- `automation/helm/noetl/values.yaml`
- `automation/gcp_gke/noetl_gke_fresh_stack.yaml`
## Notes

- Recreating durable consumers to enforce `max_ack_pending` is intentional. During rollout, a brief rebalance can occur while workers reconnect.
- Keep `NOETL_WORKER_MAX_INFLIGHT_DB_COMMANDS` lower than the global in-flight cap for predictable DB pressure.
- For production, keep payload logging disabled by default and enable it only for short, scoped debugging windows.