
Consistent view of database tables

Open kanterov opened this issue 9 years ago • 10 comments

First of all, I want to thank you for doing such a great job.

I was digging into the source code, figuring out how to construct a consistent view of database tables from Kafka. Since different tables go to different topics, reading messages concurrently from those topics doesn't give a consistent view.

I feel it would be possible if each message in Kafka carried a wal_pos, and there were a separate topic containing checkpoint messages with consistent values of wal_pos.

I don't know much about the Postgres log format. What do you think about this? Perhaps it is already possible?
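To make the idea concrete, here is a minimal sketch (in Python, with hypothetical names; nothing here is bottledwater-pg code) of the consumer side of the proposal: changes from all table topics are buffered, and only released, in wal_pos order, once a checkpoint message declares that LSN range consistent.

```python
class CheckpointBuffer:
    """Buffers change messages from multiple topics until a checkpoint
    (a consistent wal_pos published on a separate topic) covers them."""

    def __init__(self):
        # (wal_pos, message) pairs not yet known to be consistent
        self.pending = []

    def on_change(self, wal_pos, message):
        """Called for every change message, from any table topic."""
        self.pending.append((wal_pos, message))

    def on_checkpoint(self, wal_pos):
        """Called when a checkpoint message arrives; returns all buffered
        changes up to that wal_pos, sorted into LSN order."""
        ready = sorted(p for p in self.pending if p[0] <= wal_pos)
        self.pending = [p for p in self.pending if p[0] > wal_pos]
        return [msg for _, msg in ready]
```

A consumer would apply the messages returned by `on_checkpoint` to its view; everything still in `pending` belongs to transactions not yet known to be fully written.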

kanterov avatar Apr 23 '15 19:04 kanterov

Yes, I think that would be possible. We get a wal_pos for every event from Postgres (begin transaction, commit transaction, insert, update, delete) so that could be added to events that come from the replication log. We don't get a wal_pos for events from the consistent snapshot, so that would have to be 0 or something, and handled as a special case by consumers.

If you feel like implementing this, it would be a great contribution!

ept avatar Apr 23 '15 20:04 ept

I voiced the same concern as @kanterov on the original blog post, and was redirected here.

In addition to a wal_pos for each message, wouldn't we also need to delay log compaction? In particular, you cannot replace or compact a message with a newer version until the wal_pos of the newer version is declared consistent (which will happen when this wal_pos appears in the separate topic, which in turn will only happen after all topics affected by this newer transaction are updated).

shashank025 avatar Apr 24 '15 15:04 shashank025

I was actually in the process of creating a project just like bottled water -- so it was great to see the approach validated. I'm really interested in the Avro serialization it uses.

Anyway, ordering and consistency are extremely important in my context. The approach I took requires some client processing, but LSN (aka wal_pos) checkpoints are interleaved into the Kafka stream. The protocol is extremely simple: the client must buffer a limited number of messages in memory until a checkpoint is received. My main goal was high performance with asynchronous and batch writes via librdkafka, using checkpointing to guarantee ordering and that no messages are missed. Idempotency via the LSN is up to the client application if a duplicate message is received.
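The producer side of a protocol like this can be sketched as follows (illustrative Python, not the actual implementation; all names are assumptions): because librdkafka delivers asynchronously, a checkpoint for LSN n may only be interleaved into the stream once delivery reports confirm that every message handed to the producer so far has reached Kafka.

```python
class CheckpointEmitter:
    """Emits a checkpoint only after delivery reports confirm that every
    message produced so far was successfully written to Kafka."""

    def __init__(self, emit_checkpoint):
        self.sent = set()    # LSNs handed to the async producer
        self.acked = set()   # LSNs confirmed by delivery reports
        self.emit = emit_checkpoint

    def on_produce(self, lsn):
        self.sent.add(lsn)

    def on_delivery_report(self, lsn):
        self.acked.add(lsn)
        if self.sent == self.acked:
            # Nothing is in flight: every LSN up to the maximum acked
            # value is durably in Kafka, so it is safe to checkpoint.
            self.emit(max(self.acked))
```

Note that no checkpoint is emitted while any message is still in flight, which is what lets the consumer treat a checkpoint as a completeness guarantee.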

Partitioning and topic'ing is still a work in progress, but it should be a configuration (or pluggable) option to determine how to parallelize the topic. Concurrent processing of a change stream is very specific to different problems.

This is very early and rudimentary code, but I thought you all might be interested. Perhaps we can collaborate a bit.

jarreds avatar Apr 24 '15 18:04 jarreds

@shashank025 Good point. AFAIK it's not possible for clients to control when log compaction happens, so if you want to be sure that you see the full set of changes for each transaction (as opposed to the most recent value for a given primary key), you probably can't use log compaction. In this case, it might be better to not go via Kafka, but to use JNI bindings (see #2) to link the Bottled Water client directly into your application — that way you'll get all the events grouped by transaction.

@jarreds Do I understand correctly that you currently write everything to one topic-partition? That sounds like a fine approach, although you probably can't maintain a snapshot of the entire database in Kafka that way.

ept avatar Apr 25 '15 08:04 ept

@ept At the moment, yes. And I agree, we won't be able to store the entire DB log in Kafka. I'm trying to figure out a stable way to work topics/partitions into the mix without breaking the guarantee that certain database operations are always processed in order.

A simple example would be two tables with a foreign key constraint between them. An insert must occur in table A before a dependent insert can occur in table B. If this data is on two different topics (or partitions), the guarantee that the client will process them in order is broken.

Due to the asynchronous nature of librdkafka, especially in spotty connection scenarios, and the lack of a librdkafka batch API, how do you guarantee ordering of database operations, even if all of the messages are on the same topic/partition? Do I have a misunderstanding of how librdkafka publishes messages?

Also, I'm new to Kafka, so corrections of any misunderstandings I have are welcome. The above are concerns that I had when implementing a logical decoding client. Until the transactional batch API is implemented in librdkafka, I fell back to inserting checkpoints into the Kafka stream to let the client know that everything up to a certain LSN has been written to Kafka successfully.

jarreds avatar Apr 27 '15 20:04 jarreds

@jarreds You are right that currently there is no guarantee that messages produced with librdkafka arrive in order. They most likely will, and librdkafka makes every effort to ensure it, but intermittent delivery failures (e.g., broker state hiccups, though not so much network problems) combined with retries (which cause messages to be resent, possibly after a subsequent message has already been sent) make out-of-order delivery possible. The transactional batching API suggested in https://github.com/edenhill/librdkafka/issues/204 solves this: an application-provided message batch (for a single topic+partition) will be guaranteed to be delivered or fail atomically. If this is something that is needed to alleviate the problem you are discussing, I'd be happy to expedite its implementation.
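The retry-induced reordering described above can be illustrated with a toy model (plain Python, not librdkafka code): a message whose first attempt fails is resent later, after subsequent messages have already gone out, so the broker's log ends up in a different order than the produce order.

```python
def deliver_with_retries(messages, fails_first_attempt):
    """Toy model of async delivery with retries: messages whose first
    attempt fails are resent after later messages have been sent."""
    broker_log = []
    retry_queue = []
    for m in messages:
        if m in fails_first_attempt:
            retry_queue.append(m)   # transient failure; retry later
        else:
            broker_log.append(m)    # delivered on first attempt
    broker_log.extend(retry_queue)  # retries land after later messages
    return broker_log
```

With no failures the order is preserved; a single transient failure on the first message is enough to reorder the log, which is exactly why retries > 0 plus multiple in-flight requests breaks ordering guarantees.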

edenhill avatar Apr 27 '15 20:04 edenhill

@edenhill Thanks for chiming in. I didn't realise that librdkafka may currently reorder messages. As Bottled Water relies on message ordering for correctness, it would be great to have a batching API, even just for a single topic-partition.

As I understand it, that batching API wouldn't be sufficient to ensure clients can see a transactionally consistent view of the database across different topic-partitions (if you need that, I think you'd be better off skipping Kafka and hooking directly into the Bottled Water client, see #2). However, the batching would still be important to ensure that a sequence of writes to the same key appear in the correct order in Kafka (and thus, log compaction gives the correct final value for a key).

ept avatar Apr 29 '15 10:04 ept

It would be possible to avoid the risk of reordering by allowing only one outstanding produce request to the broker at any given time (for a specific topic-partition), but this would effectively kill performance: throughput becomes limited by the broker round-trip time (RTT), including processing. Currently librdkafka keeps as many requests in flight as possible, so a new configuration property would be added to limit this to one.

Re #2, I'm not sure how integrating with the Java client would be any different, though; it has the same problem. Any client with multiple produce requests in flight and retries > 0 is at risk of reordering in the case of intermittent broker hiccups.

edenhill avatar Apr 29 '15 10:04 edenhill

Allowing only one outstanding produce request per topic-partition seems like it might be ok — any produce requests that are issued while one request is in flight would have to be queued up, and the queue would be produced as a batch when the first request completes. Would that work performance-wise?
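The queue-then-batch idea can be sketched like this (hypothetical Python, not a real producer; `send_batch` stands in for whatever ships a request to the broker): while one request is in flight, produced messages accumulate in a queue, and the whole queue goes out as a single batch when the outstanding request completes, so ordering is preserved and throughput is better than one message per RTT.

```python
class SingleInFlightProducer:
    """Allows only one outstanding produce request per topic-partition;
    messages produced meanwhile are queued and sent as one batch."""

    def __init__(self, send_batch):
        self.send_batch = send_batch  # ships a list of messages to the broker
        self.in_flight = False
        self.queue = []

    def produce(self, msg):
        if self.in_flight:
            self.queue.append(msg)    # a request is outstanding; wait
        else:
            self.in_flight = True
            self.send_batch([msg])

    def on_request_complete(self):
        """Called when the outstanding request is acknowledged."""
        if self.queue:
            batch, self.queue = self.queue, []
            self.send_batch(batch)    # queued messages go out as one batch
        else:
            self.in_flight = False
```

Under load the queue grows while a request is in flight, so each round trip carries a larger batch, amortizing the RTT cost.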

I elaborated a bit more on #2. The idea is to not go via Kafka at all in this situation, but to consume directly from Postgres.

ept avatar Apr 29 '15 11:04 ept

Yeah, internal automatic batching is the default mode of operation, so if you produce() 1000 messages in short order they will most likely end up in the same ProduceRequest going to the broker, but it might as well be two requests, one with 3 and one with 997 messages.

Atomic batches will let the application define exactly which messages go in one ProduceRequest.

Both these will work with the future max.inflight.produce.requests=1 property (or whatever it will be called).

Re #2: gotcha

edenhill avatar Apr 29 '15 11:04 edenhill