Skip to main content

Vars Block Implementation Summary

Status: ✅ FULLY IMPLEMENTED AND TESTED

Implementation Date: December 2025
Test Execution: 508176494351351907 (successful - 4 variables extracted)
REST API: /api/vars/* endpoints available for variable management

Feature Overview

The vars block feature enables declarative extraction of values from step execution results. Variables are automatically stored in the transient database table and become accessible in subsequent workflow steps through template syntax.

Architecture:

  • Server: Direct database access via TransientVars service (pool connections)
  • Workers: REST API access (/api/vars/{execution_id}) - NO direct database connections
  • Database: noetl.transient table with execution-scoped isolation

Design Decisions

1. Template Syntax: {{ result.field }}

Decision: Use {{ result.field }} for current step result access within vars block.

Rejected Alternatives:

  • {{ STEP.step_name.data.field }} - Too verbose, adds unnecessary nesting
  • {{ step_name.data.field }} - Requires extra .data extraction

Rationale:

  • Simpler and more intuitive for users
  • Consistent with other template namespaces (workload, vars)
  • "result" clearly indicates the current step's output
  • Reduces cognitive overhead and typing

2. Variable Type: step_result

Decision: Store extracted variables with var_type='step_result' in transient table.

Rationale:

  • Aligns with existing database constraint: CHECK (var_type IN ('user_defined', 'step_result', 'computed', 'iterator_state'))
  • Clear semantic meaning: variable originated from step execution result
  • Distinguishes from future user_defined variables (set via API)
  • Enables filtering and analytics by variable source

3. Direct Step Name References

Decision: Use direct step names without wrapper objects (e.g., {{ step_name.field }} not {{ STEP.step_name.field }}).

Rationale:

  • Consistency with existing namespaces: {{ workload.field }}, {{ vars.field }}
  • Less typing, cleaner YAML playbooks
  • Reduces complexity for end users
  • Standard Jinja2 pattern - direct object property access

4. Processing Location: Server-Side Orchestrator

Decision: Process vars block in orchestrator.py after step_completed event emission.

Rationale:

  • Server has full eval_ctx with all step results
  • Centralized variable storage logic
  • Worker remains stateless (no local state management)
  • Natural event-driven timing: after step completes, before next transitions

5. Result Normalization: Extract .data Field

Decision: Server normalizes step results by extracting .data field when present.

Implementation:

  • eval_ctx[step_name] contains the step's actual return value, not envelope
  • If step returns {"status": "success", "data": {...}}, only {...} is stored
  • Templates access fields directly: {{ step_name.field }} not {{ step_name.data.field }}

Rationale:

  • Simplifies template expressions for most common case
  • Handles both envelope and direct return patterns transparently
  • Reduces user confusion about data structure

Implementation Details

Code Location

File: noetl/server/api/run/orchestrator.py
Function: _process_step_vars() (lines 750-834)
Integration: Called from _process_transitions() after step_completed event (~line 1005)

Processing Flow

1. Worker executes step → Reports action_completed
2. Orchestrator processes completion → Emits step_completed event
3. Orchestrator calls _process_step_vars()
a. Extracts vars dict from step definition
b. Renders each template using eval_ctx (with 'result' pointing to current step)
c. Stores rendered variables via TransientVars.set_multiple()
d. Logs success/error for each variable
4. Orchestrator evaluates next transitions
5. Subsequent steps load vars into template context
6. Worker receives args with rendered {{ vars.* }} values

Key Code Implementation

async def _process_step_vars(
execution_id: int,
step_name: str,
step_def: dict,
eval_ctx: dict
) -> None:
"""
Process vars block to extract values from step result.

Args:
execution_id: Current execution identifier
step_name: Name of completed step
step_def: Step definition from playbook
eval_ctx: Template context with all step results
- eval_ctx['result']: Current step's result (normalized)
- eval_ctx['step_name']: Previous steps' results
"""
vars_block = step_def.get("vars")
if not vars_block:
return

logger.info(f"Processing vars block for step '{step_name}'")

# Render templates
env = Environment(loader=BaseLoader())
rendered_vars = {}

for var_name, var_template in vars_block.items():
try:
template = env.from_string(str(var_template))
rendered_value = template.render(eval_ctx)
rendered_vars[var_name] = rendered_value
logger.info(f"✓ Rendered var '{var_name}': {rendered_value}")
except Exception as e:
logger.error(f"✗ Failed to render var '{var_name}': {e}")

# Store in transient
if rendered_vars:
count = await TransientVars.set_multiple(
variables=rendered_vars,
execution_id=execution_id,
var_type="step_result",
source_step=step_name
)
logger.info(f"✓ Stored {count} variables from step '{step_name}'")

REST API Access

Variables are accessed via REST API for external systems and workers.

API Endpoints

Base Path: /api/vars

MethodEndpointDescriptionAccess Tracking
GET/api/vars/{execution_id}List all variables with metadataNo (bulk read)
GET/api/vars/{execution_id}/{var_name}Get single variableYes (increments count)
POST/api/vars/{execution_id}Set/update multiple variablesNo
DELETE/api/vars/{execution_id}/{var_name}Delete variableNo

Example Usage

Get all variables:

curl http://noetl-server:8080/api/vars/507861119290048685

Response:

{
"execution_id": 507861119290048685,
"variables": {
"user_id": {
"value": 12345,
"type": "step_result",
"source_step": "fetch_user",
"created_at": "2025-12-13T10:00:00Z",
"accessed_at": "2025-12-13T10:01:00Z",
"access_count": 5
}
},
"count": 1
}

Set variables:

curl -X POST http://noetl-server:8080/api/vars/507861119290048685 \
-H "Content-Type: application/json" \
-d '{
"variables": {"config_timeout": 60, "retry_enabled": true},
"var_type": "user_defined",
"source_step": "manual_config"
}'

Worker Access Pattern

Workers must use REST API for variable access:

import httpx
import os

SERVER_URL = os.getenv("NOETL_SERVER_URL", "http://localhost:8080")

async def load_variables(execution_id: int) -> dict:
"""Load all variables via REST API."""
async with httpx.AsyncClient() as client:
response = await client.get(f"{SERVER_URL}/api/vars/{execution_id}")
response.raise_for_status()
data = response.json()
return {name: var["value"] for name, var in data["variables"].items()}

Configuration:

# Set server URL for workers
export NOETL_SERVER_URL="http://noetl-server:8080"

DSL Syntax

Basic Extraction

- step: fetch_data
tool: postgres
query: "SELECT user_id, email FROM users WHERE active = true LIMIT 1"
vars:
user_id: "{{ result[0].user_id }}"
email: "{{ result[0].email }}"
next:
- step: send_notification

Using Extracted Variables

- step: send_notification
tool: http
method: POST
endpoint: "https://api.example.com/notify"
payload:
user_id: "{{ vars.user_id }}"
email: "{{ vars.email }}"
timestamp: "{{ workload.execution_time }}"

Complex Extraction

- step: analyze_data
tool:
kind: python
libs: {}
args: {}
code: |
# Pure Python code - no imports, no def main()
result = {
"status": "success",
"data": {
"users": [
{"id": 123, "name": "Alice"},
{"id": 456, "name": "Bob"}
],
"metadata": {
"count": 2,
"source": "production_db"
}
}
}
vars:
first_user_id: "{{ result.data.users[0].id }}"
first_user_name: "{{ result.data.users[0].name }}"
total_users: "{{ result.data.metadata.count }}"
data_source: "{{ result.data.metadata.source }}"

Template Namespace Reference

NamespaceScopeUsageExample
resultVars block onlyCurrent step's result{{ result.field }}
step_nameEntire workflowPrevious step result{{ fetch_data.users[0] }}
varsAfter definitionStored variables{{ vars.user_id }}
workloadEntire workflowGlobal workflow vars{{ workload.timeout }}
execution_idEntire workflowExecution identifier{{ execution_id }}

Database Schema

Table: transient

CREATE TABLE transient (
execution_id BIGINT NOT NULL,
var_name VARCHAR(255) NOT NULL,
var_type VARCHAR(50) NOT NULL CHECK (var_type IN (
'user_defined',
'step_result', -- ✅ Used by vars block
'computed',
'iterator_state'
)),
var_value JSONB NOT NULL,
source_step VARCHAR(255),
access_count INTEGER DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT NOW(),
accessed_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (execution_id, var_name)
);

Test Results

Test Playbook: tests/fixtures/playbooks/vars_test/test_vars_block.yaml
Execution ID: 508176494351351907
Result: ✅ SUCCESS

Variables Extracted:

first_user_id: 123
first_email: "[email protected]"
user_count: 2
data_source: "test_db"

Worker Log Confirmation:

INFO Loaded 4 variables for execution 508176494351351907

Integration with Context Service

File: noetl/server/api/context/service.py
Function: build_rendering_context() (lines 136-151)

async def build_rendering_context(
playbook_data: dict,
extra_context: dict = None
) -> dict:
"""Build template rendering context."""
base_ctx = {
"workload": playbook_data.get("workload", {}),
"vars": {}, # Populated below
# ... other context ...
}

# Load stored variables
execution_id = extra_context.get("execution_id")
if execution_id:
vars_data = await TransientVars.get_all_vars(execution_id)
base_ctx["vars"] = vars_data
logger.info(f"✓ Loaded {len(vars_data)} variables")

return base_ctx
  • Design Document: docs/variables_feature_design.md - Complete feature design and rationale
  • DSL Specification: docs/dsl_spec.md - Vars block syntax and template namespace
  • Test Playbook: tests/fixtures/playbooks/vars_test/test_vars_block.yaml - Working example
  • TransientVars API: noetl/server/api/context/transient.py - Storage layer implementation

Future Enhancements (Not Implemented)

  1. Variable Management API (Phase 2 Task 3):

    • GET /api/vars/{execution_id} - List all variables
    • GET /api/vars/{execution_id}/{var_name} - Get specific variable
    • POST /api/vars/{execution_id} - Set variable manually (user_defined)
    • DELETE /api/vars/{execution_id}/{var_name} - Delete variable
  2. Computed Variables (var_type='computed'):

    • Variables calculated from expressions
    • Example: counter: "{{ vars.counter + 1 }}"
  3. Iterator State Variables (var_type='iterator_state'):

    • Loop metadata (index, current_item, etc.)
    • Automatic population during iterator execution

Migration Notes

No breaking changes - Feature is additive:

  • Existing playbooks continue to work without modification
  • vars block is optional
  • No changes to existing template syntax
  • TransientVars table already existed and migrated from execution_variable

Conclusion

The vars block feature is production-ready and provides a declarative, user-friendly way to extract and reuse values from step results. The design prioritizes simplicity and consistency with existing template patterns, while maintaining clean separation between variable storage (server-side) and execution (worker-side).