[Bug] [JDBC Connector] JDBC Source loses data when partition_column contains null values
Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
JDBC Source loses data when partition_column contains null values. https://github.com/apache/incubator-seatunnel/blob/b09d379489bc906fa5916334ed5d4190e9a1125f/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java#L152 The max() and min() functions ignore nulls, so rows whose partition_column is null fall outside every partition range and are never read.
https://github.com/apache/incubator-seatunnel/blob/b09d379489bc906fa5916334ed5d4190e9a1125f/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java#L115
Maybe consider making the first part
"SELECT * FROM (%s) tt where %s <= ?"
and the last part
"SELECT * FROM (%s) tt where %s >= ?"
or
adding another part
"SELECT * FROM (%s) tt where %s > max or %s < min ?"
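The loss can be reproduced outside SeaTunnel with a minimal sketch. It uses Python's built-in sqlite3 module and a hypothetical five-row `source` table, and it assumes each partition is read with a bounded range predicate on partition_column; the exact query the connector builds may differ.

```python
import sqlite3

# Hypothetical in-memory table standing in for the `source` table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source (id INTEGER, c_integer INTEGER)")
conn.executemany(
    "INSERT INTO source VALUES (?, ?)",
    [(1, 10), (2, 20), (3, None), (4, 30), (5, None)],
)

query = "select * from source"
column = "c_integer"

# MIN() and MAX() skip NULLs, so the bounds describe only the non-null rows.
lo, hi = conn.execute(
    "SELECT MIN(%s), MAX(%s) FROM (%s) tt" % (column, column, query)
).fetchone()

# Assumed shape of a partition read: a bounded range never matches NULL.
ranged = conn.execute(
    "SELECT * FROM (%s) tt WHERE %s >= ? AND %s <= ?" % (query, column, column),
    (lo, hi),
).fetchall()

total = conn.execute("SELECT COUNT(*) FROM (%s) tt" % query).fetchone()[0]
print("bounds:", lo, hi)                      # bounds: 10 30
print("rows read:", len(ranged), "of", total)  # rows read: 3 of 5
```

The two rows with a null c_integer are silently skipped even though the range spans min to max.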
SeaTunnel Version
2.3.1-SNAPSHOT
SeaTunnel Config
env {
  execution.parallelism = 3
  job.mode = "BATCH"
}
source {
  jdbc {
    url = "jdbc:mysql://mysql-e2e:3306/seatunnel"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "Abc!@#135_seatunnel"
    query = "select * from source"
    partition_column = "c_integer"
    partition_num = 3
    result_table_name = "jdbc"
  }
}
transform {
}
sink {
  jdbc {
    url = "jdbc:mysql://mysql-e2e:3306/seatunnel"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "Abc!@#135_seatunnel"
    connection_check_timeout_sec = 100
    query = """insert into sink (c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
               c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
               c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
               c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
               c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
               c_binary, c_year, c_int_unsigned, c_integer_unsigned)
               values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
  }
}
Running Command
/tmp/seatunnel/bin/start-seatunnel-flink-15-connector-v2.sh --config /tmp/jdbc_mysql_source_and_sink.conf
Error Exception
No Exception
Flink or Spark Version
No response
Java or Scala Version
JDK1.8
Screenshots
No response
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [x] I agree to follow this project's Code of Conduct
@ic4y PTAL
Such a situation could occur if the partition_column has null values, but our recommendation is to use the primary key column for partition_column. Of course, there are also measures that can be taken to prevent this from happening, such as adding checks to see if the partition_column contains null values, and stopping execution if they exist. What do you think about this? @choucmei
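A pre-flight check of the kind described above could look roughly like this. It is only a sketch using Python's built-in sqlite3 module; the function name `check_partition_column`, the error message, and the sample table are all hypothetical and not part of SeaTunnel.

```python
import sqlite3


def check_partition_column(conn, query, column):
    """Raise ValueError if `query` returns rows whose `column` is NULL."""
    # query and column are interpolated directly, so they must come from
    # trusted job configuration, never from user input.
    sql = "SELECT COUNT(*) FROM (%s) tt WHERE %s IS NULL" % (query, column)
    null_rows = conn.execute(sql).fetchone()[0]
    if null_rows > 0:
        raise ValueError(
            "partition_column %s has %d NULL rows; refusing to start"
            % (column, null_rows)
        )


# Hypothetical sample data: one row has a NULL partition value.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source (id INTEGER, c_integer INTEGER)")
conn.executemany(
    "INSERT INTO source VALUES (?, ?)", [(1, 10), (2, None), (3, 30)]
)

try:
    check_partition_column(conn, "select * from source", "c_integer")
    outcome = "ok"
except ValueError as err:
    outcome = str(err)
print(outcome)
```

Failing fast costs one extra COUNT query per job but turns silent data loss into an explicit error.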
Can we shard the data directly, the way the snapshot stage of the MySQL Flink CDC connector does? See https://github.com/ververica/flink-cdc-connectors/blob/2d0cfeb1d3aca481004d146176fcc3a08ed31d86/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java#L61
Is it possible without partition_column?
Using the PK or stopping execution would solve this, but neither is graceful. I prefer to use "SELECT * FROM (%s) tt where %s <= ?" for the first part and "SELECT * FROM (%s) tt where %s >= ?" for the last part. This solution is similar to FlinkCDC.
Using "SELECT * FROM (%s) tt where %s <= ?" for the first part and "SELECT * FROM (%s) tt where %s >= ?" for the last part is a solution similar to FlinkCDC.
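The open-ended first and last parts can be sketched as follows, using Python's built-in sqlite3 module. The helper `build_splits`, the boundary values, and the sample table are hypothetical, and this is not how FlinkCDC or SeaTunnel actually builds its splits. One caveat worth stating: in SQL a comparison against NULL is never true, so `<= ?` and `>= ?` on their own still skip null rows. The sketch therefore adds an extra `IS NULL` part, which is an assumption of this example and was not proposed in the thread.

```python
import sqlite3


def build_splits(query, column, boundaries):
    """Return (sql, params) pairs that cover every row exactly once.

    `boundaries` is an ascending list of at least two cut points.
    """
    if len(boundaries) < 2:
        raise ValueError("need at least two boundaries")
    # First part: open-ended below.
    splits = [
        ("SELECT * FROM (%s) tt where %s <= ?" % (query, column), (boundaries[0],))
    ]
    # Middle parts: (lower, upper], except the final one, which excludes its
    # upper bound because the last part below already includes it.
    last_index = len(boundaries) - 1
    for i in range(1, len(boundaries)):
        upper_op = "<" if i == last_index else "<="
        sql = "SELECT * FROM (%s) tt where %s > ? AND %s %s ?" % (
            query, column, column, upper_op)
        splits.append((sql, (boundaries[i - 1], boundaries[i])))
    # Last part: open-ended above.
    splits.append(
        ("SELECT * FROM (%s) tt where %s >= ?" % (query, column), (boundaries[-1],))
    )
    # Assumption of this sketch: a dedicated part for NULL partition values.
    splits.append(("SELECT * FROM (%s) tt where %s IS NULL" % (query, column), ()))
    return splits


# Hypothetical sample data, including values outside the boundaries and NULLs.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source (id INTEGER, c_integer INTEGER)")
conn.executemany(
    "INSERT INTO source VALUES (?, ?)",
    [(1, 10), (2, 20), (3, None), (4, 30), (5, None), (6, 5), (7, 40)],
)

counts = [
    len(conn.execute(sql, params).fetchall())
    for sql, params in build_splits("select * from source", "c_integer", [10, 30])
]
print(counts)  # [2, 1, 2, 2]
```

Without the final `IS NULL` part, the same splits would return only 5 of the 7 rows.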
That's nice too, could you contribute this PR?
This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.
This issue has been closed because it has not received response for too long time. You could reopen it if you encountered similar problems in the future.