clickhouse-kafka-connect
fix 'overlapping' logic
Summary
- Excluded excess data processing when state = `BEFORE_PROCESSING`
- Fixed the order of chunks in overlapping range cases
- Prevented insertion of the 1st chunk in the overlapping range case when state = `BEFORE_PROCESSING` and the 1st chunk's boundaries are not the same as the state's
- Fixed and extended the different range-overlapping cases, taking into account that records are trimmed before processing, for example:

| State | Actual | Current | After fix |
| --- | --- | --- | --- |
| [2, 10] | [0, 11] | ZERO | OVER_LAPPING |
| [2, 10] | [0, 10] | ZERO | SAME |
| [2, 10] | [1, 11] | ERROR | OVER_LAPPING |
| [2, 10] | [1, 10] | ERROR | SAME |
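For illustration, a minimal sketch of the intended classification after trimming (the enum values mirror the table above; the connector's actual names and comparison logic may differ):

```java
// Illustrative only: a hypothetical range classification consistent with the
// "After fix" column above; not the connector's real implementation.
public class RangeCompareSketch {
    enum RangeState { ZERO, SAME, NEW, CONTAINS, OVER_LAPPING, ERROR }

    static RangeState compare(long stateMin, long stateMax, long actualMin, long actualMax) {
        // Records below the stored minimum are trimmed before processing, so an
        // actual range starting before stateMin is clamped to stateMin first.
        long effectiveMin = Math.max(actualMin, stateMin);
        if (actualMax < stateMin) return RangeState.ZERO;                                      // batch ends before the state
        if (effectiveMin == stateMin && actualMax == stateMax) return RangeState.SAME;         // e.g. [1, 10] vs state [2, 10]
        if (effectiveMin == stateMin && actualMax > stateMax) return RangeState.OVER_LAPPING;  // e.g. [0, 11] vs state [2, 10]
        if (effectiveMin > stateMax) return RangeState.NEW;                                    // batch starts after the state
        if (actualMax <= stateMax) return RangeState.CONTAINS;                                 // batch fully inside the state
        return RangeState.ERROR;                                                               // anything else is unexpected
    }
}
```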
Checklist
Delete items not relevant to your PR:
- [x] Unit and integration tests covering the common scenarios were added
- [x] A human-readable description of the changes was provided to include in CHANGELOG
- [ ] For significant changes, documentation in https://github.com/ClickHouse/clickhouse-docs was updated with further explanations or tutorials
@ne1r0n can you provide a test with content that covers this?
@mzitnik it would require a lot of effort from my side as I'm not a professional Java programmer.
All I am trying to fix is the cause of missed messages when using the connector in a production-like environment.
For example, in rare cases there could be issues with committing offsets, something like:
org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
After that failure, the consumer could read more messages from Kafka than before, so the batch would contain a part that was already inserted into ClickHouse and a new part that wasn't. Currently the connector will skip the whole batch.
According to the design document, it is either `overlapping` or the combination of `contains` and `new`, where the `new` part should be inserted.
It makes sense to implement an integration test that covers the above-mentioned scenario.
Another test could be for `splitRecordsByOffset`, checking the order of the parts it returns.
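Something like this is what I have in mind for that second test; `splitByOffset` below is just a made-up stand-in for `splitRecordsByOffset`, since I'm not sure of its exact signature, but the assertion on the order of the returned parts is the point:

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.List;
import org.junit.jupiter.api.Test;

public class SplitRecordsByOffsetOrderSketchTest {

    // Hypothetical stand-in for splitRecordsByOffset: splits a sorted list of
    // offsets at the given boundary into a "<= boundary" part and a "> boundary" part.
    static List<List<Long>> splitByOffset(List<Long> offsets, long boundary) {
        List<Long> alreadySeen = offsets.stream().filter(o -> o <= boundary).toList();
        List<Long> newPart = offsets.stream().filter(o -> o > boundary).toList();
        return List.of(alreadySeen, newPart);
    }

    @Test
    public void partsAreReturnedInAscendingOffsetOrder() {
        List<Long> batch = List.of(350L, 351L, 599L, 600L, 601L, 999L);
        List<List<Long>> parts = splitByOffset(batch, 599L);

        // The already-processed part (up to the state max) must come first,
        // the genuinely new part second.
        assertEquals(List.of(350L, 351L, 599L), parts.get(0));
        assertEquals(List.of(600L, 601L, 999L), parts.get(1));
    }
}
```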
@ne1r0n Thanks for your contribution. We will discuss it internally since I want a test that covers it.
I've added 2 tests for the cases this PR is trying to fix, please have a look.
@mzitnik , @Paultagoras I will be grateful for any updates or comments
Hi @ne1r0n! So I was re-looking at this again and re-reading your earlier comment:
> For example, in rare cases there could be issues with committing offsets, something like:
> org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
> After that failure, the consumer could read more messages from Kafka than before, so the batch would contain a part that was already inserted into ClickHouse and a new part that wasn't. Currently the connector will skip the whole batch. According to the design document, it is either `overlapping` or the combination of `contains` and `new`, where the `new` part should be inserted.
So we actually solved a bug with deduplication around this (or a very similar) thing. Imagine the following:
- Connector tries to insert a batch with minOffset=123 and maxOffset=456
- Some issue happens (like the offset failing to commit)
- Connector tries to insert a batch with minOffset=123 but maxOffset=789 (so it overlaps with more data)
The old bug would drop the values after 456 (so 457-789). Was that the sort of issue you were seeing?
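Just to make sure we're describing the same expected behaviour, roughly this (a sketch with made-up names, not the connector's actual API):

```java
import java.util.List;

// Rough illustration of the expected handling of an overlapping retry,
// not the connector's actual code.
public class OverlapHandlingSketch {

    // With the previously committed range [123, 456] and a retried batch covering
    // [123, 789], only offsets 457..789 should be inserted; 123..456 were already
    // written and must be neither duplicated nor used as a reason to drop the batch.
    static List<Long> offsetsToInsert(List<Long> batchOffsets, long committedMax) {
        return batchOffsets.stream()
                .filter(offset -> offset > committedMax) // keep only the genuinely new tail
                .toList();
    }
}
```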
Hi @Paultagoras,
currently the part shown in the picture will be lost
@ne1r0n So I think I saw what you were mentioning (at least partly) - we just merged in a fix to an issue with overlap. Mind seeing if that solves the issue you were seeing?
@Paultagoras yep, I saw it, thanks. It solved part of the problem.
Now let's imagine a situation like in the `ProcessPartialOverlappingBeforeProcessingTest` test.
We have 2 batches with overlapping ranges and a `BEFORE_PROCESSING` state after the 1st insert.
- 1st batch: 0 to 599
- 2nd batch: 350 to 999
Connector will do the following:
1. insert the 1st batch of 600 records
2. split the 2nd batch into two parts: 350 to 599 and 600 to 999
3. insert the 1st part (350 to 599) in the `BEFORE_PROCESSING` case
4. insert the 2nd part (600 to 999) in the `BEFORE_PROCESSING` case
5. insert the 2nd part (600 to 999) in the `AFTER_PROCESSING` case
There are 2 issues:
Step 3 corresponds to the `contains` case from the design document and should be processed accordingly.
Step 5 is an extra insert of the second part; here `deduplication_token` will fix that, but I think there should be a `break` before `case AFTER_PROCESSING:`.
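To illustrate what I mean, the fall-through looks roughly like this (a minimal sketch; the names and structure are made up for illustration and are not the connector's actual code):

```java
import java.util.List;

// Illustrative only: the shape of the fall-through being described,
// not the connector's real switch statement.
public class FallThroughSketch {
    enum State { BEFORE_PROCESSING, AFTER_PROCESSING }

    static void process(State state, List<Long> firstPart, List<Long> secondPart) {
        switch (state) {
            case BEFORE_PROCESSING:
                insert(firstPart);   // 350 to 599 in the example above
                insert(secondPart);  // 600 to 999
                // Missing `break` here: execution falls through into the next
                // case and the second part is inserted a second time.
            case AFTER_PROCESSING:
                insert(secondPart);  // 600 to 999 again
                break;
        }
    }

    static void insert(List<Long> part) {
        System.out.println("inserting offsets " + part.get(0) + ".." + part.get(part.size() - 1));
    }
}
```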
Thanks for submitting this! After re-reviewing it, I think what you said makes sense - especially with the extra insert happening 😅