featuretools
Fix: Future instantiation in parallel_calculate_chunks for correct cl…
Pull Request Description
This PR fixes a crash in Featuretools when used with `distributed>=2024.8.2`. The crash is caused by instantiating `Future` without passing a client instance.
🔧 Bug Description
As reported by @BohdanBilonoh, the latest Featuretools release fails with Dask (`distributed`) versions >=2024.8.2. The failure originates in the following lines of `calculate_feature_matrix.py`:
https://github.com/alteryx/featuretools/blob/938a0f6ccb98eaf21eca89830a25be2358a75db7/featuretools/computational_backends/calculate_feature_matrix.py#L746-L748
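The code above raises `AttributeError: 'NoneType' object has no attribute 'status'` because Dask's API now expects a `Future` to be explicitly associated with a client instance. Without one, the `Future`'s internal `_state` is `None`, and `Client.who_has` fails when it calls `cancelled()` on it (see the traceback in the Error section).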
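To make the failure mode concrete, here is a deliberately simplified model. `ToyClient`, `ToyFuture`, and `ToyFutureState` are hypothetical stand-ins, not distributed's classes, and they do not reproduce how distributed actually binds a `Future` to a client. They only mirror what the traceback in the Error section shows: `cancelled()` evaluates `self._state.status == "cancelled"`, which fails when `_state` is `None`.

```python
from typing import Dict, Optional


class ToyFutureState:
    """Hypothetical stand-in for the per-key state object a client tracks."""

    def __init__(self, status: str = "finished") -> None:
        self.status = status


class ToyClient:
    """Hypothetical client that hands out one state object per key."""

    def __init__(self) -> None:
        self.states: Dict[str, ToyFutureState] = {}

    def state_for(self, key: str) -> ToyFutureState:
        return self.states.setdefault(key, ToyFutureState())


class ToyFuture:
    """Hypothetical future whose state is bound only when a client is given."""

    def __init__(self, key: str, client: Optional[ToyClient] = None) -> None:
        self.key = key
        # No client means no state: the situation the traceback ends in.
        self._state: Optional[ToyFutureState] = (
            client.state_for(key) if client is not None else None
        )

    def cancelled(self) -> bool:
        # Same expression as the failing line in the traceback.
        return self._state.status == "cancelled"


if __name__ == "__main__":
    try:
        ToyFuture("EntitySet-abc").cancelled()
    except AttributeError as exc:
        print("without client:", exc)  # 'NoneType' object has no attribute 'status'

    client = ToyClient()
    print("with client:", ToyFuture("EntitySet-abc", client=client).cancelled())  # False
```

In this toy, supplying the client (as the fix does with `Future(es_token, client=client)`) is what gives the future a state object to read.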
Fix Implementation
Updating the Future instantiation to include the client resolves the issue:
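```python
# Bind the Future to the active client explicitly; without it, the
# Future's state is None on distributed>=2024.8.2 (see the traceback).
num_scattered_workers = len(
    client.who_has([Future(es_token, client=client)]).get(es_token, []),
)
```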
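The `.get(es_token, [])` call relies on `Client.who_has` returning a mapping from each key to the workers that hold it. Below is a minimal pure-Python sketch of that counting step, alongside the worker count taken from `client.scheduler_info()["workers"]` in the lines that follow it. Plain dicts stand in for both results; the helper name `replication_counts` and the worker addresses are hypothetical.

```python
from typing import Dict, List, Mapping, Sequence, Tuple


def replication_counts(
    who_has: Mapping[str, Sequence[str]],
    workers: Mapping[str, dict],
    key: str,
) -> Tuple[int, int]:
    """Return (number of workers holding `key`, total number of workers).

    `who_has` stands in for the result of `client.who_has([...])`;
    `workers` stands in for `client.scheduler_info()["workers"]`.
    """
    # A key the scheduler does not report defaults to an empty list -> 0.
    num_scattered_workers = len(who_has.get(key, []))
    num_workers = len(workers.values())
    return num_scattered_workers, num_workers


if __name__ == "__main__":
    who_has_result: Dict[str, List[str]] = {
        "EntitySet-abc": ["tcp://127.0.0.1:40001", "tcp://127.0.0.1:40002"],
    }
    scheduler_workers = {
        "tcp://127.0.0.1:40001": {},
        "tcp://127.0.0.1:40002": {},
        "tcp://127.0.0.1:40003": {},
    }
    print(replication_counts(who_has_result, scheduler_workers, "EntitySet-abc"))  # (2, 3)
    print(replication_counts(who_has_result, scheduler_workers, "EntitySet-missing"))  # (0, 3)
```

The two counts can then be compared to see whether the replicated EntitySet reached every worker. This sketch only computes them; it does not model what Featuretools does with the result.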
Testing & Verification
- Successfully tested the fix with `featuretools==1.31.0` and `distributed==2025.2.0`.
- Verified that Featuretools runs successfully with the modified code.
- No breaking changes introduced.
References & Related Issues
- Issue: #2733
- Related Dask function: [distributed.client.py#L322](https://github.com/dask/distributed/blob/7ab72493b6df194bf6cb86ec1c4ac7c27cf1b3a1/distributed/client.py#L322)
Error
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ in <module>:4 │
│ │
│ 1 FEATURE_ONLY = True │
│ 2 FEATURE_ONLY = False │
│ 3 # columns_to_ignore = ["ID_UEN"] │
│ ❱ 4 out = ft.dfs( │
│ 5 │ entityset=es, │
│ 6 │ cutoff_time=cutoff_times, │
│ 7 │ cutoff_time_in_index=True, │
│ │
│ /anaconda/envs/fraude_mp_env/lib/python3.12/site-packages/featuretools/utils/entry_point.py:39 │
│ in function_wrapper │
│ │
│ 36 │ │ │ │ # send error │
│ 37 │ │ │ │ for ep in entry_points: │
│ 38 │ │ │ │ │ ep.on_error(error=e, runtime=runtime) │
│ ❱ 39 │ │ │ │ raise e │
│ 40 │ │ │ │
│ 41 │ │ │ # send return value │
│ 42 │ │ │ for ep in entry_points: │
│ │
│ /anaconda/envs/fraude_mp_env/lib/python3.12/site-packages/featuretools/utils/entry_point.py:32 │
│ in function_wrapper │
│ │
│ 29 │ │ │ try: │
│ 30 │ │ │ │ # call function │
│ 31 │ │ │ │ start = time.time() │
│ ❱ 32 │ │ │ │ return_value = func(*args, **kwargs) │
│ 33 │ │ │ │ runtime = time.time() - start │
│ 34 │ │ │ except Exception as e: │
│ 35 │ │ │ │ runtime = time.time() - start │
│ │
│ /anaconda/envs/fraude_mp_env/lib/python3.12/site-packages/featuretools/synthesis/dfs.py:283 in │
│ dfs │
│ │
│ 280 │ │ features != [] │
│ 281 │ ), "No features can be generated from the specified primitives. Please make sure the │
│ 282 │ │
│ ❱ 283 │ feature_matrix = calculate_feature_matrix( │
│ 284 │ │ features, │
│ 285 │ │ entityset=entityset, │
│ 286 │ │ cutoff_time=cutoff_time, │
│ │
│ /anaconda/envs/fraude_mp_env/lib/python3.12/site-packages/featuretools/computational_backends/ca │
│ lculate_feature_matrix.py:298 in calculate_feature_matrix │
│ │
│ 295 │ │
│ 296 │ with make_tqdm_iterator(**tqdm_options) as progress_bar: │
│ 297 │ │ if n_jobs != 1 or dask_kwargs is not None: │
│ ❱ 298 │ │ │ feature_matrix = parallel_calculate_chunks( │
│ 299 │ │ │ │ cutoff_time=cutoff_time_to_pass, │
│ 300 │ │ │ │ chunk_size=chunk_size, │
│ 301 │ │ │ │ feature_set=feature_set, │
│ │
│ /anaconda/envs/fraude_mp_env/lib/python3.12/site-packages/featuretools/computational_backends/ca │
│ lculate_feature_matrix.py:747 in parallel_calculate_chunks │
│ │
│ 744 │ │ _saved_features = client.scatter(pickled_feats) │
│ 745 │ │ client.replicate([_es, _saved_features]) │
│ 746 │ │ num_scattered_workers = len( │
│ ❱ 747 │ │ │ client.who_has([Future(es_token)]).get(es_token, []), │
│ 748 │ │ ) │
│ 749 │ │ num_workers = len(client.scheduler_info()["workers"].values()) │
│ 750 │
│ │
│ /anaconda/envs/fraude_mp_env/lib/python3.12/site-packages/distributed/client.py:4203 in who_has │
│ │
│ 4200 │ │ Client.nthreads │
│ 4201 │ │ """ │
│ 4202 │ │ if futures is not None: │
│ ❱ 4203 │ │ │ futures = self.futures_of(futures) │
│ 4204 │ │ │ keys = list({f.key for f in futures}) │
│ 4205 │ │ else: │
│ 4206 │ │ │ keys = None │
│ │
│ /anaconda/envs/fraude_mp_env/lib/python3.12/site-packages/distributed/client.py:4912 in │
│ futures_of │
│ │
│ 4909 │ │ futures : tuple │
│ 4910 │ │ │ The futures │
│ 4911 │ │ """ │
│ ❱ 4912 │ │ return futures_of(futures, client=self) │
│ 4913 │ │
│ 4914 │ @classmethod │
│ 4915 │ def _expand_key(cls, k): │
│ │
│ /anaconda/envs/fraude_mp_env/lib/python3.12/site-packages/distributed/client.py:6132 in │
│ futures_of │
│ │
│ 6129 │ if client is not None: │
│ 6130 │ │ cancelled_errors = defaultdict(list) │
│ 6131 │ │ for f in futures: │
│ ❱ 6132 │ │ │ if not f.cancelled(): │
│ 6133 │ │ │ │ continue │
│ 6134 │ │ │ exception = f._state.exception │
│ 6135 │ │ │ assert isinstance(exception, FutureCancelledError) │
│ │
│ /anaconda/envs/fraude_mp_env/lib/python3.12/site-packages/distributed/client.py:520 in cancelled │
│ │
│ 517 │ │ bool │
│ 518 │ │ │ True if the future was 'cancelled', otherwise False │
│ 519 │ │ """ │
│ ❱ 520 │ │ return self._state.status == "cancelled" │
│ 521 │ │
│ 522 │ async def _traceback(self): │
│ 523 │ │ await self._state.wait() │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
AttributeError: 'NoneType' object has no attribute 'status'
After creating the pull request, update the "Future Release" section of `docs/source/release_notes.rst` to include it; this is required to pass the `release_notes_updated` check.