datafusion icon indicating copy to clipboard operation
datafusion copied to clipboard

Implement physical plan serialization for COPY plans `CsvLogicalExtensionCodec`

Open alamb opened this issue 1 year ago • 1 comments

Is your feature request related to a problem or challenge?

As part of https://github.com/apache/datafusion/pull/11060, @devinjdangelo made file format support into a Trait which is good!

However the code to serialize these new (dynamic) structures is not yet implemented

As @devinjdangelo says https://github.com/apache/datafusion/pull/11060/files#r1650268578

Users depending on the ability to serialize COPY plans (e.g. ballista) will need this TODO to be completed before upgrading to any version of datafusion including this change.

It turns out there are no unit tests for them either so no tests failed

Describe the solution you'd like

Implement the named codec for serializing plans and a test for it

Describe alternatives you've considered

The code is here: datafusion/proto/src/logical_plan/file_formats.rs The test would go here: https://github.com/apache/datafusion/blob/main/datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Note there is already coverage for LogicalPlans here: https://github.com/apache/datafusion/blob/d2ff2189dfb8b4624ae2c08846cd713871b37d8c/datafusion/proto/tests/cases/roundtrip_logical_plan.rs#L325-L346

Additional context

There are several other codecs needed:

  • JsonLogicalExtensionCodec
  • ParquetLogicalExtensionCodec
  • ArrowLogicalExtensionCodec
  • AvroLogicalExtensionCodec

However, I think we need to get one example done and then we can file tickets to fill out the others

Maybe this is what @lewiszlw was getting at with https://github.com/apache/datafusion/pull/11095 / https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/composed_extension_codec.rs

alamb avatar Jun 27 '24 22:06 alamb

take

Lordworms avatar Jun 28 '24 03:06 Lordworms

starts to work on this one, was delayed by the substrait one.

Lordworms avatar Jul 16 '24 23:07 Lordworms

When I was doing this one, I added a test like

#[tokio::test]
async fn roundtrip_physical_plan_copy_to_sql_options() -> Result<()> {
    // Create a new SessionContext
    let ctx = SessionContext::new();
    let file_type = format_as_file_type(Arc::new(CsvFormatFactory::new()));
    // Create a CSV scan as input
    let input = create_csv_scan(&ctx).await?;
    let plan = LogicalPlan::Copy(CopyTo {
        input: Arc::new(input),
        output_url: "test.csv".to_string(),
        partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
        file_type,
        options: Default::default(),
    });

    // Convert the logical plan to a physical plan
    let planner = DefaultPhysicalPlanner::default();
    let physical_plan = planner.create_physical_plan(&plan, &ctx.state()).await?;
    roundtrip_test(physical_plan)
}

I find that it seems like the roundtrip in physical plan test should be ok since the fileformatfactory is translated into Fileformat automaticly here

LogicalPlan::Copy(CopyTo {
                input,
                output_url,
                file_type,
                partition_by,
                options: source_option_tuples,
            }) => {
                let input_exec = children.one()?;
                let parsed_url = ListingTableUrl::parse(output_url)?;
                let object_store_url = parsed_url.object_store();

                let schema: Schema = (**input.schema()).clone().into();

                // Note: the DataType passed here is ignored for the purposes of writing and inferred instead
                // from the schema of the RecordBatch being written. This allows COPY statements to specify only
                // the column name rather than column name + explicit data type.
                let table_partition_cols = partition_by
                    .iter()
                    .map(|s| (s.to_string(), arrow_schema::DataType::Null))
                    .collect::<Vec<_>>();

                let keep_partition_by_columns = match source_option_tuples
                    .get("execution.keep_partition_by_columns")
                    .map(|v| v.trim()) {
                    None => session_state.config().options().execution.keep_partition_by_columns,
                    Some("true") => true,
                    Some("false") => false,
                    Some(value) =>
                        return Err(DataFusionError::Configuration(format!("provided value for 'execution.keep_partition_by_columns' was not recognized: \"{}\"", value))),
                };

                // Set file sink related options
                let config = FileSinkConfig {
                    object_store_url,
                    table_paths: vec![parsed_url],
                    file_groups: vec![],
                    output_schema: Arc::new(schema),
                    table_partition_cols,
                    overwrite: false,
                    keep_partition_by_columns,
                };

                let sink_format = file_type_to_format(file_type)?
                    .create(session_state, source_option_tuples)?;

                sink_format
                    .create_writer_physical_plan(input_exec, session_state, config, None)
                    .await?
            }

