crystal-pg

Subscribe to logical replication

Open jgaskins opened this issue 7 months ago • 5 comments

The basic idea is that you create a replication subscription with PG.connect_replication. It's similar to PG.connect_listen, but where every message on a LISTEN connection has the same type (it just yields strings), each replication message can be a different type. So instead of passing a block, you pass a PG::Replication::Handler instance that defines received methods for the different types of replication messages:

class MyHandler
  include PG::Replication::Handler

  # Override this to tell the Postgres server where you are in the WAL.
  # Make sure you `yield` in between any setup and teardown.
  def received(data : PG::Replication::XLogData, connection : PG::Replication::Connection)
    yield
    connection.last_wal_byte_applied = data.wal_end
    connection.last_wal_byte_flushed = data.wal_end
  end

  # New relation defined
  def received(relation : PG::Replication::Relation)
  end

  # Record updated
  def received(update : PG::Replication::Update)
  end
end

subscriber = PG.connect_replication url,
  handler: MyHandler.new,
  publication_name: "my_publication",
  slot_name: "my_replication_slot"

The default is to do nothing when receiving a WALMessage.
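
To illustrate the no-op default, here is a minimal self-contained sketch of the overload pattern. It is not the PR's code: SketchMessage, SketchInsert, SketchUpdate, SketchHandler, and CountingHandler are hypothetical stand-ins for the real types.

```crystal
# Hypothetical stand-ins for the replication message types.
abstract class SketchMessage
end

class SketchInsert < SketchMessage
end

class SketchUpdate < SketchMessage
end

# Hypothetical stand-in for the handler module.
module SketchHandler
  # Catch-all default: ignore any message the including type does not handle.
  def received(message : SketchMessage)
  end
end

class CountingHandler
  include SketchHandler

  getter updates = 0

  # A more specific overload wins over the module's catch-all.
  def received(update : SketchUpdate)
    @updates += 1
  end
end

handler = CountingHandler.new
messages = [SketchInsert.new, SketchUpdate.new, SketchUpdate.new] of SketchMessage
messages.each { |message| handler.received(message) }
puts handler.updates
```

Only the two SketchUpdate messages reach the overridden method; the SketchInsert falls through to the module's empty default.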

Most of the work here happens in XLogData events, which contain the WALMessages that people will actually care about. Everything else seems to be protocol meta. The objects passed to the handler's received methods are the XLogData's message value and are decoded based on message formats specified in the Postgres docs.
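
For reference, the XLogData frame in the Postgres streaming replication protocol is a 'w' byte, three big-endian Int64 values (WAL start, WAL end, server clock), and then the payload. Below is a rough sketch of decoding that frame, assuming the CopyData envelope has already been stripped. SketchXLogData and decode_xlog_data are illustrative names, not this PR's API.

```crystal
# Illustrative only: SketchXLogData and decode_xlog_data are hypothetical names.
record SketchXLogData,
  wal_start : Int64,
  wal_end : Int64,
  server_clock : Int64,
  payload : Bytes

XLOG_DATA_HEADER_SIZE = 25 # 1 tag byte + three 8-byte integers

def decode_xlog_data(frame : Bytes) : SketchXLogData
  raise ArgumentError.new("frame too short") if frame.size < XLOG_DATA_HEADER_SIZE
  raise ArgumentError.new("not an XLogData frame") unless frame[0] == 'w'.ord

  # The three header integers are sent in network (big-endian) byte order.
  io = IO::Memory.new(frame[1, 24])
  format = IO::ByteFormat::NetworkEndian
  wal_start = io.read_bytes(Int64, format)
  wal_end = io.read_bytes(Int64, format)
  server_clock = io.read_bytes(Int64, format)

  # Everything after the header is the logical replication message itself.
  payload = frame[XLOG_DATA_HEADER_SIZE, frame.size - XLOG_DATA_HEADER_SIZE]
  SketchXLogData.new(wal_start, wal_end, server_clock, payload)
end

# Build a made-up frame and decode it.
sample = IO::Memory.new
sample.write_byte 'w'.ord.to_u8
sample.write_bytes 100_i64, IO::ByteFormat::NetworkEndian
sample.write_bytes 164_i64, IO::ByteFormat::NetworkEndian
sample.write_bytes 0_i64, IO::ByteFormat::NetworkEndian
sample << "payload"

data = decode_xlog_data(sample.to_slice)
puts data.wal_end
puts String.new(data.payload)
```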

There are still some things I need and/or want to do here:

  • [x] Implement message types
    • [x] Begin
    • [x] Message
    • [x] Commit
    • [x] Origin
    • [x] Relation
    • [x] Type
    • [x] Insert
    • [x] Update
    • [x] Delete
    • [x] Truncate
  • [ ] Allow decoding TupleData as other types (rather than raw Bytes)
  • [ ] Allow starting the WAL from somewhere other than the beginning
  • [x] Implement keepalive responses
  • [x] Since logical replication requires a change to postgresql.conf, the specs need to be opt-in or enabled only when logical replication is enabled
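
On the keepalive item: in the streaming replication protocol, the reply to a keepalive is a Standby status update ('r', then the written, flushed, and applied WAL positions, a clock in microseconds since 2000-01-01, and a reply-requested byte). The sketch below builds one under those assumptions. standby_status_update and POSTGRES_EPOCH are hypothetical names, and the way the connection's last_wal_byte_* values feed into the message is a guess rather than a description of the PR.

```crystal
# Hypothetical helper, not part of crystal-pg.
POSTGRES_EPOCH = Time.utc(2000, 1, 1)

def standby_status_update(written : Int64, flushed : Int64, applied : Int64,
                          now : Time = Time.utc) : Bytes
  # Client clock as whole microseconds since the Postgres epoch.
  elapsed = now - POSTGRES_EPOCH
  clock = elapsed.to_i * 1_000_000 + elapsed.nanoseconds // 1_000

  io = IO::Memory.new
  format = IO::ByteFormat::NetworkEndian
  io.write_byte 'r'.ord.to_u8
  io.write_bytes written, format
  io.write_bytes flushed, format
  io.write_bytes applied, format
  io.write_bytes clock, format
  io.write_byte 0_u8 # 0 = do not ask the server for an immediate reply
  io.to_slice
end

reply = standby_status_update(164_i64, 164_i64, 164_i64)
puts reply.size
```

The resulting bytes would still need to be wrapped in a CopyData message before being sent.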

The initial spec does successfully invoke the handler's methods for the CREATE TABLE, INSERT, and UPDATE commands.

jgaskins • Jul 18 '25 17:07

I've not done much with logical replication myself, so I can't comment on the details of the protocol here, but it all seems super cool!

will • Jul 20 '25 16:07

I've had to add Debezium to our stack to pull some data into our data warehouse. All the trouble I've had getting it working got me wondering how easy or difficult it would be to just consume the WAL ourselves and emit a similar output structure. Completely nerd-sniped.

> I can't comment on the details of the protocol here

I'm hoping nobody will need to be all that concerned with it. 😄 It's not super complicated, but I want to come up with a good set of tests that illustrate that replication is working/broken by just performing actions and asserting on things appearing in the stream of logical replication events.

jgaskins • Jul 20 '25 23:07

Alrighty, I think this is at least ready for initial review. Feel free to let me know if anything is unclear.

jgaskins • Jul 25 '25 05:07

Awesome, thanks! I think this is all good to go. Could you please add a changelog entry first?

will • Jul 29 '25 06:07

Sweet, will do. I did find a bug the other night, though, that I still need to address. Specifically: error frames aren't being decoded on this connection. While looking into that, I realized the error frame is this error, which you've already got here. Same message structure, which makes sense.

Then I wondered, since that's the same as ErrorResponse, if the CopyBoth message is documented on that page, too. And it is. That's when I realized I could've reused the same connection structure as LISTEN like I originally intended.

I think, for the moment, I'll just reimplement the error frame here (the pattern I used in implementing these messages turned out pretty decent) and refactor toward reuse later.
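
For context on what reimplementing the error frame involves, here is a rough sketch of decoding an ErrorResponse body as the Postgres protocol docs describe it: repeated pairs of a one-byte field code and a NUL-terminated string, ended by a single zero byte. It assumes the leading 'E' type byte and Int32 length have already been consumed. decode_error_fields is a hypothetical name, and the sample values are made up.

```crystal
# Hypothetical helper, not the PR's implementation.
def decode_error_fields(body : Bytes) : Hash(Char, String)
  io = IO::Memory.new(body)
  fields = {} of Char => String

  while code = io.read_byte
    break if code == 0 # a zero byte terminates the field list
    # Each field value is a NUL-terminated string.
    fields[code.chr] = io.gets('\0', chomp: true) || ""
  end

  fields
end

# Build a made-up body with severity (S), SQLSTATE code (C), and message (M).
sample = IO::Memory.new
{'S' => "ERROR", 'C' => "42704", 'M' => "something went wrong"}.each do |code, value|
  sample.write_byte code.ord.to_u8
  sample << value
  sample.write_byte 0_u8
end
sample.write_byte 0_u8

error_fields = decode_error_fields(sample.to_slice)
puts error_fields['M']
```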

jgaskins • Aug 06 '25 05:08