feat: add partition columns in ReadRel
This PR adds partition columns in ReadRel. Authored by @zzcclp.
That doesn't really belong there; a named struct, by definition, is just a struct with names attached, and nothing more. It's also worth noting that the names do not necessarily map to columns, but to all struct fields in the nested type in DFS order (e.g. a schema like struct<a: struct<b: i32>, c: i32> carries the three names a, b, c for only two columns), so introducing something that operates on columns in the same abstraction might be more confusing than it is helpful.
Unless we're going to introduce a Schema message for this, should this maybe just go in ReadRel? @jacques-n
Thanks for your comment. I moved it into ReadRel. Would you like to take another look? Thanks.
The other thing I'm wondering about (and why I tagged @jacques-n) is whether this should be in Substrait at all or whether advanced extensions are more suitable. The question would be how this would translate to completely different query engines, if at all, and how these things affect the behavior of the plan. If different engines handle this in fundamentally different ways, or if it doesn't affect the behavior of the plan, it should IMO just be in advanced_extension via Any.
I personally don't have enough of a background in query engines to answer either question or review this, sorry.
Thank you for your nice comment. When scanning a partitioned table, we find this information is necessary for the two query engines we are testing. Otherwise, extra logic is needed to distinguish normal columns from partition columns, for example by inspecting the path and directory names, which is not convenient. If this information is already known to the planner and the planner puts it into the Substrait plan, the engine can handle it in its own way. As you said, engines may create the scan operator in different ways, but the information is there for them to use.
Can you expand a bit more on how this information is actually used? For example, what part of your engines needs to know what the column type is, and why? Is this in the scan operation?
Hi, we integrate Gluten (Spark) with a ClickHouse backend. When querying a partitioned table, Gluten needs to specify the partition columns to the ClickHouse backend through the Substrait plan, and the ClickHouse backend uses these partition columns to read values from the file paths, because partition columns are not included in the file source (for example, Parquet or MergeTree).
We are also integrating Velox as a query engine; in that case, this information is used in the Hive connector and the table scan operator.
@westonpace Would you like to take a further look? Thanks.
@rui-mo I apologize but I still am not sure I understand. Let us consider an example.
I have a bunch of parquet files with schema {"x": int32, "y": int64}.
They are stored in a dataset with hive partitioning on year and month (e.g. /my_dataset/year=2022/month=July).
Can you give an example ReadRel (in JSON format) with your addition to read this dataset? What would the output schema be?
Sorry for the late response. Here is an example from our testing. The query is select cr_returned_date_sk, cr_returned_time_sk from catalog_returns, and the partition column is cr_returned_date_sk (see TPCDSTables). The Substrait plan was like the one below, in which partitionColumns specifies whether each column is a normal or a partition column.
{"read":{
"common":{"direct":{}},"baseSchema":{"names":["cr_returned_time_sk","cr_returned_date_sk"],
"struct":{"types":[{"i32":{"nullability":"NULLABILITY_NULLABLE"}}, {"i32" {"nullability":"NULLABILITY_NULLABLE"}}]},
"partitionColumns":{"columnType":["NORMAL_COL","PARTITION_COL"]}},
"localFiles":{...}
......
}
}
Does this make sense?
I think this sort of column metadata could be centralized/reorganized in the future, because different consumers might care about different information about a specific column: e.g. compression, dictionary encoding, partitioning (as well as the partition key format, maybe), etc. Anyway, it seems reasonable for Substrait to have these things predefined as long as the information is not customized for a single engine.
I'm struggling with the patch. I do agree that there are scenarios where partition columns are required but I'm a little bit confused about how this information is being captured. For example, in Hive, partition columns are synthetic: they aren't actually stored in files. I can't see how marking is sufficient to clarify the values of the partition fields. In that case, the partition columns are part of a lot of other information for reading hive tables (e.g. table properties) and all are needed to construct the records correctly. I could see having a Hive table type that carries such information.
So can you give a concrete example of when marking which columns are partition columns would be useful without any additional information (e.g. partition column values per file)? The only thing I can come up with is when one would want to give the optimizer better insight into which columns would be more effective to apply predicates to first...
As a separate note, if this is in fact valuable, I'd be inclined to expose it as something more like repeated int partition_columns, but again, I'm not exactly clear.
Maybe you could create an actual Velox plan (in their format) and show us all the details. As I said, it feels like this is a format-specific concept (except for optimizer costing).
@jacques-n Thanks for your comments!
I can't see how marking is sufficient to clarify the values of the partition fields.
In Velox, the enum below is part of the HiveColumnHandle class and is what we need to set when converting a Substrait ReadRel into a Velox TableScanNode.
// From Velox's HiveColumnHandle: kRegular columns are read from the file
// itself, kPartitionKey columns are filled in from the partition path, and
// kSynthesized columns are generated by the connector.
enum class ColumnType { kPartitionKey, kRegular, kSynthesized };
But that is not enough. We also need to set the partition column and value on the split, like below, so the HiveDataSource can know the partition value and set that constant on the partition column during scanning.
auto split = std::make_shared<hive::HiveConnectorSplit>(
    kHiveConnectorId,
    paths[idx],      // file path
    format,          // file format, e.g. Parquet
    starts[idx],     // start of the byte range to read
    lengths[idx],    // length of the byte range to read
    partitionKeys);  // map from partition column name to its value for this file
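For context, here is a minimal sketch (a hypothetical helper, not actual Velox or Gluten code) of how a producer might build the partitionKeys map from a hive-style file path such as /my_dataset/year=2022/month=July/chunk0.parquet:

#include <map>
#include <sstream>
#include <string>

// Collect the "key=value" directory segments of a hive-style path.
// Values are kept as strings here; converting them to typed values is a
// separate step. A real implementation would also skip the final file
// segment in case the file name itself contains '='.
std::map<std::string, std::string> parsePartitionKeys(const std::string& path) {
  std::map<std::string, std::string> keys;
  std::istringstream segments(path);
  std::string segment;
  while (std::getline(segments, segment, '/')) {
    auto eq = segment.find('=');
    if (eq != std::string::npos) {
      keys.emplace(segment.substr(0, eq), segment.substr(eq + 1));
    }
  }
  return keys;
}

// parsePartitionKeys("/my_dataset/year=2022/month=July/chunk0.parquet")
// yields {{"month", "July"}, {"year", "2022"}}.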
From what I saw, the column types in TableScanNode provide information for later computation: for example, whether there is a partition key, or which column is expected to be a partition key and therefore needs a constant set. When the data scan happens, the data source can use this information.
Maybe you could create an actual Velox plan (in their format) and show us all the details.
Below are the plan details I got for the query select cr_returned_date_sk, cr_returned_time_sk from catalog_returns, but they lack the partition information.
-- Project[expressions: (n1_0:INTEGER, ROW["n0_1"]), (n1_1:INTEGER, ROW["n0_0"])] -> n1_0:INTEGER, n1_1:INTEGER
-- TableScan[table: hive_table] -> n0_0:INTEGER, n0_1:INTEGER
Would it be fair to characterize this as the following?
Information that is put into the logical plan that a planner or optimizer could use to improve the plan
I have been thinking of the ReadRel as very much a physical operator. In other words it merely contains the instructions needed by the consumer to read the files.
It makes sense that a planner would need this information but I had thought it would get the information from the table catalog or some kind of external metadata/statistics source.
I think this sort of column metadata could be centralized/reorganized in the future, because different consumers might care about different information about a specific column.
I still don't have a strong opinion about the other stuff here, but I agree with this one. IMO, column/field names are also part of this.
On the one hand, I like the idea that schemas are represented by NSTRUCT types for consistency. On the other hand, a schema can never be nullable, so something's off here immediately, and it makes the name metadata somehow special compared to all the other metadata an engine might want to attach to a column, even though Substrait doesn't use the names for anything either.
It would obviously be a breaking change (or field deprecation at least), but I think something like the following would make more sense instead of NamedStruct base_schema:
message ReadRel {
  // ...
  repeated Field fields = 1;
  message Field {
    Type type = 1;
    repeated string field_names = 2; // DFS field names for this column, or just one if not nested
    // other metadata fields that we agree are generic enough to be adopted by Substrait
    google.protobuf.Any custom_metadata = 31; // or AdvancedExtension or something
  }
  // ...
}
If we don't come to an agreement about a column's partitioning type belonging in core, then Velox could use the custom metadata field for it.
A bit off-topic maybe, but I always found the DFS nature of name specification to be awkward, too... Why not just allow names (and other metadata) to be specified in struct fields? Or, more generally, as part of any parameter binding, which is how I'm already doing it in the validator, because it mimics the syntax we use to refer to types (at least for field names).
It makes sense that a planner would need this information but I had thought it would get the information from the table catalog or some kind of external metadata/statistics source.
Got it. I agree there can be SQL engines that do not need this information. In that case, I think custom metadata could also work for us, and would be more generic.
@jvanstraten Based on your suggestion, I added a custom metadata field to ReadRel. To avoid a breaking change, base_schema was not changed. But I found advanced_extension is already there, and this new field may duplicate it. What do you think?
That's not really what I meant. I'll try to clarify the options I see right now for Velox:
- Use the existing advanced_extension field in ReadRel. This doesn't require a change to Substrait at all.
- Add some sort of metadata field to the ReadRel message specifically for attaching metadata to fields. This is what you're suggesting now. It's indeed kind of duplicating functionality, though; I think I'd favor the first option.
- Make some kind of new Schema message that encapsulates fields as type-name-metadata triples (or more), similar to what NamedStruct is doing right now. I think this is a reasonably clean solution, but it's a breaking change.
- Do the repeated Field thing I suggested above. Basically the same as the Schema message, and also a breaking change.
- Accept that NamedStruct was always a weird construct as a type and is only used for schemas anyway, and modify that to include field metadata. As far as the protobuf messages are concerned, I think this is what you suggested originally. However, named structs are currently considered to represent the NSTRUCT pseudotype rather than just schemas, so without getting rid of that part of the docs this change would be pretty weird. Continuing to call the message NamedStruct rather than Schema is then also pretty weird, but would be required for backward compatibility. It'd be something I would rename as part of #348, at which point we'd end up with the Schema message solution but without a breaking change right now. But I don't know if we're going to actually end up making that refactoring effort.
I don't have a strong preference for any of those options. My personal opinion is that we should just support metadata (including names) for all struct fields and get rid of the awkward NSTRUCT pseudotype and depth-first name annotation, but I don't see that one happening.
@jvanstraten Thank you for your informative suggestion. I changed NamedStruct into Fields with type, names, and metadata. As you mentioned, this could be a breaking change, but I think it is OK because only the parsing of NamedStruct needs to change. Would you like to review again? Thanks.
I would personally be good with this change. However, a number of organizational things have changed since you opened this PR. As this would be a breaking change, I believe you would now need a +2 from the members of the SMC (Jacques, Weston, and/or Phillip).
Got it, thank you all for the review and comments.
I have a new case in https://github.com/oap-project/gluten/issues/853
As it stands, I still do not see enough justification for a backwards-incompatible change. A backwards-compatible change might be OK, but I don't really think it is the right approach. Let me clarify my suggestion in https://github.com/substrait-io/substrait/pull/298#discussion_r1009590053. I agree custom_metadata would be repeated and would carry metadata for columns. I am only suggesting we keep it out of base_schema.
Here is why I do not understand the need:
I have a new case in https://github.com/oap-project/gluten/issues/853
This ask is for named field references. Producers can support named field references and translate them to numeric field references when creating the plan. This feature is not needed.
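As an illustration of that translation, here is a minimal sketch (a hypothetical helper, not part of any engine) of a producer resolving a column name against the names list of base_schema to get the numeric field index. Note that the names are listed in DFS order over nested fields, so this simple form is only correct for flat schemas:

#include <cstddef>
#include <optional>
#include <string>
#include <vector>

// Resolve a user-facing column name to a field index for a flat schema,
// using the names list of the ReadRel's base_schema.
std::optional<int> resolveFieldIndex(
    const std::vector<std::string>& names, const std::string& column) {
  for (std::size_t i = 0; i < names.size(); ++i) {
    if (names[i] == column) {
      return static_cast<int>(i);  // usable as a numeric struct-field reference
    }
  }
  return std::nullopt;  // unknown column: the producer should reject the query
}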
For Hive partition information
I have studied the Velox code. As I understand it, this is what is happening:
- The user asks for fields regular1, regular2, partition1.
- The planner discovers that partition1 is a partition key, and that for the files partition1=7/chunk0.parquet and partition1=7/chunk1.parquet the value of partition1 == 7, while for the file partition1=10/chunk0.parquet the value of partition1 == 10.
- The planner needs to tell the query consumer to create a constant column named partition1 with the value 7 or 10 depending on the file.
First, this is not unique to Velox. I would not consider this "metadata". I think other query engines need this too. Acero has this feature in datasets (though it is more general). I assume DuckDb has support for something like this. This exists in table formats like Iceberg. It is a very common thing. We should create a dedicated message or way to handle this and should not call it "metadata".
If you want to use this feature before it is final or without upstreaming into Substrait then AdvancedExtension or ExtensionTable should be used. We do not need another arbitrary extension point.
Finally, it is entirely possible to do this today:
Read1:
  files:
  - partition1=7/chunk0.parquet
  - partition1=7/chunk1.parquet
Read2:
  files:
  - partition1=10/chunk0.parquet
Project1:
  input: Read1
  exprs:
  - 7
Project2:
  input: Read2
  exprs:
  - 10
Union:
  inputs:
  - Project1
  - Project2
Suggested Next Steps: My preference (if we don't want to use simple project nodes) is to formally define a "guarantees" or "partition columns" or "constant values" property for ReadRel.
@westonpace Thanks for your instructive comment!
What you suggested is just what we need. The problem we find with using AdvancedExtension or ExtensionTable is that the receiver of the Substrait plan first needs to know how the information was translated into Any, which requires some extra implicit information beyond Substrait. Therefore, we prefer to formally define a "guarantees" or "partition columns" or "constant values" property for ReadRel, just as you suggested.
What we're doing in this PR is trying to find a more acceptable way to carry the partition column information in a Substrait plan. Would you like to review my current change? Thanks again.
Hi @westonpace, Is there anything else needed for this PR? Thanks.
This will need a description in logical_relations.md. I have proposed a PR with some example wording.
It's still not entirely clear to me why this needs to be in the read relation instead of in a follow-up project relation. I'm not aware of any engine other than Velox & Gluten that would use this.
I'm struggling with the patch. I do agree that there are scenarios where partition columns are required but I'm a little bit confused about how this information is being captured. For example, in Hive, partition columns are synthetic: they aren't actually stored in files. I can't see how marking is sufficient to clarify the values of the partition fields. In that case, the partition columns are part of a lot of other information for reading hive tables (e.g. table properties) and all are needed to construct the records correctly. I could see having a Hive table type that carries such information.
@jacques-n I believe this question is answered now that we specify the actual value.
I think what is going on here might be:
- The planner reads the hive metadata to determine the partitioning column values. However, these values are always strings (since they come from the path name).
- The planner passes these strings on to Velox in this "partition columns" spot.
- Velox, during execution (actually I think this is during the creation of the execution plan), converts these strings to the appropriate type (sketched below).
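To make that last step concrete, here is a minimal sketch (hypothetical code, not Velox's implementation) of casting the raw string recovered from the path into a constant of the column's declared type, covering only a few Substrait types for brevity:

#include <cstdint>
#include <stdexcept>
#include <string>
#include <variant>

using PartitionValue = std::variant<int32_t, int64_t, std::string>;

// Convert the string taken from a "key=value" path segment into a value of
// the declared Substrait type of the partition column.
PartitionValue castPartitionValue(const std::string& raw,
                                  const std::string& typeName) {
  if (typeName == "i32") {
    return static_cast<int32_t>(std::stol(raw));
  }
  if (typeName == "i64") {
    return static_cast<int64_t>(std::stoll(raw));
  }
  if (typeName == "string") {
    return raw;
  }
  throw std::runtime_error("unsupported partition column type: " + typeName);
}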
I would say that this design still seems confusing to me. As I mentioned above, my main question that remains is:
- Why can't the planner do the string->value casting and then insert a project?
- Maybe because the read relation's filters or projection depend on this value. However, I think those filters could be simplified to not depend on this value.
However, my question might be trying to enforce a design on the particular consumer. If this were a physical relation I would just say "There are two implementations and they think it is useful and that is good enough for me".
Overall, I think I am +0.5 on this.
This will need a description in logical_relations.md. I have proposed a PR with some example wording.
@westonpace Really appreciate your help on this PR! And you are totally right about what is happening with the partition column & value in Velox.
Why can't the planner do the string->value casting and then insert a project?
Maybe because the read relation's filters or projection depend on this value. However, I think those filters could be simplified to not depend on this value.
As you said, a project rel can be inserted after the read rel, but we would need to convert both the read rel and the project rel into a single TableScan node in Velox, and that can be somewhat ambiguous. Another option for us is to use the AdvancedExtension, but any type of information can be wrapped into that field. That means some implicit information is still needed to extract it correctly, because the Substrait consumer does not always know how the Substrait producer wrapped the Any information. Since Substrait serves as a protocol for computing, we believe it would be better to have a clear definition in it, rather than to rely on implicit conventions.
I see your point and I am still in favor of this. If @jacques-n or @cpcloud agree we can go ahead.
Another option for us is to use the AdvancedExtension, but any type of information can be wrapped into that field. That means some implicit information is still needed to extract it correctly, because the Substrait consumer does not always know how the Substrait producer wrapped the Any information. Since Substrait serves as a protocol for computing, we believe it would be better to have a clear definition in it, rather than to rely on implicit conventions.
Correct. If an extension is used then a different message will still be needed. The difference is this:
The basic set of most common logical relations should ideally be supported by a large number of consumers and producers. If we add this field to ReadRel then it means that other consumers should handle this as well. This adds extra work for everyone, not just velox and gluten, even though other engines do not need this information. For example, now Acero, Datafusion, and DuckDb will need to support this field (probably by using a ReadRel and a ProjectRel).
So if a feature is only needed by a small number of producers/consumers then it is better to have it as a hint or a more physical relation.
If we add this field to ReadRel then it means that other consumers should handle this as well. This adds extra work for everyone, not just velox and gluten, even though other engines do not need this information.
@westonpace Thanks for your reply! I have a slightly different opinion here. If other consumers do not need this field, it's OK for them not to handle it. That is, Substrait can provide extra fields. For example, ReadRel has the two fields below. Although Gluten + Velox do not use them for now, having them in the Substrait proto required no code change on our side for either plan generation or consumption.
Expression best_effort_filter = 11;
Expression.MaskExpression projection = 4;
What do you think? Thanks again!