bottledwater-pg
Round tripping data from Postgres to Postgres via Kafka
Issue #7 and comments on the last blog post sniffed around what bottled water's promise of consistent snapshots actually means. In that vein, I think that a potential functional test of bottled water's consistency guarantees would be moving data from Postgres instance A to Kafka to Postgres instance B and checking that instances A and B end up equivalent. If it were possible to "totally" recreate a database from the output of bottled water, then I'd feel comfortable believing that bottled water created a consistent snapshot of the database. Logical decoding of the write-ahead log is meant to enable replication of Postgres databases in exactly this manner, so, assuming the WAL delivers on its promises, it should be possible for bottled water to round trip data. The appeal to me of combining Kafka and Postgres' WAL is being able to take data captured by Postgres and use Kafka to give anyone in the organization access to it.
Here are some of the topics/issues I had with the current design of bottled water that brought me around to desiring round tripping of data:
- Foreign keys: the current method of separating each table out into its own topic is problematic once records in separate tables start referring to each other. It's not hard to imagine consumers of different topics diverging in how fast they write to the database consuming Kafka events, at which point the consuming database stops being consistent with any previous state of the original database. The recovery process may just be to wait for the consumers to catch up, but that leaves me uncomfortable, especially in the face of a potential consumer failure leaving us stuck with an inconsistent database.
- Preservation of transaction structure: similarly, during the consumption of a large transaction, the consuming database will not be consistent with any previous snapshot of the original database. By disregarding transaction grouping, ranges of time are introduced during which the consuming database is inconsistent with respect to the original database.
- Namespacing of data: While less of a general concern and more an edge case later on, it seems likely that large organizations will have multiple disjoint Postgres instances that nevertheless have the exact same table names. The current design of bottled water would mix the events generated by the two databases in bad ways. Saying that organizations should have completely disjoint sets of table names sounds like an onerous and problematic requirement (no longer just plug and play).
- Log compaction scares me: I'm still learning about log compaction and what it means in Kafka, but what I understand of it makes me nervous. The idea of any data related to or coming from the main database being deleted for any reason whatsoever sends up red flags. While the demand for quality and quantity of user interaction/activity data is elastic, the demand for core user data is inelastic. For many organizations, the volume of user interaction data may require log compaction, but I'm unsure that the volume of changes to core user data requires it in as many cases. I'll assert that it is more expensive to recover from even a small bug in a log compaction algorithm than to just be dumb and buy more disk space.
The design encouraged by the above concerns is different from what currently exists. To wit: for each distinct Postgres instance, for every committed transaction we produce a single event (or sequence of events) to Kafka. We don't break transactions out by table, and so preserve updates that cross tables as well as the structure of the transaction itself. If need be, based on the maximum message size, a transaction could be broken up into a sequence of events pushed into the stream with metadata indicating which events were singletons and which were part of which transactions. Namespacing is now as easy as publishing to a topic with a name like "bottled-water/[insert uuid here]". My understanding is that this method would prevent log compaction, but I'm not so concerned about losing log compaction. If folks need to recreate the current design for some reason, it should be possible with Samza, but the current design cannot be turned into the proposed design. This method lends itself to a general "unbottling" program as well, one that takes the replicated Kafka transactions and inserts them back into a separate Postgres instance. Pushing data into separate tables/topics that are unique to each application means that any software written to reingest that data will need some configuration (or at least further coordination via metadata topics/events).
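To make the idea concrete, here is a minimal sketch of producing one envelope per committed transaction, written against the kafka-python client. Everything in it is illustrative: the field names (xid, seq, total, changes), the chunking scheme, and the topic naming are my own assumptions, not anything bottled water currently emits (note also that Kafka topic names can't contain "/", so a "." separator is used instead).

```python
# Hypothetical sketch: publish one envelope (or a sequence of them) per
# committed transaction to a single topic namespaced by database instance.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def publish_transaction(db_uuid, xid, changes, max_changes_per_event=100):
    """Publish a committed transaction, split into a sequence of events if
    it is too large to fit into one message."""
    # Kafka topic names may not contain "/", so use "." as the separator.
    topic = "bottled-water.%s" % db_uuid
    chunks = [changes[i:i + max_changes_per_event]
              for i in range(0, len(changes), max_changes_per_event)] or [[]]
    for seq, chunk in enumerate(chunks):
        producer.send(topic, {
            "xid": xid,            # Postgres transaction id, preserves grouping
            "seq": seq,            # position of this event within the transaction
            "total": len(chunks),  # how many events make up the whole transaction
            "changes": chunk,      # row-level changes, each tagged with its table
        })
    producer.flush()
```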
The main drawback I've seen to my proposed design, as @ept pointed out in #7, is that there are times when, yes, it would be nice to put all the data into one topic/partition, but that this isn't always possible due to outsized data taking up too much space. I have no answer to this besides wondering where the approximate limit of a topic/partition is in terms of size, and what data volume/velocity that corresponds to. We don't have nearly as much data to move around as LinkedIn and others do; each of our datasets can fit comfortably on one machine right now, and if we shelled out some more for disk space we could fit everything onto one beefy machine. We're interested in Kafka not for the distributed aspects but rather for the solid abstractions around data sharing and integration, which promise data sharing (and efficient aggregating via Samza) without scp insanity. So take my design with a big grain of low-scalability salt :) It's probably possible to distribute the events across related topics/partitions using WAL internals and still address the concerns above without losing the distributed advantages of Kafka, but I don't know enough yet to say whether that's a decent idea (especially with regard to the sorts of constraints it puts on the consumers themselves).
Lastly, the idea of pushing data to Postgres and back again means we could try to reuse Postgres' own internal tests for bottled water. In theory (but who knows in practice!) we could write to one Postgres instance in the tests but send the reads over to the replicated instance. I'm looking into whether this is even remotely feasible.
All this combining of Postgres and Kafka is something I'm exploring currently, in a context that keeps shifting based on what's required for our internal projects. I hope to contribute something more than critiques and criticism in the coming days, but this is all new territory to me, so I can't promise anything.
Hi @zmaril, thanks for your comments.
- Regarding preserving foreign keys and transaction structure: for this purpose, I think you're better off consuming directly from Postgres and not going via Kafka, as I described in #2. The Kafka integration is most useful if you want to get database data into a stream processing system (e.g. see the Samza documentation on stream-table joins).
- Namespacing of data: yes, it should be possible to customize the Kafka topic names, e.g. adding a prefix that identifies the database, to avoid name collisions. That would be a good and simple feature to add to Bottled Water — and a welcome contribution if you feel like making a pull request :)
- Log compaction is totally optional. It's actually turned off by default, and you have to explicitly turn it on when configuring the Kafka cluster or creating the topics.
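For reference, here's a rough sketch of what explicitly enabling compaction for a single topic could look like, using the kafka-python admin client; the topic name and broker address are placeholders.

```python
# Sketch: log compaction is opt-in per topic via the cleanup.policy setting.
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

admin.create_topics([
    NewTopic(
        name="bottled-water.example-db",   # placeholder topic name
        num_partitions=1,
        replication_factor=1,
        # Omit this setting (default is "delete") if you don't want compaction.
        topic_configs={"cleanup.policy": "compact"},
    )
])
```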
If your objective is to replicate from Postgres to Postgres, then there's not much point in going via Kafka — you might as well consume directly from Postgres (or wait for the Postgres core team to build logical replication, which is what they're doing on top of logical decoding).
It doesn't sound like @zmaril's objective is to replicate from Postgres to Postgres, but rather to do a round trip through Kafka as "a potential functional test of bottled water's consistency guarantees".
@ept If only we could do Postgres to Postgres! Life would be so much easier. We do need to do stream processing on our data, and we like the ideas Kafka/Samza are pitching. The round-trip test I described above is more a round-up of the concerns that are cropping up about how to store data in Kafka in such a way that we are comfortable using Kafka as the source of truth for all of our downstream data needs. It's not that we only have to do Postgres replication; it's that if we could do Postgres replication, we would understand how to solve many of our other concerns about keeping the system consistent. Reusing the Postgres test suite also gives us an easy answer to start from for #3. Paired with something like Jepsen, the guarantees about resumption after failure and other distributed systems problems could then be at least somewhat tested during development.
Yeah, that makes sense. There are two different things we could potentially test:
- Consuming the change stream directly from Postgres would give the strongest consistency (the relay could make a transaction in the output database for every transaction in the input database), but it wouldn't test the Kafka part of the pipeline.
- Consuming the change stream from Kafka would change the order of writes (assuming the current topic-per-table model, Kafka does not provide any ordering guarantee across topic-partitions), so it would lead to intermediate inconsistent states (e.g. foreign key constraints would not work). However, if it is tested with a stream of writes that is eventually quiescent, and the Kafka consumer is run to completion, then the output database contents should equal the input database contents (i.e. it should be "eventually consistent").
Either test could be useful, and the various parts of the system could be abused in various ways (e.g. Jepsen) while the test is running. But they are two different tests. What do you think would be more useful?
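As a rough illustration of what the final check of the second ("eventually consistent") test might look like, here is a sketch assuming psycopg2 and two already-running Postgres instances; the connection strings, table list, and the way the consumer is "run to completion" are placeholders, not part of bottled water.

```python
# Rough sketch: write a known workload to the source database, let bottled
# water -> Kafka -> consumer drain to completion, then compare table contents.
import psycopg2

SOURCE_DSN = "dbname=source_db"    # placeholder connection strings
REPLICA_DSN = "dbname=replica_db"
TABLES = ["users", "orders"]       # placeholder table list

def table_rows(dsn, table):
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute("SELECT * FROM %s" % table)
        # Sort by repr to get a deterministic order regardless of column types.
        return sorted(cur.fetchall(), key=repr)

def assert_round_trip_consistent():
    # Precondition (not shown): the write workload has quiesced and the
    # Kafka consumer has been run to completion.
    for table in TABLES:
        assert table_rows(SOURCE_DSN, table) == table_rows(REPLICA_DSN, table), \
            "table %s differs between source and replica" % table
```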
The latter test is more interesting to me because it tests the ability to recreate the original database within the constraints imposed by Kafka's architecture. In addition, with transaction metadata like transaction IDs, it might be possible to have a consumer that reads all of the topics/partitions and locally recreates the stream of transactions from the WAL. Transaction metadata might allow the second test design to merge with the first and address many of our concerns.
One issue would be having a Sufficiently Smart Consumer that could take in the various tables spread across all the topics/partitions and recreate what we want. I'm still learning about ZooKeeper and what it is supposed to be used for, but it might be reasonable to have ZooKeeper coordinate state about which topics come from which Postgres instance. Then all a consumer would need to know is the namespace corresponding to the database it wants to consume; it could look up the topics/tables it needs and start iterating through the log in order. No idea if that's what ZooKeeper is supposed to do, though.
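As a sketch of how that lookup could work, here is one possibility using the kazoo ZooKeeper client; the znode layout, namespace, and table names are invented for illustration and are not defined by bottled water, Kafka, or ZooKeeper.

```python
# Sketch: register each Postgres instance's topics under a znode named after
# its namespace, so a consumer only needs to know the namespace it cares about.
from kazoo.client import KazooClient

zk = KazooClient(hosts="localhost:2181")
zk.start()

NAMESPACE = "example-db"  # placeholder namespace for one Postgres instance

# Producer side: record which topics this database publishes to.
for table in ("users", "orders"):
    zk.ensure_path("/bottled-water/%s/%s" % (NAMESPACE, table))

# Consumer side: discover the topics for the namespace, then start consuming.
tables = zk.get_children("/bottled-water/%s" % NAMESPACE)
topics = ["%s.%s" % (NAMESPACE, t) for t in tables]
zk.stop()
```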
About the "eventually consistent",is there any possible solution to let the consumer know that one topic its getting is behind the head (or behide another table)? e.g. put a transaction number in each message, if a topic's latest transaction number is 5, another topic's transaction number is 6, and if application knows that twotopics are updated together, then application can wait for getting transaction number 6 and then do its transaction-dependent work.
> Namespacing of data: yes, it should be possible to customize the Kafka topic names, e.g. adding a prefix that identifies the database, to avoid name collisions. That would be a good and simple feature to add to Bottled Water — and a welcome contribution if you feel like making a pull request :)
Just a note that a `--topic-prefix` option was added in PR #57 with this behaviour.