clickhouse-java
NullPointerException when inserting Cap'N'Proto encoded data with ClickHouseClient
Describe the bug
When inserting Cap'N'Proto binary-encoded data with an adapted version of the clickhouse-java insert example, a NullPointerException is thrown.
The specific error message is:
java.lang.NullPointerException: Cannot invoke "com.clickhouse.data.ClickHouseDataProcessor.getInputStream()" because "this.processor" is null
Steps to reproduce
- Execute a write request with Cap'N'Proto data, or data in any binary format other than RowBinary.
- Retrieve the response and close it (the exception is thrown from ClickHouseStreamResponse.close()).
Expected behaviour
The insert should succeed and return the server response, just as the same code snippet does with RowBinary-encoded data.
Code example
public long insert(List<Record> records, ClickHouseNode server) throws ClickHouseException {
    if (records.isEmpty()) {
        return 0;
    }
    try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP)) {
        ClickHouseRequest.Mutation request = client
                .read(server)
                .write()
                .table(TABLE_NAME)
                .format(ClickHouseFormat.CapnProto)
                .set("format_schema", "schema.capnp:Record")
                .decompressClientRequest(ClickHouseCompression.LZ4);
        ClickHouseConfig config = request.getConfig();
        CompletableFuture<ClickHouseResponse> future;
        // back-pressuring is not supported, you can adjust the first two arguments
        try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
                .createPipedOutputStream(config, (Runnable) null)) {
            // in async mode, which is default, execution happens in a worker thread
            future = request.data(stream.getInputStream()).execute();
            records.stream()
                    .filter(Objects::nonNull)
                    .forEach(record -> {
                        try {
                            stream.write(record.getPayload());
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    });
        }
        // response should be always closed
        try (ClickHouseResponse response = future.get()) {
            ClickHouseResponseSummary summary = response.getSummary();
            return summary.getWrittenRows();
        }
    } catch (IOException | InterruptedException | ExecutionException e) {
        throw ClickHouseException.of(e, server);
    }
}
Configuration
Environment
- Client version: 0.4.6
- Language version: Java 21 (2023-09-19 LTS)
- OS: Linux
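The problem appears to be in ClickHouseDataStreamFactory.getProcessor(), which returns a null processor for every binary format other than RowBinary and RowBinaryWithNamesAndTypes (only text formats get a ClickHouseTabSeparatedProcessor):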
public ClickHouseDataProcessor getProcessor(ClickHouseDataConfig config, ClickHouseInputStream input,
        ClickHouseOutputStream output, Map<String, Serializable> settings, List<ClickHouseColumn> columns)
        throws IOException {
    ClickHouseFormat format = ClickHouseChecker.nonNull(config, ClickHouseDataConfig.TYPE_NAME).getFormat();
    ClickHouseDataProcessor processor = null;
    if (ClickHouseFormat.RowBinary == format || ClickHouseFormat.RowBinaryWithNamesAndTypes == format) {
        processor = new ClickHouseRowBinaryProcessor(config, input, output, columns, settings);
    } else if (format.isText()) {
        processor = new ClickHouseTabSeparatedProcessor(config, input, output, columns, settings);
    }
    return processor;
}
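To make the failure path concrete, here is a minimal, self-contained model (not the library's code) of a streaming response whose close() falls back to the raw input stream when no processor was created, instead of dereferencing a null processor as the stack trace shows ClickHouseStreamResponse.close() doing. The names Processor and StreamResponseModel are hypothetical stand-ins for ClickHouseDataProcessor and ClickHouseStreamResponse; whether a real fix belongs in close(), in getProcessor(), or elsewhere is an assumption left to the maintainers.

```java
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical stand-in for ClickHouseDataProcessor: only the method the stack trace touches. */
interface Processor {
    InputStream getInputStream();
}

/**
 * Hypothetical model of a streaming response. As with getProcessor() for CapnProto,
 * the processor may be null; close() then closes the raw input instead of throwing an NPE.
 */
final class StreamResponseModel implements AutoCloseable {
    private final Processor processor; // null for formats without a dedicated processor
    private final InputStream rawInput; // the undecoded response stream

    StreamResponseModel(Processor processor, InputStream rawInput) {
        this.processor = processor;
        this.rawInput = rawInput;
    }

    @Override
    public void close() throws IOException {
        // Guard against the null processor instead of dereferencing it unconditionally.
        InputStream in = processor != null ? processor.getInputStream() : rawInput;
        in.close();
    }
}
```

The design choice in this sketch is to treat "no processor" as "pass the bytes through untouched", which matches how a pre-encoded CapnProto insert never needs row-level decoding on the client side.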
I would appreciate any help with this problem, or any suggestions for potential fixes or workarounds. Thank you.
@ladislavmacoun Thanks for opening the issue. Could you provide a sample of the data so we can reproduce it?
@mzitnik Sure, here is a simple example of inserting Cap'N'Proto data into ClickHouse.
Let's create a simple Cap'n Proto schema, schema.capnp, and some example data for it, example.json:
@0xe7a7d8bbc2f70135;
struct Message {
    id @0 :UInt64;
    data @1 :Text;
}
{
    "id": 1475,
    "data": "Hello, clickhouse-java!"
}
We can now encode this data to Cap'n Proto binary with the capnp tool[1]:
capnp convert json:binary schema.capnp Message < example.json > encoded.bin
Make sure it is properly encoded:
capnp convert binary:json schema.capnp Message < encoded.bin
{"id": "1475", "data": "Hello, clickhouse-java!"}
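A similar sanity check can be run from Java before handing the bytes to the client. The sketch below walks the Cap'n Proto stream framing header (segment count minus one, then each segment's size in 8-byte words, padded to an 8-byte boundary, all little-endian) and returns the byte length of one framed message. The class CapnpFraming is hypothetical and not part of clickhouse-java or capnproto-java, and it assumes the standard unpacked "binary" framing; it validates framing only, not the contents against schema.capnp.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical helper: computes the byte length of one framed Cap'n Proto message. */
final class CapnpFraming {
    private CapnpFraming() {}

    /**
     * Returns the size in bytes (segment table plus segments) of the message starting at offset.
     *
     * @throws IllegalArgumentException if the header or the segments run past the end of data
     */
    static long messageLength(byte[] data, int offset) {
        if (offset < 0 || offset + 4 > data.length) {
            throw new IllegalArgumentException("no segment table at offset " + offset);
        }
        ByteBuffer buf = ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN);
        long segmentCount = Integer.toUnsignedLong(buf.getInt(offset)) + 1;
        // 4 bytes for the count plus 4 bytes per segment size, rounded up to a multiple of 8.
        long headerBytes = (4 + 4 * segmentCount + 7) & ~7L;
        if (offset + headerBytes > data.length) {
            throw new IllegalArgumentException("truncated segment table at offset " + offset);
        }
        long bodyBytes = 0;
        for (int i = 0; i < segmentCount; i++) {
            // Each segment size is given in 8-byte words.
            long words = Integer.toUnsignedLong(buf.getInt(offset + 4 + 4 * i));
            bodyBytes += words * 8;
        }
        long total = headerBytes + bodyBytes;
        if (offset + total > data.length) {
            throw new IllegalArgumentException(
                    "truncated message: needs " + total + " bytes at offset " + offset);
        }
        return total;
    }
}
```

For a file holding a single message, such as encoded.bin, messageLength(data, 0) should equal data.length; for several concatenated messages, the returned length is the offset of the next one.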
Now, prepare the ClickHouse server and the table (/var/lib/clickhouse/format_schemas/ is the default format_schema_path[2]):
docker run --rm -it \
--name clickhouse-container \
-p 8123:8123 \
-p 9000:9000 \
-v $(pwd)/schema.capnp:/var/lib/clickhouse/format_schemas/schema.capnp \
yandex/clickhouse-server
Create a table for the data:
curl -X POST 'http://localhost:8123/' \
--data-binary "CREATE TABLE Message (id UInt64, data String) ENGINE = MergeTree() ORDER BY id;"
Then insert the encoded Cap'n Proto data:
#!/bin/bash
url="http://localhost:8123/"
# URL-encoded "INSERT INTO Message FORMAT CapnProto"; format_schema is passed as a URL parameter
q="INSERT%20INTO%20Message%20FORMAT%20CapnProto"
curl -X POST "${url}?query=${q}&format_schema=schema.capnp:Message" --data-binary @encoded.bin
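One possible workaround while ClickHouseClient fails on close() is to send the equivalent of the curl insert above from Java over ClickHouse's HTTP interface, using the JDK's java.net.http.HttpClient (Java 11+). This is a sketch under assumptions, not a clickhouse-java API: the class and method names are hypothetical, format_schema is passed as a URL parameter, the table name is concatenated unescaped (so it must be trusted input), and no authentication or compression is configured.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

/** Hypothetical workaround: POST pre-encoded CapnProto bytes straight to the ClickHouse HTTP interface. */
final class CapnpHttpInsert {
    private static final HttpClient HTTP = HttpClient.newHttpClient();

    private CapnpHttpInsert() {}

    /** Percent-encodes a query-string value, using %20 rather than '+' for spaces. */
    private static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8).replace("+", "%20");
    }

    /** Builds the insert URI; baseUrl is expected to end with '/', e.g. "http://localhost:8123/". */
    static URI buildInsertUri(String baseUrl, String table, String formatSchema) {
        String query = "INSERT INTO " + table + " FORMAT CapnProto";
        return URI.create(baseUrl + "?query=" + encode(query) + "&format_schema=" + encode(formatSchema));
    }

    /** Sends the payload as the request body; throws if ClickHouse answers with a non-200 status. */
    static void insert(String baseUrl, String table, String formatSchema, byte[] payload)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(buildInsertUri(baseUrl, table, formatSchema))
                .POST(HttpRequest.BodyPublishers.ofByteArray(payload))
                .build();
        HttpResponse<String> response = HTTP.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("ClickHouse returned " + response.statusCode() + ": " + response.body());
        }
    }
}
```

Usage, assuming the container and table from this example: CapnpHttpInsert.insert("http://localhost:8123/", "Message", "schema.capnp:Message", Files.readAllBytes(Path.of("encoded.bin"))). Because the bytes are already encoded, the client never needs a data processor, which is the component that is null in the failing path.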
We should now be able to query the data:
curl 'http://localhost:8123/' --data-binary "SELECT * FROM Message format Vertical;"
Row 1:
──────
id: 1475
data: Hello, clickhouse-java!
Here is an example Java class that loads this data:
import com.clickhouse.client.*;
import com.clickhouse.data.ClickHouseDataStreamFactory;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.ClickHousePipedOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ClickhouseCapnpInsertExample {
    private static final String TABLE_NAME = "Message";
    private static final ClickHouseNode server = ClickHouseNode.of("http://localhost:8123");
    public static long insert(List<Record> records) throws ClickHouseException {
        if (records.isEmpty()) {
            return 0;
        }
        try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP)) {
            ClickHouseRequest.Mutation request = client
                    .read(server)
                    .write()
                    .table(TABLE_NAME)
                    .format(ClickHouseFormat.CapnProto)
                    .set("format_schema", "schema.capnp:Message");
            ClickHouseConfig config = request.getConfig();
            CompletableFuture<ClickHouseResponse> future;
            // back-pressuring is not supported, you can adjust the first two arguments
            try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
                    .createPipedOutputStream(config, (Runnable) null)) {
                // in async mode, which is default, execution happens in a worker thread
                future = request.data(stream.getInputStream()).execute();
                records.stream()
                        .filter(Objects::nonNull)
                        .forEach(record -> {
                            try {
                                stream.write(record.payload());
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        });
            }
            // response should be always closed
            try (ClickHouseResponse response = future.get()) {
                ClickHouseResponseSummary summary = response.getSummary();
                return summary.getWrittenRows();
            }
        } catch (IOException | InterruptedException | ExecutionException e) {
            throw ClickHouseException.of(e, server);
        }
    }
    public static void main(String[] args) {
        if (args.length < 1) {
            System.out.println("Missing encoded binary data path");
            return;
        }
        String filePath = args[0];
        try {
            byte[] data = Files.readAllBytes(Paths.get(filePath));
            long insertedRows = 0;
            try {
                insertedRows = insert(new ArrayList<>(Collections.singleton(new Record(data))));
            } catch (ClickHouseException e) {
                throw new RuntimeException("Error while inserting CapNProto data to clickhouse:", e);
            }
            System.out.println("Successfully inserted " + insertedRows + " rows.");
        } catch (IOException e) {
            System.out.println("An error occurred while reading the file: " + e.getMessage());
        }
    }
    public record Record(byte[] payload) {
    }
}
with these Maven dependencies:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>test-ch-java-capnp</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>Archetype - test-ch-java-capnp</name>
    <url>http://maven.apache.org</url>
    <dependencies>
        <dependency>
            <groupId>org.lz4</groupId>
            <artifactId>lz4-java</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>com.clickhouse</groupId>
            <artifactId>clickhouse-data</artifactId>
            <version>0.4.6</version>
        </dependency>
        <dependency>
            <groupId>com.clickhouse</groupId>
            <artifactId>clickhouse-http-client</artifactId>
            <version>0.4.6</version>
        </dependency>
        <dependency>
            <groupId>com.clickhouse</groupId>
            <artifactId>clickhouse-client</artifactId>
            <version>0.4.6</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>16</source>
                    <target>16</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Running it with encoded.bin as the argument throws this exception:
Exception in thread "main" java.lang.NullPointerException: Cannot invoke "com.clickhouse.data.ClickHouseDataProcessor.getInputStream()" because "this.processor" is null
    at com.clickhouse.client.ClickHouseStreamResponse.close(ClickHouseStreamResponse.java:94)
    at ClickhouseCapnpInsertExample.insert(ClickhouseCapnpInsertExample.java:58)
    at ClickhouseCapnpInsertExample.main(ClickhouseCapnpInsertExample.java:75)
Hi @mzitnik, it's been a few months since the last update, and I was wondering if there are any new developments. Alternatively, could you suggest any other approaches to address this issue?