postgres
Execute PostgreSQL database operations for querying and manipulating data.
Overview
Execute PostgreSQL database operations for querying and manipulating data.
This step enables interaction with PostgreSQL databases for reading and writing data. You can execute SELECT queries, insert records, update existing data, delete records, or run arbitrary SQL statements. The connector supports parameterized queries to prevent SQL injection, connection string or individual parameter authentication, and returns results as dictionaries for easy data manipulation. Perfect for ETL pipelines, data enrichment, persistence, and analytics workflows.
Setup: 1. Set up a PostgreSQL database (local installation, cloud provider, or managed service) 2. Create a database and user with appropriate permissions 3. Note your connection details: host, port, database name, username, password 4. Store credentials securely (e.g., environment variables or connection string)
Connection: Requires host, port, database, username, and password
Examples
Query users
Fetch users from database with filtering
type: postgres
connection_string: ${env:DATABASE_URL}
operation: query
query: "SELECT id, email, name FROM users WHERE created_at > %(date)s"
params:
date: "2025-01-01"
output_to: users
Insert customer record
Insert a new customer into the database
type: postgres
connection_string: ${env:DATABASE_URL}
operation: insert
table: customers
data:
email: ${user.email}
name: ${user.name}
created_at: ${meta.timestamp}
output_to: inserted_customer
Update order status
Mark an order as shipped
type: postgres
connection_string: ${env:DATABASE_URL}
operation: update
table: orders
set:
status: shipped
shipped_at: NOW()
where: "id = %(order_id)s"
params:
order_id: ${order.id}
output_to: update_result
Delete old records
Clean up temporary data older than 7 days
type: postgres
connection_string: ${env:DATABASE_URL}
operation: delete
table: temp_data
where: "created_at < NOW() - INTERVAL '7 days'"
output_to: delete_result
Complex join query
Query orders with customer details
type: postgres
connection_string: ${env:DATABASE_URL}
operation: query
query: |
SELECT o.id, o.total, c.email, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.status = %(status)s
ORDER BY o.created_at DESC
LIMIT 10
params:
status: pending
output_to: pending_orders
Configuration
| Parameter | Type | Required | Description |
|---|---|---|---|
connection_string | string | No | PostgreSQL connection string (postgresql://user:pass@host:port/db). Use environment variables for credentials. |
host | string | No | Database host (alternative to connection_string) |
port | integer | No | Database port
Default: 5432 |
database | string | No | Database name (alternative to connection_string) |
user | string | No | Database user (alternative to connection_string) |
password | string | No | Database password (alternative to connection_string) |
operation | string | Yes | Operation to perform: 'query', 'execute', 'insert', 'update', 'delete' |
query | string | No | SQL query to execute. Use %(param)s for parameterized queries. |
table | string | No | Table name for insert/update/delete operations |
data | string | No | Data to insert (single dict or list of dicts for bulk insert) |
set | string | No | Fields to update (for update operation) |
where | string | No | WHERE clause for update/delete operations |
params | string | No | Parameters for parameterized queries (prevents SQL injection) |
output_to | string | No | Event key where the result will be stored
Default: "postgres" |
timeout | integer | No | Query timeout in seconds
Default: 30 |
return_dict | boolean | No | If True, return query results as list of dicts. If False, return as list of tuples.
Default: true |
Base Configuration
These configuration options are available on all steps:
| Parameter | Type | Default | Description |
|---|---|---|---|
name | | null | Optional name for this step (for documentation and debugging) |
description | | null | Optional description of what this step does |
retries | integer | 0 | Number of retry attempts (0-10) |
backoff_seconds | number | 0 | Backoff (seconds) applied between retry attempts |
retry_propagate | boolean | false | If True, raise last exception after exhausting retries; otherwise swallow. |