Redesign Exchange protocol to reduce query latency
An investigation of a slow exchange-heavy query prompted us to rethink the Exchange protocol used in Presto and Prestissimo.
The query under investigation has 10+ count(distinct) aggregations, which are planned as 10+ stages with MarkDistinct and shuffle. The query shuffles the same data 10+ times, and some stages are skewed so that a handful of workers process 80% of the data. A simplified version of the query with fewer count(distinct) aggregations took 8 minutes: 2 minutes went into fetching data over the network and 6 minutes into waiting.
The consumer task has a memory limit (32GB) and therefore cannot pull from all the producers at once. The consumer also doesn't know which producers have data and which don't. When pulling from a producer that does not have data, the consumer waits for 2 seconds before moving on to the next producer. These delays add up. We'd like to redesign the Exchange protocol to eliminate these delays. We tried a prototype of the modified protocol described below on a query that used to time out after 1h. That query finished in < 4m.
Modified Protocol
Presto uses many-to-many streaming shuffle. M upstream workers partition data N ways. N downstream workers pull data from M upstream workers each.
Upstream workers use PartitionedOutput operators to partition the data and store it in OutputBuffer. There is a single OutputBuffer for each task with N partitions, one per downstream worker.
Downstream workers use ExchangeOperators and a single ExchangeClient to pull data from output buffers. ExchangeClient creates multiple ExchangeSources, one per upstream worker, to pull the data.
ExchangeSource uses HTTP protocol to pull data: https://prestodb.io/docs/current/develop/worker-protocol.html#data-plane
The existing protocol allows the consumer to ask for data and specify a limit on response size (maxBytes) and maximum time to wait for data to become available before sending an empty response (maxWait).
The limit on response size is advisory. The producer may not respect it. This happens if the limit is very small (a few KB) or if there is a single row that’s bigger than 1MB.
The consumer needs to maintain a cap on memory usage and therefore needs to be careful not to fetch from too many sources at once.
It is hard to come up with an optimal scheduling algorithm without knowing which sources have data and how much. Safe choices sacrifice latency.
Therefore we propose to modify the protocol to allow consumers to ask producers about how much data they have available first, then use this information to schedule data fetch.
There are cases when Prestissimo workers need to continue using the existing protocol to exchange data with Coordinator. This happens when the Coordinator pulls query results from the C++ worker. This also happens when C++ workers pull data from the leaf fragment that runs on Coordinator (TableScan for system tables). Therefore, we are going to extend the existing protocol in a backwards compatible way.
We introduce the HTTP header X-Presto-Get-Data-Size and use it with a GET {taskId}/results/{bufferId}/{token} request.
Prestissimo workers, in the presence of X-Presto-Get-Data-Size header, will return the amount of data available: a list of page sizes. The consumer will be able to request as many pages from the start of the list as would fit in memory by specifying the total bytes in these pages.
If data is not available yet, the producer would wait for up to maxWait seconds before responding.
We also introduce an HTTP header X-Presto-Buffer-Remaining-Bytes that is set by the producer in response to a GET {taskId}/results/{bufferId}/{token} to indicate how many bytes are still available.
A Prestissimo consumer would first query all (at the same time) producers for how much data they have.
X-Presto-Get-Data-Size: true
GET {taskId}/results/{bufferId}/{token}
The producers would respond with available data specified in the X-Presto-Buffer-Remaining-Bytes header.
X-Presto-Buffer-Remaining-Bytes: 100,200,150,400
The consumer then comes up with a plan to fetch available data and control the response sizes using the existing X-Presto-Max-Size header.
X-Presto-Max-Size: 1MB
GET {taskId}/results/{bufferId}/{token}
The consumer would prioritize fetching data from the sources that have most data. This will unblock these producers sooner and allow them to produce more data while the consumer fetches data from the remaining sources.
When fetching data, the consumer would receive the data and the size of the remaining data. This will allow the consumer to continue fetching the data without extra metadata requests.
Rinse and repeat until all sources report no-more-data.
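The scheduling step above (prioritize the sources with the most data, stay within the memory budget, request whole pages only) can be sketched as follows. This is a minimal illustration, not the Velox implementation: `FetchPlanner`, `plan`, and the string source ids are hypothetical names, and the greedy policy is one possible choice under the assumption that each source has already reported its list of page sizes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: decides how many bytes to request from each source.
final class FetchPlanner {
    private FetchPlanner() {}

    // Sum of all page sizes reported by one source.
    static long total(List<Long> pageSizes) {
        long sum = 0;
        for (long size : pageSizes) {
            sum += size;
        }
        return sum;
    }

    // Visits sources in descending order of available bytes. For each source,
    // requests the longest prefix of pages that still fits in the remaining
    // budget. Sources whose first page does not fit are skipped this round.
    static Map<String, Long> plan(Map<String, List<Long>> available, long budgetBytes) {
        List<Map.Entry<String, List<Long>>> bySize = new ArrayList<>(available.entrySet());
        bySize.sort(Comparator
                .comparingLong((Map.Entry<String, List<Long>> e) -> total(e.getValue()))
                .reversed());

        Map<String, Long> requests = new LinkedHashMap<>();
        long remaining = budgetBytes;
        for (Map.Entry<String, List<Long>> entry : bySize) {
            long request = 0;
            for (long page : entry.getValue()) {
                if (request + page > remaining) {
                    break;
                }
                request += page;
            }
            if (request > 0) {
                requests.put(entry.getKey(), request);
                remaining -= request;
            }
        }
        return requests;
    }
}
```

Each value in the returned map is what the consumer would send as the response size limit for that source. A real scheduler would also need a fairness rule so that sources skipped in one round are not starved.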
The value of X-Presto-Buffer-Remaining-Bytes is a list of page sizes in order in which they will be produced. This is a comma-separated list of positive integers, where each value is the number of bytes in a page. For example,
X-Presto-Buffer-Remaining-Bytes: 12323432,123452342,23344354,34532343241341
Prestissimo worker includes X-Presto-Buffer-Remaining-Bytes header in all responses to GET {taskId}/results/{bufferId}/{token} regardless of whether X-Presto-Get-Data-Size header is present or not. When no data is available the header value is set to 0.
X-Presto-Buffer-Remaining-Bytes: 0
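A consumer could parse this header value along the following lines. This is a sketch under the format described above (comma-separated page sizes, or 0 when no data is available); `RemainingBytesHeader` and `parse` are hypothetical names, not part of Presto or Velox.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical parser for the X-Presto-Buffer-Remaining-Bytes header value.
final class RemainingBytesHeader {
    private RemainingBytesHeader() {}

    // Turns "100,200,150,400" into [100, 200, 150, 400].
    // A value of "0" means no data is available and yields an empty list.
    static List<Long> parse(String headerValue) {
        List<Long> pageSizes = new ArrayList<>();
        for (String part : headerValue.split(",")) {
            long size = Long.parseLong(part.trim());
            if (size < 0) {
                throw new IllegalArgumentException("negative page size: " + size);
            }
            if (size > 0) {
                pageSizes.add(size);
            }
        }
        return pageSizes;
    }
}
```

Page sizes are parsed as `long` because a single value, such as 34532343241341 in the example above, can exceed the range of a 32-bit integer.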
Backwards compatibility with the Coordinator
There are two use cases that involve data transfer between the Coordinator that doesn’t understand the new protocol and a Prestissimo worker.
Use case 1: Coordinator fetches query results from the worker.
Prestissimo workers would include the new X-Presto-Buffer-Remaining-Bytes header in response to existing GET {taskId}/results/{bufferId}/{token} request (without X-Presto-Get-Data-Size). Coordinator would ignore this header and continue fetching data without the benefit of knowing how much data is available.
Use case 2: Prestissimo worker fetches table scan results for system table from the Coordinator.
Prestissimo worker includes the new X-Presto-Get-Data-Size header in the existing GET {taskId}/results/{bufferId}/{token} request. Coordinator will ignore this header and return the data. Prestissimo workers should accept that response (even though they only asked for data size). This should work because in this case there is only one source and it doesn’t produce a lot of data. Prestissimo workers would also set the X-Presto-Max-Size header to avoid accidentally getting too much data (just in case).
Implementation
ExchangeClient in Velox would implement two independent loops: wait-for-data, fetch-data.
Wait-for-data loop is waiting for sources to report having data. This loop processes all sources in parallel.
Fetch-data loop is fetching data from sources that reported having some data. This loop processes a subset of sources at a time to stay within the memory budget.
Any given source is being assigned to either one loop or the other, but not both. Initially all sources go into the wait-for-data loop. As soon as the source responds as having data it is moved to the data-fetching loop. The source stays in the data-fetching loop until response indicates there is no more data available, but the source is not at-end. At this point the source is moved into the wait-for-data loop. Sources that reported being at-end are removed from the loops and do not participate in further processing.
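The movement of a source between the two loops can be viewed as a small state machine. The sketch below is illustrative only: the real ExchangeClient is C++ in Velox, and `SourceStateMachine`, its `State` values, and the `next` signature are hypothetical names chosen for this example.

```java
// Hypothetical model of which loop a source belongs to.
final class SourceStateMachine {
    private SourceStateMachine() {}

    enum State { WAITING_FOR_DATA, FETCHING_DATA, AT_END }

    // Computes the next state of a source from its latest response.
    // remainingBytes: total bytes the producer reported as still buffered.
    // atEnd: the producer reported that it will never produce more data.
    static State next(State current, long remainingBytes, boolean atEnd) {
        if (current == State.AT_END || atEnd) {
            // Finished sources leave both loops for good.
            return State.AT_END;
        }
        // Data available: fetch loop. Nothing yet: wait loop.
        return remainingBytes > 0 ? State.FETCHING_DATA : State.WAITING_FOR_DATA;
    }
}
```

Because the next state depends only on the latest response, a source is in exactly one loop at any time.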
CC: @Yuhta @spershin @arhimondr @tdcmeehan @aditi-pandit @majetideepak @pranjalssh
CC: @agrawaldevesh
@mbasmanova thank you for the clear write up. I had one initial question.
A Prestissimo consumer would first query all (at the same time) producers for how much data they have.
Do we need special casing for this first request in order to receive most of the performance improvement? Would it not be sufficient to simply receive the header on subsequent shuffles and adapt according to the values we receive?
@tdcmeehan Tim, thank you for considering this proposal.
Do we need special casing for this first request in order to receive most of the performance improvement? Would it not be sufficient to simply receive the header on subsequent shuffles and adapt according to the values we receive?
A simpler improvement on the existing protocol is to simply add X-Presto-Buffer-Remaining-Bytes header that producer uses to specify how many more bytes it has buffered and ready to go.
However, this is not enough. Consider what happens when a request for data comes back with X-Presto-Buffer-Remaining-Bytes indicating there is no more data yet. The consumer needs to continue polling the producer, but it no longer knows how much data it may get in response or when. So we are back to the original problem, where we don't know whom to ask for data and end up spending lots of wall time asking the "wrong" producers before finally getting to the "right" one.
@mbasmanova so is the idea like this:
- Query all task result endpoints at the start of execution for how much data is available for shuffle. The hope is that even though this is at the start of execution, there is some meaningful number of bytes available for the consumer to pick amongst them.
- Once we have the list of available bytes from all task result endpoints, we sort them in descending order so that we pick the endpoints that have the most bytes available.
- We prioritize the ones that have the most data available, in a way that doesn't cause starvation to the ones that initially reported low bytes available.
If this is accurate, does this mean, what we're trying to avoid is being biased by one particular task result reporting before others have finished reporting?
I also don't understand your example. If we receive X-Presto-Buffer-Remaining-Bytes, then this goes into the bottom of the priority queue of producers to poll. I imagine we go to another producer at this point. Either this next producer reports it has a lot of data, or not a lot. If not a lot, we continue until we reach a producer that does produce a lot of data. If it does produce a lot, then we are lucky and prioritize it on top until we either consider another producer for fairness (to prevent starvation) or consume quickly what data it has buffered and move on.
In other words, I believe there may be some fairness mechanism in this algorithm, otherwise we could be misled by the initially reported bytes which might not reflect current reality. And if there is, wouldn't this fairness mechanism be enough to ensure that even if we chose producers at random, given enough time it would prioritize the producers that end up producing the most data?
@tdcmeehan
The main goal is to ensure that, as long as there is data available, the consumer is working hard at fetching that data, and to avoid having the consumer sit idle when there is data to fetch.
With the existing design, we see cases when consumers are not fetching data although it is available.
Consider a single consumer with 100 producers. If the consumer initiates a fetch from all 100 producers in parallel, it is possible that all 100 respond with 1MB of data each. This would require the consumer to allocate 100MB of memory and go over budget. To solve this problem, consumers were modified to fetch from a limited number of producers at once. Furthermore, to avoid too much back-and-forth when waiting for data to be produced, each fetch blocks for 2 seconds if data is not available.
Let's say a consumer has a budget of 10MB, so it fetches data from 10 producers in parallel. Let's also say that producer 95 has data, but no other producer does. The consumer will spend 2s waiting for producers 1-10 to report they have nothing, another 2s for producers 11-20 to report they have nothing, etc. until it gets to producer 95 after spending 18 seconds waiting.
Then, let's say producer 95 returned all the data it had and will take a bit of time to produce more data. The consumer will repeat the cycle above and again sit idle for another 18 seconds before getting the next batch of data.
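The 18-second figure follows from counting the empty batches that are polled before the batch containing producer 95. The sketch below reproduces that arithmetic under simplifying assumptions that are not stated in the thread: producers are polled in fixed order, in batches of equal size, and every empty batch costs the full wait. `PollingDelay` and `idleSeconds` are hypothetical names.

```java
// Hypothetical model of the idle time in the existing protocol.
final class PollingDelay {
    private PollingDelay() {}

    // Seconds spent waiting on empty batches before reaching the batch that
    // contains the only producer with data. Producers are numbered from 1.
    static long idleSeconds(int producerWithData, int batchSize, long maxWaitSeconds) {
        int emptyBatches = (producerWithData - 1) / batchSize;
        return emptyBatches * maxWaitSeconds;
    }
}
```

With producer 95, a batch size of 10, and a 2-second wait, nine empty batches (producers 1-90) are polled first, which gives 9 × 2 = 18 seconds.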
@mbasmanova thank you for the example, that clears things up a lot.
Just to clear up my understanding: in this example, would it be fair to say that if we just added the header, and not the initial special case call to retrieve the initial buffers, that it would take us 18s to discover that producer 95 has the majority of the data? And if so, that means going forward, producer 95 would be prioritized above the rest? Meaning, we would have just an 18s penalty, instead of a penalty that is amortized over the life of the query?
@tdcmeehan Yes, that is true, but only if the throughput distribution of producers does not change over time. With the new protocol, the consumer can adapt to any change and react quickly, so no large latency can build up regardless of how the distribution shifts.
Oh, I see. Is this because we periodically take a "snapshot" of all producers' pending output buffers? I had presumed that the initial request with X-Presto-Get-Data-Size was done once at the beginning. Is it actually done periodically?
@tdcmeehan It's done periodically whenever the source is in wait mode:
Any given source is being assigned to either one loop or the other, but not both. Initially all sources go into the wait-for-data loop. As soon as the source responds as having data it is moved to the data-fetching loop. The source stays in the data-fetching loop until response indicates there is no more data available, but the source is not at-end. At this point the source is moved into the wait-for-data loop. Sources that reported being at-end are removed from the loops and do not participate in further processing.
@tdcmeehan
Is this because we periodically take a "snapshot" of all producers' pending output buffers?
Not quite.
We run two independent loops: wait-for-data, fetch-data.
Wait-for-data loop is waiting for sources to report having data. This loop processes all sources in parallel.
Fetch-data loop is fetching data from sources that reported having some data. This loop processes a subset of sources at a time to stay within the memory budget.
Any given source is being assigned to either one loop or the other, but not both. Initially all sources go into the wait-for-data loop. As soon as the source responds as having data it is moved to the data-fetching loop. The source stays in the data-fetching loop until response indicates there is no more data available, but the source is not at-end. At this point the source is moved into the wait-for-data loop. Sources that reported being at-end are removed from the loops and do not participate in further processing.
Makes sense.
One idea on the API with respect to backwards compatibility. It seems like X-Presto-Get-Data-Size: true is like a metadata-only request. Typically, such requests are fulfilled by HTTP HEAD, whose purpose is to send back headers without data. What if instead of sending across this new header on the GET to {taskId}/results/{bufferId}/{token}, we just have consumers in wait-for-data mode to send HEAD requests to the underlying source. That way, we always send back X-Presto-Buffer-Remaining-Bytes consistently between HEAD and GET, and the only difference is the new HEAD request. It's backwards compatible because the only difference between the coordinator and workers is the coordinator doesn't care to make the HEAD requests, and we don't need to define a new header type for this purpose.
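To make the difference between the variants concrete, here is a sketch of the three requests a consumer might build using the JDK's `java.net.http` API. It is only an illustration: the real consumer is C++ in Prestissimo, `ExchangeRequests` and its method names are hypothetical, the caller is assumed to supply the full results URI, and formatting the size limit as a byte count with a `B` suffix is an assumption about the accepted syntax.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical request builders for the variants discussed in this thread.
final class ExchangeRequests {
    private ExchangeRequests() {}

    // Metadata-only request using HTTP HEAD, as suggested above.
    static HttpRequest headDataSize(URI resultsUri) {
        return HttpRequest.newBuilder(resultsUri)
                .method("HEAD", HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // Metadata request from the original proposal: GET plus a marker header.
    static HttpRequest getDataSize(URI resultsUri) {
        return HttpRequest.newBuilder(resultsUri)
                .header("X-Presto-Get-Data-Size", "true")
                .GET()
                .build();
    }

    // Data request with an advisory cap on the response size.
    static HttpRequest fetch(URI resultsUri, long maxBytes) {
        return HttpRequest.newBuilder(resultsUri)
                .header("X-Presto-Max-Size", maxBytes + "B")
                .GET()
                .build();
    }
}
```

In either metadata variant the response carries X-Presto-Buffer-Remaining-Bytes; only the way the consumer asks for it differs.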
@tdcmeehan
What if instead of sending across this new header on the GET to {taskId}/results/{bufferId}/{token} , we just have consumers in wait-for-data mode to send HEAD requests to the underlying source.
To do that, we need Coordinator to add support for this new HEAD request. This is needed to support queries that use SYSTEM tables and have their leaf scans run on Coordinator. In this case, Prestissimo workers need to fetch data from Coordinator.
@mbasmanova this is true, but from the description, it seems that we would just need to make sure that this call doesn't fail (perhaps, just returns 200 if the buffer exists at all), which would be a trivial change. It doesn't seem like it would require support for the X-Presto-Buffer-Remaining-Bytes header.
@tdcmeehan
we would just need to make sure that this call doesn't fail (perhaps, just returns 200 if the buffer exists at all), which would be a trivial change. It doesn't seem like it would require support for the X-Presto-Buffer-Remaining-Bytes header.
In the current design, this call must return remaining bytes, otherwise the consumer will never switch from waiting to fetching-data for that producer.
Currently, we say that get-data-size request is allowed to return data, but if it doesn't and if it doesn't specify remaining bytes, we'll continue issuing that same request until we get "no-more-data" or "remaining-bytes".
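The handling of a get-data-size response described above can be sketched as a small decision function. This is an interpretation of the rule as stated in this comment, not the actual Prestissimo code; `DataSizeResponse`, `Action`, and `decide` are hypothetical names, and the order of the checks is an assumption.

```java
import java.util.OptionalLong;

// Hypothetical decision logic for a response to a get-data-size request.
final class DataSizeResponse {
    private DataSizeResponse() {}

    enum Action { CONSUME_DATA, START_FETCHING, STOP_POLLING, RETRY }

    // hasData: the response body contained pages (allowed, e.g. from a
    //          producer that ignores the data-size request).
    // noMoreData: the producer reported that it is finished.
    // remainingBytes: total from X-Presto-Buffer-Remaining-Bytes, if present.
    static Action decide(boolean hasData, boolean noMoreData, OptionalLong remainingBytes) {
        if (hasData) {
            return Action.CONSUME_DATA;
        }
        if (noMoreData) {
            return Action.STOP_POLLING;
        }
        if (remainingBytes.isPresent() && remainingBytes.getAsLong() > 0) {
            return Action.START_FETCHING;
        }
        // Neither data, nor remaining bytes, nor no-more-data: ask again.
        return Action.RETRY;
    }
}
```

The `RETRY` branch is why a producer that answers the metadata request without ever reporting remaining bytes would keep the consumer in waiting mode.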
@mbasmanova got it, I missed that part of the design. Would something along these lines work in TaskResource? I suppose we'd need a similar change in the GET method as well.
@HEAD
@Path("{taskId}/results/{bufferId}")
public Response head(
        @PathParam("taskId") TaskId taskId,
        @PathParam("bufferId") OutputBufferId bufferId,
        @HeaderParam(PRESTO_MAX_SIZE) DataSize maxSize)
{
    requireNonNull(taskId, "taskId is null");
    requireNonNull(bufferId, "bufferId is null");

    // Look up the requested buffer in the task's reported output buffers.
    OutputBufferInfo outputBuffers = taskManager.getTaskInfo(taskId).getOutputBuffers();
    Optional<BufferInfo> bufferInfo = outputBuffers.getBuffers().stream()
            .filter(buffer -> buffer.getBufferId().equals(bufferId))
            .findFirst();

    // Respond with 200 and, when the buffer exists, its buffered byte count.
    Response.ResponseBuilder responseBuilder = Response.ok();
    return bufferInfo
            .map(info -> responseBuilder.header("X-Presto-Buffer-Remaining-Bytes", info.getPageBufferInfo().getBufferedBytes()))
            .orElse(responseBuilder)
            .build();
}
@tdcmeehan I'm not familiar with this code, but it looks like this would work. Do you know if this works for all kinds of buffers, i.e. broadcast/partitioned/arbitrary?
There are BufferInfo implementations for all buffer types, and it's being reported in TaskInfo, so I believe this should work consistently.
@tdcmeehan Tim, any chance you could help add support to the coordinator for HTTP HEAD metadata requests and for returning remaining bytes from existing HTTP GET requests?
@Yuhta Jimmy, what do you think about this proposal? It looks cleaner, but unfortunately requires changes to the coordinator, so we won't be able to modify Prestissimo independently.
I think this is good. On the Prestissimo side we will handle both HEAD and X-Presto-Get-Data-Size temporarily; once the coordinator is fully upgraded we can remove X-Presto-Get-Data-Size.
@mbasmanova @Yuhta I'll work on the coordinator part.
@tdcmeehan Thank you, Tim.
@mbasmanova in the example, the following is returned:
X-Presto-Buffer-Remaining-Bytes: 100,200,150,400
Does this signify a series of data pages and their size? Would it be the same if they were summed together (e.g., 850)?
@tdcmeehan Splitting the bytes at page boundaries allows the consumer to fetch a prefix of the pages that fits within the memory budget.
Thanks @Yuhta. In practice, how would that work? For example, suppose you have enough memory for the first two pages (100 and 200 bytes), would you specify the X-Presto-Max-Size header to be 300?
@tdcmeehan Yes, we will request the sum of a prefix of the remaining page sizes.
@Yuhta thanks. For the purposes of the coordinator changes, it's a lot more convenient to keep them summed since this is how it is reported in TaskInfo, and it seems like it should be fine to keep it that way. Let me know if my understanding is not correct.
@tdcmeehan Yes, when we are fetching from the coordinator, it is the only producer and all the data is usually small enough to be kept in memory (otherwise the coordinator would run out of memory first), so it is probably OK.
@tdcmeehan Tim, Jimmy already replied. I just wanted to confirm that it is important to specify the list of page sizes to allow consumer to know exactly how much data they will get back. If producer simply says 10MB, then it is not clear whether it can return those in 1MB chunks or 2MB chunks or only in one 10MB chunk.
Have we made the changes to use the new protocol that relies on HEAD to get output buffer metadata in C++?
@tdcmeehan Not yet. Which coordinator version has the HEAD support? We need to check that all our internal coordinators have been upgraded to a version newer than that.