clickhouse-java
Pyspark java.io.IOException: Reached end of input stream
Hi! When using PySpark to read and save a query limited to 1 million rows, everything works fine. However, when I raise the limit to 10 million rows, for example, I get this error on the same query:
Caused by: java.io.IOException: Reached end of input stream after reading 17 of 40 bytes
My code:

# query and connection_params are defined elsewhere in the application
from pyspark import SparkConf
from pyspark.sql import SparkSession

spark_config = (
    SparkConf()
    .setMaster('local')
    .setAppName('test')
    .set('spark.sql.execution.arrow.pyspark.enabled', 'true')
    .set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
    .set('spark.sql.shuffle.partitions', 4)
    .set('spark.default.parallelism', 4)
    .set('spark.jars', 'clickhouse-jdbc-0.3.2-patch10-all.jar')
    .set('spark.driver.memory', '8g')
    .set('spark.executor.memory', '8g')
    .set('spark.executor.cores', 2)
    .set('spark.dynamicAllocation.maxExecutors', 4)
    .set('spark.dynamicAllocation.minExecutors', 2)
    .set('spark.kryoserializer.buffer.max', '512')
)

spark = (
    SparkSession
    .builder
    .config(conf=spark_config)
    .enableHiveSupport()
    .getOrCreate()
)

# Read the query result from ClickHouse over JDBC in a single partition
sp_test = (
    spark
    .read
    .format('jdbc')
    .option('url', 'jdbc:clickhouse://ch_addr:8123/db?ssl=true&custom_http_params=max_rows_to_read%3D100000000')
    .option('query', query)
    .option('user', connection_params['login'])
    .option('password', connection_params['password'])
    .option('driver', 'com.clickhouse.jdbc.ClickHouseDriver')
    .option('numPartitions', 1)
    .option('fetchsize', 100_000)
    .load()
)

# Write the result to HDFS as Parquet
sp_test.write.mode('overwrite').parquet('hdfs:///folder/test_data.parquet')
The SQL query reads from a single table, with no joins and no GROUP BY or ORDER BY clauses.
Versions: clickhouse-jdbc-0.3.2-patch10-all.jar, PySpark 3.0.0, ClickHouse 22.3.6.5.
Issue #893 is probably related somehow.
A part of the detailed log is attached below:
22/07/01 09:12:12 DEBUG ParquetFileWriter: 146205191: footer length = 5575
22/07/01 09:12:12 DEBUG BytesUtils: write le int: 5575 => 199 21 0 0
22/07/01 09:12:12 DEBUG DataStreamer: Queued packet seqno: 2269 offsetInBlock: 11935232 lastPacketInBlock: false lastByteOffsetInBlock: 11987471, blk_2118512093_1045620111
22/07/01 09:12:12 DEBUG DataStreamer: Queued packet seqno: 2270 offsetInBlock: 11987471 lastPacketInBlock: true lastByteOffsetInBlock: 11987471, blk_2118512093_1045620111
22/07/01 09:12:12 DEBUG DataStreamer: stage=DATA_STREAMING, blk_2118512093_1045620111
22/07/01 09:12:12 DEBUG DataStreamer: blk_2118512093_1045620111 waiting for ack for: 2270
22/07/01 09:12:12 DEBUG DataStreamer: blk_2118512093_1045620111 sending packet seqno: 2269 offsetInBlock: 11935232 lastPacketInBlock: false lastByteOffsetInBlock: 11987471
22/07/01 09:12:12 DEBUG DataStreamer: stage=DATA_STREAMING, blk_2118512093_1045620111
22/07/01 09:12:12 DEBUG DataStreamer: DFSClient seqno: 2269 reply: SUCCESS reply: SUCCESS reply: SUCCESS downstreamAckTimeNanos: 464050 flag: 0 flag: 0 flag: 0
22/07/01 09:12:12 DEBUG DataStreamer: blk_2118512093_1045620111 sending packet seqno: 2270 offsetInBlock: 11987471 lastPacketInBlock: true lastByteOffsetInBlock: 11987471
22/07/01 09:12:12 DEBUG DataStreamer: DFSClient seqno: 2270 reply: SUCCESS reply: SUCCESS reply: SUCCESS downstreamAckTimeNanos: 5467820 flag: 0 flag: 0 flag: 0
22/07/01 09:12:12 DEBUG DataStreamer: Closing old block BP-176122593-[ADDRESS]-1631109548934:blk_2118512093_1045620111
22/07/01 09:12:12 DEBUG Client: IPC Client (62343880) connection to [ADDRESS]:8020 from user sending #41 org.apache.hadoop.hdfs.protocol.ClientProtocol.complete
22/07/01 09:12:12 DEBUG Client: IPC Client (62343880) connection to [ADDRESS]:8020 from user got value #41
22/07/01 09:12:12 DEBUG ProtobufRpcEngine: Call: complete took 3ms
22/07/01 09:12:12 DEBUG Client: IPC Client (62343880) connection to [ADDRESS]:8020 from user sending #42 org.apache.hadoop.hdfs.protocol.ClientProtocol.delete
22/07/01 09:12:12 DEBUG Client: IPC Client (62343880) connection to [ADDRESS]:8020 from user got value #42
22/07/01 09:12:12 DEBUG ProtobufRpcEngine: Call: delete took 3ms
22/07/01 09:12:12 ERROR FileFormatWriter: Job job_20220701091101_0001 aborted.
22/07/01 09:12:12 INFO JDBCRDD: closed connection
22/07/01 09:12:12 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:291)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: java.io.IOException: Reached end of input stream after reading 17 of 40 bytes
at com.clickhouse.jdbc.SqlExceptionUtils.handle(SqlExceptionUtils.java:77)
at com.clickhouse.jdbc.ClickHouseResultSet.next(ClickHouseResultSet.java:716)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:357)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:343)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:272)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281)
... 9 more
Caused by: java.io.IOException: Reached end of input stream after reading 17 of 40 bytes
at com.clickhouse.client.stream.AbstractByteArrayInputStream.readBytes(AbstractByteArrayInputStream.java:241)
at com.clickhouse.client.data.ClickHouseRowBinaryProcessor$MappedFunctions.lambda$buildMappingsForDataTypes$64(ClickHouseRowBinaryProcessor.java:336)
at com.clickhouse.client.data.ClickHouseRowBinaryProcessor$MappedFunctions.deserialize(ClickHouseRowBinaryProcessor.java:466)
at com.clickhouse.client.data.ClickHouseRowBinaryProcessor.readAndFill(ClickHouseRowBinaryProcessor.java:509)
at com.clickhouse.client.ClickHouseDataProcessor.nextRecord(ClickHouseDataProcessor.java:143)
at com.clickhouse.client.ClickHouseDataProcessor.access$100(ClickHouseDataProcessor.java:21)
at com.clickhouse.client.ClickHouseDataProcessor$RecordsIterator.next(ClickHouseDataProcessor.java:36)
at com.clickhouse.client.ClickHouseDataProcessor$RecordsIterator.next(ClickHouseDataProcessor.java:22)
at com.clickhouse.jdbc.ClickHouseResultSet.next(ClickHouseResultSet.java:714)
... 20 more
I also tried a couple of older versions of the JDBC driver with the same result. I would be grateful for any ideas.
Hi @1pyxa1, I'm not sure what exactly happened, but it looks like the server closed the stream somehow. Is there any error on the server side? You may check either the log file or the system.query_log table.
Update: according to the code above, it seems you're connecting to the ClickHouse server directly, but can you also double-check whether there is any proxy sitting in between?
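For reference, a minimal sketch of how the server-side error could be pulled from system.query_log using the same JDBC setup as the code above; the connection details and credentials are reused from that snippet, and the column and filter choices are only an assumption about what is useful here, not the reporter's actual code:

# Hedged sketch: fetch recent failed queries from system.query_log via the same
# JDBC driver and connection parameters used above.
query_log_errors = (
    spark
    .read
    .format('jdbc')
    .option('url', 'jdbc:clickhouse://ch_addr:8123/system?ssl=true')
    .option('query',
            "SELECT event_time, query_id, exception_code, exception "
            "FROM system.query_log "
            "WHERE type = 'ExceptionWhileProcessing' "
            "ORDER BY event_time DESC LIMIT 20")
    .option('user', connection_params['login'])
    .option('password', connection_params['password'])
    .option('driver', 'com.clickhouse.jdbc.ClickHouseDriver')
    .load()
)
query_log_errors.show(truncate=False)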
Hi, sorry for the late reply. Indeed, there was this error in system.query_log:
Code: 160. DB::Exception: Query is executing too slow: 52535.46312494424 rows/sec., minimum: 1000000: While executing MergeTreeThread. (TOO_SLOW) (version 22.3.6.5 (official build))
After I passed the min_execution_speed setting, I again got
Caused by: java.io.IOException: Reached end of input stream after reading 17 of 40 bytes
in the PySpark application, while the 'real' cause in system.query_log was:
Code: 160. DB::Exception: Estimated query execution time (2069.1041993063686 seconds) is too long. Maximum: 2000. Estimated rows to process: 62228407: While executing MergeTreeThread. (TOO_SLOW) (version 22.3.6.5 (official build))
Is there any chance to pass these exceptions through to the application? It would be much more convenient for debugging.
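For reference, a hedged sketch of how both TOO_SLOW limits mentioned above could be relaxed for this session through the custom_http_params part of the JDBC URL from the original code. min_execution_speed and max_estimated_execution_time are standard ClickHouse settings where 0 disables the check; the exact URL encoding below (commas as %2C between parameters) is an assumption and should be verified against the driver's documentation:

# Hedged sketch: append min_execution_speed and max_estimated_execution_time
# (0 disables each check) to custom_http_params, URL-encoding '=' as %3D
# (as in the original URL) and the separating ',' as %2C (an assumption).
jdbc_url = (
    'jdbc:clickhouse://ch_addr:8123/db?ssl=true'
    '&custom_http_params='
    'max_rows_to_read%3D100000000'
    '%2Cmin_execution_speed%3D0'
    '%2Cmax_estimated_execution_time%3D0'
)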