spark-bigquery-connector

Got java.lang.OutOfMemoryError: unable to create new native thread when using this connector to write data to BigQuery with Spark Streaming

Open yirhuang opened this issue 2 years ago • 7 comments

2022-07-31T02:24:09.545954 Caused by: org.apache.spark.SparkException: Task failed while writing rows.
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:288)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:198)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:109)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
  ... 3 more
Caused by: java.lang.OutOfMemoryError: unable to create new native thread
  at java.lang.Thread.start0(Native Method)
  at java.lang.Thread.start(Thread.java:717)
  at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
  at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1367)
  at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.BatchHelper.queue(BatchHelper.java:161)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getItemInfos(GoogleCloudStorageImpl.java:1687)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.checkNoFilesConflictingWithDirs(GoogleCloudStorageFileSystem.java:1200)
  at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.create(GoogleCloudStorageFileSystem.java:251)
  at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:78)
  at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.<init>(GoogleHadoopOutputStream.java:70)
  at com.google.cloud.hadoop.fs.gcs.GoogleHadoopSyncableOutputStream.<init>(GoogleHadoopSyncableOutputStream.java:180)
  at com.google.cloud.hadoop.fs.gcs.GoogleHadoopSyncableOutputStream.<init>(GoogleHadoopSyncableOutputStream.java:144)
  at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:632)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1067)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1048)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:937)
  at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:241)
  at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:342)
  at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.newOutputWriter(FileFormatWriter.scala:370)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:381)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:272)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:270)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1415)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:275)
  ... 8 more

yirhuang avatar Aug 01 '22 08:08 yirhuang

Can you please share the connector version, spark version, java version and Dataproc image version (if relevant)?

davidrabinowitz avatar Aug 01 '22 18:08 davidrabinowitz

@davidrabinowitz Thanks for your quick reply.
Connector version: spark-bigquery-with-dependencies_2.11:0.25.2
Spark version: 2.4.5
Java version: 1.8
Dataproc image version: not sure

Actually, I'm running a Spark Streaming job that ingests Kafka data into BigQuery with a 1-minute batch interval. The job ran smoothly at first, but after two days it failed with the exception above: 'java.lang.OutOfMemoryError: unable to create new native thread'.

I checked the memory and CPU usage of my cluster in the Google Cloud Console; both are normal and well below the instances' limits. I suspect the failure is caused by too many threads created by the spark-bigquery connector. Can you please check whether there is a thread leak in this connector?

Thanks a lot!

yirhuang avatar Aug 02 '22 06:08 yirhuang

[screenshot: executor thread dump] @davidrabinowitz
I have checked the thread dumps on the executors and found that the number of threads whose names start with "Gax" grows over time. The connector uses the GAX library; can you please check whether it has a thread leak? These threads stay in the WAITING state and are never terminated.
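For reference, a leak like this can also be confirmed from inside the JVM by counting live threads per name prefix. A minimal sketch, assuming it runs in the suspect JVM (e.g. pasted into spark-shell, or shipped to the executors in a mapPartitions probe):

```scala
// Sketch: count live JVM threads whose names start with a given prefix,
// e.g. the growing "Gax-" pool described above.
object ThreadProbe {
  def countThreadsWithPrefix(prefix: String): Int = {
    // getAllStackTraces returns a snapshot of all live threads in this JVM
    val it = Thread.getAllStackTraces.keySet.iterator
    var n = 0
    while (it.hasNext) {
      if (it.next().getName.startsWith(prefix)) n += 1
    }
    n
  }
}

// Example: log this periodically and watch for monotonic growth.
println(s"Gax threads: ${ThreadProbe.countThreadsWithPrefix("Gax-")}")
```

Sampling this once per micro-batch makes the growth rate obvious without taking full thread dumps.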

yirhuang avatar Aug 04 '22 13:08 yirhuang

@davidrabinowitz Any update?

yirhuang avatar Aug 10 '22 06:08 yirhuang

Same for us: the number of active Gax-# threads grows to 30k+ and then the JVM hits the OOM.
BQ connector: 0.25.2
Spark: 3.2.0
Scala: 2.12
JDK: AdoptOpenJDK 11.0.11+9

"Gax-1" #85 daemon prio=5 os_prio=0 cpu=42.17ms elapsed=20223.58s tid=0x00007f6e217fe800 nid=0x139 waiting on condition  [0x00007f6d937e4000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
	- parking to wait for  <0x000000069f48e330> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:194)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await([email protected]/AbstractQueuedSynchronizer.java:2081)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take([email protected]/ScheduledThreadPoolExecutor.java:1177)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take([email protected]/ScheduledThreadPoolExecutor.java:899)
	at java.util.concurrent.ThreadPoolExecutor.getTask([email protected]/ThreadPoolExecutor.java:1054)
	at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1114)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:628)
	at java.lang.Thread.run([email protected]/Thread.java:829)

"Gax-29013" #55368 daemon prio=5 os_prio=0 cpu=0.25ms elapsed=134.73s tid=0x00007f6e2c57c800 nid=0xd0cd waiting on condition  [0x00007f5149c46000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
	- parking to wait for  <0x00000006e0290e38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:194)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await([email protected]/AbstractQueuedSynchronizer.java:2081)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take([email protected]/ScheduledThreadPoolExecutor.java:1177)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take([email protected]/ScheduledThreadPoolExecutor.java:899)
	at java.util.concurrent.ThreadPoolExecutor.getTask([email protected]/ThreadPoolExecutor.java:1054)
	at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1114)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:628)
	at java.lang.Thread.run([email protected]/Thread.java:829)

asiunov avatar Sep 16 '22 05:09 asiunov

In addition to 0.25.2, I've tested 0.25.1, 0.25.0 and 0.26.0; all show the same pattern: they create Gax threads and never clean them up. In the example above, a single thread dump contained every thread from Gax-1 through Gax-29013.

0.24.2 works fine. It does not create Gax threads; instead it creates block-manager-ask-thread-pool-# and block-manager-storage-async-thread-pool-# threads, and those are cleaned up over time, down to zero when nothing is running.
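Given that comparison, one interim workaround is to pin the connector at 0.24.2 until the leak is fixed. A minimal sbt sketch (coordinates assumed from the artifact names cited in this thread):

```scala
// build.sbt: pin the connector to 0.24.2, the last release reported in this
// thread that does not accumulate Gax-# threads.
libraryDependencies += "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.24.2"
```

The equivalent for spark-submit is passing the same Maven coordinate via `--packages`.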

asiunov avatar Sep 16 '22 08:09 asiunov

Thanks for the info. It means the change occurred in gax-java between versions 2.13.0 and 2.17.0. We'll try to narrow it down.

davidrabinowitz avatar Sep 16 '22 20:09 davidrabinowitz

Just a quick note: we're seeing this problem still with 0.27.1.

asnare avatar Nov 29 '22 13:11 asnare

Facing the same issue with 2.12-0.27.0. I'd appreciate it if anyone could share a workaround.

java.lang.OutOfMemoryError: unable to create new native thread
  at java.lang.Thread.start0(Native Method)
  at java.lang.Thread.start(Thread.java:719)
  at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
  at java.util.concurrent.ThreadPoolExecutor.processWorkerExit(ThreadPoolExecutor.java:1025)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)

saipraveenpn avatar Dec 09 '22 13:12 saipraveenpn

@saipraveenpn Is it on read or on write? What happens if you change to the DIRECT write method?

davidrabinowitz avatar Dec 09 '22 16:12 davidrabinowitz

@davidrabinowitz We're facing this in DIRECT write mode. However, the job did go through on a retry.

BQ connector: spark-bigquery-with-dependencies_2.12-0.27.0
Spark (Scala): 2.4.8
OpenJDK: 11.0.16
Dataproc version: custom image based on 1.5-debian10

Reference Scala Spark code below; please let me know if I can help with any further logs.

dataFrame.write
    .format("bigquery")
    .mode(SaveMode.Overwrite)
    .option("writeMethod", "direct")
    .option("parentproject", parentProject)
    .option("project", project)
    .option("dataset", dataset)
    .save(bqTable)

saipraveenpn avatar Dec 09 '22 16:12 saipraveenpn