Events are sent to non-subscribing clients and duplicated

Open lwatt opened this issue 2 years ago • 0 comments

What happened:

Events sent by log_event are delivered to all registered clients, whether or not they have subscribed to the topic. When more than one client is subscribed to the topic, every client receives each message len(event_subscribers[topic]) times.

What you expected to happen:

A single event message is received by each subscribed client, and by no others. This appears to be the intended behaviour judging by Scheduler._report_event, but Scheduler.report ignores the client parameter if no task key is passed, and sends to all registered clients.
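To make the mechanism concrete, here is a toy model of the behaviour described above. It is only a sketch under assumptions: ToyScheduler, report_buggy, report_fixed and simulate are hypothetical names made up for illustration, and none of this is the actual distributed source or the change in the linked PR. It only shows why looping over the subscribers while broadcasting on each iteration yields len(event_subscribers[topic]) copies for every registered client.

```python
from collections import defaultdict


class ToyScheduler:
    """Hypothetical stand-in for the scheduler (not distributed.Scheduler)."""

    def __init__(self):
        self.inboxes = {}                          # client id -> received messages
        self.event_subscribers = defaultdict(set)  # topic -> subscribed client ids

    def add_client(self, client):
        self.inboxes[client] = []

    def subscribe(self, topic, client):
        self.event_subscribers[topic].add(client)

    def report_buggy(self, msg, client=None):
        # Models the reported behaviour: `client` is ignored,
        # so the message goes to every registered client.
        for inbox in self.inboxes.values():
            inbox.append(msg)

    def report_fixed(self, msg, client=None):
        # Models the expected behaviour: honour `client` when it is given.
        targets = list(self.inboxes) if client is None else [client]
        for target in targets:
            self.inboxes[target].append(msg)

    def log_event(self, topic, event, report):
        # One report call per subscriber, as the issue describes.
        for client in self.event_subscribers[topic]:
            report({"topic": topic, "event": event}, client=client)


def simulate(buggy):
    """Return how many messages each of three clients receives."""
    scheduler = ToyScheduler()
    for name in ("client1", "client2", "client3"):
        scheduler.add_client(name)
    scheduler.subscribe("test-topic", "client1")
    scheduler.subscribe("test-topic", "client2")
    report = scheduler.report_buggy if buggy else scheduler.report_fixed
    scheduler.log_event("test-topic", {}, report)
    return {name: len(inbox) for name, inbox in scheduler.inboxes.items()}


if __name__ == "__main__":
    print("buggy:", simulate(buggy=True))
    print("fixed:", simulate(buggy=False))
```

In this model the buggy variant delivers two copies to all three clients, including the one that never subscribed, while the fixed variant delivers one copy to each of the two subscribers and nothing to the third.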

Minimal Complete Verifiable Example:

import distributed
from time import sleep

client1 = distributed.Client()
address = client1.scheduler.address

# Connect two more clients
client2 = distributed.Client(address, set_as_default=False)
client3 = distributed.Client(address, set_as_default=False)

# Basic event handler with a unique key per client for debugging
def get_event_handler(handler_id):
    def handler(event):
        print(f'{handler_id}: {event}')
    return handler

client1.subscribe_topic('test-topic', get_event_handler(1))
client2.subscribe_topic('test-topic', get_event_handler(2))
sleep(2)  # give the scheduler time to process the subscriptions; makes the bug reproduce more reliably

client1.log_event('test-topic', {})

This prints the following output:

2: (1662544794.9486787, {})
2: (1662544794.9486787, {})
1: (1662544794.9486787, {})
1: (1662544794.9486787, {})

and the third client (not subscribed) raises an error:

ValueError: No event handler known for topic test-topic

Anything else we need to know?:

I've created a PR with a fix here: https://github.com/dask/distributed/pull/7014

lwatt avatar Sep 07 '22 10:09 lwatt