clickhouse-java

Pyspark java.io.IOException: Reached end of input stream

Open 1pyxa1 opened this issue 2 years ago • 2 comments

Hi! When I use pyspark to read a query result with a limit of 1 million rows and save it, everything works fine. However, when I raise the limit to, for example, 10 million rows, the same query fails with this error: Caused by: java.io.IOException: Reached end of input stream after reading 17 of 40 bytes

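My code:

from pyspark import SparkConf
from pyspark.sql import SparkSession

# `query` and `connection_params` are defined elsewhere in the application.
spark_config = (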
    SparkConf()
    .setMaster('local')
    .setAppName('test')
    .set('spark.sql.execution.arrow.pyspark.enabled', 'true')
    .set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
    .set('spark.sql.shuffle.partitions', 4)
    .set('spark.default.parallelism', 4)
    .set('spark.jars', 'clickhouse-jdbc-0.3.2-patch10-all.jar')
    .set('spark.driver.memory', '8g')
    .set('spark.executor.memory', '8g')
    .set('spark.executor.cores', 2)
    .set('spark.dynamicAllocation.maxExecutors', 4)
    .set('spark.dynamicAllocation.minExecutors', 2)
    .set('spark.kryoserializer.buffer.max', '512')
)
spark = (
    SparkSession
    .builder
    .config(conf=spark_config)
    .enableHiveSupport()
    .getOrCreate()
)

sp_test = (
    spark
    .read
    .format('jdbc')
    .option('url', 'jdbc:clickhouse://ch_addr:8123/db?ssl=true&custom_http_params=max_rows_to_read%3D100000000')
    .option('query', query)
    .option('user', connection_params['login'])
    .option('password', connection_params['password'])
    .option('driver', 'com.clickhouse.jdbc.ClickHouseDriver')
    .option('numPartitions', 1)
    .option('fetchsize', 100_000)
    .load()
)

sp_test.write.mode('overwrite').parquet("hdfs:///folder/test_data.parquet")

The SQL query reads from a single table, with no joins and no GROUP BY or ORDER BY clauses.

Versions: clickhouse-jdbc-0.3.2-patch10-all.jar, pyspark 3.0.0, ClickHouse 22.3.6.5

Issue #893 is probably related.

Part of the detailed log is attached below:

22/07/01 09:12:12 DEBUG ParquetFileWriter: 146205191: footer length = 5575
22/07/01 09:12:12 DEBUG BytesUtils: write le int: 5575 => 199 21 0 0
22/07/01 09:12:12 DEBUG DataStreamer: Queued packet seqno: 2269 offsetInBlock: 11935232 lastPacketInBlock: false lastByteOffsetInBlock: 11987471, blk_2118512093_1045620111
22/07/01 09:12:12 DEBUG DataStreamer: Queued packet seqno: 2270 offsetInBlock: 11987471 lastPacketInBlock: true lastByteOffsetInBlock: 11987471, blk_2118512093_1045620111
22/07/01 09:12:12 DEBUG DataStreamer: stage=DATA_STREAMING, blk_2118512093_1045620111
22/07/01 09:12:12 DEBUG DataStreamer: blk_2118512093_1045620111 waiting for ack for: 2270
22/07/01 09:12:12 DEBUG DataStreamer: blk_2118512093_1045620111 sending packet seqno: 2269 offsetInBlock: 11935232 lastPacketInBlock: false lastByteOffsetInBlock: 11987471
22/07/01 09:12:12 DEBUG DataStreamer: stage=DATA_STREAMING, blk_2118512093_1045620111
22/07/01 09:12:12 DEBUG DataStreamer: DFSClient seqno: 2269 reply: SUCCESS reply: SUCCESS reply: SUCCESS downstreamAckTimeNanos: 464050 flag: 0 flag: 0 flag: 0
22/07/01 09:12:12 DEBUG DataStreamer: blk_2118512093_1045620111 sending packet seqno: 2270 offsetInBlock: 11987471 lastPacketInBlock: true lastByteOffsetInBlock: 11987471
22/07/01 09:12:12 DEBUG DataStreamer: DFSClient seqno: 2270 reply: SUCCESS reply: SUCCESS reply: SUCCESS downstreamAckTimeNanos: 5467820 flag: 0 flag: 0 flag: 0
22/07/01 09:12:12 DEBUG DataStreamer: Closing old block BP-176122593-[ADDRESS]-1631109548934:blk_2118512093_1045620111
22/07/01 09:12:12 DEBUG Client: IPC Client (62343880) connection to [ADDRESS]:8020 from user sending #41 org.apache.hadoop.hdfs.protocol.ClientProtocol.complete
22/07/01 09:12:12 DEBUG Client: IPC Client (62343880) connection to [ADDRESS]:8020 from user got value #41
22/07/01 09:12:12 DEBUG ProtobufRpcEngine: Call: complete took 3ms
22/07/01 09:12:12 DEBUG Client: IPC Client (62343880) connection to [ADDRESS]:8020 from user sending #42 org.apache.hadoop.hdfs.protocol.ClientProtocol.delete
22/07/01 09:12:12 DEBUG Client: IPC Client (62343880) connection to [ADDRESS]:8020 from user got value #42
22/07/01 09:12:12 DEBUG ProtobufRpcEngine: Call: delete took 3ms
22/07/01 09:12:12 ERROR FileFormatWriter: Job job_20220701091101_0001 aborted.
22/07/01 09:12:12 INFO JDBCRDD: closed connection
22/07/01 09:12:12 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:291)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: java.io.IOException: Reached end of input stream after reading 17 of 40 bytes
	at com.clickhouse.jdbc.SqlExceptionUtils.handle(SqlExceptionUtils.java:77)
	at com.clickhouse.jdbc.ClickHouseResultSet.next(ClickHouseResultSet.java:716)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:357)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:343)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:272)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281)
	... 9 more
Caused by: java.io.IOException: Reached end of input stream after reading 17 of 40 bytes
	at com.clickhouse.client.stream.AbstractByteArrayInputStream.readBytes(AbstractByteArrayInputStream.java:241)
	at com.clickhouse.client.data.ClickHouseRowBinaryProcessor$MappedFunctions.lambda$buildMappingsForDataTypes$64(ClickHouseRowBinaryProcessor.java:336)
	at com.clickhouse.client.data.ClickHouseRowBinaryProcessor$MappedFunctions.deserialize(ClickHouseRowBinaryProcessor.java:466)
	at com.clickhouse.client.data.ClickHouseRowBinaryProcessor.readAndFill(ClickHouseRowBinaryProcessor.java:509)
	at com.clickhouse.client.ClickHouseDataProcessor.nextRecord(ClickHouseDataProcessor.java:143)
	at com.clickhouse.client.ClickHouseDataProcessor.access$100(ClickHouseDataProcessor.java:21)
	at com.clickhouse.client.ClickHouseDataProcessor$RecordsIterator.next(ClickHouseDataProcessor.java:36)
	at com.clickhouse.client.ClickHouseDataProcessor$RecordsIterator.next(ClickHouseDataProcessor.java:22)
	at com.clickhouse.jdbc.ClickHouseResultSet.next(ClickHouseResultSet.java:714)
	... 20 more

I also tried a couple of older versions of the JDBC driver, with the same result. I would be grateful for any ideas.
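For readers unfamiliar with this message: it is the kind of error a binary reader reports when it expects a fixed number of bytes and the stream ends first. The sketch below is a plain Python analogue, not the driver's code; `read_exact` is a hypothetical helper and the 17-byte payload is made up to match the numbers in the error.

```python
import io


def read_exact(stream, length):
    """Read exactly `length` bytes, or raise if the stream ends early."""
    data = stream.read(length)
    if len(data) < length:
        # Wording copied from the error in this issue, for illustration only.
        raise IOError(
            f"Reached end of input stream after reading {len(data)} of {length} bytes"
        )
    return data


# Simulate a response cut off after 17 bytes while the reader expects a 40-byte value.
truncated = io.BytesIO(b"\x00" * 17)
try:
    read_exact(truncated, 40)
except IOError as exc:
    message = str(exc)
    print(message)
```

In other words, the message says that the data stopped arriving mid-value; it does not say why the sender stopped.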

1pyxa1, Jul 01 '22 11:07

Hi @1pyxa1, I'm not sure what exactly happened, but it looks like the server closed the stream somehow. Is there any error on the server? You can check either the server log file or the system.query_log table.

Update: according to the code above, it seems you're connecting to the ClickHouse server directly, but can you also double-check whether any proxy sits in between?
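The system.query_log check suggested above could look roughly like the following. This is a minimal sketch: `failed_queries_sql` is a hypothetical helper that only builds the SQL text, and the 30-minute window and row limit are arbitrary. Run the resulting statement with whatever client is at hand, for example clickhouse-client.

```python
def failed_queries_sql(minutes=60, limit=20):
    """Build a query listing recent failed statements from system.query_log."""
    return (
        "SELECT event_time, query_id, exception_code, exception, query "
        "FROM system.query_log "
        # These two event types mark queries that ended with an exception.
        "WHERE type IN ('ExceptionBeforeStart', 'ExceptionWhileProcessing') "
        f"AND event_time > now() - INTERVAL {int(minutes)} MINUTE "
        "ORDER BY event_time DESC "
        f"LIMIT {int(limit)}"
    )


sql = failed_queries_sql(minutes=30)
print(sql)
```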

zhicwu, Jul 01 '22 12:07

Hi, sorry for the late reply. Indeed, there was an error in system.query_log:

Code: 160. DB::Exception: Query is executing too slow: 52535.46312494424 rows/sec., minimum: 1000000: While executing MergeTreeThread. (TOO_SLOW) (version 22.3.6.5 (official build))

After I passed the min_execution_speed setting, I again got

Caused by: java.io.IOException: Reached end of input stream after reading 17 of 40 bytes

in the pyspark application, and the 'real' cause in system.query_log was:

Code: 160. DB::Exception: Estimated query execution time (2069.1041993063686 seconds) is too long. Maximum: 2000. Estimated rows to process: 62228407: While executing MergeTreeThread. (TOO_SLOW) (version 22.3.6.5 (official build))
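Both TOO_SLOW errors come from server-side limits, so one way to experiment is to pass the relevant settings per connection, following the custom_http_params pattern already used in the JDBC URL earlier in this issue. The sketch below makes several assumptions: `build_jdbc_url` is a hypothetical helper, the driver is assumed to accept comma-separated key=value pairs in custom_http_params, the estimated-time check is assumed to be tied to max_execution_time on this server version, and the values are illustrative. If the limits are enforced through profile constraints on the server, overriding them this way may be rejected.

```python
from urllib.parse import quote


def build_jdbc_url(host, database, settings, port=8123, ssl=True):
    """Build a ClickHouse JDBC URL with server settings in custom_http_params."""
    # Assumption: the driver accepts comma-separated key=value pairs here.
    pairs = ",".join(f"{key}={value}" for key, value in settings.items())
    # Percent-encode the whole value so '=' and ',' survive URL parsing.
    encoded = quote(pairs, safe="")
    ssl_flag = "true" if ssl else "false"
    return f"jdbc:clickhouse://{host}:{port}/{database}?ssl={ssl_flag}&custom_http_params={encoded}"


# Illustrative values only; in ClickHouse, 0 means "no limit" for these two settings.
url = build_jdbc_url(
    "ch_addr",
    "db",
    {"max_rows_to_read": 100000000, "min_execution_speed": 0, "max_execution_time": 0},
)
print(url)
```

With a single setting, the helper reproduces the URL from the original code, which is a quick way to confirm the encoding matches what was already in use.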

Is there any chance to pass these exceptions through to the application? It would be much more convenient for debugging.

1pyxa1, Jul 13 '22 15:07