Why are bucketed tables not supported in Delta?
Why are bucketed tables not supported in Delta? I have implemented bucketed tables in Delta, including:
- creating a bucketed table using Delta
- converting a bucketed table to Delta
- read, insert, and update/delete/merge on a bucketed Delta table via SQL
- bucketed joins (without a shuffle)
Do we need this feature? I can file a PR.
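For reference, Spark already accepts bucketed-table DDL for non-Delta sources; a minimal sketch is below (the table and column names are made up, and it assumes a SparkSession named spark). The proposal above would presumably accept the same CLUSTERED BY clause with USING delta, which Delta rejects today.

// Bucketed-table DDL Spark accepts today for a non-Delta source; swapping
// USING parquet for USING delta currently fails.
spark.sql("""
  CREATE TABLE users_bucketed (user_id BIGINT, name STRING)
  USING parquet
  CLUSTERED BY (user_id) INTO 16 BUCKETS
""")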
Hidden partitioning ( #490 ) is a better solution for this. It's pretty easy to support bucket tables on top of hidden partitioning.
@zsxwing : Could you explain how bucketing can be implemented on top of partitioning? Say we have a users table bucketed by user_id, so we can join with this table without a shuffle. That works out of the box in plain Spark, and I'd rather have the same in Delta. With hidden partitioning, I'd have to specify partitioning based on something like pmod(hash($"user_id"), N)? That would probably add extra, unnecessary path elements, which is inelegant, but the key question is whether Spark will skip the shuffle in this case. In other words, will the DataFrame created when reading such a table have the proper outputPartitioning?
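To make the question concrete, here is a minimal sketch of the hidden-partitioning-style workaround being described (usersDF, the path, and the bucket count 16 are placeholders), together with the check the commenter is asking about:

import org.apache.spark.sql.functions.{col, hash, pmod, lit}

// Derive an explicit bucket column and partition by it.
usersDF
  .withColumn("bucket", pmod(hash(col("user_id")), lit(16)))
  .write.format("delta")
  .partitionBy("bucket")
  .save("/tmp/users_bucketed")

// The open question: does the read side report a HashPartitioning that lets
// joins skip the shuffle, or just an unknown partitioning?
val readBack = spark.read.format("delta").load("/tmp/users_bucketed")
println(readBack.queryExecution.sparkPlan.outputPartitioning)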
I'm also interested in this. I'd like to write a Delta table to S3 with df.repartition(5, "someCol"), and it would be nice if Delta could write HashPartitioning(5, someCol) into the metadata along with the 5 files. Then on read it could report that partitioning and avoid future shuffles.
I don't like using DataFrameWriter.bucketBy because it requires a metastore; the above ^ should be doable without one. Also, DataFrameWriter.partitionBy produces one file per partition value, which is not great when the value is a hash.
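A hedged sketch of the write pattern being requested (df, the column name, and the path are placeholders); the missing piece is that nothing in the Delta log records the resulting layout for readers:

import org.apache.spark.sql.functions.col

// repartition(5, someCol) hash-distributes rows into 5 partitions, so the write
// typically produces 5 files; the ask is for Delta to also persist
// HashPartitioning(5, someCol) so a later read can reuse it and skip shuffles.
df.repartition(5, col("someCol"))
  .write.format("delta")
  .mode("overwrite")
  .save("s3://some-bucket/some-table")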
Switching to bucketed parquet tables to avoid shuffles has brought my processing pipeline down from 2 hours to 5 minutes.
Today, Databricks returns warnings (hints?) that I am not using Delta tables:
Accelerate queries with Delta: This query is on a non-Delta table with many small files. To improve the performance of queries, convert to Delta and run the OPTIMIZE command on the table
but if I attempt to switch formats, I get the error:
"AnalysisException: Operation not allowed:
Bucketing
is not supported for Delta tables"
Ideally this would be supported, but at the very least, Databricks shouldn't be hinting to switch to Delta if the current format is bucketed parquet.
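For context, one way to hit the same error (assuming current Delta behavior; the table and column names are made up) is to use Spark's bucketing clause with the delta source:

// Spark's bucketing clause combined with USING delta raises an AnalysisException
// along the lines of: Operation not allowed: Bucketing is not supported for Delta tables
spark.sql("""
  CREATE TABLE events_bucketed (user_id BIGINT, ts TIMESTAMP)
  USING delta
  CLUSTERED BY (user_id) INTO 16 BUCKETS
""")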
Hi @alainbryden - if you have Databricks-specific questions, please contact the appropriate Databricks support.
We can only answer questions about open-source Delta Lake :)
@scottsand-db This is not my ticket; I merely added my experience as a comment. The original question does not appear to be Databricks-specific. I suggest reopening.
Any update on this feature request? We are also interested in this feature.
@dennyglee Hi from ActionIQ, is this ticket written clearly and does it have the right tags to get triaged?
Hi @MasterDDT - the ticket is clearly written, but the concern I immediately have is that df.repartition() would require changes to how Spark "decides" to run the HashPartitioning(y, $col). We would also want to design the API so that Trino, Rust, Python (and all of the other frameworks) could interact with this metadata. We'll probably have to noodle on this design a bit so that all systems could leverage it, eh?!
There's already the _delta_log/* metadata that all clients need to read - could you stick it in there?
Yeah, I thought maybe the implementation of the hash function could be an issue - I don't know if df.repartition(y, $col) produces a different layout of data based on engine (Spark vs Trino) version, Java version, CPU arch, etc. It could be best effort: if you write with (sparkVer=x, javaVer=x, ...) and read it back with the same versions, it would honor that and give you an already-partitioned dataset?
Essentially what we want is to get away from using a metastore - it's only useful (for us) to keep schema + partitioning information, which Delta could do too, and then the data sits next to the metadata, which is very nice for portability. We might still use a metastore, but then it's more like a cache (we don't care about dropping it).
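To ground the engine-dependence concern: in Spark, repartition(n, col) assigns rows to buckets with Spark's own hash, roughly as sketched below (df, the column name, and the bucket count are placeholders), and that is the layout any other engine would have to reproduce exactly:

import org.apache.spark.sql.functions.{col, hash, pmod, lit}

// Spark's hash repartitioning is roughly pmod(hash(col), n), where hash() is
// Spark's Murmur3-based hash; a different engine or hash implementation would
// place the same rows in different buckets.
val withBucket = df.withColumn("bucket", pmod(hash(col("someCol")), lit(5)))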
It's been a year - any updates on this feature? Basically our workload is the same as @MasterDDT's: we need to bucket by UserId, filter by a specific UserId, and read only one bucket instead of all the data. We have tried Z-ordering by UserId and the performance is not ideal.
Hi all, what's the status of this feature? We also want to bucket by a list of columns, so consumers can leverage that info for joins and aggregations.
I investigated Z-ordering and liquid clustering, but they only help with data skipping.
This might be a naïve question, but why do we want to support bucketing via df.repartition instead of the DataFrameWriter.bucketBy operation?
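One likely reason, echoing the earlier metastore comment: DataFrameWriter.bucketBy only works when saving to a catalog table, so a purely path-based Delta write could not use it. A minimal sketch (table and column names made up):

// bucketBy requires saveAsTable, i.e. a catalog/metastore entry; calling
// save(path) together with bucketBy fails with an AnalysisException.
df.write.format("parquet")
  .bucketBy(5, "someCol")
  .sortBy("someCol")
  .saveAsTable("users_bucketed")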
As a simpler implementation, you could ignore supporting the Spark DataFrame partitioning 🤷. Ultimately, here is what I'm looking for:
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning

someDF
  .write.format("delta")
  .option("newHashBucketingColumn", "foo")       // hypothetical option
  .option("newHashBucketingNumPartitions", "5")  // hypothetical option
  .save("s3://somewhere")

spark.read.format("delta").load("s3://somewhere")
  .queryExecution.sparkPlan.outputPartitioning match {
    case HashPartitioning(exprs, 5) if exprs.exists(_.references.exists(_.name == "foo")) =>
      println("yes!") // already hash-partitioned by foo into 5 buckets
    case _ =>
      println("no")   // a join on this table would still need a shuffle
  }
^ allows me to take two Delta tables with the same bucketing spec and join them without a shuffle. Otherwise I am pretty sure ^ will always shuffle no matter what zorder/liquid/etc you do.
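For comparison, this is roughly how the no-shuffle property can be checked today with two metastore-backed bucketed (non-Delta) tables; the table names are placeholders and the string check is deliberately crude:

// With both tables bucketed identically on the join key, the physical plan
// should contain no Exchange (i.e. no shuffle) feeding the join.
val joined = spark.table("orders_bucketed")
  .join(spark.table("users_bucketed"), "user_id")
println(joined.queryExecution.executedPlan)
assert(!joined.queryExecution.executedPlan.toString.contains("Exchange"))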
Hi all, I have done a lot of investigation and put together this design for implementing bucketing in Delta.
Please take a look at the design and share your thoughts! Also, since I assume everyone in this thread is interested in bucketing, please upvote the feature in whatever channel you can to help it land sooner.
Hi all, since this issue is labeled as a question, I have formally filed a feature request. Please upvote that feature request (or provide any other help) to make it happen sooner!
I am willing to contribute this feature, but since I am new to the Delta community, I need some suggestions and guidance from all of you to make it happen - any help is welcome!