
[SPARK-48495][SQL][DOCS] Describe shredding scheme for Variant

cashmand opened this pull request 1 year ago • 9 comments

What changes were proposed in this pull request?

For the Variant data type, we plan to add support for columnar storage formats (e.g. Parquet) to write the data shredded across multiple physical columns, and read only the data required for a given query. This PR merges a document describing the approach we plan to take. We can continue to update it as the implementation progresses.
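For illustration only, here is a minimal sketch of the general idea, using the typed_value/untyped_value naming that appears later in this thread; the column name "event" and the shredded path "a" are hypothetical, the exact layout is approximate, and the merged document is authoritative:

optional group event {                  // a single Variant column
    required binary metadata;           // Variant metadata (key dictionary)
    optional binary value;              // binary fallback for everything not shredded
    optional group a {                  // shredded path "a"
        optional int64 typed_value;     // readable without touching the binary value
        optional binary untyped_value;  // used when "a" is present but not an int64
    }
}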

Why are the changes needed?

When implemented, this can allow much better performance when reading from columnar storage. More detail is given in the document.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

It is internal documentation; no testing is needed.

Was this patch authored or co-authored using generative AI tooling?

No.

cashmand avatar May 31 '24 22:05 cashmand

Merged to master.

HyukjinKwon avatar Jun 03 '24 01:06 HyukjinKwon

Alright, I had an offline discussion with the PR author. While it looks fine to me, the author would like to have some more feedback and discussion. Respecting that, I will revert and reopen this.

HyukjinKwon avatar Jun 03 '24 04:06 HyukjinKwon

I read through the proposal and some thoughts:

It would be really useful to add to this PR a list of the ways nested structs (struct-of-structs) and arrays-of-structs can be represented, because this is not clarified. From my understanding, a nested object path value can be represented either as a fully "typed_value", or as a variant within a variant (containing a required value/metadata and optional paths), or by nesting directly into the paths (a.b) without introducing an intermediate definition level.

The current proposal for columnarization looks like it works well when the data in a file mostly has one global structure. However, for heterogeneous data sources, or sources with fields that occasionally alternate between more than one type of value, there seem to be limitations, such as potentially needing to store the value in the top-level value field if there is a single type conflict for some deeply nested path.


I would like to propose an alternate way to encode the data that allows for more flexibility in representing nested structures and for more efficient encoding.

  • To simplify the design, I propose that we require every path part to be immediately followed by a definition level ($typed_value_*/$untyped_value_variant) that indicates the type of the value at that path, allowing for a fully recursive definition of the variant type as a union of the types observed at each path.
  • I also propose that we allow storing the key paths of an untyped value variant separately as a native Parquet list, to enable field membership checks without having to scan the metadata. In my proposal, the metadata fields are also made optional; if not present, the metadata is encoded in the value.

Simplest variant example representations, according to my proposal:

optional group message { // message: variant (untyped)
    optional group $untyped_value_variant {
        optional binary value;
    }
}
optional group message { // message: string (typed)
    optional binary $typed_value_string;
}

Nested struct example (w/ subcolumnarized paths, nested type conflicts) representation, according to my proposal:

optional group a {
    optional group $typed_value_object {
        optional group b {
            optional group $typed_value_object {
                optional group c {
                    optional group $typed_value_object {
                        optional group d { // d: string | untyped (value+metadata)
                            optional binary $typed_value_string;
                            optional group $untyped_value_variant {
                                optional binary value;
                                optional binary metadata; // metadata is optional; if not present, it is included in the value
                                optional group metadata_key_paths (LIST) { // optionally store the list of flattened key paths in the value as a Parquet list, to enable dictionary encoding / bloom filters for fast lookups without scanning the metadata
                                    repeated group list {
                                        optional binary element;
                                    }
                                }
                            }
                        }

                        optional group e { // e: untyped (value) | object (subcolumnarized paths e.x, e.y)
                            optional group $untyped_value_variant {
                                optional binary value;
                            }
                            optional group $typed_value_object {
                                optional group x {
                                    optional binary $typed_value_string;
                                }
                                optional group y {
                                    optional int64 $typed_value_int64;
                                }
                            }
                        }

                        optional group f { // f: int64
                            optional int64 $typed_value_int64;
                        }
                    }
                }
            }        
        }
    }
}

NOTE: To reduce the nesting in cases where a field is only present as a single type, a short form could be introduced that allows concatenating the definition level into the path name, making the simplest example representation even more compact:

optional binary message.$typed_value_string;

shaeqahmed avatar Jun 04 '24 23:06 shaeqahmed

I suggest moving the implementation of this Variant format to a separate repo outside of the Spark project. I see the usage of "Open Variant" instead of "Spark Variant" in recent announcements. Other projects and table formats like Delta Lake and Apache Iceberg have adopted or are considering adopting this format, respectively.

Can you clarify whether this is a Spark-specific internal Variant? If it is not, it would be useful, in terms of specification, implementation, and interoperability, for it not to be embedded inside the Apache Spark project.

Samrose-Ahmed avatar Jun 06 '24 04:06 Samrose-Ahmed

Hi @shaeqahmed, thanks for your detailed response. Your suggestions add a lot of flexibility to the shredding scheme! At the same time, we are wary of adding complexity that could be a burden on implementations to support. Our expectation is that the primary candidate for shredding is data with a fairly uniform sub-structure. In particular, this assumption simplifies the shredding decision and behavior. With a more flexible shredding scheme it also becomes more difficult to decide what to shred since there are significantly more viable options with nuanced tradeoffs. Simplicity in implementation and user-observed behavior is very important to us.

Since the benefits of shredding are both data and workload dependent, could you help us understand concrete query examples for your suggested features? Do you have particular use cases where you expect to write a non-uniform shredding scheme, and get a significant performance benefit?

A few specific points:

In my proposal, the metadata fields are also made optional, which if not present, means that the metadata is encoded in the value.

Do you have specific typed/untyped combinations in mind that are common based on your experience? Adding options to the spec increases the implementation complexity (readers need to support both versions to function correctly), and we’d like to explore the impact of these choices more concretely. Our motivation for combining the metadata and value is to reduce the size of the Parquet schema. Large schemas can be quite a performance burden due to how Parquet stores its footer, especially for selective queries.

metadata_key_paths

Do you have a concrete query in mind for this feature? My understanding is that it is redundant, and readers could safely ignore it. We’ve purposely designed shredding without redundancy to avoid unexpected increases in storage. In the RFC for Delta here, it mentions that struct fields which start with _ (underscore) can be safely ignored. I think we could add that to the spec here, and perhaps reserve _metadata_key_paths as a keyword for a future addition to the spec. As long as readers ignore fields with an underscore, it shouldn’t cause any backwards compatibility issues.

union of the types observed at each path

I’d be interested in understanding the expected use cases. Distinguishing different types for scalar fields does not seem to add much value compared to storing mismatched types in an untyped_value column, and adds complexity to the spec and implementation. Could you highlight an example or query pattern where having different typed-values would provide significant benefits over a single typed-value? Our assumption is that one of the types will be most common and shredding should focus on that one.

cashmand avatar Jun 07 '24 19:06 cashmand

Hi @Samrose-Ahmed, our intent is for this to be an open format that other engines can adopt. We're aiming to put common code in a Java library under common/variant, so that other engines can reuse the core operations. At this point, I think it makes more sense to leave it in the Spark repo while we iterate on the implementation. We can choose to pull it out into a separate repo later.

cashmand avatar Jun 07 '24 19:06 cashmand

Thanks for the response and feedback @cashmand!

Can you please include an example in the document which clarifies how the current proposal deals with nested structs, and the general way deeply nested data is expected to be converted from JSON to the shredded variant form? It is not clear how a nested key path with an object value should be encoded, as there are 2-3 implicit ways this can be done IIUC, each with its own tradeoffs: adding it as a nested variant (assuming this is supported); adding the struct directly as a nested key path within the existing path/paths.*; or maybe adding it as a struct typed_value (assuming this is not supported, but it is not actually stated whether a typed value is allowed to be a struct, or whether typed values can be nested within typed values). Elaborating on this would help readers understand how the proposal deals with the different cases of structs-of-structs with structured and unstructured parts.


The real-world use case I have in mind is semi-structured log analytics, particularly on data that comes from upstream sources containing heterogeneous, loosely typed data. A good example is AWS CloudTrail logs (https://www.databricks.com/blog/2022/06/03/building-etl-pipelines-for-the-cybersecurity-lakehouse-with-delta-live-tables.html), which have variant fields like requestParameters and responseElements whose schema shape is largely relational but directly determined by the AWS service (of which there are a few hundred; the cardinality of the eventProvider field) that a given log row belongs to. Fields like requestParameters and responseElements also contain arbitrary user input that is completely unstructured, and such key paths' data should ideally end up stored in an untyped blob field, while all other key paths should be subcolumnarized for performance in analytical queries. The current proposal makes it difficult to encode this data in a subcolumnarized way, as there is no single global schema that can be inferred by reading the first N rows from a large batch (e.g. a file).

I agree that having more than one typed field for a given key path per smaller batch of rows (e.g. ~10,000) is not necessary, but the reason for adding this flexibility to the variant representation is that the current proposal does not allow taking a series of row batches representing different locally discovered schemas and unioning them together to form a file containing a large batch of rows (256MB-10GB) efficiently and without type conflicts. The idea is that the writer should group the rows into smaller batches and sort them in a way designed to place similarly shaped data closer together in the file.

My proposal is inspired by some of the state-of-the-art research done for the Umbra database in the JSON Tiles paper (https://db.in.tum.de/~durner/papers/json-tiles-sigmod21.pdf), which describes a columnar format and approach designed to exploit implicit schemas in semi-structured data, as well as by popular real-world implementations of Variant such as the one in Apache Doris (https://github.com/apache/doris/issues/26225).

In our case for open tables, Parquet v1/v2 has some limitations that must be kept in mind, like the extra overhead associated with wide tables / too many definition levels (> tens of thousands of columns) and the inability to have a separate subschema per row group, which can result in sparse null columns. However, it is still possible to take advantage of subcolumnarization on heterogeneous data if the data is laid out correctly so as to maximize RLE on null arrays, and by using a compact representation that doesn't require an extra definition level (e.g. x.typed_value in the current proposal) for value paths that have no conflicts in a file.

shaeqahmed avatar Jun 08 '24 19:06 shaeqahmed

Hi @shaeqahmed, sorry for the delay, and for not replying earlier about how nested structs are handled. I’ll try to update the doc with an example, but in the meantime, the plan is to support two of the cases you described:

  • Adding the struct directly as a nested key path within the existing paths structure is meant to be the primary approach. The example at the end of the doc shows an array-of-struct with this form, but a struct-of-struct would look the same. At any nesting level, if a given key doesn’t exist in the parquet schema, it would be stored in the top-level value binary. A request for any non-leaf field would require checking the top-level value, and merging the result with the shredded values (as described in the pseudo-code in the PR).
  • Adding a nested key path as a nested Variant is supported. This is indicated by just including untyped_value, with no corresponding typed_value. But in this case, it wouldn’t be possible to recursively shred the nested value. (Both cases are sketched below.)
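To make the two cases above concrete, here is a rough editorial sketch; the field names (event, a, b, c) and types are hypothetical, and the doc's own examples are authoritative for the exact layout:

optional group event {
    required binary metadata;
    optional binary value;                        // catch-all for keys missing from this schema
    optional group a {                            // case 1: struct-of-struct shredded directly
        optional group b {                        // nested key path a.b
            optional binary typed_value (STRING);
            optional binary untyped_value;        // a.b present but not a string
        }
    }
    optional group c {                            // case 2: kept as a nested Variant
        optional binary untyped_value;            // only untyped_value, so c is not shredded further
    }
}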

Please let me know if the above is clear, or if I’m misunderstanding the question.

Thanks for describing your use case and the papers you've referenced. The CloudTrail use case makes a lot of sense, and is definitely one that we should consider carefully. For the current approach, I think it would make sense to shred a field like requestParameters as a Variant binary. This would provide a lot of the benefit, since queries on requestParameters would not need to fetch the top-level binary or any other columns.
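As a hypothetical sketch of that suggestion (the record layout and the eventName sibling field are illustrative, not from the doc), requestParameters would appear in the shredded schema with only an untyped_value, so queries on it touch a single binary column:

optional group record {
    required binary metadata;
    optional binary value;
    optional group eventName {
        optional binary typed_value (STRING);    // uniform field, fully shredded
    }
    optional group requestParameters {
        optional binary untyped_value;           // the whole subtree stored as one Variant binary
    }
}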

I can see that the more flexible schema you’ve proposed could provide better performance for some query patterns, though. At the same time, we’d like to aim to minimize the complexity in the spec, the Parquet footer, and implementation.

I’d like to spend a bit more time looking at the papers you’ve linked to, and considering the trade-offs between the proposals. Can you give us a better idea of what type of queries you expect to see on the read path, and how your scheme would benefit? E.g. would you expect to typically see a mix of queries that need all of requestParameters, and others that only need a field or two? What type of query is likely to benefit significantly from shredding different types (e.g. integer and string) vs. just shredding the most common type, and fetching the rest from the binary? We would like to better understand how the shredding scheme will improve read performance for your workload. Thanks!

cashmand avatar Jun 14 '24 19:06 cashmand

Thanks @cashmand and yes, that is clear. The types of queries one would expect to see on AWS CloudTrail requestParameters and responseElements are both needle-in-haystack queries, which filter on just a few nested field paths corresponding to the AWS service and return all columns (benefiting from late materialization), and analytical dashboards that aggregate/summarize a few fields, such as service-level dashboards (e.g. for AWS S3) that perform aggregations on some of the sub-fields pertaining to each service within requestParameters and responseElements.

An example of the first type is the following AWS repository containing a variety of search oriented security detection rules: https://github.com/sbasu7241/AWS-Threat-Simulation-and-Detection.

Both of these types of queries (highly selective search and analytical) would benefit massively from shredding/subcolumnarization over being stored as a binary variant, due to the compression, statistics, and reduction in data scanned. Note that these fields can contain massive string blobs coming from user input (e.g. a stringified SQL query string as a request parameter: https://docs.aws.amazon.com/athena/latest/APIReference/API_StartQueryExecution.html#athena-StartQueryExecution-request-QueryString) alongside compact low-cardinality or numerical fields which are useful in a query or a dashboard (e.g. viewing distinct requestParameters.policyDocument.statements[*].action, or a search like requestParameters.ipPermissions.items[0].ipRanges.items[0].cidrIp == "127.0.0.1"), which is why shredding is important for performance on this type of semi-structured field.

shaeqahmed avatar Jun 17 '24 02:06 shaeqahmed

Hi @shaeqahmed, I updated the scheme based on the discussion above, while still trying to keep the scheme relatively simple. At a high level, I added the option to define one or more of object, array, typed_value or untyped_value at each path segment (including at the top level, rather than having the one-off value/metadata). This provides the flexibility to union multiple schemas, and avoids the problem of having to fetch the top-level value to determine if an intermediate path was only partially shredded.

We decided to allow only one typed_value at each level, rather than providing one per type. The storage overhead of storing alternative scalar values in untyped_value should be fairly low after encoding/compression, and it should still be possible to define custom stats/metadata schemes later if that turns out to be useful for filtering applications.
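A rough sketch of the updated shape, inferred only from the description above (field names a, b, c are hypothetical, metadata is omitted here, and the exact naming and nesting are as specified in the doc):

optional group event {
    optional binary untyped_value;        // values that don't match the shredded schema
    optional group object {               // object is one of the options at any path segment,
        optional group a {                //   including the top level
            optional binary typed_value (STRING);
            optional binary untyped_value;
        }
        optional group b {
            optional group object {       // nested object, shredded recursively
                optional group c {
                    optional int64 typed_value;
                }
            }
            optional binary untyped_value;
        }
    }
}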

Please take a look, and let me know if you have more feedback.

cashmand avatar Jul 03 '24 22:07 cashmand

Hi @shaeqahmed, I think the ability to read with older engines is not a goal, and I don't think it's worth adding extra complexity to the scheme to allow it. I can update the document to make that clear.

I also don't think there's much benefit to collapsing the levels in the schema. It adds extra complexity to parse and handle that case, and to ensure that the meaning of a name can't be ambiguous. I don't think it really saves much, if anything: the number of column chunks won't change, and in the current scheme, marking the intermediate groups as required instead of optional should result in the same column chunk size as if they had been collapsed.
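For illustration, a minimal sketch of the point about required intermediate groups (field names assumed, not from the doc): a required group contributes no definition level of its own, so the leaf column's definition/repetition data is the same as if the intermediate level had been collapsed into the path name.

optional group a {                          // optional: one definition level ("a is present")
    required group object {                 // required group: contributes no definition level,
        optional group b {                  //   so the levels stored for b's typed_value match
            optional binary typed_value;    //   what a collapsed "a.b" column would store
        }
    }
}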

cashmand avatar Jul 10 '24 13:07 cashmand

@cashmand Ah that makes sense, since marking those intermediate columns as required means the writer does not have to write an extra definition level. Thanks for updating the doc, this looks good to me!

shaeqahmed avatar Jul 10 '24 20:07 shaeqahmed

Merged to master.

HyukjinKwon avatar Jul 13 '24 03:07 HyukjinKwon

@cashmand Shredding will be a great improvement for variant, looking forward to its implementation! How is its progress so far? After reading this document, I have some questions and hope to get some help:

  1. Will shredded variant be a new type? I see that it is currently a nested and changing Struct type, so it is a bit difficult to imagine how to describe it.
  2. For the write side, how is the shredding schema generated adaptively? From the description in the document, it looks dynamic: is it at the table level, file level, or even row-group level? Also, many layers of nesting are currently designed; does this have an impact on write overhead?
  3. For the read side, if it is a file-level schema, how should Spark integrate it when reading? For example, if we want to obtain a certain path but the schemas of different files are different, how should we determine the physical plan?

Zouxxyy avatar Sep 18 '24 08:09 Zouxxyy

@Zouxxyy, thanks, these are great questions, which we don't have clear answers for yet, but I'll give you my high-level thoughts.

  1. Will shredded variant be a new type? I see that it is currently a nested and changing Struct type, so it is a bit difficult to imagine how to describe it.

The intent is for it to not be an entirely new type. For the purposes of describing the desired read/write schema to a data source, I think we might want to do something like extend the current VariantType to specify a shredding schema, but I don't think most of Spark should need to consider it to be a distinct type.

  2. For the write side, how is the shredding schema generated adaptively? From the description in the document, it looks dynamic: is it at the table level, file level, or even row-group level? Also, many layers of nesting are currently designed; does this have an impact on write overhead?

The intent is to allow it to vary at the file level. (I don't think row group level is an option for Parquet, since Parquet metadata has a single schema per-file.) The exact mechanism is still up in the air. We could start as simply as having a single user-specified schema per write job, but ultimately I think we'd like to either see Spark determine a shredding schema dynamically, or provide the flexibility in the data source API to allow connectors to determine a shredding schema.

  3. For the read side, if it is a file-level schema, how should Spark integrate it when reading? For example, if we want to obtain a certain path but the schemas of different files are different, how should we determine the physical plan?

Also a tough question. I think we'll either need the Parquet reader to handle the per-file manipulation, or provide an API to allow data sources to inject per-file expressions to produce the data needed by the query. (This could be useful in other scenarios like type widening, which might have data source specific requirements.) We're still looking into what the best approach is here.

cashmand avatar Sep 18 '24 13:09 cashmand

@cashmand Got it, thanks for the answer. The reading and writing of the shredding scheme is implicit in the variant type, and the user is unaware of it, which is user-friendly. Looking forward to the final implementation!

Zouxxyy avatar Sep 20 '24 02:09 Zouxxyy