seatunnel
seatunnel copied to clipboard
[Bug] [api-draft] Using the transform sql plugin will cause the field data type to change
Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
I found in the test that using the transform sql plugin (select * from type_test_number
) will cause the field data type to change
Changed from BigInteger
to GenericType<java.math.BigInteger>
This would result in a data type mismatch. then the program fails
If you cancel the Transform SQL, you won't have any problems
SeaTunnel Version
api-draft branch
SeaTunnel Config
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######
env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "STREAMING"
# job.mode = "BATCH"
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
jdbc {
url = "jdbc:mysql://localhost/test?serverTimezone=GMT%2b8"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 10
user = "root"
password = "123456"
query = "select * from type_number"
parallelism = 1
result_table_name = "type_test_number"
}
# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
}
transform {
sql {
sql = "select * from type_test_number"
}
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
}
sink {
Console {}
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
}
Running Command
None
Error Exception
Caused by: org.apache.flink.table.codegen.CodeGenException: Incompatible types of expression and result type. Expression[GeneratedExpression(result$22,isNull$23,,BigInteger,false)] type is [BigInteger], result type is [GenericType<java.math.BigInteger>]
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:379)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:377)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:377)
at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:295)
at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.generateConversionProcessFunction(StreamScan.scala:115)
at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:74)
at org.apache.flink.table.plan.nodes.datastream.DataStreamScan.convertToInternalRow(DataStreamScan.scala:46)
at org.apache.flink.table.plan.nodes.datastream.DataStreamScan.translateToPlan(DataStreamScan.scala:90)
at org.apache.flink.table.plan.nodes.datastream.DataStreamSink.translateInput(DataStreamSink.scala:189)
at org.apache.flink.table.plan.nodes.datastream.DataStreamSink.writeToSink(DataStreamSink.scala:84)
at org.apache.flink.table.plan.nodes.datastream.DataStreamSink.translateToPlan(DataStreamSink.scala:59)
at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translateToCRow(StreamPlanner.scala:278)
at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:123)
at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:120)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:120)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:439)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:528)
at org.apache.seatunnel.flink.util.TableUtil.tableToDataStream(TableUtil.java:46)
at org.apache.seatunnel.flink.transform.Sql.processStream(Sql.java:53)
at org.apache.seatunnel.core.starter.flink.execution.TransformExecuteProcessor.execute(TransformExecuteProcessor.java:75)
... 4 more
Flink or Spark Version
No response
Java or Scala Version
No response
Screenshots
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Can I close this issue, cause #2052 already merged?