clickhouse-java
Server shows 'Cannot write to ostream' when reading lots of data
Describe the bug
I have a huge table (about 100,000,000,000 rows) partitioned by date. I read the data day by day and send it to Flink. Reading the first day's data works fine, but an error occurs when reading the second day's data. The same program runs fine with a third-party ClickHouse JDBC driver (clickhouse4j).
Code example
import com.clickhouse.jdbc.ClickHouseDriver;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Register the driver and open a connection
Class.forName(ClickHouseDriver.class.getCanonicalName());
this.connection = DriverManager.getConnection(this.clickhouseUrl, this.clickhouseUsername, this.clickhousePassword);

// Query one day's data at a time; `sql` filters on the date partition
PreparedStatement statement = this.connection.prepareStatement(sql);
statement.setString(1, date);
ResultSet rs = statement.executeQuery();
while (rs.next()) {
    // read each row and forward it to Flink
}
Error log
client side:
Caused by: java.sql.BatchUpdateException: Read timed out, server ClickHouseNode [uri=http://0.0.0.0:8123/default]@-1450162100
at com.clickhouse.jdbc.SqlExceptionUtils.batchUpdateError(SqlExceptionUtils.java:107)
at com.clickhouse.jdbc.internal.SqlBasedPreparedStatement.executeAny(SqlBasedPreparedStatement.java:219)
at com.clickhouse.jdbc.internal.SqlBasedPreparedStatement.executeQuery(SqlBasedPreparedStatement.java:246)
at org.ff.source.history.HistoryClickhouseSource.run(HistoryClickhouseSource.java:85)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
server side:
2023.09.28 10:03:36.602875 [ 67760 ] {0356468d-88b0-4246-9557-d2aba8ccae20} <Error> executeQuery: Code: 24. DB::Exception: Cannot write to ostream at offset 1048576: While executing ParallelFormattingOutputFormat. (CANNOT_WRITE_TO_OSTREAM) (version 23.7.5.30 (official build)) (from 192.169.110.82:44730) (in query: SELECT code, exchangeCode_, date, time, transNo, transCode, orderCode_, transBS, price, volume, shortNo, buyNo, amount, inserted_time, inserted_mills FROM transaction WHERE date = '20200311' ORDER BY toInt32(time)), Stack trace (when copying this message, always include the lines below):
0. DB::Exception::Exception(DB::Exception::MessageMasked&&, int, bool) @ 0x000000000e91fbf7 in /usr/bin/clickhouse
1. ? @ 0x000000000a3a05dc in /usr/bin/clickhouse
2. DB::WriteBufferFromOStream::nextImpl() @ 0x000000000ea9ca2a in /usr/bin/clickhouse
3. DB::WriteBufferFromHTTPServerResponse::nextImpl() @ 0x000000001529ec87 in /usr/bin/clickhouse
4. ? @ 0x0000000008fb7c9d in /usr/bin/clickhouse
5. DB::CompressedWriteBuffer::nextImpl() @ 0x0000000013417a8d in /usr/bin/clickhouse
6. ? @ 0x0000000008fb7c9d in /usr/bin/clickhouse
7. DB::ParallelFormattingOutputFormat::collectorThreadFunction(std::shared_ptr<DB::ThreadGroup> const&) @ 0x000000001540e167 in /usr/bin/clickhouse
8. void std::__function::__policy_invoker<void ()>::__call_impl<std::__function::__default_alloc_func<ThreadFromGlobalPoolImpl<true>::ThreadFromGlobalPoolImpl<DB::ParallelFormattingOutputFormat::ParallelFormattingOutputFormat(DB::ParallelFormattingOutputFormat::Params)::'lambda'()>(DB::ParallelFormattingOutputFormat::ParallelFormattingOutputFormat(DB::ParallelFormattingOutputFormat::Params)::'lambda'()&&)::'lambda'(), void ()>>(std::__function::__policy_storage const*) @ 0x00000000152ac41a in /usr/bin/clickhouse
9. ThreadPoolImpl<std::thread>::worker(std::__list_iterator<std::thread, void*>) @ 0x000000000ea0708f in /usr/bin/clickhouse
10. ? @ 0x000000000ea0d081 in /usr/bin/clickhouse
11. start_thread @ 0x0000000000007ea5 in /usr/lib64/libpthread-2.17.so
12. __clone @ 0x00000000000fe8dd in /usr/lib64/libc-2.17.so
13.
2023.09.28 10:03:36.858923 [ 67760 ] {0356468d-88b0-4246-9557-d2aba8ccae20} <Error> DynamicQueryHandler: Code: 24. DB::Exception: Cannot write to ostream at offset 1048576: While executing ParallelFormattingOutputFormat. (CANNOT_WRITE_TO_OSTREAM), Stack trace (when copying this message, always include the lines below):
0. DB::Exception::Exception(DB::Exception::MessageMasked&&, int, bool) @ 0x000000000e91fbf7 in /usr/bin/clickhouse
1. ? @ 0x000000000a3a05dc in /usr/bin/clickhouse
2. DB::WriteBufferFromOStream::nextImpl() @ 0x000000000ea9ca2a in /usr/bin/clickhouse
3. DB::WriteBufferFromHTTPServerResponse::nextImpl() @ 0x000000001529ec87 in /usr/bin/clickhouse
4. ? @ 0x0000000008fb7c9d in /usr/bin/clickhouse
5. DB::CompressedWriteBuffer::nextImpl() @ 0x0000000013417a8d in /usr/bin/clickhouse
6. ? @ 0x0000000008fb7c9d in /usr/bin/clickhouse
7. DB::ParallelFormattingOutputFormat::collectorThreadFunction(std::shared_ptr<DB::ThreadGroup> const&) @ 0x000000001540e167 in /usr/bin/clickhouse
8. void std::__function::__policy_invoker<void ()>::__call_impl<std::__function::__default_alloc_func<ThreadFromGlobalPoolImpl<true>::ThreadFromGlobalPoolImpl<DB::ParallelFormattingOutputFormat::ParallelFormattingOutputFormat(DB::ParallelFormattingOutputFormat::Params)::'lambda'()>(DB::ParallelFormattingOutputFormat::ParallelFormattingOutputFormat(DB::ParallelFormattingOutputFormat::Params)::'lambda'()&&)::'lambda'(), void ()>>(std::__function::__policy_storage const*) @ 0x00000000152ac41a in /usr/bin/clickhouse
9. ThreadPoolImpl<std::thread>::worker(std::__list_iterator<std::thread, void*>) @ 0x000000000ea0708f in /usr/bin/clickhouse
10. ? @ 0x000000000ea0d081 in /usr/bin/clickhouse
11. start_thread @ 0x0000000000007ea5 in /usr/lib64/libpthread-2.17.so
12. __clone @ 0x00000000000fe8dd in /usr/lib64/libc-2.17.so
(version 23.7.5.30 (official build))
Configuration
Environment
- Client version: 0.4.6
- Language version: OpenJDK 13.0.2
- OS: CentOS 7
ClickHouse server
- ClickHouse Server version: version 23.7.5.30 (official build)
- ClickHouse Server non-default settings, if any:
CREATE TABLE statements for tables involved:
-- `default`.`transaction` definition
CREATE TABLE default.transaction
(
`code` String,
`exchangeCode_` String,
`date` String,
`time` String,
`transNo` String,
`transCode` String,
`orderCode_` String,
`transBS` String,
`price` UInt64,
`volume` UInt64,
`shortNo` String,
`buyNo` String,
`amount` UInt64,
`inserted_time` DateTime DEFAULT now(),
`inserted_mills` Int16 DEFAULT 0
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/01/default/transaction', '{replica}')
PARTITION BY date
ORDER BY code
SETTINGS storage_policy = 'store_in_extra', index_granularity = 8192;
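The client-side "Read timed out" and the server-side CANNOT_WRITE_TO_OSTREAM look like two views of the same failure: once the JDBC socket timeout elapses mid-result, the client aborts the HTTP connection, and the server then fails when it tries to write the next chunk of the response. One thing worth trying is raising the client's read timeout. A minimal sketch, assuming the 0.4.x socket_timeout connection property (in milliseconds); the URL, credentials, and 5-minute value are illustrative, not from the original setup:

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

public class LongReadConnection {
    public static Connection open(String url, String user, String password) throws Exception {
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("password", password);
        // Assumption: socket_timeout is the client read timeout in milliseconds
        // (default 30000); day-sized result sets may need far more headroom.
        props.setProperty("socket_timeout", "300000"); // 5 minutes
        return DriverManager.getConnection(url, props);
    }
}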
@lucasguo Hello, have you resolved this bug?
Unfortunately, no. I had to rewrite my Flink job to restart after each day's data is finished, instead of running continuously.
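For reference, the shape of that workaround is roughly the following: open a fresh connection per day instead of streaming everything over one long-lived connection. A minimal sketch; the dates, URL, credentials, and row handling are placeholders, not the original job:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.List;

public class DayByDayReader {
    public static void main(String[] args) throws Exception {
        // Placeholder dates; the real job derives these from the partition range.
        List<String> dates = List.of("20200311", "20200312");
        String sql = "SELECT * FROM transaction WHERE date = ? ORDER BY toInt32(time)";

        for (String date : dates) {
            // A fresh connection per day, so a timed-out read on one day
            // does not leave a broken connection for the next.
            try (Connection conn = DriverManager.getConnection(
                         "jdbc:clickhouse://localhost:8123/default", "default", "");
                 PreparedStatement stmt = conn.prepareStatement(sql)) {
                stmt.setString(1, date);
                try (ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        // forward each row downstream (placeholder)
                    }
                }
            }
        }
    }
}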