
[CH] Support OPTIMIZE and VACUUM commands for ClickHouse tables


Description

Support the OPTIMIZE and VACUUM commands for ClickHouse tables.

Usage

The basic usage is:

optimize table_name
vacuum table_name RETAIN 0 HOURS

Delta OPTIMIZE/VACUUM parameters, such as spark.databricks.delta.vacuum.parallelDelete.enabled and spark.databricks.delta.optimize.minFileSize, also work for ClickHouse tables.

Necessary Spark configs:

  • spark.databricks.delta.retentionDurationCheck.enabled=false
  • spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension

Check out more details in GlutenClickHouseMergeTreeOptimizeSuite.
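
For reference, a minimal sketch of the same flow from Scala (e.g. in a spark-shell): the configs mirror the ones listed above, and my_table is a placeholder for an existing ClickHouse table.

import org.apache.spark.sql.SparkSession

// Sketch only: build a session with the Delta extension, the retention
// check disabled, and the ClickHouse catalog used later in this issue.
val spark = SparkSession.builder()
  .appName("ch-optimize-vacuum")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseSparkCatalog")
  .getOrCreate()

// OPTIMIZE runs with Gluten enabled; Delta tuning knobs such as
// spark.databricks.delta.optimize.minFileSize apply here as well.
spark.sql("OPTIMIZE my_table")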

Limitations

  1. When running VACUUM, spark.gluten.enabled has to be set to false.
  2. VACUUM has to be run twice to clean up all empty folders; a sketch of the full sequence follows below.
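
A minimal sketch of the sequence these limitations imply, again with my_table as a placeholder:

// Disable Gluten for VACUUM, run it twice so empty folders are fully
// removed, then switch Gluten back on.
spark.conf.set("spark.gluten.enabled", "false")
spark.sql("VACUUM my_table RETAIN 0 HOURS")
spark.sql("VACUUM my_table RETAIN 0 HOURS")
spark.conf.set("spark.gluten.enabled", "true")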

binmahone — Mar 27 '24

A full playbook to illustrate usage:

  • Start a Spark session

export LD_PRELOAD=/usr/local/clickhouse/lib/libch.so && bin/spark-sql  \
  --master 'local[30]'  \
  --driver-memory 60g  \
  --conf spark.driver.memoryOverhead=60G  \
  --conf spark.executor.memoryOverhead=30g  \
  --conf spark.serializer=org.apache.spark.serializer.JavaSerializer  \
  --conf spark.memory.fraction=0.6  \
  --conf spark.memory.storageFraction=0.3  \
  --conf spark.sql.sources.ignoreDataLocality=true  \
  --conf spark.default.parallelism=45  \
  --conf spark.sql.shuffle.partitions=90  \
  --conf spark.sql.files.minPartitionNum=1  \
  --conf spark.sql.files.maxPartitionBytes=1G  \
  --conf spark.sql.files.openCostInBytes=1073741824  \
  --conf spark.sql.adaptive.enabled=true  \
  --conf spark.sql.adaptive.coalescePartitions.enabled=false  \
  --conf spark.sql.parquet.filterPushdown=true  \
  --conf spark.sql.parquet.enableVectorizedReader=true  \
  --conf spark.sql.columnVector.offheap.enabled=true  \
  --conf spark.memory.offHeap.enabled=true  \
  --conf spark.memory.offHeap.size=60G  \
  --conf spark.locality.wait=0  \
  --conf spark.locality.wait.node=0  \
  --conf spark.locality.wait.process=0  \
  --conf spark.sql.autoBroadcastJoinThreshold=100MB  \
  --conf spark.sql.adaptive.autoBroadcastJoinThreshold=-1  \
  --conf spark.eventLog.enabled=true  \
  --conf spark.eventLog.compress=true  \
  --conf spark.eventLog.compression.codec=snappy  \
  --conf spark.plugins=io.glutenproject.GlutenPlugin  \
  --conf spark.gluten.sql.columnar.columnartorow=true  \
  --conf spark.gluten.sql.columnar.loadnative=true  \
  --conf spark.gluten.sql.columnar.iterator=true  \
  --conf spark.gluten.sql.columnar.loadarrow=false  \
  --conf spark.gluten.sql.columnar.backend.lib=ch  \
  --conf spark.gluten.sql.columnar.libpath=/usr/local/clickhouse/lib/libch.so  \
  --conf spark.gluten.sql.columnar.hashagg.enablefinal=true  \
  --conf spark.gluten.sql.enable.native.validation=false  \
  --conf spark.gluten.sql.columnar.backend.ch.use.v2=false  \
  --conf spark.gluten.sql.columnar.separate.scan.rdd.for.ch=false  \
  --conf spark.gluten.sql.columnar.forceshuffledhashjoin=true  \
  --conf spark.gluten.sql.columnar.backend.ch.worker.id=1  \
  --conf spark.gluten.sql.columnar.coalesce.batches=false  \
  --conf spark.gluten.sql.columnar.sort=true  \
  --conf spark.gluten.sql.columnar.backend.ch.runtime_conf.logger.level=error  \
  --conf spark.shuffle.manager=org.apache.spark.shuffle.sort.ColumnarShuffleManager  \
  --conf spark.io.compression.codec=LZ4  \
  --conf spark.gluten.sql.columnar.shuffle.customizedCompression.codec=LZ4  \
  --conf spark.gluten.sql.columnar.backend.ch.customized.shuffle.codec.enable=true  \
  --conf spark.gluten.sql.columnar.backend.ch.customized.buffer.size=4096   \
  --conf spark.gluten.sql.columnar.backend.ch.runtime_conf.enable_nullable=true  \
  --conf spark.gluten.sql.columnar.backend.ch.runtime_conf.local_engine.settings.log_processors_profiles=false  \
  --conf spark.gluten.sql.columnar.backend.ch.runtime_conf.local_engine.settings.metrics_perf_events_enabled=false \
  --conf spark.gluten.sql.native.parquet.writer.enabled=false \
  --conf spark.sql.hive.convertMetastoreParquet=false  \
  --conf spark.local.dir=/data0/spark-local \
  --conf spark.gluten.enabled=true   \
  --conf spark.executorEnv.LD_PRELOAD=/usr/local/clickhouse/lib/libch.so  \
  --conf spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm=sparkMurmurHash3_32   \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseSparkCatalog \
  --conf spark.databricks.delta.maxSnapshotLineageLength=20 \
  --conf spark.databricks.delta.snapshotPartitions=1 \
  --conf spark.databricks.delta.properties.defaults.checkpointInterval=5 \
  --conf spark.databricks.delta.stalenessLimit=3600000 \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.databricks.delta.retentionDurationCheck.enabled=false \
  --conf spark.driver.extraClassPath=/data0/hongbin/code/gluten2/backends-clickhouse/target/gluten-1.2.0-SNAPSHOT-spark-3.3-jar-with-dependencies.jar
  • Import a Parquet table into a ClickHouse table
create table lineitem_ch using clickhouse location '/tmp/lineitem_ch' as select * from lineitem
  • Optimize the ClickHouse table (by default it merges parts up to 1 GB; a part-count sketch follows the playbook)
optimize lineitem_ch
  • Vacuum the table
set spark.gluten.enabled=false;
vacuum lineitem_ch retain 0 hours;
vacuum lineitem_ch retain 0 hours;
set spark.gluten.enabled=true;
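
Not part of the original playbook, but one way to check the effect of OPTIMIZE and VACUUM is to count the part directories under the table location. A sketch for a spark-shell started with the same flags, assuming the /tmp/lineitem_ch location from the CREATE TABLE step above:

import org.apache.hadoop.fs.Path

// Count the part directories under the table location, skipping metadata
// directories such as _delta_log. Run before and after OPTIMIZE / VACUUM.
val tablePath = new Path("/tmp/lineitem_ch")
val fs = tablePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
val partCount = fs.listStatus(tablePath)
  .count(s => s.isDirectory && !s.getPath.getName.startsWith("_"))
println(s"part directories under $tablePath: $partCount")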


binmahone — Mar 28 '24

When testing a large table, it is also recommended to set:

  --conf spark.databricks.delta.vacuum.parallelDelete.parallelism=24 \
  --conf spark.databricks.delta.vacuum.parallelDelete.enabled=true

to speed up deletion; by default, the delete parallelism is 1.
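
The same knobs can also be set on an already-running session before VACUUM, assuming the Delta build in use reads them from the session conf; a sketch:

// Example values from this thread; adjust the parallelism to the cluster.
spark.conf.set("spark.databricks.delta.vacuum.parallelDelete.enabled", "true")
spark.conf.set("spark.databricks.delta.vacuum.parallelDelete.parallelism", "24")
spark.sql("VACUUM lineitem_ch RETAIN 0 HOURS")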

binmahone — Apr 11 '24