Adding MergeInto into the Spark Scala API
Context:
https://iceberg.apache.org/#spark-writes/#merge-into I see that MERGE INTO is the recommended write pattern instead of INSERT OVERWRITE, but there is no DataFrame/Dataset Scala API equivalent, so we have to resort to SQL via Spark SQL.
It would be great to have it exposed like append and overwrite.
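For reference, a minimal sketch of the SQL workaround the thread alludes to: a small helper that assembles the MERGE INTO statement whose result would then be handed to spark.sql. The helper name and its parameters are illustrative only, not part of any real Spark or Iceberg API.

```scala
// Illustrative helper, NOT a real Spark or Iceberg API: it only assembles
// the MERGE INTO SQL text that users currently pass to spark.sql(...).
object MergeSqlBuilder {
  def mergeInto(
      target: String,
      source: String,
      condition: String,
      whenMatched: String = "UPDATE SET *",
      whenNotMatched: String = "INSERT *"): String =
    s"""MERGE INTO $target t
       |USING $source s
       |ON $condition
       |WHEN MATCHED THEN $whenMatched
       |WHEN NOT MATCHED THEN $whenNotMatched""".stripMargin
}
```

With a real SparkSession this would be invoked as spark.sql(MergeSqlBuilder.mergeInto("db.target", "db.source", "t.id = s.id")), which is exactly the string-based indirection a first-class API would remove.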
Possible solution: This has to be pushed out in the Spark community and an addition is needed in the Dataframe/ DataSet API. Delta has a solution to this: https://github.com/delta-io/delta/blob/7c6a0ceca84b5d04653dec7361aa82075542dfee/core/src/main/scala/io/delta/tables/DeltaTable.scala#L354-L406 It would help users in the Iceberg community to have this as well.
Concern: This might take longer to implement on the Spark side. Perhaps we could do something to add it to Iceberg.
@nssalian Just for my understanding: Spark has MERGE INTO table support. Is there anything else that needs to be implemented on the OSS Spark side?
@huaxingao thanks for chiming in. Could you point me to the Dataframe/ DataSource API where the mergeInto is exposed? I was under the impression that only Delta has this implemented. CC @kbendick if I missed something.
@nssalian There isn't a mergeInto Dataframe/ DataSet API in Spark. The SQL syntax merge into is supported in Spark, though. I am just trying to figure out what exactly needs to be done in Spark to make this work. I might be able to implement that.
That's right. The request is for a mergeInto DataFrame/Dataset API in Spark, so that Iceberg could take advantage of it in the API rather than SQL. Not a blocker, but it would be great to have alongside append and overwrite.
@huaxingao, following up on this. Any update on whether this might be on Spark's roadmap, to add it as part of the DataFrame API?
I think we could also add some implicits in Iceberg's Scala modules, outside of Spark. That might be a good path forward.
Sorry I didn't have a chance to check with the Spark SQL folks yet. Will do and get back to you this week.
Yes, it is possible to add the Merge API into DataFrameWriterV2.
@huaxingao - wow, having a Merge in the DataFrameWriterV2 would be great. Is this already tracked by the Spark folks? Is there a JIRA or issue we can monitor?
@huaxingao any update from Spark side?
Hi @RussellSpitzer @huaxingao
If this functionality is still required and Spark side is still pending, I might try to implement it (as a Scala developer I would really like to have the merge API).
I just need to have a chat with someone who knows the Scala code base to clarify some missing pieces in my analysis.
I see two possible implementations, both following the implicit approach.
Option 1:
- an implicit class that extends Dataset[T] with a new method mergeTo(table, condition)
- an implicit class that extends DataFrameWriterV2[T] with the following methods:
  - whenMatched(condition: Option[String], actions: Seq[String])
  - whenNotMatched(condition: Option[String], actions: Seq[String])
  - merge(), which executes the final operation

ds
  .mergeTo(table, condition)
  .whenMatched(something here)
  .whenNotMatched(something here)
  .merge()
Option 2:
- an implicit class that extends DataFrameWriterV2[T] with the following methods:
  - mergeWhen(condition: Option[String])
  - whenMatched(condition: Option[String], actions: Seq[String])
  - whenNotMatched(condition: Option[String], actions: Seq[String])
  - merge(), which executes the final operation

ds
  .writeTo(table)
  .mergeWhen(condition)
  .whenMatched(something here)
  .whenNotMatched(something here)
  .merge()
The difference between the two approaches is in the syntax of the API: the first introduces a new 'mergeTo' entry point, while the second reuses the already existing 'writeTo'.
Looking at the current code base, I see that a MERGE INTO operation is already resolved into a Spark LogicalPlan; we could probably reuse that logic and just call executePlan in the final merge() call.
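As a rough illustration of Option 1's implicit approach, the enrichment could look like the sketch below. Everything here is a toy: FakeDataset stands in for Spark's Dataset[T], and the builder merely records clauses and renders the equivalent SQL instead of executing a logical plan. None of these names are a real Spark or Iceberg API.

```scala
// Toy sketch of Option 1: an implicit enrichment adding mergeTo to a
// dataset-like type. FakeDataset stands in for Spark's Dataset[T].
final case class FakeDataset(view: String)

final class MergeBuilder(source: FakeDataset, table: String, condition: String) {
  private var clauses = Vector.empty[String]

  def whenMatched(actions: Seq[String]): MergeBuilder = {
    clauses :+= s"WHEN MATCHED THEN ${actions.mkString(", ")}"
    this
  }

  def whenNotMatched(actions: Seq[String]): MergeBuilder = {
    clauses :+= s"WHEN NOT MATCHED THEN ${actions.mkString(", ")}"
    this
  }

  // A real implementation would build and execute a MergeIntoTable plan;
  // this one just returns the SQL it would have run.
  def merge(): String =
    (s"MERGE INTO $table USING ${source.view} ON $condition" +: clauses).mkString("\n")
}

object MergeImplicits {
  implicit class MergeOps(private val ds: FakeDataset) extends AnyVal {
    def mergeTo(table: String, condition: String): MergeBuilder =
      new MergeBuilder(ds, table, condition)
  }
}
```

Option 2 would differ only in the entry point: the same builder methods would hang off the result of writeTo(table) via a mergeWhen(condition) call rather than a new mergeTo method.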
Sorry I didn't have a chance to work on this yet. I think it's probably better to support this in Spark directly. What do you think @RussellSpitzer?
I do think changing this in Spark is probably the best approach for adding the API to DataFrameWriter, but I would also be glad to see some code in the Iceberg repo giving us the functionality until then. Because we expect this function may be deprecated once the Spark API becomes available, I think it makes sense to keep it as just a public static method rather than an implicit. That should make it a lot clearer when we deprecate and remove it in favour of a Spark-native alternative.
Maybe something like
IcebergMergeInto(Table).from(source).when(.....)
Let's also try to keep this api Java friendly.
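A minimal sketch of what such a Java-friendly entry point might look like, under the constraints suggested above (a static-style factory, no implicits). The class name follows the suggestion, but every method here is a hypothetical placeholder, not Iceberg's actual implementation, and instead of submitting a plan to Spark the builder just renders the SQL it would execute.

```scala
// Hypothetical sketch in the spirit of IcebergMergeInto(table).from(source).
// NOT Iceberg's real API: it only accumulates clauses and renders SQL.
final class IcebergMergeInto private (table: String) {
  private var source = ""
  private var condition = ""
  private var clauses = Vector.empty[String]

  def from(src: String): IcebergMergeInto = { source = src; this }
  def on(cond: String): IcebergMergeInto = { condition = cond; this }

  def whenMatchedUpdateAll(): IcebergMergeInto = {
    clauses :+= "WHEN MATCHED THEN UPDATE SET *"; this
  }

  def whenNotMatchedInsertAll(): IcebergMergeInto = {
    clauses :+= "WHEN NOT MATCHED THEN INSERT *"; this
  }

  // A real implementation would execute against Spark; returning the SQL
  // keeps the sketch self-contained and lets the API shape be exercised.
  def sql(): String =
    (s"MERGE INTO $table USING $source ON $condition" +: clauses).mkString("\n")
}

object IcebergMergeInto {
  // Plain factory method, callable from Java without Scala implicits.
  def apply(table: String): IcebergMergeInto = new IcebergMergeInto(table)
}
```

Because nothing relies on implicits or Scala-only syntax, Java callers could chain the same methods off IcebergMergeInto.apply("tgt"), which is the Java-friendliness constraint raised above.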
@RussellSpitzer If the plan is to deprecate it in the near future in favour of the Spark native solution I completely understand the point of not creating a true Scala DSL.
If you want I can try to write something down.
I would really appreciate that, I think it's great to have it ASAP.
Waiting for the DataFrame API for MERGE INTO. This is pretty basic; not sure why it is not yet available.
Hello folks, our data science community uses the DataFrame APIs quite heavily. The missing merge functionality in Spark's DataFrame API is pushing back adoption of Iceberg against Delta. Is there any way we can move this up the priority list? Please help.
The best way to get this done is to write a pull request. This is an OSS project and everyone here is basically a volunteer.
There has also been an agreement to ship this natively in Spark in a future release. That said, nothing prevents us from having a temporary solution inside Iceberg for now.
Hi @aokolnychyi, I will complete the MR with other test cases; then let me know whether you want to merge it or not. That's not an issue either way. Anyway, it was really fun for me to try to implement it.
@aokolnychyi, is it possible to get the DataFrame API in Spark updated now that 3.5 has implementations for these plans?
The Spark mergeInto API is merged in 4.0.0.
Example usage:
spark.table("source")
  .mergeInto("target", $"source.id" === $"target.id")
  .whenMatched($"salary" === 100)
  .delete()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource($"salary" === 100)
  .update(Map(
    "salary" -> lit(200)
  ))
  .merge()