
[Bug] [spark] The op field conflicts with an internal field of the SeaTunnel Spark engine and cannot be read from the data source.

Open lizc9 opened this issue 1 year ago • 4 comments

Search before asking

  • [X] I have searched the issues and found no similar issues.

What happened

When I read the op field from JSON and declare it as type string, I get the following error:

java.lang.ClassCastException: java.lang.Byte cannot be cast to org.apache.spark.unsafe.types.UTF8String

The cause is the following method, which defines the op column as a byte type:

org.apache.seatunnel.translation.spark.serialization.InternalRowConverter#convert(org.apache.seatunnel.api.table.type.SeaTunnelRow, org.apache.seatunnel.api.table.type.SeaTunnelRowType)
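For readers unfamiliar with Spark internal rows, the failure mode can be reproduced without Spark. This is a minimal sketch under one assumption: the converter writes the row kind as a java.lang.Byte into any column named op, which is consistent with the stack trace below. The class and method names (ReservedOpFieldDemo, toInternalRow, readStringColumn) are hypothetical and are not SeaTunnel or Spark APIs; a plain String cast stands in for Spark's getUTF8String.

```java
import java.util.Arrays;

/**
 * Spark-free sketch of the reported failure. ASSUMPTION: the engine reserves a
 * column named "op" for its internal row kind and stores a Byte there, while the
 * user's schema declares the same column as a string.
 */
public class ReservedOpFieldDemo {

    // Hypothetical reserved name standing in for the engine's internal row-kind column.
    static final String RESERVED_ROW_KIND_FIELD = "op";

    /** Copies the source values, overwriting any column named "op" with the row-kind byte. */
    static Object[] toInternalRow(String[] fieldNames, Object[] values, byte rowKind) {
        Object[] internal = Arrays.copyOf(values, values.length);
        for (int i = 0; i < fieldNames.length; i++) {
            if (RESERVED_ROW_KIND_FIELD.equals(fieldNames[i])) {
                internal[i] = rowKind; // autoboxed to java.lang.Byte
            }
        }
        return internal;
    }

    /** Reads a column that the schema declares as string, trusting the declared type. */
    static String readStringColumn(Object[] row, int ordinal) {
        return (String) row[ordinal];
    }

    public static void main(String[] args) {
        String[] fields = {"before", "after", "source", "op", "ts_ms"};
        // Abbreviated values modeled on the sample message posted later in this thread.
        Object[] values = {"{\"city\":\"unknown\"}", "{\"city\":\"unknown\"}",
                "{\"connector\":\"postgresql\"}", "u", 1723018690754L};
        Object[] internal = toInternalRow(fields, values, (byte) 0);
        try {
            readStringColumn(internal, 3); // the "op" column
        } catch (ClassCastException e) {
            System.out.println("Reproduced: " + e.getMessage());
        }
    }
}
```

Running main prints a line starting with "Reproduced:", because the byte written into the reserved column cannot be cast to String, mirroring the Byte-to-UTF8String cast in the real trace.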

SeaTunnel Version

2.3.6

SeaTunnel Config

# Defining the runtime environment
env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  Kafka {
    topic = "test"
    bootstrap.servers = "localhost:9092"
    consumer.group = "seatunnel"
    format = "json"
    kafka.config = {
      max.poll.records = 1000
      auto.offset.reset = "earliest"
      enable.auto.commit = "false"
    },
    schema = {
        columns = [
            {
              name = before
              type = string
              nullable = true
            },
            {
              name = after
              type = string
              nullable = true
            },
            {
              name = source
              type = string
              nullable = true
            },
            {
              name = op
              type = string
              nullable = true
            },
            {
              name = ts_ms
              type = bigint
              nullable = true
            }
        ]
    }
  }
}


transform {
}

sink {
  Console {
  }
}

Running Command

./bin/start-seatunnel-spark-3-connector-v2.sh --master "local[4]" --deploy-mode client --config ./config/batch.conf

Error Exception

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (local executor driver): java.lang.ClassCastException: java.lang.Byte cannot be cast to org.apache.spark.unsafe.types.UTF8String
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String(rows.scala:46)
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String$(rows.scala:46)
	at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.getUTF8String(SpecificInternalRow.scala:193)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:32)
	at org.apache.seatunnel.core.starter.spark.execution.TransformExecuteProcessor$TransformIterator.hasNext(TransformExecuteProcessor.java:174)
	at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:435)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Zeta or Flink or Spark Version

Spark: 3.3

Java or Scala Version

Java 1.8

Screenshots

(Two screenshots were attached; they are not reproduced here.)

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

lizc9 · Aug 13 '24

Can you provide a message data case? I'll follow up.

zhilinli123 · Aug 14 '24

Hi @lizc9, op was previously a reserved field, but it is now open; you can refer to the latest dev branch.

Carl-Zhou-CN · Aug 16 '24
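On releases where op is still reserved, a schema declaring a column named op only fails later with an opaque ClassCastException. One way to surface the collision earlier is a fail-fast check on the source schema. The sketch below is hypothetical: ReservedFieldCheck and RESERVED_FIELDS are illustrative names, the reserved set contains only op because that is the one name confirmed in this thread, and it is not the change made on the dev branch (the reply above says the reservation was lifted instead).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Sketch of a fail-fast schema check. ASSUMPTION: RESERVED_FIELDS is a hypothetical
 * set of engine-internal column names, not SeaTunnel's actual list.
 */
public class ReservedFieldCheck {

    static final Set<String> RESERVED_FIELDS = new HashSet<>(Arrays.asList("op"));

    /** Returns the user-declared field names that collide with reserved internal names. */
    static List<String> findCollisions(String[] fieldNames) {
        List<String> collisions = new ArrayList<>();
        for (String name : fieldNames) {
            if (RESERVED_FIELDS.contains(name)) {
                collisions.add(name);
            }
        }
        return collisions;
    }

    /** Throws a descriptive error instead of letting a ClassCastException surface mid-job. */
    static void validate(String[] fieldNames) {
        List<String> collisions = findCollisions(fieldNames);
        if (!collisions.isEmpty()) {
            throw new IllegalArgumentException(
                    "Source schema uses reserved field name(s) " + collisions
                            + "; they are assumed to be used internally by the Spark translation layer");
        }
    }

    public static void main(String[] args) {
        // Column names from the reproduction config in this issue.
        String[] fields = {"before", "after", "source", "op", "ts_ms"};
        try {
            validate(fields);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Such a check only replaces the opaque cast error with a clear message at startup; it does not let the job read op, which is what lifting the reservation achieves.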

> Can you provide a message data case? I'll follow up.

Kafka key:

{
    "id": "c-ecaae896-0bb8-3a37-a142-ef8b7e662900"
}

Kafka value:

{
  "before": {
    "id": "c-ecaae896-0bb8-3a37-a142-ef8b7e662900",
    "ingestion_time": 1723018622139909,
    "city": "unknown"
  },
  "after": {
    "id": "c-ecaae896-0bb8-3a37-a142-ef8b7e662900",
    "ingestion_time": 1723018622139909,
    "city": "unknown"
  },
  "source": {
    "version": "2.0.1.Final",
    "connector": "postgresql",
    "name": "polar",
    "ts_ms": 1723018690396,
    "snapshot": "false",
    "db": "prod_datastore01",
    "sequence": "[\"343153176654624\",\"343153176654624\"]",
    "schema": "public",
    "table": "clips",
    "txId": 2982233777,
    "lsn": 343153176654624,
    "xmin": null
  },
  "op": "u",
  "ts_ms": 1723018690754,
  "transaction": null
}

lizc9 · Aug 19 '24

> Hi @lizc9, op was previously a reserved field, but it is now open; you can refer to the latest dev branch.

Thanks. In which SeaTunnel version will this change be released? Is there a release plan?

lizc9 · Aug 19 '24

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.

github-actions[bot] · Jul 12 '25

This issue has been closed because it has not received a response for a long time. You can reopen it if you encounter similar problems in the future.

github-actions[bot] · Jul 20 '25