OSError: Timed out trying to connect to tcp://127.0.0.1:32832 after 30 s
Got an OSError after featuretools had been running for hours.
Part of the traceback (I cannot retrieve the whole log).
Code Sample, a copy-pastable example to reproduce your bug.
# Your code here
feature_matrix, _ = ft.dfs(
    target_dataframe_name="users",
    cutoff_time=label_times,
    entityset=es,
    n_jobs=6,
    verbose=True,
    # some basic primitives, set in the code
    agg_primitives=["mean", "trend"],
    trans_primitives=[],
    # set in the code
    primitive_options={"trend": {...}},
)
Output of featuretools.show_info()
2022-08-31 11:01:26,399 featuretools - WARNING Featuretools failed to load plugin tsfresh from library featuretools_tsfresh_primitives.init. For a full stack trace, set logging to debug.
Featuretools version: 1.13.0
Featuretools installation directory: /home/zzz/.conda/envs/test/lib/python3.9/site-packages/featuretools
SYSTEM INFO
python: 3.9.7.final.0
python-bits: 64
OS: Linux
OS-release: 3.10.0-862.el7.x86_64
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: en_US.UTF-8
LOCALE: en_US.UTF-8

INSTALLED VERSIONS
numpy: 1.21.6
pandas: 1.4.3
tqdm: 4.62.3
cloudpickle: 2.0.0
dask: 2022.7.1
distributed: 2022.7.1
psutil: 5.9.1
pip: 21.2.4
setuptools: 58.0.4
Hi @dehiker, thanks for the report. A few questions:
What version of the fsspec library do you have installed? There have been some issues with fsspec==2022.8.0.
This section of the documentation goes over using a dask cluster directly instead of through n_jobs. The cluster dashboard can be useful for diagnosing memory problems among other things.
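If you stick with n_jobs, you can also expose the Dask dashboard by passing a diagnostics_port in dask_kwargs. A rough sketch based on your snippet above (the port number is just an example):

import featuretools as ft

# n_jobs spins up a local Dask cluster behind the scenes;
# diagnostics_port makes its dashboard available (here at http://localhost:8787)
feature_matrix, feature_defs = ft.dfs(
    entityset=es,
    target_dataframe_name="users",
    cutoff_time=label_times,
    n_jobs=6,
    dask_kwargs={"diagnostics_port": 8787},
)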
Thank you for your prompt reply, @rwedge !
I work with fsspec==2021.8.1. Any issues with this version?
The link you mentioned greatly inspired me. Since I'm working with quite a large file, I read the raw data in chunks and call DFS on each chunk. Having tried several times, I found the task fails at roughly the same iteration (though not exactly the same one). So maybe calling DFS within a loop is a bad idea? Any ideas why this happens?
Also, as the link suggests, I tried adding dask_kwargs={"diagnostics_port": 5000} to the DFS call. But it seems I cannot open the dashboard directly. Is there anything else I have to do to get it to work?
Hi @dehiker
That version of fsspec should be fine.
Do you have a rough outline you could share of the loop you are trying?
The dashboard also needs the bokeh package installed in order to run.
Thank you, @rwedge
Basically, I'll call DFS hundreds of times in the loop, as demonstrated below. Note the code worked as expected at first, but always crashed after some iterations.
reader = pd.read_csv(filename, chunksize=20000)
for df_ in reader:
    # Create es from df_
    # DFS here
    feature_matrix, _ = ft.dfs(
        target_dataframe_name="users",
        cutoff_time=label_times,
        entityset=es,
        ...
    )
As to the dashboard, I do have bokeh==2.4.3 installed. Maybe something is wrong with my network settings, if there's nothing else to set.
Hi @dehiker
To test the dashboard, you could try creating a distributed cluster directly and see if that works:
from distributed import LocalCluster
cluster = LocalCluster()
cluster
reference doc: https://distributed.dask.org/en/stable/api.html#distributed.LocalCluster
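If the cluster starts up, its dashboard_link attribute should tell you the URL to open (assuming bokeh is installed). A quick check:

from distributed import LocalCluster

cluster = LocalCluster()
print(cluster.dashboard_link)  # e.g. http://127.0.0.1:8787/status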
You could also try using a cluster created this way to handle the computation. First, get the cluster's address:
cluster.scheduler.address
Then add that information to dask_kwargs in dfs:
ft.dfs(
    ...
    dask_kwargs={"cluster": "scheduler address string"}
)
I like to run the featuretools code and the cluster in separate python interpreters / notebooks to make tracking output / errors easier.
If you try this with your looping approach, you'll want to make another adjustment. Featuretools publishes the entityset data to the cluster so it doesn't have to transmit that data again if dfs is called again. However, since in the loop approach you are making multiple entitysets, you'll probably want to remove the old entityset once you finish calculating a feature matrix. This can be done by adding a code snippet to unpublish the data:
from dask.base import tokenize
from distributed import Client
dataset_name = "EntitySet-{}".format(tokenize(es))
client = Client("scheduler address string")
client.unpublish_dataset(dataset_name)
Another thing you could try is saving each chunk's feature matrix to disk each loop and combining them later. If there's a failure during the loop, you could restart from the chunks that hadn't been saved yet.
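A rough sketch of that chunk-file approach (the file names and index handling here are just placeholders):

import glob
import pandas as pd
import featuretools as ft

reader = pd.read_csv(filename, chunksize=20000)
for i, df_ in enumerate(reader):
    # build es from df_ as in your loop above, then run dfs as before
    feature_matrix, _ = ft.dfs(entityset=es, target_dataframe_name="users", ...)
    feature_matrix.to_csv(f"feature_matrix_chunk_{i}.csv")  # one file per chunk

# Later (or after a crash), combine whichever chunk files were saved
chunk_files = sorted(glob.glob("feature_matrix_chunk_*.csv"))
full_feature_matrix = pd.concat(pd.read_csv(path, index_col=0) for path in chunk_files)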
Your prompt and detailed reply is highly appreciated, and sorry for my late reply, @rwedge
- With your intuitive demo, I got the dashboard working now. Hopefully it will help me out when I use featuretools.
- I just learned about the unpublish_dataset method, and it seems that could be the problem. I'll get back here after testing.
- Yup, that's what I'm doing now.
Thank you again for your time, bro!
Hi @rwedge, it seems there's still something wrong with what I did.
from dask.distributed import Client
from dask.distributed import LocalCluster
from dask.base import tokenize

cluster = LocalCluster(n_workers=6)

ft.dfs(
    ...
    dask_kwargs={"cluster": cluster.scheduler.address}
)

dataset_name = "EntitySet-{}".format(tokenize(es))
client = Client(cluster.scheduler.address)
client.unpublish_dataset(dataset_name)
With the updated code shown above, I got a strange error:
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/distributed/utils.py", line 778, in wrapper
    return await func(*args, **kwargs)
  File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/distributed/nanny.py", line 522, in _on_exit
    await self.instantiate()
  File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/distributed/nanny.py", line 423, in instantiate
    result = await self.process.start()
  File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/distributed/nanny.py", line 669, in start
    msg = await self._wait_until_connected(uid)
  File "/home/zzz/.conda/envs/dh2111/lib/python3.9/site-packages/distributed/nanny.py", line 779, in _wait_until_connected
    await asyncio.sleep(self._init_msg_interval)
  File "/home/zzz/.conda/envs/dh2111/lib/python3.9/asyncio/tasks.py", line 655, in sleep
    return await future
asyncio.exceptions.CancelledError
Do you have any ideas on this?
Also, I put dfs in a function, which is then called in a loop. Maybe there's no need for unpublish_dataset in this case?
BTW, could you reopen the issue, @gsheni? I couldn't find a way to do it myself. Thanks
@dehiker From your most recent code block, it's a little unclear to me what you are doing inside a loop and what you are doing outside the loop. I haven't tested this on a large dataset, but I think you would want your code flow to be something like this:
from dask.distributed import Client
from dask.distributed import LocalCluster
from dask.base import tokenize

cluster = LocalCluster(n_workers=6)
client = Client(cluster.scheduler.address)

reader = pd.read_csv(filename, chunksize=20000)
for df_ in reader:
    # Create es from df_
    es = ft.EntitySet(...)

    # DFS here
    feature_matrix, _ = ft.dfs(
        ...
        dask_kwargs={"cluster": cluster.scheduler.address}
    )

    # Remove EntitySet from cluster
    dataset_name = "EntitySet-{}".format(tokenize(es))
    client.unpublish_dataset(dataset_name)
Is this how you have things structured?
Your prompt reply is highly appreciated, @thehomebrewnerd.
Actually I put dfs and the cluster & client initialization inside a function, which was called in a loop. The code structure is shown below:
def ft_raw(df):
    cluster = LocalCluster(n_workers=6)

    # Create es from df_
    es = ft.EntitySet(...)

    # DFS here
    feature_matrix, _ = ft.dfs(
        ...
        dask_kwargs={"cluster": cluster.scheduler.address}
    )

    # Remove EntitySet from cluster
    dataset_name = "EntitySet-{}".format(tokenize(es))
    client = Client(cluster.scheduler.address)
    client.unpublish_dataset(dataset_name)
    return feature_matrix

reader = pd.read_csv(filename, chunksize=20000)
for df_ in reader:
    feas = ft_raw(df_)
But I don't think that would be the cause. I also tested your code, and it failed in my case.
After some testing, I find I can ONLY get my code to work when setting dask_kwargs=None, but not with dask_kwargs={"dashboard_address": "0:5000"}, or even dask_kwargs={}.
It seems quite weird.
@dehiker I'm pretty sure you want to create the cluster outside of your for loop. With the code you have above, if you have 100 chunks in your data I think you will end up creating 100 different Dask clusters since you create a new cluster each time through your loop. For this, I think you want to reuse the same cluster each time.
I can try to reproduce your failure with a larger dataset when I have a chance. A couple more questions that might help me try to reproduce:
- Roughly how big is the dataset you are reading in with the pd.read_csv command, in terms of rows, columns, and total MB?
- What are you doing with the feature matrix that you create each time through the loop? Are you serializing it to disk each time through the loop or storing it somewhere else?
Thanks @thehomebrewnerd! Yeah, I know. I also tried passing the cluster & client into ft_raw, as shown below, but I got a BrokenPipeError (one time), and then a TimeoutError (always). I'm not sure if I'm using dask correctly, or maybe something is wrong with my environment?
def ft_raw(df, cluster, client):
    # Create es from df_
    es = ft.EntitySet(...)

    # DFS here
    feature_matrix, _ = ft.dfs(
        ...
        dask_kwargs={"cluster": cluster.scheduler.address}
    )

    # Remove EntitySet from cluster
    dataset_name = "EntitySet-{}".format(tokenize(es))
    client.unpublish_dataset(dataset_name)
    return feature_matrix

cluster = LocalCluster(n_workers=6)
client = Client(cluster.scheduler.address)

reader = pd.read_csv(filename, chunksize=20000)
for df_ in reader:
    feas = ft_raw(df_, cluster, client)
(Full BrokenPipeError and TimeoutError tracebacks omitted.)
As to your questions:
- It's a 14 GB dataset with 75 columns, and I set chunksize=20000.
- I appended the feature matrix to a local file.
Please let me know if I missed anything. Thank you so much!
@dehiker I tried to reproduce your error without luck. I used a 13.7 GB input file. Mine didn't have as many columns as yours, so it likely had many more rows, and because of that I used a larger chunksize. I ran this using the code below, and it processed without error. As far as I can tell, Featuretools is behaving as expected.
The only additional suggestion I can offer at this point is to make sure you are not overloading your Dask workers (make sure they each have enough memory to hold the input data and the feature matrix that gets created). You could also try experimenting with a smaller (or even larger) chunksize, or changing the number of features you are generating. Without being able to reproduce your error, I'm not sure what might be causing the problem.
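If memory does turn out to be the culprit, one knob worth trying when you create the cluster yourself is an explicit per-worker memory limit (the numbers here are just illustrative):

from dask.distributed import LocalCluster

# Fewer workers, each with an explicit memory cap;
# tune these to what your machine can actually spare
cluster = LocalCluster(n_workers=2, threads_per_worker=1, memory_limit="8GB")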
The code below generated 510 feature matrix files, each 109 MB in size, and took just under an hour to run:
import featuretools as ft
import pandas as pd
from dask.distributed import Client
from dask.distributed import LocalCluster
from dask.base import tokenize

cluster = LocalCluster(n_workers=6)
client = Client(cluster.scheduler.address)

def run_ft(df, cluster, client):
    es = ft.EntitySet()
    es.add_dataframe(
        dataframe=df,
        dataframe_name="df",
        index="idx",
        make_index=True,  # data did not contain an index, so had to create it
    )
    feature_matrix, _ = ft.dfs(
        entityset=es,
        target_dataframe_name="df",
        dask_kwargs={"cluster": cluster.scheduler.address},
        max_depth=1,
        trans_primitives=["multiply_numeric", "year", "month", "day", "absolute"],
    )
    dataset_name = "EntitySet-{}".format(tokenize(es))
    client.unpublish_dataset(dataset_name)
    return feature_matrix

reader = pd.read_csv("./inputs/large_df.csv", chunksize=200000)  # large_df is 13.7GB
for i, df in enumerate(reader):
    print(f"Starting Iteration {i}")
    feature_matrix = run_ft(df, cluster, client)
    feature_matrix.to_csv(f"./outputs/feature_matrix_{i}.csv", index=False)
Typical Dask memory usage graph (screenshot omitted).
Thank you so much @thehomebrewnerd
Yeah, maybe it was due to running out of memory. After reducing n_workers from 6 to 2 and chunksize from 20000 to 10000, I finally got your demo code to run. I'm gonna close this issue as it's probably not a bug in FT.
Thank you all for your time, bro!