
[Bug] [JDBC Connector] JDBC Source loses data when the partition_column data is null.

Open choucmei opened this issue 2 years ago • 7 comments

Search before asking

  • [X] I had searched in the issues and found no similar issues.

What happened

JDBC Source loses data when the data in partition_column is null. https://github.com/apache/incubator-seatunnel/blob/b09d379489bc906fa5916334ed5d4190e9a1125f/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java#L152 The max() and min() functions ignore NULL values, so rows with a NULL partition value fall outside every computed split range.

https://github.com/apache/incubator-seatunnel/blob/b09d379489bc906fa5916334ed5d4190e9a1125f/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java#L115 Maybe the first split could use "SELECT * FROM (%s) tt where %s <= ?" and the last split "SELECT * FROM (%s) tt where %s >= ?", or an extra split such as "SELECT * FROM (%s) tt where %s > max or %s < min" could be added.
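A minimal sketch of the proposed fix (hypothetical code, not the actual SeaTunnel implementation): make the first and last splits one-sided so values outside the [min, max] range are still covered. Note that a plain `<= ?` predicate still misses NULL rows, because NULL fails every comparison in SQL, so the first split would additionally need an `IS NULL` branch:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitQuerySketch {

    // Hypothetical split-query builder: split 0 has an open lower bound
    // (plus an IS NULL branch), the last split has an open upper bound,
    // and middle splits are bounded on both sides.
    static List<String> buildSplitQueries(String baseQuery, String col, int partitionNum) {
        List<String> queries = new ArrayList<>();
        for (int i = 0; i < partitionNum; i++) {
            String predicate;
            if (i == 0) {
                // First split: also picks up NULL partition values,
                // which would otherwise match no comparison predicate.
                predicate = String.format("%s <= ? OR %s IS NULL", col, col);
            } else if (i == partitionNum - 1) {
                // Last split: open upper bound.
                predicate = String.format("%s > ?", col);
            } else {
                predicate = String.format("%s > ? AND %s <= ?", col, col);
            }
            queries.add(String.format("SELECT * FROM (%s) tt WHERE %s", baseQuery, predicate));
        }
        return queries;
    }

    public static void main(String[] args) {
        for (String q : buildSplitQueries("select * from source", "c_integer", 3)) {
            System.out.println(q);
        }
    }
}
```

With the config from this issue (partition_num = 3 on c_integer), this would produce one split that also captures NULL rows, one bounded middle split, and one open-ended last split, so no row of the source table is skipped.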

SeaTunnel Version

2.3.1-SNAPSHOT

SeaTunnel Config

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
  execution.parallelism = 3
  job.mode = "BATCH"
}

source{
    jdbc{
        url = "jdbc:mysql://mysql-e2e:3306/seatunnel"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "root"
        password = "Abc!@#135_seatunnel"
        query = "select * from source"
        partition_column = "c_integer"
        partition_num = 3

        result_table_name = "jdbc"
    }
}

transform {
}

sink {
    jdbc {

        url = "jdbc:mysql://mysql-e2e:3306/seatunnel"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "root"
        password = "Abc!@#135_seatunnel"
        connection_check_timeout_sec = 100
        query = """insert into sink (c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
                                                c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
                                                c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
                                                c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
                                                c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
                                                c_binary, c_year, c_int_unsigned, c_integer_unsigned)
                   values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
    }
}

Running Command

/tmp/seatunnel/bin/start-seatunnel-flink-15-connector-v2.sh --config /tmp/jdbc_mysql_source_and_sink.conf

Error Exception

No Exception

Flink or Spark Version

No response

Java or Scala Version

JDK1.8

Screenshots

No response

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

choucmei avatar May 10 '23 09:05 choucmei

@ic4y PTAL

EricJoy2048 avatar May 10 '23 19:05 EricJoy2048

Such a situation can occur when the partition_column contains NULL values, but our recommendation is to use the primary key column as the partition_column. Of course, there are also measures that can prevent this, such as checking whether the partition_column contains NULL values and stopping execution if it does. What do you think? @choucmei
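The fail-fast check suggested above could look like this (a hypothetical sketch with assumed names, not SeaTunnel's actual API): count the NULLs in the partition column before splitting and abort with a clear error if any exist.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class NullPartitionCheck {

    // Pure helper: build the validation SQL for a given source query and column.
    static String buildNullCheckSql(String query, String partitionColumn) {
        return String.format(
                "SELECT COUNT(*) FROM (%s) tt WHERE %s IS NULL", query, partitionColumn);
    }

    // Run the check against a live connection and fail fast on NULL values.
    static void checkPartitionColumn(Connection conn, String query, String col)
            throws SQLException {
        try (Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(buildNullCheckSql(query, col))) {
            rs.next();
            long nulls = rs.getLong(1);
            if (nulls > 0) {
                throw new IllegalStateException(
                        "partition_column '" + col + "' contains " + nulls
                        + " NULL value(s); use a non-null column such as the primary key");
            }
        }
    }
}
```

This trades one extra scan of the source query for a guarantee that the min/max split computation cannot silently drop rows.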

ic4y avatar May 11 '23 11:05 ic4y

Can we shard the data directly, just like the snapshot stage of the MySQL Flink CDC connector? See https://github.com/ververica/flink-cdc-connectors/blob/2d0cfeb1d3aca481004d146176fcc3a08ed31d86/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java#L61

liugddx avatar May 15 '23 01:05 liugddx

Can we shard the data directly, just like the snapshot stage of the MySQL Flink CDC connector? See https://github.com/ververica/flink-cdc-connectors/blob/2d0cfeb1d3aca481004d146176fcc3a08ed31d86/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java#L61

Is it possible without partition_column?

liugddx avatar May 15 '23 01:05 liugddx

Can we shard the data directly, just like the snapshot stage of the MySQL Flink CDC connector? See https://github.com/ververica/flink-cdc-connectors/blob/2d0cfeb1d3aca481004d146176fcc3a08ed31d86/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java#L61

Such a situation could occur if the partition_column has null values, but our recommendation is to use the primary key column for partition_column. Of course, there are also measures that can be taken to prevent this from happening, such as adding checks to see if the partition_column contains null values, and stopping execution if they exist. What do you think about this? @choucmei

Using the PK or stopping the job would solve this, but it is not graceful. I prefer using "SELECT * FROM (%s) tt where %s <= ?" for the first split and "SELECT * FROM (%s) tt where %s >= ?" for the last split; this solution is similar to FlinkCDC.

choucmei avatar May 22 '23 08:05 choucmei

Can we shard the data directly, just like the snapshot stage of the MySQL Flink CDC connector? See https://github.com/ververica/flink-cdc-connectors/blob/2d0cfeb1d3aca481004d146176fcc3a08ed31d86/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java#L61

Using "SELECT * FROM (%s) tt where %s <= ?" for the first split and "SELECT * FROM (%s) tt where %s >= ?" for the last split; this solution is similar to FlinkCDC.

choucmei avatar May 22 '23 08:05 choucmei

Can we shard the data directly, just like the snapshot stage of the MySQL Flink CDC connector? See https://github.com/ververica/flink-cdc-connectors/blob/2d0cfeb1d3aca481004d146176fcc3a08ed31d86/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java#L61

Using "SELECT * FROM (%s) tt where %s <= ?" for the first split and "SELECT * FROM (%s) tt where %s >= ?" for the last split; this solution is similar to FlinkCDC.

That's nice too, could you contribute this PR?

ic4y avatar May 23 '23 03:05 ic4y

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

github-actions[bot] avatar Jul 02 '23 00:07 github-actions[bot]

This issue has been closed because it has not received a response for a long time. You could reopen it if you encounter similar problems in the future.

github-actions[bot] avatar Jul 11 '23 00:07 github-actions[bot]