distributed
Events are sent to non-subscribing clients and duplicated
What happened:
Events sent by log_event are sent to all registered clients, whether or not they have subscribed to the topic. When more than one client is subscribed to the topic, each client receives every message len(event_subscribers[topic]) times.
What you expected to happen:
A single event message is received by each subscribed client, and by no others. This appears to be the intended behaviour looking at Scheduler._report_event, but Scheduler.report ignores the client parameter if no task key is passed, and sends to all registered clients.
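To illustrate why the two symptoms show up together, here is a toy model of the routing described above. It is only a sketch and not distributed's actual code: ToyScheduler, report_buggy, report_targeted and deliveries are hypothetical names invented for this example. The only assumption is the one stated above, namely that one report call is made per subscriber and that the buggy report ignores its client argument.

```python
from collections import defaultdict


class ToyScheduler:
    """Toy stand-in for a scheduler (hypothetical, not distributed's code)."""

    def __init__(self):
        self.clients = {}  # client id -> list of messages delivered to it
        self.event_subscribers = defaultdict(set)  # topic -> subscribed client ids

    def add_client(self, client_id):
        self.clients[client_id] = []

    def subscribe(self, topic, client_id):
        self.event_subscribers[topic].add(client_id)

    def report_buggy(self, msg, client=None):
        # Models the reported behaviour: `client` is ignored, so the
        # message is broadcast to every registered client.
        for inbox in self.clients.values():
            inbox.append(msg)

    def report_targeted(self, msg, client=None):
        # Models the expected behaviour: deliver only to the named client.
        self.clients[client].append(msg)

    def log_event(self, topic, msg, report):
        # One report call per subscriber of the topic.
        for client_id in self.event_subscribers[topic]:
            report((topic, msg), client=client_id)


def deliveries(buggy):
    """Return how many messages each of three clients receives."""
    scheduler = ToyScheduler()
    for client_id in ("c1", "c2", "c3"):
        scheduler.add_client(client_id)
    scheduler.subscribe("test-topic", "c1")
    scheduler.subscribe("test-topic", "c2")
    report = scheduler.report_buggy if buggy else scheduler.report_targeted
    scheduler.log_event("test-topic", {}, report)
    return {cid: len(inbox) for cid, inbox in scheduler.clients.items()}
```

In this model, deliveries(buggy=True) gives every client, including the unsubscribed c3, two copies of the message (one broadcast per subscriber), while deliveries(buggy=False) gives c1 and c2 one copy each and c3 none.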
Minimal Complete Verifiable Example:
import distributed
from time import sleep
client1 = distributed.Client()
address = client1.scheduler.address
# Connect two more clients
client2 = distributed.Client(address, set_as_default=False)
client3 = distributed.Client(address, set_as_default=False)
# Basic event handler with a unique key per client for debugging
def get_event_handler(handler_id):
    def handler(event):
        print(f'{handler_id}: {event}')
    return handler
client1.subscribe_topic('test-topic', get_event_handler(1))
client2.subscribe_topic('test-topic', get_event_handler(2))
sleep(2)  # give the scheduler time to process all messages; makes this more reliably reproducible
client1.log_event('test-topic', {})
This prints output:
2: (1662544794.9486787, {})
2: (1662544794.9486787, {})
1: (1662544794.9486787, {})
1: (1662544794.9486787, {})
and the third client (not subscribed) raises an error:
ValueError: No event handler known for topic test-topic
Anything else we need to know?:
I've created a PR with a fix here: https://github.com/dask/distributed/pull/7014