distributed icon indicating copy to clipboard operation
distributed copied to clipboard

Flaky test_profile_server

Open jrbourbeau opened this issue 5 years ago • 4 comments

We ran into a failure of test_profile_server in CI over in https://github.com/dask/distributed/pull/4248 that was unrelated to the changes in the PR. Full traceback below.

Full traceback:
2020-11-17T21:43:17.4637490Z ================================== FAILURES ===================================
2020-11-17T21:43:17.4638364Z _____________________________ test_profile_server _____________________________
2020-11-17T21:43:17.4638784Z 
2020-11-17T21:43:17.4639380Z self = <closed TCP>, deserializers = None
2020-11-17T21:43:17.4639838Z 
2020-11-17T21:43:17.4640417Z     async def read(self, deserializers=None):
2020-11-17T21:43:17.4641238Z         stream = self.stream
2020-11-17T21:43:17.4641733Z         if stream is None:
2020-11-17T21:43:17.4642434Z             raise CommClosedError
2020-11-17T21:43:17.4642878Z     
2020-11-17T21:43:17.4643274Z         try:
2020-11-17T21:43:17.4643935Z >           n_frames = await stream.read_bytes(8)
2020-11-17T21:43:17.4644789Z E           tornado.iostream.StreamClosedError: Stream is closed
2020-11-17T21:43:17.4645373Z 
2020-11-17T21:43:17.4645937Z distributed\comm\tcp.py:187: StreamClosedError
2020-11-17T21:43:17.4646472Z 
2020-11-17T21:43:17.4647051Z The above exception was the direct cause of the following exception:
2020-11-17T21:43:17.4647545Z 
2020-11-17T21:43:17.4647915Z     def test_func():
2020-11-17T21:43:17.4648375Z         result = None
2020-11-17T21:43:17.4648771Z         workers = []
2020-11-17T21:43:17.4649377Z         with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:
2020-11-17T21:43:17.4649894Z     
2020-11-17T21:43:17.4650314Z             async def coro():
2020-11-17T21:43:17.4650810Z                 with dask.config.set(config):
2020-11-17T21:43:17.4651328Z                     s = False
2020-11-17T21:43:17.4651735Z                     for i in range(5):
2020-11-17T21:43:17.4652177Z                         try:
2020-11-17T21:43:17.4652622Z                             s, ws = await start_cluster(
2020-11-17T21:43:17.4653132Z                                 nthreads,
2020-11-17T21:43:17.4653602Z                                 scheduler,
2020-11-17T21:43:17.4654094Z                                 loop,
2020-11-17T21:43:17.4654702Z                                 security=security,
2020-11-17T21:43:17.4655226Z                                 Worker=Worker,
2020-11-17T21:43:17.4655774Z                                 scheduler_kwargs=scheduler_kwargs,
2020-11-17T21:43:17.4656457Z                                 worker_kwargs=worker_kwargs,
2020-11-17T21:43:17.4656912Z                             )
2020-11-17T21:43:17.4657414Z                         except Exception as e:
2020-11-17T21:43:17.4657915Z                             logger.error(
2020-11-17T21:43:17.4658509Z                                 "Failed to start gen_cluster, retrying",
2020-11-17T21:43:17.4659044Z                                 exc_info=True,
2020-11-17T21:43:17.4659483Z                             )
2020-11-17T21:43:17.4659929Z                             await asyncio.sleep(1)
2020-11-17T21:43:17.4660428Z                         else:
2020-11-17T21:43:17.4661000Z                             workers[:] = ws
2020-11-17T21:43:17.4661478Z                             args = [s] + workers
2020-11-17T21:43:17.4662156Z                             break
2020-11-17T21:43:17.4662607Z                     if s is False:
2020-11-17T21:43:17.4663135Z                         raise Exception("Could not start cluster")
2020-11-17T21:43:17.4663686Z                     if client:
2020-11-17T21:43:17.4664111Z                         c = await Client(
2020-11-17T21:43:17.4664590Z                             s.address,
2020-11-17T21:43:17.4665020Z                             loop=loop,
2020-11-17T21:43:17.4665520Z                             security=security,
2020-11-17T21:43:17.4666019Z                             asynchronous=True,
2020-11-17T21:43:17.4668294Z                             **client_kwargs,
2020-11-17T21:43:17.4668742Z                         )
2020-11-17T21:43:17.4669111Z                         args = [c] + args
2020-11-17T21:43:17.4669489Z                     try:
2020-11-17T21:43:17.4669949Z                         future = func(*args)
2020-11-17T21:43:17.4670379Z                         if timeout:
2020-11-17T21:43:17.4671515Z                             future = asyncio.wait_for(future, timeout)
2020-11-17T21:43:17.4672043Z                         result = await future
2020-11-17T21:43:17.4672481Z                         if s.validate:
2020-11-17T21:43:17.4672881Z                             s.validate_state()
2020-11-17T21:43:17.4673258Z                     finally:
2020-11-17T21:43:17.4673758Z                         if client and c.status not in ("closing", "closed"):
2020-11-17T21:43:17.4674333Z                             await c._close(fast=s.status == Status.closed)
2020-11-17T21:43:17.4674904Z                         await end_cluster(s, workers)
2020-11-17T21:43:17.4675454Z                         await asyncio.wait_for(cleanup_global_workers(), 1)
2020-11-17T21:43:17.4675938Z     
2020-11-17T21:43:17.4676223Z                     try:
2020-11-17T21:43:17.4676728Z                         c = await default_client()
2020-11-17T21:43:17.4677663Z                     except ValueError:
2020-11-17T21:43:17.4678277Z                         pass
2020-11-17T21:43:17.4678640Z                     else:
2020-11-17T21:43:17.4679031Z                         await c._close(fast=True)
2020-11-17T21:43:17.4679417Z     
2020-11-17T21:43:17.4679926Z                     def get_unclosed():
2020-11-17T21:43:17.4680480Z                         return [c for c in Comm._instances if not c.closed()] + [
2020-11-17T21:43:17.4680936Z                             c
2020-11-17T21:43:17.4681434Z                             for c in _global_clients.values()
2020-11-17T21:43:17.4681919Z                             if c.status != "closed"
2020-11-17T21:43:17.4682281Z                         ]
2020-11-17T21:43:17.4682606Z     
2020-11-17T21:43:17.4682897Z                     try:
2020-11-17T21:43:17.4683275Z                         start = time()
2020-11-17T21:43:17.4683663Z                         while time() < start + 5:
2020-11-17T21:43:17.4684348Z                             gc.collect()
2020-11-17T21:43:17.4684746Z                             if not get_unclosed():
2020-11-17T21:43:17.4685122Z                                 break
2020-11-17T21:43:17.4685567Z                             await asyncio.sleep(0.05)
2020-11-17T21:43:17.4686268Z                         else:
2020-11-17T21:43:17.4686713Z                             if allow_unclosed:
2020-11-17T21:43:17.4687164Z                                 print(f"Unclosed Comms: {get_unclosed()}")
2020-11-17T21:43:17.4687613Z                             else:
2020-11-17T21:43:17.4688079Z                                 raise RuntimeError("Unclosed Comms", get_unclosed())
2020-11-17T21:43:17.4688541Z                     finally:
2020-11-17T21:43:17.4688969Z                         Comm._instances.clear()
2020-11-17T21:43:17.4689594Z                         _global_clients.clear()
2020-11-17T21:43:17.4690174Z     
2020-11-17T21:43:17.4690498Z                     return result
2020-11-17T21:43:17.4690850Z     
2020-11-17T21:43:17.4691181Z             result = loop.run_sync(
2020-11-17T21:43:17.4691683Z >               coro, timeout=timeout * 2 if timeout else timeout
2020-11-17T21:43:17.4692143Z             )
2020-11-17T21:43:17.4692364Z 
2020-11-17T21:43:17.4692728Z distributed\utils_test.py:954: 
2020-11-17T21:43:17.4693336Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
2020-11-17T21:43:17.4693978Z C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\ioloop.py:576: in run_sync
2020-11-17T21:43:17.4694800Z     return future_cell[0].result()
2020-11-17T21:43:17.4695275Z distributed\utils_test.py:912: in coro
2020-11-17T21:43:17.4695695Z     result = await future
2020-11-17T21:43:17.4696329Z C:\Miniconda3\envs\dask-distributed\lib\asyncio\tasks.py:442: in wait_for
2020-11-17T21:43:17.4696884Z     return fut.result()
2020-11-17T21:43:17.4697434Z distributed\tests\test_client.py:5932: in test_profile_server
2020-11-17T21:43:17.4698028Z     p = await c.profile(scheduler=True)  #  Scheduler
2020-11-17T21:43:17.4698547Z distributed\client.py:3431: in _profile
2020-11-17T21:43:17.4699970Z     scheduler=scheduler,
2020-11-17T21:43:17.4700524Z distributed\core.py:883: in send_recv_from_rpc
2020-11-17T21:43:17.4701119Z     result = await send_recv(comm=comm, op=key, **kwargs)
2020-11-17T21:43:17.4701644Z distributed\core.py:666: in send_recv
2020-11-17T21:43:17.4702297Z     response = await comm.read(deserializers=deserializers)
2020-11-17T21:43:17.4702896Z distributed\comm\tcp.py:202: in read
2020-11-17T21:43:17.4703457Z     convert_stream_closed_error(self, e)
2020-11-17T21:43:17.4704045Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
2020-11-17T21:43:17.4704292Z 
2020-11-17T21:43:17.4704765Z obj = <closed TCP>, exc = StreamClosedError('Stream is closed')
2020-11-17T21:43:17.4705173Z 
2020-11-17T21:43:17.4705554Z     def convert_stream_closed_error(obj, exc):
2020-11-17T21:43:17.4705981Z         """
2020-11-17T21:43:17.4706539Z         Re-raise StreamClosedError as CommClosedError.
2020-11-17T21:43:17.4707071Z         """
2020-11-17T21:43:17.4707430Z         if exc.real_error is not None:
2020-11-17T21:43:17.4707959Z             # The stream was closed because of an underlying OS error
2020-11-17T21:43:17.4708492Z             exc = exc.real_error
2020-11-17T21:43:17.4708960Z             if ssl and isinstance(exc, ssl.SSLError):
2020-11-17T21:43:17.4709500Z                 if "UNKNOWN_CA" in exc.reason:
2020-11-17T21:43:17.4710016Z                     raise FatalCommClosedError(
2020-11-17T21:43:17.4710769Z                         "in %s: %s: %s" % (obj, exc.__class__.__name__, exc)
2020-11-17T21:43:17.4711158Z                     )
2020-11-17T21:43:17.4711584Z             raise CommClosedError(
2020-11-17T21:43:17.4712069Z                 "in %s: %s: %s" % (obj, exc.__class__.__name__, exc)
2020-11-17T21:43:17.4712466Z             ) from exc
2020-11-17T21:43:17.4712920Z         else:
2020-11-17T21:43:17.4713387Z >           raise CommClosedError("in %s: %s" % (obj, exc)) from exc
2020-11-17T21:43:17.4714462Z E           distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed
2020-11-17T21:43:17.4715293Z 
2020-11-17T21:43:17.4715735Z distributed\comm\tcp.py:126: CommClosedError
2020-11-17T21:43:17.4717062Z ---------------------------- Captured stderr call -----------------------------
2020-11-17T21:43:17.4718559Z distributed.protocol.core - CRITICAL - Failed to Serialize
2020-11-17T21:43:17.4719516Z Traceback (most recent call last):
2020-11-17T21:43:17.4720490Z   File "D:\a\distributed\distributed\distributed\protocol\core.py", line 39, in dumps
2020-11-17T21:43:17.4721691Z     small_header, small_payload = dumps_msgpack(msg, **compress_opts)
2020-11-17T21:43:17.4722852Z   File "D:\a\distributed\distributed\distributed\protocol\core.py", line 184, in dumps_msgpack
2020-11-17T21:43:17.4723714Z     payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
2020-11-17T21:43:17.4724660Z   File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\msgpack\__init__.py", line 35, in packb
2020-11-17T21:43:17.4725367Z     return Packer(**kwargs).pack(o)
2020-11-17T21:43:17.4726001Z   File "msgpack/_packer.pyx", line 286, in msgpack._cmsgpack.Packer.pack
2020-11-17T21:43:17.4726912Z   File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
2020-11-17T21:43:17.4727715Z   File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer.pack
2020-11-17T21:43:17.4728536Z   File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4729355Z   File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4730268Z   File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4730905Z   [Previous line repeated 508 more times]
2020-11-17T21:43:17.4731581Z   File "msgpack/_packer.pyx", line 223, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4732352Z   File "msgpack/_packer.pyx", line 157, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4733133Z ValueError: recursion limit exceeded.
2020-11-17T21:43:17.4733831Z distributed.comm.utils - ERROR - recursion limit exceeded.
2020-11-17T21:43:17.4734461Z Traceback (most recent call last):
2020-11-17T21:43:17.4735153Z   File "D:\a\distributed\distributed\distributed\comm\utils.py", line 35, in _to_frames
2020-11-17T21:43:17.4735939Z     msg, serializers=serializers, on_error=on_error, context=context
2020-11-17T21:43:17.4736827Z   File "D:\a\distributed\distributed\distributed\protocol\core.py", line 39, in dumps
2020-11-17T21:43:17.4737585Z     small_header, small_payload = dumps_msgpack(msg, **compress_opts)
2020-11-17T21:43:17.4738408Z   File "D:\a\distributed\distributed\distributed\protocol\core.py", line 184, in dumps_msgpack
2020-11-17T21:43:17.4739267Z     payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
2020-11-17T21:43:17.4740209Z   File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\msgpack\__init__.py", line 35, in packb
2020-11-17T21:43:17.4740921Z     return Packer(**kwargs).pack(o)
2020-11-17T21:43:17.4741593Z   File "msgpack/_packer.pyx", line 286, in msgpack._cmsgpack.Packer.pack
2020-11-17T21:43:17.4742396Z   File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
2020-11-17T21:43:17.4743240Z   File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer.pack
2020-11-17T21:43:17.4744020Z   File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4744782Z   File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4745579Z   File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4746213Z   [Previous line repeated 508 more times]
2020-11-17T21:43:17.4746944Z   File "msgpack/_packer.pyx", line 223, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4747792Z   File "msgpack/_packer.pyx", line 157, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4748484Z ValueError: recursion limit exceeded.
2020-11-17T21:43:17.4750030Z tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x0000022338FC03A8>, <Task finished coro=<BaseTCPListener._handle_stream() done, defined at D:\a\distributed\distributed\distributed\comm\tcp.py:447> exception=ValueError('recursion limit exceeded.')>)
2020-11-17T21:43:17.4751545Z Traceback (most recent call last):
2020-11-17T21:43:17.4752311Z   File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\ioloop.py", line 758, in _run_callback
2020-11-17T21:43:17.4753081Z     ret = callback()
2020-11-17T21:43:17.4753813Z   File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\stack_context.py", line 300, in null_wrapper
2020-11-17T21:43:17.4754593Z     return fn(*args, **kwargs)
2020-11-17T21:43:17.4755326Z   File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\tcpserver.py", line 297, in <lambda>
2020-11-17T21:43:17.4756084Z     lambda f: f.result())
2020-11-17T21:43:17.4756763Z   File "D:\a\distributed\distributed\distributed\comm\tcp.py", line 463, in _handle_stream
2020-11-17T21:43:17.4757456Z     await self.comm_handler(comm)
2020-11-17T21:43:17.4758090Z   File "D:\a\distributed\distributed\distributed\core.py", line 546, in handle_comm
2020-11-17T21:43:17.4758823Z     await comm.write(result, serializers=serializers)
2020-11-17T21:43:17.4759588Z   File "D:\a\distributed\distributed\distributed\comm\tcp.py", line 231, in write
2020-11-17T21:43:17.4760216Z     **self.handshake_options,
2020-11-17T21:43:17.4760909Z   File "D:\a\distributed\distributed\distributed\comm\utils.py", line 54, in to_frames
2020-11-17T21:43:17.4761503Z     return _to_frames()
2020-11-17T21:43:17.4762145Z   File "D:\a\distributed\distributed\distributed\comm\utils.py", line 35, in _to_frames
2020-11-17T21:43:17.4762928Z     msg, serializers=serializers, on_error=on_error, context=context
2020-11-17T21:43:17.4763763Z   File "D:\a\distributed\distributed\distributed\protocol\core.py", line 39, in dumps
2020-11-17T21:43:17.4764586Z     small_header, small_payload = dumps_msgpack(msg, **compress_opts)
2020-11-17T21:43:17.4765384Z   File "D:\a\distributed\distributed\distributed\protocol\core.py", line 184, in dumps_msgpack
2020-11-17T21:43:17.4768748Z     payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
2020-11-17T21:43:17.4770044Z   File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\msgpack\__init__.py", line 35, in packb
2020-11-17T21:43:17.4771195Z     return Packer(**kwargs).pack(o)
2020-11-17T21:43:17.4771834Z   File "msgpack/_packer.pyx", line 286, in msgpack._cmsgpack.Packer.pack
2020-11-17T21:43:17.4772686Z   File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
2020-11-17T21:43:17.4773487Z   File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer.pack
2020-11-17T21:43:17.4774327Z   File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4775095Z   File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4775900Z   File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4777050Z   [Previous line repeated 508 more times]
2020-11-17T21:43:17.4777982Z   File "msgpack/_packer.pyx", line 223, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4778782Z   File "msgpack/_packer.pyx", line 157, in msgpack._cmsgpack.Packer._pack
2020-11-17T21:43:17.4907924Z ValueError: recursion limit exceeded.

