
Improve inserter delivery guarantees

Open nstylo opened this issue 8 months ago • 11 comments

I am using the inserter to periodically write batches of messages from a Kafka topic into ClickHouse. I wonder what happens to the data when an error occurs on write/commit. It seems like the internal buffer is dropped and the unfinished binary data stream is aborted. This essentially means that any data is lost on error. Is this true? What are some best practices to guarantee data delivery, and how would I best implement a retry mechanism?

nstylo avatar Apr 21 '25 20:04 nstylo

I'm also interested in retry functionality, due to these errors:

2025-04-27T20:42:21.712914Z ERROR inserter commit error: network error: client error (Connect)
2025-04-27T20:42:53.106309Z  INFO inserter commited stats: Quantities { bytes: 2281, rows: 9, transactions: 9 }
2025-04-27T20:43:27.786357Z  INFO inserter commited stats: Quantities { bytes: 1302, rows: 5, transactions: 5 }
2025-04-27T20:44:01.391209Z  INFO inserter commited stats: Quantities { bytes: 1941, rows: 8, transactions: 8 }
2025-04-27T20:44:25.226900Z ERROR inserter commit error: network error: client error (Connect)
2025-04-27T20:45:04.232866Z  INFO inserter commited stats: Quantities { bytes: 2343, rows: 9, transactions: 9 }
2025-04-27T20:45:24.480669Z  INFO inserter commited stats: Quantities { bytes: 2538, rows: 10, transactions: 10 }
2025-04-27T20:45:53.531691Z  INFO inserter commited stats: Quantities { bytes: 2445, rows: 8, transactions: 8 }
2025-04-27T20:46:22.680096Z  INFO inserter commited stats: Quantities { bytes: 2033, rows: 8, transactions: 8 }
2025-04-27T20:46:56.989796Z  INFO inserter commited stats: Quantities { bytes: 770, rows: 3, transactions: 3 }
2025-04-27T20:47:25.232948Z  INFO inserter commited stats: Quantities { bytes: 2555, rows: 10, transactions: 10 }
2025-04-27T20:47:51.754128Z  INFO inserter commited stats: Quantities { bytes: 1512, rows: 6, transactions: 6 }
2025-04-27T20:48:21.794250Z ERROR inserter commit error: network error: client error (Connect)
2025-04-27T20:48:53.660183Z  INFO inserter commited stats: Quantities { bytes: 1792, rows: 7, transactions: 7 }
2025-04-27T20:49:26.061922Z  INFO inserter commited stats: Quantities { bytes: 1778, rows: 7, transactions: 7 }
2025-04-27T20:50:01.966845Z  INFO inserter commited stats: Quantities { bytes: 2810, rows: 11, transactions: 11 }
2025-04-27T20:50:25.743520Z ERROR inserter commit error: network error: client error (Connect)

My understanding is that the inserter just resets its state on error and the rows are lost:

    Err(err) => {
        // The pending counter is reset and the buffered rows are dropped.
        self.pending = Quantities::ZERO;
        Err(err)
    }

bocharov avatar Apr 27 '25 21:04 bocharov

You would need to implement retries on error, likely best done with reconnection. If you need exactly-once semantics, things will get more complicated.
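
A minimal sketch of that approach, assuming the crate's plain insert API (the table name, row type, and backoff policy are placeholders, and exact signatures may differ between crate versions): keep the batch in memory and re-create the INSERT, and thus the connection, on every attempt.

    use std::time::Duration;

    use clickhouse::{error::Result, Client, Row};
    use serde::Serialize;

    // Placeholder row type for illustration.
    #[derive(Row, Serialize)]
    struct MyRow {
        ts: u64,
        msg: String,
    }

    /// Retries the whole batch; each attempt opens a fresh INSERT (and connection).
    async fn insert_with_retry(client: &Client, rows: &[MyRow], max_attempts: u32) -> Result<()> {
        let mut attempt = 0;
        loop {
            attempt += 1;
            let res = async {
                let mut insert = client.insert("my_table")?;
                for row in rows {
                    insert.write(row).await?;
                }
                insert.end().await
            }
            .await;

            match res {
                Ok(()) => return Ok(()),
                Err(err) if attempt < max_attempts => {
                    // Simple linear backoff; replace with your own policy.
                    tracing::warn!("insert attempt {attempt} failed: {err}; retrying");
                    tokio::time::sleep(Duration::from_millis(500 * u64::from(attempt))).await;
                }
                Err(err) => return Err(err),
            }
        }
    }

Since every attempt re-sends the full batch, this gives at-least-once rather than exactly-once delivery; duplicates have to be handled on the ClickHouse side.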

serprex avatar May 02 '25 06:05 serprex

Perhaps this could be turned into a feature request for retries on the inserter only.

slvrtrn avatar Jun 10 '25 10:06 slvrtrn

@bocharov I had issues with network errors too. For me the issue was that I was committing to CH every 5 seconds, which was exactly the threshold for CH to cancel an open connection. As far as I understood, the inserter opens a long-running connection to ClickHouse and streams data incrementally to evenly distribute network traffic for large inserts. This meant that when the connection was dropped, all the data was too. So on an insertion error you'd have to manually keep track of the batch to be able to re-stream it.
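
Roughly, that bookkeeping looks like this (a sketch, assuming a recent crate version where `Inserter::write` is synchronous; `MyRow` is the placeholder row type from the sketch above and `next_message` is a hypothetical Kafka source):

    use clickhouse::{error::Result, inserter::Inserter};

    async fn run(mut inserter: Inserter<MyRow>) -> Result<()> {
        // Our own copy of rows that ClickHouse has not acknowledged yet.
        let mut pending: Vec<MyRow> = Vec::new();

        while let Some(row) = next_message().await { // hypothetical Kafka source
            inserter.write(&row)?;
            pending.push(row);

            match inserter.commit().await {
                // A non-zero flush means the batch reached ClickHouse; drop our copy.
                Ok(q) if q.rows > 0 => pending.clear(),
                // Nothing was flushed yet; keep buffering.
                Ok(_) => {}
                Err(err) => {
                    // The inserter dropped its internal buffer; re-stream our copy.
                    tracing::warn!("commit failed: {err}; re-writing {} rows", pending.len());
                    for row in &pending {
                        inserter.write(row)?;
                    }
                }
            }
        }
        let _quantities = inserter.end().await?;
        Ok(())
    }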

nstylo avatar Jun 10 '25 11:06 nstylo

@serprex At-least-once would already be an improvement. Right now batches are fully dropped on network errors when inserting.

nstylo avatar Jun 10 '25 11:06 nstylo

Could we support returning the Bytes when an error happens? In our case, we want to get the failed Bytes for a retry rather than serializing and compressing the data again.

Rachelint avatar Jun 11 '25 17:06 Rachelint

I agree with @Rachelint. I think for most use cases we just want to retry the Bytes batch.

nstylo avatar Jun 12 '25 11:06 nstylo

Could we support returning the Bytes when an error happens?

There is no API to send raw bytes, as that would significantly expose implementation details. However, it would be possible to return some special guard with a dedicated API (not raw bytes).

for a retry rather than serializing and compressing the data again.

It shouldn't be a typical case, so the overhead of serializing and compressing again should be insignificant.


In general, I think you should implement a pending queue on your own; the inserter tells you how many rows were inserted, which lets you implement custom ACKs, and that is enough for an at-least-once guarantee when paired with in-memory or on-disk storage.
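
For the Kafka case from the original post, such custom ACKs could look roughly like this (a sketch; the consumer and its offset API are placeholders, not part of this crate):

    // Inside an async task with `inserter` and a Kafka `consumer` in scope.
    // Ack the source (Kafka) only after ClickHouse confirms a flush.
    let mut unacked_up_to: Option<i64> = None; // highest offset written but not yet flushed

    loop {
        let (offset, row) = consumer.next().await?; // hypothetical Kafka consumer
        inserter.write(&row)?;
        unacked_up_to = Some(offset);

        let flushed = inserter.commit().await?;
        if flushed.rows > 0 {
            // Rows up to this offset are durable in ClickHouse; it is now safe
            // to move the consumer group offset forward.
            if let Some(offset) = unacked_up_to.take() {
                consumer.commit_offsets(offset).await?; // hypothetical
            }
        }
    }

On error, the task restarts and re-reads from the last committed offset, which yields at-least-once delivery; duplicates can then be handled on the ClickHouse side (e.g. with a ReplacingMergeTree).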

I see several problems with including such logic in the crate:

  1. Retrying next to the insertion point is often only part of the whole problem. In my practice, the most reliable way to provide strong delivery guarantees is to implement end-to-end ACKs. You can have more than one service (and multiple tasks/actors within one service) before the insertion point, but such an API would cover only the last hop. Other failures (a task failing, a producer restarting, network issues between the producing service and the writing backend, and so on) would still lose data.
  2. Providing an FS-based buffer would be strange in this crate (even though, in practice, it is a pretty common way to do it), so we would be forced to store all rows in memory. However, the inserter's design allows streaming GBs per INSERT without high memory consumption on the service's side (so writing services can stay lightweight compared to the DB).
  3. It extends the API surface if retry logic is also included.

loyd avatar Jun 15 '25 10:06 loyd

There is no API to send raw bytes, as that would significantly expose implementation details. However, it would be possible to return some special guard with a dedicated API (not raw bytes).

Agreed, it should not expose raw bytes directly.

It shouldn't be a typical case, so the overhead of serializing and compressing again should be insignificant.

Yes, the CPU overhead is really insignificant. However, we can't release the memory for the raw rows (the rows before serializing and compressing) until the insert succeeds or the retry limit is exceeded. In my lightweight writer service, this behavior costs too much memory...

Rachelint avatar Jun 15 '25 11:06 Rachelint

@loyd As an alternative (and a more general solution to other problems), is it possible to expose a more low-level API? Then we users could customize more of the logic in our services.

I have implemented a draft of it in #237; by using the low-level API, the memory usage in our service dropped to 10% of the original.

I think it may be useful for other similar use cases.

Rachelint avatar Jun 15 '25 11:06 Rachelint

There is no API to send raw bytes, as that would significantly expose implementation details. However, it would be possible to return some special guard with a dedicated API (not raw bytes).

We should have it, however. It is the only thing that can allow inserting files directly with optimal performance using the Rust client. See https://github.com/ClickHouse/clickhouse-rs/issues/174

slvrtrn avatar Jun 16 '25 15:06 slvrtrn