Auto-Compaction using Multi-Stage Query Engine
Description
Compaction operations issued by the Coordinator currently run on the native query engine. As the majority of our recent batch-ingestion improvements have gone into MSQ, it is important to support compaction on MSQ as well, to make compaction more robust and potentially faster. For instance, we have seen OOM errors in native compaction that MSQ could have avoided through its auto-calculation of tuning parameters.
This PR enables compaction on MSQ, removing the hard dependency on the native engine.
Main changes:
- `DataSourceCompactionConfig` now has an additional field `engine` that can be one of `[native, msq]`, with `native` being the default.
- If the engine is MSQ, the `CompactSegments` duty assigns all available compaction task slots to the launched `CompactionTask` to ensure full capacity is available to MSQ. This avoids the stalling that could occur if only a fraction of the slots were allotted and they eventually fell short of the number of tasks the MSQ engine requires to run the compaction.
- `ClientCompactionTaskQuery` has a new field `ClientCompactionRunnerInfo` with just one `Engine` subfield.
- `CompactionTask` now holds a `CompactionRunner` interface instance, with implementations `NativeCompactionRunner` in core and `MSQCompactionRunner` in the `druid-multi-stage-query` extension. The ObjectMapper deserializes the `ClientCompactionRunnerInfo` in `ClientCompactionTaskQuery` to the `CompactionRunner` instance mapped to the specified type (`native` or `msq`).
- `CompactionTask` uses the `CompactionRunner` instance it receives to create the indexing tasks.
- The `CompactionTask`-to-`MSQControllerTask` conversion logic checks whether metrics are present in the segment schema. If present, the task is created with a native group-by query; if not, the task is issued with a scan query. The `storeCompactionState` flag is set in the context.
- Each created `MSQControllerTask` is launched in place and its `TaskStatus` tracked to determine the final status of the `CompactionTask`. The ID of each of these tasks is the same as that of the `CompactionTask`, since otherwise the workers would be unable to determine the controller task's location for communication (as they haven't been launched via the Overlord).
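The query-type selection described above can be sketched as follows. This is a hypothetical illustration, not actual Druid code; the function name and the dict-based schema representation are assumptions made for the example.

```python
def choose_query_type(segment_schema):
    """Pick the MSQ query type for compaction: a group-by query when the
    segment schema contains metrics (so rollup can be re-applied),
    otherwise a plain scan query. Illustrative sketch only."""
    # Any metric columns present => group-by; empty/absent metricsSpec => scan.
    has_metrics = bool(segment_schema.get("metricsSpec"))
    return "groupBy" if has_metrics else "scan"

choose_query_type({"metricsSpec": [{"type": "longSum", "name": "count"}]})  # "groupBy"
choose_query_type({"metricsSpec": []})  # "scan"
```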
Some things to note:
- The context specified in `DataSourceCompactionConfig` is passed as-is to the `MSQControllerTask` and hence can contain MSQ context params as well, with the exception of `rowsPerSegment`, which will be overridden by either `targetRowsPerSegment` or `maxRowsPerSegment` if specified in a `partitionsSpec`.
- The `maxRowsInMemory` param is only considered if specified in the context. The value in `DataSourceCompactionConfig.tuningConfig` is not considered because it is set to a default value (1M) if unspecified by the user, making it impossible to distinguish a user-provided value from the default.
- If no `maxNumTasks` value is specified in the `taskContext`, `min(availableCompactionTaskSlots, 8)` is allotted to MSQ compaction tasks.
- `rollup: true` without any `metricsSpec` de-duplicates rows, since all columns are then treated as dimensions, just as in native compaction.
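The `maxNumTasks` defaulting rule above can be sketched as a small function. This is an illustrative sketch, not Druid code; the function name and dict-based context are assumptions for the example.

```python
DEFAULT_MSQ_COMPACTION_MAX_TASKS = 8  # the default cap described above

def allot_msq_tasks(task_context, available_compaction_task_slots):
    """Return maxNumTasks for an MSQ compaction task: honor an explicit
    value in the task context, else min(available slots, 8)."""
    if "maxNumTasks" in task_context:
        return task_context["maxNumTasks"]
    return min(available_compaction_task_slots, DEFAULT_MSQ_COMPACTION_MAX_TASKS)

allot_msq_tasks({}, 5)                    # 5: fewer slots than the cap
allot_msq_tasks({}, 20)                   # 8: capped at the default
allot_msq_tasks({"maxNumTasks": 3}, 20)   # 3: explicit value wins
```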
Currently unsupported for MSQ compaction:
- Updating the cluster-level default compaction engine. The current default is `native`, which can be updated at a per-datasource level. The cluster-level update API will come in a follow-up PR.
- Grouping on multi-value columns. The flag `groupByEnableMultiValueUnnesting` is disabled; only array-type columns are supported.
- `partitionsSpec` of type `HashedPartitionsSpec`. Only `DimensionRangePartitionsSpec` and `DynamicPartitionsSpec` work.
- `maxTotalRows` in `DynamicPartitionsSpec`. Only `maxRowsPerSegment` works.
- `rollup` set to `false` in `granularitySpec` when a `metricsSpec` is specified. Only `rollup` set to `true` works with a non-empty `metricsSpec`.
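The unsupported cases above could be rejected up front by a validation pass. The sketch below is hypothetical (function name, error messages, and dict-shaped config are assumptions), meant only to illustrate the rules, not the actual validation code.

```python
def validate_msq_compaction(config):
    """Collect error strings for the MSQ-unsupported configurations
    listed above. An empty list means the config passes these checks."""
    errors = []
    partitions_spec = config.get("partitionsSpec", {})
    if partitions_spec.get("type") == "hashed":
        errors.append("hashed partitionsSpec is unsupported; use range or dynamic")
    if partitions_spec.get("maxTotalRows") is not None:
        errors.append("maxTotalRows is unsupported; use maxRowsPerSegment")
    if config.get("metricsSpec") and config.get("granularitySpec", {}).get("rollup") is False:
        errors.append("rollup must be true when metricsSpec is specified")
    return errors

validate_msq_compaction({"partitionsSpec": {"type": "dynamic", "maxRowsPerSegment": 100}})  # []
```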
Release note
Key changed/added classes in this PR
- `CompactionRunner`
- `NativeCompactionRunner`
- `MSQCompactionRunner`
- `ClientCompactionRunnerInfo`
This PR has:
- [ ] been self-reviewed.
- [ ] using the concurrency checklist (Remove this item if the PR doesn't have any relation to concurrency.)
- [ ] added documentation for new or modified features or behaviors.
- [ ] a release note entry in the PR description.
- [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
- [ ] added or updated version, license, or notice information in licenses.yaml
- [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
- [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
- [ ] added integration tests.
- [ ] been tested in a test Druid cluster.
@LakshSingla : There needs to be some way to track whether a dimension was originally a metric but ended up as a dimension because of `finalizeAggregations=true`. So I went with a map.
Regarding backward compatibility, I've included appropriate handling for missing fields during deserialization. Some scenarios I can think of:
- Coordinator old, Indexer new: the Indexer assumes the native compaction engine and the flow is just like the original flow. It stores the new fields in the compaction state, which the Coordinator ignores, continuing with the (sufficient) subset.
- Coordinator new, Indexer old: the Indexer doesn't honor the `engine` field and continues with the original native compaction flow. It stores the `CompactionState` without the new fields, and the Coordinator handles the missing field values correctly using defaults.
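The "default on missing field" handling in these scenarios can be sketched as below. This is a hypothetical illustration (the function name and the serialized field names are assumptions), not the actual deserialization code.

```python
def read_engine(client_compaction_task_query):
    """Resolve the compaction engine from a (dict-shaped) task query,
    defaulting to 'native' when the runner info is absent, as an old
    coordinator would not send it."""
    runner_info = client_compaction_task_query.get("compactionRunner")
    if runner_info is None:
        return "native"  # old sender: field missing, fall back to default
    return runner_info.get("type", "native")

read_engine({})                                   # "native"
read_engine({"compactionRunner": {"type": "msq"}})  # "msq"
```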
Please let me know if you come across any other cases that aren't handled.
Support for finalizeAggregations=false will come in a follow-up PR, as mentioned in my other comment.
`queryGranularity` set to `all`
Why is this not supported?
The id of each of these tasks is the same as that of CompactionTask since otherwise, the workers will be unable to determine the controller task's location for communication (as they haven't been launched via the overlord).
Task IDs must be unique right? Won't this contradict that requirement?
@LakshSingla
Why is this not supported?
I didn't add this support because, ideally, when query granularity is ALL, all rows of the compacted datasource should have the same timestamp. In native compaction, the timestamps of all rows within a segment are changed to the start of the segment's interval. We could potentially replicate the same behaviour, but ALL granularity didn't seem like a reasonable use case for compaction.
Task IDs must be unique right? Won't this contradict that requirement?
In compaction, the tasks run serially and in place inside the compaction task, so at any given point in time only one running task has the compaction task's ID. The same happens even for native compaction. But this brings up an important point about log management; I'll look into that aspect.
I am wondering if there are unit tests that cover the new MSQ compaction flow. I could find only the spec-validation tests.
I think conversion to the MSQ spec is the main job of `MSQCompactionRunner`. Unit testing the entire flow wouldn't really catch breakage in this class. I'm planning to cover the entire flow in ITs instead.
@gargvishesh, I concur with @LakshSingla on this one. We need UTs to test the entire flow with MSQ the same way we do with native. We may add ITs too, but most Druid devs rely more heavily on UTs since the IT flow is a little flaky currently.
- If you have already added tests for the new flow, please share that detail here.
- If you are yet to start on those, you could just try to parameterize the engine in the existing compaction flow tests and run the same tests with both `native` and `msq` engines. Please let me know if you need any assistance with this.
@kfaraz @LakshSingla
Thanks for your comments -- I do see the usefulness of UTs for the entire MSQ-based compaction flow. The existing tests in `CompactionTaskRunTest`, however, cannot be parameterized since the MSQ code resides in an extension. The tests therefore need to live in the extension itself and will take some effort to implement. I will take them up in a follow-up PR.
Thank you @kfaraz, @LakshSingla, and @cryptoe for the time and effort spent on all those rounds of reviews, especially given the size of the PR.