
Support configuration change in a single barrier


Motivation

Currently, when performing a configuration change, such as scaling or replacing a table (including sink into table), we follow these steps (sketched in code after the list):

  1. inject a pause barrier, say epoch1; after the barrier is collected and committed, explicitly call try_wait_epoch(epoch1) on each CN
  2. inject a configuration change barrier, say epoch2, to apply the configuration. Newly created stateful executors call StateTable::init on epoch2, while existing stateful executors call update_vnode_bitmap
  3. inject a resume barrier to resume the stream
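A minimal sketch of this flow from the barrier injector's point of view. All types and helpers here (`Mutation`, `inject_and_commit`, `ComputeNode::try_wait_epoch`, `compute_nodes`) are hypothetical stand-ins, not the real barrier-manager API, which is asynchronous and lives in the meta node:

```rust
type Epoch = u64;

enum Mutation {
    Pause,
    Update, // carries the new actor/vnode assignment in reality
    Resume,
}

struct ComputeNode;

impl ComputeNode {
    /// Block until this CN's storage can serve reads at `epoch`.
    fn try_wait_epoch(&self, _epoch: Epoch) {}
}

/// Inject a barrier carrying `mutation`, wait until it is collected from
/// all actors, and commit it; returns the barrier's epoch.
fn inject_and_commit(_mutation: Mutation) -> Epoch {
    0
}

fn compute_nodes() -> Vec<ComputeNode> {
    vec![ComputeNode]
}

fn reconfigure_with_pause() {
    // Step 1: pause, then make every CN readable at epoch1.
    let epoch1 = inject_and_commit(Mutation::Pause);
    for cn in compute_nodes() {
        cn.try_wait_epoch(epoch1);
    }
    // Step 2: apply the change; the pause guarantees that the data at
    // epoch2 equals the data at epoch1.
    let _epoch2 = inject_and_commit(Mutation::Update);
    // Step 3: resume the stream.
    let _epoch3 = inject_and_commit(Mutation::Resume);
}

fn main() {
    reconfigure_with_pause();
}
```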

The reason for this design is mainly data synchronization: each newly created actor, or existing actor assigned new vnodes, must be able to see the latest data of the vnodes it owns. The mechanism works because we call try_wait_epoch(epoch1) on each CN, so every actor created then or later can read at least all data at epoch1; and since the stream is paused after epoch1, the data at epoch1 and epoch2 is identical, so when the configuration change happens at epoch2, all actors can read all the latest data.

Though the current mechanism works, it has several drawbacks:

  1. We pause the stream explicitly, and while the stream is paused, concurrent barriers become useless.
  2. Correctness depends on too large a scope. First, it depends on every executor handling the pause barrier correctly, to ensure that there is actually no data between epoch1 and epoch2. Second, it depends on try_wait_epoch being called correctly. Sometimes, as in https://github.com/risingwavelabs/risingwave/issues/17602, correctness implicitly depends on calling try_wait_epoch, but we didn't do it, and the resulting error is either hard to reproduce or surfaces somewhere far away downstream, which costs a lot of effort to debug.

Since the main purpose of this mechanism is data synchronization, we'd better narrow the scope that correctness depends on. Solely within the implementation of StateTable, we can ensure that data is always ready when StateTable::init and StateTable::update_vnode_bitmap return, without any implicit dependency on the correct behavior of higher-level streaming executors or the streaming runtime.

Proposal

As discussed offline with @BugenZhao and @st1page , we have the following rough proposal.

When we call StateTable::init or StateTable::update_vnode_bitmap, the state table itself will wait for the committed epoch of its table id to bump up before returning; once these methods return, the state table is ready to serve read requests.
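A minimal sketch of that contract, assuming a hypothetical storage handle with a `wait_committed_epoch` call; the real StateTable internals are asynchronous and the signatures differ:

```rust
type Epoch = u64;
type TableId = u32;

struct Storage;

impl Storage {
    /// Block until the committed epoch of `table_id` reaches `epoch`.
    fn wait_committed_epoch(&self, _table_id: TableId, _epoch: Epoch) {}
}

struct StateTable {
    table_id: TableId,
    storage: Storage,
}

impl StateTable {
    /// Initialize on `epoch`; do not return until the committed epoch of
    /// this table has bumped up, so any later read sees all data.
    fn init(&mut self, epoch: Epoch) {
        self.storage.wait_committed_epoch(self.table_id, epoch);
    }

    /// The same guarantee when the vnode assignment changes.
    fn update_vnode_bitmap(&mut self, epoch: Epoch /* , new_bitmap */) {
        self.storage.wait_committed_epoch(self.table_id, epoch);
    }
}
```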

In earlier development of configuration change, @BugenZhao tried an implementation similar to this proposal but encountered a deadlock: the executor started waiting for the committed epoch to bump up before yielding the barrier, while bumping up the committed epoch depends on collecting all barriers.

A similar deadlock can easily happen again, and avoiding it requires carefully ordering the epoch wait and the barrier yield. In general, we need to follow this order (sketched in code after the list):

  1. receive barrier
  2. commit state table if not first barrier
  3. yield barrier
  4. wait for the epoch in either init or update_vnode_bitmap
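A minimal sketch of this ordering inside a stateful executor's barrier handling, with hypothetical `recv_barrier`/`yield_barrier` helpers and simplified StateTable stubs; the key point is that the barrier is yielded before any epoch wait:

```rust
type Epoch = u64;

struct Barrier {
    epoch: Epoch,
    is_update: bool, // whether this barrier carries a configuration change
}

struct StateTable;

impl StateTable {
    fn commit(&mut self, _epoch: Epoch) {}
    fn init(&mut self, _epoch: Epoch) { /* waits for committed epoch */ }
    fn update_vnode_bitmap(&mut self, _epoch: Epoch) { /* waits as well */ }
}

fn recv_barrier() -> Barrier {
    Barrier { epoch: 0, is_update: false }
}

fn yield_barrier(_barrier: &Barrier) {}

fn on_barrier(table: &mut StateTable, first_barrier: bool) {
    let barrier = recv_barrier(); // 1. receive barrier
    if !first_barrier {
        table.commit(barrier.epoch); // 2. commit state table if not first barrier
    }
    yield_barrier(&barrier); // 3. yield barrier downstream
    // 4. only now is it safe to wait for the committed epoch to bump up
    if first_barrier {
        table.init(barrier.epoch);
    } else if barrier.is_update {
        table.update_vnode_bitmap(barrier.epoch);
    }
}
```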

Following this order, a similar deadlock won't happen. Correctness rests on the assumption that, from the perspective of a single executor, once it has yielded a checkpoint barrier, the committed epoch must eventually be able to bump up without the executor doing anything else, so the executor can safely wait for the committed epoch to bump up.

Though the logic must be handled carefully in all stateful executors, correctness can be ensured in CI as long as all executors are covered there:

  • For correctness in init: if the logic is not handled correctly, a deadlock is bound to happen, so we will discover the mistake in CI.
  • For correctness during update_vnode_bitmap: since we have not ensured that all executors are covered by the scale test, we cannot guarantee that update_vnode_bitmap is called for every executor in CI. To ensure correctness, we can remove the current update_vnode_bitmap method of the state table and instead wrap the logic in a new method post_yield_barrier(Option<Mutation>), which should always be called after yielding a barrier, regardless of whether there is a configuration change. We can add a sanity check in this state table method to ensure that it was called after the previous barrier. This way, if we miss calling this method in any executor, the sanity check fails (see the sketch below).
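A minimal sketch of the post_yield_barrier wrapper with such a sanity check; the method name follows the proposal, but the `Mutation` type and the exact check are illustrative assumptions:

```rust
type Epoch = u64;

enum Mutation {
    Update, // carries the new vnode bitmap in reality
}

struct StateTable {
    /// Set on every commit and cleared by `post_yield_barrier`.
    pending_post_yield: bool,
}

impl StateTable {
    fn commit(&mut self, _epoch: Epoch) {
        // Sanity check: the executor must have called `post_yield_barrier`
        // after yielding the previous barrier; a missed call fails loudly
        // in CI even without a configuration change.
        assert!(
            !self.pending_post_yield,
            "post_yield_barrier was not called after the previous barrier"
        );
        self.pending_post_yield = true;
        // ...flush and commit the table state...
    }

    /// Must be called after yielding every barrier, with or without a
    /// configuration change; replaces the old `update_vnode_bitmap`.
    fn post_yield_barrier(&mut self, mutation: Option<Mutation>) {
        self.pending_post_yield = false;
        if let Some(Mutation::Update) = mutation {
            // wait for the committed epoch to bump up, then apply the
            // new vnode bitmap
        }
    }
}
```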

With this proposal, we can perform a configuration change in a single barrier, without the need to pause and resume the stream.

wenym1 · Aug 29 '24