NATS JetStream Integration
Overview
NoETL uses NATS JetStream for event-driven worker coordination. The event table is the single source of truth - NATS provides lightweight notifications with event references.
Pure Event Sourcing Architecture
Workers receive event_id references via NATS and fetch command details from the event table through the API.
Flow:
- Client: POST /api/execute
- Server: Creates execution, emits command.issued events
- Server: Publishes to NATS:
{execution_id, event_id, command_id, step, server_url} - Worker: Receives notification via pull subscription
- Worker: GET
/api/commands/{event_id}to fetch details - Worker: Executes tool
- Worker: POST /api/events (emit command.completed)
- Server: Evaluates next steps, emits new command.issued events
Key Concepts
Event Table as Single Source of Truth
- command.issued: Server emits when a step is ready for execution
- command.claimed: Worker emits to atomically claim a command
- command.completed: Worker emits with execution result
- command.failed: Worker emits on execution failure
NATS Notifications
NATS carries lightweight references, not full command data:
{
"execution_id": 7341234567890123,
"event_id": 7341234567890124,
"command_id": "7341234567890123:step_name",
"step": "step_name",
"server_url": "http://noetl.noetl.svc.cluster.local:8082"
}
Workers use event_id to fetch full command details from /api/commands/{event_id}.
Components
NATS Client (noetl/core/messaging/nats_client.py)
NATSCommandPublisher (Server-side):
- Connects to NATS JetStream
- Creates NOETL_COMMANDS stream if not exists
- Publishes lightweight command notifications
NATSCommandSubscriber (Worker-side):
- Connects to NATS JetStream
- Creates durable pull consumer per worker pool
- Subscribes to noetl.commands subject
- Acknowledges/NAKs messages based on execution success
V2 Worker (noetl/worker/v2_worker_nats.py)
Architecture:
- Subscribe to NATS noetl.commands subject
- Receive notification:
{execution_id, event_id, command_id, step, server_url} - Emit command.claimed event (atomic claim via unique constraint)
- Fetch full command from GET
/api/commands/{event_id} - Execute tool based on tool.kind
- Emit command.completed or command.failed event
Benefits:
- No database polling
- Instant notification of new commands
- Horizontal scalability (multiple workers, one consumer group)
- Automatic retry via NATS (max_deliver=3)
- 30-second ack timeout
NATS Configuration
Stream: NOETL_COMMANDS
- Subjects: noetl.commands
- Retention: 1 hour (3600s)
- Storage: File-based (persistent)
Consumer: noetl-worker-pool
- Durable: Yes
- Ack policy: Explicit
- Max deliver: 3
- Ack wait: 30 seconds
Access:
- K8s: nats://noetl:noetl@nats.nats.svc.cluster.local:4222
- Local (via NodePort): nats://noetl:noetl@localhost:30422
Environment Variables
Server:
NATS_URL=nats://noetl:[email protected]:4222
SERVER_API_URL=http://noetl.noetl.svc.cluster.local:8082
Worker:
NATS_URL=nats://noetl:[email protected]:4222
SERVER_API_URL=http://noetl.noetl.svc.cluster.local:8082
Monitoring
# NATS stream stats
kubectl exec -n nats nats-0 -- nats stream report
# Consumer lag
kubectl exec -n nats nats-0 -- nats consumer report NOETL_COMMANDS
# Worker subscription status
kubectl logs -n noetl -l app=noetl-worker | grep "Subscribed to"
Troubleshooting
Workers not receiving commands:
- Check NATS connection: kubectl logs -n nats nats-0
- Verify stream exists: nats stream info NOETL_COMMANDS
- Check consumer: nats consumer info NOETL_COMMANDS noetl-worker-pool
- Verify NATS_URL environment variable
Commands not being claimed:
- Check event table: SELECT * FROM noetl.event WHERE event_type = 'command.issued' ORDER BY created_at DESC LIMIT 10
- Verify NATS published: nats stream view NOETL_COMMANDS
- Check worker logs for errors
Messages not acknowledged:
- Check ack timeout (30s default)
- Verify worker emits events successfully
- Check max_deliver limit (3 retries)