clickhouse-java icon indicating copy to clipboard operation
clickhouse-java copied to clipboard

how can i insert data use grpc-client with lz4 Compression and with format RowBinary?

Open DemonJun opened this issue 3 years ago • 2 comments

my code like this:

 ClickHouseClient clickHouseClient = CkUtils.buildClient(node);
        ClickHouseRequest<?> request = clickHouseClient.connect(node);
        Pair<List<ClickHouseColumn>, Map<String, ClickHouseColumn>> tableColumns =
                CkUtils.getTableColumns(request, "default", "test");
        String columns =
                tableColumns.getKey().stream()
                        .map(ClickHouseColumn::getColumnName)
                        .collect(Collectors.joining(","));

        Mutation mutation =
                request.copy()
                        .write()
                        .query(
                                StrUtil.format("insert into test({})", columns),
                                IdUtil.fastSimpleUUID())
                        .decompressClientRequest(true, ClickHouseCompression.LZ4)
                        .format(RowBinary);

        CompletableFuture<ClickHouseResponse> future;
   
        ClickHouseDataStreamFactory instance = ClickHouseDataStreamFactory.getInstance();
        ClickHousePipedOutputStream stream =
                instance.createPipedOutputStream(mutation.getConfig(), null);
        ClickHouseInputStream input =
                ClickHouseInputStream.of(
                        stream.getInputStream(),
                        mutation.getConfig().getBufferSize(),
                        ClickHouseCompression.LZ4);
        ClickHouseDataProcessor processor =
                instance.getProcessor(
                        mutation.getConfig(),
                        input,
                        stream,
                        mutation.getSettings(),
                        tableColumns.getKey());

        // in async mode, which is default, execution happens in a worker thread
        future = mutation.data(input).execute();

        // writing happens in main thread
        for (int i = 0; i < 10; i++) {
            long now = System.currentTimeMillis();
            JSONObject data = jsonObject.clone();
            data.put("a", now);
            data.put("b", now);
            data.put("c", idGenerator.uniqueId(now));
            data.put("d", now);
            data.put("e", now);

            Map<String, String> others =
                    data.entrySet().stream()
                            .filter(e -> !tableColumns.getValue().containsKey(e.getKey()))
                            .collect(
                                    toMap(
                                            e -> escape(e.getKey()),
                                            e -> escape(e.getValue().toString())));
            data.put("other", others);
            for (ClickHouseColumn column : tableColumns.getKey()) {
                processor.write(caseToCkValue(data, column), column);
            }
        }
        stream.close();
        future.get();

but i get an error:

2022-08-17 10:47:57,880 ERROR [grpc-default-executor-2] com.clickhouse.client.logging.Slf4jLogger.error(62) - Server error: Code=618, DB::Exception: LZ4 decompression failed. LZ4F version: 100. Error: ERROR_frameType_unknown: While executing BinaryRowInputFormat
2022-08-17 10:47:57,883 ERROR [grpc-default-executor-2] com.clickhouse.client.logging.Slf4jLogger.error(62) - 0. DB::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, bool) @ 0xb4903fa in /usr/bin/clickhouse
1. DB::Exception::Exception<int, char const*>(int, fmt::v8::basic_format_string<char, fmt::v8::type_identity<int>::type, fmt::v8::type_identity<char const*>::type>, int&&, char const*&&) @ 0x129b13cc in /usr/bin/clickhouse
2. DB::Lz4InflatingReadBuffer::nextImpl() @ 0x129b1767 in /usr/bin/clickhouse
3. DB::RowInputFormatWithNamesAndTypes::readPrefix() @ 0x16d76aee in /usr/bin/clickhouse
4. DB::IRowInputFormat::generate() @ 0x16d68f92 in /usr/bin/clickhouse
5. DB::ISource::tryGenerate() @ 0x16cfc835 in /usr/bin/clickhouse
6. DB::ISource::work() @ 0x16cfc4da in /usr/bin/clickhouse
7. DB::ExecutionThreadContext::executeTask() @ 0x16d18ee8 in /usr/bin/clickhouse
8. DB::PipelineExecutor::executeStepImpl(unsigned long, std::__1::atomic<bool>*) @ 0x16d0cbfe in /usr/bin/clickhouse
9. DB::PipelineExecutor::executeStep(std::__1::atomic<bool>*) @ 0x16d0c400 in /usr/bin/clickhouse
10. DB::PullingPipelineExecutor::pull(DB::Chunk&) @ 0x16d1dace in /usr/bin/clickhouse
11. DB::PullingPipelineExecutor::pull(DB::Block&) @ 0x16d1de2c in /usr/bin/clickhouse
12. ? @ 0x16a19a44 in /usr/bin/clickhouse
13. ? @ 0x16a15b67 in /usr/bin/clickhouse
14. ThreadPoolImpl<std::__1::thread>::worker(std::__1::__list_iterator<std::__1::thread, void*>) @ 0xb53fc47 in /usr/bin/clickhouse
15. ? @ 0xb54367d in /usr/bin/clickhouse
16. ? @ 0x7fbfcaff0609 in ?
17. __clone @ 0x7fbfcaf15133 in ?

DemonJun avatar Aug 17 '22 02:08 DemonJun

Thanks for the report @DemonJun and apologize for the late reply.

The gRPC implementation does not support compressing client request as of now, but I'll fix that in next release. However, please be aware of the performance issue I mentioned at here, which may impact inserting as well. If there's no specific reason that you must use gRPC, I'd strongly suggest you starting with http and then switch to tcp in the future for better error handling and performance.

Lastly I want to mention that above code can be simplified using ClickHouseWriter, for example:

request.write().table("mytable").data(out -> {
  // write custom data into the stream
}).executeAndWait();

zhicwu avatar Aug 18 '22 23:08 zhicwu

it looks great! wait for it

DemonJun avatar Aug 23 '22 08:08 DemonJun

It's supported since v0.4.0, but please do not use lz4 to decompress server response because it's extremely slow. Please use zstd instead.

I think grpc is still very experimental as sometimes server complains about memory but http client works just fine.

zhicwu avatar Jan 19 '23 14:01 zhicwu