
Show 'Cannot write to ostream' when reading lots of data

Open · lucasguo opened this issue 2 years ago • 2 comments

Describe the bug

I have a huge table of about 100,000,000,000 rows, partitioned by date. I try to read the data day by day and send it to Flink. Reading the first day's data works fine, but while reading the second day's data the error below occurs. The same program ran fine with a third-party ClickHouse JDBC driver (clickhouse4j).

Code example

import com.clickhouse.jdbc.ClickHouseDriver;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// sql selects one day's partition, e.g.
// SELECT ... FROM transaction WHERE date = ? ORDER BY toInt32(time)
Class.forName(ClickHouseDriver.class.getCanonicalName());
this.connection = DriverManager.getConnection(this.clickhouseUrl, this.clickhouseUsername, this.clickhousePassword);
PreparedStatement statement = this.connection.prepareStatement(sql);
statement.setString(1, date);
ResultSet rs = statement.executeQuery();
while (rs.next()) {
    // read each row and forward it to Flink
}
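(Note: this driver streams the whole result set over a single HTTP response, so the connection has to stay open for as long as the while loop is consuming rows; slow downstream consumption can therefore trip the client's socket read timeout mid-query.)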

Error log

client side:

Caused by: java.sql.BatchUpdateException: Read timed out, server ClickHouseNode [uri=http://0.0.0.0:8123/default]@-1450162100
	at com.clickhouse.jdbc.SqlExceptionUtils.batchUpdateError(SqlExceptionUtils.java:107)
	at com.clickhouse.jdbc.internal.SqlBasedPreparedStatement.executeAny(SqlBasedPreparedStatement.java:219)
	at com.clickhouse.jdbc.internal.SqlBasedPreparedStatement.executeQuery(SqlBasedPreparedStatement.java:246)
	at org.ff.source.history.HistoryClickhouseSource.run(HistoryClickhouseSource.java:85)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)

server side:

2023.09.28 10:03:36.602875 [ 67760 ] {0356468d-88b0-4246-9557-d2aba8ccae20} <Error> executeQuery: Code: 24. DB::Exception: Cannot write to ostream at offset 1048576: While executing ParallelFormattingOutputFormat. (CANNOT_WRITE_TO_OSTREAM) (version 23.7.5.30 (official build)) (from 192.169.110.82:44730) (in query: SELECT code, exchangeCode_, date, time, transNo, transCode, orderCode_, transBS, price, volume, shortNo, buyNo, amount, inserted_time, inserted_mills FROM transaction WHERE date = '20200311' ORDER BY toInt32(time)), Stack trace (when copying this message, always include the lines below):

0. DB::Exception::Exception(DB::Exception::MessageMasked&&, int, bool) @ 0x000000000e91fbf7 in /usr/bin/clickhouse
1. ? @ 0x000000000a3a05dc in /usr/bin/clickhouse
2. DB::WriteBufferFromOStream::nextImpl() @ 0x000000000ea9ca2a in /usr/bin/clickhouse
3. DB::WriteBufferFromHTTPServerResponse::nextImpl() @ 0x000000001529ec87 in /usr/bin/clickhouse
4. ? @ 0x0000000008fb7c9d in /usr/bin/clickhouse
5. DB::CompressedWriteBuffer::nextImpl() @ 0x0000000013417a8d in /usr/bin/clickhouse
6. ? @ 0x0000000008fb7c9d in /usr/bin/clickhouse
7. DB::ParallelFormattingOutputFormat::collectorThreadFunction(std::shared_ptr<DB::ThreadGroup> const&) @ 0x000000001540e167 in /usr/bin/clickhouse
8. void std::__function::__policy_invoker<void ()>::__call_impl<std::__function::__default_alloc_func<ThreadFromGlobalPoolImpl<true>::ThreadFromGlobalPoolImpl<DB::ParallelFormattingOutputFormat::ParallelFormattingOutputFormat(DB::ParallelFormattingOutputFormat::Params)::'lambda'()>(DB::ParallelFormattingOutputFormat::ParallelFormattingOutputFormat(DB::ParallelFormattingOutputFormat::Params)::'lambda'()&&)::'lambda'(), void ()>>(std::__function::__policy_storage const*) @ 0x00000000152ac41a in /usr/bin/clickhouse
9. ThreadPoolImpl<std::thread>::worker(std::__list_iterator<std::thread, void*>) @ 0x000000000ea0708f in /usr/bin/clickhouse
10. ? @ 0x000000000ea0d081 in /usr/bin/clickhouse
11. start_thread @ 0x0000000000007ea5 in /usr/lib64/libpthread-2.17.so
12. __clone @ 0x00000000000fe8dd in /usr/lib64/libc-2.17.so
13. 
2023.09.28 10:03:36.858923 [ 67760 ] {0356468d-88b0-4246-9557-d2aba8ccae20} <Error> DynamicQueryHandler: Code: 24. DB::Exception: Cannot write to ostream at offset 1048576: While executing ParallelFormattingOutputFormat. (CANNOT_WRITE_TO_OSTREAM), Stack trace (when copying this message, always include the lines below):

0. DB::Exception::Exception(DB::Exception::MessageMasked&&, int, bool) @ 0x000000000e91fbf7 in /usr/bin/clickhouse
1. ? @ 0x000000000a3a05dc in /usr/bin/clickhouse
2. DB::WriteBufferFromOStream::nextImpl() @ 0x000000000ea9ca2a in /usr/bin/clickhouse
3. DB::WriteBufferFromHTTPServerResponse::nextImpl() @ 0x000000001529ec87 in /usr/bin/clickhouse
4. ? @ 0x0000000008fb7c9d in /usr/bin/clickhouse
5. DB::CompressedWriteBuffer::nextImpl() @ 0x0000000013417a8d in /usr/bin/clickhouse
6. ? @ 0x0000000008fb7c9d in /usr/bin/clickhouse
7. DB::ParallelFormattingOutputFormat::collectorThreadFunction(std::shared_ptr<DB::ThreadGroup> const&) @ 0x000000001540e167 in /usr/bin/clickhouse
8. void std::__function::__policy_invoker<void ()>::__call_impl<std::__function::__default_alloc_func<ThreadFromGlobalPoolImpl<true>::ThreadFromGlobalPoolImpl<DB::ParallelFormattingOutputFormat::ParallelFormattingOutputFormat(DB::ParallelFormattingOutputFormat::Params)::'lambda'()>(DB::ParallelFormattingOutputFormat::ParallelFormattingOutputFormat(DB::ParallelFormattingOutputFormat::Params)::'lambda'()&&)::'lambda'(), void ()>>(std::__function::__policy_storage const*) @ 0x00000000152ac41a in /usr/bin/clickhouse
9. ThreadPoolImpl<std::thread>::worker(std::__list_iterator<std::thread, void*>) @ 0x000000000ea0708f in /usr/bin/clickhouse
10. ? @ 0x000000000ea0d081 in /usr/bin/clickhouse
11. start_thread @ 0x0000000000007ea5 in /usr/lib64/libpthread-2.17.so
12. __clone @ 0x00000000000fe8dd in /usr/lib64/libc-2.17.so
 (version 23.7.5.30 (official build))
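Note: Code 24 (CANNOT_WRITE_TO_OSTREAM) on the server side generally means the HTTP client disconnected while the server was still streaming the response, which is consistent with the client-side "Read timed out" above: the client gives up on the read, and the server can then no longer write to the closed stream. If the root cause is a slow consumer rather than a network fault, one possible mitigation is raising the client socket timeout. A minimal sketch, assuming the socket_timeout connection property (milliseconds) from the clickhouse-jdbc configuration docs:

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

// Raise the client-side read timeout; the clickhouse-java 0.4.x default is
// around 30 seconds, which a slow consumer can exceed on a large partition.
// "socket_timeout" is assumed from the driver's documented connection
// properties; verify the name against your driver version.
Properties props = new Properties();
props.setProperty("user", clickhouseUsername);     // same credentials as in the repro
props.setProperty("password", clickhousePassword);
props.setProperty("socket_timeout", "300000");     // 5 minutes, in milliseconds

Connection connection = DriverManager.getConnection(clickhouseUrl, props);

The same setting should also be accepted as a URL query parameter, e.g. ?socket_timeout=300000 appended to the JDBC URL.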

Configuration

Environment

  • Client version: 0.4.6
  • Language version: OpenJDK 13.0.2
  • OS: CentOS 7

ClickHouse server

  • ClickHouse Server version: version 23.7.5.30 (official build)
  • ClickHouse Server non-default settings, if any:
  • CREATE TABLE statements for tables involved:
-- `default`.`transaction` definition

CREATE TABLE default.transaction
(
    `code` String,
    `exchangeCode_` String,
    `date` String,
    `time` String,
    `transNo` String,
    `transCode` String,
    `orderCode_` String,
    `transBS` String,
    `price` UInt64,
    `volume` UInt64,
    `shortNo` String,
    `buyNo` String,
    `amount` UInt64,
    `inserted_time` DateTime DEFAULT now(),
    `inserted_mills` Int16 DEFAULT 0
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/01/default/transaction', '{replica}')
PARTITION BY date
ORDER BY code
SETTINGS storage_policy = 'store_in_extra', index_granularity = 8192;

lucasguo · Sep 28 '23

@lucasguo Hello, have you resolved this bug?

zhulw76 · Dec 29 '23

> @lucasguo Hello, have you resolved this bug?

Unfortunately, no. I had to rewrite my Flink job to restart after finishing each day's data, instead of running continuously.

lucasguo · Feb 22 '24
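For reference, a minimal sketch of that per-day workaround, assuming the table and query from the server log above (the row-handling body is a placeholder): opening a fresh connection per day keeps each HTTP response short-lived, so a stall on one day cannot time out the rest.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.List;

// Process one day-partition per connection; try-with-resources closes the
// statement and connection as soon as a day's rows have been consumed.
String sql = "SELECT * FROM transaction WHERE date = ? ORDER BY toInt32(time)";
for (String date : dates) { // dates: List<String> of day partitions, e.g. "20200311"
    try (Connection conn = DriverManager.getConnection(clickhouseUrl, clickhouseUsername, clickhousePassword);
         PreparedStatement stmt = conn.prepareStatement(sql)) {
        stmt.setString(1, date);
        try (ResultSet rs = stmt.executeQuery()) {
            while (rs.next()) {
                // forward the row to Flink here (placeholder)
            }
        }
    }
}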