HTTP to PostgreSQL Data Transfer
This guide demonstrates four patterns for transferring data from HTTP APIs to PostgreSQL databases, followed by variants for paginated APIs, multiple targets, and error handling.
:::tip Working Examples
Complete, tested data transfer playbooks are available in the repository.
:::
Pattern Overview
| Pattern | Complexity | Best For |
|---|---|---|
| Transfer Tool | Low | Simple field mapping, production ETL |
| Python Batch | Medium | Custom transformations |
| Loop | High | Large datasets, per-record logic |
| Direct SQL | Low | Quick prototypes |
Transfer Tool Pattern
The simplest approach uses NoETL's built-in transfer tool:
apiVersion: noetl.io/v1
kind: Playbook
metadata:
  name: http_to_postgres_transfer
  path: data_transfer/http_to_postgres_transfer
workload:
  api_url: "https://jsonplaceholder.typicode.com/posts"
  target_table: "public.posts"
workflow:
  - step: start
    next:
      - step: transfer_data
  - step: transfer_data
    tool: transfer
    source:
      type: http
      url: "{{ workload.api_url }}"
      method: GET
    target:
      type: postgres
      auth:
        type: postgres
        credential: pg_demo
      table: "{{ workload.target_table }}"
      mode: insert
    mapping:
      post_id: id
      user_id: userId
      title: title
      body: body
    next:
      - step: end
  - step: end
Advantages:
- No code required
- Declarative field mapping
- Built-in error handling
- Automatic batching
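To make the declarative mapping concrete, here is a minimal Python sketch of what a field mapping does to each API record. It assumes the mapping reads as target column: source field, which is what the example above suggests (`post_id: id`); `apply_mapping` is a hypothetical helper for illustration, not part of NoETL.

```python
def apply_mapping(record, mapping):
    """Build a row keyed by target column, reading each value from its source field.

    Assumption: mapping is {target_column: source_field}; missing fields become None.
    """
    return {column: record.get(field) for column, field in mapping.items()}


# Same mapping as the transfer_data step above.
mapping = {"post_id": "id", "user_id": "userId", "title": "title", "body": "body"}

# A record shaped like a jsonplaceholder post, plus one unmapped field.
api_record = {"id": 7, "userId": 3, "title": "hello", "body": "world", "extra": "ignored"}

row = apply_mapping(api_record, mapping)
print(row)
```

Fields that are not named in the mapping (here `extra`) are simply dropped, which is why this pattern suits simple field-to-column transfers.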
Python Batch Pattern
For custom transformations and validation:
apiVersion: noetl.io/v1
kind: Playbook
metadata:
  name: http_to_postgres_python
  path: data_transfer/http_to_postgres_python
workload:
  api_url: "https://jsonplaceholder.typicode.com/posts"
workflow:
  - step: start
    next:
      - step: fetch_data
  - step: fetch_data
    tool: http
    method: GET
    endpoint: "{{ workload.api_url }}"
    vars:
      posts: "{{ result.data }}"
    next:
      - step: transform_data
  - step: transform_data
    tool:
      kind: python
      code: |
        def main(posts):
            """Transform and validate posts data."""
            transformed = []
            for post in posts:
                # Validate required fields
                if not post.get('id') or not post.get('title'):
                    continue
                transformed.append({
                    'post_id': post['id'],
                    'user_id': post.get('userId', 0),
                    'title': post['title'][:255],  # Truncate if needed
                    'body': post.get('body', '')[:1000],
                    'word_count': len(post.get('body', '').split())
                })
            return {'records': transformed, 'count': len(transformed)}
      args:
        posts: "{{ vars.posts }}"
    vars:
      transformed_posts: "{{ result.data.records }}"
    next:
      - step: insert_data
  - step: insert_data
    tool:
      kind: postgres
      auth:
        type: postgres
        credential: pg_demo
      query: |
        INSERT INTO public.posts (post_id, user_id, title, body, word_count)
        SELECT
          (p->>'post_id')::int,
          (p->>'user_id')::int,
          p->>'title',
          p->>'body',
          (p->>'word_count')::int
        FROM jsonb_array_elements('{{ vars.transformed_posts | tojson }}'::jsonb) p
        ON CONFLICT (post_id) DO UPDATE SET
          title = EXCLUDED.title,
          body = EXCLUDED.body,
          word_count = EXCLUDED.word_count
        RETURNING post_id;
    vars:
      inserted_count: "{{ result.data.command_1 | length }}"
    next:
      - step: end
  - step: end
Advantages:
- Full control over transformations
- Custom validation logic
- Dynamic field generation
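Because the transformation is plain Python, it can be exercised locally before it is embedded in a playbook. The sketch below copies the `main` function from the `transform_data` step and runs it on a few hand-written records; the sample data is made up for illustration.

```python
def main(posts):
    """Transform and validate posts data (copied from the transform_data step)."""
    transformed = []
    for post in posts:
        # Skip records that lack an id or a title.
        if not post.get('id') or not post.get('title'):
            continue
        transformed.append({
            'post_id': post['id'],
            'user_id': post.get('userId', 0),
            'title': post['title'][:255],
            'body': post.get('body', '')[:1000],
            'word_count': len(post.get('body', '').split()),
        })
    return {'records': transformed, 'count': len(transformed)}


# Hypothetical sample input covering the valid and the rejected cases.
sample = [
    {"id": 1, "userId": 10, "title": "First", "body": "one two three"},
    {"id": 2, "title": "No user or body"},
    {"userId": 5, "title": "Missing id", "body": "skipped"},
    {"id": 4, "userId": 5, "title": "", "body": "empty title is skipped"},
]

result = main(sample)
print(result["count"], [r["post_id"] for r in result["records"]])
```

Note that the validation treats any falsy `id` or `title` as missing, so a record with `id` equal to 0 or an empty title is dropped as well.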
Loop Pattern
For large datasets or per-record processing:
apiVersion: noetl.io/v2
kind: Playbook
metadata:
  name: http_to_postgres_loop
  path: data_transfer/http_to_postgres_loop
workload:
  api_url: "https://jsonplaceholder.typicode.com/posts"
workflow:
  - step: start
    next:
      - step: fetch_data
  - step: fetch_data
    tool:
      kind: http
      method: GET
      url: "{{ workload.api_url }}"
    vars:
      all_posts: "{{ result.data }}"
    next:
      - step: insert_posts
  - step: insert_posts
    tool:
      kind: postgres
      auth:
        type: postgres
        credential: pg_demo
      query: |
        INSERT INTO public.posts (post_id, user_id, title, body)
        VALUES (
          {{ current_post.id }},
          {{ current_post.userId }},
          '{{ current_post.title | replace("'", "''") }}',
          '{{ current_post.body | replace("'", "''") }}'
        )
        ON CONFLICT (post_id) DO UPDATE SET
          title = EXCLUDED.title,
          body = EXCLUDED.body;
    loop:
      in: "{{ vars.all_posts }}"
      iterator: current_post
      mode: sequential
    next:
      - step: end
  - step: end
Advantages:
- Lower memory footprint
- Per-record error handling
- Progress tracking
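The per-record behaviour can be sketched outside NoETL as a sequential upsert loop that records failures instead of aborting the run. This is only an illustration under stated assumptions: it uses Python's built-in `sqlite3` as a stand-in for PostgreSQL, the table schema is hypothetical, and it passes values as bound parameters rather than doubling quotes in a template.

```python
import sqlite3

# Upsert mirroring the insert_posts query, written with bound parameters.
UPSERT = """
INSERT INTO posts (post_id, user_id, title, body)
VALUES (?, ?, ?, ?)
ON CONFLICT (post_id) DO UPDATE SET
    title = excluded.title,
    body = excluded.body
"""


def upsert_posts(conn, posts):
    """Upsert posts one at a time; return (success_count, failures)."""
    succeeded, failures = 0, []
    for post in posts:
        try:
            conn.execute(UPSERT, (post["id"], post["userId"], post["title"], post["body"]))
            succeeded += 1
        except (KeyError, sqlite3.Error) as exc:
            # A bad record is logged and skipped; the loop continues.
            failures.append((post.get("id"), repr(exc)))
    conn.commit()
    return succeeded, failures


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE posts (post_id INTEGER PRIMARY KEY, user_id INTEGER NOT NULL, "
    "title TEXT NOT NULL, body TEXT)"
)

sample = [
    {"id": 1, "userId": 1, "title": "It's here", "body": "a"},
    {"id": 2, "userId": 1, "body": "this record has no title"},
    {"id": 1, "userId": 1, "title": "updated", "body": "b"},
]

succeeded, failures = upsert_posts(conn, sample)
rows = conn.execute("SELECT post_id, title, body FROM posts ORDER BY post_id").fetchall()
print(succeeded, failures, rows)
```

The second record fails on its own without affecting the others, and the third overwrites the first through the conflict clause.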
Direct SQL Pattern
Quick prototyping with direct JSON insertion:
apiVersion: noetl.io/v1
kind: Playbook
metadata:
  name: http_to_postgres_direct
  path: data_transfer/http_to_postgres_direct
workload:
  api_url: "https://jsonplaceholder.typicode.com/posts"
workflow:
  - step: start
    next:
      - step: fetch_and_insert
  - step: fetch_and_insert
    tool:
      kind: http
      method: GET
      endpoint: "{{ workload.api_url }}"
    vars:
      raw_data: "{{ result.data }}"
    next:
      - step: bulk_insert
  - step: bulk_insert
    tool:
      kind: postgres
      auth:
        type: postgres
        credential: pg_demo
      query: |
        INSERT INTO public.posts_raw (data, fetched_at)
        VALUES (
          '{{ vars.raw_data | tojson }}'::jsonb,
          NOW()
        );
    next:
      - step: end
  - step: end
Advantages:
- Minimal configuration
- Fast prototyping
- Preserves raw data
Paginated API Transfer
For APIs with pagination:
apiVersion: noetl.io/v1
kind: Playbook
metadata:
  name: paginated_api_transfer
  path: data_transfer/paginated_api
workload:
  api_base: "https://api.example.com"
workflow:
  - step: start
    next:
      - step: fetch_all_pages
  - step: fetch_all_pages
    tool:
      kind: http
      method: GET
      url: "{{ workload.api_base }}/items"
      params:
        page: 1
        per_page: 100
    loop:
      pagination:
        type: response_based
        continue_while: "{{ response.data.meta.has_more }}"
        next_page:
          params:
            page: "{{ (response.data.meta.page | int) + 1 }}"
        merge_strategy: append
        merge_path: data.items
        max_iterations: 50
    vars:
      all_items: "{{ result.data }}"
    next:
      - step: batch_insert
  - step: batch_insert
    tool:
      kind: python
      libs: {}
      args:
        items: "{{ vars.all_items }}"
        batch_size: 1000
      code: |
        batches = []
        for i in range(0, len(items), batch_size):
            batches.append(items[i:i+batch_size])
        result = {
            'batches': batches,
            'total_items': len(items),
            'batch_count': len(batches)
        }
    vars:
      batches: "{{ result.batches }}"
    next:
      - step: insert_batches
  - step: insert_batches
    tool:
      kind: postgres
      auth:
        type: postgres
        credential: pg_demo
      query: |
        INSERT INTO public.items (id, name, value, updated_at)
        SELECT
          (item->>'id')::int,
          item->>'name',
          (item->>'value')::numeric,
          NOW()
        FROM jsonb_array_elements('{{ current_batch | tojson }}'::jsonb) item
        ON CONFLICT (id) DO UPDATE SET
          name = EXCLUDED.name,
          value = EXCLUDED.value,
          updated_at = EXCLUDED.updated_at;
    loop:
      in: "{{ vars.batches }}"
      iterator: current_batch
      mode: sequential
    next:
      - step: end
  - step: end
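The pagination and batching logic in this playbook can be sketched in plain Python. The sketch assumes each response body has the shape `{"items": [...], "meta": {"page": n, "has_more": bool}}`, which is what `merge_path: data.items` and the `continue_while` expression suggest; `fetch_all_pages`, `split_batches`, and `make_fake_api` are hypothetical names, and the fake API stands in for a real HTTP call.

```python
def fetch_all_pages(fetch_page, per_page=100, max_iterations=50):
    """Append items page by page until has_more is false or the iteration cap is hit."""
    items, page = [], 1
    for _ in range(max_iterations):
        body = fetch_page(page, per_page)
        items.extend(body["items"])            # merge_strategy: append
        if not body["meta"]["has_more"]:       # continue_while
            break
        page = int(body["meta"]["page"]) + 1   # next_page
    return items


def split_batches(items, batch_size):
    """Split items into consecutive batches of at most batch_size."""
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


def make_fake_api(total):
    """Return a fetch_page function that serves `total` items (test double)."""
    def fetch_page(page, per_page):
        start = (page - 1) * per_page
        chunk = [{"id": i} for i in range(start, min(start + per_page, total))]
        return {"items": chunk, "meta": {"page": page, "has_more": start + per_page < total}}
    return fetch_page


items = fetch_all_pages(make_fake_api(250), per_page=100)
batches = split_batches(items, 100)
print(len(items), [len(b) for b in batches])
```

The `max_iterations` cap bounds the loop even if the API never reports `has_more` as false, at the cost of silently truncating the result, so it is worth comparing the item count against an expected total when one is available.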
Multi-Database Transfer
Transfer to multiple databases in parallel:
apiVersion: noetl.io/v2
kind: Playbook
metadata:
  name: http_to_multi_db
  path: data_transfer/multi_db
workload:
  api_url: "https://api.example.com/data"
workflow:
  - step: start
    next:
      - step: fetch_data
  - step: fetch_data
    tool:
      kind: http
      method: GET
      url: "{{ workload.api_url }}"
    vars:
      data: "{{ result.data }}"
    next:
      - step: postgres_insert
      - step: analytics_insert  # Parallel execution
  - step: postgres_insert
    tool:
      kind: postgres
      auth:
        type: postgres
        credential: pg_primary
      query: |
        INSERT INTO app_data.records (data, source)
        VALUES ('{{ vars.data | tojson }}'::jsonb, 'api')
    next:
      - step: end
  - step: analytics_insert
    tool: duckdb
    auth:
      type: gcs
      credential: gcp_service_account
    query: |
      COPY (
        SELECT
          value->>'id' as id,
          value->>'name' as name,
          current_timestamp as ingested_at
        FROM (
          SELECT unnest(from_json('{{ vars.data | tojson }}', '["json"]')) as value
        )
      ) TO 'gs://analytics-bucket/ingested/{{ execution_id }}.parquet' (FORMAT PARQUET);
    next:
      - step: end
  - step: end
Error Handling
Retry transient API failures, then route any remaining error to a handler step:
- step: fetch_data
  tool: http
  method: GET
  endpoint: "{{ workload.api_url }}"
  retry:
    max_attempts: 3
    initial_delay: 1.0
    retryable_status_codes: [429, 500, 502, 503]
  next:
    - when: "{{ fetch_data.status == 'error' }}"
      then:
        - step: handle_fetch_error
    - step: process_data
- step: handle_fetch_error
  tool: python
  code: |
    def main(error):
        return {
            'status': 'failed',
            'error': error,
            'action': 'manual_review_required'
        }
  args:
    error: "{{ fetch_data.error }}"
  next:
    - step: end
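The retry settings above can be read as the following loop, shown here as a rough Python sketch. The doubling of the delay between attempts is an assumption (the configuration only names `initial_delay`), and `fetch_with_retry`, `send`, and the `backoff` parameter are hypothetical names; NoETL's actual retry schedule may differ.

```python
import time

RETRYABLE_STATUS_CODES = {429, 500, 502, 503}


def fetch_with_retry(send, max_attempts=3, initial_delay=1.0, backoff=2.0, sleep=time.sleep):
    """Call send() until it returns a non-retryable status or attempts run out.

    send() must return a (status_code, body) tuple.
    Assumption: the delay is multiplied by `backoff` after each failed attempt.
    """
    delay = initial_delay
    for attempt in range(1, max_attempts + 1):
        code, body = send()
        if code not in RETRYABLE_STATUS_CODES:
            status = "ok" if code < 400 else "error"
            return {"status": status, "code": code, "data": body, "attempts": attempt}
        if attempt < max_attempts:
            sleep(delay)
            delay *= backoff
    # Every attempt returned a retryable status.
    return {"status": "error", "code": code, "data": body, "attempts": max_attempts}


# Simulated endpoint: fails twice with retryable codes, then succeeds.
responses = iter([(503, None), (429, None), (200, {"ok": True})])
sleeps = []
result = fetch_with_retry(lambda: next(responses), sleep=sleeps.append)
print(result, sleeps)
```

Injecting `sleep` keeps the sketch testable without real waiting; a result whose `status` is `error` corresponds to the branch that routes to `handle_fetch_error`.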
Performance Tips
- Batch inserts: Use jsonb_array_elements for bulk operations
- Use COPY for large datasets: DuckDB + GCS for high throughput
- Parallel execution: Split data and process concurrently
- Connection pooling: Enable for high-frequency inserts
- Appropriate page sizes: Balance API limits and memory