Fix: Future instantiation in parallel_calculate_chunks for correct cl…

Open gitgud5000 opened this pull request 9 months ago • 1 comment

Pull Request Description

This PR resolves an issue in Featuretools where using distributed>=2024.8.2 causes a crash due to an incorrect instantiation of Future without passing a client instance.

🔧 Bug Description

As reported by @BohdanBilonoh, the latest Featuretools release fails when used with Dask versions >=2024.8.2. The issue occurs due to the following line in calculate_feature_matrix.py:

https://github.com/alteryx/featuretools/blob/938a0f6ccb98eaf21eca89830a25be2358a75db7/featuretools/computational_backends/calculate_feature_matrix.py#L746-L748

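The above code raises AttributeError: 'NoneType' object has no attribute 'status'. As the traceback under "Error" shows, client.who_has calls cancelled() on each future, which reads self._state.status; for a Future created without a client, _state is None. With these Dask versions, Future must be explicitly associated with a client instance.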

Fix Implementation

Updating the Future instantiation to include the client resolves the issue:

num_scattered_workers = len(
    client.who_has([Future(es_token, client=client)]).get(es_token, []),
)
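
The failure and the fix can be illustrated without a cluster. The sketch below is a simplified stand-in model, not distributed's actual implementation: StubState, StubFuture, StubClient, the key "EntitySet-abc" and the worker addresses are all hypothetical names chosen for illustration. It only mirrors what the traceback shows, namely that who_has calls cancelled() on each future and that cancelled() reads self._state.status.

```python
class StubState:
    """Hypothetical stand-in for the per-key state a client tracks."""

    def __init__(self, status="finished"):
        self.status = status


class StubFuture:
    """Hypothetical stand-in for a future; state exists only when bound to a client."""

    def __init__(self, key, client=None):
        self.key = key
        self._state = None
        if client is not None:
            # Binding to a client gives the future a state object to inspect.
            self._state = client.futures.setdefault(key, StubState())

    def cancelled(self):
        # Same attribute access as the line that fails in the traceback.
        return self._state.status == "cancelled"


class StubClient:
    """Hypothetical stand-in for a client that knows which workers hold each key."""

    def __init__(self, holders):
        self._holders = dict(holders)
        self.futures = {}

    def who_has(self, futures):
        # Skip cancelled futures, then report the workers holding each key.
        live = [f for f in futures if not f.cancelled()]
        return {f.key: self._holders.get(f.key, []) for f in live}


es_token = "EntitySet-abc"  # hypothetical key
client = StubClient({es_token: ["tcp://worker-1", "tcp://worker-2"]})

# Before the fix: the future is not bound to a client, so _state is None.
unbound_error = None
try:
    client.who_has([StubFuture(es_token)])
except AttributeError as err:
    unbound_error = str(err)

# After the fix: passing client=client gives the future a state to inspect.
num_scattered_workers = len(
    client.who_has([StubFuture(es_token, client=client)]).get(es_token, []),
)

print(unbound_error)
print(num_scattered_workers)
```

In this model the only difference between the failing and working calls is the client=client argument, which matches the one-line change proposed above.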

Testing & Verification

  • Successfully tested fix with featuretools==1.31.0 and distributed==2025.2.0.
  • Verified Featuretools runs successfully with the modified code.
  • No breaking changes introduced.
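
To check whether a given environment falls in the version range this PR describes, a small standard-library sketch like the following may help. The helper names (parse_version, is_affected, installed_version) are hypothetical, and the 2024.8.2 threshold is taken from the description above rather than independently verified.

```python
import re
from importlib import metadata

# First distributed release reported as affected in this PR description.
FIRST_AFFECTED = (2024, 8, 2)


def parse_version(text):
    """Return the leading numeric components, e.g. '2025.2.0' -> (2025, 2, 0)."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", text)
    if match is None:
        raise ValueError(f"unrecognized version string: {text!r}")
    return tuple(int(part) for part in match.groups())


def is_affected(distributed_version):
    """True if the version is at or above the first release reported as affected."""
    return parse_version(distributed_version) >= FIRST_AFFECTED


def installed_version(package):
    """Return the installed version string, or None if the package is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    for name in ("featuretools", "distributed"):
        print(name, installed_version(name))
    found = installed_version("distributed")
    if found is not None:
        print("in reported range:", is_affected(found))
```

Comparing integer tuples instead of raw strings avoids ordering mistakes such as "2024.10.0" sorting before "2024.8.2".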

References & Related Issues

  • Issue: #2733

  • Related Dask Function: [distributed.client.py#L322](https://github.com/dask/distributed/blob/7ab72493b6df194bf6cb86ec1c4ac7c27cf1b3a1/distributed/client.py#L322)

Error

╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ in <module>:4                                                                                    │
│                                                                                                  │
│   1 FEATURE_ONLY = True                                                                         │
│   2 FEATURE_ONLY = False                                                                        │
│    3 # columns_to_ignore = ["ID_UEN"]                                                            │
│ ❱  4 out = ft.dfs(                                                                               │
│   5 │   entityset=es,                                                                           │
│   6 │   cutoff_time=cutoff_times,                                                               │
│   7 │   cutoff_time_in_index=True,                                                              │
│                                                                                                  │
│ /anaconda/envs/fraude_mp_env/lib/python3.12/site-packages/featuretools/utils/entry_point.py:39   │
│ in function_wrapper                                                                              │
│                                                                                                  │
│   36 │   │   │   │   # send error                                                                │
│   37 │   │   │   │   for ep in entry_points:                                                     │
│   38 │   │   │   │   │   ep.on_error(error=e, runtime=runtime)                                   │
│ ❱ 39 │   │   │   │   raise e                                                                     │
│   40 │   │   │                                                                                   │
│   41 │   │   │   # send return value                                                             │
│   42 │   │   │   for ep in entry_points:                                                         │
│                                                                                                  │
│ /anaconda/envs/fraude_mp_env/lib/python3.12/site-packages/featuretools/utils/entry_point.py:32   │
│ in function_wrapper                                                                              │
│                                                                                                  │
│   29 │   │   │   try:                                                                            │
│   30 │   │   │   │   # call function                                                             │
│   31 │   │   │   │   start = time.time()                                                         │
│ ❱ 32 │   │   │   │   return_value = func(*args, **kwargs)                                        │
│   33 │   │   │   │   runtime = time.time() - start                                               │
│   34 │   │   │   except Exception as e:                                                          │
│   35 │   │   │   │   runtime = time.time() - start                                               │
│                                                                                                  │
│ /anaconda/envs/fraude_mp_env/lib/python3.12/site-packages/featuretools/synthesis/dfs.py:283 in   │
│ dfs                                                                                              │
│                                                                                                  │
│   280 │   │   features != []                                                                     │
│   281 │   ), "No features can be generated from the specified primitives. Please make sure the   │
│   282 │                                                                                          │
│ ❱ 283 │   feature_matrix = calculate_feature_matrix(                                             │
│   284 │   │   features,                                                                          │
│   285 │   │   entityset=entityset,                                                               │
│   286 │   │   cutoff_time=cutoff_time,                                                           │
│                                                                                                  │
│ /anaconda/envs/fraude_mp_env/lib/python3.12/site-packages/featuretools/computational_backends/ca │
│ lculate_feature_matrix.py:298 in calculate_feature_matrix                                        │
│                                                                                                  │
│   295 │                                                                                          │
│   296 │   with make_tqdm_iterator(**tqdm_options) as progress_bar:                               │
│   297 │   │   if n_jobs != 1 or dask_kwargs is not None:                                         │
│ ❱ 298 │   │   │   feature_matrix = parallel_calculate_chunks(                                    │
│   299 │   │   │   │   cutoff_time=cutoff_time_to_pass,                                           │
│   300 │   │   │   │   chunk_size=chunk_size,                                                     │
│   301 │   │   │   │   feature_set=feature_set,                                                   │
│                                                                                                  │
│ /anaconda/envs/fraude_mp_env/lib/python3.12/site-packages/featuretools/computational_backends/ca │
│ lculate_feature_matrix.py:747 in parallel_calculate_chunks                                       │
│                                                                                                  │
│   744 │   │   _saved_features = client.scatter(pickled_feats)                                    │
│   745 │   │   client.replicate([_es, _saved_features])                                           │
│   746 │   │   num_scattered_workers = len(                                                       │
│ ❱ 747 │   │   │   client.who_has([Future(es_token)]).get(es_token, []),                          │
│   748 │   │   )                                                                                  │
│   749 │   │   num_workers = len(client.scheduler_info()["workers"].values())                     │
│   750                                                                                            │
│                                                                                                  │
│ /anaconda/envs/fraude_mp_env/lib/python3.12/site-packages/distributed/client.py:4203 in who_has  │
│                                                                                                  │
│   4200 │   │   Client.nthreads                                                                   │
│   4201 │   │   """                                                                               │
│   4202 │   │   if futures is not None:                                                           │
│ ❱ 4203 │   │   │   futures = self.futures_of(futures)                                            │
│   4204 │   │   │   keys = list({f.key for f in futures})                                         │
│   4205 │   │   else:                                                                             │
│   4206 │   │   │   keys = None                                                                   │
│                                                                                                  │
│ /anaconda/envs/fraude_mp_env/lib/python3.12/site-packages/distributed/client.py:4912 in          │
│ futures_of                                                                                       │
│                                                                                                  │
│   4909 │   │   futures : tuple                                                                   │
│   4910 │   │   │   The futures                                                                   │
│   4911 │   │   """                                                                               │
│ ❱ 4912 │   │   return futures_of(futures, client=self)                                           │
│   4913 │                                                                                         │
│   4914 │   @classmethod                                                                          │
│   4915 │   def _expand_key(cls, k):                                                              │
│                                                                                                  │
│ /anaconda/envs/fraude_mp_env/lib/python3.12/site-packages/distributed/client.py:6132 in          │
│ futures_of                                                                                       │
│                                                                                                  │
│   6129 │   if client is not None:                                                                │
│   6130 │   │   cancelled_errors = defaultdict(list)                                              │
│   6131 │   │   for f in futures:                                                                 │
│ ❱ 6132 │   │   │   if not f.cancelled():                                                         │
│   6133 │   │   │   │   continue                                                                  │
│   6134 │   │   │   exception = f._state.exception                                                │
│   6135 │   │   │   assert isinstance(exception, FutureCancelledError)                            │
│                                                                                                  │
│ /anaconda/envs/fraude_mp_env/lib/python3.12/site-packages/distributed/client.py:520 in cancelled │
│                                                                                                  │
│   517 │   │   bool                                                                              │
│   518 │   │   │   True if the future was 'cancelled', otherwise False                           │
│   519 │   │   """                                                                               │
│ ❱  520 │   │   return self._state.status == "cancelled"                                          │
│   521 │                                                                                         │
│   522 │   async def _traceback(self):                                                           │
│   523 │   │   await self._state.wait()                                                          │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
AttributeError: 'NoneType' object has no attribute 'status'

gitgud5000, Feb 28 '25 08:02

CLA assistant check
All committers have signed the CLA.

CLAassistant, Feb 28 '25 08:02