
Provide a table creation utility method to take advantage of Generated Columns for Partition Filtering


Using generated columns, we could get partition filtering on year, month, and/or day when filtering on the ts_col at file scan time.

https://docs.delta.io/latest/delta-batch.html#use-generated-columns

From the docs on Generated Columns:

Delta Lake may be able to generate partition filters for a query whenever a partition column is defined by one of the following expressions:

  • CAST(col AS DATE) and the type of col is TIMESTAMP.
  • YEAR(col) and the type of col is TIMESTAMP.
  • Two partition columns defined by YEAR(col), MONTH(col) and the type of col is TIMESTAMP.
  • Three partition columns defined by YEAR(col), MONTH(col), DAY(col) and the type of col is TIMESTAMP.
  • Four partition columns defined by YEAR(col), MONTH(col), DAY(col), HOUR(col) and the type of col is TIMESTAMP.
  • SUBSTRING(col, pos, len) and the type of col is STRING.
  • DATE_FORMAT(col, format) and the type of col is TIMESTAMP.

Of particular interest are partition columns defined by YEAR(col), MONTH(col), DAY(col), and/or HOUR(col) derived from the tsdf.ts_col attribute.
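A table creation utility in tempo could wrap this pattern. Below is a minimal sketch, assuming a TSDF-like object that exposes its Spark DataFrame as df and its timestamp column as ts_col; the function name, signature, and granularity handling are illustrative assumptions only, not an existing tempo API:

# Sketch only -- this helper does not exist in tempo today; the name and signature are illustrative.
from delta.tables import DeltaTable

def create_time_partitioned_table(spark, tsdf, table_name, granularity=("year", "month", "day")):
    """Create a Delta table whose partition columns are generated from tsdf.ts_col."""
    builder = DeltaTable.createIfNotExists(spark).tableName(table_name)

    # Mirror the TSDF's schema in the target table
    # (assumes the granularity column names do not already exist in the schema).
    for field in tsdf.df.schema.fields:
        builder = builder.addColumn(field.name, field.dataType)

    # Add generated partition columns derived from the timestamp column;
    # year/month/day are valid Spark SQL functions and also become the partition column names.
    for unit in granularity:
        builder = builder.addColumn(unit, "INT", generatedAlwaysAs=f"{unit}({tsdf.ts_col})")

    return builder.partitionedBy(*granularity).execute()

Writes into such a table would populate the generated year/month/day values automatically, and filters on the ts_col would pick up the partition filters shown in the second plan below.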


Below is an example of partition pruning over "/databricks-datasets/amazon/test4K/", with and without generated columns.

from delta.tables import DeltaTable
from pyspark.sql.functions import to_timestamp, from_unixtime, year, month, dayofmonth, expr

DeltaTable.createOrReplace(spark) \
  .tableName("default.no_generated_columns") \
  .addColumn("asin", "STRING") \
  .addColumn("brand", "STRING") \
  .addColumn("helpful", "ARRAY<BIGINT>") \
  .addColumn("img", "STRING") \
  .addColumn("price", "DOUBLE") \
  .addColumn("rating", "DOUBLE") \
  .addColumn("review", "STRING") \
  .addColumn("time", "BIGINT") \
  .addColumn("title", "STRING") \
  .addColumn("user", "STRING") \
  .addColumn("event_time", "TIMESTAMP") \
  .addColumn("year", "INT") \
  .addColumn("month", "INT") \
  .addColumn("day", "INT") \
  .partitionedBy("year", "month", "day") \
  .execute()
  
 spark.read.parquet("/databricks-datasets/amazon/test4K/") \
  .withColumn("event_time", to_timestamp(from_unixtime("time"))) \
  .withColumn("year", year(from_unixtime("time"))) \
  .withColumn("month", month(from_unixtime("time"))) \
  .withColumn("day", dayofmonth(from_unixtime("time"))) \
  .write \
  .format("delta").mode("append") \
  .saveAsTable("default.no_generated_columns")
  
spark.table("default.no_generated_columns") \
  .filter(expr("event_time > '2012-10-03'")) \
  .explain(mode="formatted")

This produces the plan below for the Parquet scan, with no partition filters:

(1) Scan parquet default.no_generated_columns
Output [14]: [asin#897790, brand#897791, helpful#897792, img#897793, price#897794, rating#897795, review#897796, time#897797L, title#897798, user#897799, event_time#897800, year#897801, month#897802, day#897803]
Batched: true
Location: PreparedDeltaFileIndex [dbfs:/user/hive/warehouse/no_generated_columns]
PushedFilters: [IsNotNull(event_time), GreaterThan(event_time,2012-10-03 00:00:00.0)]
ReadSchema: struct<asin:string,brand:string,helpful:array<bigint>,img:string,price:double,rating:double,review:string,time:bigint,title:string,user:string,event_time:timestamp>

Changing the create table statement to use generated columns for year, month, and day adds partition filters to the physical plan.

DeltaTable.createOrReplace(spark) \
  .tableName("default.generated_columns") \
  .addColumn("asin", "STRING") \
  .addColumn("brand", "STRING") \
  .addColumn("helpful", "ARRAY<BIGINT>") \
  .addColumn("img", "STRING") \
  .addColumn("price", "DOUBLE") \
  .addColumn("rating", "DOUBLE") \
  .addColumn("review", "STRING") \
  .addColumn("time", "BIGINT") \
  .addColumn("title", "STRING") \
  .addColumn("user", "STRING") \
  .addColumn("event_time", "TIMESTAMP") \
  .addColumn("year", "INT", generatedAlwaysAs="year(event_time)") \
  .addColumn("month", "INT", generatedAlwaysAs="month(event_time)") \
  .addColumn("day", "INT", generatedAlwaysAs="day(event_time)") \
  .partitionedBy("year", "month", "day") \
  .execute()

spark.read.parquet("/databricks-datasets/amazon/test4K/") \
  .withColumn("event_time", to_timestamp(from_unixtime("time"))) \
  .withColumn("year", year(from_unixtime("time"))) \
  .withColumn("month", month(from_unixtime("time"))) \
  .withColumn("day", dayofmonth(from_unixtime("time"))) \
  .write \
  .format("delta").mode("append") \
  .saveAsTable("default.generated_columns")

spark.table("default.generated_columns") \
  .filter(expr("event_time > '2012-10-03'")) \
  .explain(mode="formatted")

(1) Scan parquet default.generated_columns
Output [14]: [asin#899426, brand#899427, helpful#899428, img#899429, price#899430, rating#899431, review#899432, time#899433L, title#899434, user#899435, event_time#899436, year#899437, month#899438, day#899439]
Batched: true
Location: PreparedDeltaFileIndex [dbfs:/user/hive/warehouse/generated_columns]
PartitionFilters: [(((year#899437 > year(cast(2012-10-03 00:00:00 as date))) OR ((year#899437 = year(cast(2012-10-03 00:00:00 as date))) AND (month#899438 > month(cast(2012-10-03 00:00:00 as date))))) OR (((year#899437 = year(cast(2012-10-03 00:00:00 as date))) AND (month#899438 = month(cast(2012-10-03 00:00:00 as date)))) AND (day#899439 >= dayofmonth(cast(2012-10-03 00:00:00 as date)))))]
PushedFilters: [IsNotNull(event_time), GreaterThan(event_time,2012-10-03 00:00:00.0)]
ReadSchema: struct<asin:string,brand:string,helpful:array<bigint>,img:string,price:double,rating:double,review:string,time:bigint,title:string,user:string,event_time:timestamp>

R7L208 · Jul 12 '22