How to do a batch insert with a column list using clickhouse-http-client?
Hello, my question is how to implement batch inserts into a table using an explicit column list. I have this table:
CREATE TABLE test.test
(
a String,
b String DEFAULT 'unknown',
c String DEFAULT 'unknown',
d String DEFAULT 'unknown',
)
ENGINE = MergeTree
ORDER BY a
SETTINGS index_granularity = 8192;
and the ingested data looks like this:
[
[
{
"a": "UUID as text",
"c": "some text"
},
{
"a": "UUID as text",
"c": "some text"
},
{
"a": "UUID as text",
"c": "some text"
}
],
[
{
"a": "UUID as text",
"b": "some text"
},
{
"a": "UUID as text",
"b": "some text"
},
{
"a": "UUID as text",
"b": "some text"
}
]
]
(Some columns, though never those in the ordering key, may be absent from the objects.) Currently, when I try to use this format as I understand it from the documentation:
RowBinaryWithNames
Similar to [RowBinary](https://clickhouse.com/docs/en/interfaces/formats#rowbinary), but with added header:
- [LEB128](https://en.wikipedia.org/wiki/LEB128)-encoded number of columns (N)
- N Strings specifying column names
public void insert(ClickHouseNode server, List<Object> data) throws ClickHouseException {
try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP)) {
ClickHouseRequest.Mutation request = client.read(server).write().table("test").format(ClickHouseFormat.RowBinaryWithNames).set("send_progress_in_http_headers", 1);
ClickHouseConfig config = request.getConfig();
CompletableFuture<ClickHouseResponse> future;
try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config, (Runnable) null)) {
future = request.data(stream.getInputStream()).execute();
BinaryStreamUtils.writeInt128(stream, BigInteger.valueOf(3));
BinaryStreamUtils.writeString(stream, "a");
BinaryStreamUtils.writeString(stream, "b");
BinaryStreamUtils.writeString(stream, "c");
for (int i = 0; i < 10_000; i++) {
BinaryStreamUtils.writeString(stream, UUID.randomUUID().toString());
BinaryStreamUtils.writeString(stream, String.valueOf(i));
BinaryStreamUtils.writeString(stream, String.valueOf(i%4));
}
}
try (ClickHouseResponse response = future.get()) {
ClickHouseResponseSummary summary = response.getSummary();
log.debug("Inserted: {}", summary.getWrittenRows());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw ClickHouseException.forCancellation(e, server);
} catch (ExecutionException | IOException e) {
throw ClickHouseException.of(e, server);
}
}
it returns errors like:
data-sync | Caused by: java.io.IOException: Code: 626. DB::Exception: Cannot skip unknown field in RowBinaryWithNames format, because it's type is unknown: While executing BinaryRowInputFormat. (CANNOT_SKIP_UNKNOWN_FIELD) (version 23.1.2.9 (official build))
data-sync |
data-sync | at com.clickhouse.client.http.HttpUrlConnectionImpl.checkResponse(HttpUrlConnectionImpl.java:184) ~[clickhouse-http-client-0.4.6.jar:clickhouse-http-client 0.4.6 (revision: dd91e17)]
data-sync | at com.clickhouse.client.http.HttpUrlConnectionImpl.post(HttpUrlConnectionImpl.java:227) ~[clickhouse-http-client-0.4.6.jar:clickhouse-http-client 0.4.6 (revision: dd91e17)]
data-sync | at com.clickhouse.client.http.ClickHouseHttpClient.send(ClickHouseHttpClient.java:124) ~[clickhouse-http-client-0.4.6.jar:clickhouse-http-client 0.4.6 (revision: dd91e17)]
data-sync | at com.clickhouse.client.AbstractClient.sendAsync(AbstractClient.java:161) ~[clickhouse-client-0.4.6.jar:clickhouse-client 0.4.6 (revision: dd91e17)]
data-sync | at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:273) ~[clickhouse-client-0.4.6.jar:clickhouse-client 0.4.6 (revision: dd91e17)]
data-sync | ... 4 common frames omitted
or
data-sync | Caused by: java.io.IOException: Code: 33. DB::Exception: Cannot read all data. Bytes read: 89. Bytes expected: 101.: (at row 2669)
data-sync | : While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA) (version 23.1.2.9 (official build))
data-sync |
data-sync | at com.clickhouse.client.http.HttpUrlConnectionImpl.checkResponse(HttpUrlConnectionImpl.java:184) ~[clickhouse-http-client-0.4.6.jar:clickhouse-http-client 0.4.6 (revision: dd91e17)]
data-sync | at com.clickhouse.client.http.HttpUrlConnectionImpl.post(HttpUrlConnectionImpl.java:227) ~[clickhouse-http-client-0.4.6.jar:clickhouse-http-client 0.4.6 (revision: dd91e17)]
data-sync | at com.clickhouse.client.http.ClickHouseHttpClient.send(ClickHouseHttpClient.java:124) ~[clickhouse-http-client-0.4.6.jar:clickhouse-http-client 0.4.6 (revision: dd91e17)]
data-sync | at com.clickhouse.client.AbstractClient.sendAsync(AbstractClient.java:161) ~[clickhouse-client-0.4.6.jar:clickhouse-client 0.4.6 (revision: dd91e17)]
data-sync | at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:273) ~[clickhouse-client-0.4.6.jar:clickhouse-client 0.4.6 (revision: dd91e17)]
data-sync | ... 4 common frames omitted
Is it possible, and if so how, to implement this with, for example, ClickHouseFormat.RowBinaryWithNamesAndTypes or ClickHouseFormat.RowBinaryWithNames? If not, how can I use a classic SQL INSERT for batch inserts with clickhouse-http-client 0.4.6? (I really couldn't find this in the documentation.)
With clickhouse-jdbc it can be implemented as:
public void insertJDBC() {
String sql = String.format(
"INSERT INTO %s (a, b, c) SELECT a, b, c FROM input('a String, b String, c String')",
"test"
);
try (PreparedStatement ps = this.connection.prepareStatement(sql)) {
for (int i = 0; i < 10_000; i++) {
ps.setString(1, UUID.randomUUID().toString());
ps.setString(2, "b" + i);
ps.setString(3, "c" + i);
ps.addBatch();
}
int count = 0;
for (int i : ps.executeBatch()) {
if (i > 0) {
count += i;
}
}
log.debug("Inserted: " + count);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
But I don't see a way to use types like UInt128 with this solution.
Hi @SergImmortal, sorry for the terrible documentation and cumbersome API.
But I don't see a way to use types like UInt128 with JDBC
Please refer to type mappings at https://github.com/ClickHouse/clickhouse-java/tree/main/clickhouse-data#type-mapping
Is it possible and how to implement it with for example: ClickHouseFormat.RowBinaryWithNamesAndTypes or ClickHouseFormat.RowBinaryWithNames???
Once you're clear on the type mapping, you may follow examples at https://github.com/ClickHouse/clickhouse-java/issues/1425#issuecomment-1691394343
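For the UInt128 case raised above, here is a minimal sketch of what the binary side looks like. It assumes the mapping table lists UInt128 as java.math.BigInteger, and that RowBinary stores UInt128 as 16 bytes in little-endian order; both should be verified against the linked docs. The class and method names (UInt128Sketch, toLittleEndian16) are made up for illustration and are not part of clickhouse-java.

```java
import java.math.BigInteger;

public class UInt128Sketch {
    // Converts a non-negative BigInteger into 16 little-endian bytes.
    // Assumption: this matches the fixed-width layout RowBinary expects for UInt128.
    static byte[] toLittleEndian16(BigInteger value) {
        if (value.signum() < 0 || value.bitLength() > 128) {
            throw new IllegalArgumentException("value does not fit into UInt128: " + value);
        }
        // toByteArray() is big-endian and may carry one extra leading zero (sign) byte.
        byte[] bigEndian = value.toByteArray();
        byte[] littleEndian = new byte[16];
        for (int i = 0; i < bigEndian.length && i < 16; i++) {
            littleEndian[i] = bigEndian[bigEndian.length - 1 - i];
        }
        return littleEndian;
    }

    public static void main(String[] args) {
        byte[] bytes = toLittleEndian16(BigInteger.valueOf(258));
        System.out.println("low bytes: " + bytes[0] + ", " + bytes[1]);
    }
}
```

On the JDBC path I would expect passing the BigInteger through setObject to be the equivalent, but that is an assumption to check against the mapping table rather than something confirmed in this thread.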
Hi @zhicwu, thank you for the answer.
I have already tried serializing the data like this:
ClickHouseRequest.Mutation request = client.read(server).write().table("test").format(ClickHouseFormat.RowBinaryWithNamesAndTypes).set("send_progress_in_http_headers", 1);
ClickHouseConfig config = request.getConfig();
CompletableFuture<ClickHouseResponse> future;
List<ClickHouseColumn> columns = ClickHouseColumn.parse("a String, b Nullable(String), c String, d String");
try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
.createPipedOutputStream(config, (Runnable) null)) {
future = request.data(stream.getInputStream()).execute();
ClickHouseDataProcessor processor = ClickHouseDataStreamFactory.getInstance().getProcessor(config, null,
stream, null, columns);
ClickHouseValue[] values = new ClickHouseValue[] {
ClickHouseStringValue.ofNull(), // columns.get(0).newValue()
ClickHouseStringValue.ofNull(), // columns.get(1).newValue()
ClickHouseStringValue.ofNull(), // columns.get(2).newValue()
ClickHouseStringValue.ofNull() // columns.get(3).newValue()
};
ClickHouseSerializer[] serializers = processor.getSerializers(config, columns);
// writing happens in main thread
for (int i = 0; i < 10_000; i++) {
serializers[0].serialize(values[0].update(String.valueOf(i % 16)), stream);
serializers[1].serialize(values[1].update(UUID.randomUUID().toString()), stream);
serializers[2].serialize(values[2].update(String.format("I'm %s", i)), stream);
serializers[3].serialize(values[3].update(String.format("%s", i)), stream);
}
}
// response should be always closed
try (ClickHouseResponse response = future.get()) {
ClickHouseResponseSummary summary = response.getSummary();
log.debug("Inserted: {}", summary.getWrittenRows());
}
But for me, it works only when the columns being inserted are exactly the columns that exist in the table.
About type mapping: when I use the JDBC driver, I can't set some of the ClickHouse wrapper classes, because java.sql.PreparedStatement can't handle them.
Has anyone figured out a workaround for this issue? @zhicwu
But for me, it works only when the columns being inserted are exactly the columns that exist in the table.
I need to use an async interface, so not sure if the JDBC interface will work in that respect.
FWIW, if you use ClickHouseFormat.RowBinaryWithNamesAndTypes with BinaryStreamUtils, you can insert values for a subset of columns into a table, but it's pretty painful to debug if something goes wrong, especially with more complex types.
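To make the byte layout easier to debug, here is a self-contained sketch that builds a RowBinaryWithNamesAndTypes payload for String columns using only the JDK. It follows the format docs (a LEB128 column count, then N name strings, then N type strings, then the rows), and it assumes each String is written as a LEB128 byte length followed by UTF-8 bytes. The class and method names are hypothetical; in real code the client library's own helpers would replace them (I believe BinaryStreamUtils has a varint writer, but check your version).

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class RowBinaryHeaderSketch {
    // Unsigned LEB128: 7 bits per byte, low-order group first,
    // high bit set on every byte except the last one.
    static void writeVarUInt(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }

    // RowBinary String: LEB128 byte length followed by the raw UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        writeVarUInt(out, bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    // Header (count, names, types) followed by the rows; String columns only.
    static byte[] encode(List<String> names, List<String> types, List<List<String>> rows) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarUInt(out, names.size());
        for (String name : names) {
            writeString(out, name);
        }
        for (String type : types) {
            writeString(out, type);
        }
        for (List<String> row : rows) {
            for (String value : row) {
                writeString(out, value);
            }
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // Only columns a and c are sent; b and d are left out of the header entirely.
        byte[] payload = encode(
                List.of("a", "c"),
                List.of("String", "String"),
                List.of(List.of("id-1", "some text")));
        System.out.println(payload.length + " bytes, column count byte = " + payload[0]);
    }
}
```

The point of the sketch is the column count: the format docs call for LEB128, so writing it as a fixed-width integer (16 bytes for an Int128, say) would shift every byte that follows and could plausibly produce errors like the ones quoted earlier in the thread. Since the example data arrives in groups with different key sets, one payload per distinct column set seems like the natural way to batch it.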
To use RowBinaryWithNamesAndTypes, you need to write the header before writing the values:
https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithnamesandtypes
Here is part of the Kotlin ORM code I tried:
val columns: List<ClickHouseColumn> = properties.map { property ->
property.isAccessible = true
val columnName = getColumnName(property)
val type = clickHouseCode2dbType(getColumnType(property), property)
ClickHouseColumn.of(columnName, type)
}
val processor = ClickHouseDataStreamFactory.getInstance().getProcessor(
config, null,
stream, null, columns
)
val serializers = processor.getSerializers(config, columns)
val values: List<ClickHouseValue> = columns.map { column ->
column.newValue(config)
}
// RowBinaryWithNamesAndTypes https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithnamesandtypes
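// Column count. A single byte is a valid LEB128 value only while there are fewer than 128 columns.
BinaryStreamUtils.writeUnsignedInt8(s, properties.size)
// Header, part 1: the column names.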
properties.forEach { property ->
property.isAccessible = true
val columnName = getColumnName(property)
BinaryStreamUtils.writeString(s, columnName)
}
// Header, part 2: the column types, in the same order as the names.
properties.forEach { property ->
val type = clickHouseCode2dbType(getColumnType(property), property)
BinaryStreamUtils.writeString(s, type)
}
data.forEach { entity ->
properties.forEachIndexed { index, property ->
when (getColumnType(property)) {
ClickHouseTypes.String -> {
serializers[index].serialize(
(values[index] as ClickHouseStringValue).update(property.get(entity)), s
)
}
...
I would like to see the implementation code of this method