
large-ish Dask arrays randomly breaking on k8s deployment

Open bolliger32 opened this issue 4 years ago • 6 comments

What happened: We have the daskhub helm chart deployed on a GCP cluster and are seeing a very strange issue when moving arrays between the singleuser pod and the workers (i.e. calling persist on a Dask Array created from a NumPy array, or calling compute on a Dask Array). The error causes the scheduler to terminate immediately and appears to have something to do with SSL certificates. Strangely, whether the error occurs seems to depend on both the size and the content of the arrays. For example:

import dask_gateway
import dask.array as da
import dask.distributed as dd
import numpy as np

cluster = dask_gateway.GatewayCluster()
cluster.scale(4)
client = dd.Client(cluster)

chunks = (1000,1000)
test_da = da.from_array(np.random.rand(10000,10000), chunks=chunks).persist() # --> works fine
test_da = da.from_array(np.ones((20000,20000), dtype=np.float64), chunks=chunks).persist() # --> also works fine

test_da = da.from_array(np.random.rand(20000,20000), chunks=chunks).persist() # --> scheduler immediately dies

So the 20k X 20k array of all ones works fine, but the random array does not. Also, the smaller random array works fine, while the larger random array does not.

Weirder still, the exact size that it can handle before raising the error is not entirely consistent from one try to the next. It's always within the ballpark but sometimes I can get slightly larger arrays to pass through than other times.

The error looks like this (all in warning text)

Exception in callback None()
handle: <Handle cancelled>
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1391, in _do_ssl_handshake
    self.socket.do_handshake()
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1309, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate (_ssl.c:1124)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/platform/asyncio.py", line 189, in _handle_events
    handler_func(fileobj, events)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 696, in _handle_events
    self._handle_read()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1478, in _handle_read
    self._do_ssl_handshake()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1409, in _do_ssl_handshake
    return self.close(exc_info=err)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 611, in close
    self._signal_closed()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 641, in _signal_closed
    self._ssl_connect_future.exception()
asyncio.exceptions.CancelledError
distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError
Exception in callback None()
handle: <Handle cancelled>
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1391, in _do_ssl_handshake
    self.socket.do_handshake()
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1309, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate (_ssl.c:1124)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/platform/asyncio.py", line 189, in _handle_events
    handler_func(fileobj, events)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 696, in _handle_events
    self._handle_read()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1478, in _handle_read
    self._do_ssl_handshake()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1409, in _do_ssl_handshake
    return self.close(exc_info=err)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 611, in close
    self._signal_closed()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 641, in _signal_closed
    self._ssl_connect_future.exception()
asyncio.exceptions.CancelledError
Exception in callback None()
handle: <Handle cancelled>
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1391, in _do_ssl_handshake
    self.socket.do_handshake()
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1309, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLEOFError: EOF occurred in violation of protocol (_ssl.c:1124)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/platform/asyncio.py", line 189, in _handle_events
    handler_func(fileobj, events)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 696, in _handle_events
    self._handle_read()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1478, in _handle_read
    self._do_ssl_handshake()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1400, in _do_ssl_handshake
    return self.close(exc_info=err)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 611, in close
    self._signal_closed()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 641, in _signal_closed
    self._ssl_connect_future.exception()
asyncio.exceptions.CancelledError

I have tried this on several images including our own custom images, but as an example we've used the pangeo/pangeo-notebook:2020.12.08 image (for notebook, scheduler, and worker) and gotten the same error. With some images we get more or less of this warning text and may also get the following error text printed after the warning text:

---------------------------------------------------------------------------
StreamClosedError                         Traceback (most recent call last)
/opt/conda/lib/python3.8/site-packages/distributed/comm/tcp.py in read(self, deserializers)
    186         try:
--> 187             n_frames = await stream.read_bytes(8)
    188             n_frames = struct.unpack("Q", n_frames)[0]
StreamClosedError: Stream is closed
The above exception was the direct cause of the following exception:
CommClosedError                           Traceback (most recent call last)
<ipython-input-3-ba8c10876a0e> in <module>
      5         concat_dim='simulation')
      6 
----> 7 rcps_ds = rcps_ds.compute() #<-- 2.24 Gb
/opt/conda/lib/python3.8/site-packages/xarray/core/dataset.py in compute(self, **kwargs)
    904         """
    905         new = self.copy(deep=False)
--> 906         return new.load(**kwargs)
    907 
    908     def _persist_inplace(self, **kwargs) -> "Dataset":
/opt/conda/lib/python3.8/site-packages/xarray/core/dataset.py in load(self, **kwargs)
    739 
    740             # evaluate all the dask arrays simultaneously
--> 741             evaluated_data = da.compute(*lazy_data.values(), **kwargs)
    742 
    743             for k, data in zip(lazy_data, evaluated_data):
/opt/conda/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    450         postcomputes.append(x.__dask_postcompute__())
    451 
--> 452     results = schedule(dsk, keys, **kwargs)
    453     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    454 
/opt/conda/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2723                     should_rejoin = False
   2724             try:
-> 2725                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2726             finally:
   2727                 for f in futures.values():
/opt/conda/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1984             else:
   1985                 local_worker = None
-> 1986             return self.sync(
   1987                 self._gather,
   1988                 futures,
/opt/conda/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    830             return future
    831         else:
--> 832             return sync(
    833                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    834             )
/opt/conda/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    338     if error[0]:
    339         typ, exc, tb = error[0]
--> 340         raise exc.with_traceback(tb)
    341     else:
    342         return result[0]
/opt/conda/lib/python3.8/site-packages/distributed/utils.py in f()
    322             if callback_timeout is not None:
    323                 future = asyncio.wait_for(future, callback_timeout)
--> 324             result[0] = yield future
    325         except Exception as exc:
    326             error[0] = sys.exc_info()
/opt/conda/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()
/opt/conda/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1878                 else:
   1879                     self._gather_future = future
-> 1880                 response = await future
   1881 
   1882             if response["status"] == "error":
/opt/conda/lib/python3.8/site-packages/distributed/client.py in _gather_remote(self, direct, local_worker)
   1929 
   1930             else:  # ask scheduler to gather data for us
-> 1931                 response = await retry_operation(self.scheduler.gather, keys=keys)
   1932 
   1933         return response
/opt/conda/lib/python3.8/site-packages/distributed/utils_comm.py in retry_operation(coro, operation, *args, **kwargs)
    383         dask.config.get("distributed.comm.retry.delay.max"), default="s"
    384     )
--> 385     return await retry(
    386         partial(coro, *args, **kwargs),
    387         count=retry_count,
/opt/conda/lib/python3.8/site-packages/distributed/utils_comm.py in retry(coro, count, delay_min, delay_max, jitter_fraction, retry_on_exceptions, operation)
    368                 delay *= 1 + random.random() * jitter_fraction
    369             await asyncio.sleep(delay)
--> 370     return await coro()
    371 
    372 
/opt/conda/lib/python3.8/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
    881             name, comm.name = comm.name, "ConnectionPool." + key
    882             try:
--> 883                 result = await send_recv(comm=comm, op=key, **kwargs)
    884             finally:
    885                 self.pool.reuse(self.addr, comm)
/opt/conda/lib/python3.8/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
    664         await comm.write(msg, serializers=serializers, on_error="raise")
    665         if reply:
--> 666             response = await comm.read(deserializers=deserializers)
    667         else:
    668             response = None
/opt/conda/lib/python3.8/site-packages/distributed/comm/tcp.py in read(self, deserializers)
    200             self.stream = None
    201             if not shutting_down():
--> 202                 convert_stream_closed_error(self, e)
    203         else:
    204             try:
/opt/conda/lib/python3.8/site-packages/distributed/comm/tcp.py in convert_stream_closed_error(obj, exc)
    124         ) from exc
    125     else:
--> 126         raise CommClosedError("in %s: %s" % (obj, exc)) from exc
    127 
    128 
CommClosedError: in <closed TLS>: Stream is closed

What you expected to happen: Persisting to happen without causing the scheduler to terminate

Minimal Complete Verifiable Example: (see above)

Anything else we need to know?:

I was worried this was due to a network policy issue (see #352), but I disabled network policies entirely and the issue still persists. I wouldn't be surprised if this is something buried in the network config of our GCP cluster but I'm just hoping someone else has experienced this or has a hint as to what might be going on.

Environment:

  • Dask version: 2.3.0
  • dask-gateway version: 0.9.0
  • Python version: 3.8.6
  • Operating System: Linux (on GKE cluster)
  • Install method (conda, pip, source): daskhub helm chart, with a variety of images for notebook/scheduler/worker tested

bolliger32 avatar Dec 12 '20 23:12 bolliger32

Watching the traefik pod, when the scheduler goes down I see errors like:

Error during connection: readfrom tcp 10.4.1.18:57274->10.4.0.23:8786: write tcp 10.4.1.18:57274->10.4.0.23:8786: write: connection reset by peer

Error while connection to backend: dial tcp 10.4.0.23:8786: connect: connection refused
# THIS IS REPEATED A BUNCH OF TIMES

msg="subset not found for [clustername]/dask-b821f778151d4392b3961cdf577b187c" ingress=dask-b821f778151d4392b3961cdf577b187c providerName=kubernetescrd namespace=[clustername]

msg="Cannot create service: subset not found" ingress=dask-b821f778151d4392b3961cdf577b187c namespace=[clustername] servicePort=8786 providerName=kubernetescrd serviceName=dask-b821f778151d4392b3961cdf577b187c

msg="the service \"[clustername]-dask-b821f778151d4392b3961cdf577b187c-60ecaeceae9fdd403ba3@kubernetescrd\" does not exist" entryPointName=web routerName=[clustername]-dask-b821f778151d4392b3961cdf577b187c-60ecaeceae9fdd403ba3@kubernetescrd

These look to me like they are the result of the scheduler going down, rather than the cause (the warnings/errors in my original post also seem to be more result than cause, but I'm not positive).

bolliger32 avatar Dec 13 '20 00:12 bolliger32

How much memory does your scheduler have available? When re-running your example on my deployment, I saw my kernel die when running the last one.

But when I re-run it without the other working examples, I do see the scheduler pod apparently die. I don't see anything strange in the logs.

TomAugspurger avatar Dec 13 '20 02:12 TomAugspurger

Thanks @TomAugspurger that was it! I wasn't even thinking about scheduler size... Do all computed objects pass through the scheduler? I had not adjusted the default scheduler memory, which was 2G, so anything slightly bigger than that (which is why I was seeing the size cap) was killing the kernel like you said. And I imagine compression was why the matrix of ones passed through ok while the random one didn't. I think we're going to err on the safe side and make the default scheduler memory/cpu requests the same as those of the singleuser pods, so anything we want to compute should be able to pass through the scheduler. Thank you!! Feel free to close unless you think this is unexpected behavior.
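As a rough check on the numbers in this thread, the sketch below estimates the in-memory size of the float64 arrays from the original example and compares how well constant versus random values compress. It uses only the standard library. Reading "2G" as 2 GiB and using zlib as a stand-in for whatever compression the comm layer applies are both assumptions made for illustration.

```python
import random
import struct
import zlib

BYTES_PER_FLOAT64 = 8
# Assumption: the "2G" scheduler default is read here as 2 GiB.
SCHEDULER_LIMIT = 2 * 1024**3


def array_nbytes(shape):
    """In-memory size in bytes of a float64 array with the given shape."""
    n = 1
    for dim in shape:
        n *= dim
    return n * BYTES_PER_FLOAT64


small = array_nbytes((10000, 10000))  # 800_000_000 bytes
large = array_nbytes((20000, 20000))  # 3_200_000_000 bytes
print("small fits under limit:", small < SCHEDULER_LIMIT)
print("large fits under limit:", large < SCHEDULER_LIMIT)

# Compare compressibility on a 1 MB sample of each kind of data.
# zlib is only a stand-in; the real comm layer may use a different codec.
n_values = 125_000
ones_bytes = struct.pack("<d", 1.0) * n_values
random_bytes = struct.pack(
    "<%dd" % n_values, *(random.random() for _ in range(n_values))
)

ones_ratio = len(zlib.compress(ones_bytes)) / len(ones_bytes)
random_ratio = len(zlib.compress(random_bytes)) / len(random_bytes)
print("compressed/original, ones:   %.4f" % ones_ratio)
print("compressed/original, random: %.4f" % random_ratio)
```

The smaller random array sits well under the limit and the larger one well over it, while the array of ones shrinks to a tiny fraction of its size, which is consistent with the guess above about compression.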

bolliger32 avatar Dec 13 '20 16:12 bolliger32

Perhaps we leave this open to discuss if the default resources for the scheduler should be higher? I don't know what's best here.

It'd be nice to have better logs here, though maybe the distributed scheduler process isn't able to write any since it's getting killed. I wonder if kubernetes has a way to monitor pods that are killed for exceeding their memory budget.

"Do all computed objects pass through the scheduler?"

The task graph does get sent to the scheduler, which assigns pieces of it to the workers. Since the concrete ndarray is in the task graph as an argument to from_array(...), it does get sent to the scheduler.
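A toy illustration of why this matters, using a plain dict in the style of a task graph (key mapped to a `(function, argument)` tuple). This is only a sketch with the standard library, not Dask's actual serialization path; `payload` and `run` are hypothetical names introduced here. It shows that a graph carrying concrete data as an argument is as large as the data, while a graph carrying only a recipe stays tiny.

```python
import pickle

N_BYTES = 10_000_000

# Stand-in for a concrete in-memory ndarray held on the client.
payload = bytes(N_BYTES)

# Graph with the data itself embedded as a task argument.
embedded_graph = {"x": (bytes, payload)}

# Graph with only a recipe: the data is created wherever the task runs.
lazy_graph = {"x": (bytes, N_BYTES)}


def run(task):
    """Execute a (function, *args) task tuple."""
    func, *args = task
    return func(*args)


embedded_size = len(pickle.dumps(embedded_graph))
lazy_size = len(pickle.dumps(lazy_graph))
print("embedded graph bytes:", embedded_size)
print("lazy graph bytes:    ", lazy_size)

# Both graphs describe the same result; only what must be shipped differs.
same_result = run(embedded_graph["x"]) == run(lazy_graph["x"])
print("same result:", same_result)
```

Anything that has to receive the first graph needs room for the full payload, which is the situation the scheduler is in when a large ndarray is passed to from_array(...).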

TomAugspurger avatar Dec 13 '20 18:12 TomAugspurger

Going from in-memory NumPy arrays to dask arrays has been coming up a bit in recent weeks.

You could do something like the following

future = client.scatter(arr)
x = da.from_delayed(future, shape=arr.shape, dtype=arr.dtype)

There are other solutions outlined in this SO post: https://stackoverflow.com/questions/45941528/how-to-efficiently-send-a-large-numpy-array-to-the-cluster-with-dask-array

quasiben avatar Dec 14 '20 14:12 quasiben

Thanks for the thoughts @TomAugspurger and @quasiben. @quasiben - Just to clarify, converting NumPy arrays to dask arrays was just one example of something that broke our scheduler due to memory issues. The same issue cropped up when calling compute() on an object that was too big to pass through the scheduler but small enough to fit in memory on the client. We could probably find a way to send the chunks through sequentially for this dask-->NumPy transition (like you were suggesting with scatter and from_delayed for the NumPy-->dask direction). But calling compute() is such a common workflow that it seems worth trying to avoid needing a workaround for it.

@TomAugspurger I do think better logs would be helpful, but I think the challenge is exactly what you pointed out: the scheduler can't write logs when the scheduler itself is getting killed :) I don't know off the top of my head whether there's a way to query the kubernetes API for information on pods killed due to memory use. Maybe it's possible to monitor that API from the gateway pod, and use that to provide logs when a scheduler is killed? Probably beyond my k8s understanding, but that does seem like a potential avenue.
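For what it's worth, Kubernetes does record this in the pod status: a container killed for exceeding its memory limit shows a terminated state with reason `OOMKilled`. Below is a minimal sketch that inspects a pod object of the shape returned by `kubectl get pod <name> -o json`. The sample pod and the container name are hypothetical, and whether the scheduler pod is still around to be queryable after the gateway cleans up is an open question here.

```python
import json


def oom_killed_containers(pod):
    """Return names of containers whose current or previous state is a
    termination with reason "OOMKilled"."""
    names = []
    statuses = (pod.get("status") or {}).get("containerStatuses") or []
    for status in statuses:
        for field in ("state", "lastState"):
            terminated = (status.get(field) or {}).get("terminated")
            if terminated and terminated.get("reason") == "OOMKilled":
                names.append(status["name"])
                break
    return names


# Hypothetical, heavily trimmed pod object for illustration only.
sample_pod = json.loads(
    """
    {
      "metadata": {"name": "dask-scheduler-example"},
      "status": {
        "containerStatuses": [
          {
            "name": "dask-scheduler",
            "state": {"terminated": {"reason": "OOMKilled", "exitCode": 137}},
            "lastState": {}
          }
        ]
      }
    }
    """
)

killed = oom_killed_containers(sample_pod)
print("OOM-killed containers:", killed)
```

A check like this could in principle run from anything with read access to pods in the namespace, but that is just one possible direction.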

As for default resources, I think it's probably ok as it is? I guess the other option would be trying to query the memory size of the client and setting that to be the default? This is what we're doing at the moment via the cluster_options config. But I feel like that might not necessarily be the right choice for people, so the current defaults actually seem fine to me. Perhaps just some well-placed documentation, particularly if there's no way to improve the logging for these killed-scheduler events?
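A rough sketch of what setting scheduler resources through cluster_options can look like. The handler is plain Python and runs as written; the wiring into the gateway server config is shown only in comments because `c` exists only inside the gateway's config file. The option names, defaults, and limits are illustrative assumptions, not the values from our deployment.

```python
from types import SimpleNamespace


def options_handler(options):
    """Map user-selected options to cluster config overrides."""
    return {
        "scheduler_memory": "%fG" % options.scheduler_memory,
        "scheduler_cores": options.scheduler_cores,
    }


# In the dask-gateway server config (not executed here; values are examples):
#
# from dask_gateway_server.options import Options, Float
#
# c.Backend.cluster_options = Options(
#     Float("scheduler_memory", 8, min=2, max=16, label="Scheduler Memory (GiB)"),
#     Float("scheduler_cores", 1, min=1, max=4, label="Scheduler Cores"),
#     handler=options_handler,
# )

# Stand-in for the options object the gateway passes to the handler.
example = options_handler(SimpleNamespace(scheduler_memory=8, scheduler_cores=2))
print(example)
```

With something like this in place, the default can match the singleuser pod size while still letting users pick a smaller or larger scheduler when they create a cluster.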

bolliger32 avatar Dec 16 '20 23:12 bolliger32