clickhouse-kafka-connect
ExtractTablesMapping's DESCRIBE query times out
Describe the bug
Every 5-10 minutes we see DESCRIBE queries time out in Kafka Connect, while system.query_log shows an execution_duration of 1 ms for them. We run our connector task with tableRefreshInterval=15. Based on system.query_log, multiple extractTablesMapping runs overlap. I'm also seeing SHOW TABLES queries run more often than once every 15s, which shouldn't happen. I'm wondering if we should add a check that doesn't start extractTablesMapping if a previous run is still in progress.
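A minimal sketch of the proposed check, assuming the refresh can be wrapped in a Supplier. SingleFlightRefresher and tryRefresh are hypothetical names, not part of the connector. A call that finds a previous refresh still in flight is skipped rather than queued, so the cached mapping is simply refreshed on the next interval.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical guard: at most one refresh runs at a time; overlapping calls are skipped, not queued.
class SingleFlightRefresher<T> {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private volatile T lastResult;

    // Returns true if this call ran the refresh, false if another refresh was still in progress.
    boolean tryRefresh(Supplier<T> refresh) {
        // compareAndSet flips false -> true atomically, so only one caller can win.
        if (!running.compareAndSet(false, true)) {
            return false;
        }
        try {
            lastResult = refresh.get();
            return true;
        } finally {
            running.set(false); // released even if the refresh throws
        }
    }

    T lastResult() {
        return lastResult;
    }

    public static void main(String[] args) throws InterruptedException {
        SingleFlightRefresher<String> refresher = new SingleFlightRefresher<>();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        // A slow refresh (think: SHOW TABLES plus many DESCRIBEs) holds the guard until released.
        Thread slow = new Thread(() -> refresher.tryRefresh(() -> {
            started.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "mapping-1";
        }));
        slow.start();
        started.await();

        boolean overlapping = refresher.tryRefresh(() -> "mapping-2"); // skipped: first run in flight
        release.countDown();
        slow.join();
        boolean next = refresher.tryRefresh(() -> "mapping-3");        // runs: nothing in flight

        System.out.println(overlapping + " " + next + " " + refresher.lastResult()); // false true mapping-3
    }
}
```

The flag is per instance: if each connector task holds its own helper, refreshes from different tasks can still overlap and would need a shared (for example static) guard.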
Steps to reproduce
- Run a connector with tableRefreshInterval=15
- Generate test data into the Kafka topic
- Observe the Kafka Connect logs
Expected behavior
The DESCRIBE query should not time out.
Error log
8:23:28 AM ERROR
Exception when running describeTable DESCRIBE TABLE `my_database`.`my_table`
connect timed out, server ClickHouseNode [uri=https://{redacted}.us-east-2.aws.clickhouse.cloud:8443/mydb, options={sslmode=STRICT}]@2074393920
com.clickhouse.client.ClickHouseException: connect timed out, server ClickHouseNode [uri=https://{redacted}.us-east-2.aws.clickhouse.cloud:8443/openmeter, options={sslmode=STRICT}]@-166211039
at com.clickhouse.client.ClickHouseException.of(ClickHouseException.java:164)
at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:275)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.base/java.net.Socket.connect(Socket.java:609)
at java.base/sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:305)
at java.base/sun.net.NetworkClient.doConnect(NetworkClient.java:177)
at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:509)
at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:604)
at java.base/sun.net.www.protocol.https.HttpsClient.<init>(HttpsClient.java:266)
at java.base/sun.net.www.protocol.https.HttpsClient.New(HttpsClient.java:373)
at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.getNewHttpClient(AbstractDelegateHttpsURLConnection.java:207)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1187)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1081)
at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:193)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1367)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1342)
at java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.getOutputStream(HttpsURLConnectionImpl.java:246)
at com.clickhouse.client.http.HttpUrlConnectionImpl.post(HttpUrlConnectionImpl.java:225)
at com.clickhouse.client.http.ClickHouseHttpClient.send(ClickHouseHttpClient.java:124)
at com.clickhouse.client.AbstractClient.sendAsync(AbstractClient.java:161)
at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:273)
Configuration
{
"connector.client.config.override.policy": "ALL",
"consumer.auto.offset.reset": "latest",
"consumer.override.auto.offset.reset": "latest",
"consumer.override.max.poll.records": "5000",
"database": "mydb",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.deadletterqueue.topic.name": "om_deadletterqueue",
"errors.deadletterqueue.topic.replication.factor": "3",
"errors.retry.timeout": "30",
"errors.tolerance": "all",
"exactlyOnce": "true",
"hostname": "{redacted}.us-east-2.aws.clickhouse.cloud",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"password": "****************",
"port": "8443",
"schemas.enable": "false",
"ssl": "true",
"tableRefreshInterval": "15",
"topics.regex": "^om_[A-Za-z0-9]+(?:_[A-Za-z0-9]+)*_events$",
"username": "default",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
Environment
- Kafka-Connect version: Confluent Cloud
- Kafka Connect configuration: see above
- Kafka version: Confluent Cloud
- Kafka environment: Confluent Cloud
- OS: Linux
ClickHouse server
- ClickHouse Server version: ClickHouse Cloud (v23.8)
- ClickHouse Server non-default settings, if any: ClickHouse Cloud
- CREATE TABLE statements for tables involved:
- Sample data for all these tables, use clickhouse-obfuscator if necessary:
cc @mzitnik
@hekike How many workers are you running in parallel, and can you show the output from the query log?
Hi @hekike - I'm closing this because we haven't heard anything in a while, but if it's still an issue please comment with the details @mzitnik was asking for and we can take another look into it. Thanks!
We chatted with @mzitnik offline but couldn't figure out the root cause.
I think the issue here is sending lots of DESCRIBE queries simultaneously over HTTP connections.
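If the problem is too many simultaneous DESCRIBE requests, one possible mitigation is to cap how many are in flight at once. A minimal sketch, assuming all describes go through one shared helper in the same JVM; BoundedDescriber and its describeCall parameter (a stand-in for the real HTTP request) are hypothetical, and the limit of 2 is arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical helper: at most maxInFlight describe calls run at once; extra callers wait.
class BoundedDescriber {
    private final Semaphore permits;
    final AtomicInteger inFlight = new AtomicInteger();
    final AtomicInteger peak = new AtomicInteger(); // highest concurrency observed, for demonstration

    BoundedDescriber(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    // describeCall stands in for the real "DESCRIBE TABLE" HTTP request.
    String describe(String table, Function<String, String> describeCall) throws InterruptedException {
        permits.acquire(); // blocks while maxInFlight calls are already running
        try {
            peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
            return describeCall.apply(table);
        } finally {
            inFlight.decrementAndGet();
            permits.release();
        }
    }

    public static void main(String[] args) throws Exception {
        BoundedDescriber describer = new BoundedDescriber(2);
        ExecutorService pool = Executors.newFixedThreadPool(8); // 8 callers competing for 2 permits
        List<Future<String>> results = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            String table = "table_" + i;
            results.add(pool.submit(() -> describer.describe(table, t -> {
                try {
                    Thread.sleep(5); // simulate request latency
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "DESCRIBE " + t;
            })));
        }
        for (Future<String> f : results) {
            f.get();
        }
        pool.shutdown();
        System.out.println("peak concurrent describes: " + describer.peak.get());
    }
}
```

A Semaphore makes extra callers wait instead of failing, so every table still gets described. It only bounds concurrency inside one JVM, not across separate Kafka Connect workers.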
We are hitting a similar issue, but instead of a timeout, ClickHouse starts refusing new HTTP connections, even though there is no limit.
But if we start the connectors one by one (in other words, without sending lots of DESCRIBE TABLE queries simultaneously), everything works fine.
Here is a patched file that resolves the HTTP connection issues for us. The same problem may also be the cause of the original issue above.
package com.clickhouse.kafka.connect.sink.db.helper;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseNodeSelector;
import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.client.config.ClickHouseProxyType;
import com.clickhouse.config.ClickHouseOption;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.ClickHouseRecord;
import com.clickhouse.data.ClickHouseValue;
import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig;
import com.clickhouse.kafka.connect.sink.db.mapping.Column;
import com.clickhouse.kafka.connect.sink.db.mapping.Table;
import com.clickhouse.kafka.connect.util.Utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.Getter;
public class ClickHouseHelperClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseHelperClient.class);
    private final String hostname;
    private final int port;
    private final String username;
    @Getter
    private final String database;
    private final String password;
    private final boolean sslEnabled;
    private final String jdbcConnectionProperties;
    private final int timeout;
    @Getter
    private ClickHouseNode server = null;
    private final int retry;
    private ClickHouseProxyType proxyType = null;
    private String proxyHost = null;
    private int proxyPort = -1;

    public ClickHouseHelperClient(ClickHouseClientBuilder builder) {
        this.hostname = builder.hostname;
        this.port = builder.port;
        this.username = builder.username;
        this.password = builder.password;
        this.database = builder.database;
        this.sslEnabled = builder.sslEnabled;
        this.jdbcConnectionProperties = builder.jdbcConnectionProperties;
        this.timeout = builder.timeout;
        this.retry = builder.retry;
        this.proxyType = builder.proxyType;
        this.proxyHost = builder.proxyHost;
        this.proxyPort = builder.proxyPort;
        this.server = create();
    }

    public Map<ClickHouseOption, Serializable> getDefaultClientOptions() {
        Map<ClickHouseOption, Serializable> options = new HashMap<>();
        options.put(ClickHouseClientOption.PRODUCT_NAME, "clickhouse-kafka-connect/" + ClickHouseClientOption.class.getPackage().getImplementationVersion());
        if (proxyType != null && !proxyType.equals(ClickHouseProxyType.IGNORE)) {
            options.put(ClickHouseClientOption.PROXY_TYPE, proxyType);
            options.put(ClickHouseClientOption.PROXY_HOST, proxyHost);
            options.put(ClickHouseClientOption.PROXY_PORT, proxyPort);
        }
        return options;
    }

    private ClickHouseNode create() {
        String protocol = "http";
        if (this.sslEnabled)
            protocol += "s";
        // Treat null or empty properties as "none" so the literal string "null" never ends up in the URL
        String tmpJdbcConnectionProperties = jdbcConnectionProperties == null ? "" : jdbcConnectionProperties;
        if (!tmpJdbcConnectionProperties.isEmpty() && !tmpJdbcConnectionProperties.startsWith("?")) {
            tmpJdbcConnectionProperties = "?" + tmpJdbcConnectionProperties;
        }
        String url = String.format("%s://%s:%d/%s%s",
                protocol,
                hostname,
                port,
                database,
                tmpJdbcConnectionProperties
        );
        LOGGER.info("ClickHouse URL: {}", url);
        if (username != null && password != null) {
            LOGGER.debug(String.format("Adding username [%s]", username));
            Map<String, String> options = new HashMap<>();
            options.put("user", username);
            options.put("password", password);
            server = ClickHouseNode.of(url, options);
        } else {
            server = ClickHouseNode.of(url);
        }
        return server;
    }

    public boolean ping() {
        LOGGER.debug(String.format("Server [%s] , Timeout [%d]", server, timeout));
        // try-with-resources closes the client on every path, including when ping() throws
        try (ClickHouseClient clientPing = ClickHouseClient.builder()
                .options(getDefaultClientOptions())
                .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
                .build()) {
            int retryCount = 0;
            while (retryCount < retry) {
                if (clientPing.ping(server, timeout)) {
                    LOGGER.info("Ping was successful.");
                    return true;
                }
                retryCount++;
                LOGGER.warn(String.format("Ping retry %d out of %d", retryCount, retry));
            }
        }
        LOGGER.error("Unable to ping ClickHouse instance.");
        return false;
    }

    public String version() {
        try (ClickHouseClient client = ClickHouseClient.builder()
                .options(getDefaultClientOptions())
                .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
                .build();
             ClickHouseResponse response = client.read(server)
                     .query("SELECT VERSION()")
                     .executeAndWait()) {
            return response.firstRecord().getValue(0).asString();
        } catch (ClickHouseException e) {
            LOGGER.error("Exception when trying to retrieve VERSION()", e);
            return null;
        }
    }

    public ClickHouseResponse query(String query) {
        return query(query, null);
    }

    public ClickHouseResponse query(String query, ClickHouseFormat clickHouseFormat) {
        int retryCount = 0;
        ClickHouseException ce = null;
        while (retryCount < retry) {
            // NOTE: try-with-resources closes both the client and the response before the caller
            // receives it, so the returned response must not be used to read result rows
            try (ClickHouseClient client = ClickHouseClient.builder()
                    .options(getDefaultClientOptions())
                    .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
                    .build();
                 ClickHouseResponse response = client.read(server)
                         .format(clickHouseFormat)
                         .query(query)
                         .executeAndWait()) {
                return response;
            } catch (ClickHouseException e) {
                retryCount++;
                LOGGER.warn(String.format("Query retry %d out of %d", retryCount, retry), e);
                ce = e;
            }
        }
        throw new RuntimeException(ce);
    }

    public List<String> showTables(String database) {
        List<String> tablesNames = new ArrayList<>();
        try (ClickHouseClient client = ClickHouseClient.builder()
                .options(getDefaultClientOptions())
                .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
                .build();
             ClickHouseResponse response = client.read(server)
                     .query(String.format("SHOW TABLES FROM `%s`", database))
                     .executeAndWait()) {
            for (ClickHouseRecord r : response.records()) {
                ClickHouseValue v = r.getValue(0);
                String tableName = v.asString();
                tablesNames.add(tableName);
            }
        } catch (ClickHouseException e) {
            LOGGER.error("Failed in show tables", e);
        }
        return tablesNames;
    }

    public Table describeTable(String database, String tableName, ClickHouseClient client) {
        if (tableName.startsWith(".inner"))
            return null;
        String describeQuery = String.format("DESCRIBE TABLE `%s`.`%s`", database, tableName);
        LOGGER.debug(describeQuery);
        // Close the client only if this method created it; a caller-supplied client is closed by the caller
        boolean ownsClient = false;
        if (client == null) {
            try {
                client = ClickHouseClient.builder()
                        .options(getDefaultClientOptions())
                        .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
                        .build();
                ownsClient = true;
            } catch (Exception e) {
                LOGGER.error("Failed to create client", e);
                return null;
            }
        }
        try (ClickHouseResponse response = client.read(server)
                .set("describe_include_subcolumns", true)
                .format(ClickHouseFormat.JSONEachRow)
                .query(describeQuery)
                .executeAndWait()) {
            Table table = new Table(database, tableName);
            for (ClickHouseRecord r : response.records()) {
                ClickHouseValue v = r.getValue(0);
                ClickHouseFieldDescriptor fieldDescriptor = ClickHouseFieldDescriptor.fromJsonRow(v.asString());
                if (fieldDescriptor.isAlias() || fieldDescriptor.isMaterialized()) {
                    LOGGER.debug("Skipping column {} as it is an ALIAS or MATERIALIZED column", fieldDescriptor.getName());
                    continue;
                }
                if (fieldDescriptor.hasDefault()) {
                    table.hasDefaults(true);
                }
                Column column = Column.extractColumn(fieldDescriptor);
                // If we run into a rare column we can't handle, just ignore the table and warn the user
                if (column == null) {
                    LOGGER.warn("Unable to handle column: {}", fieldDescriptor.getName());
                    return null;
                }
                table.addColumn(column);
            }
            return table;
        } catch (ClickHouseException | JsonProcessingException e) {
            LOGGER.error(String.format("Exception when running describeTable %s", describeQuery), e);
            return null;
        } finally {
            if (ownsClient) {
                client.close();
            }
        }
    }

    public Table describeTable(String database, String tableName) {
        return describeTable(database, tableName, null);
    }

    public List<Table> extractTablesMapping(String database, Map<String, Table> cache) {
        List<Table> tableList = new ArrayList<>();
        // One client is shared by every DESCRIBE in this refresh instead of building one per table
        try (ClickHouseClient client = ClickHouseClient.builder()
                .options(getDefaultClientOptions())
                .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
                .build()) {
            for (String tableName : showTables(database)) {
                // (Full) Table names are escaped in the cache
                String escapedTableName = Utils.escapeTableName(database, tableName);
                // Read from cache if we already described this table before
                // This means we won't pick up edited table configs until the connector is restarted
                if (cache.containsKey(escapedTableName)) {
                    tableList.add(cache.get(escapedTableName));
                    continue;
                }
                // Describe the table in the database it was listed from (the parameter, not this.database)
                Table table = describeTable(database, tableName, client);
                if (table != null)
                    tableList.add(table);
            }
            return tableList;
        } catch (Exception e) {
            LOGGER.error("Failed to extract tables mapping", e);
            return null;
        }
    }

    public static class ClickHouseClientBuilder {
        private ClickHouseSinkConfig config = null;
        private String hostname = null;
        private int port = -1;
        private String username = "default";
        private String database = "default";
        private String password = "";
        private boolean sslEnabled = false;
        private String jdbcConnectionProperties = "";
        private int timeout = ClickHouseSinkConfig.timeoutSecondsDefault * ClickHouseSinkConfig.MILLI_IN_A_SEC;
        private int retry = ClickHouseSinkConfig.retryCountDefault;
        private ClickHouseProxyType proxyType = null;
        private String proxyHost = null;
        private int proxyPort = -1;

        public ClickHouseClientBuilder(String hostname, int port, ClickHouseProxyType proxyType, String proxyHost, int proxyPort) {
            this.hostname = hostname;
            this.port = port;
            this.proxyType = proxyType;
            this.proxyHost = proxyHost;
            this.proxyPort = proxyPort;
        }

        public ClickHouseClientBuilder setUsername(String username) {
            this.username = username;
            return this;
        }

        public ClickHouseClientBuilder setPassword(String password) {
            this.password = password;
            return this;
        }

        public ClickHouseClientBuilder setDatabase(String database) {
            this.database = database;
            return this;
        }

        public ClickHouseClientBuilder sslEnable(boolean sslEnabled) {
            this.sslEnabled = sslEnabled;
            return this;
        }

        public ClickHouseClientBuilder setJdbcConnectionProperties(String jdbcConnectionProperties) {
            this.jdbcConnectionProperties = jdbcConnectionProperties;
            return this;
        }

        public ClickHouseClientBuilder setTimeout(int timeout) {
            this.timeout = timeout;
            return this;
        }

        public ClickHouseClientBuilder setRetry(int retry) {
            this.retry = retry;
            return this;
        }

        public ClickHouseHelperClient build() {
            return new ClickHouseHelperClient(this);
        }
    }
}
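What the patch appears to change is that extractTablesMapping builds one ClickHouseClient and passes it into every describeTable call, while describeTable only creates (and should then close) its own client when none is passed in. The sketch below isolates that ownership pattern with a hypothetical FakeClient that just counts instances, so it runs without a ClickHouse server. It does not show whether the real client reuses the underlying HTTP connections; that depends on the client's HTTP implementation (the stack trace above goes through HttpUrlConnectionImpl).

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an HTTP client: counts how many instances were opened and closed.
class FakeClient implements AutoCloseable {
    static final AtomicInteger opened = new AtomicInteger();
    static final AtomicInteger closed = new AtomicInteger();

    FakeClient() {
        opened.incrementAndGet();
    }

    String describe(String table) {
        return "columns of " + table;
    }

    @Override
    public void close() {
        closed.incrementAndGet();
    }
}

class ClientReuseDemo {
    // Mirrors describeTable(db, table, client): use the caller's client, or open (and close) our own.
    static String describe(String table, FakeClient shared) {
        if (shared != null) {
            return shared.describe(table); // caller owns and closes the shared client
        }
        try (FakeClient own = new FakeClient()) {
            return own.describe(table);
        }
    }

    public static void main(String[] args) {
        List<String> tables = List.of("a", "b", "c", "d");

        for (String t : tables) {
            describe(t, null);                      // one client per table
        }
        int perCall = FakeClient.opened.get();

        try (FakeClient shared = new FakeClient()) { // one client for the whole refresh
            for (String t : tables) {
                describe(t, shared);
            }
        }
        int reused = FakeClient.opened.get() - perCall;

        // prints "4 vs 1, all closed: true"
        System.out.println(perCall + " vs " + reused + ", all closed: "
                + (FakeClient.opened.get() == FakeClient.closed.get()));
    }
}
```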
@mzitnik