jrbourbeau avatar Nov 18 '20 03:11 jrbourbeau

Profiles can sometimes become very deeply stacked. I *thought *that I had limited nesting to a certain extent, but apparently this isn't the case. I suspect that this is also what stops the Profile plot from appearing sometimes.

On Tue, Nov 17, 2020 at 7:26 PM James Bourbeau [email protected] wrote:

We ran into a failure of test_profile_server in CI over in #4248 https://github.com/dask/distributed/pull/4248 that was unrelated to the changes in the PR. Full traceback below. Full traceback:

2020-11-17T21:43:17.4637490Z ================================== FAILURES =================================== 2020-11-17T21:43:17.4638364Z _____________________________ test_profile_server _____________________________ 2020-11-17T21:43:17.4638784Z 2020-11-17T21:43:17.4639380Z self = <closed TCP>, deserializers = None 2020-11-17T21:43:17.4639838Z 2020-11-17T21:43:17.4640417Z async def read(self, deserializers=None): 2020-11-17T21:43:17.4641238Z stream = self.stream 2020-11-17T21:43:17.4641733Z if stream is None: 2020-11-17T21:43:17.4642434Z raise CommClosedError 2020-11-17T21:43:17.4642878Z 2020-11-17T21:43:17.4643274Z try: 2020-11-17T21:43:17.4643935Z > n_frames = await stream.read_bytes(8) 2020-11-17T21:43:17.4644789Z E tornado.iostream.StreamClosedError: Stream is closed 2020-11-17T21:43:17.4645373Z 2020-11-17T21:43:17.4645937Z distributed\comm\tcp.py:187: StreamClosedError 2020-11-17T21:43:17.4646472Z 2020-11-17T21:43:17.4647051Z The above exception was the direct cause of the following exception: 2020-11-17T21:43:17.4647545Z 2020-11-17T21:43:17.4647915Z def test_func(): 2020-11-17T21:43:17.4648375Z result = None 2020-11-17T21:43:17.4648771Z workers = [] 2020-11-17T21:43:17.4649377Z with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop: 2020-11-17T21:43:17.4649894Z 2020-11-17T21:43:17.4650314Z async def coro(): 2020-11-17T21:43:17.4650810Z with dask.config.set(config): 2020-11-17T21:43:17.4651328Z s = False 2020-11-17T21:43:17.4651735Z for i in range(5): 2020-11-17T21:43:17.4652177Z try: 2020-11-17T21:43:17.4652622Z s, ws = await start_cluster( 2020-11-17T21:43:17.4653132Z nthreads, 2020-11-17T21:43:17.4653602Z scheduler, 2020-11-17T21:43:17.4654094Z loop, 2020-11-17T21:43:17.4654702Z security=security, 2020-11-17T21:43:17.4655226Z Worker=Worker, 2020-11-17T21:43:17.4655774Z scheduler_kwargs=scheduler_kwargs, 2020-11-17T21:43:17.4656457Z worker_kwargs=worker_kwargs, 2020-11-17T21:43:17.4656912Z ) 2020-11-17T21:43:17.4657414Z except Exception as e: 2020-11-17T21:43:17.4657915Z logger.error( 2020-11-17T21:43:17.4658509Z "Failed to start gen_cluster, retrying", 2020-11-17T21:43:17.4659044Z exc_info=True, 2020-11-17T21:43:17.4659483Z ) 2020-11-17T21:43:17.4659929Z await asyncio.sleep(1) 2020-11-17T21:43:17.4660428Z else: 2020-11-17T21:43:17.4661000Z workers[:] = ws 2020-11-17T21:43:17.4661478Z args = [s] + workers 2020-11-17T21:43:17.4662156Z break 2020-11-17T21:43:17.4662607Z if s is False: 2020-11-17T21:43:17.4663135Z raise Exception("Could not start cluster") 2020-11-17T21:43:17.4663686Z if client: 2020-11-17T21:43:17.4664111Z c = await Client( 2020-11-17T21:43:17.4664590Z s.address, 2020-11-17T21:43:17.4665020Z loop=loop, 2020-11-17T21:43:17.4665520Z security=security, 2020-11-17T21:43:17.4666019Z asynchronous=True, 2020-11-17T21:43:17.4668294Z **client_kwargs, 2020-11-17T21:43:17.4668742Z ) 2020-11-17T21:43:17.4669111Z args = [c] + args 2020-11-17T21:43:17.4669489Z try: 2020-11-17T21:43:17.4669949Z future = func(*args) 2020-11-17T21:43:17.4670379Z if timeout: 2020-11-17T21:43:17.4671515Z future = asyncio.wait_for(future, timeout) 2020-11-17T21:43:17.4672043Z result = await future 2020-11-17T21:43:17.4672481Z if s.validate: 2020-11-17T21:43:17.4672881Z s.validate_state() 2020-11-17T21:43:17.4673258Z finally: 2020-11-17T21:43:17.4673758Z if client and c.status not in ("closing", "closed"): 2020-11-17T21:43:17.4674333Z await c._close(fast=s.status == Status.closed) 2020-11-17T21:43:17.4674904Z await end_cluster(s, workers) 2020-11-17T21:43:17.4675454Z await asyncio.wait_for(cleanup_global_workers(), 1) 2020-11-17T21:43:17.4675938Z 2020-11-17T21:43:17.4676223Z try: 2020-11-17T21:43:17.4676728Z c = await default_client() 2020-11-17T21:43:17.4677663Z except ValueError: 2020-11-17T21:43:17.4678277Z pass 2020-11-17T21:43:17.4678640Z else: 2020-11-17T21:43:17.4679031Z await c._close(fast=True) 2020-11-17T21:43:17.4679417Z 2020-11-17T21:43:17.4679926Z def get_unclosed(): 2020-11-17T21:43:17.4680480Z return [c for c in Comm._instances if not c.closed()] + [ 2020-11-17T21:43:17.4680936Z c 2020-11-17T21:43:17.4681434Z for c in _global_clients.values() 2020-11-17T21:43:17.4681919Z if c.status != "closed" 2020-11-17T21:43:17.4682281Z ] 2020-11-17T21:43:17.4682606Z 2020-11-17T21:43:17.4682897Z try: 2020-11-17T21:43:17.4683275Z start = time() 2020-11-17T21:43:17.4683663Z while time() < start + 5: 2020-11-17T21:43:17.4684348Z gc.collect() 2020-11-17T21:43:17.4684746Z if not get_unclosed(): 2020-11-17T21:43:17.4685122Z break 2020-11-17T21:43:17.4685567Z await asyncio.sleep(0.05) 2020-11-17T21:43:17.4686268Z else: 2020-11-17T21:43:17.4686713Z if allow_unclosed: 2020-11-17T21:43:17.4687164Z print(f"Unclosed Comms: {get_unclosed()}") 2020-11-17T21:43:17.4687613Z else: 2020-11-17T21:43:17.4688079Z raise RuntimeError("Unclosed Comms", get_unclosed()) 2020-11-17T21:43:17.4688541Z finally: 2020-11-17T21:43:17.4688969Z Comm._instances.clear() 2020-11-17T21:43:17.4689594Z _global_clients.clear() 2020-11-17T21:43:17.4690174Z 2020-11-17T21:43:17.4690498Z return result 2020-11-17T21:43:17.4690850Z 2020-11-17T21:43:17.4691181Z result = loop.run_sync( 2020-11-17T21:43:17.4691683Z > coro, timeout=timeout * 2 if timeout else timeout 2020-11-17T21:43:17.4692143Z ) 2020-11-17T21:43:17.4692364Z 2020-11-17T21:43:17.4692728Z distributed\utils_test.py:954: 2020-11-17T21:43:17.4693336Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 2020-11-17T21:43:17.4693978Z C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\ioloop.py:576: in run_sync 2020-11-17T21:43:17.4694800Z return future_cell[0].result() 2020-11-17T21:43:17.4695275Z distributed\utils_test.py:912: in coro 2020-11-17T21:43:17.4695695Z result = await future 2020-11-17T21:43:17.4696329Z C:\Miniconda3\envs\dask-distributed\lib\asyncio\tasks.py:442: in wait_for 2020-11-17T21:43:17.4696884Z return fut.result() 2020-11-17T21:43:17.4697434Z distributed\tests\test_client.py:5932: in test_profile_server 2020-11-17T21:43:17.4698028Z p = await c.profile(scheduler=True) # Scheduler 2020-11-17T21:43:17.4698547Z distributed\client.py:3431: in profile 2020-11-17T21:43:17.4699970Z scheduler=scheduler, 2020-11-17T21:43:17.4700524Z distributed\core.py:883: in send_recv_from_rpc 2020-11-17T21:43:17.4701119Z result = await send_recv(comm=comm, op=key, **kwargs) 2020-11-17T21:43:17.4701644Z distributed\core.py:666: in send_recv 2020-11-17T21:43:17.4702297Z response = await comm.read(deserializers=deserializers) 2020-11-17T21:43:17.4702896Z distributed\comm\tcp.py:202: in read 2020-11-17T21:43:17.4703457Z convert_stream_closed_error(self, e) 2020-11-17T21:43:17.4704045Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 2020-11-17T21:43:17.4704292Z 2020-11-17T21:43:17.4704765Z obj = <closed TCP>, exc = StreamClosedError('Stream is closed') 2020-11-17T21:43:17.4705173Z 2020-11-17T21:43:17.4705554Z def convert_stream_closed_error(obj, exc): 2020-11-17T21:43:17.4705981Z """ 2020-11-17T21:43:17.4706539Z Re-raise StreamClosedError as CommClosedError. 2020-11-17T21:43:17.4707071Z """ 2020-11-17T21:43:17.4707430Z if exc.real_error is not None: 2020-11-17T21:43:17.4707959Z # The stream was closed because of an underlying OS error 2020-11-17T21:43:17.4708492Z exc = exc.real_error 2020-11-17T21:43:17.4708960Z if ssl and isinstance(exc, ssl.SSLError): 2020-11-17T21:43:17.4709500Z if "UNKNOWN_CA" in exc.reason: 2020-11-17T21:43:17.4710016Z raise FatalCommClosedError( 2020-11-17T21:43:17.4710769Z "in %s: %s: %s" % (obj, exc.class.name, exc) 2020-11-17T21:43:17.4711158Z ) 2020-11-17T21:43:17.4711584Z raise CommClosedError( 2020-11-17T21:43:17.4712069Z "in %s: %s: %s" % (obj, exc.class.name, exc) 2020-11-17T21:43:17.4712466Z ) from exc 2020-11-17T21:43:17.4712920Z else: 2020-11-17T21:43:17.4713387Z > raise CommClosedError("in %s: %s" % (obj, exc)) from exc 2020-11-17T21:43:17.4714462Z E distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed 2020-11-17T21:43:17.4715293Z 2020-11-17T21:43:17.4715735Z distributed\comm\tcp.py:126: CommClosedError 2020-11-17T21:43:17.4717062Z ---------------------------- Captured stderr call ----------------------------- 2020-11-17T21:43:17.4718559Z distributed.protocol.core - CRITICAL - Failed to Serialize 2020-11-17T21:43:17.4719516Z Traceback (most recent call last): 2020-11-17T21:43:17.4720490Z File "D:\a\distributed\distributed\distributed\protocol\core.py", line 39, in dumps 2020-11-17T21:43:17.4721691Z small_header, small_payload = dumps_msgpack(msg, **compress_opts) 2020-11-17T21:43:17.4722852Z File "D:\a\distributed\distributed\distributed\protocol\core.py", line 184, in dumps_msgpack 2020-11-17T21:43:17.4723714Z payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True) 2020-11-17T21:43:17.4724660Z File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\msgpack_init.py", line 35, in packb 2020-11-17T21:43:17.4725367Z return Packer(**kwargs).pack(o) 2020-11-17T21:43:17.4726001Z File "msgpack/_packer.pyx", line 286, in msgpack._cmsgpack.Packer.pack 2020-11-17T21:43:17.4726912Z File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack 2020-11-17T21:43:17.4727715Z File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer.pack 2020-11-17T21:43:17.4728536Z File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack 2020-11-17T21:43:17.4729355Z File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack 2020-11-17T21:43:17.4730268Z File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack 2020-11-17T21:43:17.4730905Z [Previous line repeated 508 more times] 2020-11-17T21:43:17.4731581Z File "msgpack/_packer.pyx", line 223, in msgpack._cmsgpack.Packer._pack 2020-11-17T21:43:17.4732352Z File "msgpack/_packer.pyx", line 157, in msgpack._cmsgpack.Packer._pack 2020-11-17T21:43:17.4733133Z ValueError: recursion limit exceeded. 2020-11-17T21:43:17.4733831Z distributed.comm.utils - ERROR - recursion limit exceeded. 2020-11-17T21:43:17.4734461Z Traceback (most recent call last): 2020-11-17T21:43:17.4735153Z File "D:\a\distributed\distributed\distributed\comm\utils.py", line 35, in to_frames 2020-11-17T21:43:17.4735939Z msg, serializers=serializers, on_error=on_error, context=context 2020-11-17T21:43:17.4736827Z File "D:\a\distributed\distributed\distributed\protocol\core.py", line 39, in dumps 2020-11-17T21:43:17.4737585Z small_header, small_payload = dumps_msgpack(msg, **compress_opts) 2020-11-17T21:43:17.4738408Z File "D:\a\distributed\distributed\distributed\protocol\core.py", line 184, in dumps_msgpack 2020-11-17T21:43:17.4739267Z payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True) 2020-11-17T21:43:17.4740209Z File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\msgpack_init.py", line 35, in packb 2020-11-17T21:43:17.4740921Z return Packer(**kwargs).pack(o) 2020-11-17T21:43:17.4741593Z File "msgpack/_packer.pyx", line 286, in msgpack._cmsgpack.Packer.pack 2020-11-17T21:43:17.4742396Z File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack 2020-11-17T21:43:17.4743240Z File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer.pack 2020-11-17T21:43:17.4744020Z File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack 2020-11-17T21:43:17.4744782Z File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack 2020-11-17T21:43:17.4745579Z File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack 2020-11-17T21:43:17.4746213Z [Previous line repeated 508 more times] 2020-11-17T21:43:17.4746944Z File "msgpack/_packer.pyx", line 223, in msgpack._cmsgpack.Packer._pack 2020-11-17T21:43:17.4747792Z File "msgpack/_packer.pyx", line 157, in msgpack._cmsgpack.Packer._pack 2020-11-17T21:43:17.4748484Z ValueError: recursion limit exceeded. 2020-11-17T21:43:17.4750030Z tornado.application - ERROR - Exception in callback functools.partial(<function wrap..null_wrapper at 0x0000022338FC03A8>, <Task finished coro=<BaseTCPListener._handle_stream() done, defined at D:\a\distributed\distributed\distributed\comm\tcp.py:447> exception=ValueError('recursion limit exceeded.')>) 2020-11-17T21:43:17.4751545Z Traceback (most recent call last): 2020-11-17T21:43:17.4752311Z File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\ioloop.py", line 758, in _run_callback 2020-11-17T21:43:17.4753081Z ret = callback() 2020-11-17T21:43:17.4753813Z File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\stack_context.py", line 300, in null_wrapper 2020-11-17T21:43:17.4754593Z return fn(*args, **kwargs) 2020-11-17T21:43:17.4755326Z File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\tcpserver.py", line 297, in 2020-11-17T21:43:17.4756084Z lambda f: f.result()) 2020-11-17T21:43:17.4756763Z File "D:\a\distributed\distributed\distributed\comm\tcp.py", line 463, in _handle_stream 2020-11-17T21:43:17.4757456Z await self.comm_handler(comm) 2020-11-17T21:43:17.4758090Z File "D:\a\distributed\distributed\distributed\core.py", line 546, in handle_comm 2020-11-17T21:43:17.4758823Z await comm.write(result, serializers=serializers) 2020-11-17T21:43:17.4759588Z File "D:\a\distributed\distributed\distributed\comm\tcp.py", line 231, in write 2020-11-17T21:43:17.4760216Z **self.handshake_options, 2020-11-17T21:43:17.4760909Z File "D:\a\distributed\distributed\distributed\comm\utils.py", line 54, in to_frames 2020-11-17T21:43:17.4761503Z return _to_frames() 2020-11-17T21:43:17.4762145Z File "D:\a\distributed\distributed\distributed\comm\utils.py", line 35, in to_frames 2020-11-17T21:43:17.4762928Z msg, serializers=serializers, on_error=on_error, context=context 2020-11-17T21:43:17.4763763Z File "D:\a\distributed\distributed\distributed\protocol\core.py", line 39, in dumps 2020-11-17T21:43:17.4764586Z small_header, small_payload = dumps_msgpack(msg, **compress_opts) 2020-11-17T21:43:17.4765384Z File "D:\a\distributed\distributed\distributed\protocol\core.py", line 184, in dumps_msgpack 2020-11-17T21:43:17.4768748Z payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True) 2020-11-17T21:43:17.4770044Z File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\msgpack_init.py", line 35, in packb 2020-11-17T21:43:17.4771195Z return Packer(**kwargs).pack(o) 2020-11-17T21:43:17.4771834Z File "msgpack/_packer.pyx", line 286, in msgpack._cmsgpack.Packer.pack 2020-11-17T21:43:17.4772686Z File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack 2020-11-17T21:43:17.4773487Z File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer.pack 2020-11-17T21:43:17.4774327Z File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack 2020-11-17T21:43:17.4775095Z File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack 2020-11-17T21:43:17.4775900Z File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack 2020-11-17T21:43:17.4777050Z [Previous line repeated 508 more times] 2020-11-17T21:43:17.4777982Z File "msgpack/_packer.pyx", line 223, in msgpack._cmsgpack.Packer._pack 2020-11-17T21:43:17.4778782Z File "msgpack/_packer.pyx", line 157, in msgpack._cmsgpack.Packer._pack 2020-11-17T21:43:17.4907924Z ValueError: recursion limit exceeded.

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/dask/distributed/issues/4251, or unsubscribe https://github.com/notifications/unsubscribe-auth/AACKZTG22T3DQZRDAPUITNTSQM5HVANCNFSM4TZND65A .

mrocklin avatar Nov 18 '20 15:11 mrocklin

This is still failing frequently: image

https://github.com/dask/distributed/pull/6696 seems relevant? cc @graingert

gjoseph92 avatar Jul 20 '22 20:07 gjoseph92

This is still failing frequently: image

#6696 seems relevant? cc @graingert

6696 won't fix this issue, it will only make it worse as my PR is using a deque to build a bigger dict than the stack can handle

graingert avatar Jul 20 '22 21:07 graingert

I think the only fix is to change the format of the profiler.merge return value so it's a flat object with internal references

graingert avatar Jul 20 '22 21:07 graingert