[Feature] 1-1 mapping between paimon buckets and kafka partitions
Search before asking
- [X] I searched in the issues and found nothing similar.
Motivation
When creating an Append for Message Queue table, as depicted in the screenshot here:
we can notice the following:
- 5 buckets are specified, but a bucket is not created until data arrives in it
- If you check the Kafka partitions, partition 3 has keys 2, 3 and 4
- These keys, however, end up in different Paimon buckets
- Paimon shuffles the data even though the parallelism is the same, because it does not do a 1-1 mapping
Because this is Kafka-like message queue functionality, some users are confused: they expect the same partitioning to be preserved, i.e. a 1-1 mapping between a Kafka partition and a Paimon bucket.
At the same time, I believe this would be a really good enhancement: it should also allow removing the shuffle between the operators, thus improving performance.
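To illustrate the observation above, here is a simplified, hypothetical model of bucket assignment (bucket = hash of the key modulo the bucket count; this is an illustration only, not Paimon's actual hashing scheme). Keys that share a Kafka partition can still land in different Paimon buckets:

```python
# Hypothetical model for illustration: bucket = hash(key) % num_buckets.
# This is NOT Paimon's actual bucketing implementation.
num_buckets = 5           # the 5 buckets specified in the example above
kafka_partition = 3       # Kafka partition 3 holds keys 2, 3 and 4
keys_in_partition = [2, 3, 4]

# In Python (as in Java), a small int hashes to itself.
buckets = {key: hash(key) % num_buckets for key in keys_in_partition}
print(buckets)  # {2: 2, 3: 3, 4: 4}
```

All three keys arrive from the same Kafka partition, yet are spread over three distinct buckets, which is why a shuffle is needed today.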
Solution
No response
Anything else?
No response
Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!
If you want a 1-1 mapping, Paimon's bucket number should be greater than or equal to the number of Kafka partitions, and the data should be shuffled by the Kafka partition id. I think Paimon can already implement what you describe. Here is a demo; you can define the DDL like this. Kafka source table:
```sql
CREATE TABLE KafkaTable (
  `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, -- from Debezium format
  `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL,         -- from Debezium format
  `partition_id` INT METADATA FROM 'partition' VIRTUAL,                     -- from Kafka connector
  `offset` BIGINT METADATA VIRTUAL,                                         -- from Kafka connector
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);
```
Paimon sink table:
```sql
CREATE TABLE IF NOT EXISTS sink_paimon_table
WITH (
  'connector' = 'paimon',
  'bucket' = '3',                        -- bucket number should be >= the number of Kafka partitions
  'bucket-key' = 'partition_id',         -- bucket key must be the Kafka partition_id
  'merge-engine' = 'deduplicate',
  'primary-key' = 'partition_id,offset'  -- primary key must be partition_id,offset
)
LIKE KafkaTable (EXCLUDING ALL);
```
So the Kafka source table's data, inserted into the Paimon table, will be shuffled by the Kafka partition_id. Since partition_id is an INT, whose hash code equals itself, this pipeline maps each Kafka partition 1-1 to a Paimon bucket.
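As a sanity check of the claim above, here is a small sketch (an assumption for illustration, not Paimon's actual bucketing code): if bucket assignment reduces to partition_id % num_buckets, and the bucket count is at least the partition count, every Kafka partition maps to its own bucket:

```python
# Hypothetical model of the comment's claim: with 'bucket-key' = 'partition_id'
# and an int hashing to itself, bucket = partition_id % num_buckets.
# Not Paimon's actual implementation; an illustration only.
num_kafka_partitions = 3
num_buckets = 3  # must be >= num_kafka_partitions for a 1-1 mapping

def bucket_for(partition_id: int, buckets: int) -> int:
    # An int's hash code equals itself, so hash(x) % n == x % n.
    return partition_id % buckets

mapping = {p: bucket_for(p, num_buckets) for p in range(num_kafka_partitions)}
print(mapping)  # {0: 0, 1: 1, 2: 2} -- each partition gets its own bucket
```

Note that if num_buckets were smaller than the partition count, two partitions would collide in one bucket, breaking the 1-1 property.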
@eric666666 Thanks a lot for this. The problem here, though, is that I'm trying to use an Append for Message Queue table so I can offload logs from Kafka, whereas your example uses a Primary Key table.
I also tried your example, but when creating the sink_paimon_table (Paimon 0.7.0-incubating), I'm getting:
```
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Table column [id, a, b, dt] should include all primary key constraint [partition_id, offset]
```
Let me know if I'm missing something.