
Refactoring the data source before unnest


Motivation

There are a number of use cases that require “flattening” records during processing. The need for this is exemplified by the existence of MVDs (multi-value dimensions), which essentially have a native “flattening” built in. Other SQL systems support similar behavior: they expect data of an ARRAY type, which you pass to a FLATTEN or UNNEST (or similar) operator that “explodes” it into one new row for every value in the array.

We need to implement a similar operator for Druid. This dovetails with some of the recent work done for handling arrays in Druid. Essentially, the operator would take in an array and “flatten” it into N rows (N = the number of elements in the array), where each row holds one of the values from the array.
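For intuition, here is a minimal sketch of the intended semantics (hypothetical code, not the proposed implementation):

```java
import java.util.ArrayList;
import java.util.List;

class UnnestSemanticsSketch
{
  // One input row whose arrayCol holds N values becomes N output rows,
  // each carrying a single value in place of the array.
  static List<Object[]> unnest(Object[] row, int arrayCol)
  {
    List<Object[]> out = new ArrayList<>();
    for (Object element : (Object[]) row[arrayCol]) {
      Object[] copy = row.clone();
      copy[arrayCol] = element;
      out.add(copy);
    }
    return out;
  }
}
```

For example, a row `["a", ["x", "y", "z"]]` unnested on the second column yields three rows: `["a", "x"]`, `["a", "y"]`, `["a", "z"]`.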

Related Work

The inspiration for using unnest as a data source comes from BigQuery (Work with arrays | BigQuery | Google Cloud), where unnest is used as a data source. ClickHouse also has flatten functionality (Array Functions | ClickHouse Docs), but it does not transform a dataset by adding more rows. Since, for our use case, unnest can be combined with the dateExpand functionality (coming after unnest), we model unnest as a data source, similar to BigQuery.

Methodology

Refactoring

The underlying principle here is an operation on a data source that works on a segment and creates additional rows. Joins follow a similar principle: the number of rows after a join can exceed the number of rows in the input table. The current framework supports that in the following way:

A join data source, alongside a factory and a specialized wrapper (JoinableFactoryWrapper.java) around it, which creates a function to transform one segment into another: the notion of a segment map function.

A separate implementation of the segment reference, HashJoinSegment.java, which uses a custom storage adapter to access a cursor over the segment for processing.
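In simplified form (stand-in types; the real Druid signatures differ), the current pattern looks like this:

```java
import java.util.function.Function;

// Stand-in for Druid's SegmentReference.
interface SegmentReference {}

class JoinPathSketch
{
  // The wrapper produces a "segment map function": identity when there is
  // nothing to join, otherwise a function wrapping the base segment in a
  // join-aware segment (HashJoinSegment in Druid).
  static Function<SegmentReference, SegmentReference> createSegmentMapFn(boolean hasJoinClauses)
  {
    return hasJoinClauses ? JoinPathSketch::wrapWithJoin : Function.identity();
  }

  static SegmentReference wrapWithJoin(SegmentReference base)
  {
    return base; // placeholder for HashJoinSegment construction
  }
}
```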

The goal here is to move the creation of the segment map function out of the wrapper and into the individual data sources. In cases where the segment map function is not an identity function (as with join, and likewise for unnest), each data source can create its segment function accordingly. This makes the abstraction generic and readily extendable to other data sources we might create in the future (e.g., flatten).
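A rough sketch of the refactored shape (names and signatures are illustrative, simplified from the PR's intent rather than the merged code):

```java
import java.util.function.Function;

interface SegmentReference {}

interface DataSource
{
  // Identity by default; join (and later unnest) data sources override this
  // to build their own non-identity segment map functions.
  default Function<SegmentReference, SegmentReference> createSegmentMapFunction()
  {
    return Function.identity();
  }
}

class JoinDataSource implements DataSource
{
  @Override
  public Function<SegmentReference, SegmentReference> createSegmentMapFunction()
  {
    return baseSegment -> baseSegment; // placeholder: wrap in HashJoinSegment
  }
}
```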

The changes here:

  1. Creation of a segment map function moved into each data source
  2. Code refactored from JoinableFactoryWrapper to JoinDataSource
  3. InputNumberDataSource's dependency on broadcastJoinHelper was moved into the data source
  4. PlannerContext was updated with an additional JoinableFactoryWrapper object during this process
  5. Several test cases were updated with the change to the JoinDataSource
  6. Some Guice injector wiring was updated as part of the process

New changes (will be in a follow-up)

After the new abstraction is in place, the following needs to be done for unnest (a rough sketch follows the list):

  • Have a separate UnnestDataSource class that deals with unnest
  • This should have its own segment map function
  • A separate UnnestSegment with a custom storage adapter to gain access to a cursor over each segment
  • Design of the native query for unnest, considering we also need to pass the column(s) to unnest
  • Finally, the SQL query that maps to the native query
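A rough sketch of how these pieces could fit together (hypothetical names mirroring the plan above, not merged code):

```java
import java.util.List;
import java.util.function.Function;

interface SegmentReference {}

class UnnestDataSource
{
  private final Object base;           // the wrapped data source
  private final List<String> columns;  // the column(s) to unnest

  UnnestDataSource(Object base, List<String> columns)
  {
    this.base = base;
    this.columns = columns;
  }

  // Non-identity segment map function: each segment is wrapped in an
  // UnnestSegment whose custom storage adapter exposes a cursor that
  // emits one row per array element of the unnested column(s).
  Function<SegmentReference, SegmentReference> createSegmentMapFunction()
  {
    return segment -> segment; // placeholder for new UnnestSegment(segment, columns)
  }
}
```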

Each of these development phases should be backed by unit and integration tests, as applicable.

This PR has:

  • [x] been self-reviewed.
    • [ ] using the concurrency checklist (Remove this item if the PR doesn't have any relation to concurrency.)
  • [ ] added documentation for new or modified features or behaviors.
  • [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • [ ] added or updated version, license, or notice information in licenses.yaml
  • [x] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • [ ] added integration tests.
  • [x] been tested in a test Druid cluster.

somu-imply · Sep 14 '22 02:09

Travis is failing on branch coverage due to the changes to BaseLeafFrameProcessor. While I work on that, I am setting this PR up for review now.

somu-imply · Sep 27 '22 16:09

Guice issues in the integration tests. Taking a look.

somu-imply · Sep 27 '22 19:09

@imply-cheddar, @clintropolis I have addressed the comments and also resolved the conflicts. The remaining item to address is the use of a set in GuiceInjectableValues so that the nullable check can be avoided.

somu-imply · Oct 18 '22 17:10

I have introduced an atomic reference to a set to avoid doing the check on every run in GuiceInjectableValues. @clintropolis @imply-cheddar @abhishekagarwal87 this is ready for review. Can you please take a look?
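Roughly, the idea is the following (a simplified sketch with hypothetical names, not the exact GuiceInjectableValues code):

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

class NullableKeyCache
{
  // Keys already known to be nullable; kept behind an AtomicReference so
  // lookups stay cheap and the set is updated with copy-on-write semantics.
  private final AtomicReference<Set<String>> nullables =
      new AtomicReference<>(new HashSet<>());

  boolean isKnownNullable(String key)
  {
    return nullables.get().contains(key);
  }

  void markNullable(String key)
  {
    nullables.getAndUpdate(current -> {
      Set<String> next = new HashSet<>(current);
      next.add(key);
      return next;
    });
  }
}
```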

somu-imply · Oct 21 '22 20:10

The conflicts have been resolved. Some additional refactoring was introduced due to a change in the underlying test framework in #12965. CalciteTests no longer has createDefaultJoinableFactory, and all references for JoinDataSource now use CalciteTests.createJoinableFactoryWrapper().

somu-imply · Oct 22 '22 19:10