Improve inserter delivery guarantees
I am using the inserter to periodically write batches of messages from a Kafka topic into ClickHouse, and I wonder what happens to the data when an error occurs on write/commit. It seems the internal buffer is dropped and the unfinished binary data stream is aborted. This essentially means that any data is lost on error. Is this true? What are the best practices to guarantee data delivery, and how would I best implement a retry mechanism?
I'm also interested in retry functionality, due to these errors:
2025-04-27T20:42:21.712914Z ERROR inserter commit error: network error: client error (Connect)
2025-04-27T20:42:53.106309Z INFO inserter commited stats: Quantities { bytes: 2281, rows: 9, transactions: 9 }
2025-04-27T20:43:27.786357Z INFO inserter commited stats: Quantities { bytes: 1302, rows: 5, transactions: 5 }
2025-04-27T20:44:01.391209Z INFO inserter commited stats: Quantities { bytes: 1941, rows: 8, transactions: 8 }
2025-04-27T20:44:25.226900Z ERROR inserter commit error: network error: client error (Connect)
2025-04-27T20:45:04.232866Z INFO inserter commited stats: Quantities { bytes: 2343, rows: 9, transactions: 9 }
2025-04-27T20:45:24.480669Z INFO inserter commited stats: Quantities { bytes: 2538, rows: 10, transactions: 10 }
2025-04-27T20:45:53.531691Z INFO inserter commited stats: Quantities { bytes: 2445, rows: 8, transactions: 8 }
2025-04-27T20:46:22.680096Z INFO inserter commited stats: Quantities { bytes: 2033, rows: 8, transactions: 8 }
2025-04-27T20:46:56.989796Z INFO inserter commited stats: Quantities { bytes: 770, rows: 3, transactions: 3 }
2025-04-27T20:47:25.232948Z INFO inserter commited stats: Quantities { bytes: 2555, rows: 10, transactions: 10 }
2025-04-27T20:47:51.754128Z INFO inserter commited stats: Quantities { bytes: 1512, rows: 6, transactions: 6 }
2025-04-27T20:48:21.794250Z ERROR inserter commit error: network error: client error (Connect)
2025-04-27T20:48:53.660183Z INFO inserter commited stats: Quantities { bytes: 1792, rows: 7, transactions: 7 }
2025-04-27T20:49:26.061922Z INFO inserter commited stats: Quantities { bytes: 1778, rows: 7, transactions: 7 }
2025-04-27T20:50:01.966845Z INFO inserter commited stats: Quantities { bytes: 2810, rows: 11, transactions: 11 }
2025-04-27T20:50:25.743520Z ERROR inserter commit error: network error: client error (Connect)
My understanding is that the inserter just resets its state on error, and the rows are lost:
```rust
Err(err) => {
    self.pending = Quantities::ZERO;
    Err(err)
}
```
You would need to implement retries on error, likely best done together with a reconnection. If you need exactly-once semantics, things get more complicated.
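As a minimal sketch of the retry-with-reconnection idea, using only the standard library: `send_with_retry` is a hypothetical helper, and the closure it takes stands in for whatever re-creates the client/inserter and re-writes the batch you kept around. The crate itself does not provide this.

```rust
use std::{thread, time::Duration};

/// Retry a fallible send with exponential backoff. The caller keeps the
/// batch alive, so every attempt re-sends the same data. Returns the last
/// error once `max_retries` is exhausted.
fn send_with_retry<E>(
    batch: &[u8],
    max_retries: u32,
    mut send: impl FnMut(&[u8]) -> Result<(), E>,
) -> Result<(), E> {
    let mut attempt = 0;
    loop {
        match send(batch) {
            Ok(()) => return Ok(()),
            Err(err) if attempt >= max_retries => return Err(err),
            Err(_) => {
                // Back off before reconnecting: 100 ms, 200 ms, 400 ms, ...
                thread::sleep(Duration::from_millis(100 << attempt));
                attempt += 1;
            }
        }
    }
}
```

The important part is not the loop itself but the precondition: the rows (or their serialized form) must still be owned by the caller when the commit fails, which is exactly what the current inserter does not guarantee.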
Perhaps this could be transformed into a feature request for retries on inserter only.
@bocharov I had issues with network errors too. For me, the issue was that I was committing to CH every 5 seconds, which was exactly the threshold for CH to close an open connection. As far as I understood, the inserter opens a long-running connection to ClickHouse and streams data incrementally to evenly distribute network traffic for large inserts. This meant that when the connection was dropped, all the data was too. So on insertion error you'd have to manually keep track of the batch to be able to re-stream it.
@serprex At least once would already be an improvement. Right now batches are fully dropped on network errors when inserting.
Can we support returning the Bytes when an error happens?
In our case, we want to get the failed Bytes for a retry rather than serializing and compressing the data again.
I agree with @Rachelint. I think for most use cases we just want to retry the Bytes batch again.
> Can we support to return the Bytes when error happened?
There is no API to send raw bytes, which is a significant disclosure of implementation details. However, it would be possible to return some special guard with dedicated API (not raw bytes).
> for retry rather than serializing and compressing the data again.
This should be a typical case, so the overhead of doing it should be insignificant.
In general, I think you should implement a pending queue on your own; the inserter gives you information on how many rows are inserted, so you can implement custom acks, and that's enough for an at-least-once guarantee when paired with in-memory or on-disk storage.
I see several problems with including such logic in the crate:
- Retrying next to the insertion point is often only part of the whole problem. In my experience, the most reliable way to provide strong delivery guarantees is to implement end-to-end ACKs. You can have more than one service (and multiple tasks/actors within one service) before the insertion point, but such an API would cover only the last hop. Other failures (a task crashing, a producer restarting, network issues between the producing service and the writing backend, and so on) would still lose data.
- Providing any FS-based buffer would be strange in this crate (although, in practice, it's a pretty common way to do it), so we would be forced to store all rows in memory. However, the inserter's design allows streaming GBs per INSERT without high memory consumption on the service's side (so writing services can be more lightweight in contrast to the DB).
- It extends the API surface if retry logic is also included.
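The pending-queue approach recommended above can be sketched with the standard library alone; `PendingQueue` is a hypothetical name, and wiring it to the real inserter (re-writing the unacked rows each cycle, then acking with the committed row count from the returned stats) is left to the caller:

```rust
use std::collections::VecDeque;

/// In-memory pending queue for at-least-once delivery. Rows are acked only
/// after a successful commit; on error they stay queued and are re-sent on
/// the next cycle. (Surviving a process crash would need an on-disk log
/// instead of a VecDeque.)
struct PendingQueue<T> {
    rows: VecDeque<T>,
}

impl<T> PendingQueue<T> {
    fn new() -> Self {
        Self { rows: VecDeque::new() }
    }

    /// Enqueue a row as it arrives from the upstream source (e.g. Kafka).
    fn push(&mut self, row: T) {
        self.rows.push_back(row);
    }

    /// Everything still unacked, to be written on the next insert cycle.
    fn unacked(&self) -> impl Iterator<Item = &T> {
        self.rows.iter()
    }

    /// Called with the row count from a successful commit: drop the rows
    /// that ClickHouse has confirmed.
    fn ack(&mut self, committed: usize) {
        let n = committed.min(self.rows.len());
        self.rows.drain(..n);
    }

    fn len(&self) -> usize {
        self.rows.len()
    }
}
```

Upstream offsets (e.g. Kafka consumer commits) would be advanced only after `ack`, which is what turns this into an end-to-end at-least-once pipeline rather than a local retry.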
> There is no API to send raw bytes, which is a significant disclosure of implementation details. However, it would be possible to return some special guard with dedicated API (not raw bytes).
Agree, it should not expose raw bytes directly.
> It should be a typical case, so the overhead to do it should be insignificant.
Yes, the CPU overhead is really insignificant.
However, we can't release the memory for the raw rows (rows before serialization and compression) until the insert succeeds or the retry limit is exceeded.
In my lightweight writer service, this behavior costs too much memory...
@loyd As an alternative (and also a more general solution to other problems), is it possible to expose a more low-level API? Then we users could customize more of the logic in our services.
I have implemented a draft of it in #237, and by using the low-level API, the memory usage in our service dropped to 10% of the original.
I think it may be useful for other similar use cases.
> There is no API to send raw bytes, which is a significant disclosure of implementation details. However, it would be possible to return some special guard with dedicated API (not raw bytes).
We should have it, however. It is the only thing that would allow inserting files directly with optimal performance using the Rust client. See https://github.com/ClickHouse/clickhouse-rs/issues/174