PubSub functionality kills stream-based connections due to race conditions

Open hendrikmakait opened this issue 1 year ago • 7 comments

The PubSub functionality is prone to race conditions and may bring down stream-based connections when it raises errors.

Reproducer

from distributed import Event, Pub, Sub
from distributed.utils_test import freeze_batched_send, gen_cluster

@gen_cluster(client=True, nthreads=[("", 1)])
async def test_race(c, s, a):
    sub = Sub("a")
    in_event = Event()
    in_event_2 = Event()
    block_event = Event()
    block_event_2 = Event()
    def f(x, in_event, block_event, in_event_2, block_event_2):
        pub = Pub("a")
        in_event.set()
        block_event.wait()
        pub.put({"status": "OK"})
        in_event_2.set()
        block_event_2.wait()
        del pub
        return x
    
    fut = c.submit(f, 1, in_event, block_event, in_event_2, block_event_2)
    await in_event.wait()
    with freeze_batched_send(s.client_comms[c.id]):
        await block_event.set()
        await in_event_2.wait()
        del sub
        await block_event_2.set()
    await fut
    await c.submit(lambda x: x, 2)

This reproducer kills the client connection due to a call to a non-existent stream handler (remove-pubpus-subscribers) and also kills the client connection with

Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 970, in _handle_comm
    result = await result
             ^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 5716, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 1053, in handle_stream
    handler(**merge(extra, msg))
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/pubsub.py", line 86, in remove_subscriber
    self.client_subscribers[name].remove(client)
KeyError: 'Client-89725a46-27eb-11ef-9134-be79fecea867'

if the appropriate handler is used.
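
The KeyError above carries the client ID as its key, which is what Python's set.remove raises for a missing element (assuming client_subscribers[name] is a set). A minimal sketch of that failure mode follows. It uses a toy dictionary rather than the real scheduler extension, and the two function names are hypothetical:

```python
from collections import defaultdict

# Toy stand-in for the scheduler-side bookkeeping.
# This is NOT the code in distributed/pubsub.py; names below are illustrative.
client_subscribers = defaultdict(set)


def remove_subscriber_strict(name, client):
    # set.remove raises KeyError when the element is not in the set.
    client_subscribers[name].remove(client)


def remove_subscriber_tolerant(name, client):
    # set.discard is a no-op when the element is not in the set.
    client_subscribers[name].discard(client)


client_subscribers["a"].add("Client-1")
remove_subscriber_strict("a", "Client-1")  # first removal succeeds

try:
    # A late or duplicate removal message for the same client.
    remove_subscriber_strict("a", "Client-1")
except KeyError as exc:
    error = exc
else:
    error = None

remove_subscriber_tolerant("a", "Client-1")  # does not raise
```

Whether tolerating the missing entry is the right fix for the extension is a separate question. The sketch only shows why an unguarded removal turns a late or duplicate message into an exception inside the stream handler.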

I suspect that there are more possible race conditions hidden in this code.

hendrikmakait avatar Jun 11 '24 12:06 hendrikmakait

Usually, we just leave things that already exist but haven't been touched in ages in distributed unless they cause issues. We have now had a Coiled customer report issues that trace back to pub/sub being used in https://github.com/saturncloud/dask-pytorch-ddp. In that light, I think that we should deprecate pub/sub to avoid future issues for users who expect this to work. I am not aware of anyone but https://github.com/saturncloud/dask-pytorch-ddp using pub/sub, so deprecating seems fine.

An alternative approach would be to rewrite it based on the existing event-logging functionality. The major difference in functionality would be that the scheduler acts as the central broker for event logs whereas pub/sub has been designed to use point-to-point communication between workers. (Between workers and clients the scheduler still acts as the broker.) Once again, I am not aware of any project but https://github.com/saturncloud/dask-pytorch-ddp using pub/sub (let alone in a performance-critical way that requires point-to-point communication), so I don't think that this difference is relevant for now.

hendrikmakait avatar Jun 11 '24 15:06 hendrikmakait

I'm -1 on rewriting the existing pub/sub. I think for the dask-pytorch-ddp thing the Client.subscribe_topic feature could be a better drop-in replacement.

I'm OK with dropping support for Pub/Sub

fjetter avatar Jun 13 '24 11:06 fjetter

Probably Actors are better for this kind of thing?

mrocklin avatar Jun 13 '24 11:06 mrocklin

And they have slightly better support :)

mrocklin avatar Jun 13 '24 11:06 mrocklin

As you mentioned, log_event is not up to the task when dealing with a large volume of data, since it all has to go through the scheduler. Any chance this can be reconsidered?

Material-Scientist avatar Oct 13 '24 17:10 Material-Scientist

@Material-Scientist: I'm curious, what would you like to use pub/sub for?

Any chance this can be reconsidered?

We're always open for contributions to the community if you feel up for the task of making this work. For me personally, it's not a priority at the moment due to lack of wider adoption.

hendrikmakait avatar Oct 14 '24 10:10 hendrikmakait

what would you like to use pub/sub for?

Dask pub/sub has worked reliably for sending data (~10k msgs/s) from my producer processes to consumer processes w/o adding backpressure on the producer side (even just 0.5s of event-loop blocking would drop the ws-connection). It's convenient bc it doesn't add additional dependencies (since I already use dask for processing on the consumer side anyway), and it's robust (has been running for years).

Material-Scientist avatar Oct 15 '24 16:10 Material-Scientist