Structured streaming writes to a partitioned table fail when spark.sql.extensions is set to IcebergSparkSessionExtensions

Open ahlagever opened this issue 2 years ago • 28 comments

Apache Iceberg version

1.2.0 (latest release)

Query engine

Spark

Please describe the bug 🐞

According to the documentation, when using Iceberg, one should set spark.sql.extensions to org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions, but setting this property seems to cause an exception to be thrown when trying to write to an Iceberg table using Spark structured streaming.

The exception that is thrown is:

Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: days(ts) is not currently supported
=== Streaming Query ===
Identifier: [id = cfb83943-cd87-4c84-bf25-a290e8891e19, runId = ddf71690-7e5d-41f6-8a8e-84c425683a26]
Current Committed Offsets: {}
Current Available Offsets: {MemoryStream[ts#3,a#4,b#5]: 0}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
WriteToMicroBatchDataSource spark_catalog.default.test_iceberg_table, cfb83943-cd87-4c84-bf25-a290e8891e19, [path=test_iceberg_table, fanout-enabled=true, checkpointLocation=/tmp/spark-checkpoint-16659193840247202419], Append
+- StreamingDataSourceV2Relation [ts#3, a#4, b#5], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@4533d1cf, MemoryStream[ts#3,a#4,b#5]

Code to reproduce:

package com.example

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{Encoders, SparkSession}

import java.nio.file.Files
import java.sql.Timestamp

case class Bla(ts: Timestamp, a: String, b: Double)

object MinEx {
  def main(args: Array[String]): Unit = {
    val warehouseDir = Files.createTempDirectory("spark-warehouse-iceberg-").toString
    val checkpointDir = Files.createTempDirectory("spark-checkpoint-").toString
    val spark = SparkSession.builder()
      .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
      .config("spark.sql.catalog.spark_catalog.type", "hadoop")
      .config("spark.sql.catalog.spark_catalog.warehouse", warehouseDir)
      .config("spark.sql.warehouse.dir", warehouseDir)
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .appName("BugRepro")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("create table test_iceberg_table(ts timestamp, a string, b double) using iceberg partitioned by (days(ts))")

    implicit val sqlContext = spark.sqlContext
    implicit val encoder = Encoders.product[Bla]
    val memStream = MemoryStream[Bla]
    val now = System.currentTimeMillis()
    val day = 86400000
    memStream.addData(List(
      Bla(new Timestamp(now), "test", 12.34),
      Bla(new Timestamp(now - 1 * day), "test 1d", 33.34),
      Bla(new Timestamp(now - 3 * day), "test 3d", 44.34),
      Bla(new Timestamp(now - 2 * day), "test 2d", 55.34),
    ))

    memStream.toDF()
      .writeStream
      .format("iceberg")
      .outputMode("append")
      .option("path", "test_iceberg_table")
      .option("fanout-enabled", true)
      .option("checkpointLocation", checkpointDir)
      .trigger(Trigger.Once())
      .start()
      .awaitTermination()
  }
}

The code works as expected when the statement that configures spark.sql.extensions is commented out.

ahlagever commented Mar 28 '23 14:03

Can reproduce this locally. Found in Iceberg 1.0.0 onward.

Fokko commented Jun 18 '23 20:06

@adigerber I know it was a while ago, but which version of Spark are you using?

Fokko commented Jun 18 '23 20:06

Able to reproduce this using:

rm -rf /tmp/warehouse
rm -rf /tmp/spark-checkpoint

mkdir -p /tmp/warehouse
mkdir -p /tmp/spark-checkpoint

./bin/spark-shell \
 --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0 \
 --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
 --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
 --conf spark.sql.catalog.demo.type=hadoop \
 --conf spark.sql.catalog.demo.warehouse=/tmp/warehouse \
 --conf spark.sql.warehouse.dir=/tmp/warehouse \
 --conf spark.sql.defaultCatalog=demo 
spark.sql("create table test_iceberg_table(ts timestamp, a string, b double) using iceberg partitioned by (days(ts))")


import java.nio.file.Files
import java.sql.Timestamp
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{Encoders, SparkSession}

case class Bla(ts: Timestamp, a: String, b: Double)

implicit val sqlContext = spark.sqlContext
implicit val encoder = Encoders.product[Bla]
val memStream = MemoryStream[Bla]
val now = System.currentTimeMillis()
val day = 86400000
memStream.addData(List(
  Bla(new Timestamp(now), "test", 12.34),
  Bla(new Timestamp(now - 1 * day), "test 1d", 33.34),
  Bla(new Timestamp(now - 3 * day), "test 3d", 44.34),
  Bla(new Timestamp(now - 2 * day), "test 2d", 55.34),
))

memStream.toDF()
  .writeStream
  .format("iceberg")
  .outputMode("append")
  .option("path", "/tmp/warehouse/test_iceberg_table")
  .option("checkpointLocation", "/tmp/spark-checkpoint")
  .option("fanout-enabled", true)
  .trigger(Trigger.Once())
  .start()
  .awaitTermination()

Any idea @aokolnychyi? Not really looking forward to bisecting between 0.13.1 and 1.0.0 :3

Fokko commented Jun 18 '23 20:06

Spark 3.1 with Iceberg 1.3.0 works fine. It seems that something is off with Spark 3.2+

Fokko commented Jun 19 '23 15:06

The stack trace:

23/06/19 17:45:53 ERROR MicroBatchExecution: Query [id = 2507e68d-0fb2-4aa2-98e4-427bfa1326c5, runId = 308060c0-6572-4328-9588-ece2b00c054a] terminated with error
org.apache.spark.sql.AnalysisException: days(ts) is not currently supported
	at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.$anonfun$toCatalyst$1(V2ExpressionUtils.scala:64)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.toCatalyst(V2ExpressionUtils.scala:64)
	at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.$anonfun$prepareQuery$1(DistributionAndOrderingUtils.scala:36)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
	at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.prepareQuery(DistributionAndOrderingUtils.scala:36)
	at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:93)
	at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:43)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:39)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
	at org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:81)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
	at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:82)
	at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:656)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:646)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
	at org.apache.spark.sql.execution.streaming.SingleBatchExecutor.execute(TriggerExecutor.scala:39)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)

Spark 3.2 uses a V2 data source.

Fokko commented Jun 19 '23 20:06

@adigerber I know it was a while ago, but which version of Spark are you using?

Either 3.3.1 or 3.3.2. IIRC it still happens in Spark 3.3.2 + Iceberg 1.2.1

ahlagever commented Jun 19 '23 20:06

Was able to chase this down. Looks like the catalog is not passed down to the execution.

    // TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
    sink match {
      case s: SupportsWrite =>
        val relationOpt = plan.catalogAndIdent.map {
          case (catalog, ident) => DataSourceV2Relation.create(s, Some(catalog), Some(ident))
        }
        WriteToMicroBatchDataSource(
          relationOpt,
          table = s,
          query = _logicalPlan,
          queryId = id.toString,
          extraOptions,
          outputMode)

The relationOpt here is None, so no catalog or identifier is attached to the streaming write.

Fokko commented Jun 23 '23 08:06

Closing in. One workaround is to use toTable("test_iceberg_table") instead of start().
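
For example, adapting the repro from the top of this thread, the adjusted write would look roughly like the sketch below (a sketch only; memStream, checkpointDir and the table name are the ones from that repro):

// Workaround sketch: resolve the table by name through the catalog with
// toTable(), instead of start() with a "path" option.
memStream.toDF()
  .writeStream
  .format("iceberg")
  .outputMode("append")
  .option("fanout-enabled", "true")
  .option("checkpointLocation", checkpointDir)
  .trigger(Trigger.Once())
  .toTable("test_iceberg_table")
  .awaitTermination()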

Fokko commented Jun 23 '23 08:06

Closing in. One workaround is to use toTable("test_iceberg_table") instead of start().

@Fokko For Spark 3.3.0 and Iceberg 1.2.1, calling the toTable method didn't solve the issue. Code:

df.writeStream
  .format("iceberg")
  .outputMode("append")
  .option("checkpointLocation", checkpointLocation)
  .toTable(tableName)
  .awaitTermination()

Error: org.apache.spark.sql.AnalysisException: hours(dt) is not currently supported

Could you suggest any other workarounds, please?

shuvaevv commented Jun 28 '23 15:06

@shuvaevv Sorry for the late reply, I missed this one. Do you have a stacktrace? I've cleaned up the PR, and that should be ready for the next release.

Fokko commented Jul 13 '23 13:07

same issue for me

Marcus-Rosti commented Aug 08 '23 03:08

(quoting Fokko's stack trace comment above)

same stack trace for me on 1.3.1 and spark 3.3

Marcus-Rosti commented Aug 08 '23 12:08

toTable wasn't a fix

Marcus-Rosti commented Aug 08 '23 14:08

same issue for me

arun-dhingra commented Aug 10 '23 09:08

This is a Spark issue, not an Iceberg issue (at least in 3.4). We may consider fixing Spark 3.3 and older but I am not so sure about 3.4. In Spark 3.4, we are relying on the function catalog API to resolve transforms.

@Marcus-Rosti, could you confirm toTable works in 3.4? I believe you tried 3.3 before.

@Fokko is right that start(), unlike toTable(), does not populate the catalog, hence we can't resolve the transforms. I believe the right solution would be to fix Spark to use SupportsCatalogOptions when loading Table from TableProvider.
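
To make the distinction concrete, here is a minimal sketch of the two call shapes, reusing the names from the repro at the top of this thread (the comments summarize the behaviour described in this thread rather than guaranteeing it for every Spark version):

// Path-based start(): the sink is loaded without a catalog or identifier, so
// Spark cannot resolve the days(ts) transform and fails as reported above.
memStream.toDF()
  .writeStream
  .format("iceberg")
  .outputMode("append")
  .option("path", "test_iceberg_table")
  .option("checkpointLocation", checkpointDir)
  .start()

// Catalog-based toTable(): the table is resolved through the session catalog,
// which in Spark 3.4+ can also supply the function catalog used to resolve
// partition transforms.
memStream.toDF()
  .writeStream
  .format("iceberg")
  .outputMode("append")
  .option("checkpointLocation", checkpointDir)
  .toTable("spark_catalog.default.test_iceberg_table")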

aokolnychyi commented Aug 22 '23 02:08

Hello, my friends.

I'm using structured streaming writes to a partitioned table, partitioned with Iceberg's transforms.

When writing with .start(), I get the same error, but switching to toTable('iceberg_table') instead of start() worked.

I am using Iceberg 1.3.0 and Spark 3.4.0.

This way it didn't work:

df.writeStream.format("iceberg").outputMode("append").trigger(
    once=True
).option("path", iceberg_table).option("fanout-enabled", "true").option(
    "checkpointLocation",
    checkpoint_location,
).start().awaitTermination()

This way it worked:

df.writeStream.format("iceberg").outputMode("append").trigger(
    once=True
).option("path", iceberg_table).option("fanout-enabled", "true").option(
    "checkpointLocation",
    checkpoint_location,
).toTable(
    iceberg_table
).awaitTermination()

FabricioZGalvani commented Aug 22 '23 21:08

(quoting FabricioZGalvani's comment above)

Yeah I got an error with iceberg 1.3 and spark 3.3, that's interesting

Marcus-Rosti commented Aug 26 '23 13:08

(quoting FabricioZGalvani's comment above)

For me as well toTable works in spark 3.4

arundh93 commented Sep 01 '23 08:09

If I remember correctly, Spark 3.3 does not have the function catalog. Therefore, we can't resolve Iceberg transforms. I think the right approach is either to use toTable in Spark 3.4 and above or to fix the old loading API in Spark (probably won't be trivial).

aokolnychyi commented Oct 12 '23 01:10

Hello @aokolnychyi. We are hitting a similar issue with Spark 3.3.1 and Iceberg 1.1.0. It worked fine with Spark 3.2.1 and Iceberg 1.1.0, and the issue appeared when we bumped up to Spark 3.3.1. Any suggestion is appreciated. Thanks.

kaijiezhang0319 commented Jan 26 '24 20:01

We're encountering this issue using iceberg 1.4.3 on Spark 3.3.0 (AWS Glue 4.0) and using toTable hasn't fixed the issue.

greg-roberts-bbc commented Mar 14 '24 13:03

We've found a workaround in our use case. (Iceberg 1.4.3, Spark 3.3.0 on Glue 4.0).

Our previous flow was:

TABLE = "glue_catalog.<database>.<table>"

# set up readStream
read_stream = (
    spark.readStream
    <setup read stream>  # format, options, etc.
    .load()
)

# dataframe operations
df = read_stream.select(
<various dataframe operations>
)

# setup write stream
write_stream = df.writeStream.format("iceberg").outputMode("append").trigger(
    processingTime=job_args["TRIGGER_PROCESSING_TIME"]
).options(**{
    "fanout-enabled": job_args["FANOUT_ENABLED"],
    "checkpointLocation": job_args["CHECKPOINT_LOCATION"],
}).toTable(TABLE)

which always failed on the insert with the error described above.

Our new flow is to use foreachBatch with a process_batch function:

def process_batch(df, batch_id):
    df = df.select(
    <various dataframe operations>
    )

    df.writeTo(TABLE).append()


read_stream.writeStream.foreachBatch(process_batch).start()

The above is for completeness, as we're actually using Glue's inbuilt GlueContext.forEachBatch but it does exactly the same thing.

This is no longer failing, and we're able to write to the table with partition transforms (we're using hour() to partition our data).

Interestingly, the data is now being written to S3 as you'd expect for the S3FileIO implementation (i.e. writes are prefixed with a random string), where previously this wasn't happening.

It would be nice to use the built-in write triggers as described in the docs, but we are happy with a working solution, and this approach also allows us to add MERGE behaviour with SQL.

Hope someone else finds this useful!

greg-roberts-bbc commented Mar 21 '24 12:03

(quoting greg-roberts-bbc's workaround comment above)

But this way there is no checkpointLocation. How do you manage the offset?

stevenlii commented Apr 10 '24 10:04

@stevenlii

But this way there is no checkpointLocation. How do you manage the offset?

As I said, we're using GlueContext.forEachBatch which allows you to specify the checkpoint location as follows:

glue_context.forEachBatch(
    frame=read_stream,
    batch_function=process_batch,
    options={
        "windowSize": job_args["TRIGGER_PROCESSING_TIME"],
        "fanout-enabled": job_args["FANOUT_ENABLED"],
        "checkpointLocation": job_args["CHECKPOINT_LOCATION"],
        "persistDataFrame": "false",
    }
)

The equivalent call to writeStream.foreachBatch would be something like:

read_stream.writeStream.foreachBatch(process_batch).trigger(processingTime=windowSize).option("checkpointLocation", checkpointLocation).start()

As can be seen in the Glue source code here: https://github.com/awslabs/aws-glue-libs/blob/master/awsglue/context.py#L641
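
For readers following the Scala repro earlier in this thread, a rough equivalent of this foreachBatch workaround might look like the sketch below (readStreamDf, the table name and the checkpoint path are placeholders, and this mirrors the PySpark flow above rather than the Glue-specific API):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

// Per-microbatch append: each batch goes through the batch writeTo(...) path,
// so the partition transform is handled by Iceberg's batch write instead of
// the streaming V2 write that fails to resolve it.
val processBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
  batch.writeTo("glue_catalog.mydb.mytable").append() // placeholder table name
}

readStreamDf.writeStream
  .foreachBatch(processBatch)
  .option("checkpointLocation", "s3://my-bucket/checkpoints/mytable") // placeholder
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()
  .awaitTermination()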

greg-roberts-bbc commented Apr 11 '24 11:04

I am writing to Iceberg using Spark structured streaming; my Spark version is 3.4.2 and Iceberg version is 2. I am facing this issue as well, and changing .option() to .toTable didn't help. The relevant output is below (the full stack trace follows the same path as the one posted earlier, through V2ExpressionUtils.toCatalyst, DistributionAndOrderingUtils.prepareQuery and V2Writes; Kafka and Spark shutdown log lines are omitted):

24/04/22 12:15:45 WARN V2ExpressionUtils: V2 function years with parameter types (timestamp) is used in partition transforms, but its definition couldn't be found in the function catalog provided
24/04/22 12:15:45 ERROR MicroBatchExecution: Query [id = 5222ef61-86e7-4f64-981c-6f330483730b, runId = c873f460-5c5b-43f3-a182-9bfa09d29b34] terminated with error
org.apache.spark.sql.AnalysisException: years(timestamp) is not currently supported
	at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.$anonfun$toCatalyst$1(V2ExpressionUtils.scala:71)
	at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.toCatalyst(V2ExpressionUtils.scala:71)
	at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.prepareQuery(DistributionAndOrderingUtils.scala:45)
	at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:95)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:706)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
	(remaining frames match the stack trace posted above; a second query, id = 4cf32767-e385-43b7-8550-09125c6f638c, terminated with the same error)

24/04/22 12:15:45 ERROR TestJobPipelineImpl: stream error: {}
org.apache.spark.sql.streaming.StreamingQueryException: years(timestamp) is not currently supported
=== Streaming Query ===
Identifier: [id = 4cf32767-e385-43b7-8550-09125c6f638c, runId = 6de5e2e7-7638-4289-9a5c-aa45f2281889]
Current Committed Offsets: {}
Current Available Offsets: {KafkaV2[Subscribe[com.engati.write.user.journey.conversion.data.to.db]]: {"com.engati.write.user.journey.conversion.data.to.db":{"0":729547}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
WriteToMicroBatchDataSource RelationV2[bot_ref#441, user_id#442, timestamp#443, source_type#444, conversion_type#445, ad_id#446, ad_source#447, ad_type#448, broadcast_id#449, broadcast_response_type#450, flow_id#451, attribute_id#452] local.user_journey.conversion_analytics local.user_journey.conversion_analytics, local.user_journey.conversion_analytics, 4cf32767-e385-43b7-8550-09125c6f638c, [checkpointLocation=/tmp/checkpointOne, fanout-enabled=true], Append
+- Project [bot_ref#78, user_id#91, timestamp#143, source_type#156, conversion_type#195, ad_id#104, ad_source#117, ad_type#130, broadcast_id#169, broadcast_response_type#182, flow_id#208, attribute_id#221]
   (intermediate Project, TypedFilter, SerializeFromObject, MapElements and DeserializeToObject nodes elided)
   +- StreamingDataSourceV2Relation [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@48bd89fe, KafkaV2[Subscribe[com.engati.write.user.journey.conversion.data.to.db]]

Caused by: org.apache.spark.sql.AnalysisException: years(timestamp) is not currently supported
	(same stack trace as above)

UtkarshSharma2612 commented Apr 22 '24 12:04

Spark version: 3.3.3, Iceberg version: 1.4.3, same stack trace for me. My table is:

CREATE TABLE iceberg_catalog.doi.doi_log (
  date_time TIMESTAMP,
  ip STRING,
  protocol STRING,
  operation STRING,
  response STRING,
  duration STRING,
  doi STRING)
USING iceberg
PARTITIONED BY (days(date_time))
TBLPROPERTIES (
  'format' = 'iceberg/parquet',
  'format-version' = '2',
  'write.metadata.delete-after-commit.enabled' = 'true',
  'write.metadata.previous-versions-max' = '20',
  'write.parquet.bloom-filter-enabled.column.doi' = 'true',
  'write.parquet.compression-codec' = 'zstd',
  'write.target-file-size-bytes' = '1073741824')

The error is "org.apache.spark.sql.AnalysisException: days(date_time) is not currently supported". When I set 'write.distribution-mode=none', the error is "days(date_time) ASC NULLS FIRST is not currently supported". My code is like:

val kafkaDF = ... // spark read from kafka topic

kafkaDF.writeStream
  .outputMode("append")
  .queryName("doi")
  .option("checkpointLocation", "path/to/checkpoint")
  .option("fanout-enabled", "true")
  .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
  .format("iceberg")
  .toTable("iceberg_catalog.doi.doi_log")

wfxxh commented Aug 06 '24 07:08

Same error. Any update?

darklord1807 commented Dec 18 '24 08:12

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] commented Jun 17 '25 00:06

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'

github-actions[bot] commented Jul 01 '25 00:07