
RETRYING the same batch again

vpol opened this issue 1 year ago · 11 comments

connector: lightweight, version: 2.1.0

I've noticed a lot of RETRYING the same batch again messages, but no errors around them signalling that something was wrong with the insert.

Any idea why this could happen and what to do about it?

vpol · May 24 '24

@vpol the error message should be in the log file of course, but you can also check at the ClickHouse level: system.query_log where exception != ''. You should see it there if it is a server-side issue.
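
For reference, a query along those lines (these are the standard system.query_log columns; the time window is just an illustration, adjust as needed):

SELECT event_time, query, exception
FROM system.query_log
WHERE exception != ''
  AND event_date >= today() - 1
ORDER BY event_time DESC
LIMIT 20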

aadant · May 24 '24

Hi. I've tried to investigate it, but it looks like there are no errors in the connector log and no errors on the ClickHouse side.

I'm not 100% sure if it's related, but I've noticed that select * from replica_source_info almost always returns nothing.
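
If it helps, a slightly more targeted check (a sketch assuming the default offsets table layout with a record_insert_ts column, as in the sample DDL; substitute the database/table from your offset.storage.jdbc.offset.table.name):

SELECT count() AS offset_rows, max(record_insert_ts) AS last_offset_write
FROM altinity_sink_connector.replica_source_info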

vpol · Jun 05 '24

@subkanthi could it be related to this breaking change in 2.1.0?

Breaking Changes:

The configuration clickhouse.server.database is now deprecated with multiple database support. By default, the source MySQL/Postgres database name will be used as the ClickHouse database name.

aadant · Jun 05 '24

Not sure. @vpol, do you have the LOGGING_LEVEL environment variable set to debug? It looks like there is a problem with persisting the offsets to the offset table, as the batch is cleared only after the offsets are persisted. Please review the offset.storage.* configuration variables; it's possible the user does not have permissions to write to the offsets table.


# offset.storage.jdbc.offset.table.name: The name of the database table where connector offsets are to be stored.
offset.storage.jdbc.offset.table.name: "altinity_sink_connector.replica_source_info"

# offset.storage.jdbc.url: The JDBC URL for the database where connector offsets are to be stored.
offset.storage.jdbc.url: "jdbc:clickhouse://clickhouse:8123/altinity_sink_connector"

# offset.storage.jdbc.user: The name of the database user to be used when connecting to the database where connector offsets are to be stored.
offset.storage.jdbc.user: "root"

# offset.storage.jdbc.password: The password of the database user to be used when connecting to the database where connector offsets are to be stored.
offset.storage.jdbc.password: "root"
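
As a rough way to rule out the permissions issue, one could check the grants of the offset-storage user and do a manual round trip against the offsets table. The user, table, and columns below are the ones from this sample config and the default DDL, so this is only a sketch; adjust to your setup:

SHOW GRANTS FOR root

INSERT INTO altinity_sink_connector.replica_source_info
    (id, offset_key, offset_val, record_insert_ts, record_insert_seq)
VALUES ('manual-test', 'k', 'v', now(), 1)

SELECT * FROM altinity_sink_connector.replica_source_info FINAL WHERE id = 'manual-test'

ALTER TABLE altinity_sink_connector.replica_source_info DELETE WHERE id = 'manual-test'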

subkanthi · Jun 05 '24

I've tried LOGGING_LEVEL=debug, but it does not increase verbosity.

Here is the config I'm using:

name: "postgres-sync-clickhouse"

# database.hostname: IP address or hostname of the PostgreSQL database server.
database.hostname: "<ip>"

# database.port: Integer port number of the PostgreSQL database server listening for client connections.
database.port: "<port>"

# database.user: Name of the PostgreSQL database user to be used when connecting to the database.
database.user: "<user>"

# database.password: Password of the PostgreSQL database user to be used when connecting to the database.
database.password: "<password>"

# database.server.name: The name of the PostgreSQL database from which events are to be captured when not using snapshot mode.
database.server.name: "api"

# schema.include.list: An optional list of regular expressions that match schema names to be monitored;
schema.include.list: "public"

# plugin.name: The name of the PostgreSQL logical decoding plug-in installed on the PostgreSQL server. Supported values are decoderbufs and pgoutput.
plugin.name: "pgoutput"

slot.name: "clickhouse_sync"

# table.include.list: An optional list of regular expressions that match fully-qualified table identifiers for tables to be monitored;
table.include.list: "<tables-list>"

# clickhouse.server.url: Specify only the hostname of the Clickhouse Server.
clickhouse.server.url: "https://<something>.us-central1.gcp.clickhouse.cloud"

# clickhouse.server.user: Clickhouse Server User
clickhouse.server.user: "<user>"

# clickhouse.server.password: Clickhouse Server Password
clickhouse.server.password: "<password>"

# clickhouse.server.port: Clickhouse Server Port
clickhouse.server.port: "<port>"

# clickhouse.server.database: Clickhouse Server Database
clickhouse.server.database: "<database>"

# database.allowPublicKeyRetrieval: "true" https://rmoff.net/2019/10/23/debezium-mysql-v8-public-key-retrieval-is-not-allowed/
database.allowPublicKeyRetrieval: "true"

# snapshot.mode: Debezium can use different modes when it runs a snapshot. The snapshot mode is determined by the snapshot.mode configuration property.
# snapshot.mode: "no_data"
snapshot.mode: "never"

# offset.flush.interval.ms: The number of milliseconds to wait before flushing recent offsets to Kafka. This ensures that offsets are committed within the specified time interval.
offset.flush.interval.ms: 60000
offset.flush.timeout.ms: 30000
buffer.max.records: 1000000
buffer.flush.timeout.ms: 120000
max.batch.size: 20000
max.queue.size: 100000

#snapshot.fetch.size: 2048
metrics.enable: "false"
cli.port: 7052
thread.pool.size: 10

# connector.class: The Java class for the connector. This must be set to io.debezium.connector.postgresql.PostgresConnector.
connector.class: "io.debezium.connector.postgresql.PostgresConnector"

# offset.storage: The Java class that implements the offset storage strategy. This must be set to io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore.
offset.storage: "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore"

# offset.storage.jdbc.offset.table.name: The name of the database table where connector offsets are to be stored.
offset.storage.jdbc.offset.table.name: "api.replica_source_info"

# offset.storage.jdbc.url: The JDBC URL for the database where connector offsets are to be stored.
offset.storage.jdbc.url: "jdbc:clickhouse:https://<something>.us-central1.gcp.clickhouse.cloud:<port>/api"

# offset.storage.jdbc.user: The name of the database user to be used when connecting to the database where connector offsets are to be stored.
offset.storage.jdbc.user: "<user>"

# offset.storage.jdbc.password: The password of the database user to be used when connecting to the database where connector offsets are to be stored.
offset.storage.jdbc.password: "<password>"

# offset.storage.jdbc.offset.table.ddl: The DDL statement used to create the database table where connector offsets are to be stored.
offset.storage.jdbc.offset.table.ddl: "CREATE TABLE if not exists %s
(
    id                String,
    offset_key        String,
    offset_val        String,
    record_insert_ts  DateTime,
    record_insert_seq UInt64,
    _version          UInt64 materialized toUnixTimestamp64Nano(now64(9))
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY id
SETTINGS index_granularity = 8198"
offset.storage.jdbc.offset.table.delete: "delete from %s where 1=1"
schema.history.internal: "io.debezium.storage.jdbc.history.JdbcSchemaHistory"
schema.history.internal.jdbc.url: "jdbc:clickhouse:https://<something>.us-central1.gcp.clickhouse.cloud:<port>/api"
schema.history.internal.jdbc.user: "<user>"
schema.history.internal.jdbc.password: "<password>"

schema.history.internal.schema.history.table.name: "api.replicate_schema_history"

# schema.history.internal.schema.history.table.name: The name of the database table where connector schema history is to be stored.
schema.history.internal.jdbc.schema.history.table.name: "api.replicate_schema_history"
schema.history.internal.jdbc.schema.history.table.ddl: "CREATE TABLE if not exists %s
(
    id String NOT NULL,
    history_data String,
    history_data_seq INTEGER,
    record_insert_ts TIMESTAMP NOT NULL,
    record_insert_seq INTEGER NOT NULL
)
ENGINE=ReplacingMergeTree(record_insert_seq)
order by id
SETTINGS index_granularity = 8198"


# enable.snapshot.ddl: If set to true, the connector will parse the DDL statements as part of the initial load.
enable.snapshot.ddl: "true"

# auto.create.tables: If set to true, the connector will create the database tables for the destination tables if they do not already exist.
auto.create.tables: "true"
auto.create.tables.replicated: "true"

# database.dbname: The name of the PostgreSQL database from which events are to be captured when not using snapshot mode.
database.dbname: "api"

# clickhouse.datetime.timezone: This timezone will override the default timezone of ClickHouse server. Timezone columns will be set to this timezone.
#clickhouse.datetime.timezone: "UTC"

# skip_replica_start: If set to true, the connector will skip replication on startup. sink-connector-client start_replica will start replication.
skip_replica_start: "false"

# binary.handling.mode: The mode for handling binary values. Possible values are bytes, base64, and decode. The default is bytes.
#binary.handling.mode: "base64"

# ignore_delete: If set to true, the connector will ignore delete events. The default is false.
ignore_delete: "false"

#disable.ddl: If set to true, the connector will ignore DDL events. The default is false.
disable.ddl: "false"

#disable.drop.truncate: If set to true, the connector will ignore drop and truncate events. The default is false.
#disable.drop.truncate: "false"
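
One sanity check against this config (just a sketch, not a definitive diagnosis): confirm that both storage tables were actually created in the api database on the ClickHouse Cloud instance, and with the expected engines:

SELECT name, engine
FROM system.tables
WHERE database = 'api'
  AND name IN ('replica_source_info', 'replicate_schema_history')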

vpol · Jun 07 '24

A little update: I don't see this issue in my test environment, where I use a non-clustered ClickHouse.

vpol · Jun 07 '24

I've done a bit more debugging and identified that it retries because

static boolean checkIfThereAreInflightRequests

returns true for the batch, but I'm not sure why that happens.

vpol · Jun 07 '24

Hi @vpol, I was able to reproduce this. It can happen if the auto create table step fails, possibly due to a missing macro.

When AUTO CREATE TABLE fails, you should also see a log entry in system.query_log:

2024-06-10 19:11:48.345 INFO  - **** Task(0), AUTO CREATE TABLE (tm2) ***
2024-06-10 19:11:48.355 ERROR - **** Error creating table ***tm2
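
If it is the missing-macro case, two quick checks on the ClickHouse side may help (a sketch; replicated auto-created tables typically rely on shard/replica macros, so an empty system.macros on a cluster node is suspicious):

-- macros defined on this node
SELECT * FROM system.macros

-- failed CREATE TABLE statements, per the AUTO CREATE TABLE error above
SELECT event_time, query, exception
FROM system.query_log
WHERE query ILIKE 'CREATE TABLE%' AND exception != ''
ORDER BY event_time DESC
LIMIT 20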

subkanthi · Jun 10 '24

Hi @subkanthi, thanks for checking it out. Unfortunately, I can't see any errors like the one you mentioned in the query log. To be honest, I don't see ANY errors in the logs for the user/db I'm using for sync.

I'll try to debug it a bit more.

vpol · Jun 12 '24

Hi,

we have the same problem with the normal (not lightweight) connector, version 2.2.1: no errors in the logs or in ClickHouse.

[INFO ] 2024-08-09 12:43:40.783 [Task: Sink Connector thread-pool-1askId] PreparedStatementExecutor - *************** EXECUTED BATCH Successfully Records: 239************** task(0) Thread ID: Sink Connector thread-pool-1 Result: [I@35563dc5 Database: db Table: table2
[INFO ] 2024-08-09 12:43:40.783 [Task: Sink Connector thread-pool-1askId] PreparedStatementExecutor - *** INSERT QUERY for Database(sync2) ***: insert into [table1, redacted]
[INFO ] 2024-08-09 12:43:40.794 [Task: Sink Connector thread-pool-1askId] PreparedStatementExecutor - *************** EXECUTED BATCH Successfully Records: 4************** task(0) Thread ID: Sink Connector thread-pool-1 Result: [I@7b4d921a Database: db Table: table1
[DEBUG] 2024-08-09 12:43:40.794 [Task: Sink Connector thread-pool-1askId] ClickHouseBatchRunnable - ***** RETRYING the same batch again
[INFO ] 2024-08-09 12:43:40.794 [Task: Sink Connector thread-pool-1askId] ClickHouseBatchRunnable - ****** Thread: Sink Connector thread-pool-1 Batch Size: 246 ******
[INFO ] 2024-08-09 12:43:40.794 [Task: Sink Connector thread-pool-1askId] PreparedStatementExecutor - *** INSERT QUERY for Database(sync2) ***: insert into `debezium_signal`(`id`,`type`,`data`,`_version`,`is_deleted`) select `id`,`type`,`data`,`_version`,`is_deleted` from input('`id` String,`type` Nullable(String),`data` Nullable(String),`_version` UInt64,`is_deleted` UInt8')
[INFO ] 2024-08-09 12:43:40.798 [Task: Sink Connector thread-pool-1askId] PreparedStatementExecutor - *************** EXECUTED BATCH Successfully Records: 3************** task(0) Thread ID: Sink Connector thread-pool-1 Result: [I@2ca472e9 Database: db Table: debezium_signal
[INFO ] 2024-08-09 12:43:40.816 [Task: Sink Connector thread-pool-1askId] PreparedStatementExecutor - *** INSERT QUERY for Database(sync2) ***: insert into [table2, redacted]
[INFO ] 2024-08-09 12:43:40.834 [Task: Sink Connector thread-pool-1askId] PreparedStatementExecutor - *************** EXECUTED BATCH Successfully Records: 133************** task(3) Thread ID: Sink Connector thread-pool-1 Result: [I@32aedbe4 Database: db Table: table2
[DEBUG] 2024-08-09 12:43:40.834 [Task: Sink Connector thread-pool-1askId] ClickHouseBatchRunnable - ***** RETRYING the same batch again
[INFO ] 2024-08-09 12:43:40.834 [Task: Sink Connector thread-pool-1askId] ClickHouseBatchRunnable - ****** Thread: Sink Connector thread-pool-1 Batch Size: 133 ******
[INFO ] 2024-08-09 12:43:40.839 [Task: Sink Connector thread-pool-1askId] PreparedStatementExecutor - *************** EXECUTED BATCH Successfully Records: 117************** task(1) Thread ID: Sink Connector thread-pool-1 Result: [I@4d85b7ae Database: db Table: table2
[INFO ] 2024-08-09 12:43:40.839 [Task: Sink Connector thread-pool-1askId] PreparedStatementExecutor - *** INSERT QUERY for Database(sync2) ***: insert into [table1, redacted]

I'm testing this on small tables (1k rows), and the sink just keeps trying to insert the rows into ClickHouse indefinitely.

pservit · Aug 09 '24

Hi! Same issue with lightweight connector 2.1.0 and 2.2.1. I'm trying to sync from Postgres 15.6 to ClickHouse 24.8.

I create an empty ReplicatedReplacingMergeTree table, add it to table.include.list, send an execute-snapshot record to the signal.data.collection table, and see [DEBUG] ***** RETRYING the same batch again.

In the ClickHouse logs, a lot of info-level lines appear about the signal table and my replicated table:

<Information> dbname.signal_table  (Replicated OutputStream): Block with ID all_15085468674909155566_13758033302413788604 already exists locally as part all_56_56_0; ignoring it.
<Information> dbname.my_table (ba7899aa-66ba-45f0-ad60-a523fde759db) (Replicated OutputStream): Block with ID all_16026509731054093547_2011380493872661054 already exists locally as part all_6_6_0; ignoring it.
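
For context rather than a fix: those messages are ClickHouse's replicated insert deduplication. When the connector re-sends the same batch, the block hash matches one already written, so the server drops it instead of creating duplicate rows, which is consistent with the batch being retried while the row counts stay correct. The relevant settings can be inspected like this:

-- session-level switch for insert deduplication
SELECT name, value FROM system.settings WHERE name = 'insert_deduplicate'

-- how many recent block hashes each replicated table keeps for dedup
SELECT name, value FROM system.merge_tree_settings WHERE name = 'replicated_deduplication_window'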


Chocobo37 · Aug 23 '24