NoETL Rust Worker Pool

The Rust Worker Pool is a high-performance worker implementation that executes workflow commands received from the NoETL control plane via NATS messaging.

Overview

Architecture

┌─────────────────┐      ┌─────────────┐      ┌──────────────────┐
│  NoETL Server   │─────▶│    NATS     │─────▶│   Rust Worker    │
│ (Control Plane) │      │  JetStream  │      │       Pool       │
└─────────────────┘      └─────────────┘      └──────────────────┘
        │                                               │
        │◀────────── HTTP Events/Heartbeats ────────────│
        └───────────────────────────────────────────────┘
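The core loop implied by the diagram is: pull a command from NATS, execute it with the matching tool, and report the outcome back to the control plane over HTTP. A minimal sketch of that flow, with `Command`, `fetch_next`, and `post_event` as illustrative stand-ins rather than the actual worker-pool API:

```rust
// Hypothetical sketch of the worker's fetch-execute-report loop.
// The names below are illustrative, not the real noetl crate API.
struct Command {
    execution_id: String,
    tool: String,
}

// Stand-in for pulling the next command from the NATS JetStream consumer.
fn fetch_next() -> Option<Command> {
    Some(Command { execution_id: "exec-1".into(), tool: "shell".into() })
}

// Dispatch to the tool named in the command.
fn execute(cmd: &Command) -> Result<String, String> {
    match cmd.tool.as_str() {
        "shell" | "http" | "python" => Ok(format!("{} done", cmd.tool)),
        other => Err(format!("unsupported tool: {other}")),
    }
}

// Stand-in for the HTTP event callback to the NoETL server.
fn post_event(execution_id: &str, outcome: &Result<String, String>) {
    println!("event for {execution_id}: {outcome:?}");
}

fn main() {
    if let Some(cmd) = fetch_next() {
        let outcome = execute(&cmd);
        post_event(&cmd.execution_id, &outcome);
    }
}
```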

Components

| Component | Description |
| --- | --- |
| `noetl-tools` | Shared tool library with implementations for shell, HTTP, Rhai, DuckDB, PostgreSQL, Python, Snowflake, Transfer, and Script tools |
| `worker-pool` | Worker binary that subscribes to NATS and executes commands |

Supported Tools

| Tool | Description |
| --- | --- |
| `shell` | Execute shell commands |
| `http` | Make HTTP requests with authentication |
| `rhai` | Execute Rhai scripts |
| `duckdb` | Query DuckDB databases |
| `postgres` | Query PostgreSQL databases |
| `python` | Execute Python scripts |
| `snowflake` | Execute Snowflake SQL queries |
| `transfer` | Transfer data between databases |
| `script` | Execute scripts as Kubernetes jobs |
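At its core, the `shell` tool spawns a command and captures its exit code and output. The sketch below shows that essence using only the standard library; it is illustrative, and a real implementation would also handle timeouts, environment injection, and streaming output:

```rust
// Minimal sketch of shell-tool execution: run a script via `sh -c`
// and capture exit code, stdout, and stderr. Illustrative only.
use std::process::Command;

fn run_shell(script: &str) -> (i32, String, String) {
    let output = Command::new("sh")
        .arg("-c")
        .arg(script)
        .output()
        .expect("failed to spawn shell");
    (
        output.status.code().unwrap_or(-1),
        String::from_utf8_lossy(&output.stdout).into_owned(),
        String::from_utf8_lossy(&output.stderr).into_owned(),
    )
}

fn main() {
    let (code, stdout, _stderr) = run_shell("echo hello");
    println!("exit={} stdout={}", code, stdout.trim());
}
```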

Local Development

Prerequisites

  • Rust 1.75+ (install via rustup)
  • Docker (for containerized dependencies)
  • NATS Server (for message queue)

Building from Source

# Clone the repository
git clone https://github.com/noetl/noetl.git
cd noetl

# Build in debug mode
cargo build -p worker-pool

# Build in release mode (optimized)
cargo build --release -p worker-pool

# Run tests
cargo test -p noetl-tools
cargo test -p worker-pool

Running Locally

1. Start Dependencies

# Start NATS with JetStream
docker run -d --name nats \
-p 4222:4222 \
-p 8222:8222 \
nats:latest -js

# Start PostgreSQL (optional, for postgres tool)
docker run -d --name postgres \
-e POSTGRES_PASSWORD=demo \
-e POSTGRES_USER=noetl \
-e POSTGRES_DB=noetl \
-p 5432:5432 \
postgres:15

2. Configure Environment

Create a .env file:

# Worker Configuration
WORKER_ID=local-worker-1
WORKER_POOL_NAME=local-pool
NOETL_SERVER_URL=http://localhost:8082
NATS_URL=nats://localhost:4222
NATS_STREAM=NOETL_COMMANDS
NATS_CONSUMER=worker-pool
WORKER_HEARTBEAT_INTERVAL=15
WORKER_MAX_CONCURRENT=4

# Logging
RUST_LOG=info,worker_pool=debug,noetl_tools=debug
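Each of these variables has a fallback when unset (see the Configuration Reference below). A sketch of how such env-plus-default loading could look with `std::env`; the struct and field names are illustrative, not the actual worker-pool configuration code:

```rust
// Illustrative only: read the documented environment variables with
// std::env, falling back to the documented defaults when unset.
use std::env;

struct WorkerConfig {
    pool_name: String,
    server_url: String,
    nats_url: String,
    heartbeat_interval_secs: u64,
    max_concurrent: usize,
}

fn env_or(key: &str, default: &str) -> String {
    env::var(key).unwrap_or_else(|_| default.to_string())
}

impl WorkerConfig {
    fn from_env() -> Self {
        Self {
            pool_name: env_or("WORKER_POOL_NAME", "default"),
            server_url: env_or("NOETL_SERVER_URL", "http://localhost:8082"),
            nats_url: env_or("NATS_URL", "nats://localhost:4222"),
            heartbeat_interval_secs: env_or("WORKER_HEARTBEAT_INTERVAL", "15")
                .parse()
                .expect("WORKER_HEARTBEAT_INTERVAL must be an integer"),
            max_concurrent: env_or("WORKER_MAX_CONCURRENT", "4")
                .parse()
                .expect("WORKER_MAX_CONCURRENT must be an integer"),
        }
    }
}

fn main() {
    let cfg = WorkerConfig::from_env();
    println!("pool={} max_concurrent={}", cfg.pool_name, cfg.max_concurrent);
}
```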

3. Run the Worker

# Using cargo
cargo run -p worker-pool

# Or run the release binary directly
./target/release/noetl-worker

Development Workflow

# Watch for changes and rebuild
cargo watch -x 'build -p worker-pool'

# Run with specific log level
RUST_LOG=debug cargo run -p worker-pool

# Run clippy for linting
cargo clippy -p noetl-tools -p worker-pool

# Format code
cargo fmt --all

Kind Cluster Deployment

Prerequisites

  • Docker
  • kind (Kubernetes in Docker)
  • kubectl
  • Helm 3+

Setup Kind Cluster

# Create kind cluster with ingress support
cat <<EOF | kind create cluster --config=-
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
  kubeadmConfigPatches:
  - |
    kind: InitConfiguration
    nodeRegistration:
      kubeletExtraArgs:
        node-labels: "ingress-ready=true"
  extraPortMappings:
  - containerPort: 80
    hostPort: 80
    protocol: TCP
  - containerPort: 443
    hostPort: 443
    protocol: TCP
- role: worker
- role: worker
EOF

# Verify cluster
kubectl cluster-info

Build and Load Images

# Build the worker-pool image
docker build -f crates/worker-pool/Dockerfile -t noetl-worker-pool:latest .

# Load into kind cluster
kind load docker-image noetl-worker-pool:latest

# Build and load NoETL server image (if needed)
docker build -f docker/noetl/pip/Dockerfile -t noetl:latest .
kind load docker-image noetl:latest

Deploy Infrastructure

# Deploy NATS
helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install nats nats/nats \
--namespace nats \
--create-namespace \
--set nats.jetstream.enabled=true

# Deploy PostgreSQL
helm repo add bitnami https://charts.bitnami.com/bitnami
helm install postgres bitnami/postgresql \
--namespace postgres \
--create-namespace \
--set auth.postgresPassword=demo \
--set auth.database=noetl

Deploy NoETL Stack

Using Helm Directly

# Deploy with local images
helm upgrade --install noetl automation/helm/noetl \
--namespace noetl \
--create-namespace \
--set image.repository=noetl \
--set image.tag=latest \
--set image.pullPolicy=Never \
--set workerPool.enabled=true \
--set workerPool.image.repository=noetl-worker-pool \
--set workerPool.image.tag=latest \
--set workerPool.image.pullPolicy=Never

Using Playbook

# Deploy full stack
noetl run automation/deployment/noetl-stack.yaml \
--set action=deploy \
--set registry='' \
--set namespace=noetl

Verify Deployment

# Check pods
kubectl get pods -n noetl

# Check logs
kubectl logs -n noetl -l app=noetl-worker-pool --tail=50

# Port forward to access server
kubectl port-forward -n noetl svc/noetl 8082:8082

Cleanup

# Remove NoETL
helm uninstall noetl -n noetl

# Delete kind cluster
kind delete cluster

GKE (Google Kubernetes Engine) Deployment

Prerequisites

  • Google Cloud account with billing enabled
  • gcloud CLI installed and configured
  • GKE cluster created
  • Artifact Registry repository for images

Setup GCP Resources

# Set project
export PROJECT_ID=your-project-id
export REGION=us-central1
export CLUSTER_NAME=noetl-cluster

gcloud config set project $PROJECT_ID

# Create Artifact Registry repository
gcloud artifacts repositories create noetl \
--repository-format=docker \
--location=$REGION \
--description="NoETL container images"

# Configure Docker authentication
gcloud auth configure-docker ${REGION}-docker.pkg.dev

# Create GKE Autopilot cluster (recommended)
gcloud container clusters create-auto $CLUSTER_NAME \
--region=$REGION \
--project=$PROJECT_ID

# Or create Standard cluster
gcloud container clusters create $CLUSTER_NAME \
--region=$REGION \
--num-nodes=3 \
--machine-type=e2-standard-4 \
--enable-autoscaling \
--min-nodes=1 \
--max-nodes=10

Build and Push Images

# Set registry
export REGISTRY=${REGION}-docker.pkg.dev/${PROJECT_ID}/noetl

# Build worker-pool
docker build -f crates/worker-pool/Dockerfile \
-t ${REGISTRY}/noetl-worker-pool:latest \
-t ${REGISTRY}/noetl-worker-pool:$(git rev-parse --short HEAD) \
.

# Push to Artifact Registry
docker push ${REGISTRY}/noetl-worker-pool:latest
docker push ${REGISTRY}/noetl-worker-pool:$(git rev-parse --short HEAD)

# Build and push NoETL server
docker build -f docker/noetl/pip/Dockerfile \
-t ${REGISTRY}/noetl:latest \
.
docker push ${REGISTRY}/noetl:latest

Using Playbook for Build/Push

noetl run automation/deployment/worker-pool.yaml \
--set action=all \
--set registry=${REGISTRY}

Deploy to GKE

Configure kubectl

gcloud container clusters get-credentials $CLUSTER_NAME \
--region=$REGION \
--project=$PROJECT_ID

Deploy Infrastructure

# Deploy NATS with JetStream
helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install nats nats/nats \
--namespace nats \
--create-namespace \
--set nats.jetstream.enabled=true \
--set nats.jetstream.fileStore.pvc.size=10Gi

# Deploy Cloud SQL Proxy or managed PostgreSQL
# Option 1: Cloud SQL with proxy
# Option 2: Self-managed PostgreSQL
helm install postgres bitnami/postgresql \
--namespace postgres \
--create-namespace \
--set auth.postgresPassword=your-secure-password \
--set auth.database=noetl \
--set primary.persistence.size=50Gi \
--set primary.resources.requests.memory=1Gi \
--set primary.resources.requests.cpu=500m

Deploy NoETL Stack

# Create values override file
cat > gke-values.yaml <<EOF
namespace: noetl

image:
  repository: ${REGISTRY}/noetl
  tag: latest
  pullPolicy: Always

workerPool:
  enabled: true
  poolName: worker-rust-pool
  replicas: 3
  image:
    repository: ${REGISTRY}/noetl-worker-pool
    tag: latest
    pullPolicy: Always
  resources:
    requests:
      cpu: "250m"
      memory: "512Mi"
    limits:
      cpu: "2"
      memory: "2Gi"

worker:
  replicas: 2
  resources:
    requests:
      cpu: "250m"
      memory: "512Mi"
    limits:
      cpu: "1"
      memory: "1Gi"

persistence:
  data:
    enabled: true
    storageClassName: standard-rwx
    size: 50Gi
  logs:
    enabled: true
    storageClassName: standard
    size: 20Gi

ingress:
  enabled: true
  className: gce
  host: noetl.your-domain.com
  tls:
    enabled: true
  managedCertificate:
    enabled: true
    name: noetl-cert

config:
  workerPool:
    RUST_LOG: "info,worker_pool=info,noetl_tools=info"
    WORKER_POOL_NAME: "gke-rust-pool"
    WORKER_MAX_CONCURRENT: "8"
EOF

# Deploy
helm upgrade --install noetl automation/helm/noetl \
--namespace noetl \
--create-namespace \
-f gke-values.yaml \
--wait \
--timeout 10m

Using Playbook

noetl run automation/deployment/noetl-stack.yaml \
--set action=deploy \
--set registry=${REGISTRY} \
--set namespace=noetl

Production Considerations

Resource Sizing

| Component | CPU Request | Memory Request | CPU Limit | Memory Limit |
| --- | --- | --- | --- | --- |
| Server | 500m | 1Gi | 2 | 4Gi |
| Python Worker | 250m | 512Mi | 1 | 2Gi |
| Rust Worker Pool | 250m | 512Mi | 2 | 2Gi |

Autoscaling

# Add an HPA for the worker-pool Deployment
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: noetl-worker-pool-hpa
  namespace: noetl
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: noetl-worker-pool
  minReplicas: 2
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80

Monitoring

# Deploy Prometheus/Grafana for monitoring
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm install prometheus prometheus-community/kube-prometheus-stack \
--namespace monitoring \
--create-namespace

Troubleshooting

Check Pod Status

kubectl get pods -n noetl -o wide
kubectl describe pod -n noetl <pod-name>

View Logs

# Worker pool logs
kubectl logs -n noetl -l app=noetl-worker-pool --tail=100 -f

# All components
kubectl logs -n noetl -l app=noetl-server --tail=50
kubectl logs -n noetl -l app=noetl-worker --tail=50

Debug Connectivity

# Test NATS connection
kubectl run nats-test --rm -it --restart=Never \
--image=natsio/nats-box:latest \
-- nats sub -s nats://nats.nats.svc.cluster.local:4222 ">"

# Test server connectivity
kubectl run curl-test --rm -it --restart=Never \
--image=curlimages/curl:latest \
-- curl -v http://noetl.noetl.svc.cluster.local:8082/api/health

Common Issues

| Issue | Solution |
| --- | --- |
| Pod stuck in Pending | Check resource quotas and node capacity |
| CrashLoopBackOff | Check logs; verify NATS/server connectivity |
| ImagePullBackOff | Verify registry credentials and that the image exists |
| Connection refused | Check service endpoints and network policies |

Configuration Reference

Environment Variables

| Variable | Description | Default |
| --- | --- | --- |
| `WORKER_ID` | Unique worker identifier | Auto-generated UUID |
| `WORKER_POOL_NAME` | Pool name for grouping workers | `default` |
| `NOETL_SERVER_URL` | Control plane server URL | `http://localhost:8082` |
| `NATS_URL` | NATS server connection URL | `nats://localhost:4222` |
| `NATS_STREAM` | JetStream stream name | `noetl_commands` |
| `NATS_CONSUMER` | Consumer name | `worker-pool` |
| `WORKER_HEARTBEAT_INTERVAL` | Heartbeat interval (seconds) | `15` |
| `WORKER_MAX_CONCURRENT` | Max concurrent tasks | `4` |
| `RUST_LOG` | Log level configuration | `info` |
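`WORKER_MAX_CONCURRENT` caps how many tasks a worker runs at once. The actual worker is asynchronous, but the bounding idea can be illustrated with standard-library threads: never let more than N tasks be in flight. This is a hypothetical sketch, not the worker's scheduling code:

```rust
// Illustrative concurrency cap: run tasks in batches of at most
// `max_concurrent` threads and record the peak in-flight count.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

fn run_bounded(task_millis: Vec<u64>, max_concurrent: usize) -> usize {
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    for batch in task_millis.chunks(max_concurrent) {
        let handles: Vec<_> = batch
            .iter()
            .map(|&ms| {
                let in_flight = Arc::clone(&in_flight);
                let peak = Arc::clone(&peak);
                thread::spawn(move || {
                    // Track how many tasks are running simultaneously.
                    let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                    peak.fetch_max(now, Ordering::SeqCst);
                    thread::sleep(Duration::from_millis(ms)); // simulated work
                    in_flight.fetch_sub(1, Ordering::SeqCst);
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
    }
    peak.load(Ordering::SeqCst)
}

fn main() {
    // Eight tasks, at most four running at once (the documented default).
    let peak = run_bounded(vec![10; 8], 4);
    println!("peak concurrency: {peak}");
}
```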

Helm Values

See values.yaml for complete configuration options.


API Reference

Health Check

The worker pool exposes health information via the control plane API:

curl http://noetl-server:8082/api/workers

Supported Tool Configurations

See individual tool documentation: