step

postgres

Execute PostgreSQL database operations for querying and manipulating data.

Overview

Execute PostgreSQL database operations for querying and manipulating data.

This step enables interaction with PostgreSQL databases for reading and writing data. You can execute SELECT queries, insert records, update existing data, delete records, or run arbitrary SQL statements. The connector supports parameterized queries to prevent SQL injection, connection string or individual parameter authentication, and returns results as dictionaries for easy data manipulation. Perfect for ETL pipelines, data enrichment, persistence, and analytics workflows.

Setup: 1. Set up a PostgreSQL database (local installation, cloud provider, or managed service) 2. Create a database and user with appropriate permissions 3. Note your connection details: host, port, database name, username, password 4. Store credentials securely (e.g., environment variables or connection string)

Connection: Requires host, port, database, username, and password

Examples

Query users

Fetch users from database with filtering

type: postgres
connection_string: ${env:DATABASE_URL}
operation: query
query: "SELECT id, email, name FROM users WHERE created_at > %(date)s"
params:
  date: "2025-01-01"
output_to: users

Insert customer record

Insert a new customer into the database

type: postgres
connection_string: ${env:DATABASE_URL}
operation: insert
table: customers
data:
  email: ${user.email}
  name: ${user.name}
  created_at: ${meta.timestamp}
output_to: inserted_customer

Update order status

Mark an order as shipped

type: postgres
connection_string: ${env:DATABASE_URL}
operation: update
table: orders
set:
  status: shipped
  shipped_at: NOW()
where: "id = %(order_id)s"
params:
  order_id: ${order.id}
output_to: update_result

Delete old records

Clean up temporary data older than 7 days

type: postgres
connection_string: ${env:DATABASE_URL}
operation: delete
table: temp_data
where: "created_at < NOW() - INTERVAL '7 days'"
output_to: delete_result

Complex join query

Query orders with customer details

type: postgres
connection_string: ${env:DATABASE_URL}
operation: query
query: |
  SELECT o.id, o.total, c.email, c.name
  FROM orders o
  JOIN customers c ON o.customer_id = c.id
  WHERE o.status = %(status)s
  ORDER BY o.created_at DESC
  LIMIT 10
params:
  status: pending
output_to: pending_orders

Configuration

Parameter Type Required Description
connection_string string No PostgreSQL connection string (postgresql://user:pass@host:port/db). Use environment variables for credentials.
host string No Database host (alternative to connection_string)
port integer No Database port
Default: 5432
database string No Database name (alternative to connection_string)
user string No Database user (alternative to connection_string)
password string No Database password (alternative to connection_string)
operation string Yes Operation to perform: 'query', 'execute', 'insert', 'update', 'delete'
query string No SQL query to execute. Use %(param)s for parameterized queries.
table string No Table name for insert/update/delete operations
data string No Data to insert (single dict or list of dicts for bulk insert)
set string No Fields to update (for update operation)
where string No WHERE clause for update/delete operations
params string No Parameters for parameterized queries (prevents SQL injection)
output_to string No Event key where the result will be stored
Default: "postgres"
timeout integer No Query timeout in seconds
Default: 30
return_dict boolean No If True, return query results as list of dicts. If False, return as list of tuples.
Default: true

Base Configuration

These configuration options are available on all steps:

Parameter Type Default Description
name null Optional name for this step (for documentation and debugging)
description null Optional description of what this step does
retries integer 0 Number of retry attempts (0-10)
backoff_seconds number 0 Backoff (seconds) applied between retry attempts
retry_propagate boolean false If True, raise last exception after exhausting retries; otherwise swallow.