Remove LargeUtf8|Binary, Utf8|BinaryView, and Dictionary from ScalarValue
This PR removes LargeUtf8|Binary, Utf8|BinaryView, and Dictionary from ScalarValue, following the discussion on #11513
Open questions
Top level ScalarValue cast
This change initially failed the select_arrow_cast test (see review comments for updates). The test fails because of an interaction of expression_simplifier + optimize_projections.
The query is: SELECT arrow_cast(1234, 'Float64') as f64, arrow_cast('foo', 'LargeUtf8') as large
- expression_simplifier correctly tries to evaluate_to_scalar the arrow_cast('foo', 'LargeUtf8') cast (here). This leads to the cast physical expression (here), but it is a no-op since it transforms Utf8('foo') (scalar) → LargeUtf8('foo') (array) → Utf8('foo') (scalar). TODO: if this is what we want, this should probably not execute at all.
- This edit doesn't change the underlying schema of the LogicalPlan.
- optimize_projections rewrites the Projection and updates the schema; seeing the Utf8('foo'), it correctly assumes that the LogicalPlan's schema field for this value should have DataType == Utf8.
This check is the one raising the error below, but I guess it should instead check whether the schema fields are logically equivalent to each other (see the sketch after the error output). I'm not totally convinced this is the correct solution because it removes some guarantees that might be expected by users downstream. Happy to hear everyone's opinion on this.
---- optimizer::select_arrow_cast stdout ----
thread 'optimizer::select_arrow_cast' panicked at datafusion/core/tests/optimizer/mod.rs:118:30:
called `Result::unwrap()` on an `Err` value: Context("Optimizer rule 'optimize_projections' failed", Context("optimize_projections", Internal("Failed due to a difference in schemas, original schema: DFSchema { inner: Schema { fields: [Field { name: \"f64\", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"large\", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [None, None], functional_dependencies: FunctionalDependencies { deps: [] } }, new schema: DFSchema { inner: Schema { fields: [Field { name: \"f64\", data_type: Float64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"large\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [None, None], functional_dependencies: FunctionalDependencies { deps: [] } }")))
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
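For illustration, a minimal sketch of what such a logically-equivalent comparison could look like; the helper name logically_equal is hypothetical, not an existing DataFusion API:

use arrow_schema::DataType;

/// Hypothetical helper: treat types that differ only in physical layout as
/// equal when validating optimizer output schemas.
fn logically_equal(a: &DataType, b: &DataType) -> bool {
    // A dictionary is logically its value type.
    fn base(dt: &DataType) -> &DataType {
        match dt {
            DataType::Dictionary(_, value) => base(value),
            other => other,
        }
    }
    match (base(a), base(b)) {
        (
            DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View,
            DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View,
        ) => true,
        (
            DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
            DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
        ) => true,
        (x, y) => x == y,
    }
}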
arrow_typeof
https://github.com/apache/datafusion/blob/main/datafusion/functions/src/core/arrowtypeof.rs#L59-L72 uses column_value.data_type() to determine the type of the argument, but this information is not really accurate: if the ColumnarValue is a ScalarValue, data_type() will be the "logical" type, e.g. arrow_typeof(arrow_cast('hello', 'Utf8View')) would yield Utf8.
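For example (a minimal sketch, assuming the datafusion_common and arrow_schema crate paths):

use arrow_schema::DataType;
use datafusion_common::ScalarValue;

fn main() {
    // With Utf8View removed from ScalarValue, the result of casting 'hello' to
    // Utf8View is stored as a plain logical Utf8 scalar, so data_type() no
    // longer reports Utf8View.
    let value = ScalarValue::Utf8(Some("hello".to_string()));
    assert_eq!(value.data_type(), DataType::Utf8);
}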
Type info
Let's take this expr as an example: a_function_that_takes_utf8_view(arrow_cast('test', 'Utf8View')). The cast expression currently evaluates to ColumnarValue::Scalar(Utf8View("test")) and the function is happy to receive that. With this change the cast expression instead evaluates to ColumnarValue::Scalar(Utf8("test")) (as ScalarValue::Utf8View doesn't exist, it produces a logically equal value), while the cast expression's data_type() returns Utf8View.
- What would the function do?
- How can we communicate to the parent expression in the expression tree that the column value needs to be of that particular type? (It should know because of the schema; see the sketch after this list.)
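As a sketch of the second question: the parent could, in principle, recover the physical type from its input schema and cast the logical scalar back before use. The helper below is hypothetical; ScalarValue::to_array and arrow's cast kernel are existing APIs:

use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow::datatypes::DataType;
use datafusion_common::{Result, ScalarValue};

/// Hypothetical helper: materialize a logical scalar using the physical type
/// recorded in the schema field, e.g. Utf8("test") + Utf8View -> StringViewArray.
fn scalar_to_physical_array(value: &ScalarValue, field_type: &DataType) -> Result<ArrayRef> {
    let array = value.to_array()?; // logical representation (Utf8)
    Ok(cast(&array, field_type)?)
}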
statement ok
create table test_source as values
('Andrew', 'X'),
('Xiangpeng', 'Xiangpeng'),
('Raphael', 'R'),
(NULL, 'R')
;
statement ok
create table t as
SELECT
arrow_cast(column1, 'Utf8') as column1_utf8,
arrow_cast(column2, 'Utf8') as column2_utf8,
arrow_cast(column1, 'LargeUtf8') as column1_large_utf8,
arrow_cast(column2, 'LargeUtf8') as column2_large_utf8,
arrow_cast(column1, 'Utf8View') as column1_utf8view,
arrow_cast(column2, 'Utf8View') as column2_utf8view,
arrow_cast(column1, 'Dictionary(Int32, Utf8)') as column1_dict,
arrow_cast(column2, 'Dictionary(Int32, Utf8)') as column2_dict
FROM test_source;
query error DataFusion error: Arrow error: Invalid argument error: column types must match schema types, expected Utf8View but found Utf8 at column index 0
select min(column1_utf8view) from t;
I ran this test and it failed. It succeeds on the main branch.
I guess we need the schema for converting a ScalarValue to an ArrayRef.
Given that we have ScalarValue::Utf8 and StringView in the schema, we can then get the corresponding StringViewArray.
I feel like this is a little bit more complex, because casting a scalar value to a logically equal type (arrow_cast('test', 'Dictionary(Int32, Utf8)')) is a no-op in all cases but one: when the cast is at the top level, which indicates that you actually want an array of that type. So we need a way to distinguish those two scenarios, which is why I've added this:
https://github.com/apache/datafusion/blob/78dc034d4b7d644e6c34745286e82a6332dd7606/datafusion/physical-plan/src/projection.rs#L312
But I'm not really sure about it...
Since you have no commits yet, CI is not running.
You could run these commands to pass CI:
cargo test --test sqllogictests (slt files)
./dev/rust_lint.sh
cargo test --lib --tests --bins --features avro,json (rust test)
Thanks! Don't worry about turning it on as I can still get results from my fork.
Starting to check this out
I wonder if we could do some sort of hybrid approach where ScalarValue still retains a DataType, but it isn't encoded in the variant,
like
enum ScalarValue {
    // ...
    /// A logical string value, with the specified data type
    Utf8(String, DataType),
}
That way you would still have the DataType for each ScalarValue 🤔
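If I'm reading the idea right, data_type() would then just return the stored type. A rough sketch under that assumption (only the string variant shown; the names shadow the real ScalarValue purely for illustration):

use arrow_schema::DataType;

enum ScalarValue {
    /// A logical string value, carrying the physical string type it came from
    /// (Utf8, LargeUtf8, Utf8View, or Dictionary(_, Utf8)).
    Utf8(String, DataType),
    // ... other variants elided ...
}

impl ScalarValue {
    fn data_type(&self) -> DataType {
        match self {
            ScalarValue::Utf8(_, data_type) => data_type.clone(),
            // ... other variants elided ...
        }
    }
}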
This should be the last resort if we don't have any other way to figure out the actual target type to cast to. Otherwise, IMO this is just a code simplification (still a nice step forward) but not the goal of decoupling types #11513, since we still hide the physical type inside ScalarValue.
I think it is possible to get the DataType from the Schema without it being passed along the logical layer 🤔
Otherwise, IMO this is just a code simplification (Still a nice step forward) but not the goal of decoupling types https://github.com/apache/datafusion/issues/11513 ,
Yes, I agree with this assessment.
🤔 so that leaves us with the question of how to incrementally try and introduce logical types into logical plans 🤔
As @jayzhan211 says, the type information is still retained in the Schema, but it would be impractical to modify every expression operator to account for the fact that the type is not stored in the ColumnarValue.
My discussion around Datum revolves around the fact that, to retain physical type information, maybe the ColumnarValue enum should instead be:
enum ColumnarValue {
    Array(Arc<dyn Array>),
    Scalar(Arc<dyn Array>), // this is an array of len = 1
}
(edit: this possibility was already discussed https://github.com/apache/datafusion/issues/7353)
or that ScalarValue is kept as is and a LogicalScalarValue is introduced.
I think that the main problem is that scalar values in this change are purely logical. Instead, if in the future we introduce logical types for arrays, we would have a physical type that is distinct from its logical representation and that we can use during physical execution (e.g. that can be cast to another physical type without losing its type information).
Yeah, I think Datum is actually what we need. After casting the array, we don't need to convert it back to ScalarValue; that has an additional conversion cost and the physical type information is lost too. Ideally we could replace ColumnarValue with Datum, but it seems the change is quite large.
We could try replacing ScalarValue with Scalar.
enum ColumnarValue {
    Array(Arc<dyn Array>),
    Scalar(arrow_array::Scalar),
}
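For reference, arrow_array::Scalar wraps a one-row array, so the physical type travels with the value. A small example, assuming a recent arrow-rs with Utf8View support:

use arrow_array::{Array, Datum, Scalar, StringViewArray};
use arrow_schema::DataType;

fn main() {
    // A Scalar is just a one-element array, so the physical type (Utf8View here)
    // is preserved without a dedicated ScalarValue variant.
    let scalar = Scalar::new(StringViewArray::from_iter_values(["test"]));
    let (array, is_scalar) = scalar.get();
    assert!(is_scalar);
    assert_eq!(array.data_type(), &DataType::Utf8View);
}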
#7353 is talking about changing something like ScalarValue::Int64(i64) to ScalarValue::Int64(arrow_array::Scalar); I don't think we require that change for this PR.
Upd: If there is a performance concern, a conservative change is to keep the ScalarValue. It depends on how much we rely on ColumnarValue::Scalar and whether those paths are performance critical. I'm not sure about it either 🤔
enum ColumnarValue {
    Array(Arc<dyn Array>),
    ScalarArray(arrow_array::Scalar),
    Scalar(ScalarValue),
}
I've pushed a fairly big experiment. I've tried to change ColumnarValue to
#[derive(Clone, Debug)]
pub enum ColumnarValue {
    /// Array of values
    Array(ArrayRef),
    /// A single value
    Scalar(Scalar), // previously Scalar(ScalarValue)
}

#[derive(Clone, Debug)]
pub struct Scalar {
    value: ScalarValue,
    data_type: DataType,
}
which follows the approach that was discussed with @jayzhan211 in the comments above. I've opted for this hybrid solution to retain most of the flexibility of the original ColumnarValue and I'm mostly satisfied with how it turned out. Curious to hear your thoughts @jayzhan211 and @alamb
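For what it's worth, a rough sketch of how such a wrapper could hand back a physically typed array; the struct is repeated from the snippet above, ScalarValue::to_array_of_size and arrow's cast are existing APIs, and the methods themselves only illustrate the idea rather than the exact code in this branch:

use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow_schema::DataType;
use datafusion_common::{Result, ScalarValue};

#[derive(Clone, Debug)]
pub struct Scalar {
    value: ScalarValue,
    data_type: DataType,
}

impl Scalar {
    /// The physical type, preserved even when `value` only carries a logical one.
    pub fn data_type(&self) -> &DataType {
        &self.data_type
    }

    /// Materialize the logical value and cast it back to the physical type.
    pub fn to_array_of_size(&self, num_rows: usize) -> Result<ArrayRef> {
        let array = self.value.to_array_of_size(num_rows)?;
        Ok(cast(&array, &self.data_type)?)
    }
}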
It seems the trick you did is to get the first index of the ArrayRef (instead of keeping it as arrow::Scalar) as a ScalarValue, but we still end up requiring the DataType to keep the type information.
However, I think we could move on with this approach; we can figure out whether there is a better approach later on.
I'm happy to report that I've got most sqllogictests to run successfully (albeit there is still the issue pointed out by @alamb, which I plan to address after I've got all tests passing). The only errors I'm seeing are the following:
Aggregates using ScalarValues as state
Aggregates use ScalarValue to represent state and evaluate to a result. Should I look into restricting their return type to the subset of types which can be represented by ScalarValues?
External error: query failed: DataFusion error: Arrow error: Invalid argument error: column types must match schema types, expected LargeUtf8 but found Utf8 at column index 3
[SQL] SELECT
min(utf8),
max(utf8),
count(utf8),
min(largeutf8),
max(largeutf8),
count(largeutf8)
FROM t
at test_files/aggregate.slt:4104
Weird error I don't quite understand
Removing this query doesn't yield any other error in the slt file. I don't have any other clues and I'm not sure where to start 🤷.
External error: query result mismatch:
[SQL] SELECT
count,
LAG(timestamp, 1) OVER (ORDER BY timestamp),
arrow_typeof(LAG(timestamp, 1) OVER (ORDER BY timestamp))
FROM timestamp_with_tz
LIMIT 10;
[Diff] (-expected|+actual)
0 NULL Timestamp(Millisecond, Some("UTC"))
0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
- 4 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
- 0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
+ 12 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
+ 2 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
14 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
- 0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
- 0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
+ 5 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
+ 1 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
at test_files/parquet.slt:255
Aggregates using ScalarValues as state
The issue is that we have only the ScalarValue but no DataType to get the actual ArrayRef back.
One solution is to store the DataType in MaxAccumulator and call to_array_of_type to return an ArrayRef as the output of evaluate, maybe as evaluate_as_array to avoid breaking the existing function.
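A minimal sketch of that idea, with a hypothetical field layout and method name (evaluate_as_array); ScalarValue::to_array and arrow's cast are existing APIs:

use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow_schema::DataType;
use datafusion_common::{Result, ScalarValue};

/// Hypothetical: keep the physical input type next to the logical state so the
/// accumulator can hand back an ArrayRef of the right type (e.g. LargeUtf8).
struct MaxAccumulator {
    max: ScalarValue,
    data_type: DataType,
}

impl MaxAccumulator {
    fn evaluate_as_array(&self) -> Result<ArrayRef> {
        let array = self.max.to_array()?; // logical (e.g. Utf8) representation
        Ok(cast(&array, &self.data_type)?)
    }
}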
Weird error I don't quite understand
I failed to reproduce the error 😕
Upd: It seems there is only one place that calls Accumulator::evaluate and transforms it to an ArrayRef. I think we can just change the return value of Accumulator::evaluate to ArrayRef:
https://github.com/apache/datafusion/blob/37e54ee874e9027a329ba4f6c1e0e6359d63a33c/datafusion/physical-plan/src/aggregates/mod.rs#L1093
It seems like this solution is not that easy, as the state also needs to be accounted for, but it's definitely a good start!
Marking as "ready for review" because all tests pass on my end. Some casting issues still remain that I'll look into soon but in the meantime I'm looking forward to some feedback on this huge change ❤️
Will evaluate_as_scalar replace evaluate and state_as_scalars replace state? If so, then it looks good to me.
Yes that's the plan!
I'm back from vacation and I've rebased my PR to the latest upstream.
To make it clear that this PR is not waiting on more review (we are discussing on https://github.com/apache/datafusion/pull/12536), I'm marking this as draft.
Closing in favour of #12622