# Druid nested data columns

## Motivation
Apache Druid has quite a lot of tricks up its sleeve for providing extremely fast queries on very large datasets. However, one of the major limitations of the current system is that this only works on completely flattened data, since that is all Druid segments can currently natively store (and table-to-table join support is limited). Producing this flattened table requires either external transformation or the built-in 'flattening' that Druid ingestion supports, in order to pluck specific nested values and translate them into top-level columns within a segment.
This approach has a downside: the exact set of extractions to perform must be completely known up front, prior to ingestion, which is hard if not impossible for loosely structured data whose schema might vary row to row. Additionally, the structure itself is often interesting, illustrating relations between values, and it is lost completely when data is transformed into flattened Druid tables without careful naming.
To overcome this, this proposal focuses on building out the capability to store nested, structured data directly as it is, and to query nested fields within this structure without sacrificing the performance available to queries operating on traditional flattened Druid columns.
## Proposed changes
To achieve this, we will introduce a new type of column for storing structured data in Druid segments. The initial implementation centers on leaning heavily into what we already know Druid does very well, taking an approach I like to refer to as "a bunch of columns in a trench coat".
This column is built on top of Druid's 'complex' type system, which allows complete control over how columns are encoded and decoded, and on virtual columns, which allow building specialized value selectors for the nested columns through `VirtualColumn` implementations. At ingestion time, all 'paths' in the structured data which contain a 'literal' field (Druid `STRING`, `LONG`, or `DOUBLE`) will be split out into internal 'nested field literal' columns, and stored in a manner similar to how we store normal literal columns, complete with dictionary encoding and bitmap value indexes.
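For example (an illustrative input row; the field names are hypothetical), ingesting the following value would produce nested field literal columns for the paths `$.x` (`LONG`), `$.y.a` (`STRING`), and `$.y.b[0]` (`DOUBLE`):

```json
{ "x": 10, "y": { "a": "hello", "b": [1.5] } }
```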
To prove feasibility, I've been prototyping this functionality for a bit over 6 months now, making core improvements along the way as needed to the complex type system and index functionality, and testing with a variety of workloads. This effort is a spiritual successor to the 'map-string-string' column of #10628, except that instead of one layer deep with only strings, this proposal allows any level of nesting and supports the complete set of Druid literal types. The short list of important core changes that have made this feature possible:
- #10277
- #11713
- #11853
- #12241
- #12251
- #12388
Additionally, the investigation in #12277 is inspired by the changes proposed here (which should become apparent shortly).
## Column format
Internally, the nested column is structured as a main column file in the smoosh, plus several associated 'internal' files, one for every nested literal field in the structure. All literal fields are dictionary encoded, but unlike our dictionary encoded `STRING` columns, they share a value dictionary that is 'global' to all of the nested columns. The global value dictionary is split by type and stacked: strings are ids `0` through `m`, longs `m + 1` through `n`, and doubles `n + 1` to the end. Locally, each nested column has a dictionary which maps local dictionary ids to these global dictionary ids (int -> int), so value lookup is a two step operation: local id to global id, then global id to value. For example, with global strings `['a', 'b']` (ids 0 and 1) and longs `[10, 20]` (ids 2 and 3), a nested field containing only the values `'b'` and `20` would have the local dictionary `[0 -> 1, 1 -> 3]`.
The complex column is composed of:
- compressed, 'raw' representation of the structured data
- bitmap to indicate which rows are null values
- a list of all 'literal' nested columns contained in the structure
- type information for all 'literal' nested columns contained in the structure
- global value dictionaries for all 'literal' values that are shared between all nested columns
Each nested field literal column contains:
- local to global integer dictionary
- local dictionary encoded compressed integer value column
- bitmap value indexes
- for numeric columns, compressed numeric value columns

## Querying
Querying will be done primarily through a specialized `VirtualColumn`, which will create optimized selectors to read the nested fields. These will look a lot like the standard Druid column selectors for other types, though with some subtle differences. These `VirtualColumn` implementations will also be wired up to SQL functions to allow nested data to be queried with ease. The initial set of functions will be a standard-ish set of JSON based functions:
### SQL functions
| function | notes |
|---|---|
| `JSON_VALUE(expr, path)` | Extract a Druid literal (`STRING`, `LONG`, `DOUBLE`) value from a `COMPLEX<json>` column or input `expr` using JSONPath syntax of `path` |
| `JSON_QUERY(expr, path)` | Extract a `COMPLEX<json>` value from a `COMPLEX<json>` column or input `expr` using JSONPath syntax of `path` |
| `JSON_OBJECT(KEY expr1 VALUE expr2[, KEY expr3 VALUE expr4 ...])` | Construct a `COMPLEX<json>` storing the results of `VALUE` expressions at `KEY` expressions |
| `PARSE_JSON(expr)` | Deserialize a JSON `STRING` into a `COMPLEX<json>` to be used with expressions which operate on `COMPLEX<json>` inputs |
| `TO_JSON(expr)` | Convert any input type to `COMPLEX<json>` to be used with expressions which operate on `COMPLEX<json>` inputs, like a `CAST` operation (rather than deserializing `STRING` values like `PARSE_JSON`) |
| `TO_JSON_STRING(expr)` | Convert a `COMPLEX<json>` input into a JSON `STRING` value |
| `JSON_KEYS(expr, path)` | Get an array of field names in `expr` at the specified JSONPath `path`, or null if the data does not exist or has no fields |
| `JSON_PATHS(expr)` | Get an array of all JSONPath paths available in `expr` |
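To make the intended usage concrete, here is a sketch of a query combining a few of these functions; the table name `nested_example` and `COMPLEX<json>` column name `nested_col` are hypothetical:

```sql
-- hypothetical table/column names
SELECT
  JSON_VALUE(nested_col, '$.user.name') AS user_name, -- extract a literal value
  JSON_QUERY(nested_col, '$.user') AS user_object,    -- extract a nested COMPLEX<json> value
  COUNT(*) AS cnt
FROM nested_example
GROUP BY 1, 2
```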
### JSONPath syntax
Initially we will support only a small simplified subset of the JSONPath syntax operators, primarily limited to extracting individual values from nested data structures.
| operator | description |
|---|---|
| `$` | 'Root' element, all JSONPath expressions will start with this operator |
| `.<name>` | 'Child' element in 'dot' notation |
| `['<name>']` | 'Child' element in 'bracket' notation |
| `[<number>]` | 'Array' index |
Though in the future, we will likely expand on this set of operators.
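As an illustration (again with hypothetical names), each supported operator might appear in paths like these:

```sql
-- hypothetical table/column names; each path uses only the supported operators
SELECT
  JSON_VALUE(nested_col, '$.x.y') AS dot_child,               -- 'child' in dot notation
  JSON_VALUE(nested_col, '$[''x''][''y'']') AS bracket_child, -- same child in bracket notation
  JSON_VALUE(nested_col, '$.list[0]') AS array_element        -- array index
FROM nested_example
```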
## Ingestion
During ingestion, a new nested column indexer will process nested data from input rows, traversing the structure and building a global dictionary of all literal values encountered. At persist time, this dictionary is sorted, and the 'raw' data is serialized with SMILE encoding into a compressed column. As we serialize the rows, we traverse the nested structure again, this time with the sorted dictionary in hand, and write out the nested literal field columns into temporary files, building local value dictionaries in the process. Once the 'raw' column is complete, we iterate over the nested literal columns, sort their local dictionaries, and write out their finished columns: compressed dictionary encoded value columns, compressed numeric columns for numeric types, and the local dictionaries and bitmap value indexes.
The nested data column indexer will be specified via a new `DimensionSchema` type, initially using `json` as the type since the initial implementation will only support the JSON format. It will process whatever rows are pointed at it (even literals).

```json
{
  "type": "json",
  "name": "someNestedColumnName"
}
```
That's basically it. For convenience when working with text input formats like TSV, if all processed rows are string literals that look like JSON, the indexer will attempt to deserialize them as JSON.
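For context, this is a sketch of how the indexer might be specified within the usual `dimensionsSpec` of an ingestion spec (the surrounding fields and column names are hypothetical):

```json
{
  "dimensionsSpec": {
    "dimensions": [
      "someRegularDimension",
      { "type": "json", "name": "someNestedColumnName" }
    ]
  }
}
```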
Additionally, we will add a handful of native Druid expressions (which will also handle composition at query time) that can perform many of the operations currently done via `flattenSpec`, but through `transformSpec` instead:
"transformSpec": {
"transforms": [
{ "type": "expression", "name": "transformedJson", "expression": "json_value(someNestedColumnName, '$.x.y')" }
]
}
### Native expressions
| function | notes |
|---|---|
| `json_value(expr, path)` | Extract a Druid literal (`STRING`, `LONG`, `DOUBLE`) value from a `COMPLEX<json>` column or input `expr` using JSONPath syntax of `path` |
| `json_query(expr, path)` | Extract a `COMPLEX<json>` value from a `COMPLEX<json>` column or input `expr` using JSONPath syntax of `path` |
| `json_object(expr1, expr2[, expr3, expr4 ...])` | Construct a `COMPLEX<json>` with alternating 'key' and 'value' arguments |
| `parse_json(expr)` | Deserialize a JSON `STRING` into a `COMPLEX<json>` to be used with expressions which operate on `COMPLEX<json>` inputs |
| `to_json(expr)` | Convert any input type to `COMPLEX<json>` to be used with expressions which operate on `COMPLEX<json>` inputs, like a `CAST` operation (rather than deserializing `STRING` values like `parse_json`) |
| `to_json_string(expr)` | Convert a `COMPLEX<json>` input into a JSON `STRING` value |
| `json_keys(expr, path)` | Get an array of field names in `expr` at the specified JSONPath `path`, or null if the data does not exist or has no fields |
| `json_paths(expr)` | Get an array of all JSONPath paths available in `expr` |
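Because these expressions compose, a transform can, for example, parse a stringified JSON input column and extract a field in a single expression; the column names here are hypothetical:

```json
"transformSpec": {
  "transforms": [
    { "type": "expression", "name": "parsedField", "expression": "json_value(parse_json(rawJsonString), '$.x')" }
  ]
}
```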
## Rationale
I believe the utility of being able to store nested structures is obvious, beyond the fact that `flattenSpec` and up-front ETL are inflexible and complicated. As for why this implementation was chosen for the initial effort, it comes down to starting with what we know and mapping Druid's current capabilities onto a nested structure. There is a lot of room for experimentation after this initial implementation is added, especially in the realm of storage format, as there are a wide variety of approaches to storing this type of data. The proposed implementation will have the same strengths and weaknesses as standard Druid queries, but with the initial implementation in place, we will have a point of comparison for further investigation.
## Operational impact
The expense of nested column ingestion is correlated with the complexity of the schema of the nested input data. The majority of the expense happens when serializing the segment (persist/merge), so these operations will take longer than normal for complex schemas, and could require additional heap and disk. Each nested literal field is roughly an additional column, and we build them all on the fly at the end of the process while persisting the 'raw' data. Additionally, while I've gone through a few iterations so far, the current ingestion algorithm is still rather expensive and could use additional tuning, especially in regard to the number of temporary files involved.
Segments with nested data columns will likely be larger than normal because the 'raw' data is retained. This data is compressed, but still takes up a decent amount of disk space. The good news is that since the literal values have their own nested columns, strictly using `JSON_VALUE` should avoid reading these large 'raw' columns at all, preventing them from thrashing the page cache. Future work will allow customization of exactly what is stored in nested columns, giving operators a chance to reduce segment sizes.
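For example (hypothetical names), a filtered count which references only extracted fields should read just the internal nested field columns and their indexes, never the large 'raw' column:

```sql
-- reads only the internal nested field column for '$.tags.env',
-- using its bitmap value indexes; the compressed 'raw' column is untouched
SELECT COUNT(*)
FROM nested_example
WHERE JSON_VALUE(nested_col, '$.tags.env') = 'production'
```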
Additionally, since this introduces a new column type, these columns will be unavailable when rolling back to older versions.
## Test plan
The surface area of this feature is quite large, since it effectively allows the full functionality of segments within a single column, plus several ways of interacting with this data. `JSON_VALUE` in particular can be utilized like any other Druid column across all query types (grouping, filtering, aggregation, etc.). Quite a lot of testing has been done so far, including a bit of stress testing, and I've internally gone through a handful of iterations on the code, but work will need to continue on hardening the feature. Because the column format is versioned, we should be able to iterate freely without impacting existing data. Unit test coverage in my prototype is currently pretty decent, so the main focus of testing now will be on 'production'-ish use cases, observing how well things perform and looking for incremental improvements.
## Future work
### ingestion performance improvements
This area needs some work to improve overall performance and optimize resource usage. For example, the usage of temporary files could be adjusted more dynamically based on projected sizes, only splitting column components into separate internal files when necessary.
### automatic typing for schema-less ingestion
The nested columns could be improved to give Druid schema-less ingestion support for automatic type discovery. All discovered columns could be created with the nested data indexer, and at serialization time the persistence code could recognize single-typed columns with only 'root' literal values, rewriting the type and writing out a standard Druid literal column. The primary work here would be making this seamless with realtime queries, allowing the realtime selector to create a value selector on the root literal value instead of the 'raw' data selector.
### literal arrays
While the current proposal can process and store array values, it does not include the ability to interact with them as native Druid `ARRAY` types or utilize the associated functions. Arrays of literal values could be stored in specialized nested columns (instead of a nested column for each array element).
### JSONPath wildcards
Interaction with arrays could also be improved by introducing support for wildcards in our JSONPath syntax, to allow selecting an array of values instead of being limited to selecting specific array elements. This would make arrays significantly more useful.
### better general array handling
Druid support for `ARRAY` types is growing, but could still use some improvement. In particular, an `UNNEST` function to turn arrays of values into columns of values would unlock a lot of functionality when interacting with nested arrays.
### better complex dimension handling, grouping, filtering, aggregation
Druid support for direct usage of `COMPLEX` types is still rather limited, and I want to work on improving this to make using nested data columns a more pleasant experience. This includes allowing direct grouping (the 'raw' values, like any variably sized type, could use a dictionary building strategy in the grouping engines). The filtering system could also allow complex types to better participate in indexes and value matching. The current workaround is to use `TO_JSON_STRING` to stringify these values into a type Druid can work with, but I think we can eliminate this need in the future.
### formal Druid type instead of complex
It might be useful to consider switching from generic `COMPLEX` types and promoting the nested data type into a top level Druid type, called something like `OBJECT` or `STRUCT` or ... something. This would allow various parts of the engine to take a more active stance on how nested types are handled, and allow tighter integration with various pieces. I'm not certain this is strictly necessary at this point, just something I've been thinking about.
### support for ingesting from other nested formats (Parquet, Avro, ORC)
The nested column implementation is not specific to JSON, so supporting other data formats would give us near-full feature parity with the `flattenSpec`, allowing it to be deprecated.
### customized control over ingestion (which fields to extract, which fields to index, retain raw data, etc.)
Fine-tuned control over how the nested data indexer produces columns would allow retaining a larger blob of data while extracting only a specific set of columns to be 'optimized' for use with `JSON_VALUE` and filtering with indexes, with the other columns falling back to the 'raw' data. We could also allow omitting the 'raw' data, instead reconstructing it on the fly from the nested columns. Additionally, indexes might not be useful on all nested columns, so control over which fields are indexed for fast filtering would be useful. All of these options would give operators ways to control the output size of nested columns.
### bring technical enhancements to normal numeric columns
Nested numeric columns have both a numeric value column and a dictionary encoded column with bitmap indexes. This allows both fast aggregation and fast filtering in exchange for additional storage space. These improvements could be folded into Druid `LONG`, `DOUBLE`, and `FLOAT` columns to allow operators to optionally create indexes for numeric values.
### alternative storage formats
There is a lot of room for exploration of alternative storage formats to suit various nested data use cases. For example, where the structure is interesting and a collection of nested fields is likely to take part in the same query often, it might make sense to explore formats that compress the values of those columns together into a single column (a fixed width, row oriented format), allowing lower overhead when reading multiple values in the same query (whether this is actually better would need proving). That said, I don't have anything specific in mind in this area, just throwing it out there as an area of interest.
---

Hi @clintropolis, introducing a JSON type is a very big move for an analytic database, and I appreciate your constant effort on this. I have some personal opinions on the SQL function level that I want to share with you.
- When using the `json_value` function to extract a value, the function does not know the actual type of the value in advance, and there might be some problems. For example:

  `SELECT json_value(a_json_column, "a") / 5`

  In this example, a type of `LONG` or `DOUBLE` is expected for the return of `json_value`. But if one row accidentally stores `a` as a `STRING` (very common, because some JSON serialization tools serialize LONG and DOUBLE in a quoted format, which other deserialization tools recognize as a string), the above expression fails to calculate. Or for

  `SELECT json_value(a_json_column, "b") + json_value(a_json_column, "c")`

  does it perform a math calculation or string concatenation? It's not clear; only the writer of the SQL knows. I think a better way is to make the `json_value` function more specific, such as `json_value_long`, `json_value_double`, and `json_value_string`.
- I don't know if `json_value` and `json_query` are the final SQL function names in your implementation, but I think they're a little confusing for people when they first touch these functions. `json_extract` is a more widely used function name in other databases like MySQL, SQLite, ClickHouse, etc., so I think we can use it to make it easy for people to know what these functions do. So, I would like to propose the function names `json_extract_long`, `json_extract_double`, `json_extract_string`, and `json_extract_object`.
- The final one is about the performance of applying JSONPath in the above functions. There is a PR (#11467) that compares a JSONPath-based implementation with a hand-written implementation, and it turns out the latter has much better performance. I'm not saying that hand-written implementation should be used, I just want to give you a hint.
---

Hi @FrankChen021, thanks for having a look. Sorry I haven't finished filling out the PR description for #12753 yet; it might have answered some of your questions.
> When using json_value function to extract a value, this function does not know what the actual type of the value in advance and maybe there might be some problems.
For this, I have picked up a syntax that was implemented in the Calcite parser for these functions, which allows specifying the type inline, looking something like:

`JSON_VALUE(x, '$.some.nested' RETURNING BIGINT)`
The way I have implemented things during SQL conversion will also "eat" cast operations, so wrapping `JSON_VALUE` in a `CAST` will plan to the same native virtual column as the 'returning' syntax would, decorated with the expected type. I don't know how standard this part of the syntax is, but it came free from using these functions in Calcite.
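So, under this behavior (sketched with hypothetical table/column names), these two queries should plan to the same native virtual column:

```sql
-- the RETURNING syntax and the CAST wrapper plan identically
SELECT JSON_VALUE(nested_col, '$.x.y' RETURNING BIGINT) FROM nested_example;
SELECT CAST(JSON_VALUE(nested_col, '$.x.y') AS BIGINT) FROM nested_example;
```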
Underlying `JSON_VALUE` is a thing called the `NestedFieldVirtualColumn`, which can make optimized typed selectors as asked for by the expected type, as well as supply the column indexes and everything else that provides 'native' Druid performance.
> I don't know if json_value and json_query are the final SQL function names in your implementation, but I think they're a little confusion for people when they first touch these functions. json_extract is a more widely used function name in other database like MySQL, SQLite, ClickHouse etc, I think we can use it so people are easy to know what these functions do.
I did some surveying as well, and `JSON_VALUE` and `JSON_QUERY` are also relatively widely used; in fact it looks like even ClickHouse supports them (also BigQuery, MS SQL, Oracle, etc.). That said, it is relatively easy to add additional functions at the SQL layer, including optimized functions that plan into the `NestedFieldVirtualColumn`. You can see two of them still hanging out in my PR: `GET_PATH` and `JSON_GET_PATH` were my initial prototype functions and use 'jq' style paths instead of JSONPath; see `NestedDataOperatorConversions`.
> The final one is about the performance of application of JSONPath in above functions. There is a PR (https://github.com/apache/druid/pull/11467) compares the JSONPath-based implementation and a hand-writing implementation which turns out the latter one has much better performance. I'm not saying this hand-writing implementation should be used but just want to give you a hint.
Heh, not to worry, I actually also wrote my own JSONPath parser (and a jq parser) that only handles the subset of functionality we actually support for these columns; see `NestedPathFinder`. I did miss #11467, though I guess there is a bit of difference between what's going on in that PR, which looks like it has functions to handle stringified JSON inputs, and what I'm proposing here, which is a new column type that decomposes the JSON at ingest time into indexed nested columns. I'll have a look to see if there is any overlap. Thanks for pointing this out.
---

For #11467, if all your proposed functions are provided, there's no need to merge that PR. Using your `json_value` and `parse_json` together will cover the functions proposed in that PR.
---

@FrankChen021 @clintropolis Good news. Although my PR (#11467) was not accepted, it is encouraging that it provided hints on the performance issues. Great job.