
[RFC] Decompose server class

Open fjetter opened this issue 1 year ago • 2 comments

I was thinking a little more about how https://github.com/dask/distributed/pull/8430 could be used and ended up wanting to use this for the client as well.

(This is also loosely motivated by https://github.com/dask-contrib/dask-expr/pull/765, where I suggested moving the key generation / tokenization from client to scheduler, where the ordered sendrcv of https://github.com/dask/distributed/pull/8430 would be nice.)

The client unfortunately re-implements the RPC stream handler of the server base class, making it hard to reuse the ordered sendrcv without a lot of code duplication.

This PR proposes a refactoring that moves from inheritance to composition for the Server networking and RPC logic. The current Server base class mixes a lot of application logic with networking logic, which has frequently caused issues during teardown in the past, so separating those concerns is long overdue.
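To illustrate the direction, here is a minimal sketch of composition over inheritance. It is not the code in this PR: `RPCServer`, `Worker`, `handle`, and the handler names are hypothetical and only stand in for the real classes. The point is that the application object owns the networking object and can tear it down explicitly.

```python
import asyncio


class RPCServer:
    """Hypothetical networking component: it only dispatches to handlers."""

    def __init__(self, handlers):
        self.handlers = dict(handlers)
        self.closed = False

    async def handle(self, op, **kwargs):
        # Refuse work after teardown instead of racing with application state.
        if self.closed:
            raise RuntimeError("server is closed")
        return await self.handlers[op](**kwargs)

    async def close(self):
        self.closed = True


class Worker:
    """Hypothetical application object: it owns an RPCServer, it is not one."""

    def __init__(self):
        self.data = {}
        self.server = RPCServer({"put": self.put, "get": self.get})

    async def put(self, key, value):
        self.data[key] = value
        return "OK"

    async def get(self, key):
        return self.data[key]

    async def close(self):
        # Teardown order is explicit: stop networking, then drop state.
        await self.server.close()
        self.data.clear()


async def demo():
    worker = Worker()
    await worker.server.handle("put", key="x", value=1)
    value = await worker.server.handle("get", key="x")
    await worker.close()
    return value, worker.server.closed
```

Because `Worker.close` decides when the networking layer stops, the teardown sequence is visible in one place rather than spread across a class hierarchy.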

The Server as proposed in this PR should likely rather be called RPCServer or RPCSomethingElse, since it doesn't actually implement a traditional server<->client model, but I'm trying not to get hung up on naming at this point.

With this decomposition I was then able to use the slimmed-down Server in the client as well, which opens up the reuse of something like #8430. This refactoring is not really done, but if we go down this road, the universally used ConnectionPool and stream_comms would basically blend into this single object, which would support three ways to communicate:

  1. One way communication, send only, ordered / multiplexed (what we currently do with BatchedSend)
  2. Two way send_recv, not ordered, dedicated channel (what is currently the PooledRPCCall)
  3. Two way send_recv, ordered, multiplexed (this is new)
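
As a rough illustration of modes 1 and 3 sharing one ordered channel, here is a toy in-process sketch. `OrderedChannel` and its methods are hypothetical names, not the API from #8430, and an `asyncio.Queue` stands in for the real comm. Mode 2 (a dedicated, unordered channel per call) is not modeled.

```python
import asyncio
import itertools


class OrderedChannel:
    """Toy stand-in for a single multiplexed connection (hypothetical)."""

    def __init__(self, handlers):
        self.handlers = handlers
        self._outbox = asyncio.Queue()  # FIFO, so message order is preserved
        self._pending = {}              # message id -> future awaiting a reply
        self._ids = itertools.count()
        self._task = None

    def start(self):
        self._task = asyncio.create_task(self._serve())

    def send(self, op, **kwargs):
        """Mode 1: one way, ordered, no reply expected."""
        self._outbox.put_nowait((None, op, kwargs))

    async def send_recv(self, op, **kwargs):
        """Mode 3: two way, ordered, multiplexed on the same queue."""
        msg_id = next(self._ids)
        fut = asyncio.get_running_loop().create_future()
        self._pending[msg_id] = fut
        self._outbox.put_nowait((msg_id, op, kwargs))
        return await fut

    async def _serve(self):
        # Messages are handled strictly in the order they were enqueued.
        while True:
            msg_id, op, kwargs = await self._outbox.get()
            if op == "__close__":
                break
            result = self.handlers[op](**kwargs)
            if msg_id is not None:
                self._pending.pop(msg_id).set_result(result)

    async def close(self):
        self._outbox.put_nowait((None, "__close__", {}))
        await self._task


async def demo():
    log = []
    handlers = {"append": log.append, "snapshot": lambda: list(log)}
    channel = OrderedChannel(handlers)
    channel.start()
    channel.send("append", x=1) if False else channel.send("append", object=1)
    channel.send("append", object=2)
    seen = await channel.send_recv("snapshot")   # observes both earlier sends
    channel.send("append", object=3)
    final = await channel.send_recv("snapshot")
    await channel.close()
    return seen, final
```

Since the one-way sends and the request share a single queue, the reply to `snapshot` is guaranteed to reflect every message sent before it, which is the property an unordered dedicated channel cannot give.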

State of this PR: Mostly working but still a couple of failing tests. From what I saw nothing insurmountable.

There are two commits

  • https://github.com/dask/distributed/pull/8468/commits/efb904580c1d29628bff605fb8ff651da9979b20 which decomposes the Server class and refactors a couple of modules
  • https://github.com/dask/distributed/pull/8468/commits/395fcc234c09da7b7c76b9e5a18f8dc60a0af12f which applies this decomposition to the Client

I'm mostly interested in feedback on the first one, since it may have merit even without the introduction of new features.

fjetter avatar Jan 19 '24 14:01 fjetter

Some general comments without diving too deep into the actual changes: We should definitely aim to use ordered send-recv wherever possible, including for connections between the client and the cluster instances.

+1 on refactoring more of our codebase away from inheritance to composition where possible/useful. IMO, this also makes testing easier. +1 on reusing an "RPC server" to handle all our RPC logic; it doesn't make much sense for our Client to reimplement that stuff.

hendrikmakait avatar Jan 19 '24 14:01 hendrikmakait

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

    29 files  ±0      29 suites  ±0   14h 35m 48s :stopwatch: + 3h 18m 41s
 4 083 tests  ±0       3 639 :white_check_mark:  - 325     116 :zzz: + 4     326 :x: + 319   2 :fire: +2
50 647 runs   - 4 583  44 569 :white_check_mark:  - 8 218  2 288 :zzz:  - 145  3 781 :x: + 3 771  9 :fire: +9

For more details on these failures and errors, see this check.

Results for commit f8ccddf8. ± Comparison against base commit 8564dc79.

This pull request removes 10 and adds 10 tests. Note that renamed tests count towards both.
distributed.tests.test_core ‑ test_async_listener_stop
distributed.tests.test_core ‑ test_counters
distributed.tests.test_core ‑ test_server_assign_assign_enum_is_quiet
distributed.tests.test_core ‑ test_server_close_stops_gil_monitoring
distributed.tests.test_core ‑ test_server_status_compare_enum_is_quiet
distributed.tests.test_core ‑ test_server_status_is_always_enum
distributed.tests.test_core ‑ test_server_sys_path_local_directory_cleanup
distributed.tests.test_core ‑ test_thread_id
distributed.tests.test_core ‑ test_tick_logging
distributed.tests.test_core ‑ test_ticks
distributed.cli.tests.test_dask_worker.test_listen_address_ipv6[tcp:..[ ‑ 1]:---nanny]
distributed.cli.tests.test_dask_worker.test_listen_address_ipv6[tcp:..[ ‑ 1]:---no-nanny]
distributed.tests.test_node ‑ test_server_assign_assign_enum_is_quiet
distributed.tests.test_node ‑ test_server_close_stops_gil_monitoring
distributed.tests.test_node ‑ test_server_status_compare_enum_is_quiet
distributed.tests.test_node ‑ test_server_status_is_always_enum
distributed.tests.test_node ‑ test_server_sys_path_local_directory_cleanup
distributed.tests.test_node ‑ test_thread_id
distributed.tests.test_node ‑ test_tick_logging
distributed.tests.test_node ‑ test_ticks
This pull request skips 2 tests.
distributed.tests.test_metrics ‑ test_monotonic
distributed.tests.test_scheduler ‑ test_get_cluster_state_worker_error

:recycle: This comment has been updated with latest results.

github-actions[bot] avatar Jan 19 '24 15:01 github-actions[bot]