
Error when indexing a table with BIGINT

Open osopardo1 opened this issue 1 year ago • 2 comments

What went wrong?

When creating a table that uses BIGINT for a date column and inserting a set of 10 rows, an error appears (a "requirement failed" IllegalArgumentException coming from CubeId; see the stack trace below).

We would need to investigate the flow for the BIGINT type. It is odd, because when using CREATE TABLE(id LONG) the type is mapped to BIGINT in the SHOW CREATE TABLE output, and yet indexing works fine there 🤔
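
For reference, a minimal spark-shell sketch of that check (the `ids` table name and the ten-row insert are illustrative, not from the original report):

// Hypothetical table name, only to illustrate the LONG -> BIGINT mapping.
spark.sql("CREATE TABLE ids (id LONG) USING qbeast OPTIONS ('columnsToIndex'='id')")
spark.sql("SHOW CREATE TABLE ids").show(false)        // the column is reported as `id BIGINT`
spark.sql("INSERT INTO ids SELECT id FROM range(10)") // this path indexes without errors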

How to reproduce?

Steps to reproduce the problem.

1. Code that triggered the bug, or steps to reproduce:

spark.sql("CREATE TABLE table(date BIGINT) USING qbeast OPTIONS('columnsToIndex'='date')")
spark.sql("INSERT INTO table SELECT temp_date FROM temp_view")

2. Branch and commit id:

main

3. Spark version:

In the Spark shell, run spark.version.

3.5.0

4. Hadoop version:

In the Spark shell, run org.apache.hadoop.util.VersionInfo.getVersion().

3.3.4

5. How are you running Spark?

Are you running Spark inside a container? Are you launching the app on a remote K8s cluster? Or are you just running the tests in a local computer?

Local

6. Stack trace:

Trace of the log/error messages.


java.lang.IllegalArgumentException: requirement failed
        at scala.Predef$.require(Predef.scala:268)
        at io.qbeast.core.model.CubeId$.containers(CubeId.scala:88)
        at io.qbeast.core.model.CubeDomainsBuilder.computeWeightsAndSizes(CubeDomainsBuilder.scala:132)
        at io.qbeast.core.model.CubeDomainsBuilder.resultInternal(CubeDomainsBuilder.scala:118)
        at io.qbeast.core.model.CubeDomainsBuilder.result(CubeDomainsBuilder.scala:114)
        at io.qbeast.spark.index.DoublePassOTreeDataAnalyzer$.$anonfun$computePartitionCubeDomains$6(OTreeDataAnalyzer.scala:177)
        at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:198)

osopardo1 avatar Sep 16 '24 08:09 osopardo1

I am not able to reproduce the error using a synthetic dataset in main.

Need to:

  • Try with the original data that was written.
  • Test with the previous version (0.7.0). Will try this before asking for the data.

osopardo1 avatar Oct 17 '24 09:10 osopardo1

Cannot reproduce with version 0.7.0 :/ Will try with a subset of the original data.

osopardo1 avatar Oct 17 '24 09:10 osopardo1

@cdelfosse needs to provide a way to reproduce.

fpj avatar Oct 29 '24 13:10 fpj

I can reproduce this bug in the qbeast demo data studio (https://qbeast-demo.cloud.qbeast.io/), but not locally on my laptop.

  • First create a Coder workspace with 2 CPUs and 6 GB of RAM.
  • Start it.
  • Modify .spark/spark-defaults.conf:
    • set the worker role to spark-arm-worker instead of spark-worker
    • set spark.dynamicAllocation.maxExecutors to 4 instead of 32
  • Then run this in a workspace terminal:
$ spark-sql
> use qbeast_demo;

> CREATE TABLE test (
    date BIGINT
)
USING qbeast
OPTIONS (columnsToIndex "date", cubeSize "100000")
LOCATION 's3://qbeast-demo/test/';

-- Transform the date, which is a string, into a bigint and insert it into test
> INSERT INTO TABLE test SELECT CAST((unix_timestamp(to_timestamp(date, "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")) * 1000 + SUBSTRING(date, 21, 3)) AS BIGINT) AS timestamp_bigint FROM datadog_logs_qbeast limit 10;

24/10/31 09:02:52 WARN TaskSetManager: Lost task 0.0 in stage 48.0 (TID 8514) (192.168.117.62 executor 1): java.lang.IllegalArgumentException: requirement failed
        at scala.Predef$.require(Predef.scala:268)
        at io.qbeast.core.model.CubeId$.containers(CubeId.scala:88)
        at io.qbeast.core.model.CubeDomainsBuilder.computeWeightsAndSizes(CubeDomainsBuilder.scala:132)
        at io.qbeast.core.model.CubeDomainsBuilder.resultInternal(CubeDomainsBuilder.scala:118)
        at io.qbeast.core.model.CubeDomainsBuilder.result(CubeDomainsBuilder.scala:114)
        at io.qbeast.spark.index.DoublePassOTreeDataAnalyzer$.$anonfun$computePartitionCubeDomains$6(OTreeDataAnalyzer.scala:177)
        at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:198)
...

cdelfosse avatar Oct 31 '24 08:10 cdelfosse

I made a video of how to reproduce the bug: https://drive.google.com/file/d/1G9bUKJqV-b6O8IykHl6tFwzvNk1cGU3Z/view

Here is the backtrace at the end:

24/11/04 11:34:48 WARN TaskSetManager: Lost task 0.0 in stage 54.0 (TID 11322) (192.168.126.12 executor 1): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`SparkPointWeightIndexer$$Lambda$5782/0x00007f2b45836d18`: (struct<date:bigint>, int) => array<binary>).
        at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
        at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.generate_doConsume_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalArgumentException: requirement failed
        at scala.Predef$.require(Predef.scala:268)
        at io.qbeast.core.model.CubeId$.containers(CubeId.scala:88)
        at io.qbeast.core.model.PointWeightIndexer.findTargetCubeIds(PointWeightIndexer.scala:44)
        at io.qbeast.spark.index.SparkPointWeightIndexer.$anonfun$findTargetCubeIdsUDF$1(SparkPointWeightIndexer.scala:38)
        at io.qbeast.spark.index.SparkPointWeightIndexer.$anonfun$findTargetCubeIdsUDF$1$adapted(SparkPointWeightIndexer.scala:35)
        ... 19 more

cdelfosse avatar Nov 04 '24 10:11 cdelfosse

Okay, this issue is turning out to be far more complex than we initially thought. I'm still trying to reproduce the behavior locally with a small amount of data, but here are my findings:

  • For context: when indexing a DataFrame, we traverse it twice. The first pass analyzes the min/max and count of the indexed columns, and the second pass actually indexes each row.
  • I've experienced the same error with different configurations of maxNumberOfExecutors. But using just 1 gives no problems.
  • Using the LIMIT clause gives non-deterministic results between analyzing and indexing the DataFrame. This makes the initial min/max of the DataFrame different from the data that is finally inserted. We need the min/max to normalize the values into the [0.0, 1.0) space, and that is why CubeId fails: it detects a value outside this range and does not know how to create the corresponding cube (see the sketch after this list).
  • Appending new data to the source table while indexing can cause the same inconsistency, because the min/max changes after being computed. Example:
    • Insert data (1, 2, 3)
    • Analyze (1, 2, 3) = min: 1, max: 3
    • Insert data (4, 5, 6)
    • Finish inserting (4, 5, 6)
    • Index (1, 2, 3, 4, 5, 6) -> (4, 5, 6) falls outside min: 1, max: 3 -> error
  • This behavior does not show up at smaller scales, because the time window between analyzing the DataFrame and indexing it is too short. But with large amounts of data, I think it can cause inconsistencies.
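
A minimal sketch of the failure mode (a plain Scala illustration of min/max normalization into [0.0, 1.0); this is not the actual transformation code inside io.qbeast.core):

// Illustration only: min/max come from the analysis pass, the value arrives later.
def normalize(value: Long, min: Long, max: Long): Double =
  (value - min).toDouble / (max - min).toDouble

val (minSeen, maxSeen) = (1L, 3L)   // stats computed during the first (analysis) pass
val lateValue = 5L                  // row appended to the source after the analysis pass
val coord = normalize(lateValue, minSeen, maxSeen) // 2.0, outside [0.0, 1.0)
require(coord >= 0.0 && coord < 1.0)               // fails, just like the require behind CubeId$.containers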

A solution:

  • Use columnStats to hardcode the Revision parameters. If we index timestamps, we can create a Revision that spans from the first date to one year ahead, for example (a DataFrame-writer equivalent is sketched after the SQL below):
CREATE TABLE test (date BIGINT) 
USING qbeast 
OPTIONS (
    columnsToIndex = "date",
    cubeSize = "100000",
    columnStats = "{\"date_min\": 1720766770250, \"date_max\": 1730905198741}"
);
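
The same workaround through the DataFrame writer, for appends that do not go through SQL (a sketch only; df stands for the DataFrame of casted bigint timestamps, and the min/max values are copied from the SQL example above rather than measured from the real data):

// Sketch: pin the Revision range on a DataFrame append via the columnStats option.
df.write
  .format("qbeast")
  .mode("append")
  .option("columnsToIndex", "date")
  .option("cubeSize", "100000")
  .option("columnStats", """{"date_min": 1720766770250, "date_max": 1730905198741}""")
  .save("s3://qbeast-demo/test/")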

osopardo1 avatar Nov 07 '24 13:11 osopardo1

I've tested the proposed solution (columnStats), and the error no longer shows up. The job then fails due to a task being killed when reaching the DataWriter stage, but that is another issue.

Another worrying problem might be the usage of dynamicAllocation.

As far as I understand, dynamicAllocation asks for extra executors when needed. Are we sure that those new executors have access to the cached data? I don't think they do. If they don't, we can face inconsistencies when doing the double pass over the DataFrame. That would explain why experiments with a single core are deterministic.
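
One way to test that hypothesis is to rerun the insert with dynamic allocation disabled and a single executor, so the whole double pass runs on the same fixed set of executors (standard Spark settings, not qbeast-specific):

// Standard Spark settings, set before building the session (or in spark-defaults.conf).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("bigint-indexing-repro")
  .config("spark.dynamicAllocation.enabled", "false") // no executors added or removed mid-job
  .config("spark.executor.instances", "1")            // mirrors the deterministic single-executor runs
  .getOrCreate()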

osopardo1 avatar Nov 12 '24 06:11 osopardo1

What I am sure of right now is that this issue should be closed, and I will open another one with a better explanation.

osopardo1 avatar Nov 12 '24 06:11 osopardo1

I assumed that the source table was in Delta format, but that was a mistake. The source is Qbeast, and the destination is Qbeast too. The error can be reproduced locally with this code:

Code to Reproduce:

    // Assumes an active SparkSession (spark) and a temporary warehouse path (tmpWarehouse)
    // provided by the test harness.
    // Define the test data
    val df1 = spark.range(1, 100000).toDF("id") // bigger append

    // Specify the path where data will be written
    val destinationTable = "append_test_table_destination"
    val sourceTable = "append_test_table_source"
    val destinationPath = s"$tmpWarehouse/$destinationTable"
    val sourcePath = s"$tmpWarehouse/$sourceTable"

    spark.sql(s"""CREATE TABLE $sourceTable (id LONG)
         | USING qbeast LOCATION '$sourcePath'
         | OPTIONS (columnsToIndex 'id', cubeSize '100')""".stripMargin)
    spark.sql(s"""CREATE TABLE $destinationTable (id LONG)
         | USING qbeast LOCATION '$destinationPath'
         | OPTIONS (columnsToIndex 'id', cubeSize '100')""".stripMargin)

    // Define the append function
    def appendDataSource(df: org.apache.spark.sql.DataFrame): Unit = {
      df.write.format("qbeast").mode("append").save(sourcePath)
    }

    def appendDataDestinationQbeast(): Unit = {
      spark.sql(s"INSERT INTO $destinationTable SELECT id FROM $sourceTable")
    }

    // First append
    println("FIRST APPEND TO SOURCE")
    appendDataSource(df1)

    // Define thread for the INSERT INTO the destination table
    val thread1 = new Thread(new Runnable {
      def run(): Unit = {
        try {
          println("Starting append in DESTINATION...")
          appendDataDestinationQbeast() // Perform the append into the destination
          println("append in DESTINATION completed.")
        } catch {
          case e: Exception =>
            println(s"Error on INSERT INTO: ${e.getStackTrace.mkString("\n")}")
        }
      }
    })

    // Define thread for the small appends to the source table

    val thread2 = new Thread(new Runnable {
      def run(): Unit = {
        println("Starting append in SOURCE...")
        val numberOfSmallAppends = 10
        var count = 0
        var lastRange = 100000
        while (count < numberOfSmallAppends) {
          println(s"Appending $count")
          val df = spark.range(lastRange, lastRange + 10).toDF("id")
          try {
            appendDataSource(df) // Perform a small append to the source
            count += 1
            lastRange += 10
            println(s"Appended $count finished")
          } catch {
            case e: Exception =>
              println(s"Error on SMALL APPEND: ${e.getStackTrace.mkString("\n")}")
              return
          }
        }
        println("append in SOURCE completed")
      }
    })

    // Start both threads
    thread1.start()
    thread2.start()

    // Wait for both threads to complete
    thread1.join()
    thread2.join()

    // Verify the results
    val resultSourceDF = spark.read.format("qbeast").load(sourcePath)
    val resultDestinationDF = spark.read.format("qbeast").load(destinationPath)

    // Check row count to verify both DataFrames were appended
    val expectedRowCount = resultSourceDF.count()
    val actualRowCount = resultDestinationDF.count()

    assert(
      actualRowCount == expectedRowCount,
      s"Test failed! Expected $expectedRowCount rows but found $actualRowCount.")

Stack trace:

Error on INSERT INTO: org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
scala.Option.foreach(Option.scala:407)
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)}
24/11/12 13:43:20 WARN TaskSetManager: Lost task 14.0 in stage 81.0 (TID 741) (192.168.1.132 executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 17 in stage 81.0 failed 1 times, most recent failure: Lost task 17.0 in stage 81.0 (TID 744) (192.168.1.132 executor driver): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`SparkPointWeightIndexer$$Lambda$5406/455132947`: (struct<id:bigint>, int) => array<binary>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:168)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: requirement failed
	at scala.Predef$.require(Predef.scala:268)
	at io.qbeast.core.model.CubeId$.containers(CubeId.scala:88)
	at io.qbeast.core.model.PointWeightIndexer.findTargetCubeIds(PointWeightIndexer.scala:44)
	at io.qbeast.spark.index.SparkPointWeightIndexer.$anonfun$findTargetCubeIdsUDF$1(SparkPointWeightIndexer.scala:38)
	at io.qbeast.spark.index.SparkPointWeightIndexer.$anonfun$findTargetCubeIdsUDF$1$adapted(SparkPointWeightIndexer.scala:35)
	... 20 more

osopardo1 avatar Nov 12 '24 12:11 osopardo1

Closing this issue to investigate the error further in a separate one. It seems to be related to the QbeastSnapshot handling when loading a DataFrame with Qbeast.

osopardo1 avatar Nov 13 '24 06:11 osopardo1