Subscribe to logical replication
The basic idea is that you create a replication subscription with `PG.connect_replication`. It's similar to `PG.connect_listen`, but unlike a LISTEN connection, where every message is the same type (it just yields strings), each replication message can be a different type. So instead of passing a block, you pass a `PG::Replication::Handler` instance with `received` methods defined for the different types of replication messages:
```crystal
class MyHandler
  include PG::Replication::Handler

  # Override this to tell the Postgres server where you are in the WAL.
  # Make sure you `yield` in between any setup and teardown.
  def received(data : PG::Replication::XLogData, connection : PG::Replication::Connection)
    yield
    connection.last_wal_byte_applied = data.wal_end
    connection.last_wal_byte_flushed = data.wal_end
  end

  # New relation defined
  def received(relation : PG::Replication::Relation)
  end

  # Record updated
  def received(update : PG::Replication::Update)
  end
end

subscriber = PG.connect_replication url,
  handler: MyHandler.new,
  publication_name: "my_publication",
  slot_name: "my_replication_slot"
```
The default is to do nothing when receiving a `WALMessage`, so you only need to define `received` for the message types you care about.
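To illustrate how overload-based dispatch with a do-nothing default can work in Crystal, here's a self-contained sketch. The message types (`BeginMsg`, `InsertMsg`, `CommitMsg`), the `ExampleMessage` union, and the `Handler` module are hypothetical stand-ins, not the actual `PG::Replication` types. The point is only that when the argument's static type is a union, Crystal picks the most specific `received` overload at runtime and falls back to the module's no-op for everything else.

```crystal
# Hypothetical, simplified message types; not the real PG::Replication classes.
record BeginMsg, xid : Int32
record InsertMsg, relation_id : Int32
record CommitMsg, end_lsn : Int64

alias ExampleMessage = BeginMsg | InsertMsg | CommitMsg

module Handler
  # Default overload: silently ignore any message the includer doesn't handle.
  def received(message : ExampleMessage)
  end
end

class InsertCounter
  include Handler

  getter inserts = 0

  # Stricter restriction, so Crystal prefers this overload for inserts.
  def received(message : InsertMsg)
    @inserts += 1
  end
end

handler = InsertCounter.new
stream = [BeginMsg.new(1), InsertMsg.new(16384), InsertMsg.new(16384), CommitMsg.new(0_i64)] of ExampleMessage
# Each element's static type is the union, so the overload is chosen at runtime.
stream.each { |message| handler.received(message) }
puts handler.inserts # => 2
```

A handler written this way only has to mention the message types it cares about; adding a new message type to the union doesn't break existing handlers.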
Most of the work here happens in `XLogData` events, which contain the `WALMessage`s that people will actually care about; everything else seems to be protocol metadata. The objects passed to the handler's `received` methods are the `XLogData` message's payload, decoded according to the logical replication message formats specified in the Postgres docs.
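For a sense of what that decoding involves, here's a minimal sketch of peeling the `XLogData` header off a CopyData payload, following the streaming replication protocol docs: a `'w'` byte, three big-endian `Int64`s (start of the WAL data, current end of WAL on the server, and send time in microseconds since 2000-01-01), then one logical replication message whose first byte identifies its type. `XLogFrame`, `decode_xlog_data`, and `message_kind` are hypothetical helpers, not part of crystal-pg.

```crystal
# Hypothetical container for the XLogData header fields plus its payload.
record XLogFrame, wal_start : Int64, wal_end : Int64, sent_at_us : Int64, payload : Bytes

# Splits an XLogData frame ('w' + 3 big-endian Int64s + message) into parts.
def decode_xlog_data(frame : Bytes) : XLogFrame
  io = IO::Memory.new(frame)
  raise ArgumentError.new("not an XLogData frame") unless io.read_byte == 'w'.ord
  wal_start = io.read_bytes(Int64, IO::ByteFormat::BigEndian)
  wal_end = io.read_bytes(Int64, IO::ByteFormat::BigEndian)
  sent_at_us = io.read_bytes(Int64, IO::ByteFormat::BigEndian) # microseconds since 2000-01-01
  # Everything after the 25-byte header is a single logical replication message.
  XLogFrame.new(wal_start, wal_end, sent_at_us, frame[io.pos..])
end

# The first byte of the logical replication message identifies its type.
def message_kind(payload : Bytes) : String
  case payload[0].chr
  when 'B' then "Begin"
  when 'M' then "Message"
  when 'C' then "Commit"
  when 'O' then "Origin"
  when 'R' then "Relation"
  when 'Y' then "Type"
  when 'I' then "Insert"
  when 'U' then "Update"
  when 'D' then "Delete"
  when 'T' then "Truncate"
  else          "Unknown"
  end
end

# Usage: build a fake frame carrying an Insert tag and decode it.
io = IO::Memory.new
io.write_byte('w'.ord.to_u8)
io.write_bytes(100_i64, IO::ByteFormat::BigEndian) # start of WAL data
io.write_bytes(200_i64, IO::ByteFormat::BigEndian) # server's current WAL end
io.write_bytes(0_i64, IO::ByteFormat::BigEndian)   # send time
io.write_byte('I'.ord.to_u8)
frame = decode_xlog_data(io.to_slice)
puts message_kind(frame.payload) # => Insert
```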
There are still some things I need and/or want to do here:
- [x] Implement message types
  - [x] Begin
  - [x] Message
  - [x] Commit
  - [x] Origin
  - [x] Relation
  - [x] Type
  - [x] Insert
  - [x] Update
  - [x] Delete
  - [x] Truncate
- [ ] Allow decoding `TupleData` as other types (rather than raw `Bytes`)
- [ ] Allow starting the WAL from somewhere other than the beginning
- [x] Implement keepalive responses
- [x] Since logical replication requires a change to `postgresql.conf`, the specs need to be opt-in or enabled only when logical replication is enabled
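The keepalive item boils down to two small frames described in the streaming replication protocol docs: the server's primary keepalive (`'k'`, current WAL end, server clock, reply-requested flag) and the client's standby status update (`'r'`, written/flushed/applied WAL positions, client clock, reply-requested flag), both carried inside CopyData messages. Here's a minimal sketch of decoding the first and encoding the second; the WAL positions are simply passed in, and `Keepalive`, `decode_keepalive`, `standby_status_update`, and `pg_timestamp` are hypothetical names, not crystal-pg's implementation.

```crystal
# Replication protocol timestamps are microseconds since 2000-01-01 UTC.
PG_EPOCH = Time.utc(2000, 1, 1)

def pg_timestamp(time : Time) : Int64
  (time.to_unix - PG_EPOCH.to_unix) * 1_000_000 + time.nanosecond // 1_000
end

# Hypothetical decoded form of a primary keepalive ('k') message.
record Keepalive, wal_end : Int64, server_time_us : Int64, reply_requested : Bool

def decode_keepalive(frame : Bytes) : Keepalive
  io = IO::Memory.new(frame)
  raise ArgumentError.new("not a keepalive") unless io.read_byte == 'k'.ord
  wal_end = io.read_bytes(Int64, IO::ByteFormat::BigEndian)
  server_time = io.read_bytes(Int64, IO::ByteFormat::BigEndian)
  # Final byte: 1 means the server wants a status update right away.
  Keepalive.new(wal_end, server_time, io.read_byte == 1)
end

# Builds a standby status update ('r') to send back inside a CopyData message.
def standby_status_update(written : Int64, flushed : Int64, applied : Int64,
                          now : Time = Time.utc, reply_requested : Bool = false) : Bytes
  io = IO::Memory.new
  io.write_byte('r'.ord.to_u8)
  {written, flushed, applied, pg_timestamp(now)}.each do |value|
    io.write_bytes(value, IO::ByteFormat::BigEndian)
  end
  io.write_byte(reply_requested ? 1_u8 : 0_u8)
  io.to_slice
end
```

When a keepalive arrives with `reply_requested` set, the docs say the client should reply as soon as possible to avoid a timeout disconnect, so a client would send `standby_status_update(...)` in response.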
The initial spec does successfully invoke the handler's methods for the CREATE TABLE, INSERT, and UPDATE commands.
I've not done much with logical replication myself, so I can't comment on the details of the protocol here, but it all seems super cool!
I've had to add Debezium to our stack to pull some data into our data warehouse. All the trouble I've had getting it working got me wondering how easy or difficult it would be to just consume the WAL ourselves and emit a similar output structure. Completely nerd-sniped.
> I can't comment on the details of the protocol here
I'm hoping nobody will need to be all that concerned with it. 😄 It's not super complicated, but I want to come up with a good set of tests that show whether replication is working just by performing actions and asserting that the expected events show up in the stream of logical replication events.
Alrighty, I think this is at least ready for initial review. Feel free to let me know if anything is unclear.
Awesome, thanks! I think this is all good to go. Could you please add a changelog entry first?
Sweet, will do. I did find a bug the other night, though, that I still need to address. Specifically: error frames aren't being decoded on this connection. While looking into that, I realized the error frame is this error, which you've already got here. Same message structure, which makes sense.
Then, since that's the same as ErrorResponse, I wondered whether the CopyBoth message is documented on that page, too. And it is. That's when I realized I could've reused the same connection structure as LISTEN, as I originally intended.
I think, for the moment, I'll just reimplement the error frame here (the pattern I used in implementing these messages turned out pretty decent) and refactor toward reuse later.
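Since the error frame on this connection shares the `ErrorResponse` layout (an `'E'` byte, an `Int32` length that counts itself, then a sequence of one-byte field codes each followed by a null-terminated string, ended by a zero byte), reimplementing it is short. A minimal sketch, with `decode_error_fields` as a hypothetical helper rather than crystal-pg's existing implementation:

```crystal
# Decodes an ErrorResponse-style frame into { field code => value }.
# Layout: 'E', Int32 length (counts itself, not the tag), then repeated
# (Byte1 field code, null-terminated string), terminated by a zero byte.
def decode_error_fields(frame : Bytes) : Hash(Char, String)
  io = IO::Memory.new(frame)
  raise ArgumentError.new("not an ErrorResponse") unless io.read_byte == 'E'.ord
  io.read_bytes(Int32, IO::ByteFormat::BigEndian) # length; not needed here
  fields = {} of Char => String
  while code = io.read_byte
    break if code == 0 # terminator
    fields[code.chr] = io.gets('\0', chomp: true) || ""
  end
  fields
end

# Usage: severity 'S', SQLSTATE 'C', and message 'M', then the terminator.
body = "SERROR\u0000C42P01\u0000Mrelation \"widgets\" does not exist\u0000\u0000".to_slice
msg = IO::Memory.new
msg.write_byte('E'.ord.to_u8)
msg.write_bytes(body.size + 4, IO::ByteFormat::BigEndian)
msg.write(body)
fields = decode_error_fields(msg.to_slice)
puts fields['C'] # => 42P01
```

`'C'` holds the SQLSTATE code and `'M'` the primary message; the remaining field codes are listed in the Postgres docs on error and notice message fields.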