distributed
[RFC] Decompose server class
I was thinking a little more about how https://github.com/dask/distributed/pull/8430 could be used and ended up wanting to use this for the client as well.
(This is also loosely motivated by https://github.com/dask-contrib/dask-expr/pull/765, where I suggested moving the key generation / tokenization from the client to the scheduler; the ordered send_recv of https://github.com/dask/distributed/pull/8430 would be nice there.)
The client unfortunately re-implements the RPC stream handler of the server base class making it hard to reuse the ordered sendrcv without a lot of code duplication.
This PR proposes a refactoring that moves the Server networking and RPC logic from inheritance to composition. The current Server base class mixes a lot of application logic with networking logic, which has frequently caused issues during teardown in the past, so separating those concerns is long overdue.
The Server as proposed in this PR should probably be called RPCServer or RPCSomethingElse instead, since it doesn't actually implement a traditional server<->client model, but I'm trying not to get hung up on naming at this point.
With this decomposition I was then able to use the slimmed-down Server in the client as well, which opens up the reuse of something like #8430.
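The move from inheritance to composition described above could look roughly like the following. This is a minimal sketch, not the code in this PR: `RPCServer`, `handle`, and the toy `Worker` and `Client` classes are hypothetical names chosen for illustration, and real networking is replaced by direct calls.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[..., Awaitable[Any]]


class RPCServer:
    """Hypothetical networking-only component: dispatches messages to handlers."""

    def __init__(self, handlers: dict[str, Handler]) -> None:
        self.handlers = dict(handlers)
        self.closed = False

    async def handle(self, msg: dict[str, Any]) -> Any:
        # Look up the handler by the "op" field and pass the rest as kwargs.
        if self.closed:
            raise RuntimeError("RPC server is closed")
        msg = dict(msg)
        op = msg.pop("op")
        return await self.handlers[op](**msg)

    async def close(self) -> None:
        self.closed = True


class Worker:
    """Application logic only; owns an RPCServer instead of inheriting from one."""

    def __init__(self) -> None:
        self.data: dict[str, Any] = {}
        self.rpc_server = RPCServer({"put": self.put, "get": self.get})

    async def put(self, key: str, value: Any) -> str:
        self.data[key] = value
        return "OK"

    async def get(self, key: str) -> Any:
        return self.data[key]

    async def close(self) -> None:
        # Teardown of networking is an explicit, separate step.
        await self.rpc_server.close()


class Client:
    """The client reuses the same component rather than re-implementing it."""

    def __init__(self) -> None:
        self.events: list[str] = []
        self.rpc_server = RPCServer({"key-finished": self.on_key_finished})

    async def on_key_finished(self, key: str) -> str:
        self.events.append(key)
        return "OK"


async def demo() -> tuple[Any, list[str]]:
    worker, client = Worker(), Client()
    await worker.rpc_server.handle({"op": "put", "key": "x", "value": 1})
    value = await worker.rpc_server.handle({"op": "get", "key": "x"})
    await client.rpc_server.handle({"op": "key-finished", "key": "x"})
    await worker.close()
    return value, client.events


result = asyncio.run(demo())
```

Because both classes hold an `RPCServer` rather than subclassing it, the dispatch logic lives in one place and each owner decides when to tear it down.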
This refactoring is not really done, but if we go down this road, the universally used ConnectionPool and stream_comms would basically blend into this single object, which would support three ways to communicate:
- One way communication, send only, ordered / multiplexed (what we currently do with BatchedSend)
- Two way send_recv, not ordered, dedicated channel (what is currently the PooledRPCCall)
- Two way send_recv, ordered, multiplexed (this is new)
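To make the difference between the three modes concrete, here is a small in-memory sketch. It is an illustration under assumptions, not the API proposed in this PR or in #8430: `Channel`, `send`, `send_recv`, and `send_recv_ordered` are hypothetical names, the shared stream is modelled by an `asyncio.Queue` with a single consumer, and the dedicated channel is modelled by calling the handler directly.

```python
import asyncio
import itertools
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[Any]]


class Channel:
    """Hypothetical in-memory stand-in for the unified communication object."""

    def __init__(self, handler: Handler) -> None:
        # Must be created inside a running event loop.
        self._handler = handler
        self._stream: asyncio.Queue = asyncio.Queue()
        self._ids = itertools.count()
        self._pending: dict[int, asyncio.Future] = {}
        self._consumer = asyncio.create_task(self._consume())

    async def _consume(self) -> None:
        # A single consumer handles the shared stream strictly in send order.
        while True:
            req_id, payload = await self._stream.get()
            try:
                result = await self._handler(payload)
            except Exception as exc:
                if req_id is not None:
                    self._pending.pop(req_id).set_exception(exc)
                continue
            if req_id is not None:
                self._pending.pop(req_id).set_result(result)

    def send(self, payload: Any) -> None:
        """One way, ordered, multiplexed: fire and forget on the shared stream."""
        self._stream.put_nowait((None, payload))

    async def send_recv(self, payload: Any) -> Any:
        """Two way, not ordered, dedicated: bypasses the shared stream."""
        return await self._handler(payload)

    async def send_recv_ordered(self, payload: Any) -> Any:
        """Two way, ordered, multiplexed: reply is matched by request id."""
        req_id = next(self._ids)
        fut = asyncio.get_running_loop().create_future()
        self._pending[req_id] = fut
        self._stream.put_nowait((req_id, payload))
        return await fut

    async def close(self) -> None:
        self._consumer.cancel()
        try:
            await self._consumer
        except asyncio.CancelledError:
            pass


async def demo() -> tuple[list[str], Any, Any]:
    log: list[str] = []

    async def handler(payload: dict[str, Any]) -> str:
        await asyncio.sleep(payload["delay"])
        log.append(payload["name"])
        return payload["name"].upper()

    ch = Channel(handler)
    ch.send({"name": "a", "delay": 0.02})  # slow one-way message
    ordered = asyncio.create_task(
        ch.send_recv_ordered({"name": "b", "delay": 0.0})
    )
    # The dedicated request is free to overtake the slow message "a".
    dedicated = await ch.send_recv({"name": "c", "delay": 0.0})
    reply = await ordered  # "b" is only handled after "a" has finished
    await ch.close()
    return log, dedicated, reply


log, dedicated, reply = asyncio.run(demo())
```

In this sketch the ordered request "b" queues behind the slow one-way message "a", while the dedicated request "c" overtakes both. That is the trade-off between the second and third mode: ordering guarantees versus independence from whatever else is on the stream.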
State of this PR: Mostly working but still a couple of failing tests. From what I saw nothing insurmountable.
There are two commits
- https://github.com/dask/distributed/pull/8468/commits/efb904580c1d29628bff605fb8ff651da9979b20 which decomposes the Server class and refactors a couple of modules
- https://github.com/dask/distributed/pull/8468/commits/395fcc234c09da7b7c76b9e5a18f8dc60a0af12f which applies this decomposition to the Client
I think I'm mostly interested in feedback about the first one since that may have merit without the introduction of new features
Some general comments without diving too deep into the actual changes:
We should definitely aim to use ordered send-recv wherever possible, including for connections between the client and the cluster instances.
+1 on refactoring more of our codebase away from inheritance to composition where possible/useful. IMO, this also makes testing easier.
+1 on reusing an "RPC server" to handle all our RPC logic; it doesn't make much sense for our Client to reimplement that stuff.
Unit Test Results
See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
29 files ±0 | 29 suites ±0 | 14h 35m 48s :stopwatch: +3h 18m 41s
4 083 tests ±0 | 3 639 :white_check_mark: -325 | 116 :zzz: +4 | 326 :x: +319 | 2 :fire: +2
50 647 runs -4 583 | 44 569 :white_check_mark: -8 218 | 2 288 :zzz: -145 | 3 781 :x: +3 771 | 9 :fire: +9
For more details on these failures and errors, see this check.
Results for commit f8ccddf8. ± Comparison against base commit 8564dc79.
This pull request removes 10 and adds 10 tests. Note that renamed tests count towards both.
distributed.tests.test_core ‑ test_async_listener_stop
distributed.tests.test_core ‑ test_counters
distributed.tests.test_core ‑ test_server_assign_assign_enum_is_quiet
distributed.tests.test_core ‑ test_server_close_stops_gil_monitoring
distributed.tests.test_core ‑ test_server_status_compare_enum_is_quiet
distributed.tests.test_core ‑ test_server_status_is_always_enum
distributed.tests.test_core ‑ test_server_sys_path_local_directory_cleanup
distributed.tests.test_core ‑ test_thread_id
distributed.tests.test_core ‑ test_tick_logging
distributed.tests.test_core ‑ test_ticks
distributed.cli.tests.test_dask_worker.test_listen_address_ipv6[tcp:..[ ‑ 1]:---nanny]
distributed.cli.tests.test_dask_worker.test_listen_address_ipv6[tcp:..[ ‑ 1]:---no-nanny]
distributed.tests.test_node ‑ test_server_assign_assign_enum_is_quiet
distributed.tests.test_node ‑ test_server_close_stops_gil_monitoring
distributed.tests.test_node ‑ test_server_status_compare_enum_is_quiet
distributed.tests.test_node ‑ test_server_status_is_always_enum
distributed.tests.test_node ‑ test_server_sys_path_local_directory_cleanup
distributed.tests.test_node ‑ test_thread_id
distributed.tests.test_node ‑ test_tick_logging
distributed.tests.test_node ‑ test_ticks
This pull request skips 2 tests.
distributed.tests.test_metrics ‑ test_monotonic
distributed.tests.test_scheduler ‑ test_get_cluster_state_worker_error
:recycle: This comment has been updated with latest results.