NoETL V2 NATS Integration
Overview
The V2 architecture now uses NATS JetStream for event-driven worker coordination, eliminating the need for database polling.
Architecture
┌──────────────┐ ┌──────────────┐
│ Client │ │ NATS │
│ │ │ JetStream │
└──────┬───────┘ └───────┬──────┘
│ │
│ POST /api/v2/execute │
▼ │
┌──────────────┐ │
│ Server │ │
│ V2 API │◄───────────────────────────┤
│ │ subscribe │
└──────┬───────┘ │
│ │
│ 1. Create execution │
│ 2. Insert command to queue │
│ 3. Publish to NATS ───────────────►│
│ │
│ │ Pull subscription
│ │ (durable consumer)
│ ▼
│ ┌──────────────┐
│ │ V2 Worker │
│ │ (Pool) │
│ └──────┬───────┘
│ │
│ GET /api/postgres/execute │
│ (fetch + lock command) ◄──────────┤
│ │
│ │ Execute tool
│ │
│ POST /api/v2/events │
│ (emit events) ◄───────────────────┤
│ │
│ Generate next commands │
│ Publish to NATS ──────────────────►
│
▼
Components
1. NATS Client (noetl/core/messaging/nats_client.py)
NATSCommandPublisher (Server-side):
- Connects to NATS JetStream
- Creates the `NOETL_COMMANDS` stream if it does not exist
- Publishes lightweight command notifications
- Message format: `{execution_id, queue_id, step, server_url}`
NATSCommandSubscriber (Worker-side):
- Connects to NATS JetStream
- Creates durable pull consumer per worker
- Subscribes to the `noetl.commands` subject
- Acknowledges/NAKs messages based on execution success
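The subscription setup, sketched with the nats-py pull-consumer API; the durable name matches the consumer described under NATS Configuration below, but the function shape is an assumption, not the actual `NATSCommandSubscriber` interface.

```python
import json


def parse_notification(data: bytes) -> dict:
    """Decode a notification published on noetl.commands."""
    return json.loads(data.decode("utf-8"))


async def subscribe_commands(nats_url: str, durable: str = "noetl-worker-pool"):
    """Create a durable pull subscription on the noetl.commands subject.

    A durable pull consumer survives reconnects, and multiple workers
    binding the same durable name share the message load.
    """
    import nats  # nats-py; imported lazily so the module loads without it
    nc = await nats.connect(nats_url)
    js = nc.jetstream()
    return await js.pull_subscribe("noetl.commands", durable=durable)
```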
2. V2 API Updates (noetl/server/api/v2.py)
Changes:
- Added `_nats_publisher` global instance
- Added `get_nats_publisher()` initialization function
- `start_execution()`: publishes to NATS after queueing commands
- `handle_event()`: publishes to NATS when generating new commands
Configuration:
- `NATS_URL` env var (default: `nats://noetl:noetl@nats.nats.svc.cluster.local:4222`)
- `SERVER_API_URL` env var for worker API calls
3. V2 Worker with NATS (noetl/worker/v2_worker_nats.py)
Architecture:
- Subscribes to the NATS `noetl.commands` subject
- Receives notification: `{execution_id, queue_id, step, server_url}`
- Fetches the full command from the server API (atomically locked via UPDATE ... RETURNING)
- Executes the tool based on `tool.kind`
- Emits events to `POST /api/v2/events`
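Putting those steps together, a worker loop might look like the sketch below. The `fetch`/`ack`/`nak` calls are the nats-py pull-consumer API; `execute_command` is a hypothetical callback standing in for the fetch-lock-execute-emit sequence.

```python
import asyncio
import json


async def handle_batch(msgs, execute_command) -> None:
    """Ack messages whose command executed; NAK failures for redelivery."""
    for msg in msgs:
        note = json.loads(msg.data)  # {execution_id, queue_id, step, server_url}
        try:
            await execute_command(note)  # hypothetical: lock command, run tool, emit events
            await msg.ack()
        except Exception:
            await msg.nak()  # NATS redelivers, up to max_deliver times


async def worker_loop(psub, execute_command) -> None:
    """Drain a nats-py pull subscription indefinitely."""
    while True:
        try:
            msgs = await psub.fetch(batch=1, timeout=5)
        except asyncio.TimeoutError:
            continue  # no commands waiting; poll the pull consumer again
        await handle_batch(msgs, execute_command)
```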
Benefits:
- No database polling
- Instant notification of new commands
- Horizontal scalability (multiple workers, one consumer group)
- Automatic retry via NATS (max_deliver=3)
- 30-second ack timeout
4. CLI Integration (noetl/cli/ctl.py)
`noetl worker start --v2`
Reads configuration from:
- `NATS_URL` (default: `nats://noetl:noetl@localhost:30422`)
- `SERVER_API_URL` (from settings, or default: `http://localhost:8082`)
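Resolving these settings is a plain environment lookup with the documented defaults; a minimal sketch (the function name is illustrative, not the CLI's actual code):

```python
import os


def worker_settings() -> dict:
    """Read worker connection settings, falling back to the documented defaults."""
    return {
        "nats_url": os.environ.get("NATS_URL", "nats://noetl:noetl@localhost:30422"),
        "server_api_url": os.environ.get("SERVER_API_URL", "http://localhost:8082"),
    }
```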
Message Flow
Start Execution
- Client: `POST /api/v2/execute`
- Server:
  - Creates execution state
  - Generates initial commands
  - Inserts into `noetl.queue` (status='queued')
  - Publishes to NATS: `{execution_id, queue_id, step, server_url}`
- NATS: Stores message in JetStream
- Worker: Receives notification via pull subscription
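The server side of this flow can be sketched as below. `db` and its methods (`create_execution`, `generate_initial_commands`, `insert_queue`) are hypothetical stand-ins for the server's persistence layer, and `js` is assumed to be a JetStream context; only the queue table, the 'queued' status, the subject, and the notification shape come from this document.

```python
import asyncio
import json


async def start_execution(db, js, playbook_path: str, payload: dict, server_url: str) -> str:
    """Create an execution, queue its initial commands, and notify workers."""
    execution_id = await db.create_execution(playbook_path, payload)   # execution state
    for cmd in await db.generate_initial_commands(execution_id):       # initial commands
        queue_id = await db.insert_queue(execution_id, cmd, status="queued")  # noetl.queue
        note = {"execution_id": execution_id, "queue_id": queue_id,
                "step": cmd["step"], "server_url": server_url}
        await js.publish("noetl.commands", json.dumps(note).encode())  # notify workers
    return execution_id
```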
Execute Command
- Worker receives NATS message
- Worker: `POST /api/postgres/execute` (UPDATE ... RETURNING to lock the command)
- Worker: Executes tool (python, http, postgres, duckdb)
- Worker: Emits events (`step.enter`, `call.done`, `step.exit`)
- Server: Processes events, generates next commands
- Server: Publishes next commands to NATS
- Worker: ACKs NATS message
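The lock in step 2 relies on a conditional UPDATE ... RETURNING so that only one worker can claim a queued command. The column list below is hypothetical; only the table name, the 'queued' status, and the locking pattern come from this document.

```python
# Hypothetical columns; the atomicity comes from the WHERE status = 'queued'
# guard: of two concurrent workers racing on the same queue_id, only one
# UPDATE matches, and only that worker receives a row from RETURNING.
LOCK_COMMAND_SQL = """
UPDATE noetl.queue
   SET status = 'running'
 WHERE queue_id = %(queue_id)s
   AND status = 'queued'
RETURNING queue_id, execution_id, step
"""
```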
Error Handling
- Worker execution fails
- Worker emits an error event (`call.done` with an error payload)
- Worker NAKs the NATS message
- NATS redelivers up to `max_deliver=3` times
- After max retries, the message moves to dead letter handling
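One way to express the retry decision on the worker side (a sketch: in nats-py, a JetStream message's `metadata.num_delivered` counts deliveries; actual dead-letter handling is listed under Next Steps):

```python
def retries_exhausted(num_delivered: int, max_deliver: int = 3) -> bool:
    """True once NATS has already delivered this message max_deliver times,
    i.e. another NAK would exceed the retry budget."""
    return num_delivered >= max_deliver


async def on_failure(msg, max_deliver: int = 3) -> None:
    """NAK while retries remain; otherwise TERM so JetStream stops redelivering."""
    if retries_exhausted(msg.metadata.num_delivered, max_deliver):
        await msg.term()
    else:
        await msg.nak()
```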
NATS Configuration
Stream: NOETL_COMMANDS
- Subjects: `noetl.commands`
- Retention: 1 hour (3600s)
- Storage: File-based (persistent)
Consumer: noetl-worker-pool (or per-worker)
- Durable: Yes
- Ack policy: Explicit
- Max deliver: 3
- Ack wait: 30 seconds
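The same settings mirrored as plain dicts; with nats-py these values would map onto `nats.js.api.StreamConfig` and `ConsumerConfig` fields (times here in seconds).

```python
# Stream and consumer settings from the tables above, as plain data.
STREAM_CONFIG = {
    "name": "NOETL_COMMANDS",
    "subjects": ["noetl.commands"],
    "max_age": 3600,    # retention: 1 hour
    "storage": "file",  # persistent, file-based
}

CONSUMER_CONFIG = {
    "durable_name": "noetl-worker-pool",
    "ack_policy": "explicit",
    "max_deliver": 3,
    "ack_wait": 30,     # seconds before an unacked message is redelivered
}
```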
Access:
- K8s: `nats://noetl:noetl@nats.nats.svc.cluster.local:4222`
- Local (via NodePort): `nats://noetl:noetl@localhost:30422`
Deployment
Dependencies
Add to pyproject.toml:
"nats-py>=2.10.0",
Environment Variables
Server:
NATS_URL=nats://noetl:noetl@nats.nats.svc.cluster.local:4222
SERVER_API_URL=http://noetl.noetl.svc.cluster.local:8082
Worker:
NATS_URL=nats://noetl:noetl@nats.nats.svc.cluster.local:4222
SERVER_API_URL=http://noetl.noetl.svc.cluster.local:8082
Build and Deploy
# Build image with NATS support
task docker-build-noetl
# Load to kind
kind load docker-image local/noetl:$(cat .noetl_last_build_tag.txt) --name noetl
# Update deployments
kubectl set image deployment/noetl-server noetl-server=local/noetl:$(cat .noetl_last_build_tag.txt) -n noetl
kubectl set image deployment/noetl-worker worker=local/noetl:$(cat .noetl_last_build_tag.txt) -n noetl
Testing
# Start execution
curl -X POST http://localhost:8082/api/v2/execute \
-H "Content-Type: application/json" \
-d '{"path": "tests/fixtures/playbooks/hello_world", "payload": {"message": "NATS Test"}}'
# Check NATS stream
kubectl exec -n nats nats-0 -- nats stream info NOETL_COMMANDS
# Check NATS consumer
kubectl exec -n nats nats-0 -- nats consumer info NOETL_COMMANDS noetl-worker-pool
# Check worker logs
kubectl logs -n noetl -l app=noetl-worker --tail=50
Benefits
- No Polling: Workers react instantly to new commands
- Scalability: Multiple workers share durable consumer
- Reliability: NATS handles message persistence and retry
- Decoupling: Workers don't need direct database access for queue
- Observability: NATS monitoring shows message flow
- Lightweight: Notifications are small JSON messages (~100 bytes)
Migration from V1
V1 (Database Polling):
- Workers poll `noetl.queue` every N seconds
- High database load with many workers
- Latency equals the poll interval
- No message acknowledgment
V2 (NATS):
- Workers subscribe to NATS
- Instant notification (< 10ms)
- Database only for command details
- NATS handles acknowledgment and retry
Monitoring
# NATS stream stats
kubectl exec -n nats nats-0 -- nats stream report
# Consumer lag
kubectl exec -n nats nats-0 -- nats consumer report NOETL_COMMANDS
# Worker subscription status
kubectl logs -n noetl -l app=noetl-worker | grep "Subscribed to"
Troubleshooting
Workers not receiving commands:
- Check NATS connection: `kubectl logs -n nats nats-0`
- Verify stream exists: `nats stream info NOETL_COMMANDS`
- Check consumer: `nats consumer info NOETL_COMMANDS noetl-worker-pool`
- Verify the NATS_URL environment variable
Commands stuck in queue:
- Check queue status: `SELECT status, COUNT(*) FROM noetl.queue GROUP BY status`
- Verify NATS published: `nats stream view NOETL_COMMANDS`
- Check worker logs for errors
Messages not acknowledged:
- Check ack timeout (30s default)
- Verify worker emits events successfully
- Check max_deliver limit (3 retries)
Next Steps
- Monitoring: Add Prometheus metrics for NATS message flow
- Dead Letter Queue: Handle messages that exceed max_deliver
- Priority Queues: Use NATS stream subjects for priority (e.g., `noetl.commands.high`)
- Worker Groups: Different consumer groups for different tool types
- Message Tracing: Add correlation IDs for end-to-end observability