Add support for native Parquet writes
What is the problem the feature request solves?
For ETL queries that read from Parquet, perform a transformation, and then write the results back out to Parquet, we currently have to perform a columnar-to-row conversion before the write.
If we add support for writing via DataFusion's Parquet writer, we can avoid that columnar-to-row conversion and keep the data in columnar form end to end.
Describe the potential solution
High-Level Plan: Native Parquet Write Support (generated by Claude)
Architecture Overview
Follow the established pattern from ShuffleWriterExec and CometScanExec:
- Native Rust operator for actual Parquet writing using DataFusion/Arrow
- JVM Scala operator that integrates with Spark's write path
- Protobuf-based communication for configuration
- Arrow FFI for efficient data transfer from JVM to native
Phase 1: Core Native Writer Infrastructure
1.1 Native Operator Implementation (native/core/src/execution/operators/parquet_writer.rs)
- Create ParquetWriterExec struct implementing DataFusion's ExecutionPlan
- Use Arrow Rust's parquet::arrow::ArrowWriter for actual writing
- Support configuration:
- Output path and file naming strategy
- Compression codec (Snappy, Zstd, Lz4, Gzip)
- Row group size and page size
- Partition columns
- Write options (encoding, statistics, bloom filters)
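As a rough sketch of what 1.1 could look like, the core write loop maps almost directly onto parquet-rs's `ArrowWriter` and `WriterProperties`. The option values below are placeholders, and a real operator would pull batches from a DataFusion `SendableRecordBatchStream` rather than a `Vec`:

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::{EnabledStatistics, WriterProperties};

fn write_batches(
    path: &str,
    schema: Arc<Schema>,
    batches: Vec<RecordBatch>,
) -> Result<(), Box<dyn std::error::Error>> {
    // Writer options correspond to the configuration items listed above
    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .set_max_row_group_size(1024 * 1024) // row group size, in rows
        .set_data_page_size_limit(1024 * 1024) // page size limit, in bytes
        .set_statistics_enabled(EnabledStatistics::Chunk)
        .build();

    let mut writer = ArrowWriter::try_new(File::create(path)?, schema, Some(props))?;
    for batch in &batches {
        writer.write(batch)?;
    }
    writer.close()?; // flushes buffered row groups and writes the footer
    Ok(())
}
```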
1.2 File Management
- Implement Spark-compatible file naming: `part-{taskId}-{uuid}.{codec}.parquet` (see the naming sketch after this list)
- Support writing to temp locations with atomic rename
- Leverage existing object_store integration for S3/GCS/HDFS/local FS
- Handle directory creation and cleanup
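A small illustrative helper for the naming scheme above. The `uuid` crate (with the `v4` feature) is an assumption, and the zero-padding width simply mirrors Spark's typical `part-00000-...` output:

```rust
use uuid::Uuid;

/// Spark-style output file name, e.g. "part-00042-<uuid>.snappy.parquet".
fn output_file_name(task_id: u64, codec: &str) -> String {
    format!("part-{:05}-{}.{}.parquet", task_id, Uuid::new_v4(), codec)
}
```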
1.3 Metrics Collection
- Track: bytes written, rows written, files created, write time
- Integrate with existing ExecutionPlanMetricsSet pattern
- Expose metrics back to JVM for Spark UI
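A sketch of how the metrics could be registered following DataFusion's `ExecutionPlanMetricsSet`/`MetricBuilder` pattern; the metric names are illustrative:

```rust
use datafusion::physical_plan::metrics::{
    Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
};

struct ParquetWriterMetrics {
    bytes_written: Count,
    rows_written: Count,
    files_created: Count,
    write_time: Time,
}

impl ParquetWriterMetrics {
    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            bytes_written: MetricBuilder::new(metrics).counter("bytes_written", partition),
            rows_written: MetricBuilder::new(metrics).counter("rows_written", partition),
            files_created: MetricBuilder::new(metrics).counter("files_created", partition),
            write_time: MetricBuilder::new(metrics).subset_time("write_time", partition),
        }
    }
}
```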
Phase 2: Partitioning & Bucketing
2.1 Dynamic Partitioning
- Group incoming batches by partition column values
- Maintain separate file writers for each partition
- Handle memory pressure with configurable limits on concurrent writers
- Implement spilling strategy for high-cardinality partitions
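One possible shape for the dynamic-partitioning writer state, assuming incoming batches have already been split by partition values upstream (e.g. with Arrow compute kernels) and deferring the eviction/spill policy; file naming and struct names here are illustrative:

```rust
use std::collections::HashMap;
use std::fs::File;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;

/// One open writer per partition directory, e.g. "year=2024/month=01".
struct PartitionedWriters {
    output_dir: String,
    schema: SchemaRef,
    writers: HashMap<String, ArrowWriter<File>>,
    max_open_writers: usize, // cap concurrent writers to bound memory usage
}

impl PartitionedWriters {
    fn write(
        &mut self,
        partition: &str,
        batch: &RecordBatch,
    ) -> Result<(), Box<dyn std::error::Error>> {
        if !self.writers.contains_key(partition) {
            if self.writers.len() >= self.max_open_writers {
                // Placeholder for the spill strategy: close or flush the least
                // recently used writer before opening another file.
            }
            let dir = format!("{}/{}", self.output_dir, partition);
            std::fs::create_dir_all(&dir)?;
            // File naming would reuse the part-{taskId}-{uuid} helper from 1.2
            let file = File::create(format!("{}/data.parquet", dir))?;
            let writer = ArrowWriter::try_new(file, self.schema.clone(), None)?;
            self.writers.insert(partition.to_string(), writer);
        }
        self.writers.get_mut(partition).unwrap().write(batch)?;
        Ok(())
    }

    fn close(mut self) -> Result<(), Box<dyn std::error::Error>> {
        for (_, writer) in self.writers.drain() {
            writer.close()?;
        }
        Ok(())
    }
}
```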
2.2 Static Partitioning
- Support pre-defined partition values passed from JVM
- Single partition per task execution
2.3 Bucketing Support (Future)
- Hash-based bucketing compatible with Spark's bucketing
- Bucket ID calculation and file assignment
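For reference, Spark assigns bucket IDs by taking a Murmur3 hash of the bucket columns and a positive modulo over the bucket count. The hash itself would need to be Spark-compatible (Comet's shuffle code already has a Spark-compatible Murmur3 hash that could likely be reused); the helper below only shows the modulo step:

```rust
/// Spark-style bucket assignment: positive modulo of the column hash.
/// `hash` is assumed to be a Spark-compatible Murmur3 hash of the bucket columns.
fn bucket_id(hash: i32, num_buckets: i32) -> i32 {
    ((hash % num_buckets) + num_buckets) % num_buckets
}
```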
Phase 3: JNI Interface & Protocol
3.1 Protobuf Definition (native/proto/src/proto/operator.proto)
message ParquetWriter {
  string output_path = 1;
  repeated string partition_columns = 2;
  CompressionCodec compression = 3;
  ParquetWriteOptions options = 4;
  map<string, string> hadoop_conf = 5;
}

message ParquetWriteOptions {
  int64 row_group_size = 1;
  int64 data_page_size = 2;
  bool enable_statistics = 3;
  bool enable_bloom_filter = 4;
}
3.2 JNI Methods (native/core/src/jvm_bridge/jni_api.rs)
- Extend createPlan to handle ParquetWriter operator
- Add getWriteStats method to retrieve file statistics after write
- Integrate with existing execution context pattern
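The JNI surface might look roughly like the following. The class and method names are illustrative rather than the actual Comet bindings, and the exact signatures depend on the `jni` crate version in use:

```rust
use jni::objects::JClass;
use jni::sys::{jbyteArray, jlong};
use jni::JNIEnv;

/// Illustrative JNI entry point; the real bindings live in jvm_bridge/jni_api.rs
/// and would follow the conventions already in place there.
#[no_mangle]
pub extern "system" fn Java_org_apache_comet_Native_getWriteStats(
    _env: JNIEnv,
    _class: JClass,
    _exec_context_id: jlong,
) -> jbyteArray {
    // Look up the execution context by id, serialize the collected file
    // statistics (paths, sizes, row counts) to protobuf bytes, and return
    // them to the JVM as a byte array.
    unimplemented!("sketch only")
}
```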
Phase 4: Spark Integration (JVM Side)
4.1 Physical Operator (CometWriteFilesExec.scala)
- Extend Spark's UnaryExecNode or integrate with WriteFilesExec
- Replace Spark's native write path when Comet is enabled
- Configuration:
- Check if native write is enabled (spark.comet.nativeWrite.enabled)
- Validate supported features (compression, types, etc.)
- Fall back to Spark's writer if unsupported
4.2 Write Strategy
class CometWriteFilesExec(
    child: SparkPlan,
    fileFormat: FileFormat,
    partitionColumns: Seq[Attribute],
    options: Map[String, String]) extends UnaryExecNode {

  override def doExecute(): RDD[InternalRow] = {
    // Transform child plan to native
    // Create ParquetWriter operator
    // Execute via JNI
    // Return write statistics
  }
}
4.3 Plan Transformation
- Add rule in CometSparkSessionExtensions
- Transform WriteFilesExec → CometWriteFilesExec when applicable
- Validate compatibility (data types, features, formats)
Phase 5: Task Coordination & Commit Protocol
5.1 Task Execution
- Each task writes data files to task-specific temp location
- Collect file metadata: path, size, row count, partition values
- Return WriteTaskResult to driver
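A hypothetical shape for the per-task result handed back to the JVM so the driver can run the commit protocol; the field names are illustrative:

```rust
/// Metadata for one file produced by a task attempt.
struct WrittenFile {
    path: String,                  // temp location written by this task attempt
    size_bytes: u64,
    row_count: u64,
    partition_values: Vec<String>, // e.g. ["2024", "01"] for year/month partitions
}

/// Returned to the driver so it can commit (or abort) the job.
struct WriteTaskResult {
    task_attempt_id: u64,
    files: Vec<WrittenFile>,
}
```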
5.2 Job Commit
- Driver coordinates commit across all tasks
- Move files from temp to final location
- Write _SUCCESS marker
- Integrate with Spark's FileCommitProtocol
- Handle failures and cleanup
5.3 Speculative Execution
- Ensure that only one attempt of a speculatively executed task commits its output
- Use task attempt IDs in temp file paths
Phase 6: Spark Compatibility Features
6.1 Metadata & Schema
- Write Spark-compatible Parquet metadata
- Include partition column information in file metadata
- Support schema evolution settings
6.2 Type Compatibility
- Handle Spark-specific types (DecimalType, TimestampType, DateType)
- Timezone handling for timestamps
- Legacy format support if needed
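Illustrative Arrow mappings for the Spark types called out above; the exact timezone handling would need to match what Comet's scan path already does for timestamps:

```rust
use arrow::datatypes::{DataType, TimeUnit};

fn spark_decimal(precision: u8, scale: i8) -> DataType {
    DataType::Decimal128(precision, scale)
}

fn spark_timestamp() -> DataType {
    // TimestampType (timezone-aware) -> microseconds, normalized to UTC
    DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
}

fn spark_date() -> DataType {
    // DateType -> days since the Unix epoch
    DataType::Date32
}
```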
6.3 Write Options Parity
- Support common options: compression, parquet.block.size, etc.
- Map Spark configs to native Parquet writer settings
- Document unsupported options with fallback behavior
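A sketch of option mapping, assuming options arrive as the string map from the protobuf above. One known caveat: parquet-rs limits row groups by row count while `parquet.block.size` is a byte size, so that mapping cannot be one-to-one:

```rust
use std::collections::HashMap;

use parquet::basic::Compression;
use parquet::file::properties::WriterPropertiesBuilder;

/// Apply a few recognized options; anything unrecognized should cause the JVM
/// side to fall back to Spark's own writer rather than being silently ignored.
fn apply_write_options(
    mut builder: WriterPropertiesBuilder,
    options: &HashMap<String, String>,
) -> WriterPropertiesBuilder {
    if let Some(codec) = options.get("compression") {
        builder = match codec.as_str() {
            "snappy" => builder.set_compression(Compression::SNAPPY),
            "none" | "uncompressed" => builder.set_compression(Compression::UNCOMPRESSED),
            _ => builder, // other codecs handled similarly, or trigger fallback
        };
    }
    if let Some(rows) = options
        .get("parquet.block.size")
        .and_then(|v| v.parse::<usize>().ok())
    {
        // Approximate: parquet-rs sizes row groups in rows, not bytes
        builder = builder.set_max_row_group_size(rows);
    }
    builder
}
```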
Phase 7: Testing & Validation
7.1 Unit Tests
- Rust tests for ParquetWriterExec
- Test partitioning logic, compression, encoding
- Memory management and resource cleanup
7.2 Integration Tests
- Write-then-read roundtrip tests
- Multi-partition writes
- Various compression codecs
- Large datasets with memory pressure
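A minimal roundtrip test sketch using parquet-rs's own reader; the real integration tests would also read the files back through Spark's vectorized reader and Comet's native scan:

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;

#[test]
fn roundtrip_single_batch() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;

    // Write a single batch to a temp file
    let path = std::env::temp_dir().join("comet_roundtrip.parquet");
    let mut writer = ArrowWriter::try_new(File::create(&path)?, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;

    // Read it back and compare against the original batch
    let reader = ParquetRecordBatchReaderBuilder::try_new(File::open(&path)?)?.build()?;
    let read_back: Vec<RecordBatch> = reader.collect::<Result<_, _>>()?;
    assert_eq!(read_back, vec![batch]);
    Ok(())
}
```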
7.3 Compatibility Tests
- Files written by Comet must be readable by Spark's native reader
- Files written by Spark must be readable by Comet's native reader
- Test with external tools (parquet-tools, DuckDB, etc.)
7.4 Performance Benchmarks
- Compare against Spark's native Parquet writer
- Test with TPC-H/TPC-DS write queries
- Measure throughput, memory usage, CPU utilization
Implementation Sequence
Milestone 1: Basic Write (MVP)
- Single-file, non-partitioned writes
- Basic compression support (Snappy)
- Integration with Spark's write path
- Roundtrip test (write → read verification)
Milestone 2: Partitioning
- Dynamic partitioning support
- Memory-efficient partition handling
- Task coordination and commit protocol
Milestone 3: Feature Parity
- All compression codecs
- Write options (row group size, statistics, etc.)
- Bucketing support
- Full type compatibility
Milestone 4: Production Ready
- Performance optimization
- Comprehensive testing
- Documentation
- Configuration and tuning guide
Key Design Decisions
- Follow ShuffleWriter Pattern: Proven architecture for native write operations
- Object Store Abstraction: Reuse existing infrastructure for cloud storage
- Memory Management: Integrate with Spark's task memory manager, implement spilling
- Incremental Development: Start with basic writes, add features iteratively
- Compatibility First: Ensure files are compatible with Spark and standard tools
Potential Challenges
- Task Commit Coordination: Complex protocol with failure recovery
- High-Cardinality Partitions: Memory management for many open file writers
- Speculative Execution: Handling duplicate writes correctly
- Spark Version Compatibility: Different APIs across Spark 3.4, 3.5, 4.0
- Type System Differences: Mapping Spark types to Arrow/Parquet types correctly
Expected Benefits
- Potential 2-5x performance improvement over Spark's Java-based writer (to be validated by the Phase 7 benchmarks)
- Better memory efficiency through Rust's zero-cost abstractions
- End-to-end native execution for queries with both scan and write
- Reduced JVM GC pressure by doing heavy lifting in native code
Additional context
No response
Hello @andygrove , this sounds like an interesting problem I'd be keen to work on. I think I might need some guidance to navigate it effectively. Would you be open to chatting on Slack or Discord while I'm working on it?
@dharanad I'm happy to chat, but I'm not sure I know enough about implementing this myself without doing some research first.
I updated the issue with Claude's suggestions for implementing this feature.