
[Feature] Paimon Spark 2025 Roadmap

Zouxxyy opened this issue 11 months ago · 19 comments

Motivation

2025 has arrived, and we would like to thank everyone for their contributions over the past year! Here we present the 2025 Paimon Spark roadmap; you are welcome to take ownership of these items or expand upon them!

| Name | Introduction | Link |
| --- | --- | --- |
| Variant Type | [feat] Support the variant type, unlocking support for semi-structured data. | #4471 |
| Optimized Write | [perf] Optimize table writing, including automatic repartitioning, rebalancing data, and so on. | |
| Distributed Planning | [perf] Support distributed planning in the scan phase. | #4864 |
| DataFrame Writer V2 | [feat] Integrate Spark's DataFrameWriterV2 API. | |
| Liquid Clustering | [perf] Support liquid clustering. | #4815 |
| Isolation Level | [feat] Support more transaction isolation levels, such as the serializable isolation level. | #4616 |
| Support For Spark Connect | [feat] Support Spark Connect, called "Paimon Connect". | |
| Default Value | [feat] Support default values for specified fields. | |
| Constraints | [feat] Support adding constraints to fields, such as NOT NULL or other custom constraints. | |
| Partition Stats | [feat] Support partition stats. | |
| Row Lineage | [feat] Support tracking row lineage. | |
| Identity Column | [feat] Generate unique values for an identity column when no explicit values are provided during writes. | |
| Generated Columns | [feat] Support generated columns whose values are automatically computed by a user-specified function over other columns. | |
| CDC For Non-PK Table | [feat] Support CDC for non-primary-key tables. | |

Zouxxyy avatar Jan 02 '25 04:01 Zouxxyy

If there are any other features or requirements you would like, please comment here so we can discuss and revise this roadmap together. Thanks.

YannByron avatar Jan 02 '25 06:01 YannByron

And if anyone wants to take one or more of these items, go ahead and let us know.

YannByron avatar Jan 02 '25 06:01 YannByron

Can I take on this task?

Distributed Planning : [perf] Support distributed planning in the scan phase.

Aiden-Dong avatar Jan 08 '25 08:01 Aiden-Dong

@Aiden-Dong Yes, feel free to take it; you can create an issue for it. Note that this feature actually requires changes in the core, after which each compute engine will need to support it.

Zouxxyy avatar Jan 08 '25 08:01 Zouxxyy

> @Aiden-Dong Yes, feel free to take it; you can create an issue for it. Note that this feature actually requires changes in the core, after which each compute engine will need to support it.

Yes, I understand that we need to extend the functionality of AbstractFileStoreScan.readAndMergeFileEntries.

Aiden-Dong avatar Jan 08 '25 08:01 Aiden-Dong

https://github.com/apache/paimon/issues/4864

Aiden-Dong avatar Jan 08 '25 08:01 Aiden-Dong

@Zouxxyy Thank you for raising this; these optimizations are all highly anticipated!

> [feat] Integrate Spark's DataFrameWriterV2 API.

If no one has worked on this, I would like to volunteer to take it on. We are currently working to improve write performance by utilizing the V2 write interface RequiresDistributionAndOrdering. In fact, I am close to completing an MVP version locally.

zhongyujiang avatar Jan 09 '25 08:01 zhongyujiang

> @Zouxxyy Thank you for raising this; these optimizations are all highly anticipated!
>
> If no one has worked on this, I would like to volunteer to take it on. We are currently working to improve write performance by utilizing the V2 write interface RequiresDistributionAndOrdering. In fact, I am close to completing an MVP version locally.

Glad you can take it on. Just a reminder: in your implementation, be aware of supporting scenarios with different bucket modes, especially the dynamic bucket mode. This is why we compromised and used the V1 write at first.

YannByron avatar Jan 09 '25 10:01 YannByron

> especially the dynamic bucket mode

Yeah, I haven't found an easy way to support this yet. In fact, I've only implemented the V2 write for the fixed bucket mode. I think we can initially let the unsupported bucket modes fall back to the V1 write.

zhongyujiang avatar Jan 09 '25 11:01 zhongyujiang
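For context, here is a minimal sketch of what the DataFrameWriterV2 path discussed above looks like from the user side; the table name paimon.db.t and the columns are assumptions, not Paimon's actual integration:

```scala
import org.apache.spark.sql.SparkSession

object WriterV2Sketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("writer-v2-sketch")
      .getOrCreate()

    // Some rows to write; "k" and "v" are hypothetical column names.
    val df = spark.range(100).selectExpr("id AS k", "CAST(id AS STRING) AS v")

    // DataFrameWriterV2 resolves the target through the catalog rather than a
    // format string, so the connector's Write implementation can declare
    // requirements (e.g. RequiresDistributionAndOrdering) for Spark to plan.
    df.writeTo("paimon.db.t").append()

    // The same builder also exposes other verbs, e.g. dynamic overwrite:
    df.writeTo("paimon.db.t").overwritePartitions()
  }
}
```

The fallback to the V1 write for unsupported bucket modes mentioned above would happen inside the connector and is invisible at this API surface.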

Hello, I'm interested in supporting default values for specified fields. Can I try this?

davidyuan1223 avatar Feb 05 '25 08:02 davidyuan1223

> Hello, I'm interested in supporting default values for specified fields. Can I try this?

Of course! Thanks for participating.

Zouxxyy avatar Feb 06 '25 02:02 Zouxxyy

> Hello, I'm interested in supporting default values for specified fields. Can I try this?

> Of course! Thanks for participating.

Hello, the issue is #5015, and I have committed a basic PR for Spark tables to support default values. Could you review it and give me some suggestions?

davidyuan1223 avatar Feb 06 '25 04:02 davidyuan1223

I see that row lineage needs to be implemented. I know Kyuubi has a row lineage resolution plugin for Spark; I think we could refer to it: https://github.com/apache/kyuubi/commit/9716548380a21f2608ec1f413f1f6986366b4f18

davidyuan1223 avatar Feb 12 '25 13:02 davidyuan1223

Hi @Zouxxyy, what's the plan for this?

> Optimized Write: [perf] Optimize table writing, including automatic repartitioning, rebalancing data, and so on.

I found that for Paimon bucket tables, the writers may be distributed unevenly due to hash collisions (especially for multi-partition writes). Can this be solved by this optimization?

Aitozi avatar Mar 26 '25 14:03 Aitozi

> Optimized Write: [perf] Optimize table writing, including automatic repartitioning, rebalancing data, and so on.
>
> I found that for Paimon bucket tables, the writers may be distributed unevenly due to hash collisions (especially for multi-partition writes). Can this be solved by this optimization?

@Aitozi I think this can be addressed by implementing the Spark V2 write interface RequiresDistributionAndOrdering, which allows the write to specify the data distribution mode while also requesting an advisory (not guaranteed) shuffle partition size. That should resolve the data skew issue for fixed-bucket tables.

zhongyujiang avatar Apr 01 '25 01:04 zhongyujiang
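To make the interface concrete, here is a minimal sketch (not Paimon's actual implementation) of a DSv2 Write declaring such requirements; the class name and bucket_col are assumptions:

```scala
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
import org.apache.spark.sql.connector.expressions.{Expression, Expressions, SortOrder}
import org.apache.spark.sql.connector.write.{BatchWrite, RequiresDistributionAndOrdering, Write}

// Hypothetical Write that asks Spark to cluster rows by a bucket column.
class BucketedWrite extends Write with RequiresDistributionAndOrdering {

  // Route all rows sharing a bucket_col value to the same task, so each
  // bucket is written by exactly one writer.
  override def requiredDistribution(): Distribution =
    Distributions.clustered(Array[Expression](Expressions.column("bucket_col")))

  // Advisory (not guaranteed) shuffle partition size; with AQE enabled,
  // Spark may coalesce or split partitions toward this target
  // (available in newer Spark releases).
  override def advisoryPartitionSizeInBytes(): Long = 128L * 1024 * 1024

  // No within-task ordering required in this sketch.
  override def requiredOrdering(): Array[SortOrder] = Array.empty

  override def toBatch(): BatchWrite =
    throw new UnsupportedOperationException("writer factory omitted in this sketch")
}
```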

@Aitozi @zhongyujiang Paimon calls REPARTITION_BY_COL on the bucket column for bucket tables, which has two advantages:

  1. It allows data to be more concentrated, reducing writer overhead and avoiding small files.
  2. It does not produce concurrent compaction.

Spark also has another mode, REBALANCE_PARTITIONS_BY_COL, which additionally splits skewed partitions, but it cannot guarantee the second advantage. Therefore, we could choose this type of shuffle in write-only scenarios.

Additionally, if skew occurs, it may still be worth reconsidering whether the bucket key values or the number of bucket keys are reasonable.

Zouxxyy avatar Apr 03 '25 02:04 Zouxxyy
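For illustration, a minimal sketch of the two shuffle modes Zouxxyy contrasts above; the table and column names are assumptions:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object ShuffleModeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("shuffle-modes").getOrCreate()
    val df = spark.table("src") // hypothetical source table

    // REPARTITION_BY_COL: plain hash repartitioning on the bucket column.
    // All rows of a bucket land in one task: data stays concentrated and no
    // concurrent compaction occurs, but a hot bucket cannot be split.
    df.repartition(col("bucket_col"))
      .writeTo("paimon.db.t").append()

    // REBALANCE_PARTITIONS_BY_COL: same clustering, but AQE may split a
    // skewed partition across several tasks, dropping the single-writer-
    // per-bucket guarantee; hence it only suits write-only scenarios.
    spark.sql(
      """INSERT INTO paimon.db.t
        |SELECT /*+ REBALANCE(bucket_col) */ * FROM src""".stripMargin)
  }
}
```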

@Aitozi @zhongyujiang Another cause of skew is described in https://github.com/apache/paimon/issues/3651. That issue arises from an inconsistency between Paimon's hash algorithm and Spark's hash algorithm, and it can potentially be avoided once we integrate the bucket function with the V2 writer.

Zouxxyy avatar Apr 03 '25 02:04 Zouxxyy

I'd like to give it a try:

Default Value, Partition Stats, Row Lineage

I'd appreciate the opportunity.

gsralex avatar Apr 04 '25 14:04 gsralex

> I'd like to give it a try: Default Value, Constraints, Partition Stats, Row Lineage. I'd appreciate the opportunity.

Default value is in progress; see #5015. It will touch many files, so if you are interested in it, feel free to join me.

davidyuan1223 avatar Apr 04 '25 14:04 davidyuan1223

Is support for multi-field bucketed joins part of the 2025 roadmap?

Background: Paimon currently allows specifying bucket keys via the bucket-key table property at table creation. However, there is no mechanism to alter bucket keys after creation. Additionally, Spark's native support for multi-field bucketing is limited (e.g., Expressions.bucket(numBuckets, column) only accepts a single column), leading to inefficiencies in join operations when multiple fields are required as bucket keys.

Minimal test project: https://github.com/qidian99/paimon-bucket-join-test. Note that the same Spark plans are generated for primary key tables.

If this is not applicable, maybe we should allow ALTER TABLE to modify bucket keys so that incremental data written under the new schema can benefit from optimized bucket-join performance.

qidian99 avatar May 17 '25 09:05 qidian99
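For reference, a minimal sketch of how multi-field bucket keys can be declared today at creation time via the bucket-key property mentioned above; the table layout is an assumption:

```scala
import org.apache.spark.sql.SparkSession

object BucketKeySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("bucket-key").getOrCreate()

    // Multiple bucket keys are given as a comma-separated list when the
    // table is created; per the discussion above, there is currently no
    // ALTER TABLE path to change them afterwards.
    spark.sql(
      """CREATE TABLE paimon.db.orders (
        |  user_id BIGINT,
        |  item_id BIGINT,
        |  amount  DOUBLE
        |) TBLPROPERTIES (
        |  'bucket' = '8',
        |  'bucket-key' = 'user_id,item_id'
        |)""".stripMargin)
  }
}
```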