spark-bigquery-connector
Got java.lang.OutOfMemoryError: unable to create new native thread when using this connector to write data to BigQuery with Spark Streaming
2022-07-31T02:24:09.545954 Caused by: org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:288)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) ... 3 more
Caused by: java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1367)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.BatchHelper.queue(BatchHelper.java:161)
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getItemInfos(GoogleCloudStorageImpl.java:1687)
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.checkNoFilesConflictingWithDirs(GoogleCloudStorageFileSystem.java:1200)
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.create(GoogleCloudStorageFileSystem.java:251)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:78)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.<init>(GoogleHadoopOutputStream.java:70)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopSyncableOutputStream.<init>(GoogleHadoopSyncableOutputStream.java:180)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopSyncableOutputStream.<init>(GoogleHadoopSyncableOutputStream.java:144)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:632)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1067)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1048)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:937)
at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:241)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:342)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.newOutputWriter(FileFormatWriter.scala:370)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:381)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:272)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:270)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1415)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:275) ... 8 more
Can you please share the connector version, Spark version, Java version, and Dataproc image version (if relevant)?
@davidrabinowitz Thanks for your quick update. Connector version: spark-bigquery-with-dependencies_2.11:0.25.2, Spark version: 2.4.5, Java version: 1.8, Dataproc image version: not sure.
Actually, I'm running a Spark Streaming job that ingests Kafka data into BigQuery with a 1-minute batch duration. The job ran smoothly at first, but after two days it failed with the exception above, 'java.lang.OutOfMemoryError: unable to create new native thread'.
I also checked the memory and CPU usage of my cluster on the Google Cloud console; both are well below the limits of the instances. My guess is that the failure is caused by too many threads created by the spark-bigquery-connector. Can you please check whether there is a possible thread leak in this connector?
Thanks a lot!
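For context, the pipeline is roughly of the shape sketched below - this is not the exact job, just a minimal sketch assuming Structured Streaming with a foreachBatch sink and the indirect write path; bootstrapServers, topic, stagingBucket and targetTable are placeholder values:

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("kafka-to-bigquery").getOrCreate()

// Placeholder values - replace with the real Kafka/GCS/BigQuery settings.
val bootstrapServers = "broker:9092"
val topic = "events"
val stagingBucket = "my-staging-bucket"
val targetTable = "my_dataset.my_table"

// Read the Kafka topic as a streaming DataFrame.
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", topic)
  .load()

// Every minute, write the micro-batch to BigQuery through the connector
// (indirect write: the batch is staged on GCS and then loaded into BigQuery).
val query = kafkaStream
  .selectExpr("CAST(value AS STRING) AS value")
  .writeStream
  .trigger(Trigger.ProcessingTime("1 minute"))
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    batch.write
      .format("bigquery")
      .option("temporaryGcsBucket", stagingBucket)
      .mode(SaveMode.Append)
      .save(targetTable)
  }
  .start()

query.awaitTermination()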
@davidrabinowitz
I have checked the thread dump on the executors and found that the number of threads whose names start with Gax- keeps growing over time.
The connector does use the GAX library, so can you please check whether a thread leak exists there? The threads are always in the WAITING state and never get killed.
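In case it helps to reproduce the observation, a quick ad-hoc way to count these threads from inside the JVM (equivalent to grepping a jstack dump for "Gax-"; countThreads is just a throwaway helper, not part of the connector):

import scala.collection.JavaConverters._

// Count live threads whose names start with the given prefix.
def countThreads(prefix: String): Int =
  Thread.getAllStackTraces.keySet.asScala.count(_.getName.startsWith(prefix))

println(s"Gax threads: ${countThreads("Gax-")}")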
@davidrabinowitz Any update?
Same for us. The number of active Gax-# threads can grow to 30k+ and then OOM.
BQ connector 0.25.2, Spark 3.2.0, Scala 2.12, AdoptOpenJDK-11.0.11+9
"Gax-1" #85 daemon prio=5 os_prio=0 cpu=42.17ms elapsed=20223.58s tid=0x00007f6e217fe800 nid=0x139 waiting on condition [0x00007f6d937e4000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@11.0.11/Native Method)
- parking to wait for <0x000000069f48e330> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(java.base@11.0.11/LockSupport.java:194)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@11.0.11/AbstractQueuedSynchronizer.java:2081)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(java.base@11.0.11/ScheduledThreadPoolExecutor.java:1177)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(java.base@11.0.11/ScheduledThreadPoolExecutor.java:899)
at java.util.concurrent.ThreadPoolExecutor.getTask(java.base@11.0.11/ThreadPoolExecutor.java:1054)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.11/ThreadPoolExecutor.java:1114)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.11/ThreadPoolExecutor.java:628)
at java.lang.Thread.run(java.base@11.0.11/Thread.java:829)
"Gax-29013" #55368 daemon prio=5 os_prio=0 cpu=0.25ms elapsed=134.73s tid=0x00007f6e2c57c800 nid=0xd0cd waiting on condition [0x00007f5149c46000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@11.0.11/Native Method)
- parking to wait for <0x00000006e0290e38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(java.base@11.0.11/LockSupport.java:194)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@11.0.11/AbstractQueuedSynchronizer.java:2081)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(java.base@11.0.11/ScheduledThreadPoolExecutor.java:1177)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(java.base@11.0.11/ScheduledThreadPoolExecutor.java:899)
at java.util.concurrent.ThreadPoolExecutor.getTask(java.base@11.0.11/ThreadPoolExecutor.java:1054)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.11/ThreadPoolExecutor.java:1114)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.11/ThreadPoolExecutor.java:628)
at java.lang.Thread.run(java.base@11.0.11/Thread.java:829)
In addition to 0.25.2, I've tested 0.25.1, 0.25.0 and 0.26.0 - all having the same pattern: they create Gax threads and never GC them. For the example above, I've seen all the threads from Gax-1 to Gax-29013 in one thread dump.
0.24.2 works fine. It does not create Gax threads; it creates block-manager-ask-thread-pool-# and block-manager-storage-async-thread-pool-# instead, but those are GCed over time, down to 0 when nothing is running.
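As an interim workaround, pinning the connector back to 0.24.2 (the version that does not show the Gax thread growth above) is just a dependency change - for example, in sbt, assuming the Scala 2.12 artifact:

// build.sbt - pin to the last connector version observed here without the Gax thread growth
libraryDependencies += "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.24.2"
// or, when submitting: --packages com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.24.2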
Thanks for the info - it means that the change occurred in gax-java between versions 2.13.0 and 2.17.0. We'll try to narrow it down.
Just a quick note: we're seeing this problem still with 0.27.1.
Facing the same issue with 2.12-0.27.0; I'd appreciate it if anyone can share a workaround.
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:719)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
at java.util.concurrent.ThreadPoolExecutor.processWorkerExit(ThreadPoolExecutor.java:1025)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
@saipraveenpn Is it on read or on write? What happens if you change to the DIRECT write method?
@davidrabinowitz facing this in Direct write mode. However, it did go through on a retry.
BQ connector: spark-bigquery-with-dependencies_2.12-0.27.0, Spark: 2.4.8 (Scala 2.12), OpenJDK: 11.0.16, Dataproc version: custom image of 1.5-debian10.
Reference Scala Spark code below; please let me know if I can help with any further logs.
dataFrame.write
  .format("bigquery")
  .mode(SaveMode.Overwrite)
  .option("writeMethod", "direct")
  .option("parentproject", parentProject)
  .option("project", project)
  .option("dataset", dataset)
  .save(bqTable)
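For comparison, the indirect write path (the one behind the original GCS/Parquet stack trace at the top of this thread) differs from the snippet above only in the write method and a GCS staging bucket - a sketch, with "some-staging-bucket" as a placeholder:

dataFrame.write
  .format("bigquery")
  .mode(SaveMode.Overwrite)
  .option("writeMethod", "indirect")
  .option("temporaryGcsBucket", "some-staging-bucket") // placeholder bucket used to stage files before the BigQuery load
  .option("parentproject", parentProject)
  .option("project", project)
  .option("dataset", dataset)
  .save(bqTable)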