
Add support for native Parquet writes

Open · andygrove opened this issue 8 months ago · 3 comments

What is the problem the feature request solves?

With ETL queries that read from Parquet, perform a transformation, and then write the results back out to Parquet, we currently have to perform a columnar-to-row conversion before the write.

If we add support for DataFusion's Parquet writer, we can avoid that columnar-to-row conversion.

Describe the potential solution

High-Level Plan: Native Parquet Write Support (generated by Claude)

Architecture Overview

Follow the established pattern from ShuffleWriterExec and CometScanExec:

  • Native Rust operator for actual Parquet writing using DataFusion/Arrow
  • JVM Scala operator that integrates with Spark's write path
  • Protobuf-based communication for configuration
  • Arrow FFI for efficient data transfer from JVM to native

Phase 1: Core Native Writer Infrastructure

1.1 Native Operator Implementation (native/core/src/execution/operators/parquet_writer.rs)

  • Create a ParquetWriterExec struct implementing DataFusion's ExecutionPlan
  • Use Arrow Rust's parquet::arrow::ArrowWriter for the actual writing (see the sketch below)
  • Support configuration for:
      • Output path and file naming strategy
      • Compression codec (Snappy, Zstd, Lz4, Gzip)
      • Row group size and page size
      • Partition columns
      • Write options (encoding, statistics, bloom filters)
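
A minimal sketch of what the core write path could look like with Arrow Rust's ArrowWriter is below. The function name and configuration values are illustrative only, not existing Comet code.

// Sketch: write a single RecordBatch with the parquet crate's ArrowWriter.
use std::fs::File;

use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;

fn write_batch(path: &str, batch: &RecordBatch) -> Result<(), Box<dyn std::error::Error>> {
    // WriterProperties carries the knobs listed above: codec, row group size, page size, etc.
    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .set_max_row_group_size(1024 * 1024)
        .build();

    let file = File::create(path)?;
    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
    writer.write(batch)?;
    writer.close()?; // flushes buffered row groups and writes the Parquet footer
    Ok(())
}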

1.2 File Management

  • Implement Spark-compatible file naming: part-{taskId}-{uuid}.{codec}.parquet (see the sketch below)
  • Support writing to temp locations with atomic rename
  • Leverage existing object_store integration for S3/GCS/HDFS/local FS
  • Handle directory creation and cleanup
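
A small sketch of the file-management pieces is below; the helper names and exact naming pattern are assumptions (a local-filesystem rename is shown, while object stores need a different commit strategy).

use std::fs;
use std::path::{Path, PathBuf};

// Illustrative naming helper following the pattern above, e.g. part-00042-1b2c3d4e.snappy.parquet
fn output_file_name(task_id: u64, uuid: &str, codec: &str) -> String {
    format!("part-{:05}-{}.{}.parquet", task_id, uuid, codec)
}

// Write to a task-local temp path first, then rename into the final directory so
// readers never observe a partially written file (atomic on local POSIX filesystems).
fn commit_file(temp: &Path, final_dir: &Path, name: &str) -> std::io::Result<PathBuf> {
    fs::create_dir_all(final_dir)?;
    let dest = final_dir.join(name);
    fs::rename(temp, &dest)?;
    Ok(dest)
}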

1.3 Metrics Collection

  • Track bytes written, rows written, files created, and write time
  • Integrate with the existing ExecutionPlanMetricsSet pattern (see the sketch below)
  • Expose metrics back to the JVM for the Spark UI
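
A sketch of the metrics wiring using DataFusion's ExecutionPlanMetricsSet pattern is below; the metric names are illustrative.

use datafusion::physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time};

// Per-partition writer metrics, registered against the shared metrics set so they
// show up alongside the operator's other DataFusion metrics.
struct ParquetWriterMetrics {
    rows_written: Count,
    bytes_written: Count,
    write_time: Time,
}

impl ParquetWriterMetrics {
    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            rows_written: MetricBuilder::new(metrics).counter("rows_written", partition),
            bytes_written: MetricBuilder::new(metrics).counter("bytes_written", partition),
            write_time: MetricBuilder::new(metrics).subset_time("write_time", partition),
        }
    }
    // e.g. call rows_written.add(batch.num_rows()) after each successful write
}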

Phase 2: Partitioning & Bucketing

2.1 Dynamic Partitioning

  • Group incoming batches by partition column values
  • Maintain a separate file writer for each partition (see the sketch below)
  • Handle memory pressure with configurable limits on the number of concurrent writers
  • Implement a spilling strategy for high-cardinality partitions
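
A sketch of per-partition writer management is below; the struct and its fields are assumptions, and the memory-pressure handling is only indicated by a placeholder.

use std::collections::HashMap;
use std::fs::File;

use arrow::datatypes::SchemaRef;
use parquet::arrow::ArrowWriter;

// One open ArrowWriter per partition key (e.g. "date=2024-01-01"), created lazily.
struct PartitionedWriters {
    schema: SchemaRef,
    max_open_writers: usize,
    writers: HashMap<String, ArrowWriter<File>>,
}

impl PartitionedWriters {
    fn writer_for(
        &mut self,
        partition_key: &str,
        path: &str,
    ) -> Result<&mut ArrowWriter<File>, Box<dyn std::error::Error>> {
        if !self.writers.contains_key(partition_key) {
            if self.writers.len() >= self.max_open_writers {
                // Placeholder: close/flush a least-recently-used writer or spill here.
            }
            let file = File::create(path)?;
            let writer = ArrowWriter::try_new(file, self.schema.clone(), None)?;
            self.writers.insert(partition_key.to_string(), writer);
        }
        Ok(self.writers.get_mut(partition_key).unwrap())
    }
}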

2.2 Static Partitioning

  • Support pre-defined partition values passed from JVM
  • Single partition per task execution

2.3 Bucketing Support (Future)

  • Hash-based bucketing compatible with Spark's bucketing
  • Bucket ID calculation and file assignment

Phase 3: JNI Interface & Protocol

3.1 Protobuf Definition (native/proto/src/proto/operator.proto)

message ParquetWriter {
  string output_path = 1;
  repeated string partition_columns = 2;
  CompressionCodec compression = 3;
  ParquetWriteOptions options = 4;
  map<string, string> hadoop_conf = 5;
}

message ParquetWriteOptions {
  int64 row_group_size = 1;
  int64 data_page_size = 2;
  bool enable_statistics = 3;
  bool enable_bloom_filter = 4;
}

3.2 JNI Methods (native/core/src/jvm_bridge/jni_api.rs)

  • Extend createPlan to handle the ParquetWriter operator
  • Add a getWriteStats method to retrieve file statistics after the write (rough shape sketched below)
  • Integrate with the existing execution context pattern
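
The rough shape of such a JNI entry point (using the jni crate) is sketched below; the Java class path, method name, and return encoding are hypothetical, and a real implementation would follow the existing patterns in jni_api.rs.

use jni::objects::JClass;
use jni::sys::jlong;
use jni::JNIEnv;

// Hypothetical JNI export: return a single statistic for a previously executed
// native plan. A real implementation would serialize richer write statistics
// (files, rows, bytes, partition values) back to the JVM.
#[no_mangle]
pub extern "system" fn Java_org_apache_comet_Native_getWriteStats(
    _env: JNIEnv,
    _class: JClass,
    _plan_id: jlong,
) -> jlong {
    // Illustrative value only.
    1_000
}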

Phase 4: Spark Integration (JVM Side)

4.1 Physical Operator (CometWriteFilesExec.scala)

  • Extend Spark's UnaryExecNode or integrate with WriteFilesExec
  • Replace Spark's native write path when Comet is enabled
  • Configuration:
      • Check if native write is enabled (spark.comet.nativeWrite.enabled)
      • Validate supported features (compression, types, etc.)
      • Fall back to Spark's writer if unsupported

4.2 Write Strategy

class CometWriteFilesExec(
    child: SparkPlan,
    fileFormat: FileFormat,
    partitionColumns: Seq[Attribute],
    options: Map[String, String])
  extends UnaryExecNode {

  override def doExecute(): RDD[InternalRow] = {
    // Transform the child plan to a native plan
    // Create the ParquetWriter operator
    // Execute via JNI
    // Return write statistics
    ???
  }
}

4.3 Plan Transformation

  • Add rule in CometSparkSessionExtensions
  • Transform WriteFilesExec → CometWriteFilesExec when applicable
  • Validate compatibility (data types, features, formats)

Phase 5: Task Coordination & Commit Protocol

5.1 Task Execution

  • Each task writes data files to task-specific temp location
  • Collect file metadata: path, size, row count, partition values
  • Return WriteTaskResult to driver

5.2 Job Commit

  • Driver coordinates commit across all tasks
  • Move files from temp to final location
  • Write _SUCCESS marker
  • Integrate with Spark's FileCommitProtocol
  • Handle failures and cleanup

5.3 Speculative Execution

  • Ensure only one copy of speculatively executed tasks commits
  • Use task attempt IDs in temp file paths

Phase 6: Spark Compatibility Features

6.1 Metadata & Schema

  • Write Spark-compatible Parquet metadata
  • Include partition column information in file metadata
  • Support schema evolution settings

6.2 Type Compatibility

  • Handle Spark-specific types (DecimalType, TimestampType, DateType)
  • Timezone handling for timestamps
  • Legacy format support if needed

6.3 Write Options Parity

  • Support common options: compression, parquet.block.size, etc.
  • Map Spark configs to native Parquet writer settings (see the sketch below)
  • Document unsupported options with fallback behavior
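
A hedged sketch of the option mapping is below; the function name and the handled options are illustrative, and only compression is translated here.

use std::collections::HashMap;

use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;

// Map a subset of write options onto parquet-rs WriterProperties.
fn writer_properties(options: &HashMap<String, String>) -> WriterProperties {
    let mut builder = WriterProperties::builder();
    if let Some(codec) = options.get("compression") {
        builder = builder.set_compression(match codec.as_str() {
            "none" | "uncompressed" => Compression::UNCOMPRESSED,
            _ => Compression::SNAPPY, // remaining codecs omitted in this sketch
        });
    }
    // Note: byte-based settings such as parquet.block.size do not map one-to-one onto
    // parquet-rs's row-count-based set_max_row_group_size, so some options need
    // translation or must be documented as unsupported with fallback behavior.
    builder.build()
}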

Phase 7: Testing & Validation

7.1 Unit Tests

  • Rust tests for ParquetWriterExec
  • Test partitioning logic, compression, encoding
  • Memory management and resource cleanup

7.2 Integration Tests

  • Write-then-read roundtrip tests (see the sketch below)
  • Multi-partition writes
  • Various compression codecs
  • Large datasets with memory pressure
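
A sketch of the roundtrip shape using the parquet crate's Arrow reader and writer is below; the path handling and helper name are illustrative.

use std::fs::File;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;

fn roundtrip(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // Write the batch out...
    let mut writer = ArrowWriter::try_new(File::create(path)?, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;

    // ...then read it back and compare.
    let reader = ParquetRecordBatchReaderBuilder::try_new(File::open(path)?)?.build()?;
    let read_back: Vec<RecordBatch> = reader.collect::<Result<_, _>>()?;
    assert_eq!(read_back, vec![batch]);
    Ok(())
}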

7.3 Compatibility Tests

  • Files written by Comet must be readable by Spark's native reader
  • Files written by Spark must be readable by Comet's native reader
  • Test with external tools (parquet-tools, DuckDB, etc.)

7.4 Performance Benchmarks

  • Compare against Spark's native Parquet writer
  • Test with TPC-H/TPC-DS write queries
  • Measure throughput, memory usage, CPU utilization

Implementation Sequence

Milestone 1: Basic Write (MVP)

  • Single-file, non-partitioned writes
  • Basic compression support (Snappy)
  • Integration with Spark's write path
  • Roundtrip test (write → read verification)

Milestone 2: Partitioning

  • Dynamic partitioning support
  • Memory-efficient partition handling
  • Task coordination and commit protocol

Milestone 3: Feature Parity

  • All compression codecs
  • Write options (row group size, statistics, etc.)
  • Bucketing support
  • Full type compatibility

Milestone 4: Production Ready

  • Performance optimization
  • Comprehensive testing
  • Documentation
  • Configuration and tuning guide

Key Design Decisions

  1. Follow ShuffleWriter Pattern: Proven architecture for native write operations
  2. Object Store Abstraction: Reuse existing infrastructure for cloud storage
  3. Memory Management: Integrate with Spark's task memory manager, implement spilling
  4. Incremental Development: Start with basic writes, add features iteratively
  5. Compatibility First: Ensure files are compatible with Spark and standard tools

Potential Challenges

  1. Task Commit Coordination: Complex protocol with failure recovery
  2. High-Cardinality Partitions: Memory management for many open file writers
  3. Speculative Execution: Handling duplicate writes correctly
  4. Spark Version Compatibility: Different APIs across Spark 3.4, 3.5, 4.0
  5. Type System Differences: Mapping Spark types to Arrow/Parquet types correctly

Expected Benefits

  • Potentially 2-5x faster writes than Spark's Java-based Parquet writer (to be validated by the benchmarks in Phase 7.4)
  • Better memory efficiency through Rust's zero-cost abstractions
  • End-to-end native execution for queries with both scan and write
  • Reduced JVM GC pressure by doing heavy lifting in native code

Additional context

No response

andygrove · Apr 08 '25 18:04

Hello @andygrove, this sounds like an interesting problem that I'd be keen to work on. I think I might need some guidance to navigate it effectively. Would you be open to chatting on Slack or Discord while I'm working on it?

dharanad · Jun 26 '25 18:06

@dharanad I'm happy to chat, but I'm not sure I know enough about implementing this myself without doing some research first.

andygrove · Jul 03 '25 02:07

I updated the issue with Claude's suggestions for implementing this feature.

andygrove · Nov 21 '25 16:11