[Bug] [api-draft] Using the transform sql plugin will cause the field data type to change

Open: ic4y opened this issue 2 years ago • 1 comment

Search before asking

  • [X] I had searched in the issues and found no similar issues.

What happened

While testing, I found that adding the transform sql plugin (select * from type_test_number) changes the field data type.

The type changes from BigInteger to GenericType<java.math.BigInteger>. This causes a data type mismatch, and the job then fails.

If the sql transform is removed from the config, the job runs without problems.
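
To illustrate the kind of mismatch described above, here is a minimal, self-contained Java sketch. It does not use Flink's classes: `TypeDescriptor`, its `kind` field, and `compatible` are hypothetical names invented for this illustration. It only models the idea that two type descriptors wrapping the same java.math.BigInteger class can still compare as unequal, which is the situation the exception below reports.

```java
import java.math.BigInteger;
import java.util.Objects;

public class TypeMismatchSketch {

    // Hypothetical stand-in for a planner's type descriptor (NOT a Flink class).
    static final class TypeDescriptor {
        final String kind;        // assumed values: "basic" or "generic"
        final Class<?> javaClass; // the Java class the descriptor wraps

        TypeDescriptor(String kind, Class<?> javaClass) {
            this.kind = kind;
            this.javaClass = javaClass;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TypeDescriptor)) {
                return false;
            }
            TypeDescriptor other = (TypeDescriptor) o;
            // Equality depends on the descriptor kind, not only on the wrapped class.
            return kind.equals(other.kind) && javaClass.equals(other.javaClass);
        }

        @Override
        public int hashCode() {
            return Objects.hash(kind, javaClass);
        }

        @Override
        public String toString() {
            return kind.equals("generic")
                    ? "GenericType<" + javaClass.getName() + ">"
                    : javaClass.getSimpleName();
        }
    }

    // Hypothetical check: an expression type must equal the declared result type.
    static boolean compatible(TypeDescriptor expressionType, TypeDescriptor resultType) {
        return expressionType.equals(resultType);
    }

    public static void main(String[] args) {
        TypeDescriptor basic = new TypeDescriptor("basic", BigInteger.class);
        TypeDescriptor generic = new TypeDescriptor("generic", BigInteger.class);
        // Prints: BigInteger vs GenericType<java.math.BigInteger> compatible: false
        System.out.println(basic + " vs " + generic + " compatible: " + compatible(basic, generic));
    }
}
```

In this model both descriptors describe the same Java class, yet the check fails because the descriptor kinds differ. Whether Flink's code generator performs the comparison in exactly this way is an assumption; the sketch only mirrors the wording of the error message.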

SeaTunnel Version

api-draft branch

SeaTunnel Config

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
  # You can set flink configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  #job.mode = "BATCH"
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
    jdbc {
        url = "jdbc:mysql://localhost/test?serverTimezone=GMT%2b8"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 10
        user = "root"
        password = "123456"
        query = "select * from type_number"
        parallelism = 1
        result_table_name = "type_test_number"
    }

  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
}

transform {
    sql {
      sql = "select * from type_test_number"
    }

  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
}

sink {
  Console {}

  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
}

Running Command

None

Error Exception

Caused by: org.apache.flink.table.codegen.CodeGenException: Incompatible types of expression and result type. Expression[GeneratedExpression(result$22,isNull$23,,BigInteger,false)] type is [BigInteger], result type is [GenericType<java.math.BigInteger>]
	at org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:379)
	at org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:377)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:377)
	at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:295)
	at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.generateConversionProcessFunction(StreamScan.scala:115)
	at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:74)
	at org.apache.flink.table.plan.nodes.datastream.DataStreamScan.convertToInternalRow(DataStreamScan.scala:46)
	at org.apache.flink.table.plan.nodes.datastream.DataStreamScan.translateToPlan(DataStreamScan.scala:90)
	at org.apache.flink.table.plan.nodes.datastream.DataStreamSink.translateInput(DataStreamSink.scala:189)
	at org.apache.flink.table.plan.nodes.datastream.DataStreamSink.writeToSink(DataStreamSink.scala:84)
	at org.apache.flink.table.plan.nodes.datastream.DataStreamSink.translateToPlan(DataStreamSink.scala:59)
	at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translateToCRow(StreamPlanner.scala:278)
	at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:123)
	at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:120)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:120)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:439)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:528)
	at org.apache.seatunnel.flink.util.TableUtil.tableToDataStream(TableUtil.java:46)
	at org.apache.seatunnel.flink.transform.Sql.processStream(Sql.java:53)
	at org.apache.seatunnel.core.starter.flink.execution.TransformExecuteProcessor.execute(TransformExecuteProcessor.java:75)
	... 4 more

Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

ic4y, Jun 17 '22 04:06

Can I close this issue, since #2052 has already been merged?

Hisoka-X, Jun 29 '22 02:06