Refactoring the data source before unnest
Motivation
There are a number of use cases that require “flattening” records during processing. The need for this is exemplified by the existence of multi-value dimensions (MVDs), which essentially have native “flattening” built in. Other SQL systems support similar behavior: they take ARRAY-typed data and pass it through a FLATTEN or UNNEST (or similar) operator that “explodes” it into one new row for every value in the array.
We need to implement a similar operator for Druid. This dovetails with some of the recent work done for handling arrays in Druid. Essentially, the operator would take in an array and “flatten” it into N rows (N = number of elements in the array), where each row has one of the values from the array.
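As a toy illustration of the intended row-explosion semantics (plain Java, not Druid code, with made-up names), flattening a row that carries an array-valued column into one output row per element could look like this:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FlattenExample
{
  // Each input "row" has a scalar id and an array-typed column "tags".
  record Row(long id, List<String> tags) {}

  // Explode each input row into one output row per element of "tags".
  static List<Map<String, Object>> unnest(List<Row> rows)
  {
    return rows.stream()
               .flatMap(row -> row.tags().stream()
                                  .map(tag -> Map.<String, Object>of("id", row.id(), "tag", tag)))
               .collect(Collectors.toList());
  }

  public static void main(String[] args)
  {
    // {id=1, tags=[a, b]} becomes two rows: {id=1, tag=a} and {id=1, tag=b}
    System.out.println(unnest(List.of(new Row(1, List.of("a", "b")))));
  }
}
```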
Related Work
The inspiration for using unnest as a data source comes from BigQuery (Work with arrays | BigQuery | Google Cloud), where unnest is used as a data source. ClickHouse also has flatten functionality (Array Functions | ClickHouse Docs), but that does not transform a dataset by adding more rows. Since, for our use case, unnest can be used with dateExpand functionality (coming after unnest), we model unnest as a data source, similar to BigQuery.
Methodology
Refactoring
The underlying principle here is an operation on a data source that works on a segment and creates additional rows. Joins follow a similar principle: after the join operation, the number of rows can exceed that of the input table. The current framework supports this in the following way:
- A join data source, alongside a factory and a specialized wrapper (`JoinableFactoryWrapper.java`) around it, which creates a function to transform one segment into another via the notion of a segment function.
- A separate implementation of the segment reference through `HashJoinSegment.java`, which uses a custom storage adapter to access a cursor to the segment for processing.
The goal here is to move the creation of the segment map function out of the wrapper and into the individual data sources. In cases where the segment map function is not an identity function (as for join, and also for unnest), the segment function can be created accordingly by each data source. This makes the abstraction generic and readily extendable to other data sources we might create in the future (e.g. flatten).
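A rough sketch of the shape this abstraction could take (the interface and signatures below are illustrative stand-ins, not the actual Druid classes): each data source supplies its own segment map function, defaulting to identity, while join-like data sources build a wrapping function.

```java
import java.util.function.Function;

// Illustrative only: trimmed-down stand-ins for Druid's data source and segment types.
interface SegmentReference {}

interface DataSource
{
  // Most data sources simply hand the segment back unchanged.
  default Function<SegmentReference, SegmentReference> createSegmentMapFunction()
  {
    return Function.identity();
  }
}

// A join-like data source overrides the default and wraps the base segment,
// analogous to HashJoinSegment wrapping the base segment with the join clauses.
class JoinLikeDataSource implements DataSource
{
  @Override
  public Function<SegmentReference, SegmentReference> createSegmentMapFunction()
  {
    return this::wrapWithJoin;
  }

  private SegmentReference wrapWithJoin(SegmentReference baseSegment)
  {
    // Placeholder for the real join-segment construction.
    return baseSegment;
  }
}
```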
The changes here:
- Creation of the segment map function moved into each data source (see the usage sketch after this list)
- Code refactored from `JoinableFactoryWrapper` to `JoinDataSource`
- `InputNumberDataSource` had a dependency on `broadcastJoinHelper`, which was moved into the data source
- `PlannerContext` was updated with an additional `JoinableFactoryWrapper` object during this process
- Several test cases were updated with the change to `JoinDataSource`
- Some Guice injector wiring was updated as part of the process
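Continuing the illustrative types from the sketch above (again hypothetical, not the real Druid API), the caller side then looks roughly like this:

```java
// Hypothetical caller: map the base segment using whatever function the
// data source provides (identity for plain table data sources).
static SegmentReference mapSegment(DataSource dataSource, SegmentReference baseSegment)
{
  Function<SegmentReference, SegmentReference> segmentMapFn = dataSource.createSegmentMapFunction();
  return segmentMapFn.apply(baseSegment);
}
```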
New changes (will be in a follow-up)
After the new abstraction is in place, the following needs to be done for unnest:
- Have a separate `UnnestDataSource` class that deals with unnest (sketched below)
- This should have its own segment map function
- A separate `UnnestSegment` with a custom storage adapter to gain access to a cursor into each segment
- Design of the native query for unnest, considering we also need to pass the column(s) to unnest
- Finally, the SQL query that maps to the native query
Each of these development phases should be backed by unit tests / integration tests as applicable.
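As a very rough sketch of what the follow-up data source could look like, reusing the illustrative `DataSource`/`SegmentReference` types from the earlier sketch (the class shape, fields, and column handling here are assumptions; the native query design is still open):

```java
import java.util.function.Function;

// Hypothetical shape of the follow-up: a data source that knows which column to unnest
// and supplies a segment map function that wraps the base segment in an "unnest" segment.
class UnnestDataSource implements DataSource
{
  private final DataSource base;       // the data source being unnested
  private final String columnToUnnest; // array/multi-value column to explode into rows

  UnnestDataSource(DataSource base, String columnToUnnest)
  {
    this.base = base;
    this.columnToUnnest = columnToUnnest;
  }

  @Override
  public Function<SegmentReference, SegmentReference> createSegmentMapFunction()
  {
    // Compose with the base data source's mapping, then wrap with an unnest segment
    // whose storage adapter exposes a cursor that emits one row per array element.
    return base.createSegmentMapFunction().andThen(this::wrapWithUnnest);
  }

  private SegmentReference wrapWithUnnest(SegmentReference segment)
  {
    // Placeholder for the real unnest-segment construction.
    return segment;
  }
}
```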
This PR has:
- [x] been self-reviewed.
- [ ] using the concurrency checklist (Remove this item if the PR doesn't have any relation to concurrency.)
- [ ] added documentation for new or modified features or behaviors.
- [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
- [ ] added or updated version, license, or notice information in licenses.yaml
- [x] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
- [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
- [ ] added integration tests.
- [x] been tested in a test Druid cluster.
Travis is failing on branch coverage from the changes to `BaseLeafFrameProcessor`. While I am working on that, I am setting this PR up for review now.
Guice issues in integration tests. Taking a look.
@imply-cheddar, @clintropolis I have addressed the comments and also resolved the conflicts. The remaining thing to address is the use of a set in `GuiceInjectableValues` so the nullable check can be avoided.
I have introduced an atomic reference to a set so that the check is not done on every run in `GuiceInjectableValues`. @clintropolis @imply-cheddar @abhishekagarwal87, this is ready for review. Can you please take a look?
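A minimal sketch of the kind of caching being described (illustrative only; the class, field, and method names here are assumptions, not the actual `GuiceInjectableValues` code): keep an atomic reference to the set of keys already known to allow nulls, so the check runs at most once per key.

```java
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative cache: remember which keys have already passed the nullable check
// so the work is not repeated on every lookup.
class NullableKeyCache
{
  private final AtomicReference<HashSet<String>> nullableKeys =
      new AtomicReference<>(new HashSet<>());

  boolean isKnownNullable(String key)
  {
    return nullableKeys.get().contains(key);
  }

  void markNullable(String key)
  {
    // Copy-on-write update via CAS; readers never see a partially built set.
    nullableKeys.updateAndGet(current -> {
      HashSet<String> updated = new HashSet<>(current);
      updated.add(key);
      return updated;
    });
  }
}
```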
The conflicts have been resolved. Some additional refactoring was introduced due to the change in the underlying test framework in #12965. `CalciteTests` no longer has `createDefaultJoinableFactory`, and all references for `JoinDataSource` now use `CalciteTests.createJoinableFactoryWrapper()`.