tempo
Provide a table creation utility method to take advantage of Generated Columns for Partition Filtering
Using generated columns, we could get partition filtering on year, month, and/or day whenever the ts_col is used for filtering at file scan time.
https://docs.delta.io/latest/delta-batch.html#use-generated-columns
From the docs on Generated Columns:
Delta Lake may be able to generate partition filters for a query whenever a partition column is defined by one of the following expressions:
- CAST(col AS DATE) and the type of col is TIMESTAMP.
- YEAR(col) and the type of col is TIMESTAMP.
- Two partition columns defined by YEAR(col), MONTH(col) and the type of col is TIMESTAMP.
- Three partition columns defined by YEAR(col), MONTH(col), DAY(col) and the type of col is TIMESTAMP.
- Four partition columns defined by YEAR(col), MONTH(col), DAY(col), HOUR(col) and the type of col is TIMESTAMP.
- SUBSTRING(col, pos, len) and the type of col is STRING.
- DATE_FORMAT(col, format) and the type of col is TIMESTAMP.
Of particular interest are partition columns defined by YEAR(col), MONTH(col), DAY(col), and/or HOUR(col) derived from the tsdf.ts_col attribute.
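A utility method on TSDF could encapsulate this table creation. The sketch below is one possible shape, assuming TSDF exposes df and ts_col as it does today; the function name create_partitioned_delta_table and its signature are illustrative, not existing tempo API.
from delta.tables import DeltaTable

def create_partitioned_delta_table(spark, tsdf, table_name):
    # Create a Delta table for a TSDF, partitioned on year/month/day columns
    # generated from the TSDF's ts_col, so Delta can derive partition filters
    # from predicates on that timestamp column.
    ts_col = tsdf.ts_col
    builder = DeltaTable.createOrReplace(spark).tableName(table_name)
    # Mirror the TSDF's existing schema.
    for field in tsdf.df.schema.fields:
        builder = builder.addColumn(field.name, field.dataType)
    # Partition columns generated from the timestamp column.
    builder = builder \
        .addColumn("year", "INT", generatedAlwaysAs=f"year({ts_col})") \
        .addColumn("month", "INT", generatedAlwaysAs=f"month({ts_col})") \
        .addColumn("day", "INT", generatedAlwaysAs=f"day({ts_col})") \
        .partitionedBy("year", "month", "day")
    return builder.execute()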
Below is an example of partition pruning using "/databricks-datasets/amazon/test4K/", with and without generated columns.
from delta.tables import DeltaTable
from pyspark.sql.functions import to_timestamp, from_unixtime, year, month, dayofmonth, expr

DeltaTable.createOrReplace(spark) \
.tableName("default.no_generated_columns") \
.addColumn("asin", "STRING") \
.addColumn("brand", "STRING") \
.addColumn("helpful", "ARRAY<BIGINT>") \
.addColumn("img", "STRING") \
.addColumn("price", "DOUBLE") \
.addColumn("rating", "DOUBLE") \
.addColumn("review", "STRING") \
.addColumn("time", "BIGINT") \
.addColumn("title", "STRING") \
.addColumn("user", "STRING") \
.addColumn("event_time", "TIMESTAMP") \
.addColumn("year", "INT") \
.addColumn("month", "INT") \
.addColumn("day", "INT") \
.partitionedBy("year", "month", "day") \
.execute()
spark.read.parquet("/databricks-datasets/amazon/test4K/") \
.withColumn("event_time", to_timestamp(from_unixtime("time"))) \
.withColumn("year", year(from_unixtime("time"))) \
.withColumn("month", month(from_unixtime("time"))) \
.withColumn("day", dayofmonth(from_unixtime("time"))) \
.write \
.format("delta").mode("append") \
.saveAsTable("default.no_generated_columns")
spark.table("default.no_generated_columns") \
.filter(expr("event_time > '2012-10-03'")) \
.explain(mode="formatted")
This produces the plan below for the Parquet scan:
(1) Scan parquet default.no_generated_columns
Output [14]: [asin#897790, brand#897791, helpful#897792, img#897793, price#897794, rating#897795, review#897796, time#897797L, title#897798, user#897799, event_time#897800, year#897801, month#897802, day#897803]
Batched: true
Location: PreparedDeltaFileIndex [dbfs:/user/hive/warehouse/no_generated_columns]
PushedFilters: [IsNotNull(event_time), GreaterThan(event_time,2012-10-03 00:00:00.0)]
ReadSchema: struct<asin:string,brand:string,helpful:array<bigint>,img:string,price:double,rating:double,review:string,time:bigint,title:string,user:string,event_time:timestamp>
Note that the plan contains pushed filters on event_time but no PartitionFilters, so the year/month/day partitioning does not help prune files for this query. Changing the create table statement to use generated columns for year, month, and day produces the additional partition filters in the physical plan.
DeltaTable.createOrReplace(spark) \
.tableName("default.generated_columns") \
.addColumn("asin", "STRING") \
.addColumn("brand", "STRING") \
.addColumn("helpful", "ARRAY<BIGINT>") \
.addColumn("img", "STRING") \
.addColumn("price", "DOUBLE") \
.addColumn("rating", "DOUBLE") \
.addColumn("review", "STRING") \
.addColumn("time", "BIGINT") \
.addColumn("title", "STRING") \
.addColumn("user", "STRING") \
.addColumn("event_time", "TIMESTAMP") \
.addColumn("year", "INT", generatedAlwaysAs="year(event_time)") \
.addColumn("month", "INT", generatedAlwaysAs="month(event_time)") \
.addColumn("day", "INT", generatedAlwaysAs="day(event_time)") \
.partitionedBy("year", "month", "day") \
.execute()
spark.read.parquet("/databricks-datasets/amazon/test4K/") \
.withColumn("event_time", to_timestamp(from_unixtime("time"))) \
.withColumn("year", year(from_unixtime("time"))) \
.withColumn("month", month(from_unixtime("time"))) \
.withColumn("day", dayofmonth(from_unixtime("time"))) \
.write \
.format("delta").mode("append") \
.saveAsTable("default.generated_columns")
spark.table("default.generated_columns") \
.filter(expr("event_time > '2012-10-03'")) \
.explain(mode="formatted")
(1) Scan parquet default.generated_columns
Output [14]: [asin#899426, brand#899427, helpful#899428, img#899429, price#899430, rating#899431, review#899432, time#899433L, title#899434, user#899435, event_time#899436, year#899437, month#899438, day#899439]
Batched: true
Location: PreparedDeltaFileIndex [dbfs:/user/hive/warehouse/generated_columns]
PartitionFilters: [(((year#899437 > year(cast(2012-10-03 00:00:00 as date))) OR ((year#899437 = year(cast(2012-10-03 00:00:00 as date))) AND (month#899438 > month(cast(2012-10-03 00:00:00 as date))))) OR (((year#899437 = year(cast(2012-10-03 00:00:00 as date))) AND (month#899438 = month(cast(2012-10-03 00:00:00 as date)))) AND (day#899439 >= dayofmonth(cast(2012-10-03 00:00:00 as date)))))]
PushedFilters: [IsNotNull(event_time), GreaterThan(event_time,2012-10-03 00:00:00.0)]
ReadSchema: struct<asin:string,brand:string,helpful:array<bigint>,img:string,price:double,rating:double,review:string,time:bigint,title:string,user:string,event_time:timestamp>
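One note on writes: because year, month, and day are generated columns, Delta Lake computes them automatically when they are not supplied, so the append above could (as a sketch relying on that documented behavior) drop the explicit withColumn calls for the partition columns:
# Sketch: with generated columns on the table, Delta computes year/month/day
# at write time when they are not provided explicitly.
spark.read.parquet("/databricks-datasets/amazon/test4K/") \
    .withColumn("event_time", to_timestamp(from_unixtime("time"))) \
    .write \
    .format("delta").mode("append") \
    .saveAsTable("default.generated_columns")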