[Bug] Paimon CDC not working in Dynamic Bucket Mode
Search before asking
- [x] I searched in the issues and found nothing similar.
Paimon version
The latest code in master branch on Apr 22, commit id = 80040f69d593056e6b8daf18099f87360266e5e7
Compute Engine
Flink
Minimal reproduce step
In tests, use a CREATE DATABASE AS clause in Flink SQL to synchronize an existing database to a Paimon catalog.
What doesn't meet your expectations?
The Flink job throws the following error:
Caused by: java.lang.AssertionError
at org.apache.paimon.table.sink.KeyAndBucketExtractor.bucket(KeyAndBucketExtractor.java:53)
at org.apache.paimon.table.sink.FixedBucketRowKeyExtractor.bucket(FixedBucketRowKeyExtractor.java:70)
at org.apache.paimon.flink.sink.RowDataKeyAndBucketExtractor.bucket(RowDataKeyAndBucketExtractor.java:50)
...
Anything else?
The problem is caused by the following call chain:
CdcDynamicBucketSink -> CdcRecordKeyAndBucketExtractor -> KeyAndBucketExtractor.bucket
CdcDynamicBucketSink is used when numBuckets = -1, but KeyAndBucketExtractor.bucket requires numBuckets > 0 and throws an AssertionError otherwise.
By the same reasoning, a similar problem should also exist in postpone bucket mode, where numBuckets = -2.
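To make the failure mode concrete, here is a minimal, self-contained sketch. It is not Paimon's actual code: `BucketGuardSketch`, `fixedBucket`, and `usesFixedBucket` are hypothetical names, and the hash-modulo computation is an assumption about how a fixed-bucket extractor typically works. The only facts taken from this report are the positive-bucket-count requirement and the sentinel values -1 and -2.

```java
public class BucketGuardSketch {
    // Sentinel values as described in this report.
    static final int DYNAMIC_BUCKET = -1;   // dynamic bucket mode
    static final int POSTPONE_BUCKET = -2;  // postpone bucket mode

    // Simplified stand-in for a fixed-bucket computation (assumed shape):
    // it rejects any non-positive bucket count before hashing.
    static int fixedBucket(int hashcode, int numBuckets) {
        if (numBuckets <= 0) {
            throw new AssertionError("numBuckets must be > 0, got " + numBuckets);
        }
        return Math.abs(hashcode % numBuckets);
    }

    // Hypothetical guard: the fixed-bucket path is only valid for a positive count.
    static boolean usesFixedBucket(int numBuckets) {
        return numBuckets > 0;
    }

    public static void main(String[] args) {
        int hash = "some-key".hashCode();

        // A positive bucket count works.
        System.out.println("bucket = " + fixedBucket(hash, 4));

        // Both sentinel values are rejected when routed through the fixed-bucket path.
        for (int n : new int[] {DYNAMIC_BUCKET, POSTPONE_BUCKET}) {
            try {
                fixedBucket(hash, n);
            } catch (AssertionError e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }
}
```

A fix along these lines would presumably check the bucket mode (as `usesFixedBucket` does here) before the extractor reaches the fixed-bucket computation, and let the dynamic and postpone modes assign buckets through their own paths.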
Are you willing to submit a PR?
- [x] I'm willing to submit a PR!