[Bug] [Spark] The `op` field conflicts with an internal field of the SeaTunnel Spark engine and cannot be read from the data source
Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
When I read an `op` field from JSON and define it as string type, I get the following error: `java.lang.ClassCastException: java.lang.Byte cannot be cast to org.apache.spark.unsafe.types.UTF8String`.
This happens because the following code defines the `op` column as byte type:
`org.apache.seatunnel.translation.spark.serialization.InternalRowConverter#convert(org.apache.seatunnel.api.table.type.SeaTunnelRow, org.apache.seatunnel.api.table.type.SeaTunnelRowType)`
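The failure mode can be reproduced outside Spark with a plain-Java sketch (class and variable names here are hypothetical; Spark's actual read goes through `BaseGenericInternalRow.getUTF8String` and `UTF8String` rather than `String`): the converter stores the CDC `op` flag as a `Byte` in the internal row, while the user-declared schema says string, so the string-typed read fails at runtime.

```java
// Hypothetical, simplified sketch of the type mismatch reported above.
public class OpFieldCastSketch {
    public static void main(String[] args) {
        Object[] internalRow = new Object[1];
        internalRow[0] = (byte) 'u';        // converter writes op as a Byte

        try {
            // The declared schema says string, so the engine performs a
            // string-typed read here -- which fails on the stored Byte.
            String op = (String) internalRow[0];
            System.out.println("op = " + op);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

This is only an illustration of the cast failure; the real fix is in the converter's column-type mapping, not in user code.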
SeaTunnel Version
2.3.6
SeaTunnel Config
```hocon
# Defining the runtime environment
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Kafka {
    topic = "test"
    bootstrap.servers = "localhost:9092"
    consumer.group = "seatunnel"
    format = "json"
    kafka.config = {
      max.poll.records = 1000
      auto.offset.reset = "earliest"
      enable.auto.commit = "false"
    }
    schema = {
      columns = [
        {
          name = before
          type = string
          nullable = true
        },
        {
          name = after
          type = string
          nullable = true
        },
        {
          name = source
          type = string
          nullable = true
        },
        {
          name = op
          type = string
          nullable = true
        },
        {
          name = ts_ms
          type = bigint
          nullable = true
        }
      ]
    }
  }
}

transform {
}

sink {
  Console {
  }
}
```
Running Command
```shell
./bin/start-seatunnel-spark-3-connector-v2.sh --master "local[4]" --deploy-mode client --config ./config/batch.conf
```
Error Exception
```
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (local executor driver): java.lang.ClassCastException: java.lang.Byte cannot be cast to org.apache.spark.unsafe.types.UTF8String
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String(rows.scala:46)
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String$(rows.scala:46)
	at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.getUTF8String(SpecificInternalRow.scala:193)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:32)
	at org.apache.seatunnel.core.starter.spark.execution.TransformExecuteProcessor$TransformIterator.hasNext(TransformExecuteProcessor.java:174)
	at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:435)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```
Zeta or Flink or Spark Version
Spark: 3.3
Java or Scala Version
Java 1.8
Screenshots
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Can you provide a message data case? I'll follow up.
Hi @lizc9, previously `op` was a reserved field, but now it is open; you can refer to the latest dev branch.
> Can you provide a message data case? I'll follow up.

Kafka key:
```json
{
  "id": "c-ecaae896-0bb8-3a37-a142-ef8b7e662900"
}
```

Kafka content:

```json
{
  "before": {
    "id": "c-ecaae896-0bb8-3a37-a142-ef8b7e662900",
    "ingestion_time": 1723018622139909,
    "city": "unknown"
  },
  "after": {
    "id": "c-ecaae896-0bb8-3a37-a142-ef8b7e662900",
    "ingestion_time": 1723018622139909,
    "city": "unknown"
  },
  "source": {
    "version": "2.0.1.Final",
    "connector": "postgresql",
    "name": "polar",
    "ts_ms": 1723018690396,
    "snapshot": "false",
    "db": "prod_datastore01",
    "sequence": "[\"343153176654624\",\"343153176654624\"]",
    "schema": "public",
    "table": "clips",
    "txId": 2982233777,
    "lsn": 343153176654624,
    "xmin": null
  },
  "op": "u",
  "ts_ms": 1723018690754,
  "transaction": null
}
```
> Hi @lizc9, previously `op` was a reserved field, but now it is open; you can refer to the latest dev branch.

Thanks. May I ask in which SeaTunnel version this feature will be released? Is there a release plan?
This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.
This issue has been closed because it has not received a response for too long. You can reopen it if you encounter similar problems in the future.