openfl icon indicating copy to clipboard operation
openfl copied to clipboard

Batched fetching of tensors to reduce RPC calls [1/2]

Open MasterSkepticista opened this issue 8 months ago • 3 comments

Motivation

Collaborators today fetch each model tensor from the aggregator via a dedicated RPC call. Aggregator has limited resources to serve requests, and it is not uncommon to have hundreds (if not thousands) of model tensors waiting to be served to each collaborator. A federation itself may have hundreds (if not thousands) of participants making these requests.

Example

Consider torch/histology experiment.

  • Model size is ~64MB with about 20 tensors.
  • In a federation with 8 collaborators, a total of 160 RPC calls are made each round to the aggregator just to fetch latest model tensors.
  • In contrast, only (get_tasks + 3 x send_local_task_results) x 8 collaborators 32 RPC calls are made for other purposes in each round.
  • >80% of all requests are consumed in serving tensors. This also leads to thread contention. Aggregator cannot generally afford more than 8-32 threads assuming modest CPU/memory availability.
  • Had the tensors been batched per request, each collaborator could be served with a single RPC thread and no aggregator-side contention.

Problem gets worse when models have hundreds of tensors and experiments span more collaborators.

Goal of this PR

This is Part 1 (of 2) PRs that adds support for batched fetching of tensors over gRPC. This PR makes the following major changes:

  1. Remove send_model_deltas and use_delta_updates support.
  2. Simplify nparray_to_named_tensor (henceforth called serialize_tensor) and named_tensor_to_nparray (henceforth called deserialize_tensor) without any brittle conditional checks.
  3. Batch gRPC requests for tensors on the collaborator and aggregator side.

In Part 2 of this PR, delta update support will be added back. The reason for removal is to straighten the logic for actions that concern a communication pipeline (de/compression, de/serialization) and the aggregator/collaborator component.

Implementation

A new RPC call replaces the get_aggregated_tensor with get_aggregated_tensors (plural 's'). Collaborators request a batch of tensors in a new TensorSpec format.

message TensorSpec {
  string tensor_name = 1;
  int32 round_number = 2;
  bool report = 3;
  repeated string tags = 4;
  bool require_lossless = 5;
}

You may observe the resemblance with TensorKey, except that origin field is missing. The RPC request and response formats are shown below:

message GetAggregatedTensorsRequest {
  MessageHeader header = 1;
  repeated TensorSpec tensor_specs = 2;
}

message GetAggregatedTensorsResponse {
  MessageHeader header = 1;
  repeated NamedTensor tensors = 2;
}

On collaborator side, tensors are fetched via self.fetch_tensors_from_aggregator(tensor_keys) where tensor_keys are of type List[TensorKey].

Cost/Benefit Analysis

One downside of this PR is that potential bandwidth savings achieved due to delta updates is lost. The upside of this PR is a significantly simplified mental model of how tensors are processed on both ends, higher correctness confidence and higher maintainability.

Indeed, the second part will close the drawback by bringing in delta updates, keeping the mental model simple.

Reviewer note(s): This PR has a lot of cleanup. It is best understood by looking at the proposed changes directly and not diffs.

MasterSkepticista avatar Apr 30 '25 11:04 MasterSkepticista

FYI, a couple of the tests are failing - secure aggregation in particular https://github.com/securefederatedai/openfl/actions/runs/15063524782/job/42343165534?pr=1575

kminhta avatar May 18 '25 14:05 kminhta

Also, going to tag @ishaileshpant to keep him in the loop as there are modifications to RPC calls that may affect https://github.com/securefederatedai/openfl/pull/1500

kminhta avatar May 18 '25 14:05 kminhta

Thanks for taking this up @MasterSkepticista - this is a great fundamental adjustment to how we transport the model across the network. I don't see any major concerns, I'm going to go ahead and approve this after my review and our offline discussion with the understanding that:

  1. delta updates get reintroduced in the follow up PR
  2. streaming gets reintroduced in a more generic manner (as you called out in Batched fetching of tensors to reduce RPC calls [1/2] #1575 (comment)) in some subsequent PR as well - or otherwise some manner of handling large models from both the aggregator and collaborator

I know you called out point 1 in the PR, but do you have a mental model for how point 2 can be achieved or is the general implementation still an open? I ask mainly because it would be good to close the gap for first class LLM support (thinking in terms of pretraining), but it is also true that LLM fine-tuning has a large library of parameter efficient methods that may still allow for expanding OpenFL-LLM capabilities in the interim

Yes, delta updates will be reintroduced shortly. Streaming changes are reverted in this PR. I don't have concrete idea on how to generalize (2). But since we discussed its orthogonality with the current set of changes, we will attack it separately

MasterSkepticista avatar May 19 '25 13:05 MasterSkepticista