risingwave icon indicating copy to clipboard operation
risingwave copied to clipboard

feat(meta): deprecate parallel unit

Open shanicky opened this issue 1 year ago • 0 comments

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

This PR is a massive one, and the specific features still need to be verified. If possible, it may be split into multiple smaller PRs in the future. Due to the need to consider backwards compatibility, more testing is required. Currently, we need to ensure that it can consistently pass CI tests.

This PR made the following modifications:

  1. Removed the dependency on persistent global Parallel Units, replacing them with dynamic temporary WorkerSlots. Previously, Parallel Units had two semantics: Worker and Worker Parallelism. So for each Worker with parallelism of Parallelism, we generated (WorkerId, 0), (WorkerId, 1) and so on to replace it. The vnode mapping in the communication between meta and the frontend also uses worker slots, which should be simplified to WorkerMapping in the future.
  2. Removed the VnodeMapping of Fragment. Since VnodeMapping was implemented through ParallelUnitMapping, we can derive VnodeMapping from ActorBitmaps and ActorStatus, so a persistent VnodeMapping is not needed. Moreover, binding Fragments to Workers is actually quite strange.
  3. Modified the Reschedule interface. The previous syntax was {fragment_id}-[parallel_unit]+[parallel_unit], which has now been changed to {fragment_id}-[worker_id:count]+[worker_id:count]. Reschedule is now defined by modifying the number of allocations on workers. Simulation tests have all been updated to adapt.
  4. Considering compatibility, rw_parallel_units is still retained, changed to the form of (slot_id, worker_id).

Considering compatibility issues, when creating a streaming job, the maximum parallelism is still limited to the sum of worker parallelism. However, it is possible to manually alter the parallelism to a huge value.


dev=> set streaming_parallelism = 20;
SET_VARIABLE
dev=> create table t(v int);
ERROR:  Failed to run the query

Caused by these errors (recent errors listed first):
  1: gRPC request to meta service failed: The service is currently unavailable
  2: Service unavailable: Not enough parallelism to schedule, required: 20, available: 12

dev=> set streaming_parallelism = 0;
SET_VARIABLE
dev=> create table t(v int);
CREATE_TABLE
dev=> alter table t set parallelism = 100;
ALTER_TABLE
dev=> select fragment_id, count(*) from rw_actors group by fragment_id;
 fragment_id | count
-------------+-------
           3 |   100
           4 |   100
(2 rows)

The following is an AI-generated summary.

  • common.proto Changes:

    • Removed the deprecated parallel_units field from WorkerNode.
    • Added a new parallelism field to WorkerNode for indicating parallel capabilities.
  • meta.proto Changes:

    • Replaced the deprecated parallel_unit_migration_plan with a new worker_slot_migration_plan.
    • Deprecated several fields (vnode_mapping, parallel_unit_id) in favor of reserved keywords.
    • Removed GetReschedulePlanRequest and GetReschedulePlanResponse messages from ScaleService.
    • Introduced a new WorkerReschedule message and added worker_reschedules to RescheduleRequest.

Code Refactors

  • src/batch/src/executor/join/local_lookup_join.rs:

    • Updated worker_slot_mapping logic to use the parallelism field.
  • src/batch/src/worker_manager/worker_node.min.rs:

    • Refactored several functionalities to employ the parallelism field instead of the deprecated parallel_units.
  • src/common/src/hash/consistent_hash/mapping.rs:

    • Enhanced WorkerSlotId with a more descriptive Debug implementation.
    • Added to_worker_slot method in ActorMapping for mapping actors to worker slots.
  • src/common/src/vnode_mapping/vnode_placement.rs:

    • Modified place_vnode to use parallelism instead of parallel_units.len().

Checklist

  • [x] I have written necessary rustdoc comments
  • [x] I have added necessary unit tests and integration tests
  • [x] I have added test labels as necessary. See [details]
  • [x] All checks passed in ./risedev check (or alias, ./risedev c)

shanicky avatar Jul 01 '24 14:07 shanicky