Can't serialize example `ExecutionPlan` to substrait
Is your feature request related to a problem or challenge?
While working on an example for serializing substrait plans (see PR https://github.com/apache/arrow-datafusion/pull/9260), I found I could not write an example for serializing an execution plan. The feature is not yet complete enough.
Describe the solution you'd like
I would like to add the following example (or something like it) to datafusion/substrait/src/lib.rs and have it work.
//! # Example: Serializing [`ExecutionPlan`]s
//!
//! This functionality is still under development and only works for a small subset of plans
//!
//! ```
//! # use datafusion::prelude::*;
//! # use std::collections::HashMap;
//! # use std::sync::Arc;
//! # use datafusion::error::Result;
//! # use datafusion::arrow::array::Int32Array;
//! # use datafusion::arrow::record_batch::RecordBatch;
//! # use datafusion_substrait::physical_plan;
//! # #[tokio::main(flavor = "current_thread")]
//! # async fn main() -> Result<()> {
//! // Create a plan that scans table 't'
//! let ctx = SessionContext::new();
//! let batch = RecordBatch::try_from_iter(vec![("x", Arc::new(Int32Array::from(vec![42])) as _)])?;
//! ctx.register_batch("t", batch)?;
//! let df = ctx.sql("SELECT x from t").await?;
//! let physical_plan = df.create_physical_plan().await?;
//!
//! // Convert the plan into a substrait (protobuf) Rel
//! let mut extension_info = (vec![], HashMap::new());
//! let substrait_plan = physical_plan::producer::to_substrait_rel(physical_plan.as_ref(), &mut extension_info)?;
//!
//! // Decode bytes from somewhere (over network, etc.) back to ExecutionPlan
//! let physical_round_trip = physical_plan::consumer::from_substrait_rel(
//!     &ctx, &substrait_plan, &HashMap::new()
//! ).await?;
//! assert_eq!(format!("{:?}", physical_plan), format!("{:?}", physical_round_trip));
//! # Ok(())
//! # }
//! ```
When you run this test today, you get an error about "mem provider not implemented" or something like that.
Describe alternatives you've considered
No response
Additional context
I think making this work would perhaps be a matter of implementing serialization for MemTable / MemoryExec.
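For what it's worth, the batches behind a MemTable already round-trip losslessly through the Arrow IPC stream format, so a producer could in principle embed them as bytes. Here is a minimal sketch of just that encoding step, using only the arrow APIs re-exported by datafusion (this is not an existing substrait mapping, only an illustration of the round trip):

use std::sync::Arc;
use datafusion::arrow::array::Int32Array;
use datafusion::arrow::ipc::reader::StreamReader;
use datafusion::arrow::ipc::writer::StreamWriter;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;

fn main() -> Result<()> {
    let batch =
        RecordBatch::try_from_iter(vec![("x", Arc::new(Int32Array::from(vec![42])) as _)])?;

    // Encode the batch to bytes in the Arrow IPC stream format; a
    // hypothetical MemoryExec producer could store these bytes in the plan.
    let mut buf = Vec::new();
    {
        let mut writer = StreamWriter::try_new(&mut buf, batch.schema().as_ref())?;
        writer.write(&batch)?;
        writer.finish()?;
    }

    // Decode the bytes back; the consumer side would rebuild the MemoryExec
    // (via a MemTable) from the decoded batches.
    let reader = StreamReader::try_new(buf.as_slice(), None)?;
    for round_trip in reader {
        assert_eq!(batch, round_trip?);
    }
    Ok(())
}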
I am interested in taking a look at this. I was able to reproduce the error using the example above:
Error: Substrait("Unsupported plan in Substrait physical plan producer: MemoryExec: partitions=1, partition_sizes=[1]\n")
I looked into the example and the physical plan substrait producer/consumer code. Unfortunately, for physical plans the substrait consumer and producer are only implemented for ParquetExec, and even then they are not fully implemented, so I do not believe any practical example will execute without further development.
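To make the current state concrete, here is a sketch of the shape of the producer's dispatch (hypothetical code, not the actual datafusion_substrait source): each ExecutionPlan node is downcast, and anything without a mapping produces the Substrait error reproduced above.

use datafusion::datasource::physical_plan::ParquetExec;
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::{displayable, ExecutionPlan};

// Hypothetical sketch: handle the node types that have a substrait mapping
// and report everything else as unsupported.
fn produce_sketch(plan: &dyn ExecutionPlan) -> Result<()> {
    if let Some(_parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
        // ... build a substrait ReadRel from the scanned files ...
        Ok(())
    } else {
        Err(DataFusionError::Substrait(format!(
            "Unsupported plan in Substrait physical plan producer: {}",
            displayable(plan).one_line()
        )))
    }
}

This is why the MemoryExec plan in the original example fails immediately, while the ParquetExec plan below gets through the producer and only loses information on the way back.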
Here is an example that gets further than the one above, but panics on the round-trip assertion:
use datafusion::prelude::*;
use std::collections::HashMap;
use datafusion::error::Result;
use datafusion_substrait::physical_plan;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
    // Create a plan that scans table 'alltypes_plain'
    let ctx = SessionContext::new();
    let testdata = datafusion::test_util::parquet_test_data();
    ctx.register_parquet(
        "alltypes_plain",
        &format!("{testdata}/alltypes_plain.parquet"),
        ParquetReadOptions::default(),
    )
    .await?;
    let df = ctx.sql("SELECT * from alltypes_plain").await?;
    let physical_plan = df.create_physical_plan().await?;

    // Convert the plan into a substrait (protobuf) Rel
    let mut extension_info = (vec![], HashMap::new());
    let substrait_plan =
        physical_plan::producer::to_substrait_rel(physical_plan.as_ref(), &mut extension_info)?;

    // Decode bytes from somewhere (over network, etc.) back to ExecutionPlan
    let physical_round_trip =
        physical_plan::consumer::from_substrait_rel(&ctx, &substrait_plan, &HashMap::new())
            .await?;
    assert_eq!(
        format!("{:?}", physical_plan),
        format!("{:?}", physical_round_trip)
    );
    Ok(())
}
And here is the panic output:
thread 'main' panicked at datafusion/substrait/src/lib.rs:37:2:
assertion `left == right` failed
left: "ParquetExec { pushdown_filters: None, reorder_filters: None, enable_page_index: None, enable_bloom_filter: None, base_config: object_store_url=ObjectStoreUrl { url: Url { scheme: \"file\", cannot_be_a_base: false, username: \"\", password: None, host: None, port: None, path: \"/\", query: None, fragment: None } }, statistics=Statistics { num_rows: Exact(8), total_byte_size: Exact(671), column_statistics: [ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }] }, file_groups={1 group: [[home/dev/arrow-datafusion/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], projected_statistics: Statistics { num_rows: Exact(8), total_byte_size: Absent, column_statistics: [ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }] }, projected_schema: Schema { fields: [Field { name: \"id\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"bool_col\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"tinyint_col\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"smallint_col\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"int_col\", 
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"bigint_col\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"float_col\", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"double_col\", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"date_string_col\", data_type: Binary, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"string_col\", data_type: Binary, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"timestamp_col\", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, projected_output_ordering: [], metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [Metric { value: Count { name: \"num_predicate_creation_errors\", count: Count { value: 0 } }, labels: [], partition: None }] } } }, predicate: None, pruning_predicate: None, page_pruning_predicate: None, metadata_size_hint: None, parquet_file_reader_factory: None }"
right: "ParquetExec { pushdown_filters: None, reorder_filters: None, enable_page_index: None, enable_bloom_filter: None, base_config: object_store_url=ObjectStoreUrl { url: Url { scheme: \"file\", cannot_be_a_base: false, username: \"\", password: None, host: None, port: None, path: \"/\", query: None, fragment: None } }, statistics=Statistics { num_rows: Absent, total_byte_size: Absent, column_statistics: [] }, file_groups={1 group: [[home/dev/arrow-datafusion/parquet-testing/data/alltypes_plain.parquet]]}, projected_statistics: Statistics { num_rows: Absent, total_byte_size: Absent, column_statistics: [] }, projected_schema: Schema { fields: [], metadata: {} }, projected_output_ordering: [], metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [Metric { value: Count { name: \"num_predicate_creation_errors\", count: Count { value: 0 } }, labels: [], partition: None }] } } }, predicate: None, pruning_predicate: None, page_pruning_predicate: None, metadata_size_hint: None, parquet_file_reader_factory: None }"
stack backtrace:
...
You can see that the round trip lost many details about the ParquetExec, such as projected_schema and projected_statistics.
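As an aside, comparing the plans via their display output rather than their Debug output makes diffs like the one above much easier to read, though the assertion would still fail here because the projection itself was dropped. A small helper (a sketch; plan_to_string is my name, not a datafusion API):

use datafusion::physical_plan::{displayable, ExecutionPlan};

/// Render a plan as an indented tree. Unlike the Debug output above, this
/// omits most internal fields (metrics, statistics, projected schema), so
/// mismatches are much easier to spot.
fn plan_to_string(plan: &dyn ExecutionPlan) -> String {
    displayable(plan).indent(false).to_string()
}

The assertion in the example then becomes assert_eq!(plan_to_string(physical_plan.as_ref()), plan_to_string(physical_round_trip.as_ref()));.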
I think if we want to include a user-facing example of a physical plan substrait roundtrip, we will need to cut a ticket to complete the implementation of ParquetExec to substrait first.
It looks like #5176 built the initial framework for serializing physical plans, but it hasn't been picked up since then.
> I think if we want to include a user-facing example of a physical plan substrait roundtrip, we will need to cut a ticket to complete the implementation of ParquetExec to substrait first.
I filed https://github.com/apache/arrow-datafusion/issues/9347 to track this
Thank you for looking into this @devinjdangelo
I also made https://github.com/apache/arrow-datafusion/issues/5173 an epic and added this ticket and https://github.com/apache/arrow-datafusion/issues/9347 to it
I added this to the substrait support epic: https://github.com/apache/datafusion/issues/5173