[Bug] [Translation][Flink] The Flink engine cannot work properly when the data type is `BasicType.VOID_TYPE`.
### Search before asking

- [X] I had searched in the issues and found no similar issues.
### What happened

Related:
- #4345

The Flink engine cannot work properly when the SeaTunnel data type is `BasicType.VOID_TYPE`. I encountered this problem, like #4345.

I don't know much about Flink either, but I tried to find the reason. This appears to be caused by an internal limitation in Flink. The following are the details of the direct cause of the problem.
```java
public <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo) {
    OutputConversionModifyOperation modifyOperation =
            new OutputConversionModifyOperation(
                    table.getQueryOperation(),
                    TypeConversions.fromLegacyInfoToDataType(typeInfo),
                    OutputConversionModifyOperation.UpdateMode.APPEND);
    return toStreamInternal(table, modifyOperation);
}
```

https://github.com/apache/flink/blob/e78e3cd29aa3f0dc14eeaa9b5a5d912efcbea863/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java#L326-L333
`BasicTypeInfo.VOID_TYPE_INFO` is converted to a `LegacyTypeInformation`, and the `typeRoot` of that `LegacyTypeInformation` is `LogicalTypeRoot.RAW`. Finally, `DynamicSinkUtils` calls the `supportsImplicitCast` function, which returns `false` because of `LogicalTypeRoot.RAW`: Flink considers the two raw types to be unequal.
```java
} else if (sourceRoot == RAW
                && !targetType.is(BINARY_STRING)
                && !targetType.is(CHARACTER_STRING)
        || targetRoot == RAW) {
    // the two raw types are not equal (from initial invariant), casting is not possible
    return false;
} else if (sourceRoot == SYMBOL || targetRoot == SYMBOL) {
```

https://github.com/apache/flink/blob/c41c8e5cfab683da8135d6c822693ef851d6e2b7/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L293-L355
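To illustrate why the check fails, below is a minimal, self-contained model of the branch quoted above (hypothetical names, not Flink's real classes). Because of the initial invariant, identical types never reach this branch, so any pair of *unequal* RAW types, such as the query's `RAW('java.lang.Void', '...')` versus the sink's `RAW('java.lang.Void', ?)`, is rejected:

```java
// Hypothetical, simplified model of the RAW branch in
// LogicalTypeCasts#supportsImplicitCast. A "type" here is just a
// type root plus a serializer detail string.
record SimpleType(String root, String detail) {}

final class CastCheckModel {
    static boolean supportsImplicitCast(SimpleType source, SimpleType target) {
        if (source.equals(target)) {
            return true; // initial invariant: identical types short-circuit
        }
        if (source.root().equals("RAW") || target.root().equals("RAW")) {
            // mirrors the quoted branch: two unequal RAW types cannot be cast
            return false;
        }
        return true; // all other combinations are elided in this sketch
    }
}
```

Under this model, the query's RAW column and the sink's RAW column both wrap `java.lang.Void`, but their serializer details differ, so they are not equal and validation fails with the exception shown below.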
### SeaTunnel Version

dev
### SeaTunnel Config

```hocon
env {
  parallelism = 1
  job.mode = BATCH
  # checkpoint.interval = 10000
}

source {
  FakeSource {
    row.num = 1
    schema = {
      fields {
        c_null = "null"
        c_string = string
        c_boolean = boolean
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [
          null, "AAA", false
        ]
      }
    ]
    result_table_name = "fake"
  }
}

sink {
  Assert {
    source_table_name = "fake"
    rules = {
      row_rules = [
        {
          rule_type = MAX_ROW
          rule_value = 1
        },
        {
          rule_type = MIN_ROW
          rule_value = 1
        }
      ],
      field_rules = [
        {
          field_name = c_string
          field_type = string
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = "AAA"
            }
          ]
        },
        {
          field_name = c_boolean
          field_type = boolean
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = false
            }
          ]
        }
      ]
    }
  }
}
```
### Running Command

no
### Error Exception

```
2024-01-24 21:21:56,107 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: Flink job executed failed
    at org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:63)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.example.flink.v2.SeaTunnelApiExample.main(SeaTunnelApiExample.java:39)
Caused by: org.apache.flink.table.api.ValidationException: Column types of query result and sink for 'null' do not match.
Cause: Incompatible types for sink column 'c_null' at position 0.
Query schema: [c_null: RAW('java.lang.Void', '...'), c_string: STRING, c_boolean: BOOLEAN, c_tinyint: TINYINT, c_smallint: SMALLINT, c_int: INT, c_bigint: BIGINT, c_float: FLOAT, c_double: DOUBLE, c_decimal: STRING, c_date: DATE, c_timestamp: TIMESTAMP(9), c_time: TIME(0), c_bytes: BYTES, c_array: ARRAY<INT>, c_map: MAP<STRING, STRING>, c_map_nest: MAP<STRING, ROW<`c_int` INT, `c_string` STRING>>, c_row: ROW<`c_null` RAW('java.lang.Void', '...'), `c_string` STRING, `c_boolean` BOOLEAN, `c_tinyint` TINYINT, `c_smallint` SMALLINT, `c_int` INT, `c_bigint` BIGINT, `c_float` FLOAT, `c_double` DOUBLE, `c_decimal` STRING, `c_date` DATE, `c_timestamp` TIMESTAMP(9), `c_time` TIME(0), `c_bytes` BYTES, `c_array` ARRAY<INT>, `c_map` MAP<STRING, STRING>>]
Sink schema: [c_null: RAW('java.lang.Void', ?), c_string: STRING, c_boolean: BOOLEAN, c_tinyint: TINYINT, c_smallint: SMALLINT, c_int: INT, c_bigint: BIGINT, c_float: FLOAT, c_double: DOUBLE, c_decimal: STRING, c_date: DATE, c_timestamp: TIMESTAMP(3), c_time: TIME(0), c_bytes: BYTES, c_array: ARRAY<INT>, c_map: MAP<STRING, STRING>, c_map_nest: MAP<STRING, ROW<`c_int` INT, `c_string` STRING>>, c_row: ROW<`c_null` RAW('java.lang.Void', ?), `c_string` STRING, `c_boolean` BOOLEAN, `c_tinyint` TINYINT, `c_smallint` SMALLINT, `c_int` INT, `c_bigint` BIGINT, `c_float` FLOAT, `c_double` DOUBLE, `c_decimal` STRING, `c_date` DATE, `c_timestamp` TIMESTAMP(3), `c_time` TIME(0), `c_bytes` BYTES, `c_array` ARRAY<INT>, `c_map` MAP<STRING, STRING>>]
    at org.apache.flink.table.planner.connectors.DynamicSinkUtils.createSchemaMismatchException(DynamicSinkUtils.java:453)
    at org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:265)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:287)
    at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:185)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
    at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:223)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:332)
    at org.apache.seatunnel.core.starter.flink.utils.TableUtil.tableToDataStream(TableUtil.java:38)
    at org.apache.seatunnel.core.starter.flink.execution.FlinkAbstractPluginExecuteProcessor.fromSourceTable(FlinkAbstractPluginExecuteProcessor.java:103)
    at org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor.execute(SinkExecuteProcessor.java:96)
    at org.apache.seatunnel.core.starter.flink.execution.FlinkExecution.execute(FlinkExecution.java:116)
    at org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:61)
    ... 2 more
```
### Zeta or Flink or Spark Version

No response
### Java or Scala Version

No response
### Screenshots

No response
### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!
### Code of Conduct

- [X] I agree to follow this project's Code of Conduct