
[Bug] [Translation][Flink] The Flink engine fails when the data type is `BasicType.VOID_TYPE`.

CheneyYin opened this issue on Jan 24, 2024.

Search before asking

  • [X] I have searched the issues and found no similar issues.

What happened

related:

  • #4345

The Flink engine does not work correctly when the SeaTunnel data type is `BasicType.VOID_TYPE`. I ran into the same problem as #4345.

I don't know much about Flink either, but I tried to track down the cause. It appears to be a defect inside Flink itself. Here are the details of the direct cause:

```java
public <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo) {
    OutputConversionModifyOperation modifyOperation =
            new OutputConversionModifyOperation(
                    table.getQueryOperation(),
                    TypeConversions.fromLegacyInfoToDataType(typeInfo),
                    OutputConversionModifyOperation.UpdateMode.APPEND);
    return toStreamInternal(table, modifyOperation);
}
```

https://github.com/apache/flink/blob/e78e3cd29aa3f0dc14eeaa9b5a5d912efcbea863/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java#L326-L333

`BasicTypeInfo.VOID_TYPE_INFO` is converted by `TypeConversions.fromLegacyInfoToDataType` into a `LegacyTypeInformationType`, whose type root is `LogicalTypeRoot.RAW`.
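The fallback can be illustrated with a hypothetical, heavily simplified model in plain Java (no Flink dependency). The class `LegacyTypeFallbackSketch`, its `Root` enum and its mapping table are invented for illustration and are not Flink's conversion code; the sketch only assumes that a legacy type with no direct SQL mapping, such as `java.lang.Void`, falls back to a RAW type.

```java
import java.util.Map;

/**
 * Hypothetical, simplified model of the legacy-type fallback.
 * NOT Flink's TypeConversions.fromLegacyInfoToDataType: the enum and the
 * mapping table below are invented for illustration only.
 */
public class LegacyTypeFallbackSketch {

    /** Hypothetical subset of logical type roots used by this sketch. */
    enum Root { VARCHAR, BOOLEAN, INTEGER, RAW }

    /** Hypothetical table: only these Java classes have a direct SQL type here. */
    static final Map<Class<?>, Root> DIRECT_MAPPINGS = Map.of(
            String.class, Root.VARCHAR,
            Boolean.class, Root.BOOLEAN,
            Integer.class, Root.INTEGER);

    /** Returns the mapped root, or RAW when the class has no direct mapping. */
    static Root toLogicalRoot(Class<?> legacyClass) {
        return DIRECT_MAPPINGS.getOrDefault(legacyClass, Root.RAW);
    }

    public static void main(String[] args) {
        // String has a direct mapping, so it becomes a regular SQL type.
        System.out.println("String -> " + toLogicalRoot(String.class));
        // Void has no mapping in this sketch, so it falls back to RAW,
        // like the RAW('java.lang.Void', ...) columns in the exception.
        System.out.println("Void   -> " + toLogicalRoot(Void.class));
    }
}
```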

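Finally, `DynamicSinkUtils` calls `supportsImplicitCast`, which returns `false` because of `LogicalTypeRoot.RAW`: Flink considers the two RAW types unequal. In the exception below, the query column is `RAW('java.lang.Void', '...')` while the sink column is `RAW('java.lang.Void', ?)`, so the types are not equal and the RAW branch rejects the cast.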

```java
} else if (sourceRoot == RAW
                && !targetType.is(BINARY_STRING)
                && !targetType.is(CHARACTER_STRING)
        || targetRoot == RAW) {
    // the two raw types are not equal (from initial invariant), casting is not possible
    return false;
} else if (sourceRoot == SYMBOL || targetRoot == SYMBOL) {
```

https://github.com/apache/flink/blob/c41c8e5cfab683da8135d6c822693ef851d6e2b7/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L293-L355
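The rejection can be sketched with a hypothetical, dependency-free Java model. It assumes, based only on how the two columns are printed in the exception (`'...'` versus `?`), that the query-side and sink-side RAW types for `java.lang.Void` differ in whether they carry a serializer. The names `RawCastSketch`, `TypeModel`, `Root`, `Verdict` and `check` are invented for this sketch and are not Flink's API; only the boolean condition mirrors the quoted branch.

```java
/**
 * Hypothetical, simplified model of the RAW branch quoted from Flink's
 * LogicalTypeCasts. Not Flink code: TypeModel, Root and Verdict are invented here.
 */
public class RawCastSketch {

    /** Hypothetical subset of logical type roots. */
    enum Root { RAW, BINARY_STRING, CHARACTER_STRING, BOOLEAN }

    /** Only the two cases discussed in this issue are modelled. */
    enum Verdict { EQUAL, REJECTED_BY_RAW_RULE, NOT_MODELLED }

    /**
     * Assumption: a RAW type is identified by its class name and by whether it
     * carries a serializer, so two RAW types for the same class differ if that flag differs.
     */
    record TypeModel(Root root, String className, boolean hasSerializer) {
        String summary() {
            if (root != Root.RAW) {
                return root.name();
            }
            return "RAW('" + className + "', " + (hasSerializer ? "'...'" : "?") + ")";
        }
    }

    static Verdict check(TypeModel source, TypeModel target) {
        // Initial invariant: identical types need no cast at all.
        if (source.equals(target)) {
            return Verdict.EQUAL;
        }
        // Same condition as the quoted branch: a RAW source is rejected unless the
        // target is a string type, and a RAW target is always rejected.
        boolean rawRejects =
                (source.root() == Root.RAW
                                && target.root() != Root.BINARY_STRING
                                && target.root() != Root.CHARACTER_STRING)
                        || target.root() == Root.RAW;
        return rawRejects ? Verdict.REJECTED_BY_RAW_RULE : Verdict.NOT_MODELLED;
    }

    public static void main(String[] args) {
        // Query side: RAW type that carries a serializer, printed as '...'.
        TypeModel query = new TypeModel(Root.RAW, "java.lang.Void", true);
        // Sink side: legacy RAW type without a serializer, printed as ?.
        TypeModel sink = new TypeModel(Root.RAW, "java.lang.Void", false);
        System.out.println(query.summary() + " -> " + sink.summary() + ": " + check(query, sink));
    }
}
```

Running `main` prints `RAW('java.lang.Void', '...') -> RAW('java.lang.Void', ?): REJECTED_BY_RAW_RULE`: because the two types are unequal, the RAW condition is hit and the cast is refused, which is what surfaces as the `ValidationException` below.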

SeaTunnel Version

dev

SeaTunnel Config

```
env {
  parallelism = 1
  job.mode = BATCH
  # checkpoint.interval = 10000
}

source {
  FakeSource {
    row.num = 1
    schema = {
      fields {
        c_null = "null"
        c_string = string
        c_boolean = boolean
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [
          null, "AAA", false
        ]
      }
    ]
    result_table_name = "fake"
  }
}

sink {
  Assert {
    source_table_name = "fake"
    rules = {
      row_rules = [
        {
          rule_type = MAX_ROW
          rule_value = 1
        },
        {
          rule_type = MIN_ROW
          rule_value = 1
        }
      ],
      field_rules = [
        {
          field_name = c_string
          field_type = string
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = "AAA"
            }
          ]
        },
        {
          field_name = c_boolean
          field_type = boolean
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = false
            }
          ]
        }
      ]
    }
  }
}
```

Running Command

no

Error Exception

```
024-01-24 21:21:56,107 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: Flink job executed failed
	at org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:63)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.example.flink.v2.SeaTunnelApiExample.main(SeaTunnelApiExample.java:39)
Caused by: org.apache.flink.table.api.ValidationException: Column types of query result and sink for 'null' do not match.
Cause: Incompatible types for sink column 'c_null' at position 0.

Query schema: [c_null: RAW('java.lang.Void', '...'), c_string: STRING, c_boolean: BOOLEAN, c_tinyint: TINYINT, c_smallint: SMALLINT, c_int: INT, c_bigint: BIGINT, c_float: FLOAT, c_double: DOUBLE, c_decimal: STRING, c_date: DATE, c_timestamp: TIMESTAMP(9), c_time: TIME(0), c_bytes: BYTES, c_array: ARRAY<INT>, c_map: MAP<STRING, STRING>, c_map_nest: MAP<STRING, ROW<`c_int` INT, `c_string` STRING>>, c_row: ROW<`c_null` RAW('java.lang.Void', '...'), `c_string` STRING, `c_boolean` BOOLEAN, `c_tinyint` TINYINT, `c_smallint` SMALLINT, `c_int` INT, `c_bigint` BIGINT, `c_float` FLOAT, `c_double` DOUBLE, `c_decimal` STRING, `c_date` DATE, `c_timestamp` TIMESTAMP(9), `c_time` TIME(0), `c_bytes` BYTES, `c_array` ARRAY<INT>, `c_map` MAP<STRING, STRING>>]
Sink schema:  [c_null: RAW('java.lang.Void', ?), c_string: STRING, c_boolean: BOOLEAN, c_tinyint: TINYINT, c_smallint: SMALLINT, c_int: INT, c_bigint: BIGINT, c_float: FLOAT, c_double: DOUBLE, c_decimal: STRING, c_date: DATE, c_timestamp: TIMESTAMP(3), c_time: TIME(0), c_bytes: BYTES, c_array: ARRAY<INT>, c_map: MAP<STRING, STRING>, c_map_nest: MAP<STRING, ROW<`c_int` INT, `c_string` STRING>>, c_row: ROW<`c_null` RAW('java.lang.Void', ?), `c_string` STRING, `c_boolean` BOOLEAN, `c_tinyint` TINYINT, `c_smallint` SMALLINT, `c_int` INT, `c_bigint` BIGINT, `c_float` FLOAT, `c_double` DOUBLE, `c_decimal` STRING, `c_date` DATE, `c_timestamp` TIMESTAMP(3), `c_time` TIME(0), `c_bytes` BYTES, `c_array` ARRAY<INT>, `c_map` MAP<STRING, STRING>>]
	at org.apache.flink.table.planner.connectors.DynamicSinkUtils.createSchemaMismatchException(DynamicSinkUtils.java:453)
	at org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:265)
	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:287)
	at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:185)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
	at scala.collection.Iterator.foreach(Iterator.scala:937)
	at scala.collection.Iterator.foreach$(Iterator.scala:937)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike.map(TraversableLike.scala:233)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
	at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:223)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:332)
	at org.apache.seatunnel.core.starter.flink.utils.TableUtil.tableToDataStream(TableUtil.java:38)
	at org.apache.seatunnel.core.starter.flink.execution.FlinkAbstractPluginExecuteProcessor.fromSourceTable(FlinkAbstractPluginExecuteProcessor.java:103)
	at org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor.execute(SinkExecuteProcessor.java:96)
	at org.apache.seatunnel.core.starter.flink.execution.FlinkExecution.execute(FlinkExecution.java:116)
	at org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:61)
	... 2 more
```

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

CheneyYin, Jan 24 '24 14:01

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.

github-actions[bot], Feb 25 '24 00:02

This issue has been closed because it has not received a response for too long. You can reopen it if you encounter similar problems in the future.

github-actions[bot], Mar 06 '24 00:03