Disabling rollup on post agg operators for MSQ sql based ingestion.

Open cryptoe opened this issue 2 years ago • 4 comments

While running the query:

REPLACE INTO country_shard_test_1 OVERWRITE ALL
SELECT
  Country,
  Capital,
  THETA_SKETCH_INTERSECT(
    DS_THETA(Country),
    DS_THETA(Capital)
  ) AS sketch
FROM TABLE(
  EXTERN(
    '{"type":"http","uris":["https://static.imply.io/lookup/country.tsv"]}',
    '{"type":"tsv","findColumnsFromHeader":true}',
    '[{"name":"Country","type":"string"},{"name":"Capital","type":"string"},{"name":"ISO3","type":"string"},{"name":"ISO2","type":"string"}]'
  )
)
GROUP BY 1, 2
PARTITIONED BY ALL

We get an unknown error:

UnknownError: java.util.NoSuchElementException (Stack trace)
Failed task ID: query-0bc9b9c3-de75-416c-bfdb-6ab802871826 (on host: localhost:8100)
java.util.NoSuchElementException
    at java.base/java.util.Collections$EmptyIterator.next(Collections.java:4210)
    at com.google.common.collect.Iterators.getOnlyElement(Iterators.java:297)
    at com.google.common.collect.Iterables.getOnlyElement(Iterables.java:285)
    at org.apache.druid.msq.exec.ControllerImpl.makeDimensionsAndAggregatorsForIngestion(ControllerImpl.java:1646)
    at org.apache.druid.msq.exec.ControllerImpl.generateDataSchema(ControllerImpl.java:1431)
    at org.apache.druid.msq.exec.ControllerImpl.makeQueryDefinition(ControllerImpl.java:1398)
    at org.apache.druid.msq.exec.ControllerImpl.initializeQueryDefAndState(ControllerImpl.java:523)
    at org.apache.druid.msq.exec.ControllerImpl.runTask(ControllerImpl.java:346)
    at org.apache.druid.msq.exec.ControllerImpl.run(ControllerImpl.java:296)
    at org.apache.druid.msq.indexing.MSQControllerTask.run(MSQControllerTask.java:192)
    at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:477)
    at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:449)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

With rollup mode enabled, SQL-based ingestion does not know how to ingest post-aggregator results, so I have disabled the code path that attempted it. Instead, it now nudges the user to disable rollup mode by following the instructions here: https://druid.apache.org/docs/24.0.0/multi-stage-query/concepts.html#rollup

Fixed the bug ...


Key changed/added classes in this PR
  • ControllerImpl

This PR has:

  • [x] been self-reviewed.
  • [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • [x] been tested in a test Druid cluster.

cryptoe avatar Oct 04 '22 07:10 cryptoe

This PR is related to : https://github.com/apache/druid/issues/13180

cryptoe avatar Oct 04 '22 07:10 cryptoe

Iterables.getOnlyElement should be banned and replaced with something that forces developers to provide a nice error message 😄, both for the case when there is no element and for the case when there is more than one element in the collection.
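
For illustration only, here is a minimal sketch of what such a replacement could look like. The class name `OnlyElement`, the method `get`, and the `what` parameter are hypothetical and are not part of Guava or Druid; the idea is simply that the caller must describe what was expected, so that both failure cases produce a readable message instead of a bare NoSuchElementException.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical helper, not part of Guava or Druid. */
public final class OnlyElement {
    private OnlyElement() {}

    /**
     * Returns the single element of {@code items}.
     * {@code what} describes the expected element and is included in every error message.
     */
    public static <T> T get(Iterable<T> items, String what) {
        Iterator<T> it = items.iterator();
        if (!it.hasNext()) {
            // Case 1: empty collection.
            throw new IllegalStateException(
                "Expected exactly one " + what + " but found none");
        }
        T first = it.next();
        if (it.hasNext()) {
            // Case 2: more than one element.
            throw new IllegalStateException(
                "Expected exactly one " + what + " but found more than one (first: " + first + ")");
        }
        return first;
    }

    public static void main(String[] args) {
        // Happy path: exactly one element.
        System.out.println(get(List.of("DS_THETA"), "aggregator"));

        // Empty input: the message names what was missing.
        try {
            get(List.of(), "aggregator for output column 'sketch'");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Requiring the description as a mandatory argument is the design point: the call site cannot compile without saying what it was looking for.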

abhishekagarwal87 avatar Oct 04 '22 07:10 abhishekagarwal87

@cryptoe thank you for looking into this issue. As a user, I'm somewhat confused - it appears as if ingesting records using multi-stage SQL does not support rollups on post-aggregators such as a sketch intersection. What are the consequences of this? For example, if my SQL statement is as follows:

INSERT INTO user_activity
SELECT user_cohort_country.__time, user_cohort_country.cohort, user_cohort_country.country, 
    THETA_SKETCH_INTERSECT(
      DS_THETA(user_cohort_country.segments), 
      DS_THETA(user_cohort_country.countries)
    ) as sketch
FROM user_cohort_country
GROUP BY 1, 2, 3
PARTITIONED BY DAY

Is the rollup automatic, or does it replicate the rollup configuration of the source table? I'm trying to understand where the rollup is implied in the statement above.

davecromberge avatar Oct 04 '22 08:10 davecromberge

> @cryptoe thank you for looking into this issue. As a user, I'm somewhat confused - it appears as if ingesting records using multi-stage SQL does not support rollups on post-aggregators such as a sketch intersection. What are the consequences of this? For example, if my SQL statement is as follows:
>
> INSERT INTO user_activity
> SELECT user_cohort_country.__time, user_cohort_country.cohort, user_cohort_country.country, 
>     THETA_SKETCH_INTERSECT(
>       DS_THETA(user_cohort_country.segments), 
>       DS_THETA(user_cohort_country.countries)
>     ) as sketch
> FROM user_cohort_country
> GROUP BY 1, 2, 3
> PARTITIONED BY DAY

If tomorrow you decide to partition by month using a reindex, Druid needs to understand how to merge the THETA_SKETCH_INTERSECT column across the various days. In rollup mode, we figure this out automatically, meaning that you can submit an autocompaction task and the Druid engine will be smart enough to work out how to merge across days. Having said that, you can also submit a manual compaction task and tell the Druid engine explicitly how to merge the sketch column.

There are always limits to this. For a query like

insert into abc select a, b, count(a)/count(b) from cde group by 1, 2 partitioned by day

we can't really merge the ratio across days. Instead, what's recommended is

insert into abc select a, b, count(a), count(b) from cde group by 1, 2 partitioned by day

and then doing the divide at query time.
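
To make the arithmetic concrete, here is a small sketch with made-up numbers (the counts and the class name `RatioRollup` are purely illustrative and have nothing to do with Druid's internals). It shows that counts stored per day can be added when days are merged, whereas ratios stored per day cannot be combined into the correct overall ratio.

```java
/** Illustrative only: hypothetical per-day counts, not Druid code. */
public final class RatioRollup {
    private RatioRollup() {}

    /** Merges two rolled-up rows {count(a), count(b)} by adding them; counts are additive. */
    public static long[] mergeCounts(long[] day1, long[] day2) {
        return new long[] {day1[0] + day2[0], day1[1] + day2[1]};
    }

    /** The divide that would be done at query time, after merging. */
    public static double ratio(long[] counts) {
        return (double) counts[0] / counts[1];
    }

    public static void main(String[] args) {
        long[] day1 = {3, 4}; // count(a) = 3, count(b) = 4
        long[] day2 = {1, 1}; // count(a) = 1, count(b) = 1

        // Store the counts, merge them, then divide: (3 + 1) / (4 + 1).
        double fromMergedCounts = ratio(mergeCounts(day1, day2));

        // Store the ratios and try to combine them afterwards, here by averaging.
        double fromStoredRatios = (ratio(day1) + ratio(day2)) / 2;

        System.out.println("merge counts, then divide: " + fromMergedCounts);
        System.out.println("average of daily ratios:   " + fromStoredRatios);
    }
}
```

With these numbers the merged counts give 4/5, while averaging the two daily ratios gives a different value, because the stored ratios no longer carry the weights needed to recombine them.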

> Is the rollup automatic, or does it replicate the rollup configuration of the source table? I'm trying to understand where the rollup is implied in the statement above.

See https://druid.apache.org/docs/latest/multi-stage-query/concepts.html#rollup. The web console automatically figures out whether the statement is a rollup statement or not.

cryptoe avatar Oct 04 '22 13:10 cryptoe

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the [email protected] list. Thank you for your contributions.

github-actions[bot] avatar Jan 10 '24 00:01 github-actions[bot]

This pull request/issue has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

github-actions[bot] avatar Feb 08 '24 00:02 github-actions[bot]