/v2 and /v2/candidates endpoints not respecting broker partition pruning - range partition
This reports unexpected behavior where the broker selects more segments than necessary for a query. The identified scenario is when secondary partition information exists (e.g., for range partitioning) but is not used in conjunction with a query filter on the secondary partition dimensions. Failing to fully prune segments at the broker harms system operations, notably performance and cost.
Affected Version
28
Description
The /druid/v2 and /druid/v2/candidates broker endpoints return segments that should be filtered out based on secondary partition metadata. Returning unneeded segments when planning and executing queries causes unnecessary I/O throughout the system and avoidable cost and performance penalties.
Steps to reproduce:
- Create segments with range partitioning, mark them as used, and load them onto historicals. Testing requires multiple segments per combination of datasource and time chunk.
- Identify the partition column value(s) corresponding to a single segment (within a datasource/time chunk)
- Submit /v2 and /v2/candidates requests to the broker for the identified datasource and time chunk, adding a query filter that uses a partition value previously identified for the first partition column (see the example requests below)
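For example (broker host, port, and query file name are placeholders), the same query body can be posted to both endpoints:
# candidate segments the broker would consider for the query
curl -X POST 'http://BROKER_HOST:8082/druid/v2/candidates' -H 'Content-Type: application/json' -d @query.json
# actual query execution; adding "bySegment": true to the query context shows which segments were queried
curl -X POST 'http://BROKER_HOST:8082/druid/v2/' -H 'Content-Type: application/json' -d @query.json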
Expected/observed behavior summary
- The expected behavior is that the response will return only the segment identified and no others for the loaded datasource/time chunk.
- The observed behavior is that the response will include additional segments for the loaded datasource/time chunk.
Testing notes
- Testing targeted a segment whose partition/filter column range was neither the start nor the end of the partition set (i.e., neither boundary was a negative- or positive-infinity value for the column). Further, to reduce ambiguity, the partition filter value chosen was neither the first nor the last value for that column within the segment.
- The test setup had multiple partition columns in the queried segments but only filtered on the first partition column
- Testing was run both with and without the query context value "secondaryPartitionPruning"
- Some testing also included "bySegment" in the query context to assess the broker's segment selection (the combined context is shown after these notes)
- Testing only queried a single time chunk (aligning with segment granularity). This shouldn't be strictly necessary, but reproductions of the test may need to at least ensure that all segments in the time chunk have the same partitioning.
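Combined query context used for the bySegment variant of the test (illustrative; "secondaryPartitionPruning" defaults to true and was omitted in some runs):
"context": {
  "secondaryPartitionPruning": true,
  "bySegment": true
}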
Representation of the test setup
segments for "src1"
- there are multiple segments for the tested time chunk
segment_id
src1_2025-02-16T01:00:00.000Z_2025-02-16T02:00:00.000Z_2024-04-01T23:59:59.999Z
src1_2025-02-16T01:00:00.000Z_2025-02-16T02:00:00.000Z_2024-04-01T23:59:59.999Z_1
src1_2025-02-16T01:00:00.000Z_2025-02-16T02:00:00.000Z_2024-04-01T23:59:59.999Z_2
server segments for datasource "src1"
- 2x replication - similar to a typical production setup, but not strictly required for testing
segment_id,server
src1_2025-02-16T01:00:00.000Z_2025-02-16T02:00:00.000Z_2024-04-01T23:59:59.999Z,host1:8283
src1_2025-02-16T01:00:00.000Z_2025-02-16T02:00:00.000Z_2024-04-01T23:59:59.999Z,host2:8283
src1_2025-02-16T01:00:00.000Z_2025-02-16T02:00:00.000Z_2024-04-01T23:59:59.999Z_1,host1:8283
src1_2025-02-16T01:00:00.000Z_2025-02-16T02:00:00.000Z_2024-04-01T23:59:59.999Z_1,host3:8283
src1_2025-02-16T01:00:00.000Z_2025-02-16T02:00:00.000Z_2024-04-01T23:59:59.999Z_2,host2:8283
src1_2025-02-16T01:00:00.000Z_2025-02-16T02:00:00.000Z_2024-04-01T23:59:59.999Z_2,host3:8283
segment metadata - range partition column values
- for testing we can assume that partitionDimensions are the first columns in dimensions, but that should not be strictly necessary
src1_2025-02-16T01:00:00.000Z_2025-02-16T02:00:00.000Z_2024-04-01T23:59:59.999Z: partitionDimensions: ["col_1","col_2"], start: [-inf, -inf], end: ["value1ghi","value2tuv"]
src1_2025-02-16T01:00:00.000Z_2025-02-16T02:00:00.000Z_2024-04-01T23:59:59.999Z_1: partitionDimensions: ["col_1","col_2"], start: ["value1ghi","value2jkl"], end: ["value1lmn", "value2mnop"]
src1_2025-02-16T01:00:00.000Z_2025-02-16T02:00:00.000Z_2024-04-01T23:59:59.999Z_2: partitionDimensions: ["col_1","col_2"], start: ["value1lmn", "value2qrs"], end: [+inf, +inf]
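One way to confirm the recorded range-partition boundaries (assuming the shard_spec column of the sys.segments system table is available in the running version) is a Druid SQL query such as:
SELECT "segment_id", "shard_spec"
FROM sys.segments
WHERE "datasource" = 'src1'
  AND "start" = '2025-02-16T01:00:00.000Z'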
test query
- select the identified datasource/time chunk and filter on the first partition column using a value matching only one of the multiple segments
{
"queryType": "scan",
"dataSource": "src1",
"intervals": "2025-02-16T01:00:00.000Z/2025-02-16T02:00:00.000Z",
"columns": ["__time", "col_1", "col_15"],
"filter": {
"type": "selector",
"dimension": "col_1",
"value": "value1jkl"
},
"context": {"secondaryPartitionPruning": true}
}
Expected result example - /v2/candidates
[
{
"interval": "2025-02-16T01:00:00.000Z/2025-02-16T02:00:00.000Z",
"version": "2024-04-01T23:59:59.999Z",
"partitionNumber": 1,
"size": 999999,
"locations": [
{
"name": "host1:8283",
"host": null,
"hostAndTlsPort": "host1:8283",
"maxSize": 999999999999,
"type": "historical",
"tier": "_default_tier",
"priority": 0
},
{
"name": "host3:8283",
"host": null,
"hostAndTlsPort": "host3:8283",
"maxSize": 999999999999,
"type": "historical",
"tier": "tier2",
"priority": 0
}
]
}
]
If partition pruning is applied at the broker, it would be unexpected to receive either of the other two segments in the response (regardless of server/replication).
Unexpected result example 1: /druid/v2 endpoint - extra (but not all) segments are returned
- expected only src1_2025-02-16T01:00:00.000Z_2025-02-16T02:00:00.000Z_2024-04-01T23:59:59.999Z_1 (note: the unexpected segments are loaded on different servers than the expected segment)
- the test adds "bySegment" to the query context
[
{
"timestamp": "2024-04-01T09:00:00.000Z",
"result": {
"results": [
{
"segmentId": "src1_2025-02-16T01:00:00.000Z_2025-02-16T02:00:00.000Z_2024-04-01T23:59:59.999Z_1",
"columns": ["__time", "col_1", "col_15"],
"events": [
{"__time": 1739667600000, "col_1": "value1jkl", "col_15": "hello"},
{"__time": 1739668980000, "col_1": "value1jkl", "col_15": "world"},
{"__time": 1739668980000, "col_1": "value1jkl", "col_15": "hello"},
{"__time": 1739670360000, "col_1": "value1jkl", "col_15": "world"}
],
"rowSignature": [
{"name": "__time", "type": "LONG"},
{"name": "col_1", "type": "STRING"},
{"name": "col_15", "type": "STRING"}
]
}
],
"segment": "src1_2025-02-16T01:00:00.000Z_2025-02-16T02:00:00.000Z_2024-04-01T23:59:59.999Z_1",
"interval": "2025-02-16T01:00:00.000Z/2025-02-16T02:00:00.000Z"
}
},
{
"timestamp": "2024-04-01T09:00:00.000Z",
"result": {
"results": [],
"segment": "src1_2025-02-16T01:00:00.000Z_2025-02-16T02:00:00.000Z_2024-04-01T23:59:59.999Z",
"interval": "2025-02-16T01:00:00.000Z/2025-02-16T02:00:00.000Z"
}
}
]
Unexpected result example 2: /druid/v2/candidates endpoint - all segments are returned
- expected only src1_2025-02-16T01:00:00.000Z_2025-02-16T02:00:00.000Z_2024-04-01T23:59:59.999Z_1
[
{
"interval": "2025-02-16T01:00:00.000Z/2025-02-16T02:00:00.000Z",
"version": "2024-04-01T23:59:59.999Z",
"partitionNumber": 0,
"size": 999999,
"locations": [
{
"name": "host1:8283",
"host": null,
"hostAndTlsPort": "host1:8283",
"maxSize": 999999999999,
"type": "historical",
"tier": "_default_tier",
"priority": 0
},
{
"name": "host2:8283",
"host": null,
"hostAndTlsPort": "host2:8283",
"maxSize": 999999999999,
"type": "historical",
"tier": "tier2",
"priority": 0
}
]
},
{
"interval": "2025-02-16T01:00:00.000Z/2025-02-16T02:00:00.000Z",
"version": "2024-04-01T23:59:59.999Z",
"partitionNumber": 1,
"size": 999999,
"locations": [
{
"name": "host1:8283",
"host": null,
"hostAndTlsPort": "host1:8283",
"maxSize": 999999999999,
"type": "historical",
"tier": "_default_tier",
"priority": 0
},
{
"name": "host3:8283",
"host": null,
"hostAndTlsPort": "host3:8283",
"maxSize": 999999999999,
"type": "historical",
"tier": "tier2",
"priority": 0
}
]
},
{
"interval": "2025-02-16T01:00:00.000Z/2025-02-16T02:00:00.000Z",
"version": "2024-04-01T23:59:59.999Z",
"partitionNumber": 2,
"size": 999999,
"locations": [
{
"name": "host2:8283",
"host": null,
"hostAndTlsPort": "host2:8283",
"maxSize": 999999999999,
"type": "historical",
"tier": "_default_tier",
"priority": 0
},
{
"name": "host3:8283",
"host": null,
"hostAndTlsPort": "host3:8283",
"maxSize": 999999999999,
"type": "historical",
"tier": "tier2",
"priority": 0
}
]
}
]
Summary
The broker should select and/or query only the smallest set of segments that match on datasource, time chunk, and partition dimensions whenever it has enough partition information about the used/loaded segments to prune segments that do not match on the partition dimensions (in addition to pruning segments that do not match on datasource or time chunk).
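For illustration, a minimal sketch (assumed logic, not Druid's actual implementation; the class, method, and parameter names are hypothetical) of the pruning decision expected for an equality filter on the first partition dimension of a range-partitioned segment:
class RangePruningSketch
{
  // A null start/end represents the unbounded -inf/+inf values shown in the segment metadata above.
  static boolean canPruneSegment(String filterValue, String rangeStart, String rangeEnd)
  {
    // The filter value falls entirely below the segment's range for the dimension.
    final boolean belowRange = rangeStart != null && filterValue.compareTo(rangeStart) < 0;
    // The filter value falls entirely above the segment's range for the dimension.
    final boolean aboveRange = rangeEnd != null && filterValue.compareTo(rangeEnd) > 0;
    // A value equal to a boundary is kept, since boundary values can appear in either adjacent segment.
    return belowRange || aboveRange;
  }

  public static void main(String[] args)
  {
    // Using the col_1 boundaries from the test metadata above:
    System.out.println(canPruneSegment("value1jkl", null, "value1ghi"));        // true  (partition 0: prune)
    System.out.println(canPruneSegment("value1jkl", "value1ghi", "value1lmn")); // false (partition 1: keep)
    System.out.println(canPruneSegment("value1jkl", "value1lmn", null));        // true  (partition 2: prune)
  }
}
Applied to the test metadata, the filter value value1jkl would prune partitions 0 and 2 and keep only partition 1, which is the behavior the /v2 and /v2/candidates responses were expected to reflect.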
Related documentation
- https://druid.apache.org/docs/latest/ingestion/partitioning/
- https://imply.io/blog/real-time-analytics-database-uses-partitioning-and-pruning-to-achieve-its-legendary-performance/
- https://imply.io/developer/articles/multi-dimensional-range-partioning-in-druid/
Hi @ColeAtCharter. The /druid/v2/candidates API does not currently take into account segment pruning, although /druid/v2/ and /druid/v2/sql/ (the actual query APIs) do. I tried to repro the issue on /druid/v2/ and was not able to. A single segment was queried, as expected.
Could you provide some more details that could be used to repro this issue?
Here's what I tried:
I loaded data with the following SQL and rowsPerSegment: 1500 (to ensure we get multiple segments):
REPLACE INTO "wikipedia" OVERWRITE ALL
WITH "ext" AS (
SELECT *
FROM TABLE(
EXTERN(
'{"type":"http","uris":["https://druid.apache.org/data/wikipedia.json.gz"]}',
'{"type":"json"}'
)
) EXTEND ("isRobot" VARCHAR, "channel" VARCHAR, "timestamp" VARCHAR, "flags" VARCHAR, "isUnpatrolled" VARCHAR, "page" VARCHAR, "diffUrl" VARCHAR, "added" BIGINT, "comment" VARCHAR, "commentLength" BIGINT, "isNew" VARCHAR, "isMinor" VARCHAR, "delta" BIGINT, "isAnonymous" VARCHAR, "user" VARCHAR, "deltaBucket" BIGINT, "deleted" BIGINT, "namespace" VARCHAR, "cityName" VARCHAR, "countryName" VARCHAR, "regionIsoCode" VARCHAR, "metroCode" BIGINT, "countryIsoCode" VARCHAR, "regionName" VARCHAR)
)
SELECT
TIME_PARSE("timestamp") AS "__time",
"isRobot",
"channel",
"flags",
"isUnpatrolled",
"page",
"diffUrl",
"added",
"comment",
"commentLength",
"isNew",
"isMinor",
"delta",
"isAnonymous",
"user",
"deltaBucket",
"deleted",
"namespace",
"cityName",
"countryName",
"regionIsoCode",
"metroCode",
"countryIsoCode",
"regionName"
FROM "ext"
PARTITIONED BY DAY
CLUSTERED BY "page", "user"
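For reference, one way (illustrative; host, port, and the abbreviation of the statement are placeholders) to supply the rowsPerSegment context value when submitting the statement through the SQL-based ingestion task API:
curl -X POST 'http://ROUTER_HOST:8888/druid/v2/sql/task' -H 'Content-Type: application/json' \
  -d '{"query": "REPLACE INTO \"wikipedia\" OVERWRITE ALL ... CLUSTERED BY \"page\", \"user\"", "context": {"rowsPerSegment": 1500}}'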
Then ran this query:
{
"queryType": "timeseries",
"dataSource": {
"type": "table",
"name": "wikipedia"
},
"intervals": {
"type": "intervals",
"intervals": [
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"
]
},
"filter": {
"type": "selector",
"dimension": "page",
"value": "Bailando 2015"
},
"granularity": {
"type": "all"
},
"aggregations": [
{
"type": "count",
"name": "a0"
}
],
"context": {
"bySegment": true
}
}
A single segment came back in the reply:
[
{
"timestamp": "2016-06-27T00:00:00.000Z",
"result": {
"results": [
{
"timestamp": "2016-06-27T00:00:34.959Z",
"result": {
"a0": 9
}
}
],
"segment": "wikipedia_2016-06-27T00:00:00.000Z_2016-06-28T00:00:00.000Z_2024-06-04T17:02:47.206Z_1",
"interval": "2016-06-27T00:00:00.000Z/2016-06-28T00:00:00.000Z"
}
}
]
I also confirmed using the metric query/segment/time that only a single segment was queried.
@gianm -- thanks for looking at this. I do think "Unexpected result example 1: /druid/v2 endpoint" should never happen if broker partition pruning is working: Druid returned a segmentId that should have been pruned at the broker. The one hypothetical "exception" would be if the broker combines the data query results with its own (pre-pruning) metadata about segments and includes a dummy, empty result, but that is a bit of a stretch.
Our team can rerun the test and see whether additional metrics can show the extent to which the data queries are fanned out even with partition pruning enabled.
@ColeAtCharter I was working on something recently that reminded me of this report. I realized it's possible to see the "unexpected result example 1" if the filter is filtering on something that is on the border of two segments (i.e. the value is the end of one segment and also the start of the next). This particular behavior is intentional, because it's possible for a value on the border of two segments to appear in both segments.
In the wikipedia example I posted, when I filtered on page Bailando 2015 I only got a single segment in the reply. That particular page was not the border of two segments. When I filtered on another page that was on the border of two segments, I got two segments in the reply. One had a count of 1 and the other had a count of 0.
It is certainly possible this is what you're seeing. If so, I would expect it to be fairly uncommon, unless your partitioning key is low cardinality relative to the number of segments.
Thanks, @gianm! Understood. The test example specifically accounts for that scenario: the equality filter is applied on the first partition dimension, and the filter value is strictly between the target segment's start and end values for that dimension:
query filter:
{
"type": "selector",
"dimension": "col_1",
"value": "value1jkl"
}
target segment metadata:
src1_2025-02-16T01:00:00.000Z_2025-02-16T02:00:00.000Z_2024-04-01T23:59:59.999Z_1: partitionDimensions: ["col_1","col_2"], start: ["value1ghi","value2jkl"], end: ["value1lmn", "value2mnop"]
So the segment's col_1 start (value1ghi) should be strictly less than the query's col_1 equality filter value (value1jkl), which in turn should be strictly less than the segment's col_1 end value (value1lmn).
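For contrast, the border case described above (an illustration using the same segment metadata, not an observed result) would be an equality filter whose value sits on a boundary, e.g.:
{
"type": "selector",
"dimension": "col_1",
"value": "value1ghi"
}
Since value1ghi is both partition 0's col_1 end and partition 1's col_1 start, rows with that value could exist in either segment, so returning both segments for such a filter would be the intentional behavior described above.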
We can look for additional examples of this behavior to get more information about reproducing the issue.
This issue has been marked as stale due to 280 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If this issue is still relevant, please simply write any comment. Even if closed, you can still revive the issue at any time or discuss it on the [email protected] list. Thank you for your contributions.
Oops, I missed the latest message on this issue. Certainly, if the filter value is not ambiguously in two possible segments, then only one (per time chunk) should be queried. Do you have some steps to reproduce the behavior you saw (specific ingestion commands, etc)? I'm wondering if the behavior still occurs on the latest release.