Cluster and Client creation error when using EMR
What happened:
Running dask-yarn on EMR causes a repeating error in tornado on client creation.
What you expected to happen:
No error, just the client being created and being able to use it.
Minimal Complete Verifiable Example:
I am using dask-yarn on EMR. I followed the directions outlined here and used the unaltered bootstrap-dask script from this repo. I used the emr-5.29.0 release to avoid the other bootstrap issue, #122. To reproduce: connect to the master node, open Jupyter Notebook, and start a new notebook with the following:
from dask_yarn import YarnCluster
from dask.distributed import Client
# Create a cluster
cluster = YarnCluster()
# Connect to the cluster
client = Client(cluster)
client
I then get this error every few seconds reported back to the notebook:
distributed.scheduler - INFO - Receive client connection: Client-d994493a-d748-11ea-afbf-025317f23bc7
distributed.core - INFO - Starting established connection
Exception in callback with_timeout.<locals>.error_callback(<Future cancelled>) at /home/hadoop/miniconda/lib/python3.8/site-packages/tornado/gen.py:968
handle: <Handle with_timeout.<locals>.error_callback(<Future cancelled>) at /home/hadoop/miniconda/lib/python3.8/site-packages/tornado/gen.py:968>
Traceback (most recent call last):
File "/home/hadoop/miniconda/lib/python3.8/asyncio/events.py", line 81, in _run
self._context.run(self._callback, *self._args)
File "/home/hadoop/miniconda/lib/python3.8/site-packages/tornado/gen.py", line 970, in error_callback
future.result()
asyncio.exceptions.CancelledError
Anything else we need to know?:
Environment:
- Dask version: 2.22.0
- Python version: 3.8.3
- Operating System: emr-5.29.0 (Amazon Linux)
- Install method (conda, pip, source): conda from bootstrap-dask script.
I had to downgrade to Python 3.7, after which it started working again.
Here is the relevant line from bootstrap.sh:
curl https://repo.anaconda.com/miniconda/Miniconda3-py37_4.8.3-Linux-x86_64.sh -o /tmp/miniconda.sh
Note that I pinned Miniconda to Python 3.7.x.
It would still be good to get to the bottom of this.
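This thread does not identify the root cause. One documented difference between Python 3.7 and 3.8 that may be relevant: since 3.8, asyncio.CancelledError derives from BaseException rather than Exception, so an `except Exception:` clause no longer catches it. Whether that is what trips the tornado callback in the traceback above is an assumption, not something confirmed here. The helper name below is hypothetical; the sketch only shows the difference between interpreters:

```python
import asyncio
import sys


def caught_by_except_exception(exc_type):
    """Return True if an `except Exception:` clause would catch exc_type."""
    return issubclass(exc_type, Exception)


# Report the interpreter version and how CancelledError is classified on it.
print("Python", ".".join(str(part) for part in sys.version_info[:3]))
print(
    "CancelledError caught by `except Exception`:",
    caught_by_except_exception(asyncio.CancelledError),
)
```

Running this under both the 3.7 and 3.8 Miniconda installs should show the classification change between the two interpreters.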
I seem to be having the same issue on Python 3.8:
Python 3.8.3 (default, May 19 2020, 18:47:26)
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from dask_yarn import YarnCluster
>>> cluster = YarnCluster(environment="/home/hadoop/environment.tar.gz")
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://10.0.10.191:46623
20/10/21 02:46:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/10/21 02:46:41 INFO client.RMProxy: Connecting to ResourceManager at ip-10-0-10-191.ec2.internal/10.0.10.191:8032
20/10/21 02:46:41 INFO client.AHSProxy: Connecting to Application History server at ip-10-0-10-191.ec2.internal/10.0.10.191:10200
20/10/21 02:46:42 INFO skein.Driver: Driver started, listening on 40913
20/10/21 02:46:42 INFO conf.Configuration: resource-types.xml not found
20/10/21 02:46:42 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
20/10/21 02:46:42 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
20/10/21 02:46:42 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
20/10/21 02:46:42 INFO skein.Driver: Uploading application resources to hdfs://ip-10-0-10-191.ec2.internal:8020/user/hadoop/.skein/application_1603177429164_0002
20/10/21 02:46:43 INFO skein.Driver: Submitting application...
20/10/21 02:46:43 INFO impl.YarnClientImpl: Submitted application application_1603177429164_0002
>>>
>>>
>>> cluster.scale(1)
>>>
>>> distributed.scheduler - INFO - Register worker <Worker 'tcp://10.0.10.161:38381', name: dask.worker_0, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.10.161:38381
distributed.core - INFO - Starting established connection
Exception in callback with_timeout.<locals>.error_callback(<Future cancelled>) at /home/hadoop/miniconda/lib/python3.8/site-packages/tornado/gen.py:968
handle: <Handle with_timeout.<locals>.error_callback(<Future cancelled>) at /home/hadoop/miniconda/lib/python3.8/site-packages/tornado/gen.py:968>
Traceback (most recent call last):
File "/home/hadoop/miniconda/lib/python3.8/asyncio/events.py", line 81, in _run
self._context.run(self._callback, *self._args)
File "/home/hadoop/miniconda/lib/python3.8/site-packages/tornado/gen.py", line 970, in error_callback
future.result()
asyncio.exceptions.CancelledError
I downgraded to Python 3.7.9 and all seems well.
Steps taken:
- Updated the YARN cluster bootstrap script for EMR: swapped Upstart for systemd. The EMR EC2 instances no longer ship Upstart, so it can't be used to launch the Jupyter notebook, and that part of the script has to change.
- Changed the Python version to 3.7.9. You can do this by changing the Miniconda download link.
Python 3.7.9 (default, Aug 31 2020, 12:42:55)
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>
>>>
>>> from dask_yarn import YarnCluster
>>>
>>> cluster = YarnCluster(environment="/home/hadoop/environment.tar.gz")
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://10.0.10.191:35369
distributed.scheduler - INFO - dashboard at: :41787
20/10/21 05:09:26 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/10/21 05:09:26 INFO client.RMProxy: Connecting to ResourceManager at ip-10-0-10-191.ec2.internal/10.0.10.191:8032
20/10/21 05:09:26 INFO client.AHSProxy: Connecting to Application History server at ip-10-0-10-191.ec2.internal/10.0.10.191:10200
20/10/21 05:09:27 INFO skein.Driver: Driver started, listening on 41837
20/10/21 05:09:27 INFO conf.Configuration: resource-types.xml not found
20/10/21 05:09:27 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
20/10/21 05:09:27 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
20/10/21 05:09:27 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
20/10/21 05:09:27 INFO skein.Driver: Uploading application resources to hdfs://ip-10-0-10-191.ec2.internal:8020/user/hadoop/.skein/application_1603177429164_0003
20/10/21 05:09:28 INFO skein.Driver: Submitting application...
20/10/21 05:09:28 INFO impl.YarnClientImpl: Submitted application application_1603177429164_0003
>>>
>>>
>>> cluster.scale(1)
>>> distributed.scheduler - INFO - Register worker <Worker 'tcp://10.0.10.161:40657', name: dask.worker_0, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.10.161:40657
distributed.core - INFO - Starting established connection
>>>
>>>
>>>
>>> cluster.scale(3)
>>> distributed.scheduler - INFO - Register worker <Worker 'tcp://10.0.10.161:44253', name: dask.worker_1, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.10.161:44253
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <Worker 'tcp://10.0.10.161:37565', name: dask.worker_2, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.10.161:37565
distributed.core - INFO - Starting established connection
>>>
>>>
>>> from dask.distributed import Client
>>>
>>> import dask.array as da
>>> client = Client(cluster)
distributed.scheduler - INFO - Receive client connection: Client-a0c24102-135d-11eb-a1a6-0a89473033af
distributed.core - INFO - Starting established connection
>>>
>>>
>>> array = da.ones((10000, 10000, 10000))
>>> print(array.mean().compute())
distributed.core - INFO - Event loop was unresponsive in Scheduler for 8.97s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
1.0
>>>
>>> len(client.scheduler_info()['workers'])
3
>>>
Can you try again with the latest dask/distributed? That should be 2.30.0. I don't think we want to force users onto Python 3.7.
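To confirm which versions are actually installed before retrying, something like the following could be run on the master node. It is a minimal sketch: the helper names are hypothetical, the 2.30.0 threshold is simply the version mentioned above, and the lookup relies on importlib.metadata, which is only in the standard library from Python 3.8 (on 3.7 the lookup returns None).

```python
import re

MIN_VERSION = (2, 30, 0)  # the release suggested in this thread


def version_tuple(version):
    """Return the leading numeric components, e.g. '2.22.0' -> (2, 22, 0)."""
    match = re.match(r"\d+(?:\.\d+)*", version)
    if match is None:
        raise ValueError(f"unrecognised version string: {version!r}")
    parts = tuple(int(part) for part in match.group(0).split("."))
    # Pad short versions such as '2.30' so tuple comparison behaves as expected.
    return parts + (0,) * (3 - len(parts))


def is_at_least(version, minimum=MIN_VERSION):
    """Return True if the version string is at or above the minimum tuple."""
    return version_tuple(version) >= minimum


def installed_version(package):
    """Return the installed version string, or None if it cannot be determined."""
    try:
        from importlib.metadata import PackageNotFoundError, version
    except ImportError:  # Python 3.7: importlib.metadata is not available
        return None
    try:
        return version(package)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    for package in ("dask", "distributed"):
        found = installed_version(package)
        if found is None:
            print(f"{package}: version could not be determined")
        else:
            status = "ok" if is_at_least(found) else "older than 2.30.0"
            print(f"{package}: {found} ({status})")
```

With a live cluster, `client.get_versions(check=True)` additionally compares package versions across the client, scheduler, and workers.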