NullPointerException when inserting Cap'N'Proto encoded data with ClickHouseClient

Open ladislavmacoun opened this issue 1 year ago • 3 comments

Describe the bug

When attempting to insert Cap'N'Proto binary encoded data using the edited example, a NullPointerException is encountered.

The specific error message is:

java.lang.NullPointerException: Cannot invoke "com.clickhouse.data.ClickHouseDataProcessor.getInputStream()" because "this.processor" is null

Steps to reproduce

  1. Execute a write request with Cap'N'Proto data, or with data in any binary format other than RowBinary.
  2. Attempt to get the response.

Expected behaviour

Using the same code snippet with RowBinary encoded data works as expected, returning the server response.

Code example


  public long insert(List<Record> records, ClickHouseNode server) throws ClickHouseException {
        if (records.isEmpty()) {
            return 0;
        }

        try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP)) {
            ClickHouseRequest.Mutation request = client
                    .read(server)
                    .write()
                    .table(TABLE_NAME)
                    .format(ClickHouseFormat.CapnProto)
                    .set("format_schema", "schema.capnp:Record")
                    .decompressClientRequest(ClickHouseCompression.LZ4);

            ClickHouseConfig config = request.getConfig();
            CompletableFuture<ClickHouseResponse> future;
            // back-pressuring is not supported, you can adjust the first two arguments
            try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
                    .createPipedOutputStream(config, (Runnable) null)) {
                // in async mode, which is default, execution happens in a worker thread
                future = request.data(stream.getInputStream()).execute();

                records.stream()
                        .filter(Objects::nonNull)
                        .forEach(record -> {
                            try {
                                stream.write(record.getPayload());
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        });
            }

            // response should be always closed
            try (ClickHouseResponse response = future.get()) {
                ClickHouseResponseSummary summary = response.getSummary();
                return summary.getWrittenRows();
            }
        } catch (IOException | InterruptedException | ExecutionException e) {
            throw ClickHouseException.of(e, server);
        }
    }

Configuration

Environment

  • Client version: 0.4.6
  • Language version: java 21 2023-09-19 LTS
  • OS: Linux

It looks like the problem is in ClickHouseDataStreamFactory, where getProcessor returns a null processor for any binary format other than RowBinary and RowBinaryWithNamesAndTypes.

    public ClickHouseDataProcessor getProcessor(ClickHouseDataConfig config, ClickHouseInputStream input,
            ClickHouseOutputStream output, Map<String, Serializable> settings, List<ClickHouseColumn> columns)
            throws IOException {
        ClickHouseFormat format = ClickHouseChecker.nonNull(config, ClickHouseDataConfig.TYPE_NAME).getFormat();
        ClickHouseDataProcessor processor = null;
        if (ClickHouseFormat.RowBinary == format || ClickHouseFormat.RowBinaryWithNamesAndTypes == format) {
            processor = new ClickHouseRowBinaryProcessor(config, input, output, columns, settings);
        } else if (format.isText()) {
            processor = new ClickHouseTabSeparatedProcessor(config, input, output, columns, settings);
        }
        return processor;
    }
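
To make the gap in the branch logic explicit, here is a minimal, self-contained sketch that mirrors the structure of the method quoted above. It is not library code: the `Format` enum and the string "processors" are hypothetical stand-ins for `ClickHouseFormat` and the real processor classes, used only to show which formats fall through to `null`.

```java
public class ProcessorDispatchSketch {
    // Hypothetical stand-in for ClickHouseFormat; only the text/binary flag is modelled.
    enum Format {
        RowBinary(false), RowBinaryWithNamesAndTypes(false), TabSeparated(true), CapnProto(false);

        private final boolean text;

        Format(boolean text) {
            this.text = text;
        }

        boolean isText() {
            return text;
        }
    }

    // Same branch structure as the quoted getProcessor: there is no final else branch.
    static String processorFor(Format format) {
        String processor = null;
        if (format == Format.RowBinary || format == Format.RowBinaryWithNamesAndTypes) {
            processor = "row-binary";
        } else if (format.isText()) {
            processor = "tab-separated";
        }
        return processor; // stays null for binary formats other than the RowBinary variants
    }

    public static void main(String[] args) {
        // Print the processor chosen for each modelled format.
        for (Format format : Format.values()) {
            System.out.println(format + " -> " + processorFor(format));
        }
    }
}
```

In this model, only `CapnProto` maps to `null`, which is consistent with the error message saying that `this.processor` is null.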

I would appreciate any help with this problem, including potential solutions or workarounds. Thank you.

ladislavmacoun avatar Oct 20 '23 16:10 ladislavmacoun

@ladislavmacoun Thanks for opening the issue. Can you provide a sample of the data so we can reproduce it?

mzitnik avatar Oct 22 '23 10:10 mzitnik

@mzitnik Sure, here is a simple example of inserting Cap'N'Proto data into ClickHouse.

Let's create a simple Cap'N'Proto schema, schema.capnp, and some example data for it, example.json.

@0xe7a7d8bbc2f70135;

struct Message {
  id @0 :UInt64;
  data @1 :Text;
}

{
    "id": 1475,
    "data": "Hello, clickhouse-java!"
}

We can now encode this data to Cap'N'Proto binary using the capnp tool:

capnp convert json:binary schema.capnp Message < example.json > encoded.bin

Make sure it's properly encoded

capnp convert binary:json schema.capnp Message < encoded.bin
{"id": "1475", "data": "Hello, clickhouse-java!"}

Now, prepare the ClickHouse database and table (/var/lib/clickhouse/format_schemas/ is the default value of format_schema_path):

docker run --rm -it \
  --name clickhouse-container \
  -p 8123:8123 \
  -p 9000:9000 \
  -v $(pwd)/schema.capnp:/var/lib/clickhouse/format_schemas/schema.capnp \
  yandex/clickhouse-server

Let's create a table for the data

curl -X POST 'http://localhost:8123/' \
   --data-binary "CREATE TABLE Message (id UInt64, data String) ENGINE = MergeTree() ORDER BY id;"

And insert the encoded capnp data

#!/bin/bash
url="http://localhost:8123/"
q="INSERT INTO Message FORMAT CapnProto SETTINGS format_schema = 'schema.capnp:Message'"

curl -X POST "$url?query=$q" --data-binary @encoded.bin
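
Since the plain HTTP request is the path that works here, the same request can be sketched in Java with the JDK's `java.net.http.HttpClient`, bypassing clickhouse-java entirely. This is a possible workaround under stated assumptions, not a fix for the client. It assumes the server from the Docker command above at `http://localhost:8123/` and the `Message` table. It passes `format_schema` as a URL parameter instead of a `SETTINGS` clause. The class name `CapnpHttpInsert` and the helper `buildInsertUri` are made up for this example.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CapnpHttpInsert {
    // Percent-encode a query-string value. URLEncoder emits '+' for spaces,
    // so rewrite those as %20 to keep the URL unambiguous.
    static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8).replace("+", "%20");
    }

    // Build the insert URL; table and formatSchema are supplied by the caller.
    static URI buildInsertUri(String baseUrl, String table, String formatSchema) {
        String query = "INSERT INTO " + table + " FORMAT CapnProto";
        return URI.create(baseUrl + "?query=" + encode(query)
                + "&format_schema=" + encode(formatSchema));
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        if (args.length < 1) {
            System.out.println("Usage: CapnpHttpInsert <path to encoded.bin>");
            return;
        }
        // Read the already-encoded Cap'N'Proto message and send it as the request body.
        byte[] payload = Files.readAllBytes(Paths.get(args[0]));
        URI uri = buildInsertUri("http://localhost:8123/", "Message", "schema.capnp:Message");
        HttpRequest request = HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.ofByteArray(payload))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // A non-200 status carries the server's error text in the body.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Run it with the path to encoded.bin as the only argument. Authentication, compression, and retries are left out of this sketch.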

Now we should be able to query the data

curl 'http://localhost:8123/' --data-binary "SELECT * FROM Message format Vertical;"

Row 1:
──────
id:   1475
data: Hello, clickhouse-java!

Here is an example Java class that loads this data

import com.clickhouse.client.*;
import com.clickhouse.data.ClickHouseDataStreamFactory;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.ClickHousePipedOutputStream;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ClickhouseCapnpInsertExample {
    private static final String TABLE_NAME = "Message";
    private static final ClickHouseNode server = ClickHouseNode.of("http://localhost:8123");

    public static long insert(List<Record> records) throws ClickHouseException {
        if (records.isEmpty()) {
            return 0;
        }

        try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP)) {
            ClickHouseRequest.Mutation request = client
                    .read(server)
                    .write()
                    .table(TABLE_NAME)
                    .format(ClickHouseFormat.CapnProto)
                    .set("format_schema", "schema.capnp:Message");

            ClickHouseConfig config = request.getConfig();
            CompletableFuture<ClickHouseResponse> future;
            // back-pressuring is not supported, you can adjust the first two arguments
            try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
                    .createPipedOutputStream(config, (Runnable) null)) {
                // in async mode, which is default, execution happens in a worker thread
                future = request.data(stream.getInputStream()).execute();

                records.stream()
                        .filter(Objects::nonNull)
                        .forEach(record -> {
                            try {
                                stream.write(record.payload());
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        });
            }

            // response should be always closed
            try (ClickHouseResponse response = future.get()) {
                ClickHouseResponseSummary summary = response.getSummary();
                return summary.getWrittenRows();
            }
        } catch (IOException | InterruptedException | ExecutionException e) {
            throw ClickHouseException.of(e, server);
        }
    }

    public static void main(String[] args) {
        if (args.length < 1) {
            System.out.println("Missing encoded binary data path");
            return;
        }

        String filePath = args[0];
        try {
            byte[] data = Files.readAllBytes(Paths.get(filePath));
            long insertedRows = 0;
            try {
                insertedRows = insert(new ArrayList<>(Collections.singleton(new Record(data))));
            } catch (ClickHouseException e) {
                throw new RuntimeException("Error while inserting CapNProto data to clickhouse:", e);
            }
            System.out.println("Successfully inserted " + insertedRows + " rows.");


        } catch (IOException e) {
            System.out.println("An error occurred while reading the file: " + e.getMessage());
        }
    }

    public record Record(byte[] payload) {
    }
}

with these Maven dependencies

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>test-ch-java-capnp</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>Archetype - test-ch-java-capnp</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>org.lz4</groupId>
      <artifactId>lz4-java</artifactId>
      <version>1.8.0</version>
    </dependency>
    <dependency>
      <groupId>com.clickhouse</groupId>
      <artifactId>clickhouse-data</artifactId>
      <version>0.4.6</version>
    </dependency>
    <dependency>
      <groupId>com.clickhouse</groupId>
      <artifactId>clickhouse-http-client</artifactId>
      <version>0.4.6</version>
    </dependency>
    <dependency>
      <groupId>com.clickhouse</groupId>
      <artifactId>clickhouse-client</artifactId>
      <version>0.4.6</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>16</source>
          <target>16</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

When run with the encoded.bin data as an argument, you should get this exception

Exception in thread "main" java.lang.NullPointerException: Cannot invoke "com.clickhouse.data.ClickHouseDataProcessor.getInputStream()" because "this.processor" is null
	at com.clickhouse.client.ClickHouseStreamResponse.close(ClickHouseStreamResponse.java:94)
	at ClickhouseCapnpInsertExample.insert(ClickhouseCapnpInsertExample.java:58)
	at ClickhouseCapnpInsertExample.main(ClickhouseCapnpInsertExample.java:75)
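
The top frame of the trace is `ClickHouseStreamResponse.close`, which suggests the exception is raised when the try-with-resources block closes the response rather than when the data is written. That is an inference from the trace alone; it does not establish whether the insert itself reached the server. The sketch below uses a hypothetical `FailingResponse` class, not the real `ClickHouseResponse`, to show the general Java behaviour: an exception thrown from `close()` replaces a value already returned from the try body.

```java
public class CloseFailureSketch {
    // Hypothetical resource whose close() fails, standing in for a response object.
    static class FailingResponse implements AutoCloseable {
        long writtenRows() {
            return 1;
        }

        @Override
        public void close() {
            throw new NullPointerException("processor is null");
        }
    }

    static String run() {
        try (FailingResponse response = new FailingResponse()) {
            // The return value is computed first, then close() runs and throws.
            return "rows=" + response.writtenRows();
        } catch (NullPointerException e) {
            // The exception from close() discards the pending return value.
            return "close failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

One consequence is that a caller cannot tell from the exception alone whether rows were written, so checking the table contents after a failed run is worthwhile before retrying.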

ladislavmacoun avatar Oct 23 '23 10:10 ladislavmacoun

Hi @mzitnik, it's been a few months since the last update, and I was wondering if there are any new developments. Alternatively, could you suggest any other approaches to address this issue?

ladislavmacoun avatar Jan 05 '24 09:01 ladislavmacoun