Support for federated clusters
Fixes #14535.
Description
- Druid provides a friendly, unified query gateway for users across multiple data centers and clusters through cluster-federated queries.
- As the number of nodes and the volume of metadata grow, a single Druid cluster can become excessively large, leading to interference among tasks and degraded scheduling performance. Federated queries help avoid such issues: they allow a large cluster to be broken into relatively independent clusters as needed, making scheduling more agile and lightweight.
Key changed/added classes in this PR
-
- QueryContexts
- BrokerServerView
- CachingClusteredClient
- TimelineServerView
This PR has:
- [x] been self-reviewed.
- [x] using the concurrency checklist (Remove this item if the PR doesn't have any relation to concurrency.)
- [x] added documentation for new or modified features or behaviors.
- [ ] a release note entry in the PR description.
- [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
- [ ] added or updated version, license, or notice information in licenses.yaml
- [x] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
- [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
- [ ] added integration tests.
- [x] been tested in a test Druid cluster.
@599166320 How big is the deployment you are working with that is causing things like
> leading to mutual interference among tasks and deteriorating scheduling performance.

I really do not understand what this means. Are you talking about ingestion tasks or query scheduling?
The change in its current form looks hackish. I think a better design pattern is for one cluster to own one data source if you really want to break things up, since then you can configure load rules, compaction, etc. on only one cluster for a given data source. Ingestion also gets simpler.
@cryptoe Thank you for your response. For the second point, I also hesitated over whether to include it, but decided to do so to see what everyone thinks.
Usually, we deploy a Druid cluster in one data center, with approximately a hundred servers in each data center. The entire cluster is quite stable. However, we've noticed that the master node operates in a master-slave configuration, storing a significant amount of metadata and handling heavy scheduling tasks. In theory, there might be bottlenecks, so we wanted to bring it up for discussion.
Of course, what we are more concerned about is the cost of dedicated network traffic.
@cryptoe Is there a better way to implement federated queries in practice? @abhishekagarwal87 mentioned that this approach might break certain protocols. In my opinion, Historicals and Brokers currently share the native-query code, with QueryResource as the entry point, and their parameter structures are almost identical.
However, if strict constraints are necessary, Query.queryContext might require some improvements. What are your thoughts on this?
Things like lookups and post-aggregators need to be thought through with the current approach.
I think the correct way to do it would be to use something like https://github.com/lyft/presto-gateway and either pass a query context to select the cluster you want, or maintain a data source -> cluster mapping on the gateway nodes.
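The gateway-side routing suggested above could look roughly like the sketch below. This is purely illustrative and not part of Druid or presto-gateway; the map, the `targetCluster` context key, and the broker URLs are all hypothetical names for this example.

```python
# Hypothetical gateway routing: map each data source to the broker of
# the cluster that owns it, with a query-context override taking
# precedence. All names and URLs here are illustrative assumptions.

DATASOURCE_TO_CLUSTER = {
    "wikipedia": "http://broker.cluster-1.example.com:8082",
    "metrics": "http://broker.cluster-2.example.com:8082",
}

def route_query(datasource, context=None):
    """Pick the broker URL to forward a query to."""
    context = context or {}
    # An explicit context override wins, mirroring the idea of passing
    # a query context to select the target cluster.
    if "targetCluster" in context:
        return context["targetCluster"]
    try:
        return DATASOURCE_TO_CLUSTER[datasource]
    except KeyError:
        raise ValueError(f"no cluster owns data source {datasource!r}")
```

With a mapping like this, load rules, compaction, and ingestion for a data source all stay within the single cluster that owns it, which is the simplification argued for above.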
Another thing I was thinking about is how ORDER BYs would look. The broker expects sub-results to be sorted by the grouping key, and then sorts on the ORDER BY key itself, IIRC. In this case the cluster 2 broker would return rows already sorted on the ORDER BY key, which would break the merging logic on the grouping keys in the cluster 1 broker.
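The concern above can be illustrated with a minimal sketch (not Druid code): a k-way merge that assumes each input stream is sorted by the grouping key silently produces out-of-order output when one stream is pre-sorted by a different key. The region names and counts are made up for the example.

```python
import heapq

def merge_by_group_key(streams):
    # heapq.merge assumes each input is already sorted by the
    # comparison key; it does not verify this.
    return list(heapq.merge(*streams, key=lambda row: row[0]))

# Rows are (regionName, count). Cluster 1 is sorted by regionName,
# as the upstream broker expects.
cluster1 = [("Alsace", 3), ("Bavaria", 7), ("Cantabria", 1)]
# Cluster 2 has instead pre-sorted by count DESC (the ORDER BY key).
cluster2 = [("Zurich", 9), ("Aragon", 2)]

merged = merge_by_group_key([cluster1, cluster2])
# "Aragon" ends up after "Zurich": grouping-key order is broken, so
# equal grouping keys can no longer be combined in a single pass.
```

Mentally tracing the merge: the heads compared are always in grouping-key order within cluster 1, so all of cluster 1 drains first, and cluster 2's mis-sorted tail is appended as-is.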
@cryptoe According to your description, are you concerned about sorting in a query like the one below?
```sql
SELECT
  COUNT(*) c,
  regionName
FROM wikipedia
GROUP BY regionName
ORDER BY regionName, c DESC
LIMIT 10
```
In fact, when a query like this is forwarded from one broker to another broker in a federated cluster, the `LIMIT 10` part is removed, and the query is transformed into a native query similar to the one below:
```json
{
  "queryType": "groupBy",
  "dataSource": {
    "type": "table",
    "name": "wikipedia"
  },
  "intervals": {
    "type": "intervals",
    "intervals": [
      "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"
    ]
  },
  "granularity": {
    "type": "all"
  },
  "dimensions": [
    {
      "type": "default",
      "dimension": "regionName",
      "outputName": "d0",
      "outputType": "STRING"
    }
  ],
  "aggregations": [
    {
      "type": "count",
      "name": "a0"
    }
  ],
  "limitSpec": {
    "type": "NoopLimitSpec"
  },
  "context": {
    "applyLimitPushDown": false,
    "defaultTimeout": 300000,
    "federatedClusterBrokers": "",
    "finalize": false,
    "fudgeTimestamp": "-4611686018427387904",
    "groupByOutermost": false,
    "groupByStrategy": "v2",
    "maxQueuedBytes": 5242880,
    "maxScatterGatherBytes": 9223372036854775807,
    "queryFailTime": 1699369214285,
    "queryId": "605a751b-f0ee-43fe-a754-b702035622df",
    "resultAsArray": true,
    "sqlQueryId": "e8bed46a-f2de-4fc3-89c6-febfe048debc",
    "timeout": 29981
  }
}
```
So, you don't need to worry about this sorting issue.
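A minimal sketch (not Druid code) of why stripping the LIMIT before forwarding avoids the ordering problem: the downstream broker returns the full grouped result, and the receiving broker applies the ORDER BY and LIMIT itself. The rows below are invented for illustration.

```python
# Grouped rows (regionName, count) gathered from a downstream broker
# that received the query with NoopLimitSpec, i.e. no limit applied.
rows = [("Beta", 5), ("Alpha", 7), ("Gamma", 1)]

# The receiving broker applies ORDER BY regionName, c DESC LIMIT 2
# itself; negating the count gives descending order on that key.
final = sorted(rows, key=lambda r: (r[0], -r[1]))[:2]
```

Because `applyLimitPushDown` is false and the forwarded limit spec is a `NoopLimitSpec`, the downstream broker never imposes its own ordering on the result.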
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the [email protected] list. Thank you for your contributions.
This pull request/issue has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.