
OSError: Timed out trying to connect to tcp://127.0.0.1:32832 after 30 s

dehiker opened this issue 2 years ago • 8 comments

I got an OSError after featuretools had been running for hours.

Part of the traceback (I cannot retrieve the whole log):

File "/home/zzz/.conda/envs/test/lib/python3.9/site-packages/distributed/utils.py", line 338, in sync return sync( File "/home/zzz/.conda/envs/test/lib/python3.9/site-packages/distributed/utils.py", line 405, in sync raise exc.with_traceback(tb) File "/home/zzz/.conda/envs/test/lib/python3.9/site-packages/distributed/utils.py", line 378, in f result = yield future File "/home/zzz/.conda/envs/test/lib/python3.9/site-packages/tornado/gen.py", line 762, in run value = future.result() File "/home/zzz/.conda/envs/test/lib/python3.9/site-packages/distributed/client.py", line 2270, in _scatter await self.scheduler.scatter( File "/home/zzz/.conda/envs/test/lib/python3.9/site-packages/distributed/core.py", line 1153, in send_recv_from_rpc return await send_recv(comm=comm, op=key, **kwargs) File "/home/zzz/.conda/envs/test/lib/python3.9/site-packages/distributed/core.py", line 943, in send_recv raise exc.with_traceback(tb) File "/home/zzz/.conda/envs/test/lib/python3.9/site-packages/distributed/core.py", line 769, in _handle_comm result = await result File "/home/zzz/.conda/envs/test/lib/python3.9/site-packages/distributed/scheduler.py", line 5043, in scatter keys, who_has, nbytes = await scatter_to_workers( File "/home/zzz/.conda/envs/test/lib/python3.9/site-packages/distributed/utils_comm.py", line 142, in scatter_to_workers out = await All( File "/home/zzz/.conda/envs/test/lib/python3.9/site-packages/distributed/utils.py", line 236, in All result = await tasks.next() File "/home/zzz/.conda/envs/test/lib/python3.9/site-packages/distributed/core.py", line 1150, in send_recv_from_rpc comm = await self.pool.connect(self.addr) File "/home/zzz/.conda/envs/test/lib/python3.9/site-packages/distributed/core.py", line 1371, in connect return await connect_attempt File "/home/zzz/.conda/envs/test/lib/python3.9/site-packages/distributed/core.py", line 1307, in _connect comm = await connect( File "/home/zzz/.conda/envs/test/lib/python3.9/site-packages/distributed/comm/core.py", line 317, in connect raise OSError( OSError: Timed out trying to connect to tcp://127.0.0.1:32832 after 30 s

Code Sample, a copy-pastable example to reproduce your bug.

feature_matrix, _ = ft.dfs(
    target_dataframe_name="users",
    cutoff_time=label_times,
    entityset=es,
    n_jobs=6,
    verbose=True,
    # some basic primitives, set in the code
    agg_primitives=["mean", "trend"],
    trans_primitives=[],
    # set in the code
    primitive_options={"trend": {...}},
)

Output of featuretools.show_info()

2022-08-31 11:01:26,399 featuretools - WARNING Featuretools failed to load plugin tsfresh from library featuretools_tsfresh_primitives.init. For a full stack trace, set logging to debug.
Featuretools version: 1.13.0
Featuretools installation directory: /home/zzz/.conda/envs/test/lib/python3.9/site-packages/featuretools

SYSTEM INFO

python: 3.9.7.final.0
python-bits: 64
OS: Linux
OS-release: 3.10.0-862.el7.x86_64
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: en_US.UTF-8
LOCALE: en_US.UTF-8

INSTALLED VERSIONS

numpy: 1.21.6
pandas: 1.4.3
tqdm: 4.62.3
cloudpickle: 2.0.0
dask: 2022.7.1
distributed: 2022.7.1
psutil: 5.9.1
pip: 21.2.4
setuptools: 58.0.4

dehiker avatar Aug 31 '22 03:08 dehiker

Hi @dehiker, thanks for the report. A few questions:

What version of the fsspec library do you have installed? There have been some issues with fsspec==2022.8.0.

This section of the documentation goes over using a dask cluster directly instead of through n_jobs. The cluster dashboard can be useful for diagnosing memory problems, among other things.
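As a minimal sketch of what that section describes (reusing the variables from your snippet above; adjust the primitives and worker count to your case):

import featuretools as ft
from dask.distributed import LocalCluster

# create the cluster yourself instead of letting featuretools spin one up via n_jobs
cluster = LocalCluster(n_workers=6)
print(cluster.dashboard_link)  # open this URL in a browser to watch worker memory

feature_matrix, _ = ft.dfs(
    target_dataframe_name="users",
    cutoff_time=label_times,
    entityset=es,
    dask_kwargs={"cluster": cluster.scheduler.address},
    verbose=True,
)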

rwedge avatar Aug 31 '22 22:08 rwedge

Thank you for your prompt reply, @rwedge !

I'm working with fsspec==2021.8.1. Are there any known issues with this version?

The link you mentioned greatly inspired me. Since I'm working with quite a large file, I read the raw data in chunks and call DFS on each chunk. Having tried several times, I found that the task failed at roughly the same iteration (though not exactly the same one). So maybe calling DFS within a loop is a bad idea? Any idea why this happened?

Also, as the link suggests, I tried adding dask_kwargs={"diagnostics_port": 5000} to the DFS call, but it seems I cannot open the dashboard directly. Is there anything else I have to do to get it to work?
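For reference, the call looked roughly like this:

feature_matrix, _ = ft.dfs(
    target_dataframe_name="users",
    cutoff_time=label_times,
    entityset=es,
    n_jobs=6,
    verbose=True,
    dask_kwargs={"diagnostics_port": 5000},
    # plus the same primitive settings as in the original snippet above
)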

dehiker avatar Sep 03 '22 15:09 dehiker

Hi @dehiker

That version of fsspec should be fine.

Do you have a rough outline you could share of the loop you are trying?

The dashboard also needs the bokeh package installed in order to run.

rwedge avatar Sep 06 '22 14:09 rwedge

Thank you, @rwedge

Basically, I call DFS hundreds of times in a loop, as demonstrated below. Note that the code worked as expected at first, but it always crashed after some number of iterations.

import featuretools as ft
import pandas as pd

reader = pd.read_csv(filename, chunksize=20000)
for df_ in reader:
    # Create es from df_

    # DFS here
    feature_matrix, _ = ft.dfs(
        target_dataframe_name="users",
        cutoff_time=label_times,
        entityset=es,
        ...
    )

As for the dashboard, I do have bokeh==2.4.3 installed. Maybe something is wrong with my network settings, if there's nothing else that needs to be set.

dehiker avatar Sep 07 '22 09:09 dehiker

Hi @dehiker

To test the dashboard, you could try creating a distributed cluster directly and see if that works:

from distributed import LocalCluster

cluster = LocalCluster()
cluster  # in a notebook, the cluster's repr includes a link to the dashboard

reference doc: https://distributed.dask.org/en/stable/api.html#distributed.LocalCluster

You could also try using a cluster created this way to handle the computation. First, get the cluster's address:

cluster.scheduler.address

Then add that information to dask_kwargs in dfs:

ft.dfs(
    ...,
    dask_kwargs={"cluster": "scheduler address string"},
)

I like to run the featuretools code and the cluster in separate Python interpreters / notebooks to make tracking output / errors easier.

If you try this with your looping approach, you'll want to make another adjustment. Featuretools publishes the entityset data to the cluster so that it doesn't have to transmit that data again if dfs is called again. However, since the loop approach creates multiple entitysets, you'll probably want to remove the old entityset once you finish calculating each feature matrix. This can be done with a code snippet that unpublishes the data:

from dask.base import tokenize
from distributed import Client

# featuretools publishes the entityset under a name derived from its dask token
dataset_name = "EntitySet-{}".format(tokenize(es))
client = Client("scheduler address string")
client.unpublish_dataset(dataset_name)

Another thing you could try is saving each chunk's feature matrix to disk during the loop and combining them later. If there's a failure during the loop, you could restart from the chunks that hadn't been saved yet.
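Just as a sketch (the file names are placeholders, and i would come from enumerating the reader):

import glob
import pandas as pd

# inside the loop: write each chunk's feature matrix to its own file
feature_matrix.to_csv(f"feature_matrix_chunk_{i}.csv")

# later (or after a crash/restart): combine whichever chunk files were saved
chunk_files = sorted(glob.glob("feature_matrix_chunk_*.csv"))
combined = pd.concat([pd.read_csv(f, index_col=0) for f in chunk_files])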

rwedge avatar Sep 07 '22 20:09 rwedge

Your prompt and detailed reply is highly appreciated, and sorry for my late reply, @rwedge

  • With your intuitive demo, I got the dashboard working now. Hopefully it will come in handy when I use featuretools.
  • I just learned about the unpublish_dataset method, and it seems that could be the problem. I'll get back here after testing.
  • Yup, that's what I've been doing.

Thank you again for your time, bro!

dehiker avatar Sep 21 '22 03:09 dehiker

Hi @rwedge, it seems there's still something wrong with what I did.

import featuretools as ft

from dask.base import tokenize
from dask.distributed import Client
from dask.distributed import LocalCluster

cluster = LocalCluster(n_workers=6)
ft.dfs(
    ...
    dask_kwargs={"cluster": cluster.scheduler.address}
)

dataset_name = "EntitySet-{}".format(tokenize(es))
client = Client(cluster.scheduler.address)
client.unpublish_dataset(dataset_name)

With the updated code shown above, I got a strange error:

Traceback (most recent call last):
  File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/distributed/nanny.py", line 777, in _wait_until_connected
    msg = self.init_result_q.get_nowait()
  File "/home/zzz/.conda/envs/dh2111/lib/python3.9/multiprocessing/queues.py", line 135, in get_nowait
    return self.get(False)
  File "/home/zzz/.conda/envs/dh2111/lib/python3.9/multiprocessing/queues.py", line 116, in get
    raise Empty
_queue.Empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/distributed/utils.py", line 778, in wrapper
    return await func(*args, **kwargs)
  File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/distributed/nanny.py", line 522, in _on_exit
    await self.instantiate()
  File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/distributed/nanny.py", line 423, in instantiate
    result = await self.process.start()
  File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/distributed/nanny.py", line 669, in start
    msg = await self._wait_until_connected(uid)
  File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/distributed/nanny.py", line 779, in _wait_until_connected
    await asyncio.sleep(self._init_msg_interval)
  File "/home/zzz/.conda/envs/dh2111/lib/python3.9/asyncio/tasks.py", line 655, in sleep
    return await future
asyncio.exceptions.CancelledError

Do you have any ideas on this?

Also, I put dfs inside a function, which is then called in a loop. Maybe there's no need to unpublish_dataset in this case?

BTW, could you reopen the issue, @gsheni? I couldn't find a way to do it myself. Thanks

dehiker avatar Sep 21 '22 07:09 dehiker

@dehiker From your most recent code block, it's a little unclear to me what you are doing inside a loop and what you are doing outside the loop. I haven't tested this on a large dataset, but I think you would want your code flow to be something like this:

import featuretools as ft
import pandas as pd

from dask.distributed import Client
from dask.distributed import LocalCluster
from dask.base import tokenize

cluster = LocalCluster(n_workers=6)
client = Client(cluster.scheduler.address)

reader = pd.read_csv(filename, chunksize=20000)
for df_ in reader:
    # Create es from df_
    es = ft.EntitySet(...)

    # DFS here
    feature_matrix, _ = ft.dfs(
        ...
        dask_kwargs = {"cluster": cluster.scheduler.address}
    )

    # Remove EntitySet from cluster
    dataset_name = "EntitySet-{}".format(tokenize(es))
    client.unpublish_dataset(dataset_name)

Is this how you have things structured?

thehomebrewnerd avatar Sep 22 '22 13:09 thehomebrewnerd

Your prompt reply is highly appreciated, @thehomebrewnerd.

Actually, I put dfs and the cluster & client initialization inside a function, which is then called in a loop. The code structure is shown below:

import featuretools as ft
import pandas as pd
from dask.base import tokenize
from dask.distributed import Client, LocalCluster

def ft_raw(df):
    cluster = LocalCluster(n_workers=6)

    # Create es from df
    es = ft.EntitySet(...)

    # DFS here
    feature_matrix, _ = ft.dfs(
        ...
        dask_kwargs={"cluster": cluster.scheduler.address}
    )

    # Remove EntitySet from cluster
    dataset_name = "EntitySet-{}".format(tokenize(es))
    client = Client(cluster.scheduler.address)
    client.unpublish_dataset(dataset_name)

    return feature_matrix

reader = pd.read_csv(filename, chunksize=20000)
for df_ in reader:
    feas = ft_raw(df_)

But I don't think that would be the cause. I also tested your code, and it still failed in my case. After some testing, I found I could ONLY get my code to work when setting dask_kwargs=None, but not with dask_kwargs={"dashboard_address": "0:5000"}, or even dask_kwargs={}.

It seems quite weird.

dehiker avatar Sep 27 '22 03:09 dehiker

@dehiker I'm pretty sure you want to create the cluster outside of your for loop. With the code you have above, if you have 100 chunks in your data, I think you will end up creating 100 different Dask clusters, since you create a new cluster each time through your loop. Instead, I think you want to reuse the same cluster each time.

I can try to reproduce your failure with a larger dataset when I have a chance. A couple more questions that might help me try to reproduce:

  • Roughly how big is the dataset you are reading in with the pd.read_csv command in terms of rows, columns and total MB?
  • What are you doing with the feature matrix that you create each time through the loop? Are you serializing it to disk each time through the loop or storing it somewhere else?

thehomebrewnerd avatar Sep 27 '22 14:09 thehomebrewnerd

Thanks @thehomebrewnerd! Yeah, I know. I also tried passing the cluster & client to ft_raw, as shown below, but I got a BrokenPipeError (one time), and then a TimeoutError (every time). I wonder whether I'm using dask correctly, or maybe something is wrong with my environment?

import featuretools as ft
import pandas as pd
from dask.base import tokenize
from dask.distributed import Client, LocalCluster

def ft_raw(df, cluster, client):
    # Create es from df
    es = ft.EntitySet(...)

    # DFS here
    feature_matrix, _ = ft.dfs(
        ...
        dask_kwargs={"cluster": cluster.scheduler.address}
    )

    # Remove EntitySet from cluster
    dataset_name = "EntitySet-{}".format(tokenize(es))
    client.unpublish_dataset(dataset_name)

    return feature_matrix

cluster = LocalCluster(n_workers=6)
client = Client(cluster.scheduler.address)

reader = pd.read_csv(filename, chunksize=20000)
for df_ in reader:
    feas = ft_raw(df_, cluster, client)

BrokenPipeError

Traceback (most recent call last):
  File "/home/zzz/.conda/envs/dh2111/lib/python3.9/multiprocessing/queues.py", line 251, in _feed
    send_bytes(obj)
  File "/home/zzz/.conda/envs/dh2111/lib/python3.9/multiprocessing/connection.py", line 205, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/home/zzz/.conda/envs/dh2111/lib/python3.9/multiprocessing/connection.py", line 416, in _send_bytes
    self._send(header + buf)
  File "/home/zzz/.conda/envs/dh2111/lib/python3.9/multiprocessing/connection.py", line 373, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

TimeoutError

File "/home/zzz/python/temp_tools/ft_amex_t.py", line 275, in ft_statements feature_matrix, _ = ft.dfs( File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/featuretools/utils/entry_point.py", line 39, in function_wrapper raise e File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/featuretools/utils/entry_point.py", line 32, in function_wrapper return_value = func(*args, **kwargs) File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/featuretools/synthesis/dfs.py", line 283, in dfs feature_matrix = calculate_feature_matrix( File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/featuretools/computational_backends/calculate_feature_matrix.py", line 316, in calculate_feature_matrix feature_matrix = parallel_calculate_chunks( File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/featuretools/computational_backends/calculate_feature_matrix.py", line 786, in parallel_calculate_chunks _es = client.scatter([entityset])[0] File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/distributed/client.py", line 2385,in scatter return self.sync( File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/distributed/utils.py", line 338, in sync return sync( File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/distributed/utils.py", line 405, in sync raise exc.with_traceback(tb) File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/distributed/utils.py", line 378, in f result = yield future File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/tornado/gen.py", line 762, in run value = future.result() File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/distributed/client.py", line 2270,in _scatter await self.scheduler.scatter( File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/distributed/core.py", line 1153, in send_recv_from_rpc return await send_recv(comm=comm, op=key, **kwargs) File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/distributed/core.py", line 943, insend_recv raise exc.with_traceback(tb) File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/distributed/core.py", line 769, in_handle_comm result = await result File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/distributed/scheduler.py", line 5036, in scatter raise TimeoutError("No valid workers found") asyncio.exceptions.TimeoutError: No valid workers found

As to your questions:

  • It's a 14 GB dataset with 75 columns, and I set chunksize=20000.
  • I appended each chunk's feature matrix to a local file, roughly as in the snippet below.
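Roughly like this (features.csv is just a placeholder name; the header is written only for the first chunk):

import os

feature_matrix.to_csv(
    "features.csv",
    mode="a",                                   # append each chunk
    header=not os.path.exists("features.csv"),  # write the header only once
)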

Please let me know if I missed anything. Thank you so much!

dehiker avatar Sep 29 '22 10:09 dehiker

@dehiker I tried to reproduce your error without luck. I used a 13.7 GB input file. Mine didn't have as many columns as yours, so it likely had many more rows, and because of that I used a larger chunksize. I ran it using the code below, and it processed without error. As far as I can tell, Featuretools is behaving as expected.

The only additional suggestion I can offer at this point is to make sure you are not overloading your Dask workers (make sure they each have enough memory to hold the input data and the feature matrix that gets created; see the sketch at the end of this comment). You could also try experimenting with a smaller (or even larger) chunksize, or changing the number of features you are generating. Without being able to reproduce your error, I'm not sure what might be causing the problem.

The code below generated 510 feature matrix files, each 109 MB in size, and took just under an hour to run:

import featuretools as ft
import pandas as pd

from dask.distributed import Client
from dask.distributed import LocalCluster
from dask.base import tokenize

cluster = LocalCluster(n_workers=6)
client = Client(cluster.scheduler.address)

def run_ft(df, cluster, client):
    es = ft.EntitySet()
    es.add_dataframe(
        dataframe=df,
        dataframe_name="df",
        index="idx",
        make_index=True,  # data did not contain index, so had to create it
    )
    
    feature_matrix, _ = ft.dfs(
        entityset=es,
        target_dataframe_name="df",
        dask_kwargs = {"cluster": cluster.scheduler.address},
        max_depth=1,
        trans_primitives=["multiply_numeric", "year", "month", "day", "absolute"]
    )
    
    dataset_name = "EntitySet-{}".format(tokenize(es))
    client.unpublish_dataset(dataset_name)
    
    return feature_matrix

reader = pd.read_csv("./inputs/large_df.csv", chunksize=200000). # large_df is 13.7GB
for i, df in enumerate(reader):
    print(f"Starting Iteration {i}")
    feature_matrix = run_ft(df, cluster, client)
    feature_matrix.to_csv(f"./outputs/feature_matrix_{i}.csv", index=False)

Typical Dask memory usage graph: (screenshot attached in the original comment)
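If you want to guard against overloading the workers, one thing you could try is capping memory per worker when creating the cluster. Just a sketch (memory_limit is a standard LocalCluster argument; pick values that fit your machine):

from dask.distributed import LocalCluster

# fewer workers, each with an explicit memory cap; Dask will start spilling to disk
# and eventually restart a worker that goes over its limit instead of letting the
# whole run blow up
cluster = LocalCluster(n_workers=2, memory_limit="8GB")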

thehomebrewnerd avatar Sep 30 '22 14:09 thehomebrewnerd

Thank you so much @thehomebrewnerd

Yeah, maybe it was due to running out of memory. After reducing n_workers from 6 to 2 and chunksize from 20000 to 10000, I finally got your demo code to run. I'm going to close this issue, as it's probably not a bug in FT.

Thank you all for your time, bro!

dehiker avatar Oct 08 '22 10:10 dehiker