Adding MergeInto into the Spark Scala API

Open nssalian opened this issue 3 years ago • 9 comments

Context: https://iceberg.apache.org/#spark-writes/#merge-into I see that MERGE INTO is the recommended write pattern instead of INSERT OVERWRITE. But there is no DataFrame/Dataset Scala API equivalent, so we have to resort to SQL via Spark SQL. It would be great to have it exposed like append and overwrite.

Possible solution: This has to be raised in the Spark community, since it needs an addition to the DataFrame/Dataset API. Delta has a solution to this: https://github.com/delta-io/delta/blob/7c6a0ceca84b5d04653dec7361aa82075542dfee/core/src/main/scala/io/delta/tables/DeltaTable.scala#L354-L406 It would help users in the Iceberg community to have this as well.

Concern: This might take longer to implement on the Spark side. Perhaps we could do something to add it to Iceberg.

nssalian avatar Dec 03 '21 23:12 nssalian

@nssalian Just for my understanding: Spark already supports MERGE INTO for tables. Is there anything else that needs to be implemented on the OSS Spark side?

huaxingao avatar Dec 08 '21 00:12 huaxingao

@huaxingao thanks for chiming in. Could you point me to the Dataframe/ DataSource API where the mergeInto is exposed? I was under the impression that only Delta has this implemented. CC @kbendick if I missed something.

nssalian avatar Dec 08 '21 01:12 nssalian

@nssalian There isn't a mergeInto Dataframe/ DataSet API in Spark. The SQL syntax merge into is supported in Spark, though. I am just trying to figure out what exactly needs to be done in Spark to make this work. I might be able to implement that.
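For reference, the SQL route from Scala looks roughly like the sketch below. It is only an illustration: the helper `mergeSql`, the table names, and the key column are placeholders, and the commented-out `spark.sql` call assumes a SparkSession with the Iceberg SQL extensions enabled.

```scala
object MergeSqlWorkaround {
  // Builds the text of a MERGE INTO statement.
  // target, source and key are placeholder names supplied by the caller.
  def mergeSql(target: String, source: String, key: String): String =
    s"""MERGE INTO $target t
       |USING $source s
       |ON t.$key = s.$key
       |WHEN MATCHED THEN UPDATE SET *
       |WHEN NOT MATCHED THEN INSERT *""".stripMargin

  // With a SparkSession named spark in scope, the statement would be run as:
  //   sourceDf.createOrReplaceTempView("updates")
  //   spark.sql(mergeSql("db.target", "updates", "id"))
}
```

The DataFrame has to be registered as a view first, which is the extra step a DataFrame-level API would remove.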

huaxingao avatar Dec 08 '21 01:12 huaxingao

That's right. The request is for a mergeInto DataFrame/Dataset API in Spark, so that Iceberg could take advantage of it through the API rather than SQL. Not a blocker, but it would be great to have alongside append and overwrite.

nssalian avatar Dec 08 '21 01:12 nssalian

@huaxingao, following up on this. Any update on whether adding this to the DataFrame (etc.) API might be on the Spark roadmap?

nssalian avatar Jan 05 '22 17:01 nssalian

I think we could add some implicits in Iceberg's Scala Modules outside of Spark as well. That might be a good path forward

RussellSpitzer avatar Jan 05 '22 18:01 RussellSpitzer

Sorry I didn't have a chance to check with the Spark SQL folks yet. Will do and get back to you this week.

huaxingao avatar Jan 05 '22 18:01 huaxingao

Yes, it is possible to add the Merge API into DataFrameWriterV2.

huaxingao avatar Jan 06 '22 23:01 huaxingao

@huaxingao - wow, having a Merge in the DataFrameWriterV2 would be great - is this already tracked by the Spark folks, jira or issue we can monitor?

kpe avatar Aug 11 '22 15:08 kpe

@huaxingao any update from Spark side?

manuzhang avatar Nov 01 '22 06:11 manuzhang

Hi @RussellSpitzer @huaxingao

If this functionality is still required and Spark side is still pending, I might try to implement it (as a Scala developer I would really like to have the merge API).

I just need to have a chat with someone who knows the Scala code base to clarify some missing pieces in my analysis.

I see two possible implementations, both following the implicit approach.

Option 1:

  • implicits that extend Dataset[T] with a new method mergeTo(table,condition)
  • implicits that extend DataFrameWriterV2[T] with the following methods:
    • whenMatched(condition:Option[String],actions:Seq[String])
    • whenNotMatched(condition:Option[String],actions:Seq[String])
    • merge(), which calls the final operation
ds
.mergeTo(table,condition)
.whenMatched(something here)
.whenNotMatched(something here)
.merge()

Option 2:

  • implicits that extend DataFrameWriterV2[T] with the following methods:
    • mergeWhen(condition:Option[String])
    • whenMatched(condition:Option[String],actions:Seq[String])
    • whenNotMatched(condition:Option[String],actions:Seq[String])
    • merge(), which calls the final operation
ds
.writeTo(table)
.mergeWhen(condition)
.whenMatched(something here)
.whenNotMatched(something here)
.merge()

The difference between the two approaches is in the syntax of the API: the first introduces a new 'mergeTo' method, while the second builds on the already existing 'writeTo'.

Looking at the actual code base, I see that a MergeInto operation is already resolved in a Spark LogicalPlan. We could probably reuse this logic and just call executePlan in the final merge call.
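To make the shape of Option 2 concrete, here is a minimal sketch that runs without Spark. Everything in it is hypothetical: `WriterStub` stands in for DataFrameWriterV2[T], `MergeBuilder` and `MergeOps` are illustrative names, each clause takes a single action string rather than a Seq[String], and merge() only returns SQL text instead of executing a plan.

```scala
object MergeDslSketch {
  // Stand-in for DataFrameWriterV2[T]; the real class lives in org.apache.spark.sql.
  final case class WriterStub(source: String, table: String)

  // Hypothetical builder: collects clauses and renders them as MERGE INTO SQL.
  final case class MergeBuilder(
      source: String,
      table: String,
      on: String,
      clauses: Vector[String] = Vector.empty) {

    private def clause(kind: String, condition: Option[String], action: String): MergeBuilder = {
      val cond = condition.map(c => s" AND $c").getOrElse("")
      copy(clauses = clauses :+ s"WHEN ${kind}${cond} THEN ${action}")
    }

    def whenMatched(condition: Option[String], action: String): MergeBuilder =
      clause("MATCHED", condition, action)

    def whenNotMatched(condition: Option[String], action: String): MergeBuilder =
      clause("NOT MATCHED", condition, action)

    // A real merge() would execute the plan; this one only returns the SQL text.
    def merge(): String =
      (Seq(s"MERGE INTO $table", s"USING $source", s"ON $on") ++ clauses).mkString("\n")
  }

  // The implicit approach: adds mergeWhen to the writer without modifying it.
  implicit class MergeOps(writer: WriterStub) {
    def mergeWhen(condition: String): MergeBuilder =
      MergeBuilder(writer.source, writer.table, condition)
  }

  // Usage in the style of Option 2 (table and column names are made up).
  def example: String =
    WriterStub("updates", "target")
      .mergeWhen("target.id = updates.id")
      .whenMatched(Some("updates.deleted = true"), "DELETE")
      .whenNotMatched(None, "INSERT *")
      .merge()
}
```

Option 1 would differ only in where the implicit attaches: on the Dataset (mergeTo) rather than on the writer.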

giucris avatar Mar 05 '23 13:03 giucris

Sorry I didn't have a chance to work on this yet. I think it's probably better to support this in Spark directly. What do you think @RussellSpitzer?

huaxingao avatar Mar 06 '23 06:03 huaxingao

I do think changing this in Spark is probably the best approach for adding the api to DataframeWriter but I also would be glad to see some code in the Iceberg repo giving us functionality until then. Because we expect this function may be deprecated when the Spark API becomes available, I think it makes sense to keep this as just a public static method rather than an implicit. This should make it a lot clearer when we deprecate and remove it for a Spark native alternative.

Maybe something like

IcebergMergeInto(Table).from(source).when(.....)

Let's also try to keep this api Java friendly.
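A rough sketch of what a non-implicit, Java-friendly entry point could look like. All names here (`IcebergMergeIntoSketch`, `into`, `Target`, `Merge`, `toSql`) are invented for illustration, the clause methods are one guess at what the elided `.when(.....)` might cover, and the builder only renders SQL text rather than touching Spark or Iceberg.

```scala
object IcebergMergeIntoSketch {
  // Hypothetical entry point: a plain method on an object, with no implicits involved.
  def into(table: String): Target = new Target(table)

  final class Target(table: String) {
    def from(source: String, on: String): Merge =
      new Merge(table, source, on, Vector.empty)
  }

  final class Merge(table: String, source: String, on: String, clauses: Vector[String]) {
    private def add(clause: String): Merge =
      new Merge(table, source, on, clauses :+ clause)

    // Overloads instead of Option[String], which is awkward to construct from Java.
    def whenMatched(action: String): Merge =
      add(s"WHEN MATCHED THEN $action")

    def whenMatched(condition: String, action: String): Merge =
      add(s"WHEN MATCHED AND $condition THEN $action")

    def whenNotMatched(action: String): Merge =
      add(s"WHEN NOT MATCHED THEN $action")

    // Renders the collected clauses; a real implementation would run the merge instead.
    def toSql: String =
      (Seq(s"MERGE INTO $table", s"USING $source", s"ON $on") ++ clauses).mkString("\n")
  }
}
```

Because the entry point is an ordinary method rather than an implicit, removing it later in favour of a Spark-native API would show up as a clear compile error at each call site.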

RussellSpitzer avatar Mar 06 '23 20:03 RussellSpitzer

@RussellSpitzer If the plan is to deprecate it in the near future in favour of the Spark native solution I completely understand the point of not creating a true Scala DSL.

If you want I can try to write something down.

giucris avatar Mar 06 '23 20:03 giucris

I would really appreciate that, I think it's great to have it ASAP.

RussellSpitzer avatar Mar 06 '23 20:03 RussellSpitzer

Waiting for a DataFrame API for merge into. This is pretty basic; not sure why it is not yet available.

adarsh112 avatar Mar 10 '23 11:03 adarsh112

Hello folks, our data science community uses the DataFrame APIs quite heavily. The lack of merge functionality in Spark's DataFrame API is holding back adoption of Iceberg over Delta. Is there any way we can move this up the priority list? Please help.

abhinavofficial avatar Apr 06 '23 02:04 abhinavofficial

The best way to get this done is to write a pull request. This is an oss project and everyone here is basically a volunteer.

RussellSpitzer avatar Apr 06 '23 03:04 RussellSpitzer

There has also been an agreement to ship this natively in Spark in a future release. That said, nothing prevents us from having a temporary solution inside Iceberg for now.

aokolnychyi avatar May 16 '23 04:05 aokolnychyi

Hi @aokolnychyi, I will complete the MR with the remaining test cases; then let me know whether you want to merge it or not. Either way is fine. Anyway, it was really fun for me to try to implement it.

giucris avatar May 17 '23 06:05 giucris

@aokolnychyi, is it possible to get the DataFrame API in Spark updated now that 3.5 has implementations for these plans?

rdblue avatar Sep 22 '23 18:09 rdblue

Spark mergeInto API is merged in 4.0.0.

Example usage:

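import org.apache.spark.sql.functions.lit
import spark.implicits._  // enables the $"..." column syntax

spark.table("source")
  .mergeInto("target", $"source.id" === $"target.id")
  .whenMatched($"salary" === 100)
  .delete()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource($"salary" === 100)
  .update(Map(
    "salary" -> lit(200)
  ))
  .merge()  // executes the merge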

huaxingao avatar Dec 21 '23 02:12 huaxingao