[native] WIP: SystemConnector to query system.runtime.tasks table
Description
Motivation and Context
Impact
Test Plan
Contributor checklist
- [ ] Please make sure your submission complies with our development, formatting, commit message, and attribution guidelines.
- [ ] PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
- [ ] Documented new properties (with their default values), SQL syntax, functions, or other functionality.
- [ ] If release notes are required, they follow the release notes guidelines.
- [ ] Adequate tests were added if applicable.
- [ ] CI passed.
Release Notes
Please follow release notes guidelines and fill in the release notes below.
== RELEASE NOTES ==
General Changes
* ...
* ...
Hive Changes
* ...
* ...
If release note is NOT required, use:
== NO RELEASE NOTE ==
CC: @spershin
@aditi-pandit Is there a design document about this connector? I assume SystemConnector requires access to the Metastore, and we don't have that on the worker. Hence, I'm wondering how this will work.
@mbasmanova : This SystemConnector code was to query the tasks table. That seemed to be the only part of the SystemConnector needed at the worker for Prestissimo.
Other tables like nodes and queries were populated from in-memory structures on the coordinator itself. Any code accessing the Metastore (like TablePropertiesSystemTable, say) seemed to be required only in the coordinator part of the connector.
I just spent a day on this prototype to wire the pieces together. I haven't put together a design doc.
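To sketch the idea in code — the type and function names below are invented for illustration and are not the actual Prestissimo API — each worker already holds its tasks in an in-memory registry, so a worker-side scan of system.runtime.tasks only needs to snapshot that local state into rows; no Metastore access is involved:

```cpp
// Hypothetical sketch of a worker-side tasks-table scan. All names here
// (TaskRow, LocalTaskRegistry) are invented for illustration; the real
// connector would read the worker's actual task manager state.
#include <iostream>
#include <string>
#include <vector>

struct TaskRow {       // One row of system.runtime.tasks (simplified).
  std::string nodeId;
  std::string taskId;
  std::string state;   // e.g. RUNNING, FINISHED, FAILED.
  long splits;
};

// Stand-in for the worker's in-memory task bookkeeping.
class LocalTaskRegistry {
 public:
  void add(TaskRow row) { tasks_.push_back(std::move(row)); }
  // A tasks-table page source would iterate over this snapshot.
  std::vector<TaskRow> snapshot() const { return tasks_; }

 private:
  std::vector<TaskRow> tasks_;
};

int main() {
  LocalTaskRegistry registry;
  registry.add({"worker-1", "20240101_0001.1.0.0", "RUNNING", 12});
  registry.add({"worker-1", "20240101_0002.2.1.0", "FINISHED", 4});
  for (const auto& t : registry.snapshot()) {
    std::cout << t.nodeId << " " << t.taskId << " " << t.state << " "
              << t.splits << "\n";
  }
  return 0;
}
```

The real page source would presumably build Velox vectors from the worker's task state rather than printing, but the data flow is the same: local memory in, table rows out.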
@aditi-pandit Aditi, thank you for clarifying. It is interesting that the tasks table is populated on the workers. I wonder why. All the information is available on the coordinator. CC: @tdcmeehan
I think the reason for this is because historically you could always deploy Presto in a mode where many or all of the workers also functioned as coordinators. In this mode, any single coordinator would only know of the tasks whose queries are local to that coordinator.
@tdcmeehan Tim, thank you for clarifying. I didn't know about this deployment scheme. I'm not sure I understand how this works, though. When there are multiple coordinators, wouldn't query results depend on which coordinator is asked to process the query? Are you saying that in this setup a query can be routed to any coordinator and the results are expected to be the same? I guess in this case it is necessary to ask all the workers to report their tasks since, as you pointed out, a single coordinator knows about only a subset of tasks.
Generally speaking, Java workers are not compatible with native workers. They use different hash functions and different intermediate results for aggregations. Hence, we had to make a change to run the system connector only on the coordinator and introduce an exchange before partial aggregation. These changes may get in the way of making this PR work.
See #21725 and #21285
In this scheme, queries are sticky to a single coordinator (after you POST a query, each nextUri always returns the host of the local coordinator). It's presumed there's something in front of the cluster to distribute the query creation, like a load balancer.
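As a rough illustration of that flow (fetchJson and the URIs below are stand-ins, not the real Presto client API): the client keeps following nextUri, and every URI it sees carries the same coordinator host:

```cpp
// Sketch of the client-side polling loop. fetchJson is a stand-in for an
// HTTP GET against the coordinator; a real client would issue the request
// and parse the JSON response.
#include <iostream>
#include <optional>
#include <string>

struct Response {
  std::string data;                    // Result rows for this page.
  std::optional<std::string> nextUri;  // Absent once the query finishes.
};

Response fetchJson(const std::string& /*uri*/) {
  // Simulated coordinator: serves three pages, always pointing nextUri
  // back at itself (the "sticky" behavior described above).
  static int page = 0;
  if (++page < 3) {
    return {"page " + std::to_string(page),
            "http://coordinator-1:8080/v1/statement/q1/" +
                std::to_string(page)};
  }
  return {"page " + std::to_string(page), std::nullopt};
}

int main() {
  // The initial POST to /v1/statement would return the first nextUri.
  std::string uri = "http://coordinator-1:8080/v1/statement/q1/0";
  while (true) {
    Response r = fetchJson(uri);
    std::cout << r.data << " (from " << uri << ")\n";
    if (!r.nextUri) {
      break;
    }
    uri = *r.nextUri;
  }
  return 0;
}
```

Because every hop stays on the coordinator that accepted the initial POST, a multi-coordinator deployment only needs the load balancer for the first request.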
@tdcmeehan Tim, thank you for clarifying. One more follow-up question. In a multi-coordinator deployment, do all workers report themselves to all coordinators, or is a given worker assigned to just one coordinator? In other words, do we have N coordinators managing a shared pool of workers, or just N "mini" clusters that are independent of each other?
Workers report themselves to a single discovery service, which is either replicated to other coordinators in an eventually consistent manner, or the discovery service is a single process which is separate from the coordinators. Originally, when this system connector was written, there was no concept of shared resources (e.g. resource groups, global memory management, etc.) and it relied purely on individual backpressure from workers, although there are now tools to help make that work.
@tdcmeehan Tim, I wonder if it still makes sense to support this deployment model. What do you think? Does it make sense to consider it when thinking about native workers?
Tactically and short term, I think it would be great to support this if there was an easy and not hacky way to get it to work with #21725 and #21285. But given that most people would be deploying Presto for their large to medium size data lakes, I don't think an Impala-style deployment model makes sense for Presto's future, and personally I feel comfortable saying we can deprecate it in the future.
That being said, system tables in the coordinator present a challenge for what I feel is one of the end goals of moving to native, which is simplifying our Java code. I'd like to think about a way to move this to C++ so it doesn't need to be present in the Java SPI (thinking way ahead in the future, if the only reason we retain page source providers is for system tables, I think it would be worthwhile to think about how to move system tables to C++). So I'd like to revisit the presumption at some point that system tables must be coordinator-provided tables, since even now that's not necessarily true.
@mbasmanova, @tdcmeehan : Thanks for the discussion. It has been informative.
If we want to stay with this approach of getting the tasks table on the worker, we could modify https://github.com/prestodb/presto/pull/21725 and https://github.com/prestodb/presto/pull/21285 to not perform those rewrites for the system.runtime.tasks table specifically, as it is based on the worker.
https://github.com/prestodb/presto/pull/21725 could work unmodified as well. It would just mean that we don't allow partial aggregation over the tasks table (e.g., a GROUP BY over task state), which might not be a big deal unless a massive number of queries is scheduled on the cluster.
wdyt?
The other fixable issue we are hitting internally in a large setup when querying system tables is that the native worker does not handle chunked HTTP responses yet. @tdcmeehan do you know what causes a chunked HTTP response from the coordinator? I tried reproducing with a large system table (many entries), but I could not.
https://github.com/prestodb/presto/blob/master/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp#L246
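For context on what handling this would involve: with Transfer-Encoding: chunked there is no Content-Length, and the body arrives as hex-length-prefixed chunks that the client must reassemble. A minimal, self-contained decoder sketch (illustrative only; this is not the PrestoExchangeSource code):

```cpp
// Decodes an HTTP/1.1 chunked transfer-encoded body into the full payload.
// Illustrates why a client that assumes a single Content-Length-delimited
// read fails on chunked responses.
#include <exception>
#include <iostream>
#include <optional>
#include <string>

// Returns the decoded body, or std::nullopt if the input is malformed
// or incomplete.
std::optional<std::string> decodeChunkedBody(const std::string& raw) {
  std::string body;
  size_t pos = 0;
  while (true) {
    // Each chunk starts with its size in hex, terminated by CRLF.
    size_t lineEnd = raw.find("\r\n", pos);
    if (lineEnd == std::string::npos) {
      return std::nullopt;
    }
    size_t chunkSize = 0;
    try {
      chunkSize = std::stoul(raw.substr(pos, lineEnd - pos), nullptr, 16);
    } catch (const std::exception&) {
      return std::nullopt;
    }
    pos = lineEnd + 2;
    if (chunkSize == 0) {
      return body;  // "0\r\n" terminates the body (trailers ignored here).
    }
    if (pos + chunkSize + 2 > raw.size()) {
      return std::nullopt;  // Truncated chunk.
    }
    body.append(raw, pos, chunkSize);
    pos += chunkSize + 2;  // Skip the data and its trailing CRLF.
  }
}

int main() {
  // "Wiki" encoded as one 4-byte chunk followed by the 0-size terminator.
  std::string raw = "4\r\nWiki\r\n0\r\n\r\n";
  auto body = decodeChunkedBody(raw);
  std::cout << (body ? *body : "<malformed>") << "\n";
  return 0;
}
```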
@majetideepak Chunked responses used to be produced by the task/async endpoint, which was removed in https://github.com/prestodb/presto/pull/21772 . You should not be seeing issues if you update past that PR.
@mbasmanova thank you for the pointer!
@mbasmanova, @majetideepak : Thanks for your previous input. This code is now ready for a full review. Looking forward to your comments.
Regarding modifying #21725 and #21285: the tasks table gets data from all nodes, so both the coordinator and the workers. Since the coordinator also generates data, both of the previous planner rules are still applicable.
@majetideepak : I have addressed your review comments. Would appreciate another pass. Thanks!