Parsing pgoutput format with psycopg2


Operational Intent

This document defines the exact implementation pattern for consuming PostgreSQL’s native pgoutput logical decoding stream using psycopg2. The operational objective is singular: deploy a deterministic, low-latency WAL stream consumer that decodes binary pgoutput messages into structured change events, maintains exact LSN progression, and prevents replication slot bloat. It avoids data loss across network partitions by acknowledging only those LSNs whose changes have been durably processed downstream. This workflow targets database engineers, data platform teams, Python ETL developers, and DevOps operators responsible for CDC pipeline automation.

PostgreSQL Parameter Baseline

Before initializing the consumer, configure the source cluster with the replication parameters below. Misconfiguration can cause slot creation or replication connections to be refused, or can let retained WAL fill the primary's disk. Values below assume PostgreSQL 14+.

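| Parameter | Required Value | Operational Rationale |
| --- | --- | --- |
| wal_level | logical | Enables logical decoding. With replica or minimal, creating a logical slot fails with "logical decoding requires wal_level >= logical". Requires restart. |
| max_replication_slots | ≥ 10 | Reserves shared memory for slot state. When every slot is taken, slot creation fails with "all replication slots are in use". Requires restart. |
| max_wal_senders | ≥ 10 | Caps concurrent WAL sender processes. Must exceed the combined number of logical and physical replication connections. Requires restart. |
| wal_sender_timeout | 60s | Terminates replication connections that send no status update within this window, so the consumer's feedback interval must be shorter. It does not release WAL held by an inactive slot. Reload-safe. |
| wal_keep_size | 1GB (optional) | Keeps a minimum amount of WAL in pg_wal regardless of slots, mainly for physical standbys without a slot. A logical slot already retains WAL from its restart_lsn across consumer restarts. To cap what an abandoned slot can retain, set max_slot_wal_keep_size (PostgreSQL 13+). Reload-safe. |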

Apply changes with ALTER SYSTEM or in postgresql.conf, then run SELECT pg_reload_conf() to pick up reload-safe parameters. Parameters marked "Requires restart" take effect only after a controlled cluster restart. Verify the result with SHOW wal_level and by querying slot_name, active, and restart_lsn from pg_replication_slots.
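The baseline can also be verified from Python. In this sketch, BASELINE_QUERY and check_baseline are illustrative names, not part of psycopg2. The function only compares the (name, setting) rows that the query returns against the table above, so it can be fed cur.fetchall() from any connection to the source database. Note that pg_settings reports wal_sender_timeout in milliseconds.

```python
from typing import Dict, Iterable, List, Tuple

# Illustrative query; pg_settings.setting is text, and wal_sender_timeout is in ms.
BASELINE_QUERY = (
    "SELECT name, setting FROM pg_settings "
    "WHERE name IN ('wal_level', 'max_replication_slots', 'max_wal_senders', 'wal_sender_timeout')"
)

def check_baseline(rows: Iterable[Tuple[str, str]]) -> List[str]:
    """Compare (name, setting) rows with the parameter table; return one line per deviation."""
    settings: Dict[str, str] = dict(rows)
    problems: List[str] = []
    if settings.get("wal_level") != "logical":
        problems.append(f"wal_level: expected logical, found {settings.get('wal_level')}")
    for name in ("max_replication_slots", "max_wal_senders"):
        value = int(settings.get(name, "0"))
        if value < 10:
            problems.append(f"{name}: expected >= 10, found {value}")
    timeout_ms = int(settings.get("wal_sender_timeout", "0"))  # 0 disables the timeout
    if timeout_ms != 60_000:
        problems.append(f"wal_sender_timeout: expected 60000 ms, found {timeout_ms} ms")
    return problems
```

Typical use: cur.execute(BASELINE_QUERY), then check_baseline(cur.fetchall()). An empty list means the cluster matches the table.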

Connection & Slot Lifecycle Management

psycopg2 exposes logical replication through psycopg2.extras.LogicalReplicationConnection. The connection must request the pgoutput plugin, name the target slot and publication, and deliver payloads as raw bytes (decode=False, which is also the default) so that the binary protocol can be parsed.

python
import psycopg2
from psycopg2.extras import LogicalReplicationConnection, ReplicationCursor

# LogicalReplicationConnection adds replication=database itself; keeping it in the DSN is harmless.
DSN = "host=primary-db port=5432 dbname=cdc_source user=replicator password=secure_pass replication=database"

def ensure_slot(cur: ReplicationCursor, slot_name: str) -> None:
    """Idempotent slot creation: create the slot if missing, refuse to attach to an active one."""
    # A walsender in database mode (PostgreSQL 10+) also accepts plain SQL queries.
    cur.execute("SELECT active, restart_lsn FROM pg_replication_slots WHERE slot_name = %s", (slot_name,))
    row = cur.fetchone()

    if row is None:
        # Sends CREATE_REPLICATION_SLOT <slot_name> LOGICAL pgoutput. Publications are not bound
        # to the slot; they are selected per session in start_replication().
        cur.create_replication_slot(slot_name, output_plugin="pgoutput")
        return

    active, restart_lsn = row
    if active:
        raise RuntimeError(f"Slot {slot_name} is already active. Terminate existing consumer before proceeding.")
    # Inactive slot: reuse it. WAL from restart_lsn onward is still retained, and streaming
    # resumes from the slot's confirmed_flush_lsn, so no committed changes are skipped.

def init_consumer(slot_name: str, publication: str):
    conn = psycopg2.connect(DSN, connection_factory=LogicalReplicationConnection)
    cur = conn.cursor()  # replication connections always run in autocommit mode

    ensure_slot(cur, slot_name)

    cur.start_replication(
        slot_name=slot_name,
        options={"proto_version": "1", "publication_names": publication},
        decode=False,  # deliver msg.payload as bytes for binary pgoutput parsing
    )
    return conn, cur

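LogicalReplicationConnection adds replication=database to the connection parameters automatically, so keeping it in the DSN is harmless but not required. The connecting role needs the REPLICATION attribute (or superuser), and pg_hba.conf must allow it to connect to the target database. Note that the replication keyword in pg_hba.conf matches only physical replication connections. Always check slot state before invoking start_replication: attaching to a slot that another walsender still holds fails with replication slot "..." is active for PID ....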

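Within each transaction the stream follows a fixed sequence: BEGIN, then one or more INSERT, UPDATE, or DELETE messages, then COMMIT. A RELATION message precedes the first change to a table in the session and is sent again whenever that table's definition changes. RELATION messages can therefore appear directly after BEGIN or between DML messages. The flush LSN advances only after COMMIT has been processed.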
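The sequence described above can be sketched as a small state machine that buffers changes between BEGIN and COMMIT and releases a transaction only on COMMIT. TransactionAssembler is a hypothetical name. The sketch assumes event dicts with a "type" key of "BEGIN", "I", "U", "D", or "COMMIT", plus "xid" on BEGIN and "end_lsn" on COMMIT, which is the shape the parser in the next section yields.

```python
from typing import Any, Dict, List, Optional

Event = Dict[str, Any]

class TransactionAssembler:
    """Hypothetical helper: groups decoded pgoutput events into whole transactions."""

    def __init__(self) -> None:
        self._begin: Optional[Event] = None
        self._changes: List[Event] = []

    def feed(self, event: Event) -> Optional[Dict[str, Any]]:
        """Consume one event; return the finished transaction on COMMIT, else None."""
        kind = event["type"]
        if kind == "BEGIN":
            if self._begin is not None:
                raise ValueError("BEGIN received while a transaction is open")
            self._begin, self._changes = event, []
        elif kind in ("I", "U", "D"):
            if self._begin is None:
                raise ValueError(f"{kind} received outside a transaction")
            self._changes.append(event)
        elif kind == "COMMIT":
            if self._begin is None:
                raise ValueError("COMMIT received outside a transaction")
            txn = {"xid": self._begin["xid"], "end_lsn": event["end_lsn"], "changes": self._changes}
            self._begin, self._changes = None, []
            return txn
        # RELATION and other metadata messages carry no row data; a schema cache consumes them.
        return None
```

To keep a partially delivered transaction from being skipped on reconnect, acknowledge txn["end_lsn"] only after every change in txn["changes"] has been written downstream.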

Binary pgoutput Message Decoding

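The pgoutput plugin emits a strict binary protocol. psycopg2 strips the XLogData header, so each msg.payload carries exactly one message: a single-byte type identifier followed directly by the type-specific fields. There is no per-message length prefix. Integers are big-endian, strings are NUL-terminated, and column values travel in TupleData blocks, so parsing requires precise byte alignment and explicit null-terminated string handling.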
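Here is one concrete example of that layout. In protocol version 1, a BEGIN message consists of the type byte 'B' followed by an Int64 final LSN, an Int64 commit timestamp, and an Int32 xid, for 21 bytes in total. Timestamps count microseconds since 2000-01-01 00:00:00 UTC (the PostgreSQL epoch), not since the Unix epoch. The sketch below packs a synthetic BEGIN with struct and decodes it again. decode_begin and pg_timestamp are illustrative names.

```python
import struct
from datetime import datetime, timedelta, timezone

PG_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)

# BEGIN: Byte1('B'), Int64 final LSN, Int64 commit timestamp, Int32 xid; "!" = big-endian, no padding.
BEGIN_FORMAT = "!cQqI"

def pg_timestamp(micros: int) -> datetime:
    """Convert a pgoutput timestamp (microseconds since 2000-01-01 UTC) to an aware datetime."""
    return PG_EPOCH + timedelta(microseconds=micros)

def decode_begin(payload: bytes) -> dict:
    """Decode a complete BEGIN payload as delivered in msg.payload."""
    msg_type, final_lsn, ts, xid = struct.unpack(BEGIN_FORMAT, payload)
    if msg_type != b"B":
        raise ValueError(f"not a BEGIN message: {msg_type!r}")
    return {"final_lsn": final_lsn, "commit_time": pg_timestamp(ts), "xid": xid}
```

The same approach applies to COMMIT ('C'), which carries an Int8 flags field, two Int64 LSNs, and an Int64 timestamp.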

python
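import io
import struct
from typing import Any, Dict, Generator, List

# Marker for TOASTed values the server did not resend ('u' flag): the stored value is unchanged.
UNCHANGED_TOAST = object()

class PgOutputParser:
    """Decoder for pgoutput protocol version 1 messages."""

    @staticmethod
    def _read_string(buf: io.BytesIO) -> str:
        """Read a NUL-terminated string."""
        data = bytearray()
        while True:
            byte = buf.read(1)
            if not byte:
                raise ValueError("unterminated string in pgoutput message")
            if byte == b"\x00":
                return data.decode("utf-8", errors="replace")
            data += byte

    @staticmethod
    def _read_tuple(buf: io.BytesIO) -> List[Any]:
        """Read TupleData: Int16 column count, then one flag-prefixed value per column."""
        (ncols,) = struct.unpack("!H", buf.read(2))
        values: List[Any] = []
        for _ in range(ncols):
            flag = buf.read(1)
            if flag == b"n":
                values.append(None)  # SQL NULL
            elif flag == b"u":
                values.append(UNCHANGED_TOAST)  # unchanged TOASTed value, not transmitted
            elif flag in (b"t", b"b"):  # text format; 'b' = binary (PostgreSQL 14+, binary option)
                (length,) = struct.unpack("!I", buf.read(4))
                raw = buf.read(length)
                values.append(raw.decode("utf-8", errors="replace") if flag == b"t" else raw)
            else:
                raise ValueError(f"unknown TupleData flag {flag!r}")
        return values

    @staticmethod
    def parse_stream(raw_bytes: bytes) -> Generator[Dict[str, Any], None, None]:
        """Decode one msg.payload. psycopg2 delivers exactly one pgoutput message per
        payload: a type byte followed directly by its fields (no length prefix)."""
        buf = io.BytesIO(raw_bytes)
        msg_type = buf.read(1)

        if msg_type == b"B":  # BEGIN: final LSN, commit timestamp, xid
            final_lsn, timestamp, xid = struct.unpack("!QqI", buf.read(20))
            yield {"type": "BEGIN", "lsn": final_lsn, "xid": xid, "timestamp": timestamp}

        elif msg_type == b"C":  # COMMIT: flags, commit LSN, end LSN, commit timestamp
            _flags, commit_lsn, end_lsn, timestamp = struct.unpack("!BQQq", buf.read(25))
            yield {"type": "COMMIT", "commit_lsn": commit_lsn, "end_lsn": end_lsn, "timestamp": timestamp}

        elif msg_type == b"R":  # RELATION: OID, namespace, name, replica identity, columns
            (rel_id,) = struct.unpack("!I", buf.read(4))
            namespace = PgOutputParser._read_string(buf) or "pg_catalog"  # empty string means pg_catalog
            name = PgOutputParser._read_string(buf)
            replica_identity = buf.read(1).decode()
            (natts,) = struct.unpack("!H", buf.read(2))
            columns = []
            for _ in range(natts):
                (flags,) = struct.unpack("!B", buf.read(1))  # 1 = column is part of the key
                col_name = PgOutputParser._read_string(buf)
                type_oid, type_mod = struct.unpack("!Ii", buf.read(8))
                columns.append({"name": col_name, "type_oid": type_oid, "type_mod": type_mod, "is_key": bool(flags & 1)})
            yield {"type": "RELATION", "rel_id": rel_id, "schema": namespace, "table": name,
                   "replica_identity": replica_identity, "columns": columns}

        elif msg_type == b"I":  # INSERT: OID, 'N', new tuple
            (rel_id,) = struct.unpack("!I", buf.read(4))
            buf.read(1)  # 'N'
            yield {"type": "I", "rel_id": rel_id, "new": PgOutputParser._read_tuple(buf)}

        elif msg_type == b"U":  # UPDATE: OID, optional 'K'/'O' + old tuple, then 'N' + new tuple
            (rel_id,) = struct.unpack("!I", buf.read(4))
            old = None
            if buf.read(1) in (b"K", b"O"):
                old = PgOutputParser._read_tuple(buf)
                buf.read(1)  # 'N'
            yield {"type": "U", "rel_id": rel_id, "old": old, "new": PgOutputParser._read_tuple(buf)}

        elif msg_type == b"D":  # DELETE: OID, 'K' (key columns) or 'O' (full old row) + tuple
            (rel_id,) = struct.unpack("!I", buf.read(4))
            buf.read(1)
            yield {"type": "D", "rel_id": rel_id, "old": PgOutputParser._read_tuple(buf)}

        elif msg_type:  # TYPE, ORIGIN, TRUNCATE, MESSAGE: passed through undecoded
            yield {"type": msg_type.decode(), "raw": raw_bytes[1:].hex()}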

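Binary parsing must be stateful in production. Cache each RELATION message keyed by rel_id so that later INSERT, UPDATE, and DELETE messages can be mapped to fully qualified table names and column definitions. The server sends a RELATION message before the first change to a table in each session and again whenever that table's definition changes. Overwriting the cache entry on every RELATION therefore keeps it current without any catalog lookups. This is the basis for the Python CDC Parser Development workflows, where schema evolution is tracked externally.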
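A minimal relation cache might look like the sketch below. RelationCache is a hypothetical helper. It assumes that RELATION events carry "rel_id", "schema", "table", and a "columns" list of dicts with a "name" key, and that DML events carry decoded tuple values as lists in column order.

```python
from typing import Any, Dict, List, Optional

class RelationCache:
    """Hypothetical helper: remembers RELATION metadata and names tuple values."""

    def __init__(self) -> None:
        self._relations: Dict[int, Dict[str, Any]] = {}

    def update(self, relation: Dict[str, Any]) -> None:
        # Overwrite on every RELATION: the server resends it after a schema change.
        self._relations[relation["rel_id"]] = relation

    def _get(self, rel_id: int) -> Dict[str, Any]:
        if rel_id not in self._relations:
            raise LookupError(f"no RELATION message seen for rel_id {rel_id}")
        return self._relations[rel_id]

    def qualified_name(self, rel_id: int) -> str:
        rel = self._get(rel_id)
        return f"{rel['schema']}.{rel['table']}"

    def to_row(self, rel_id: int, values: Optional[List[Any]]) -> Optional[Dict[str, Any]]:
        """Zip a decoded tuple with the cached column names (None stays None)."""
        if values is None:
            return None
        names = [col["name"] for col in self._get(rel_id)["columns"]]
        if len(names) != len(values):
            raise ValueError(f"rel_id {rel_id}: {len(values)} values for {len(names)} columns")
        return dict(zip(names, values))
```

Raising on a column-count mismatch surfaces a missed RELATION message immediately instead of silently shifting values into the wrong columns.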

LSN Acknowledgment & Feedback Loop

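PostgreSQL retains WAL from each slot's restart_lsn until the consumer confirms that it has flushed the corresponding changes. If the consumer stops sending status updates, the walsender terminates the connection after wal_sender_timeout ("terminating walsender process due to replication timeout"). Because the slot's confirmed position then stops advancing, WAL accumulates on the primary without bound. A reconnect attempted while the old walsender is still alive fails with replication slot "..." is active for PID .... Status updates must therefore be sent on a fixed interval, independent of message volume.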
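LSNs appear in two forms. pg_replication_slots and pg_current_wal_lsn() return text such as 16/B374D848, which holds the high and low 32-bit halves in hexadecimal. psycopg2's send_feedback() and ReplicationMessage.data_start use 64-bit integers instead. The helpers below are illustrative names. They convert between the two forms and compute how many bytes of WAL a slot is holding back, which is the same quantity that pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) returns on the server.

```python
def lsn_to_int(text: str) -> int:
    """Parse 'X/Y' (hex high/low 32-bit halves) into a 64-bit integer LSN."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)

def int_to_lsn(value: int) -> str:
    """Format a 64-bit integer LSN in the unpadded 'X/Y' hexadecimal form."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"

def retained_wal_bytes(current_lsn: str, restart_lsn: str) -> int:
    """Bytes of WAL between a slot's restart_lsn and the current insert position."""
    return lsn_to_int(current_lsn) - lsn_to_int(restart_lsn)
```

A steadily growing retained_wal_bytes for an active slot usually means flush positions are not being reported.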

python
from psycopg2.extras import ReplicationCursor, ReplicationMessage

def consume_stream(cur: ReplicationCursor, parser: PgOutputParser, interval: float = 5.0) -> None:
    """Decode messages and acknowledge each transaction only after it has been processed."""

    def on_message(msg: ReplicationMessage) -> None:
        for event in parser.parse_stream(msg.payload):
            # Process event downstream (Kafka, Avro, DB sink) before acknowledging it.
            if event["type"] == "COMMIT":
                # Report the transaction's end LSN as flushed; psycopg2 carries it in the
                # standby status updates it sends to the server.
                msg.cursor.send_feedback(flush_lsn=event["end_lsn"])

    # consume_stream() blocks, calls on_message for every XLogData message, and sends a
    # status update every keepalive_interval seconds (minimum 1), even when the stream is idle.
    # psycopg2.OperationalError propagates on disconnect; the caller reconnects and the
    # server resumes from the slot's confirmed_flush_lsn.
    cur.consume_stream(on_message, keepalive_interval=interval)

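The reply flag of send_feedback only asks the server to answer immediately with a keepalive of its own. It does not make the slot advance any faster. The slot's confirmed_flush_lsn advances when the server receives a flush position from the client, so report only LSNs whose changes are durable downstream. Never let downstream I/O delay status updates; decouple consumption from acknowledgment using a bounded in-memory queue or an async event loop.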

Production Hardening & Pipeline Integration

Deploying a raw pgoutput consumer requires explicit handling of backpressure, schema drift, and failure recovery. Unlike managed connectors, this pattern places operational responsibility on the engineering team. When designing a pipeline along the lines of CDC Pipeline Implementation with Python & Debezium, evaluate whether a lightweight psycopg2 consumer meets your latency SLAs or whether a JVM-based connector with built-in offset management is warranted.

Threshold Tuning & Backpressure: Set wal_sender_timeout to 60s and keep the feedback interval at or below 5s, well inside the timeout. If downstream sinks (e.g., Kafka brokers, Avro schema registries) experience latency, buffer events in a bounded queue. When the queue is full, pause reading from the stream but keep sending status updates. Never drop events: once a later commit LSN is acknowledged, the server will not resend them.
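A bounded queue between the replication reader and the sink can be sketched with the standard library alone. AckTracker is a hypothetical helper, not a psycopg2 API. The reader offers one (end_lsn, changes) item per committed transaction, a worker thread delivers it, and only after delivery does flushed_lsn advance. The reader keeps calling cur.send_feedback(flush_lsn=tracker.flushed_lsn) on its own schedule, so a slow sink delays acknowledgment but never the status updates themselves.

```python
import queue
import threading
from typing import Any, Callable, List, Optional, Tuple

class AckTracker:
    """Hypothetical helper: bounded hand-off between the replication reader and a sink worker.

    flushed_lsn only advances after the sink has accepted a transaction, so it never
    runs ahead of what has actually been delivered.
    """

    def __init__(self, sink: Callable[[List[Any]], None], maxsize: int = 1000) -> None:
        self._queue: "queue.Queue[Optional[Tuple[int, List[Any]]]]" = queue.Queue(maxsize=maxsize)
        self._sink = sink
        self._lock = threading.Lock()
        self._flushed_lsn = 0
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def offer(self, end_lsn: int, changes: List[Any], timeout: float = 1.0) -> bool:
        """Enqueue a transaction; False means the queue stayed full, so pause reading."""
        try:
            self._queue.put((end_lsn, changes), timeout=timeout)
            return True
        except queue.Full:
            return False

    @property
    def flushed_lsn(self) -> int:
        """Highest end LSN the sink has accepted; safe to report as flush_lsn."""
        with self._lock:
            return self._flushed_lsn

    def _run(self) -> None:
        while True:
            item = self._queue.get()
            if item is None:  # shutdown sentinel from close()
                return
            end_lsn, changes = item
            # If the sink raises, the worker stops and flushed_lsn stops advancing, so the
            # failed transaction is replayed after reconnect instead of being skipped.
            self._sink(changes)
            with self._lock:
                self._flushed_lsn = max(self._flushed_lsn, end_lsn)

    def close(self) -> None:
        """Let queued transactions drain, then stop the worker."""
        self._queue.put(None)
        self._worker.join()
```

When offer returns False, stop reading from the replication stream until a retry succeeds, and keep the transaction rather than discarding it. A production version also needs supervision of the worker, because a sink failure stops it for good.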

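JSON to Avro Transformation & Event Routing: pgoutput emits column values in PostgreSQL's text representation (binary only if explicitly requested), and its only type information is the type OIDs in RELATION messages. Transform these values into typed Avro records using a schema registry (e.g., Confluent Schema Registry). Route events by schema.table to dedicated Kafka topics. Embed xid and commit_lsn in each Avro payload so the sink can recognize and discard transactions replayed after a reconnect. Combined with an idempotent sink, this yields effectively-once results.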
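The routing and idempotency rules can be sketched without Kafka or Avro. route, to_records, and DedupSink are hypothetical names, the cdc.<schema>.<table> topic scheme is an assumption, and rows are plain dicts rather than typed Avro records. The dedup key pairs commit_lsn, which is unique per transaction, with the change's position inside that transaction. xid is carried along for tracing, but because it wraps around it is not used as a key on its own.

```python
from typing import Any, Dict, List, Tuple

Change = Tuple[str, str, Dict[str, Any]]  # (schema, table, row)

def route(schema: str, table: str, prefix: str = "cdc") -> str:
    """One topic per table; the '<prefix>.<schema>.<table>' naming is an assumption."""
    return f"{prefix}.{schema}.{table}"

def to_records(xid: int, commit_lsn: int, changes: List[Change]) -> List[Tuple[str, Dict[str, Any]]]:
    """Tag every change with xid, commit_lsn, and its position in the transaction.

    (commit_lsn, _seq) is unique per change and identical when the same transaction
    is replayed after a reconnect, so it works as a deduplication key.
    """
    return [
        (route(schema, table), {**row, "_xid": xid, "_commit_lsn": commit_lsn, "_seq": seq})
        for seq, (schema, table, row) in enumerate(changes)
    ]

class DedupSink:
    """In-memory stand-in for an idempotent sink (a real one would persist its keys)."""

    def __init__(self) -> None:
        self.applied: Dict[Tuple[int, int], Dict[str, Any]] = {}

    def write(self, record: Dict[str, Any]) -> bool:
        """Apply the record once; return False for a replayed duplicate."""
        key = (record["_commit_lsn"], record["_seq"])
        if key in self.applied:
            return False
        self.applied[key] = record
        return True
```

Numbering positions across the whole transaction, not per table, keeps the key unique when a single transaction touches several tables.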

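Fallback Chains & Disaster Recovery: Before PostgreSQL 17, replication slots are not copied to standbys, so a promoted standby has no slot for this consumer. A slot recreated on the new primary starts decoding at that server's current WAL position and cannot resume from an earlier restart_lsn. Changes committed after the consumer's last acknowledged LSN but before the new slot existed must be recovered by a re-snapshot or backfill, so persist the last acknowledged commit LSN outside PostgreSQL to detect that gap. PostgreSQL 17 adds failover-enabled logical slots that are synchronized to physical standbys. WAL archiving (archive_mode=on) protects physical recovery and point-in-time restore, but archived WAL cannot be replayed into a logical slot. Monitor pg_replication_slots.wal_status. A value of extended means the slot is retaining more than max_wal_size, and unreserved means required WAL will be removed at the next checkpoint. A value of lost means the slot is no longer usable and must be recreated, followed by a backfill.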
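The wal_status checks can be turned into alerts with a small function over rows from pg_replication_slots (wal_status and safe_wal_size require PostgreSQL 13+). slot_alerts and its 1 GiB headroom threshold are assumptions for illustration. The returned messages would be fed into whatever alerting system is in use.

```python
from typing import List, Optional, Tuple

# (slot_name, active, wal_status, safe_wal_size) from:
#   SELECT slot_name, active, wal_status, safe_wal_size FROM pg_replication_slots
SlotRow = Tuple[str, bool, Optional[str], Optional[int]]

def slot_alerts(rows: List[SlotRow], min_headroom: int = 1 << 30) -> List[str]:
    """Turn pg_replication_slots rows into alert messages (thresholds are assumptions)."""
    alerts: List[str] = []
    for name, active, wal_status, safe_wal_size in rows:
        if wal_status == "lost":
            alerts.append(f"{name}: required WAL removed; recreate the slot and backfill")
        elif wal_status == "unreserved":
            alerts.append(f"{name}: required WAL will be removed at the next checkpoint")
        elif wal_status == "extended":
            alerts.append(f"{name}: retaining more WAL than max_wal_size")
        # safe_wal_size is NULL when max_slot_wal_keep_size = -1 or the slot is lost.
        if safe_wal_size is not None and safe_wal_size < min_headroom:
            alerts.append(f"{name}: {safe_wal_size} bytes of WAL headroom left")
        if not active:
            alerts.append(f"{name}: no consumer attached")
    return alerts
```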

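This architecture targets sub-100ms latency for high-throughput CDC workloads while staying within the PostgreSQL replication protocol. Actual latency depends on transaction size (protocol version 1 streams a transaction only after it commits), sink throughput, and network round trips. Validate against your cluster’s checkpoint frequency, transaction volume, and network topology before promoting to production.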