incubator-gluten
[CH] Support OPTIMIZE and VACUUM commands for ClickHouse tables
Description
Support the OPTIMIZE and VACUUM commands for ClickHouse tables.
Usage
The basic usage is:
optimize table_name
vacuum table_name RETAIN 0 HOURS
Delta OPTIMIZE/VACUUM parameters, such as spark.databricks.delta.vacuum.parallelDelete.enabled and spark.databricks.delta.optimize.minFileSize, also work for ClickHouse tables; a sketch of setting one of them follows below.
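For example, a minimal sketch of tuning the compaction threshold at session level (the value is illustrative, and it is assumed the setting is read from the session conf just as for regular Delta tables):
-- only parts smaller than ~256 MB are considered for compaction
set spark.databricks.delta.optimize.minFileSize=268435456;
optimize table_name;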
Necessary Spark configs:
- spark.databricks.delta.retentionDurationCheck.enabled=false
- spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
Check out more details in GlutenClickHouseMergeTreeOptimizeSuite.
Limitations
- When running VACUUM, spark.gluten.enabled must be set to false.
- VACUUM has to be run twice to clean up all empty folders (see the vacuum step in the playbook below).
A full playbook to illustrate the usage:
- Start a Spark session
export LD_PRELOAD=/usr/local/clickhouse/lib/libch.so && bin/spark-sql \
--master 'local[30]' \
--driver-memory 60g \
--conf spark.driver.memoryOverhead=60G \
--conf spark.executor.memoryOverhead=30g \
--conf spark.serializer=org.apache.spark.serializer.JavaSerializer \
--conf spark.memory.fraction=0.6 \
--conf spark.memory.storageFraction=0.3 \
--conf spark.sql.sources.ignoreDataLocality=true \
--conf spark.default.parallelism=45 \
--conf spark.sql.shuffle.partitions=90 \
--conf spark.sql.files.minPartitionNum=1 \
--conf spark.sql.files.maxPartitionBytes=1G \
--conf spark.sql.files.openCostInBytes=1073741824 \
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.adaptive.coalescePartitions.enabled=false \
--conf spark.sql.parquet.filterPushdown=true \
--conf spark.sql.parquet.enableVectorizedReader=true \
--conf spark.sql.columnVector.offheap.enabled=true \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=60G \
--conf spark.locality.wait=0 \
--conf spark.locality.wait.node=0 \
--conf spark.locality.wait.process=0 \
--conf spark.sql.autoBroadcastJoinThreshold=100MB \
--conf spark.sql.adaptive.autoBroadcastJoinThreshold=-1 \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.compress=true \
--conf spark.eventLog.compression.codec=snappy \
--conf spark.plugins=io.glutenproject.GlutenPlugin \
--conf spark.gluten.sql.columnar.columnartorow=true \
--conf spark.gluten.sql.columnar.loadnative=true \
--conf spark.gluten.sql.columnar.iterator=true \
--conf spark.gluten.sql.columnar.loadarrow=false \
--conf spark.gluten.sql.columnar.backend.lib=ch \
--conf spark.gluten.sql.columnar.libpath=/usr/local/clickhouse/lib/libch.so \
--conf spark.gluten.sql.columnar.hashagg.enablefinal=true \
--conf spark.gluten.sql.enable.native.validation=false \
--conf spark.gluten.sql.columnar.backend.ch.use.v2=false \
--conf spark.gluten.sql.columnar.separate.scan.rdd.for.ch=false \
--conf spark.gluten.sql.columnar.forceshuffledhashjoin=true \
--conf spark.gluten.sql.columnar.backend.ch.worker.id=1 \
--conf spark.gluten.sql.columnar.coalesce.batches=false \
--conf spark.gluten.sql.columnar.sort=true \
--conf spark.gluten.sql.columnar.backend.ch.runtime_conf.logger.level=error \
--conf spark.shuffle.manager=org.apache.spark.shuffle.sort.ColumnarShuffleManager \
--conf spark.io.compression.codec=LZ4 \
--conf spark.gluten.sql.columnar.shuffle.customizedCompression.codec=LZ4 \
--conf spark.gluten.sql.columnar.backend.ch.customized.shuffle.codec.enable=true \
--conf spark.gluten.sql.columnar.backend.ch.customized.buffer.size=4096 \
--conf spark.gluten.sql.columnar.backend.ch.runtime_conf.enable_nullable=true \
--conf spark.gluten.sql.columnar.backend.ch.runtime_conf.local_engine.settings.log_processors_profiles=false \
--conf spark.gluten.sql.columnar.backend.ch.runtime_conf.local_engine.settings.metrics_perf_events_enabled=false \
--conf spark.gluten.sql.native.parquet.writer.enabled=false \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.local.dir=/data0/spark-local \
--conf spark.gluten.enabled=true \
--conf spark.executorEnv.LD_PRELOAD=/usr/local/clickhouse/lib/libch.so \
--conf spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm=sparkMurmurHash3_32 \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseSparkCatalog \
--conf spark.databricks.delta.maxSnapshotLineageLength=20 \
--conf spark.databricks.delta.snapshotPartitions=1 \
--conf spark.databricks.delta.properties.defaults.checkpointInterval=5 \
--conf spark.databricks.delta.stalenessLimit=3600000 \
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
--conf spark.databricks.delta.retentionDurationCheck.enabled=false \
--conf spark.driver.extraClassPath=/data0/hongbin/code/gluten2/backends-clickhouse/target/gluten-1.2.0-SNAPSHOT-spark-3.3-jar-with-dependencies.jar
- Import a Parquet table into a ClickHouse table
create table lineitem_ch using clickhouse location '/tmp/lineitem_ch' as select * from lineitem
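Optionally, you can append a few more batches so that OPTIMIZE has several small parts to merge (an illustrative sketch; the predicates below assume the standard TPC-H lineitem schema):
-- each insert creates additional parts that OPTIMIZE can later compact
insert into lineitem_ch select * from lineitem where l_shipdate < '1995-01-01';
insert into lineitem_ch select * from lineitem where l_shipdate >= '1995-01-01';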
- Optimize the ClickHouse table (by default, parts are merged up to roughly 1 GB)
optimize lineitem_ch
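The ~1 GB target matches the Delta OPTIMIZE file-size defaults; a sketch of changing it (the value is illustrative, and it is assumed this parameter is honored for ClickHouse tables like the other Delta parameters above):
-- target ~512 MB parts instead of the default ~1 GB
set spark.databricks.delta.optimize.maxFileSize=536870912;
optimize lineitem_ch;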
- Vacuum the table (Gluten must be disabled, and VACUUM is run twice, per the limitations above)
set spark.gluten.enabled=false;
vacuum lineitem_ch retain 0 hours;
vacuum lineitem_ch retain 0 hours;
set spark.gluten.enabled=true;
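To preview what a VACUUM would remove, standard Delta SQL also offers a DRY RUN mode that only lists the files to be deleted; whether it behaves identically for ClickHouse tables is not covered above, so treat this as a sketch:
set spark.gluten.enabled=false;
-- list the files that would be deleted, without deleting them
vacuum lineitem_ch retain 0 hours dry run;
set spark.gluten.enabled=true;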
When testing a large table, it is also recommended to set:
--conf spark.databricks.delta.vacuum.parallelDelete.parallelism=24
--conf spark.databricks.delta.vacuum.parallelDelete.enabled=true
to speed up deletion; by default, the delete parallelism is 1.
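If the session is already running, the same settings can also be applied at runtime instead of restarting with extra --conf flags (assuming, as with the other Delta parameters above, that they are picked up from the session conf):
-- enable parallel deletes and raise the parallelism for VACUUM
set spark.databricks.delta.vacuum.parallelDelete.enabled=true;
set spark.databricks.delta.vacuum.parallelDelete.parallelism=24;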