NATS K/V Distributed Cache

NoETL uses NATS JetStream Key-Value (K/V) store for distributed loop state management, enabling horizontal scaling of server pods.

Overview

Previously, NoETL used an in-memory cache (_memory_cache) in engine.py to store execution state, including loop iteration results. This approach worked for single-server deployments but prevented horizontal scaling because each server pod maintained its own isolated state.

NATS K/V Solution:

  • Stores loop state in a distributed NATS JetStream K/V bucket
  • Multiple server pods share the same execution state
  • Workers can process loop iterations across any available server
  • Atomic updates with optimistic locking prevent race conditions

Architecture

Components

Server (engine.py):

  1. On loop initialization: Store initial state in NATS K/V
  2. After each step.exit: Append result to NATS K/V
  3. On loop completion check: Read iteration count from NATS K/V

NATS K/V Cache (nats_kv.py):

  • Connects to NATS server at nats://nats.nats.svc.cluster.local:4222
  • K/V bucket: noetl_execution_state (1-hour TTL, 1 MB max value size)
  • Key format: exec:{execution_id}:loop:{step_name}
  • Atomic append with optimistic locking (5 retries, linear backoff in 10 ms steps)
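
The key scheme above can be expressed as a small helper; `loop_state_key` is a hypothetical name used here only for illustration:

```python
def loop_state_key(execution_id: str, step_name: str) -> str:
    """Build the K/V key for a loop's state, per the format above."""
    return f"exec:{execution_id}:loop:{step_name}"

print(loop_state_key("123", "fetch_data"))  # exec:123:loop:fetch_data
```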

Data Flow

Architecture:

  • Server pod (uvicorn workers 1-N) reads and writes NATS K/V directly
  • Worker pods (1-3) access execution state through the server API

Loop Execution:

1. Loop Init:
Server → NATS K/V: {collection_size: 100, results: [], iterator: "item", mode: "sequential"}

2. Step Exit (each iteration):
Worker → Server → NATS K/V: append result to results array (atomic)

3. Loop Completion Check:
Server → NATS K/V: get completed_count = len(results)
Server: if completed_count < collection_size → create next command

4. Loop Done:
Server → NATS K/V: mark loop as completed
Server → NATS K/V: delete execution state (cleanup)
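
The four steps above can be simulated with a plain dict standing in for the K/V bucket; this is a sketch only — the real store is the NATS bucket with atomic, revision-checked updates:

```python
# Plain-dict simulation of the noetl_execution_state bucket (illustration only).
store = {}
key = "exec:123:loop:fetch_data"

# 1. Loop init: server stores the initial state
store[key] = {"collection_size": 3, "results": [], "iterator": "item", "mode": "sequential"}

# 2 & 3. Step exit + completion check, per iteration
for i in range(3):
    store[key]["results"].append({"id": i})        # append result (atomic in NATS)
    completed_count = len(store[key]["results"])   # authoritative count
    if completed_count < store[key]["collection_size"]:
        pass  # server would create the next iteration command here

# 4. Loop done: mark completed, then clean up
store[key]["completed"] = True
del store[key]
```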

Configuration

Environment Variables:

# NATS server URL (credentials are set in the manifest ConfigMaps)
NATS_URL=nats://noetl:noetl@nats.nats.svc.cluster.local:4222

# NATS credentials (default: noetl/noetl)
NATS_USER=noetl
NATS_PASSWORD=noetl
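
A sketch of how a client might resolve these settings, using the documented defaults; the exact lookup logic in NoETL may differ:

```python
import os

# Fall back to the documented defaults (noetl/noetl, in-cluster NATS service).
nats_user = os.environ.get("NATS_USER", "noetl")
nats_password = os.environ.get("NATS_PASSWORD", "noetl")
nats_url = os.environ.get(
    "NATS_URL",
    f"nats://{nats_user}:{nats_password}@nats.nats.svc.cluster.local:4222",
)
```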

Kubernetes ConfigMaps:

  • ci/manifests/noetl/configmap-server.yaml - Server NATS config
  • ci/manifests/noetl/configmap-worker.yaml - Worker NATS config

API Reference

NATSKVCache Class

File: noetl/core/cache/nats_kv.py

Methods:

async def connect(self, nats_url: Optional[str] = None) -> None:
    """Connect to NATS and create or get the K/V bucket."""

async def get_loop_state(self, execution_id: str, step_name: str) -> Optional[dict]:
    """Retrieve loop state for an execution/step."""

async def set_loop_state(self, execution_id: str, step_name: str, state: dict) -> bool:
    """Store the complete loop state."""

async def append_loop_result(self, execution_id: str, step_name: str, result: Any) -> bool:
    """Atomically append a result to the loop results array (optimistic locking)."""

async def delete_execution_state(self, execution_id: str) -> bool:
    """Delete all keys for an execution (cleanup)."""

Singleton Instance

from noetl.core.cache import get_nats_cache

nats_cache = await get_nats_cache()
await nats_cache.append_loop_result("123", "fetch_data", {"id": 1})

Integration Points

Engine.py Changes

1. Loop Initialization (lines 877-910):

# Get completed count from NATS K/V (authoritative)
nats_cache = await get_nats_cache()
nats_loop_state = await nats_cache.get_loop_state(str(state.execution_id), step.step)

if nats_loop_state:
    completed_count = len(nats_loop_state.get("results", []))
else:
    # Initialize and store in NATS K/V
    await nats_cache.set_loop_state(state.execution_id, step.step, {...})

2. Result Storage (lines 1132-1150):

# Add to local state
state.add_loop_result(event.step, result)

# Sync to NATS K/V (distributed)
nats_cache = await get_nats_cache()
await nats_cache.append_loop_result(state.execution_id, event.step, result)

3. Completion Check (lines 1197-1250):

# Read from NATS K/V for the authoritative count
nats_cache = await get_nats_cache()
nats_loop_state = await nats_cache.get_loop_state(state.execution_id, event.step)

if nats_loop_state:
    completed_count = len(nats_loop_state.get("results", []))
    collection_size = nats_loop_state.get("collection_size", 0)

Atomic Updates

Optimistic Locking

The append_loop_result() method uses NATS K/V revision numbers for atomic updates:

async def append_loop_result(self, execution_id: str, step_name: str, result: Any) -> bool:
    key = f"exec:{execution_id}:loop:{step_name}"
    max_retries = 5
    for attempt in range(max_retries):
        try:
            # Get current state with revision
            entry = await self._kv.get(key)
            state = json.loads(entry.value.decode('utf-8'))

            # Append result
            state["results"].append(result)

            # Update with revision check (optimistic lock)
            value = json.dumps(state).encode('utf-8')
            await self._kv.update(key, value, last=entry.revision)
            return True

        except Exception as e:
            if "wrong last sequence" in str(e) and attempt < max_retries - 1:
                await asyncio.sleep(0.01 * (attempt + 1))  # Linear backoff: 10 ms, 20 ms, ...
                continue
            return False

Retry Strategy:

  • 5 attempts maximum
  • Linear backoff: 10 ms, 20 ms, 30 ms, 40 ms, 50 ms
  • Handles concurrent updates from multiple server pods
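
The delay schedule produced by `0.01 * (attempt + 1)` can be checked directly; note that it grows linearly:

```python
# Retry delays, in seconds, for attempts 0..4 (10 ms steps).
delays = [round(0.01 * (attempt + 1), 2) for attempt in range(5)]
print(delays)  # [0.01, 0.02, 0.03, 0.04, 0.05]
```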

Scaling

Server Scaling (Uvicorn Workers)

The server uses uvicorn with multiple worker processes (not Kubernetes replicas):

# ci/manifests/noetl/configmap-server.yaml
NOETL_SERVER_WORKERS: "4" # Uvicorn worker processes

Server Architecture:

  • Single Kubernetes pod (1 replica)
  • Multiple uvicorn workers inside the pod
  • Shared NATS K/V state across uvicorn workers

Worker Scaling (Kubernetes Pods)

Scale worker pods horizontally for distributed execution:

# k8s/worker-deployment.yaml
spec:
  replicas: 3  # Multiple worker pods

Benefits:

  • Load balancing across worker instances
  • High availability (failover)
  • Workers subscribe to NATS for command notifications
  • Query server API for command details

Testing

Validate NATS K/V Integration

# Deploy full environment with NATS
task bring-all

# Test loop execution (http_to_postgres_iterator)
task test-http-to-postgres-iterator-full

# List K/V buckets, then the keys in the execution-state bucket
kubectl exec -it nats-0 -n nats -- nats kv ls
kubectl exec -it nats-0 -n nats -- nats kv ls noetl_execution_state

Multi-Worker Testing

# Scale workers to 3 pods
kubectl scale deployment noetl-worker -n noetl --replicas=3

# Verify worker pods are running
kubectl get pods -n noetl -l app=noetl-worker

# Run loop test and verify distributed execution
task test:regression:full

Performance

Metrics

Before (in-memory cache):

  • Single server pod only
  • No horizontal scaling
  • 360ms+ per iteration (with database queries)

After (NATS K/V):

  • Horizontal scaling with multiple server pods
  • ~10-50ms per NATS K/V operation
  • Atomic updates with retry logic

Optimization

Collection Rendering:

  • Render collection once during initialization
  • Store collection_size in NATS K/V (not full collection)
  • Reduces template rendering overhead

Result Count:

  • Use len(results) instead of database COUNT(*) queries
  • Eliminates 360ms+ database roundtrip per iteration
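
Both optimizations amount to keeping the completion check in memory; a sketch with hypothetical variable names:

```python
# Rendered once at loop init (illustrative 100-item collection).
collection = [{"id": i} for i in range(100)]

# Only the size and results go to NATS K/V, not the rendered collection,
# keeping values small (well under the 1 MB limit).
loop_state = {
    "collection_size": len(collection),
    "results": [],
    "iterator": "item",
    "mode": "sequential",
}

# Completion check uses len() on the cached results instead of a database
# COUNT(*) query.
completed_count = len(loop_state["results"])
is_done = completed_count >= loop_state["collection_size"]
```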

Cleanup

Loop state is automatically cleaned up:

TTL-based:

  • NATS K/V bucket has 1-hour TTL
  • Expired keys automatically deleted by NATS

Manual cleanup:

nats_cache = await get_nats_cache()
await nats_cache.delete_execution_state(execution_id)

Cleanup happens on:

  • Execution COMPLETED event
  • Execution FAILED event
  • Manual cleanup (optional)
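
Terminal-event cleanup can be sketched as a small hook; `cleanup_on_terminal_event` and the stub cache are hypothetical, standing in for the real engine wiring:

```python
import asyncio

TERMINAL_EVENTS = {"COMPLETED", "FAILED"}  # events that trigger cleanup

async def cleanup_on_terminal_event(cache, event_type: str, execution_id: str) -> bool:
    """Delete distributed state once an execution reaches a terminal event."""
    if event_type in TERMINAL_EVENTS:
        return await cache.delete_execution_state(execution_id)
    return False

class StubCache:
    """Stands in for NATSKVCache in this sketch."""
    def __init__(self):
        self.deleted = []

    async def delete_execution_state(self, execution_id: str) -> bool:
        self.deleted.append(execution_id)
        return True

cache = StubCache()
asyncio.run(cleanup_on_terminal_event(cache, "COMPLETED", "123"))
print(cache.deleted)  # ['123']
```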

Migration Notes

Removing In-Memory Cache

Future task: Remove _memory_cache from engine.py (line 325):

# BEFORE
_memory_cache: dict[str, ExecutionState] = {}

# AFTER (to be implemented)
# Cache removed - use NATS K/V only

Impact:

  • Server becomes fully stateless
  • All state reconstructed from events + NATS K/V
  • Enables true horizontal scaling

Backward Compatibility

The current implementation maintains local cache as fallback:

if nats_loop_state:
    # Use NATS K/V (authoritative)
    completed_count = len(nats_loop_state.get("results", []))
else:
    # Fall back to the local cache
    loop_state = state.loop_state.get(step.step)
    completed_count = len(loop_state.get("results", []))

This ensures gradual migration without breaking existing executions.

See Also