pinot icon indicating copy to clipboard operation
pinot copied to clipboard

Make broker short-circuit for point queries w/ a limit clause

Open dang-stripe opened this issue 3 years ago • 6 comments

We've observed select and select distinct queries with no aggregation and a small limit clause can take a long time to process. We think that the broker is waiting for all results to return before returning to the client, when it can short-circuit and return once enough results to satisfy the limit are available.

cc @Jackie-Jiang

dang-stripe avatar Jul 29 '22 19:07 dang-stripe

To add on this, we may also build a mechanism for broker to early terminate the query execution on the servers when it already collects enough results.

Jackie-Jiang avatar Jul 29 '22 20:07 Jackie-Jiang

So currently, the broker reduce does not start until all responses (DataTable) have been received from all servers. This will require some sort of pipelined reduce and break in between as soon as termination condition is met. IIUC, this may lead to holes in results especially for select with ORDER BY. In theory, there can be holes in today's execution as well but this may just increase them ?

siddharthteotia avatar Aug 05 '22 03:08 siddharthteotia

@siddharthteotia For select with ORDER BY, we cannot really early-terminate. Early terminate can only be applied to select and select distinct without ORDER BY

Jackie-Jiang avatar Aug 05 '22 04:08 Jackie-Jiang

In that case, doing this should be fine. It wasn't clear if the proposal here was to do this for non ORDER BY alone.

siddharthteotia avatar Aug 05 '22 04:08 siddharthteotia

Hey, I am planning to pick this task. I have a couple of follow-up questions from my investigations:

Need a bit of context to understand the difference b/n GrpcBrokerRequestHandler, MultiStageBrokerRequestHandler and SingleConnectionBrokerRequestHandler which are invoked from here. Are we using all of them?

At a high level, I am planning to:

  1. Pass a limit down to processBrokerRequest method after extracting it from the compiled pinot query. Note: limit would only be passed for early terminate cases (ie, select and select distinct without ORDER BY)
  2. Return early from offline server request after limit no of responses are received from offline servers or go to real time servers to gather limit responses
  3. No changes needed in reduceOnStreamResponse as it would only have limit no of responses.

Please let me know how the implementation proposal sounds and if I am missing something. Thanks!

tanmesh avatar Aug 22 '22 19:08 tanmesh

@tanmesh Thanks for volunteering. We can focus on SingleConnectionBrokerRequestHandler for now. Currently broker gathers all the responses from all the servers before sending them to the reducer (See SingleConnectionBrokerRequestHandler.processBrokerRequest() line 113 asyncQueryResponse.getFinalResponses()). Instead, we want to reduce the results whenever a server responds, and early terminate the reduce (we may leverage the new introduced cancel query feature #9171). No extra limit needs to be passed as it should already exist in the pinot query. The challenge here is to make the reducer directly work on the AsyncQueryResponse

Jackie-Jiang avatar Aug 23 '22 22:08 Jackie-Jiang

I have added the progress so far in the above PR. May I get some eyes on this, to ensure that I am on the right track?

tanmesh avatar Dec 28 '22 21:12 tanmesh