How can we insert an array of objects into ClickHouse using the Java client?
I have the following table in ClickHouse:
42fccdad8610 :) describe user;
DESCRIBE TABLE user
Query id: 5839f61d-c905-47f8-ac7d-92e7f76b2584
┌─name─┬─type───┬─default_type─┬─default_expression─┬─comment─┬─codec_expression─┬─ttl_expression─┐
│ name │ String │ │ │ │ │ │
└──────┴────────┴──────────────┴────────────────────┴─────────┴──────────────────┴────────────────┘
1 row in set. Elapsed: 0.003 sec.
I am trying to insert some data into that table using Java, but it is not working. Below is the code I use to insert the data, following this documentation: https://clickhouse.com/docs/en/integrations/java#insert
package com.watchman.etl.dbsync;

import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.data.ClickHouseFormat;
import java.io.ByteArrayInputStream;

public class Test {
    public static void main(String[] args) throws ClickHouseException {
        String data = "{\"name\":\"Shani Kumar\"}";
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data.getBytes());
        ClickHouseResponse clickHouseResponse =
            Clk.getClickHouseClient()
                .read("http://localhost:8123/event")
                .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
                .write()
                .query("insert into user select name from input('name String')")
                .data(byteArrayInputStream)
                .executeAndWait();
        System.out.printf("Written %d rows\n", clickHouseResponse.getSummary().getWrittenRows());
    }
}
Here are the error details:
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" com.clickhouse.client.ClickHouseException: Code: 33. DB::Exception: Cannot read all data. Bytes read: 20. Bytes expected: 123.: (at row 1)
: While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA) (version 24.1.5.6 (official build))
, server ClickHouseNode [uri=http://localhost:8123/event]@-54754220
at com.clickhouse.client.ClickHouseException.of(ClickHouseException.java:168)
at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:275)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.IOException: Code: 33. DB::Exception: Cannot read all data. Bytes read: 20. Bytes expected: 123.: (at row 1)
: While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA) (version 24.1.5.6 (official build))
at com.clickhouse.client.http.ApacheHttpConnectionImpl.checkResponse(ApacheHttpConnectionImpl.java:209)
at com.clickhouse.client.http.ApacheHttpConnectionImpl.post(ApacheHttpConnectionImpl.java:243)
at com.clickhouse.client.http.ClickHouseHttpClient.send(ClickHouseHttpClient.java:118)
at com.clickhouse.client.AbstractClient.sendAsync(AbstractClient.java:161)
at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:273)
... 4 more
Process finished with exit code 1
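A note on the error above: the payload is JSON text, while the request declares RowBinaryWithNamesAndTypes, so the server appears to be reading the JSON characters as binary length prefixes. If the goal is to send JSON as-is, a text format such as ClickHouseFormat.JSONEachRow is the one that matches this payload. The sketch below uses only the JDK (no ClickHouse client classes) to show what a single String value looks like in RowBinary: an unsigned LEB128 length followed by the UTF-8 bytes. The class and method names are hypothetical and exist only for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class RowBinaryStringSketch {
    // Unsigned LEB128: 7 payload bits per byte, high bit set while more bytes follow.
    static void writeVarInt(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // RowBinary String: varint byte length, then the raw UTF-8 bytes.
    static byte[] encodeString(String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarInt(out, utf8.length);
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] binary = encodeString("Shani Kumar");
        byte[] json = "{\"name\":\"Shani Kumar\"}".getBytes(StandardCharsets.UTF_8);
        // The binary row starts with the string length; the JSON text starts with '{'.
        System.out.println("RowBinary first byte (length prefix): " + binary[0]);
        System.out.println("JSON first byte ('{'): " + json[0]);
    }
}
```

The first byte of the JSON payload is 123 (the character `{`), which may be where "Bytes expected: 123" in the message comes from, although that reading is a guess and has not been checked against the server code.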
Here is a working example for the following table:
CREATE OR REPLACE TABLE events (
    id UUID,
    external_id String CODEC(ZSTD(9)),
    type String CODEC(ZSTD(9)),
    sub_type Nullable(String) CODEC(ZSTD(9)),
    description String CODEC(ZSTD(9)),
    data_set_id Int32 CODEC(ZSTD(9)),
    source String CODEC(ZSTD(9)),
    date_created DateTime64(3) CODEC(DoubleDelta, ZSTD(9)),
    last_updated DateTime64(3) CODEC(DoubleDelta, ZSTD(9)),
    start_time DateTime64(3) CODEC(DoubleDelta, ZSTD(9)),
    end_time Nullable(DateTime64(3)) CODEC(DoubleDelta, ZSTD(9)),
    INDEX event_type_idx (type) TYPE minmax GRANULARITY 4,
    INDEX event_sub_type_idx (sub_type) TYPE minmax GRANULARITY 4,
    INDEX event_start_time_idx (start_time) TYPE minmax GRANULARITY 4,
    INDEX event_end_time_idx (end_time) TYPE minmax GRANULARITY 4
) ENGINE = MergeTree()
ORDER BY (id, data_set_id)
PRIMARY KEY id
PARTITION BY (YEAR(date_created), data_set_id);
public ClickHouseService(
    @Value("${clickhouse.database.name:intellistream}") String dbName,
    @Value("${clickhouse.database.host:192.168.1.1}") String dbHost,
    @Value("${clickhouse.database.username:default}") String dbUser,
    @Value("${clickhouse.database.password:mypassword}") String dbPw
){
    this.DATABASE_HOST = dbHost;
    this.DATABASE_NAME = dbName;
    this.DATABASE_USER = dbUser;
    this.DATABASE_PASSWORD = dbPw;
    this.server = ClickHouseNode.builder()
        .host(DATABASE_HOST)
        .port(ClickHouseProtocol.HTTP, 8123)
        .database(DATABASE_NAME)
        .credentials(ClickHouseCredentials.fromUserAndPassword(DATABASE_USER, DATABASE_PASSWORD))
        .build();
}
public ClickHouseClient getClient(){
    return ClickHouseClient.builder()
        .nodeSelector(ClickHouseNodeSelector.of(null, ClickHouseProtocol.HTTP))
        .option(ClickHouseClientOption.COMPRESS_ALGORITHM, ClickHouseCompression.ZSTD)
        .option(ClickHouseClientOption.COMPRESS_LEVEL, 1)
        .option(ClickHouseClientOption.SOCKET_TIMEOUT, 8000)
        .option(ClickHouseClientOption.MAX_THREADS_PER_CLIENT, 4)
        .build();
}
public void createEvents(EventCudMessage message) throws ClickHouseException {
    try (ClickHouseClient client = getClient()) {
        ClickHouseRequest.Mutation request = client.write(server).table("events")
            .format(ClickHouseFormat.RowBinary);
        ClickHouseConfig config = request.getConfig();
        CompletableFuture<ClickHouseResponse> future = null;
        // back-pressuring is not supported, you can adjust the first two arguments
        try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
                .createPipedOutputStream(config, (Runnable) null)) {
            // in async mode, which is default, execution happens in a worker thread
            future = request.data(stream.getInputStream()).execute();
            for(EventModel eventModel : message.getEvents()) {
                // writing happens in main thread
                var createdTime = ZonedDateTime.now();
                var lastUpdatedTime = ZonedDateTime.now();
                var eventStartTime = DateTimeHandler.fromEpochUTCTimeAsZonedDateTime(eventModel.getStartTime());
                ZonedDateTime eventEndTime = null;
                LocalDateTime eventEndDateTime = null;
                if(eventModel.getEndTime() != null){
                    eventEndTime = DateTimeHandler.fromEpochUTCTimeAsZonedDateTime(eventModel.getEndTime());
                    eventEndDateTime = DateTimeHandler.toUTC(eventEndTime);
                }
                LocalDateTime createdDateTime = DateTimeHandler.toUTC(createdTime);
                LocalDateTime lastUpdatedDateTime = DateTimeHandler.toUTC(lastUpdatedTime);
                LocalDateTime eventStartDateTime = DateTimeHandler.toUTC(eventStartTime);
                BinaryStreamUtils.writeUuid(stream, UUID.fromString(eventModel.getId()));
                BinaryStreamUtils.writeString(stream, eventModel.getExternalId());
                BinaryStreamUtils.writeString(stream, eventModel.getType());
                if(eventModel.getSubType() != null){
                    BinaryStreamUtils.writeNonNull(stream);
                    BinaryStreamUtils.writeString(stream, eventModel.getSubType());
                } else {
                    BinaryStreamUtils.writeNull(stream);
                }
                BinaryStreamUtils.writeString(stream, eventModel.getDescription());
                BinaryStreamUtils.writeInt32(stream, eventModel.getDataSetId());
                BinaryStreamUtils.writeString(stream, eventModel.getSource());
                BinaryStreamUtils.writeDateTime(stream, createdDateTime, TIMESTAMP_SCALE, TimeZone.getTimeZone(ZoneId.of("UTC")));
                BinaryStreamUtils.writeDateTime(stream, lastUpdatedDateTime, TIMESTAMP_SCALE, TimeZone.getTimeZone(ZoneId.of("UTC")));
                BinaryStreamUtils.writeDateTime(stream, eventStartDateTime, TIMESTAMP_SCALE, TimeZone.getTimeZone(ZoneId.of("UTC")));
                if(eventEndDateTime != null){
                    BinaryStreamUtils.writeNonNull(stream);
                    BinaryStreamUtils.writeDateTime(stream, eventEndDateTime, TIMESTAMP_SCALE, TimeZone.getTimeZone(ZoneId.of("UTC")));
                } else {
                    BinaryStreamUtils.writeNull(stream);
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        if(future != null){
            // response should be always closed
            try (ClickHouseResponse response = future.get()) {
                ClickHouseResponseSummary summary = response.getSummary();
                log.debug("Datapoints summary: {} rows inserted.", summary.getWrittenRows());
            } catch (ExecutionException e){
                log.error(e.getMessage(), e);
                throw ClickHouseException.of(e, server);
            } catch (InterruptedException e) {
                log.error(e.getMessage(), e);
                throw new RuntimeException(e);
            }
        }
    }
}
This is blazingly fast and I could insert 100k records in 3 seconds.
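For readers wondering what the writeNonNull / writeNull calls in the example put on the wire: in RowBinary a Nullable value is preceded by a one-byte marker, 0 when a value follows and 1 for NULL (in which case nothing else is written). Below is a minimal JDK-only sketch of that layout for Nullable(String). It does not use the ClickHouse client, and the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class RowBinaryNullableSketch {
    // Unsigned LEB128 length prefix, as used for String values in RowBinary.
    static void writeVarInt(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Nullable(String): marker byte 1 for NULL, or 0 followed by the String encoding.
    static byte[] encodeNullableString(String s) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (s == null) {
            out.write(1); // NULL marker, no value follows
            return out.toByteArray();
        }
        out.write(0); // non-null marker
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeVarInt(out, utf8.length);
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(encodeNullableString(null))); // [1]
        System.out.println(Arrays.toString(encodeNullableString("ab"))); // [0, 2, 97, 98]
    }
}
```

Forgetting the marker byte for a Nullable column shifts every following field, which tends to surface as a "Cannot read all data" style error rather than a clear type error.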
Isn’t there a simple API that lets developers insert data into any database table, the way JDBC does?
Here is a simple JDBC example that is easy to understand, write, and debug. I understand JDBC can’t be used directly with ClickHouse, but we could at least have a simple API that is easy to write and understand.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class Main {
    public static void main(String[] args) {
        // Database connection parameters
        String url = "jdbc:mysql://localhost:3306/mydatabase";
        String username = "username";
        String password = "password";
        // SQL query to insert data into a table
        String sql = "INSERT INTO mytable (column1, column2, column3) VALUES (?, ?, ?)";
        // Data to be inserted
        String value1 = "Value1";
        int value2 = 123;
        String value3 = "Value3";
        try (
            // Establish a connection to the database
            Connection conn = DriverManager.getConnection(url, username, password);
            // Create a PreparedStatement to execute the SQL query
            PreparedStatement pstmt = conn.prepareStatement(sql)
        ) {
            // Set values for PreparedStatement parameters
            pstmt.setString(1, value1);
            pstmt.setInt(2, value2);
            pstmt.setString(3, value3);
            // Execute the query to insert data
            int rowsAffected = pstmt.executeUpdate();
            // Check if any rows were affected
            if (rowsAffected > 0) {
                System.out.println("Data inserted successfully.");
            } else {
                System.out.println("No rows affected. Data may not have been inserted.");
            }
        } catch (SQLException e) {
            // Handle any SQL errors
            e.printStackTrace();
        }
    }
}
It looks like BinaryStreamUtils does not have any support for the array data type.
Good day, @ats1999!
Unfortunately, BinaryStreamUtils works only with primitives.
Simple arrays, however, can be written with it by writing the array length as a varint and then each primitive value.
// array length first, as a varint
BinaryStreamUtils.writeVarInt(out, array.length);
// then each element in order
for (String item : array) {
    BinaryStreamUtils.writeString(out, item);
}
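To make the byte layout behind that snippet concrete, here is a small JDK-only sketch of how an Array(String) value is laid out in RowBinary: a varint element count, then each element encoded as a String (varint length plus UTF-8 bytes). It does not call the ClickHouse client, and the class and method names are hypothetical. Other element types would need their own element encoding.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class RowBinaryArraySketch {
    // Unsigned LEB128: 7 payload bits per byte, high bit set while more bytes follow.
    static void writeVarInt(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // String element: varint byte length, then the raw UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeVarInt(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Array(String): varint element count, then every element in order.
    static byte[] encodeStringArray(String[] items) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarInt(out, items.length);
        for (String item : items) {
            writeString(out, item);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] encoded = encodeStringArray(new String[] {"a", "bc"});
        System.out.println(Arrays.toString(encoded)); // [2, 1, 97, 2, 98, 99]
    }
}
```

An empty array is just the single byte 0, so a row with an empty Array(String) column still has to write that count.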
Btw, the JDBC client can be used directly with ClickHouse. Or have you run into any issues with it?
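For reference, a rough sketch of what a batched insert through JDBC could look like for the `user` table from the question. It assumes the clickhouse-jdbc driver is on the classpath and that the server is reachable at localhost:8123 with a database named `event` and the `default` user with an empty password. These connection details, as well as the class and method names, are assumptions for illustration and not taken from a tested setup.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

public class ClickHouseJdbcSketch {
    // Queues one row per name and sends them as a single batch.
    // Returns the number of rows that were queued.
    static int insertNames(Connection conn, List<String> names) throws SQLException {
        String sql = "INSERT INTO user (name) VALUES (?)";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            for (String name : names) {
                ps.setString(1, name);
                ps.addBatch();
            }
            ps.executeBatch();
        }
        return names.size();
    }

    public static void main(String[] args) {
        // Assumed URL and credentials; adjust to your environment.
        String url = "jdbc:clickhouse://localhost:8123/event";
        try (Connection conn = DriverManager.getConnection(url, "default", "")) {
            int queued = insertNames(conn, List.of("Shani Kumar", "Another User"));
            System.out.println("Queued rows: " + queued);
        } catch (SQLException e) {
            // Also reached when the driver is missing or the server is not running.
            System.err.println("Insert failed: " + e.getMessage());
        }
    }
}
```

Batching with addBatch / executeBatch rather than calling executeUpdate per row is the usual choice here, since ClickHouse generally handles a few large inserts better than many small ones.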