snowplow-rdb-loader
Databricks // Trigger parquet FORMAT_OPTIONS merge-schema in loader when schema drifts within the batch emitted by the stream transformer.
Problem
Some users highlighted issues with parquet loading.
The investigation concluded that Databricks distributes the load based on the number of parquet partitions: Spark creates one Task per core until it runs out of partitions to load. A higher number of partitions is therefore generally good for performance, unless the partitions become so small that task-creation overhead starts to dominate.
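For illustration only, here is a minimal Spark sketch (assuming a Spark-based transformer; the paths and partition count are made up) showing how the number of parquet files written per window determines how many load tasks Databricks can run in parallel:

```scala
import org.apache.spark.sql.SparkSession

object PartitionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("partition-sketch")
      .master("local[*]")
      .getOrCreate()

    // Hypothetical window output: each parquet file is one partition that
    // Databricks can assign to a core during COPY.
    val events = spark.read.parquet("/tmp/transformed/run=2022-01-01-00-00")

    // Fewer, larger files => fewer tasks; more, smaller files => more tasks,
    // until task-creation overhead outweighs the extra parallelism.
    events
      .repartition(8)
      .write
      .mode("overwrite")
      .parquet("/tmp/transformed/run=2022-01-01-00-00-repartitioned")

    spark.stop()
  }
}
```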
FORMAT_OPTIONS functional tests
A set of experiments that mixed parquet schemas within a batch showed that the result of combining them is non-deterministic (a different number of columns is lost on each run, sometimes none) and often results in the loss of one or more columns. The PR referenced above also shows that it is possible to crash the loading of the batch entirely.
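As a rough illustration (not the actual test code from the PR; paths and column names are placeholders), schema drift within a single batch can be reproduced by writing two parquet files with different column sets into the same directory and then loading that directory with one COPY:

```scala
import org.apache.spark.sql.SparkSession

object SchemaDriftSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("schema-drift-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val out = "/tmp/drifting-batch"

    // First file: two columns.
    Seq(("e1", "web")).toDF("event_id", "platform")
      .write.mode("append").parquet(out)

    // Second file in the same directory: an extra column, simulating a new
    // schema version arriving mid-window.
    Seq(("e2", "mob", "1.0.1")).toDF("event_id", "platform", "v_tracker")
      .write.mode("append").parquet(out)

    // Loading `out` with a single COPY and no merge-schema option is where
    // the non-deterministic column loss described above was observed.
    spark.stop()
  }
}
```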
COPY_OPTIONS benchmarks
COPY_OPTIONS were benchmarked as a part of a different exercise, showing no impact on loading performance.
The figure below shows the difference between COPY_OPTIONS and no COPY_OPTIONS. Averaged over 10000 runs, the load time difference between COPY_OPTIONS and no COPY_OPTIONS is < 0.01 sec.
Partitioning/FORMAT_OPTIONS benchmark
Here are the results of our benchmarks (3 GB dataset) on a 3-node cluster:
No of partitions | Heavily utilized cluster | Fresh cluster | FORMAT_OPTIONS on fresh cluster
---|---|---|---
1 | 16m 51s | 2m 33s | 2m 15s
2 | 3m 53s | 56s | 48s
2 | 59s | 37s | 51s
Conclusions
- On large clusters, it is beneficial to decrease the number of items per partition while increasing the time window, which increases the number of parquet partitions and enables better load distribution.
- FORMAT_OPTIONS is critical for loading the data consistently when the schema drifts within a batch.
- FORMAT_OPTIONS showed a performance decrease on datasets with many partitions. FORMAT_OPTIONS merges the schema across the parquet dataset, as opposed to COPY_OPTIONS, which merges the schema with the target table (see the sketch after this list).
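To make that distinction concrete, here is a hedged sketch of the COPY variants as the loader might build them (table and path are placeholders; this issue writes the option as 'merge_schema', while the Databricks COPY INTO documentation spells it mergeSchema, which the sketch uses):

```scala
object CopyStatements {
  // Placeholder identifiers; the real loader derives these from its config.
  val table  = "snowplow.events"
  val source = "s3://bucket/transformed/run=2022-01-01-00-00/output=good/"

  // Plain COPY: all parquet files in the batch must share one schema.
  val plainCopy: String =
    s"""COPY INTO $table
       |FROM '$source'
       |FILEFORMAT = PARQUET""".stripMargin

  // FORMAT_OPTIONS merges schemas across the parquet files being read,
  // i.e. it handles drift *within* the batch.
  val copyWithFormatOptions: String =
    s"""COPY INTO $table
       |FROM '$source'
       |FILEFORMAT = PARQUET
       |FORMAT_OPTIONS ('mergeSchema' = 'true')""".stripMargin

  // COPY_OPTIONS merges the incoming schema into the *target table*,
  // i.e. it handles drift between the batch and the table, not within the batch.
  val copyWithCopyOptions: String =
    s"""COPY INTO $table
       |FROM '$source'
       |FILEFORMAT = PARQUET
       |COPY_OPTIONS ('mergeSchema' = 'true')""".stripMargin
}
```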
Proposed Solution
Make the stream transformer detect schema drift within the window and flag such an event as an optional part of the ShreddingComplete message, so that the loader attaches FORMAT_OPTIONS ('merge_schema' = true) to the COPY command only when it is necessary.
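A minimal sketch of how the loader side could react to such a flag (the ShreddingComplete shape and field name schemaDriftDetected are hypothetical simplifications, not the actual rdb-loader API):

```scala
object CopyBuilder {
  // Hypothetical, trimmed-down view of the ShreddingComplete message:
  // the real message carries many more fields.
  final case class ShreddingComplete(base: String, schemaDriftDetected: Option[Boolean])

  def buildCopy(table: String, msg: ShreddingComplete): String = {
    // Attach the merge-schema format option only when the transformer
    // flagged drift within the window.
    val formatOptions =
      if (msg.schemaDriftDetected.contains(true))
        "\nFORMAT_OPTIONS ('mergeSchema' = 'true')"
      else
        ""

    s"""COPY INTO $table
       |FROM '${msg.base}'
       |FILEFORMAT = PARQUET""".stripMargin + formatOptions
  }
}
```

With this shape, batches without drift keep the cheaper plain COPY, and the merge-schema cost observed in the benchmarks above is paid only when it is actually needed.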