
Concurrent map

Open Boog900 opened this issue 11 months ago • 16 comments

This adds a concurrent hash map impl.

This concurrent map is optimized for:

  • multiple threads retrieving values concurrently
  • the keys being known before the workers start

It is not a full concurrent map. It is pretty much a concurrent map builder.

Some background:

In the consensus code, we batch the outputs needed into one DB request so the DB can split the request up as it sees fit. This is probably the biggest reason why our RPC scanner is so quick: it takes this huge list of needed outputs and splits it up between nodes, getting the maximum from each node (5,000).

The way this currently works is that when the shimmed db-rpc gets an output request, it will deconstruct the hashmap into multiple hashmaps, send them to different nodes, await the responses and combine the hashmap responses.

This isn't very efficient, but it was good enough for the RPC scanner, where the majority of the wait is on the actual RPC request.

This may be a problem for our DB though, which is what this Map impl tries to solve: no longer do we have to iterate, deconstruct and combine hashmaps.

Nothing here is final, but it would be good to have your thoughts @hinto-janai, @SyntheticBird45

I did think of using two DashMaps, one for requests and one for returns, but I feel this custom solution would be quicker, as we don't have to care about 99% of the things DashMap can do.

Boog900 · Mar 05 '24 22:03

I am not looking for reviews of this code (yet), but if you see some invalid logic then please say.

Boog900 · Mar 05 '24 23:03

What part of Cuprate exactly calls this? Is it something like:

  1. Outside Cuprate sends large output request to DB
  2. DB uses ConcurrentMapBuilder to fetch outputs
  3. DB responds with BuiltMap

hinto-janai · Mar 06 '24 01:03

Yeah, that's a bit I am unsure about. I think the best way is for the caller to pass the IndexSet to the service, which will then create a builder. With that builder, the service creates N requests to put in the channel, which get picked up by different workers; when they all return, the service calls try_create on the builder it holds, returning the BuiltMap if successful.

If a request goes to a worker that already dealt with it, the worker would use get_work, which would return None/Err, and the worker would just move on to another request.

Boog900 · Mar 06 '24 01:03

Could we use rayon + par_iter().collect() here instead of our own impl?

  1. Caller sends IndexSet to DB service
  2. DB thread picks up request, starts par_iter().collect() in rayon pool
  3. rayon returns result to DB thread
  4. DB sends response back to caller

The 1 DB thread acts as the single joiner; the rayon pool does the actual work.

              |-------------------|
              |  Outside Cuprate  |
              |-------------------|
                        ^
                        |
                   DB Service
                        |
       /---------------/
      v
|-----------------------------------------------|
| DbThread1 | DbThread2 | DbThread3 | DbThread4 |
|-----------------------------------------------|
    ^   |
    |   v
|-----------------------------------------------|
|           global rayon thread pool            |
|-----------------------------------------------|

From my perspective, the DB thread-pool in service isn't necessarily supposed to be doing work, they exist essentially to route things around. They could all use the global rayon thread-pool as the work "backend" and take advantage of par_iter and such.

hinto-janai · Mar 06 '24 02:03

I don't think this expands the scope of the service too much; it is still routing requests to the threads that do the work.

I don't think we can use rayon, as we would be doing DB operations (getting outputs) in the par_iter.

We could use a normal thread pool instead of rayon, and have each thread use the MapBuilder but then we are spawning more threads when we already have DB threads to do work.

Boog900 · Mar 06 '24 13:03

It would be good to have a way to send a request to one or all DB threads though, instead of needing to send the same request multiple times and hope it gets picked up by different threads.

Boog900 · Mar 06 '24 13:03

I don't think this expands the scope of the service too much

Some thread has to split up and distribute the work. If not the caller by sending multiple requests, then wouldn't it be the 1 DB thread who received the initial request, distributing it to all DB threads?

Setting up channel+select wiring would work, but I think implementing "1 DB thread receives request + rayon" would be simpler.

I don't think we can use rayon, as we would be doing DB operations (getting outputs) in the par_iter

rayon::scope() + pass in &ConcreteEnv? Or a custom rayon::ThreadPool that has Arc<ConcreteEnv> that drops alongside the DB threadpool.

For the question: "who is responsible for DB parallelization?"

  • The caller (splits up the work, sends multiple requests, receives multiples responses)
  • The DB threadpool (1 DB thread receives request, splits work across the other threads)
  • rayon (1 DB thread receives request, splits work using rayon)

I don't think it should ever be the caller, as all requests (even 1) should saturate the DB as much as possible.

hinto-janai · Mar 06 '24 18:03

I had a bit more of a think.

The reason I didn't want to use rayon is that one of the goals of our DB design is to remove all DB operations from the async runtime, so it feels weird to then put them in a rayon pool, which should also not have blocking operations.

But if we were to use a second rayon pool just for the DB, we would not starve other parts of Cuprate or the rest of the DB: if one part of the DB is blocking, any other request we moved on to would likely block as well, so we don't lose anything.

We can also take using rayon a step further by removing our reader threads completely, just using the custom rayon thread pool to execute requests.

Instead of the service sending the request down a channel with a oneshot return, it would call spawn on the pool with the request and oneshot.

This would allow using rayon's par iters for all DB requests without spawning excess threads.

Boog900 · Mar 06 '24 23:03

if one part of the DB is blocking, any other request we moved on to would likely block as well

What about a situation where there's 1 large request, then shortly after 500 small requests? Shouldn't those 500 requests be responded to (dictated by rayon's scheduler I guess) instead of being stalled on the 1?

We can also take using rayon a step further by removing our reader threads completely, just using the custom rayon thread pool to execute requests

A single "manager" thread (1 DB reader thread) splitting work with a shared rayon threadpool is what I would do on first glance, but if rayon threads can use their own threadpool then sure (what is rayon's behavior with nested par_*()? bad perf? deadlocks?).

Instead of the service sending the request down a channel with a oneshot return it would call spawn on the runtime with the request and oneshot

Can you write code or a graph of what this would look like? How does the work get split after spawn()? How does it get joined and sent back?

hinto-janai · Mar 07 '24 02:03

What about a situation where there's 1 large request, then shortly after 500 small requests? Shouldn't those 500 requests be responded to (dictated by rayon's scheduler I guess) instead of being stalled on the 1?

This is true of our current reader threads as well: a big request may take a thread out, but we still have other rayon threads that can be doing work.

A single "manager" thread (1 DB reader thread) splitting work with a shared rayon threadpool is what I would do on first glance, but if rayon threads can use their own threadpool then sure (what is rayon's behavior with nested par_*()? bad perf? deadlocks?).

No, it's perfectly fine to do nested parallelism within rayon; the rayon thread which made the call will also work on the par_iter, so no deadlocks.

Can you write code or a graph of what this would look like? How does the work get split after spawn()? How does it get joined and sent back?

Here is some rough code:

// The DB service's `call` function.
fn call(&self, req: DBReq) -> impl Future<Output = DBRes> {
    let (tx, rx) = oneshot::channel();
    // This puts a job in the pool without blocking the current thread.
    self.pool.spawn(move || handle_req(tx, req));

    rx
}

fn handle_req(tx: oneshot::Sender<DBRes>, req: DBReq) {
    match req {
        DBReq::Outputs(needed) => {
            tx.send(get_outs(needed));
        }
    }
}

fn get_outs(needed: OutputIDs) -> Outputs {
    // The current rayon thread will make progress on this; it will not deadlock.
    needed.par_iter().map(|id| get_output(id)).collect()
}

We would need to add some sort of back pressure so the database does not get overrun with tasks, like with the current channels, but that will be easy to do with a semaphore, so we can ignore it for now.
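For illustration, the semaphore-based backpressure could be as simple as acquiring a permit before each spawn and releasing it when the job's response is sent. std has no semaphore type, so this sketches a counting semaphore from Mutex + Condvar; in the actual async service, something like tokio's Semaphore would be the more natural choice:

```rust
use std::sync::{Condvar, Mutex};

// Hypothetical sketch: cap how many DB jobs can be in flight at once.
// Callers `acquire()` before `pool.spawn(...)` and `release()` once the
// job's oneshot response has been sent.
pub struct Semaphore {
    permits: Mutex<usize>,
    cv: Condvar,
}

impl Semaphore {
    pub fn new(max_in_flight: usize) -> Self {
        Self {
            permits: Mutex::new(max_in_flight),
            cv: Condvar::new(),
        }
    }

    // Blocks the caller until a permit is free.
    pub fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.cv.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    // Returns a permit and wakes one blocked caller, if any.
    pub fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cv.notify_one();
    }

    // How many jobs could still be admitted right now.
    pub fn available(&self) -> usize {
        *self.permits.lock().unwrap()
    }
}
```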

Boog900 · Mar 07 '24 14:03

No, it's perfectly fine to do nested parallelism within rayon; the rayon thread which made the call will also work on the par_iter, so no deadlocks.

Swapping our current DB reader thread-pool with rayon seems good then. I don't think our locks will deadlock either since only read guards are taken.

Here is some rough code

Yeah this looks good too. Is the question now if get_output() should use this PR's concurrent map or do a bunch of Vec flattens or similar? I think this PR would be faster although I'm not a fan of introducing unsafe and maintaining it. Maybe some perf tests could settle the difference?

hinto-janai · Mar 07 '24 20:03

I was thinking about the similarities between the tokio blocking threadpool and the rayon one. Someone on reddit described the difference between the two. Despite tokio supposedly being more suitable for our purpose, I think rayon's API is much easier and more straightforward to use and test. So yeah, I agree with the idea of using the rayon threadpool.

SyntheticBird45 · Mar 07 '24 20:03

So, unless I'm stupid, the proposed concurrent hashmap is optimized for multiple DB threads to concurrently write results into it?

SyntheticBird45 · Mar 07 '24 20:03

Yeah this looks good too. Is the question now if get_output() should use this PR's concurrent map or do a bunch of Vec flattens or similar? I think this PR would be faster although I'm not a fan of introducing unsafe and maintaining it. Maybe some perf tests could settle the difference?

Yep, we need benchmarks from the DB. I don't think there will be too much difference though; rayon is very efficient.

So, unless I'm stupid, the proposed concurrent hashmap is optimized for multiple DB threads to concurrently write results into it?

pretty much

Boog900 · Mar 07 '24 21:03

Yep, we need benchmarks from the DB. I don't think there will be too much difference though; rayon is very efficient.

If this PR isn't urgent, we could write some benchmark code with criterion and try it out, with an RNG for data and a random thread sleep to simulate a blocking file op.

SyntheticBird45 · Mar 07 '24 21:03

This PR isn't urgent, so yeah, we could.

Boog900 · Mar 07 '24 21:03