datafusion
datafusion copied to clipboard
use StringViewArray when reading String columns from Parquet
Is your feature request related to a problem or challenge?
Part of https://github.com/apache/datafusion/issues/10918
In order to take advantage of the parquet writer generating StringViewArrays ( https://github.com/apache/arrow-rs/issues/5530 from @ariesdevil (❤️ ) ) we need to make sure datafusion doesn't immediately cast the array back to StringView which would undo the benefits
▲
┌ ─ ─ ─ ─ ─ ─ ┐ │ After filtering,
StringArray │ any unfiltered rows
└ ─ ─ ─ ─ ─ ─ ┘ │ are gathered via
... │ the `take` kernel
│
┌────────────────────────────┐
│ │
│ FilterExec │
│ │
└────────────────────────────┘
▲
┌ ─ ─ ─ ─ ─ ─ ┐ │
StringArray │
└ ─ ─ ─ ─ ─ ─ ┘ │ Reading String data
│ from a Parquet file
... │ results in
│ StringArrays passed
┌ ─ ─ ─ ─ ─ ─ ┐ │
StringArray │
└ ─ ─ ─ ─ ─ ─ ┘ │
│
┌────────────────────────────┐
│ │
│ ParquetExec │
│ │
└────────────────────────────┘
Current situation
Describe the solution you'd like
To support a phased rollout of this feature, I recommend we focus at first on only the first filtering operation
Specifically get to the point where the parquet reader will read data out as StringView like this:
▲
┌ ─ ─ ─ ─ ─ ─ ┐ │
StringArray │
└ ─ ─ ─ ─ ─ ─ ┘ │
... │
│
┌────────────────────────────┐
│ │
│ FilterExec │
│ │
└────────────────────────────┘
┌ ─ ─ ─ ─ ─ ─ ┐ ▲
StringViewArr │
│ ay │ │
─ ─ ─ ─ ─ ─ ─ │
... │
│
┌ ─ ─ ─ ─ ─ ─ ┐ │
StringViewArr │
│ ay │ │
─ ─ ─ ─ ─ ─ ─ │
│
┌────────────────────────────┐
│ │
│ ParquetExec │
│ │
└────────────────────────────┘
Intermediate
Situation 1: pass
StringViewArray
between ParquetExec
Describe alternatives you've considered
I suggest we:
- Make a configuration setting like "force StringViewArray" when reading parquet so we can test this. When this setting is enabled, DataFusion should configure the ParquetExec to produce
StringViewArrayregardless of the type stored in the parquet file - Then work on incrementally rolling out support / testing for various filter expressions (especially string functions like substring and https://github.com/apache/datafusion/issues/10919)
Additional context
No response
I'll take this one, can you assign me @alamb ?
I'll take this one, can you assign me @alamb ?
BTW you can assign yourself (single word comment take): https://datafusion.apache.org/contributor-guide/index.html#finding-and-creating-issues-to-work-on
It seems that the current filter and parquet exec play nicely, and the following code will directly filter on the string view array (instead of converting to StringArrary), which is quite nice.
(the test case requires the latest arrow-rs to run)
#[tokio::test]
async fn parquet_read_filter_string_view() {
let tmp_dir = TempDir::new().unwrap();
let values = vec![Some("small"), None, Some("Larger than 12 bytes array")];
let c1: ArrayRef = Arc::new(StringViewArray::from_iter(values.iter()));
let c2: ArrayRef = Arc::new(StringArray::from_iter(values.iter()));
let batch =
RecordBatch::try_from_iter(vec![("c1", c1.clone()), ("c2", c2.clone())]).unwrap();
let file_name = {
let table_dir = tmp_dir.path().join("parquet_test");
std::fs::create_dir(&table_dir).unwrap();
let file_name = table_dir.join("part-0.parquet");
let mut writer = ArrowWriter::try_new(
fs::File::create(&file_name).unwrap(),
batch.schema(),
None,
)
.unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
file_name
};
let ctx = SessionContext::new();
ctx.register_parquet("t", file_name.to_str().unwrap(), Default::default())
.await
.unwrap();
async fn display_result(sql: &str, ctx: &SessionContext) {
let result = ctx.sql(sql).await.unwrap().collect().await.unwrap();
arrow::util::pretty::print_batches(&result).unwrap();
for b in result {
println!("schema: {:?}", b.schema());
}
}
display_result("SELECT * from t", &ctx).await;
display_result("SELECT * from t where c1 <> 'small'", &ctx).await;
display_result("SELECT * from t where c2 <> 'small'", &ctx).await;
}
I'll take a closer look at the generated logical/physical plans to verify that the string view array is never being converted to string array. And if that is the case, the remaining work of this issue is probably (1) add an option to force reading StringArray as StringView array, and (2) add more tests and potentially test the generated plan uses StringViewArray consistently.
I have manually checked that there isn't any unexpected conversion.
The logical plan is
Projection: t.c1, t.c2 [c1:Utf8View;N, c2:Utf8;N]
Filter: t.c2 != Utf8("small") [c1:Utf8View;N, c2:Utf8;N]
TableScan: t [c1:Utf8View;N, c2:Utf8;N]
The physical plan is
CoalesceBatchesExec: target_batch_size=8192
FilterExec: c2@1 != small
ParquetExec: file_groups={1 group: [[tmp/.tmp4ADQEP/parquet_test/part-0.parquet]]}, projection=[c1, c2], predicate=c2@1 != small, pruning_predicate=CASE WHEN c2_null_count@2 = c2_row_count@3 THEN false ELSE c2_min@0 != small OR small != c2_max@1 END, required_guarantees=[c2 not in (small)]
I looked at the filter exec, it will eventually call into: https://github.com/XiangpengHao/datafusion/blob/string-view/datafusion/physical-expr/src/expressions/binary.rs#L260
Which calls compare_op from arrow.
I'll add more tests to demonstrate that the filter works out of the box.
@jayzhan211 and I are talking about something similar in https://github.com/apache/datafusion/issues/9403#issuecomment-2178347730
What do you think about potentially adding a new OptimizerRule, something like
struct UseStringView {}
/// Optimizer rule that rewrites portions of the Plan to use `StringViewArray` instead of
/// `StringArray` where the operators support the new type
///
/// (some background on StringArray and why it is better for some operators)
///
/// This rule currently supports:
/// 1. Reading strings from ParquetExec (which can save a copy of the string)
/// 2. GroupBy operation
/// ...
impl OptimzierRule for UseViews {
...
}
@jayzhan211 and I are talking about something similar in #9403 (comment)
What do you think about potentially adding a new OptimizerRule, something like
struct UseStringView {} /// Optimizer rule that rewrites portions of the Plan to use `StringViewArray` instead of /// `StringArray` where the operators support the new type /// /// (some background on StringArray and why it is better for some operators) /// /// This rule currently supports: /// 1. Reading strings from ParquetExec (which can save a copy of the string) /// 2. GroupBy operation /// ... impl OptimzierRule for UseViews { ... }
We usually ensure the schema remain unchanged after applying the optimize rule, should this be the special case?
https://github.com/apache/datafusion/blob/4109f581ce9bca956e01f13ff16b30d59720e96b/datafusion/optimizer/src/optimizer.rs#L389-L393
We usually ensure the schema remain unchanged after applying the optimize rule, should this be the special case?
I think we need to keep the schema the same (that is a pretty far reaching invariant)
But maybe we could do something like add a projection to convert it bac
So like if the input plan was
FilterExec [Utf8]
ParquetExec[Utf8]
We could implement an optimizer rule that made a plan like
ProjectionExec(cast(col) as Utf8) [utf8]
Filter[Utf8View]
ParquetExec[Utf8View]
🤔
I like the idea of using an optimizer rule to optimistically/optionally use StringView!
BTW the more I think about this the more I think it should probably be a PhysicalOptimizer rule (not a logical optimizer rule) as the change is related to the particular properties of the ExecutionPlans (rather than the logical type)
https://docs.rs/datafusion/latest/datafusion/physical_optimizer/optimizer/trait.PhysicalOptimizerRule.html
BTW the more I think about this the more I think it should probably be a PhysicalOptimizer rule (not a logical optimizer rule) as the change is related to the particular properties of the ExecutionPlans (rather than the logical type)
https://docs.rs/datafusion/latest/datafusion/physical_optimizer/optimizer/trait.PhysicalOptimizerRule.html
I'll try to prototype a physical optimizer
Without changing schema, we need to convert StringArray to StringViewArray -> compute -> convert back to StringArray. It would be nice if the conversion is negligible
In my case #9403 , we need to
- Convert to StringViewArray and introduce CastExpr to keep the schema unchanged in physical optimziation
- Run GroupStream with StringViewArray
- Apply CastExpr to get the StringArray back.
And, the cost of these 3 should always beat with processing with StringArray directly regardless of what kind of second computation is.
- Convert to StringViewArray and introduce CastExpr to keep the schema unchanged in physical optimziation
I was actually thinking the schema will be different between some ExecutionPlans (e.g. to pass a StringView between ParquetExec and a Filter)
An update here is I think @XiangpengHao has this working on a branch, but found that performance is actually slower for StringViewArray rather than StringView. Thus he is working on https://github.com/apache/arrow-rs/issues/5904 now
Want to share some thoughts here on when to use StringViewArray and when not.
We only consider the cost of loading data from parquet to narrow the scope.
To load a StringArray, we need to copy the data to a new buffer and build offset array. The extra memory we need to setup is array len * (string len + offset size). Specifically, StringArray is array len * (string len + 4), BigStringArray is array len * (string len + 8)
To load a StringViewArray, we only need to build view array and can reuse the buffer from parquet decoder. The extra memory to setup is array len * view size , i.e., array len * 16. Note that the memory consumption of StringViewArray is constant to string length, i.e., it takes 16 bytes of memory no matter how long the underlying string is.
For a sufficiently large array, the time to build the array should be proportional to the extra memory we set up.
This means that if each of the individual string is small, i.e., smaller than 12 bytes, StringArray is actually faster than StringViewArray. In other words, we should use StringViewArray only when strings are larger than 12 bytes.
Update -- @XiangpengHao found the root cause of the "small string is slower" -- read about it in this great writeup: https://github.com/apache/arrow-rs/pull/6031
TLDR is that we can make arrow/parquet reading faster than StringView always with (even) more! work
This is done on the string-view2 branch. Once we mereg https://github.com/apache/datafusion/pull/11667 we can close this ticket I think
https://github.com/apache/datafusion/pull/11667 is ready for review 🥳