Postgres Sink
The PostgreSQL sink connector allows you to consume messages from Iggy topics and store them in PostgreSQL databases.
Features
- Automatic Table Creation: Optionally create tables automatically
- Batch Processing: Insert messages in configurable batches for performance
- Metadata Storage: Store Iggy message metadata (offset, timestamp, topic, etc.)
- Raw Payload Storage: Store original message payload as raw bytes
- Flexible Data Handling: Works with any payload type (JSON, text, binary, protobuf, etc.)
- Connection Pooling: Efficient database connection management
Configuration
{
"connection_string": "postgresql://username:password@localhost:5432/database",
"target_table": "iggy_messages",
"batch_size": 100,
"max_connections": 10,
"auto_create_table": true,
"include_metadata": true,
"include_checksum": true,
"include_origin_timestamp": true
}
Configuration Options
connection_string: PostgreSQL connection stringtarget_table: Name of the table to insert messages intobatch_size: Number of messages to insert in each batch (default: 100)max_connections: Maximum database connections (default: 10)auto_create_table: Automatically create the target table if it doesn't exist (default: false)include_metadata: Include Iggy metadata columns (default: true)include_checksum: Include message checksum (default: true)include_origin_timestamp: Include original message timestamp (default: true)
Table Schema
When auto_create_table is enabled, the following table structure is created:
CREATE TABLE iggy_messages (
id DECIMAL(39, 0) PRIMARY KEY,
iggy_offset BIGINT,
iggy_timestamp TIMESTAMP WITH TIME ZONE,
iggy_stream TEXT,
iggy_topic TEXT,
iggy_partition_id INTEGER,
iggy_checksum BIGINT,
iggy_origin_timestamp TIMESTAMP WITH TIME ZONE,
payload BYTEA,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
Data Storage
The payload is stored as raw bytes in the payload column, regardless of the original format:
JSON Messages
JSON payloads are stored as UTF-8 encoded bytes. You can query them using PostgreSQL's JSON functions:
SELECT id, payload::text::jsonb->>'user_id' as user_id
FROM iggy_messages
WHERE payload::text::jsonb->>'user_id' IS NOT NULL;
Text Messages
Text payloads are stored as UTF-8 encoded bytes:
SELECT id, convert_from(payload, 'UTF8') as message_text
FROM iggy_messages;
Binary Messages
Binary data is stored directly as bytes:
SELECT id, encode(payload, 'base64') as payload_base64
FROM iggy_messages;
Protocol Buffer Messages
Protobuf messages are stored as raw bytes and can be processed by your application:
SELECT id, payload, length(payload) as payload_size
FROM iggy_messages;
Usage Example
- Configure the sink connector in your Iggy connectors runtime
- Messages consumed from the specified topics will be inserted into PostgreSQL
- Query the data using standard SQL:
-- Get all messages from a specific stream
SELECT * FROM iggy_messages WHERE iggy_stream = 'user_events';
-- Get messages with their payload as text (for text/JSON payloads)
SELECT id, iggy_offset, convert_from(payload, 'UTF8') as payload_text
FROM iggy_messages
WHERE iggy_stream = 'user_events';
-- Get messages from a specific time range
SELECT * FROM iggy_messages
WHERE created_at >= '2024-01-01'
AND created_at < '2024-02-01';
-- Get payload size statistics
SELECT
iggy_stream,
iggy_topic,
COUNT(*) as message_count,
AVG(length(payload)) as avg_payload_size,
MAX(length(payload)) as max_payload_size
FROM iggy_messages
GROUP BY iggy_stream, iggy_topic;
Performance Considerations
- Use appropriate
batch_sizefor your workload (larger batches = better throughput) - Consider creating indexes on frequently queried columns
- Monitor connection pool usage with
max_connections - Create indexes for efficient queries:
CREATE INDEX idx_iggy_messages_stream ON iggy_messages (iggy_stream);
CREATE INDEX idx_iggy_messages_topic ON iggy_messages (iggy_topic);
CREATE INDEX idx_iggy_messages_created_at ON iggy_messages (created_at);
CREATE INDEX idx_iggy_messages_offset ON iggy_messages (iggy_offset);
Working with Different Payload Types
JSON Payloads
For JSON data, you can use PostgreSQL's JSON operators:
-- Extract JSON fields (assuming payload is JSON)
SELECT
id,
payload::text::jsonb->>'name' as name,
payload::text::jsonb->>'email' as email
FROM iggy_messages
WHERE payload::text::jsonb->>'name' IS NOT NULL;
-- Create a GIN index for faster JSON queries
CREATE INDEX idx_iggy_messages_payload_gin ON iggy_messages USING GIN ((payload::text::jsonb));
Text Payloads
For text data, convert bytes to text:
-- Full-text search on text payloads
SELECT id, convert_from(payload, 'UTF8') as message
FROM iggy_messages
WHERE convert_from(payload, 'UTF8') LIKE '%error%';
Binary Payloads
For binary data, work with raw bytes or encode as needed:
-- Get binary payload as hex
SELECT id, encode(payload, 'hex') as payload_hex
FROM iggy_messages;
-- Get payload size
SELECT id, length(payload) as payload_size
FROM iggy_messages
ORDER BY payload_size DESC;