
Local port forwarding failing


Describe the issue:

I'm consistently getting "kubectl port forward failed" from port_forward_service when using the standard KubeCluster class to create an ad hoc cluster. When this happens the port forward is actually set up successfully and I can connect via the opened port, but the is_comm_open check returns false: sock.connect_ex((ip, port)) always returns code 22, which is EINVAL ("invalid argument"). If I change the is_comm_open call to:

if await is_comm_open("", local_port, retries=100):  # pass an empty string instead of localhost

it began to work temporarily, but after about 30 minutes it started failing again.
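For context, a minimal sketch of what that check boils down to (error code 22 is errno.EINVAL, "Invalid argument"; this is an illustration, not the exact dask-kubernetes code):

```python
import errno
import socket

# Return code 22 from connect_ex corresponds to EINVAL ("Invalid argument")
# on Linux and macOS.
assert errno.EINVAL == 22

def probe_port(ip, port):
    # Roughly what the is_comm_open check does: attempt a TCP connect and
    # treat a 0 return code from connect_ex as "port open".
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        return sock.connect_ex((ip, port)) == 0
```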

Minimal Complete Verifiable Example:

I'm seeing this on a very basic setup (environment posted below):

from dask_kubernetes import KubeCluster, make_pod_spec

pod_spec = make_pod_spec(image='ghcr.io/dask/dask:latest',
                         memory_limit='4G', memory_request='4G',
                         cpu_limit=1, cpu_request=1)

n_workers = 3
with KubeCluster(
    pod_template=pod_spec,
    name="test",
    n_workers=n_workers
) as cluster:
    ...  # fails during port forwarding before this body runs

Environment:

  • Dask version: 2022.9.1
  • Dask-kubernetes version: 2022.7.0
  • Python version: 3.8
  • Operating System: MacOS 12.6
  • Install method (conda, pip, source): conda

jmif avatar Sep 28 '22 19:09 jmif

Thanks for raising this.

Could you share a little more about how your cluster is set up? Where is your Kubernetes cluster (on prem, GKE, etc)? How are you connecting to it (from a server in the same data centre, from a laptop over a home DSL connection, etc)?

jacobtomlinson avatar Sep 29 '22 15:09 jacobtomlinson

Sure thing. I'm connecting from my local MacBook Pro over a fiber connection, and the Kubernetes cluster is GKE Autopilot. The connection itself works fine; it's the check for the connection that fails. is_comm_open retries 100 times, and while it is retrying I'm able to connect to the scheduler over the port that has been successfully forwarded, but sock.connect_ex((ip, port)) keeps returning code 22 the whole time.

jmif avatar Sep 30 '22 19:09 jmif

Code 22 suggests that the ip or port being passed to the sock.connect_ex call is invalid, which is a little strange; I've not seen this in my testing.

Does it print out anywhere what those values are?

jacobtomlinson avatar Oct 03 '22 13:10 jacobtomlinson

It doesn't print them out, but I've used a debugger to trace what is happening (I've also confirmed this happens outside the debugger, just to be sure). The ip being passed here is localhost; I've tried changing it to 127.0.0.1 as well. The port changes each time because it's selected randomly, but it looks like a reasonable port selection to me (the last value was 59621). I've confirmed that the port is open locally on my machine and that the connection is alive as well. If I change the method implementation to:

import time

async def is_comm_open(ip, port, retries=10):
    time.sleep(2)  # skip the check entirely and just wait
    return True

The cluster spins up successfully and I'm able to submit jobs.

jmif avatar Oct 03 '22 20:10 jmif

A similar issue pops up when trying to set up an operator.KubeCluster on a locally running Kubernetes via Docker Desktop. Adding a sleep fixes the issue there as well.

semnooij avatar Oct 15 '22 10:10 semnooij

Putting in a two-second sleep isn't really a sustainable solution here. The current implementation already tries to open the port and sleeps for two seconds on failure until a timeout is reached. Really we need to figure out why the port connection isn't working.

Are you able to call the method manually from another Python session to see if it gives any further errors?
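For example, something along these lines from a separate Python session (a hypothetical helper; 59621 is just the forwarded port mentioned above, substitute whatever port you see):

```python
import socket

def check_forwarded_port(host="localhost", port=59621):
    # Hypothetical manual check of a forwarded port; prints the underlying
    # OS error instead of a bare return code if the connect fails.
    try:
        with socket.create_connection((host, port), timeout=2):
            return True
    except OSError as exc:
        print(f"connect to {host}:{port} failed: {exc}")
        return False
```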

jacobtomlinson avatar Oct 17 '22 10:10 jacobtomlinson

I believe I have an acceptable fix for this. The issue seems to be related to the state of the socket after a failed connection: if the first call to sock.connect_ex() fails, it continues to return the same error code on subsequent calls, even when a connection is possible. Reinitializing the socket on every loop iteration fixes the issue with sufficiently small overhead. I also dropped the loop delay, as it was contributing ~4 s of start-up delay for me, and adjusted retries up to maintain the existing overall timeout.

-async def is_comm_open(ip, port, retries=10):
+async def is_comm_open(ip, port, retries=200):
-   sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    while retries > 0:
+       sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        result = sock.connect_ex((ip, port))
+       sock.close()
        if result == 0:
            return True
        else:
-           time.sleep(2)
+           time.sleep(0.1)
            retries -= 1
    return False

I also added the sock.close() call as a best practice; it works without it, but it's a nice-to-have.

I'm happy to open a PR if this solution is acceptable. What do you think @jacobtomlinson?

baswelsh avatar Oct 23 '22 04:10 baswelsh

That seems to resolve it for me as well. I'd suggest the following instead of an explicit close:

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    result = sock.connect_ex((ip, port))
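Putting the two suggestions together, the patched function might look roughly like this (a sketch, keeping the thread-blocking time.sleep from the original rather than switching to an async sleep):

```python
import socket
import time

async def is_comm_open(ip, port, retries=200):
    while retries > 0:
        # Create a fresh socket per attempt: a socket whose connect_ex
        # failed can keep returning the same error code on reuse.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            if sock.connect_ex((ip, port)) == 0:
                return True
        time.sleep(0.1)
        retries -= 1
    return False
```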

semnooij avatar Oct 23 '22 14:10 semnooij

Thanks for looking at this @baswelsh. I'd love to see a PR with this. Totally agree with @semnooij that a context manager is a nice way to handle this.

jacobtomlinson avatar Oct 24 '22 14:10 jacobtomlinson