So it causes my confusion here how to actually add a test here, appreciate your guidence @alamb

Lordworms avatar Jul 20 '24 04:07 Lordworms

So it causes my confusion here how to actually add a test here, appreciate your guidence @alamb

Do you mean that the test you wrote above passes without any code changes? Maybe @devinjdangelo has some idea

alamb avatar Jul 20 '24 09:07 alamb

The format factories can optionally carry modified default write options. This is only used for DataFrame::write_* methods, so that they do not need to pass strings to modify write behavior.

https://github.com/apache/datafusion/blob/827d0e3a29c0ea34bafbf03f5102407bd8e9b826/datafusion/core/src/datasource/file_format/csv.rs#L60-L62

The existing SerDe implementation does not handle the case when the format factory carries options itself. This will prevent systems like Ballista from correctly passing around options in its DataFrame::write_* methods. Here are the lines that are not fully implemented:

https://github.com/apache/datafusion/blob/827d0e3a29c0ea34bafbf03f5102407bd8e9b826/datafusion/proto/src/logical_plan/file_formats.rs#L74-L89

You can see that the encoding method does not actually write any bytes, and the decoding method just initializes a default struct, ignoring any written bytes.

devinjdangelo avatar Jul 20 '24 12:07 devinjdangelo

So it causes my confusion here how to actually add a test here, appreciate your guidence @alamb

Do you mean that the test you wrote above passes without any code changes? Maybe @devinjdangelo has some idea

yes, that's why I am confused since it is transformed internally

Lordworms avatar Jul 20 '24 16:07 Lordworms

The format factories can optionally carry modified default write options. This is only used for DataFrame::write_* methods, so that they do not need to pass strings to modify write behavior.

https://github.com/apache/datafusion/blob/827d0e3a29c0ea34bafbf03f5102407bd8e9b826/datafusion/core/src/datasource/file_format/csv.rs#L60-L62

The existing SerDe implementation does not handle the case when the format factory carries options itself. This will prevent systems like Ballista from correctly passing around options in its DataFrame::write_* methods. Here are the lines that are not fully implemented:

https://github.com/apache/datafusion/blob/827d0e3a29c0ea34bafbf03f5102407bd8e9b826/datafusion/proto/src/logical_plan/file_formats.rs#L74-L89

You can see that the encoding method does not actually write any bytes, and the decoding method just initializes a default struct, ignoring any written bytes.

let me take a look and

The format factories can optionally carry modified default write options. This is only used for DataFrame::write_* methods, so that they do not need to pass strings to modify write behavior.

https://github.com/apache/datafusion/blob/827d0e3a29c0ea34bafbf03f5102407bd8e9b826/datafusion/core/src/datasource/file_format/csv.rs#L60-L62

The existing SerDe implementation does not handle the case when the format factory carries options itself. This will prevent systems like Ballista from correctly passing around options in its DataFrame::write_* methods. Here are the lines that are not fully implemented:

https://github.com/apache/datafusion/blob/827d0e3a29c0ea34bafbf03f5102407bd8e9b826/datafusion/proto/src/logical_plan/file_formats.rs#L74-L89

You can see that the encoding method does not actually write any bytes, and the decoding method just initializes a default struct, ignoring any written bytes.

sure, I'll take a look then

Lordworms avatar Jul 20 '24 16:07 Lordworms