Remove LargeUtf8|Binary, Utf8|BinaryView, and Dictionary from ScalarValue

Open notfilippo opened this issue 1 year ago • 18 comments

This PR removes LargeUtf8|Binary, Utf8|BinaryView, and Dictionary from ScalarValue, following the discussion on #11513


Open questions

Top level ScalarValue cast

This change initially failed the select_arrow_cast test (see review comments for updates). The test fails because of an interaction of expression_simplifier + optimize_projections.

The query is: SELECT arrow_cast(1234, 'Float64') as f64, arrow_cast('foo', 'LargeUtf8') as large

  1. expression_simplifier correctly tries to evaluate_to_scalar the arrow_cast('foo', 'LargeUtf8') cast (here)
  2. this leads to the cast physical expression (here), but this is a no-op since it transforms Utf8('foo') (scalar) → LargeUtf8('foo') (array) → Utf8('foo') (scalar)
    • TODO: if this is what we want this should probably not execute at all
  3. This edit doesn't change the underlying schema of the LogicalPlan
  4. optimize_projections rewrites the Projection and updates the schema, seeing the Utf8('foo') it correctly assumes that the LogicalPlan's schema field for this value should have DataType == Utf8

This check is the one raising this error, but I guess it should instead check whether the schema fields are logically equivalent to each other. I'm not totally convinced this is the correct solution, because it removes some guarantees that users downstream might expect. Happy to hear everyone's opinion on this.

---- optimizer::select_arrow_cast stdout ----
thread 'optimizer::select_arrow_cast' panicked at datafusion/core/tests/optimizer/mod.rs:118:30:
called `Result::unwrap()` on an `Err` value: Context("Optimizer rule 'optimize_projections' failed", Context("optimize_projections", Internal("Failed due to a difference in schemas, original schema: DFSchema { inner: Schema { fields: [Field { name: \"f64\", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"large\", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [None, None], functional_dependencies: FunctionalDependencies { deps: [] } }, new schema: DFSchema { inner: Schema { fields: [Field { name: \"f64\", data_type: Float64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"large\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [None, None], functional_dependencies: FunctionalDependencies { deps: [] } }")))
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
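
A minimal sketch of what a "logically equivalent" schema check could look like, assuming a simplified stand-in for Arrow's DataType and a hypothetical LogicalType enum (neither LogicalType, logical_type nor logically_equivalent is DataFusion API). It compares only field names and logical types; nullability, which also differs in the error above, is deliberately ignored here.

```rust
/// Simplified stand-in for Arrow's `DataType` (assumption: only the
/// variants relevant to this discussion are modelled).
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int32,
    Float64,
    Utf8,
    LargeUtf8,
    Utf8View,
    Dictionary(Box<DataType>, Box<DataType>),
}

/// Hypothetical logical type: what the values mean, ignoring their encoding.
#[derive(Debug, PartialEq)]
enum LogicalType {
    Int32,
    Float64,
    String,
}

/// Map a physical type to its logical type. Dictionary encoding is
/// treated as transparent, so only the value type matters.
fn logical_type(dt: &DataType) -> LogicalType {
    match dt {
        DataType::Int32 => LogicalType::Int32,
        DataType::Float64 => LogicalType::Float64,
        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => LogicalType::String,
        DataType::Dictionary(_, value) => logical_type(value),
    }
}

/// Two schemas, given as (name, type) pairs, are logically equivalent when
/// they have the same field names in the same order and each pair of
/// fields shares a logical type.
fn logically_equivalent(a: &[(&str, DataType)], b: &[(&str, DataType)]) -> bool {
    a.len() == b.len()
        && a.iter()
            .zip(b)
            .all(|((an, at), (bn, bt))| an == bn && logical_type(at) == logical_type(bt))
}
```

Under this check the original schema (large: LargeUtf8) and the rewritten one (large: Utf8) would be accepted, which is exactly the guarantee being relaxed.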

arrow_typeof

https://github.com/apache/datafusion/blob/main/datafusion/functions/src/core/arrowtypeof.rs#L59-L72 uses column_value.data_type() to determine the type of the argument, but with this change that information is no longer accurate. If the ColumnarValue is a ScalarValue, data_type() will return the "logical" type, e.g. arrow_typeof(arrow_cast('hello', 'Utf8View')) would yield Utf8.

Type info

Let's take this expr as an example: a_function_that_takes_utf8_view(arrow_cast('test', 'Utf8View')). The cast expression currently evaluates to a ColumnarValue::Scalar(Utf8View("test")) and the function is happy to receive that. With this change the cast expression instead evaluates to ColumnarValue::Scalar(Utf8("test")) (as ScalarValue::Utf8View doesn't exist, it produces a logically equal value), while the cast expression's data_type() still returns Utf8View.

  • What would the function do?
  • How can we communicate to the parent expression in the expression tree that the column value needs to be of that particular type? (it should know because of the schema)

notfilippo avatar Aug 14 '24 10:08 notfilippo

statement ok
create table test_source as values
    ('Andrew', 'X'),
    ('Xiangpeng', 'Xiangpeng'),
    ('Raphael', 'R'),
    (NULL, 'R')
;

statement ok
create table t as
SELECT
  arrow_cast(column1, 'Utf8') as column1_utf8,
  arrow_cast(column2, 'Utf8') as column2_utf8,
  arrow_cast(column1, 'LargeUtf8') as column1_large_utf8,
  arrow_cast(column2, 'LargeUtf8') as column2_large_utf8,
  arrow_cast(column1, 'Utf8View') as column1_utf8view,
  arrow_cast(column2, 'Utf8View') as column2_utf8view,
  arrow_cast(column1, 'Dictionary(Int32, Utf8)') as column1_dict,
  arrow_cast(column2, 'Dictionary(Int32, Utf8)') as column2_dict
FROM test_source;

query error DataFusion error: Arrow error: Invalid argument error: column types must match schema types, expected Utf8View but found Utf8 at column index 0
select min(column1_utf8view) from t;

I ran this test and it fails on this branch. It succeeds on the main branch.

I guess we need the schema when converting a ScalarValue to an ArrayRef.

Given that we have a ScalarValue::Utf8 and StringView in the schema, we can then get the corresponding StringViewArray.
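
A rough sketch of that idea, using stand-in types rather than the real Arrow and DataFusion ones: the scalar carries only the logical value, and the physical type of the resulting array comes from the schema. The names StringArray and scalar_to_array are hypothetical and exist only for illustration.

```rust
/// Simplified stand-in for the physical string encodings in Arrow.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Utf8,
    LargeUtf8,
    Utf8View,
}

/// Logical scalar: a single string variant, with no physical encoding attached.
#[derive(Debug, Clone, PartialEq)]
enum ScalarValue {
    Utf8(Option<String>),
}

/// Stand-in for an Arrow array: a physical type tag plus the values.
#[derive(Debug, PartialEq)]
struct StringArray {
    data_type: DataType,
    values: Vec<Option<String>>,
}

/// Hypothetical helper: expand a logical scalar into an array whose
/// physical type is taken from the schema field, not from the scalar.
fn scalar_to_array(value: &ScalarValue, schema_type: &DataType, num_rows: usize) -> StringArray {
    // `ScalarValue` has a single variant in this sketch, so the pattern is irrefutable.
    let ScalarValue::Utf8(s) = value;
    StringArray {
        data_type: schema_type.clone(),
        values: vec![s.clone(); num_rows],
    }
}
```

With this shape, min(column1_utf8view) could return a logical Utf8 scalar and still produce a Utf8View column, as long as the schema is available at the point of conversion.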

jayzhan211 avatar Aug 15 '24 06:08 jayzhan211

Given that we have a ScalarValue::Utf8 and StringView in the schema, we can then get the corresponding StringViewArray.

I feel like this is a little bit more complex, because casting a scalar value to a logically equal type (arrow_cast('test', 'Dictionary(Int32, Utf8)')) is a no-op in all cases but one: when the cast is at the top level, which indicates that you actually want an array of that type. So we need a way to distinguish those two scenarios, which is why I've added this:

https://github.com/apache/datafusion/blob/78dc034d4b7d644e6c34745286e82a6332dd7606/datafusion/physical-plan/src/projection.rs#L312

But I'm not really sure about it...

notfilippo avatar Aug 16 '24 13:08 notfilippo

Since you have no commit yet, CI is not running.

You could run these commands to pass CI:

  • cargo test --test sqllogictests (slt files)
  • ./dev/rust_lint.sh
  • cargo test --lib --tests --bins --features avro,json (Rust tests)

jayzhan211 avatar Aug 16 '24 14:08 jayzhan211

Since you have no commit yet, CI is not running.

You could run these commands to pass CI:

  • cargo test --test sqllogictests (slt files)
  • ./dev/rust_lint.sh
  • cargo test --lib --tests --bins --features avro,json (Rust tests)

Thanks! Don't worry about turning it on as I can still get results from my fork.

notfilippo avatar Aug 16 '24 14:08 notfilippo

Starting to check this out

alamb avatar Aug 16 '24 16:08 alamb

I wonder if we could do some sort of hybrid approach where ScalarValue still retains a DataType, but it isn't encoded in the variant

like

enum ScalarValue {
  // ... other variants elided
  /// A logical string value, with the specified data type
  Utf8(String, DataType),
}

That way you would still have the DataType for each ScalarValue 🤔

alamb avatar Aug 16 '24 16:08 alamb

I wonder if we could do some sort of hybrid approach where ScalarValue still retains a DataType, but it isn't encoded in the variant

like

enum ScalarValue {
  // ... other variants elided
  /// A logical string value, with the specified data type
  Utf8(String, DataType),
}

That way you would still have the DataType for each ScalarValue 🤔

This should be the last resort, used only if we have no other way to figure out the actual target type to cast to. Otherwise, IMO this is just a code simplification (still a nice step forward) but not the goal of decoupling types in #11513, since we still hide the physical type inside ScalarValue.

I think it is possible to get the DataType from the Schema without passing it along the logical layer 🤔

jayzhan211 avatar Aug 17 '24 01:08 jayzhan211

Otherwise, IMO this is just a code simplification (Still a nice step forward) but not the goal of decoupling types https://github.com/apache/datafusion/issues/11513 ,

Yes, I agree with this assessment

🤔 so that leaves us with the question of how to incrementally try and introduce logical types into logical plans 🤔

alamb avatar Aug 17 '24 11:08 alamb

🤔 so that leaves us with the question of how to incrementally try and introduce logical types into logical plans 🤔

As @jayzhan211 says, the type information is still retained in the Schema, but it would be impractical to modify every expression to cope with the fact that the type is not stored in the ColumnarValue.

My discussion around Datum revolves around the fact that to retain physical type information maybe ColumnarValue enum should instead be:

enum ColumnarValue {
	Array(Arc<dyn Array>),
	Scalar(Arc<dyn Array>) // this is an array of len = 1
}

(edit: this possibility was already discussed https://github.com/apache/datafusion/issues/7353)

or that ScalarValue is kept as is and a LogicalScalarValue is introduced.

I think that the main problem is that scalar values in this change are purely logical. Instead, if in the future we introduce logical types for arrays, we would have a physical type that is distinct from its logical representation and that we can use during physical execution (e.g. that can be cast to another physical type without losing its type information).

notfilippo avatar Aug 17 '24 11:08 notfilippo

Yeah, I think Datum is actually what we need. After casting the array, we don't need to convert it back to a ScalarValue; doing so has an additional conversion cost and loses the physical type information too. Ideally we could replace ColumnarValue with Datum, but it seems the change is quite large.

We could try replacing ScalarValue with Scalar.

enum ColumnarValue {
	Array(Arc<dyn Array>),
	Scalar(arrow_array::Scalar)
}

#7353 is talking about changing something like ScalarValue::Int64(i64) to ScalarValue::Int64(arrow_array::Scalar). I don't think we require that change for this PR.

Upd: If there is a performance concern, a conservative change is to keep the ScalarValue. It depends on how much we rely on ColumnarValue::Scalar and whether those paths are performance critical. I'm not sure about that either 🤔

enum ColumnarValue {
	Array(Arc<dyn Array>),
	ScalarArray(arrow_array::Scalar),
	Scalar(ScalarValue),
}

jayzhan211 avatar Aug 17 '24 12:08 jayzhan211

I've pushed a fairly big experiment. I've tried to change ColumnarValue to

#[derive(Clone, Debug)]
pub enum ColumnarValue {
    /// Array of values
    Array(ArrayRef),
    /// A single value
    Scalar(Scalar),
}

#[derive(Clone, Debug)]
pub struct Scalar {
    value: ScalarValue,
    data_type: DataType,
}

which follows the approach that was discussed with @jayzhan211 in the comments above. I've opted for this hybrid solution to retain most of the flexibility of the original ColumnarValue, and I'm mostly satisfied with how it turned out. Curious to hear your thoughts @jayzhan211 and @alamb
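
To illustrate how such a wrapper could keep the physical type around, here is a small sketch with stand-in types. The methods new, cast_to, value and data_type are hypothetical and are not taken from the PR; the point is only that a cast between logically equal types leaves the value untouched and changes the tag.

```rust
/// Simplified stand-in for the physical string encodings in Arrow.
#[derive(Clone, Debug, PartialEq)]
enum DataType {
    Utf8,
    LargeUtf8,
    Utf8View,
}

/// Logical scalar value: one string variant for every string encoding.
#[derive(Clone, Debug, PartialEq)]
enum ScalarValue {
    Utf8(Option<String>),
}

/// A logical value paired with the physical type it should materialize as.
#[derive(Clone, Debug)]
struct Scalar {
    value: ScalarValue,
    data_type: DataType,
}

impl Scalar {
    /// Hypothetical constructor.
    fn new(value: ScalarValue, data_type: DataType) -> Self {
        Self { value, data_type }
    }

    /// Hypothetical cast between logically equal types: the logical value
    /// is cloned unchanged and only the physical type tag is replaced.
    fn cast_to(&self, target: DataType) -> Scalar {
        Scalar {
            value: self.value.clone(),
            data_type: target,
        }
    }

    fn value(&self) -> &ScalarValue {
        &self.value
    }

    /// What something like arrow_typeof would report for this scalar.
    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}
```

In this shape, arrow_cast('foo', 'LargeUtf8') would still hold ScalarValue::Utf8 internally, but data_type() would report LargeUtf8.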

notfilippo avatar Aug 19 '24 21:08 notfilippo

It seems the trick you used is to take the first element of the ArrayRef as a ScalarValue (instead of keeping it as arrow::Scalar), but we still end up requiring a DataType to keep the type information.

However, I think we could move on with this approach and figure out later whether there is a better one.

jayzhan211 avatar Aug 20 '24 02:08 jayzhan211

I'm happy to report that I've got most sqllogictests to run successfully (although there is still the issue pointed out by @alamb, which I plan to address after I've got all tests passing). The only errors I'm seeing are the following:

Aggregates using ScalarValues as state

Aggregates use ScalarValue to represent state and evaluate to a result. Should I look into restricting their return type to the subset of types which can be represented by ScalarValues?

External error: query failed: DataFusion error: Arrow error: Invalid argument error: column types must match schema types, expected LargeUtf8 but found Utf8 at column index 3
[SQL] SELECT
  min(utf8),
  max(utf8),
  count(utf8),
  min(largeutf8),
  max(largeutf8),
  count(largeutf8)
FROM t
at test_files/aggregate.slt:4104

Weird error I don't quite understand

Removing this query doesn't yield any other error in the slt file. I don't have any other clue and I'm not sure where to start 🤷 .

External error: query result mismatch:
[SQL] SELECT
  count,
  LAG(timestamp, 1) OVER (ORDER BY timestamp),
  arrow_typeof(LAG(timestamp, 1) OVER (ORDER BY timestamp))
FROM timestamp_with_tz
LIMIT 10;
[Diff] (-expected|+actual)
    0 NULL Timestamp(Millisecond, Some("UTC"))
    0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
    0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
-   4 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
    0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
-   0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
+   12 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
+   2 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
    0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
    14 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
-   0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
-   0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
+   5 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
+   1 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
at test_files/parquet.slt:255

notfilippo avatar Aug 20 '24 09:08 notfilippo

Aggregates using ScalarValues as state

The issue is that we have only a ScalarValue but no DataType to get the actual ArrayRef back. One solution is to store the DataType in MaxAccumulator and call to_array_of_type to return an ArrayRef as the output of evaluate, or perhaps to add an evaluate_as_array method to avoid breaking the existing function.
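
A sketch of that suggestion with stand-in types: the accumulator remembers the physical type of its input column, so it can return either a logical scalar or a correctly typed single-row array. The struct layout and method bodies here are assumptions for illustration and do not mirror DataFusion's actual MaxAccumulator.

```rust
/// Simplified stand-in for the physical string encodings in Arrow.
#[derive(Clone, Debug, PartialEq)]
enum DataType {
    Utf8,
    LargeUtf8,
}

/// Logical scalar value used as the aggregate result.
#[derive(Clone, Debug, PartialEq)]
enum ScalarValue {
    Utf8(Option<String>),
}

/// Stand-in for an Arrow array: a physical type tag plus the values.
#[derive(Debug, PartialEq)]
struct StringArray {
    data_type: DataType,
    values: Vec<Option<String>>,
}

/// Hypothetical max accumulator that stores the input column's physical type.
struct MaxAccumulator {
    max: Option<String>,
    data_type: DataType,
}

impl MaxAccumulator {
    fn new(data_type: DataType) -> Self {
        Self { max: None, data_type }
    }

    /// Fold one input value into the running maximum; NULLs are skipped.
    fn update(&mut self, value: Option<&str>) {
        if let Some(v) = value {
            let replace = match &self.max {
                None => true,
                Some(current) => v > current.as_str(),
            };
            if replace {
                self.max = Some(v.to_string());
            }
        }
    }

    /// Existing-style result: a logical scalar with no physical type.
    fn evaluate(&self) -> ScalarValue {
        ScalarValue::Utf8(self.max.clone())
    }

    /// Proposed-style result: a one-row array typed like the input column.
    fn evaluate_as_array(&self) -> StringArray {
        StringArray {
            data_type: self.data_type.clone(),
            values: vec![self.max.clone()],
        }
    }
}
```

An accumulator created for the largeutf8 column in the failing query would then report LargeUtf8 from evaluate_as_array, matching the schema, while evaluate keeps returning the logical Utf8 scalar.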

Weird error I don't quite understand

I fail to reproduce the error 😕

Upd: It seems there is only one place that calls Accumulator::evaluate and transforms the result into an ArrayRef, so I think we can just change the return type of Accumulator::evaluate to ArrayRef.

https://github.com/apache/datafusion/blob/37e54ee874e9027a329ba4f6c1e0e6359d63a33c/datafusion/physical-plan/src/aggregates/mod.rs#L1093

jayzhan211 avatar Aug 20 '24 13:08 jayzhan211

Upd: It seems there is only one place that calls Accumulator::evaluate and transforms the result into an ArrayRef, so I think we can just change the return type of Accumulator::evaluate to ArrayRef.

It seems like this solution is not that easy as the state needs to also be accounted for, but it's definitely a good start!

notfilippo avatar Aug 23 '24 09:08 notfilippo

Marking as "ready for review" because all tests pass on my end. Some casting issues still remain that I'll look into soon but in the meantime I'm looking forward to some feedback on this huge change ❤️

notfilippo avatar Aug 23 '24 09:08 notfilippo

Will evaluate_as_scalar replace evaluate, and state_as_scalars replace state? If so, then it looks good to me.

jayzhan211 avatar Aug 24 '24 05:08 jayzhan211

Will evaluate_as_scalar replace evaluate, and state_as_scalars replace state? If so, then it looks good to me.

Yes that's the plan!

notfilippo avatar Aug 27 '24 13:08 notfilippo

I'm back from vacation and I've rebased my PR to the latest upstream.

notfilippo avatar Sep 16 '24 17:09 notfilippo

To make it clear that this PR is not waiting on more review (we are discussing on https://github.com/apache/datafusion/pull/12536), I'm marking this as draft.

alamb avatar Sep 21 '24 11:09 alamb

Closing in favour of #12622

notfilippo avatar Sep 30 '24 20:09 notfilippo