Operational Intent
This document defines the exact implementation pattern for consuming PostgreSQL’s native pgoutput logical decoding stream using psycopg2. The operational objective is singular: deploy a deterministic, low-latency WAL stream consumer that decodes binary pgoutput messages into structured change events. The consumer must maintain exact LSN progression, prevent replication slot bloat, and ensure that no committed change is lost across network partitions or consumer restarts, provided the replication slot itself survives. This workflow targets database engineers, data platform teams, Python ETL developers, and DevOps operators responsible for CDC pipeline automation.
PostgreSQL Parameter Baseline
Before initializing the consumer, the source cluster must enforce strict replication parameters. Misconfiguration causes slot creation to fail, connections to be rejected, or WAL to accumulate without bound. Values below assume PostgreSQL 14+.
| Parameter | Required Value | Operational Rationale |
|---|---|---|
| wal_level | logical | Enables logical decoding output. With replica or minimal, logical slot creation fails with ERROR: logical decoding requires wal_level >= logical. Requires full restart. |
| max_replication_slots | ≥ 10 | Upper bound on replication slots (slot metadata lives in shared memory). When exhausted, slot creation fails with ERROR: all replication slots are in use. Requires restart. |
| max_wal_senders | ≥ 10 | Caps concurrent WAL sender processes. Must exceed the number of active logical + physical replication consumers. Requires restart. |
| wal_sender_timeout | 60s | Terminates replication connections that stop sending status updates. The slot itself survives and keeps retaining WAL until a consumer reattaches or the slot is dropped. Reload-safe. |
| wal_keep_size | 1GB (minimum) | Retains extra WAL for physical standbys that do not use slots. A logical slot already retains WAL from its restart_lsn, so this setting does not protect the consumer; to cap slot-driven WAL growth, set max_slot_wal_keep_size. Adjust based on peak transaction throughput and checkpoint intervals. Reload-safe. |
Apply changes via ALTER SYSTEM or postgresql.conf, then execute SELECT pg_reload_conf();. Parameters requiring restart must be followed by a controlled cluster bounce. Verify with SHOW wal_level; and SELECT slot_name, active, restart_lsn FROM pg_replication_slots;.
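The verification step can also be automated before the consumer starts. The sketch below is a hypothetical pre-flight check (the function and constant names are illustrative, not part of psycopg2) that compares a settings snapshot against the count and wal_level rows of the table; size and timeout parameters are left out because pg_settings reports them in server units.

```python
from typing import Dict, List

# Hypothetical baseline mirroring the table above (integer-valued parameters only).
BASELINE_MIN = {"max_replication_slots": 10, "max_wal_senders": 10}


def check_replication_settings(settings: Dict[str, str]) -> List[str]:
    """Return human-readable violations; an empty list means the baseline holds.

    `settings` maps parameter name -> value, e.g. the rows of
    SELECT name, setting FROM pg_settings WHERE name IN (...).
    """
    problems: List[str] = []
    if settings.get("wal_level") != "logical":
        problems.append(f"wal_level is {settings.get('wal_level')!r}, expected 'logical' (restart required)")
    for name, minimum in BASELINE_MIN.items():
        raw = settings.get(name)
        if raw is None:
            problems.append(f"{name} is missing from the settings snapshot")
        elif int(raw) < minimum:
            problems.append(f"{name} is {raw}, expected >= {minimum} (restart required)")
    return problems
```

In practice the dictionary could be built with dict(cur.fetchall()) after running the pg_settings query, and a non-empty result should abort startup.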
Connection & Slot Lifecycle Management
psycopg2 exposes logical replication through psycopg2.extras.LogicalReplicationConnection. The connection must explicitly request the pgoutput plugin, declare the target slot, and keep messages as raw bytes (decode=False), because pgoutput emits binary messages rather than text.
```python
import psycopg2
from psycopg2.extras import LogicalReplicationConnection, REPLICATION_LOGICAL

# LogicalReplicationConnection appends replication=database on its own; keeping it here is harmless.
DSN = "host=primary-db port=5432 dbname=cdc_source user=replicator password=secure_pass replication=database"


def ensure_slot(cur, slot_name: str) -> None:
    """Idempotently create the pgoutput slot; refuse to attach to a slot that is in use."""
    cur.execute(
        "SELECT active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = %s",
        (slot_name,),
    )
    row = cur.fetchone()
    if row is None:
        # Publications are not a slot property: pgoutput receives them per session
        # through start_replication(options=...).
        cur.create_replication_slot(slot_name, slot_type=REPLICATION_LOGICAL, output_plugin="pgoutput")
        return
    active, _restart_lsn, _confirmed_flush_lsn = row
    if active:
        raise RuntimeError(f"Slot {slot_name} is already active. Terminate the existing consumer before proceeding.")
    # An inactive slot is reused as-is: streaming resumes from its confirmed_flush_lsn,
    # and the server has retained WAL from restart_lsn onward, so no gap is introduced.


def init_consumer(slot_name: str, publication: str):
    # Replication connections always run in autocommit mode; no isolation level is set.
    conn = psycopg2.connect(DSN, connection_factory=LogicalReplicationConnection)
    cur = conn.cursor()
    ensure_slot(cur, slot_name)
    cur.start_replication(
        slot_name=slot_name,
        options={"proto_version": "1", "publication_names": publication},
        decode=False,  # pgoutput is binary; keep payloads as bytes
        status_interval=5,  # seconds between standby status updates
    )
    return conn, cur
```
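LogicalReplicationConnection adds replication=database to the DSN itself, so the explicit parameter is redundant but harmless. The connecting role needs the REPLICATION attribute (or superuser), and pg_hba.conf must admit the connection for the target database; logical replication connections match on the database name, not on the replication keyword. Always check slot state before invoking start_replication: attaching to a slot that another process holds fails with ERROR: replication slot "..." is active for PID ....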
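pg_replication_slots reports restart_lsn and confirmed_flush_lsn in PostgreSQL's textual X/Y notation, while pgoutput BEGIN and COMMIT messages carry LSNs as 64-bit integers. A pair of small helpers (hypothetical names) makes the two comparable, for example when logging where a reused slot will resume.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert textual pg_lsn ('16/B374D848') to a 64-bit integer."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def int_to_lsn(value: int) -> str:
    """Format a 64-bit LSN integer in PostgreSQL's 'X/Y' hexadecimal notation."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"
```

The high 32 bits form the part before the slash, so integer comparison preserves WAL order.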
```mermaid
stateDiagram-v2
    [*] --> BEGIN
    BEGIN --> RELATION: schema is new or changed
    BEGIN --> DML: relation already cached
    RELATION --> DML
    DML --> DML: INSERT / UPDATE / DELETE
    DML --> RELATION: next table not yet cached
    DML --> COMMIT
    COMMIT --> [*]: flush LSN advances
```
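The transitions in the state diagram translate naturally into a small assembler that buffers changes between BEGIN and COMMIT and releases a transaction only once it is complete. This is a sketch under the assumption that events are dicts with a "type" key ("BEGIN", "RELATION", "I", "U", "D", "COMMIT") and that COMMIT events carry end_lsn; the class name is hypothetical.

```python
from typing import Any, Dict, Optional


class TransactionAssembler:
    """Group decoded events into whole transactions: BEGIN -> (RELATION | DML)* -> COMMIT."""

    DML_TYPES = {"I", "U", "D"}

    def __init__(self) -> None:
        self._open: Optional[Dict[str, Any]] = None

    def feed(self, event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Consume one event; return a finished transaction on COMMIT, else None."""
        kind = event["type"]
        if kind == "BEGIN":
            if self._open is not None:
                raise ValueError("BEGIN received while a transaction is still open")
            self._open = {"xid": event["xid"], "changes": []}
            return None
        if self._open is None:
            raise ValueError(f"{kind} received outside a transaction")
        if kind in self.DML_TYPES:
            self._open["changes"].append(event)
            return None
        if kind == "COMMIT":
            txn, self._open = self._open, None
            txn["end_lsn"] = event["end_lsn"]
            return txn  # complete: hand downstream, then acknowledge end_lsn
        return None  # RELATION and other metadata are not part of the change list
```

Releasing only whole transactions keeps the sink from ever observing a partially applied commit.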
Binary pgoutput Message Decoding
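The pgoutput plugin emits a strict binary protocol. psycopg2 strips the XLogData envelope, so each ReplicationMessage.payload holds exactly one pgoutput message: a single-byte type identifier followed directly by type-specific fields, with no per-message length prefix. Integers are big-endian, strings are null-terminated, and row values arrive in TupleData blocks that begin with their own Int16 column count.

```python
import io
import struct
from typing import Any, Dict, Generator, List

UNCHANGED_TOAST = object()  # placeholder for 'u' columns: unchanged TOASTed value was not sent


class PgOutputParser:
    """Decoder for pgoutput protocol version 1 messages (PostgreSQL 14+).

    Each ReplicationMessage.payload holds ONE message: a type byte followed
    directly by its fields (no length prefix).
    """

    @staticmethod
    def _read_string(buf: io.BytesIO) -> str:
        data = bytearray()
        while True:
            byte = buf.read(1)
            if byte in (b"", b"\x00"):  # stop at the terminator (or a truncated buffer)
                break
            data += byte
        return data.decode("utf-8", errors="replace")

    @staticmethod
    def _read_tuple(buf: io.BytesIO) -> List[Any]:
        """Decode TupleData: Int16 column count, then one tagged value per column."""
        (ncols,) = struct.unpack("!H", buf.read(2))
        values: List[Any] = []
        for _ in range(ncols):
            kind = buf.read(1)
            if kind == b"n":  # SQL NULL
                values.append(None)
            elif kind == b"u":  # unchanged TOAST value; merge with prior row state downstream
                values.append(UNCHANGED_TOAST)
            elif kind == b"t":  # text-format value: Int32 length + bytes
                (length,) = struct.unpack("!I", buf.read(4))
                values.append(buf.read(length).decode("utf-8", errors="replace"))
            else:
                raise ValueError(f"unexpected TupleData kind {kind!r}")
        return values

    @staticmethod
    def parse_stream(raw_bytes: bytes) -> Generator[Dict[str, Any], None, None]:
        """Yield the decoded event for one pgoutput message (nothing for ignored types)."""
        buf = io.BytesIO(raw_bytes)
        msg_type = buf.read(1)
        if msg_type == b"B":  # BEGIN: final LSN, commit timestamp, xid
            final_lsn, timestamp, xid = struct.unpack("!QqI", buf.read(20))
            yield {"type": "BEGIN", "lsn": final_lsn, "xid": xid, "timestamp": timestamp}
        elif msg_type == b"C":  # COMMIT: flags, commit LSN, end LSN, commit timestamp
            _flags, commit_lsn, end_lsn, timestamp = struct.unpack("!BQQq", buf.read(25))
            yield {"type": "COMMIT", "commit_lsn": commit_lsn, "end_lsn": end_lsn, "timestamp": timestamp}
        elif msg_type == b"R":  # RELATION: OID, namespace, name, replica identity, columns
            (rel_id,) = struct.unpack("!I", buf.read(4))
            namespace = PgOutputParser._read_string(buf)
            name = PgOutputParser._read_string(buf)
            replica_identity = buf.read(1).decode()
            (natts,) = struct.unpack("!H", buf.read(2))
            columns = []
            for _ in range(natts):
                (flags,) = struct.unpack("!B", buf.read(1))
                col_name = PgOutputParser._read_string(buf)
                type_oid, type_mod = struct.unpack("!Ii", buf.read(8))
                columns.append({"name": col_name, "type_oid": type_oid,
                                "type_mod": type_mod, "is_key": bool(flags & 1)})
            yield {"type": "RELATION", "rel_id": rel_id, "schema": namespace, "table": name,
                   "replica_identity": replica_identity, "columns": columns}
        elif msg_type in (b"I", b"U", b"D"):  # INSERT, UPDATE, DELETE
            (rel_id,) = struct.unpack("!I", buf.read(4))
            event = {"type": msg_type.decode(), "rel_id": rel_id, "old": None, "new": None}
            tag = buf.read(1)
            if tag in (b"K", b"O"):  # old key ('K') or full old row ('O'): UPDATE/DELETE only
                event["old"] = PgOutputParser._read_tuple(buf)
                tag = buf.read(1)
            if tag == b"N":  # new row: INSERT/UPDATE
                event["new"] = PgOutputParser._read_tuple(buf)
            yield event
        # TRUNCATE ('T'), ORIGIN ('O'), TYPE ('Y') and MESSAGE ('M') are not handled here.
```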
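Because decoding works on plain bytes, it can be unit-tested without a server by packing synthetic messages with struct. The sketch below uses hypothetical helper names to encode a protocol-v1 BEGIN message and decode it back. It also converts the commit timestamp, which pgoutput sends as microseconds since 2000-01-01 00:00:00 UTC, into a timezone-aware datetime.

```python
import struct
from datetime import datetime, timedelta, timezone
from typing import Any, Dict

# pgoutput timestamps are microseconds since 2000-01-01 00:00:00 UTC (the PostgreSQL epoch).
PG_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)


def pg_timestamp_to_datetime(micros: int) -> datetime:
    return PG_EPOCH + timedelta(microseconds=micros)


def build_begin(final_lsn: int, commit_ts: datetime, xid: int) -> bytes:
    """Hypothetical test helper: encode BEGIN as 'B' + Int64 LSN + Int64 timestamp + Int32 xid."""
    micros = (commit_ts - PG_EPOCH) // timedelta(microseconds=1)
    return b"B" + struct.pack("!QqI", final_lsn, micros, xid)


def decode_begin(payload: bytes) -> Dict[str, Any]:
    if payload[:1] != b"B":
        raise ValueError("not a BEGIN message")
    final_lsn, micros, xid = struct.unpack("!QqI", payload[1:21])
    return {"lsn": final_lsn, "xid": xid, "timestamp": pg_timestamp_to_datetime(micros)}
```

The same packing approach extends to COMMIT ("!BQQq") and to TupleData blocks, which keeps decoder regressions testable in CI.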
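Binary parsing must be stateful in production. pgoutput sends a RELATION message only before the first change to a table in a session (and again after that table's schema changes), while INSERT, UPDATE, and DELETE messages carry only the rel_id. Cache RELATION messages to map rel_id to fully qualified table names and column definitions. This eliminates redundant schema lookups and enables deterministic Python CDC Parser Development workflows where schema evolution is tracked externally.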
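A minimal cache along those lines might look like the sketch below. It assumes RELATION events carry a "columns" list of entries with a "name" key, and that row values arrive as positional lists; the class and method names are hypothetical.

```python
from typing import Any, Dict, List, Optional


class RelationCache:
    """Hypothetical rel_id -> table metadata cache fed by RELATION events."""

    def __init__(self) -> None:
        self._relations: Dict[int, Dict[str, Any]] = {}

    def update(self, event: Dict[str, Any]) -> None:
        # A later RELATION for the same rel_id (e.g. after ALTER TABLE) replaces the entry.
        self._relations[event["rel_id"]] = {
            "table": f'{event["schema"]}.{event["table"]}',
            "columns": [c["name"] for c in event["columns"]],
        }

    def table_name(self, rel_id: int) -> str:
        return self._relations[rel_id]["table"]

    def materialize(self, rel_id: int, values: Optional[List[Any]]) -> Optional[Dict[str, Any]]:
        """Pair positional tuple values with column names; a missing tuple stays None."""
        if values is None:
            return None
        rel = self._relations.get(rel_id)
        if rel is None:
            raise KeyError(f"no RELATION seen for rel_id {rel_id}; cannot decode row")
        if len(values) != len(rel["columns"]):
            raise ValueError("column count mismatch: RELATION and row are out of sync")
        return dict(zip(rel["columns"], values))
```

Failing loudly on an unknown rel_id or a column-count mismatch is deliberate: either one means the cache and the stream have diverged, and guessing would silently corrupt rows.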
LSN Acknowledgment & Feedback Loop
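PostgreSQL retains WAL from the slot's restart_lsn until the consumer reports, in standby status updates, the LSN up to which it has durably processed changes. If status updates stop, the walsender drops the connection after wal_sender_timeout (terminating walsender process due to replication timeout). The slot's confirmed_flush_lsn then stops advancing, so the slot keeps pinning WAL and disk usage grows without bound. Status updates must therefore go out on a strict interval, independent of message consumption. psycopg2's consume_stream handles the interval itself. It sends a status update every status_interval seconds (set in start_replication) and answers server keepalive requests, reporting the positions last stored with send_feedback. Note that send_feedback is a method of the replication cursor, not of the connection.

```python
import psycopg2


def make_consumer(parser: PgOutputParser):
    """Build a consume_stream callback that acknowledges progress only at COMMIT."""

    def consume(msg) -> None:
        for event in parser.parse_stream(msg.payload):
            # Hand the event to the downstream pipeline (Kafka, Avro, DB sink) here.
            if event["type"] == "COMMIT":
                # end_lsn marks the end of the commit record. Storing it lets psycopg2 report
                # the whole transaction as flushed, so the slot will not resend it after a
                # restart. Only do this once the sink has durably accepted the transaction.
                msg.cursor.send_feedback(flush_lsn=event["end_lsn"])

    return consume


def consume_stream(conn, cur, parser: PgOutputParser) -> None:
    try:
        # Blocks: invokes the callback once per message and sends status updates every
        # status_interval seconds, even while the stream is idle.
        cur.consume_stream(make_consumer(parser))
    except psycopg2.OperationalError:
        conn.close()
        raise  # connection dropped; the caller reconnects and resumes from the slot
```

Do not set reply=True to speed up acknowledgment: it only asks the server to send a keepalive message back immediately. The slot's confirmed_flush_lsn advances when the server processes the flush_lsn carried by any status update. Never block the consume callback with slow downstream I/O either: while it blocks, no status updates are sent and wal_sender_timeout can terminate the connection. Decouple consumption from acknowledgment using an in-memory queue or async event loop, and acknowledge only LSNs whose transactions the sink has durably accepted.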
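One way to realize that decoupling with the in-memory queue mentioned above is a sink worker thread plus a thread-safe record of the highest LSN the sink has accepted. This is a sketch: FlushTracker and sink_worker are hypothetical names, and write stands in for whatever call durably delivers a transaction (for example a Kafka produce that waits for the broker acknowledgment).

```python
import queue
import threading
from typing import Any, Callable, Dict


class FlushTracker:
    """Thread-safe, monotonic record of the highest LSN the sink has durably accepted."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._flushed = 0

    def mark_flushed(self, lsn: int) -> None:
        with self._lock:
            if lsn > self._flushed:  # never move the acknowledgment backwards
                self._flushed = lsn

    def flushed(self) -> int:
        with self._lock:
            return self._flushed


def sink_worker(work: "queue.Queue", write: Callable[[Dict[str, Any]], None],
                tracker: FlushTracker) -> None:
    """Drain whole transactions from a bounded queue; acknowledge only after write() returns."""
    while True:
        txn = work.get()
        if txn is None:  # sentinel: shut down
            break
        write(txn)  # hypothetical durable delivery to the sink
        tracker.mark_flushed(txn["end_lsn"])
```

On the replication side, the stream callback would enqueue completed transactions and call msg.cursor.send_feedback(flush_lsn=tracker.flushed()), so the reported position never runs ahead of the sink. All network I/O on the replication connection stays on one thread.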
Production Hardening & Pipeline Integration
Deploying a raw pgoutput consumer requires explicit handling of backpressure, schema drift, and failure recovery. Unlike managed connectors, this pattern places operational responsibility on the engineering team. When designing CDC Pipeline Implementation with Python & Debezium, evaluate whether a lightweight psycopg2 consumer satisfies latency SLAs or if a JVM-based connector with built-in offset management is warranted.
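Threshold Tuning & Backpressure: Set wal_sender_timeout to 60s and the status update interval (status_interval) to ≤ 5s, well inside the timeout. If downstream sinks (e.g., Kafka brokers, Avro schema registries) experience latency, buffer events in a bounded queue. When queue depth exceeds memory thresholds, pause reading from the stream rather than dropping events: a dropped event whose LSN is later acknowledged is lost for good. Keep sending status updates while paused so the LSN feedback loop never stalls.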
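Pausing and resuming at the same threshold tends to flap under steady load, so a common choice is hysteresis: pause at a high watermark, resume only after draining to a low one. The gate below is a minimal sketch with hypothetical watermark values expressed in queued events. While it reports "paused", the consumer stops reading messages but must still keep status updates flowing.

```python
class BackpressureGate:
    """Hysteresis gate: pause reading at a high watermark, resume at a low one."""

    def __init__(self, high: int, low: int) -> None:
        if not 0 <= low < high:
            raise ValueError("require 0 <= low < high")
        self.high, self.low = high, low
        self.paused = False

    def update(self, depth: int) -> bool:
        """Record the current queue depth; return True if the consumer should keep reading."""
        if not self.paused and depth >= self.high:
            self.paused = True
        elif self.paused and depth <= self.low:
            self.paused = False
        return not self.paused
```

The gap between the watermarks sets how much the queue must drain before reading resumes; a wider gap means fewer pause/resume cycles at the cost of burstier consumption.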
JSON to Avro Transformation & Event Routing: pgoutput emits column values in PostgreSQL's text representation. Transform these into typed Avro records using a schema registry (e.g., Confluent Schema Registry), and route events by schema.table to dedicated Kafka topics. Enforce idempotency by embedding xid and commit_lsn in the Avro payload so the sink can discard transactions replayed after a restart; combined with an idempotent or transactional sink, this yields effectively-once semantics.
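The routing and idempotency rules can be kept in a few pure functions. In the sketch below, the "cdc" topic prefix and the key layout are hypothetical conventions, and the record is a plain dict that an Avro serializer would encode against a registered schema (serialization is not shown). The key combines xid with commit_lsn because transaction IDs alone wrap around over time.

```python
from typing import Any, Dict, Tuple

TOPIC_PREFIX = "cdc"  # hypothetical naming convention


def route(schema: str, table: str) -> str:
    """Map a fully qualified table to its dedicated topic, e.g. cdc.public.orders."""
    return f"{TOPIC_PREFIX}.{schema}.{table}"


def idempotency_key(xid: int, commit_lsn: int, seq: int) -> str:
    """Stable key for one change: transaction id, commit LSN, and position within the txn.

    A replayed transaction reproduces the same keys, so a sink that remembers
    keys (or upserts on them) can discard the duplicates.
    """
    return f"{xid}:{commit_lsn:X}:{seq}"


def envelope(schema: str, table: str, xid: int, commit_lsn: int, seq: int,
             row: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Return (topic, record) carrying the metadata the sink needs for deduplication."""
    record = {"xid": xid, "commit_lsn": commit_lsn, "seq": seq,
              "key": idempotency_key(xid, commit_lsn, seq), "row": row}
    return route(schema, table), record
```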
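Fallback Chains & Disaster Recovery: Before PostgreSQL 17, logical replication slots are not synchronized to physical standbys, so a failover leaves the promoted node without the slot. Record the end_lsn of the last transaction the sink committed; on failover, recreate the slot on the new primary and reattach the consumer. A new slot starts decoding at the WAL position current at its creation, not at an earlier LSN. Any changes committed between the last acknowledged end_lsn and slot creation must therefore be backfilled, for example from a table snapshot. WAL archiving (archive_mode=on) preserves point-in-time recovery, but it cannot be used to resume logical decoding from a lost slot. Monitor pg_replication_slots.wal_status. extended means the slot holds WAL beyond max_wal_size (an early warning). unreserved means required WAL will be removed at the next checkpoint, and lost means it is already gone and the slot is unusable; both require manual intervention. The last two states occur only when max_slot_wal_keep_size limits retention.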
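The monitoring rule can be expressed as a small check over pg_replication_slots rows. This sketch assumes rows shaped like the result of SELECT slot_name, wal_status, restart_lsn FROM pg_replication_slots, plus the output of SELECT pg_current_wal_lsn(); the function name and the byte threshold are hypothetical. Retained WAL could equally be computed server-side with pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn).

```python
from typing import Dict, List, Optional


def lsn_to_int(lsn: str) -> int:
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def slot_alerts(slots: List[Dict[str, Optional[str]]], current_lsn: str,
                max_retained_bytes: int) -> List[str]:
    """Flag slots whose WAL status or retained-WAL volume needs attention."""
    alerts: List[str] = []
    current = lsn_to_int(current_lsn)
    for slot in slots:
        name, status = slot["slot_name"], slot["wal_status"]
        if status in ("unreserved", "lost"):
            alerts.append(f"{name}: wal_status={status}, manual intervention required")
        elif status == "extended":
            alerts.append(f"{name}: wal_status=extended, retaining WAL beyond max_wal_size")
        # restart_lsn can be NULL (None), e.g. for an invalidated slot.
        if slot["restart_lsn"] is not None:
            retained = current - lsn_to_int(slot["restart_lsn"])
            if retained > max_retained_bytes:
                alerts.append(f"{name}: retaining {retained} bytes of WAL")
    return alerts
```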
This architecture can keep end-to-end latency low for high-throughput CDC workloads while staying within PostgreSQL's documented replication protocol; actual latency depends on commit frequency, sink throughput, and the status interval. Validate against your cluster’s checkpoint frequency, transaction volume, and network topology before promoting to production.