Sparkmonitor failure for kernel restart - java.net.SocketException: Broken pipe (Write failed)

Status: Open. creepysta opened this issue 3 years ago; 2 comments.

Hi,

I noticed the following issue when restarting the kernel from a classic Jupyter Notebook that uses JEG (Jupyter Enterprise Gateway) to launch remote Spark kernels in Kubernetes. The sparkmonitor doesn't show up, and the driver logs show a java.net.SocketException: Broken pipe (Write failed) being thrown, along with the following line:

[IPKernelApp] WARNING | No such comm: b5b03d3c1393459f9b736fb5f5dd5461

The full stack trace is included at the end.

Observations so far:

In a successful case, the events occur in this order:

  1. Comm opened
  2. Client connected
INFO:SparkMonitorKernel:Comm opened
[I 2021-12-28 06:36:19,883.883 SparkMonitorKernel] Comm opened
...
INFO:SparkMonitorKernel:Client Connected ('127.0.0.1', 35792)
[I 2021-12-28 06:36:19,914.914 SparkMonitorKernel] Client Connected ('127.0.0.1', 35792)

In the failure case, the order is reversed:

  1. Client Connected
  2. Comm opened
INFO:SparkMonitorKernel:Client Connected ('127.0.0.1', 33320)
[I 2021-12-28 05:44:11,603.603 SparkMonitorKernel] Client Connected ('127.0.0.1', 33320)
...
INFO:SparkMonitorKernel:Comm opened
[I 2021-12-28 05:44:11,760.760 SparkMonitorKernel] Comm opened
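Since the failure correlates with the listener connecting before the comm is opened, one way to enforce the successful ordering is to have the listener block on an explicit readiness signal with a bounded timeout. The sketch below is hypothetical: `CommReadyGate`, `markReady` and `awaitReady` are illustrative names, and nothing in this thread indicates that sparkmonitor currently passes such a signal from the kernel to the Scala listener.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical readiness gate: the listener waits until something signals
// that the kernel-side comm is open, or until a timeout expires.
final class CommReadyGate {
  private val latch = new CountDownLatch(1)

  // Would be called by whatever component learns the comm is open (assumption).
  def markReady(): Unit = latch.countDown()

  // Returns true if the gate opened within timeoutMs, false on timeout.
  def awaitReady(timeoutMs: Long): Boolean =
    latch.await(timeoutMs, TimeUnit.MILLISECONDS)
}

object CommReadyGateDemo {
  def main(args: Array[String]): Unit = {
    val gate = new CommReadyGate
    // Simulate the comm being opened 200 ms later on another thread.
    val opener = new Thread(() => { Thread.sleep(200); gate.markReady() })
    opener.start()
    val ready = gate.awaitReady(5000) // returns as soon as markReady() runs
    println(s"ready=$ready")
    opener.join()
  }
}
```

Compared with a fixed delay, a gate like this proceeds as soon as the signal arrives and fails visibly (returns false) if it never does.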

As a temporary fix that reproduces the successful ordering, a 20-second delay was added in CustomListener.scala before the socket connection is established. This ensures that "Comm opened" happens before "Client Connected".

Thanks @akhileshram for pointing out the fix.

def startConnection(): Unit = {
  try {
    Thread.sleep(20000) // added: wait 20 s so the comm is opened before the socket connects
    socket = new Socket("localhost", port.toInt)
    out = new OutputStreamWriter(socket.getOutputStream())

    // ....
  }
}
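The stack trace at the end shows the error surfacing in `send` at `OutputStreamWriter.flush`. One possible mitigation, assuming the kernel-side server is listening again by the time of the retry, is to drop the broken connection, reconnect, and resend. `ReconnectingSender` and its parameters are hypothetical and not part of sparkmonitor; note also that a write to a peer that has already closed may appear to succeed once before the broken pipe is reported, so this does not guarantee delivery.

```scala
import java.io.{IOException, OutputStreamWriter}
import java.net.Socket
import java.nio.charset.StandardCharsets

// Hypothetical sender that reopens the socket and retries when a write fails,
// e.g. with "java.net.SocketException: Broken pipe (Write failed)".
final class ReconnectingSender(host: String, port: Int,
                               maxAttempts: Int = 3, backoffMs: Long = 500) {
  private var socket: Socket = null
  private var out: OutputStreamWriter = null

  private def connect(): Unit = {
    close()
    socket = new Socket(host, port)
    out = new OutputStreamWriter(socket.getOutputStream, StandardCharsets.UTF_8)
  }

  def close(): Unit = {
    if (socket != null) try socket.close() catch { case _: IOException => () }
    socket = null
    out = null
  }

  // Returns true once the message is written and flushed, false after maxAttempts failures.
  def send(msg: String): Boolean = {
    var attempt = 0
    while (attempt < maxAttempts) {
      attempt += 1
      try {
        if (socket == null) connect() // (re)connect lazily
        out.write(msg)
        out.flush()
        return true
      } catch {
        case _: IOException => // SocketException is a subclass of IOException
          close() // discard the broken connection before retrying
          if (attempt < maxAttempts) Thread.sleep(backoffMs * attempt) // linear backoff
      }
    }
    false
  }
}
```

Returning a Boolean rather than rethrowing keeps a failed send from propagating into Spark's listener bus thread, at the cost of silently dropping the message after the last attempt.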

Any hints or help with this issue would be much appreciated.

Error

2021-12-28 05:44:48,802 INFO  [spark-listener-group-shared] listener.JupyterSparkMonitorListener (CustomListener.scala:onJobStart(267)) - Job Start: 0
2021-12-28 05:44:48,804 ERROR [spark-listener-group-shared] listener.JupyterSparkMonitorListener (CustomListener.scala:send(86)) - Exception sending socket message:
java.net.SocketException: Broken pipe (Write failed)
	at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
	at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
	at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:233)
	at java.base/sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:312)
	at java.base/sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:316)
	at java.base/sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:153)
	at java.base/java.io.OutputStreamWriter.flush(OutputStreamWriter.java:251)
	at sparkmonitor.listener.JupyterSparkMonitorListener.send(CustomListener.scala:83)
	at sparkmonitor.listener.JupyterSparkMonitorListener.onJobStart(CustomListener.scala:269)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:37)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1381)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)

creepysta, Feb 24 '22 07:02

@creepysta: Even with the additional sleep(), are you still seeing the exception, or is it solved completely?

rahul26goyal, Mar 07 '22 04:03

The sleep() seems to have solved the cases where sparkmonitor intermittently failed to show up, both:

  • when starting a new notebook, and
  • when manually restarting the kernel from a running notebook.

However, it does not solve the scenario where JEG itself restarts the kernel.

Terms used above:
  • notebook: classic Jupyter Notebook
  • kernel: (here) the Spark driver pod
  • JEG: Jupyter Enterprise Gateway

creepysta, Mar 07 '22 05:03