delta-rs
Allow specifying per-column encoding when writing delta lake tables
Allow specifying per-column encoding to achieve ~95% disk space reduction
According to the Parquet specification docs there are many encodings available: PLAIN, RLE, RLE_DICTIONARY, DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY, and BYTE_STREAM_SPLIT, among others.
A user should be able to override the default dictionary & RLE encoding currently used by delta-rs and specify a different encoding that is better suited to their data. Alternatively, or in addition, auto-detecting the encoding based on a sample of the data would be quite nifty.
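As a rough illustration of the auto-detection idea (a sketch only; the function name and thresholds below are made up and this is not how delta-rs behaves), a heuristic could inspect each column's type and a small sample of its values:

import pyarrow as pa

def suggest_encoding(column: pa.ChunkedArray) -> str:
    """Naive heuristic for illustration only -- not delta-rs behaviour."""
    if pa.types.is_integer(column.type):
        # Slowly changing integers (timestamps, counters) delta-pack well
        return "DELTA_BINARY_PACKED"
    if pa.types.is_floating(column.type):
        return "BYTE_STREAM_SPLIT"
    # Look at a small sample to estimate cardinality for string-like columns
    sample = column.slice(0, min(len(column), 10_000)).to_pandas()
    if sample.nunique() < 0.1 * len(sample):
        return "RLE_DICTIONARY"
    return "PLAIN"

For example, calling suggest_encoding on the timestamp column of the table built in the example below would pick DELTA_BINARY_PACKED.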
Use Case
Large time-series data sets often have numeric (int) columns that minimally change between rows and benefit greatly from DELTA_BINARY_PACKED encoding.
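For intuition (a small sketch, not part of the original example): consecutive values in such columns differ by tiny, near-constant amounts, so storing bit-packed deltas instead of full 64-bit values takes very little space.

import numpy as np

# A microsecond timestamp column sampled every 5 us, like the example below
ts = np.arange(0, 5 * 1_000_000, 5, dtype=np.int64)

deltas = np.diff(ts)
print(deltas.min(), deltas.max())      # 5 5 -> the delta between rows is constant
print(int(deltas.max()).bit_length())  # 3 -> a few bits per value instead of 64
# DELTA_BINARY_PACKED stores a reference value plus bit-packed deltas
# (minus the per-block minimum), so a constant-delta column packs down to
# almost nothing before compression is even applied.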
Example
Make some fake time-series data and write datasets with both deltalake and pyarrow:
import pandas as pd
import numpy as np
from deltalake import write_deltalake
import pyarrow.parquet as pq
import pyarrow as pa

# Make some fake time series data
TOTAL_ROWS = 100_000_000
timestamps = pd.date_range(start=pd.Timestamp.now(), periods=TOTAL_ROWS, freq="5us")
timeline = np.linspace(0, len(timestamps), len(timestamps))

pat = pa.Table.from_pandas(
    pd.DataFrame(
        {
            # Microsecond timestamp
            "timestamp": (timestamps.astype("int") / 1000).astype("int"),
            # 3 decimals of precision, stored as int
            "timeseries_data": (
                np.round(
                    10 * np.sin(2 * np.pi * 50 * timeline),
                    3,
                )
                * 1000
            ).astype("int"),
            # 1 minute partitions
            "partition_label": timestamps.strftime("%Y%m%d_%H%M"),
        }
    )
)

output_path_normal = "example_deltalake"
write_deltalake(
    output_path_normal,
    data=pat,
    partition_by=["partition_label"],
    engine="rust",
    # Can't specify per-column encoding
)

output_path_delta_binary_packed_encoded = "example_pyarrow_delta_binary_packed_encoding"
pq.write_to_dataset(
    pat,
    output_path_delta_binary_packed_encoded,
    partition_cols=["partition_label"],
    # Ability to specify column encodings here
    column_encoding={
        "timestamp": "DELTA_BINARY_PACKED",
        "timeseries_data": "DELTA_BINARY_PACKED",
        "partition_label": "RLE",
    },
    use_dictionary=False,
)
The above produces parquet & delta datasets:
6.4M example_pyarrow_delta_binary_packed_encoding
423M example_deltalake
The dataset written with DELTA_BINARY_PACKED encoding is 98.5% smaller!
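To double-check which encodings actually ended up in the written files, the Parquet metadata can be inspected (a sketch assuming the two output directories from the example above):

from pathlib import Path
import pyarrow.parquet as pq

# Pick one data file from each partitioned dataset and print its column encodings
for root in ["example_deltalake", "example_pyarrow_delta_binary_packed_encoding"]:
    sample_file = next(Path(root).rglob("*.parquet"))
    meta = pq.ParquetFile(sample_file).metadata
    for i in range(meta.num_columns):
        col = meta.row_group(0).column(i)
        print(root, col.path_in_schema, col.encodings)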
Related Issues & PRs
- https://github.com/delta-io/delta-rs/issues/1772
- https://github.com/delta-io/delta-rs/issues/3212
- https://github.com/delta-io/delta-rs/pull/3214
It's a matter of exposing it on the column properties and writer properties and then setting it in the binding layer.
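Roughly the shape this could take on the Python side (a sketch only; at the time of this comment the encoding argument on ColumnProperties is hypothetical, though it matches what the later examples in this thread use):

import pyarrow as pa
from deltalake import write_deltalake, WriterProperties, ColumnProperties

tbl = pa.table({"timestamp": [1, 2, 3], "value": [1.0, 2.0, 3.0]})

# Hypothetical here: a per-column `encoding` plumbed through to the
# underlying parquet writer properties in the Rust layer.
write_deltalake(
    "example_encoded_table",
    data=tbl,
    writer_properties=WriterProperties(
        compression="ZSTD",
        column_properties={
            "timestamp": ColumnProperties(dictionary_enabled=False, encoding="DELTA_BINARY_PACKED"),
            "value": ColumnProperties(dictionary_enabled=False, encoding="BYTE_STREAM_SPLIT"),
        },
    ),
    mode="overwrite",
)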
Wanna put a PR up for this?
I must admit I have never used Rust before but I will poke around and try :)
@ion-elgreco here is my attempt: https://github.com/delta-io/delta-rs/pull/3320
Feel free to run with it
Hey, it looks like the account that made it got deleted :( I was really interested in this as well. @ion-elgreco should I create a new feature request for this and try to get a PR open?
Here is a more realistic example, closer to what we would actually use; it still shows a large reduction.
import pandas as pd
import numpy as np
from deltalake import write_deltalake, WriterProperties, ColumnProperties
import pyarrow.parquet as pq
import pyarrow as pa

# Make some fake time series data
TOTAL_ROWS = 10_000_000
timestamps = pd.date_range(start=pd.Timestamp.now(), periods=TOTAL_ROWS, freq="5ms")
timeline = np.linspace(0, len(timestamps), len(timestamps))

print("Generating data...")
pat = pa.Table.from_pandas(
    pd.DataFrame(
        {
            # timestamp (auto-generated)
            "timestamp": timestamps,
            # Timeseries data as float32
            "timeseries_data": (10 * np.sin(2 * np.pi * 50 * timeline)).astype(
                np.float32
            ),
            "timeseries_int_data": (1000 * np.sin(2 * np.pi * 50 * timeline)).astype(
                np.int32
            ),
            # 1 minute partitions
            "partition_label": timestamps.strftime("%H%M"),
        }
    )
)
print("Data generated.")

output_path_normal = "example_deltalake"
write_deltalake(
    output_path_normal,
    data=pat,
    partition_by=["partition_label"],
    # Enabled compression for equivalent comparison, dictionary enabled leads to a larger file size
    writer_properties=WriterProperties(
        compression="ZSTD",
        compression_level=1,
        default_column_properties=ColumnProperties(dictionary_enabled=False),
    ),
    mode="overwrite",
    # Can't specify per-column encoding
)
print("Wrote normal delta table.")

output_path_default_encoded = "example_pyarrow_delta_default_encoding"
pq.write_to_dataset(
    pat,
    output_path_default_encoded,
    partition_cols=["partition_label"],
    use_dictionary=False,
    use_byte_stream_split=True,
    compression="ZSTD",
    compression_level=1,
)

output_path_delta_specifc_encoded = "example_pyarrow_delta_specifc_col_encoding"
pq.write_to_dataset(
    pat,
    output_path_delta_specifc_encoded,
    partition_cols=["partition_label"],
    # Ability to specify column encodings here
    use_dictionary=False,
    use_byte_stream_split=False,
    column_encoding={
        "timestamp": "DELTA_BINARY_PACKED",
        "timeseries_data": "BYTE_STREAM_SPLIT",
        "timeseries_int_data": "DELTA_BINARY_PACKED",
        "partition_label": "DELTA_BINARY_PACKED",
    },
    compression="ZSTD",
    compression_level=1,
)
print("Wrote delta table with pyarrow column encodings.")
Comes to the following:
The File size of delta table is 85.5853796005249 MB
The File size of delta table with pyarrow default column encodings is 22.02027130126953 MB
The File size of delta table with pyarrow specific column encodings is 17.961445808410645 MB
Package versions used:
- deltalake = "1.1.4"
- pyarrow = "17.0.0"
- pandas = "2.2.2"
- numpy = "1.26.3"

Environment: Windows 11, Python 3.11.13
@niltecedu feel free to make a PR :)
Thanks @ion-elgreco Let me get something open!
take
Initial testing with the updated code gives:
The File size of delta table is 13.744157791137695 MB
The File size of delta table with parquet encoding is 3.4160261154174805 MB
The File size of delta table with pyarrow default column encodings is 3.4160261154174805 MB
The File size of delta table with pyarrow specific column encodings is 2.8036413192749023 MB
Environment: WSL 2, Ubuntu 24.04, Python 3.12.3
PR on the way. I will look into why the pyarrow encoding is more efficient with the same parameters; maybe I am being blind here. Posting the entire testing code below, I wasn't sure where in the directory structure I should put it.
import os
import pandas as pd
import numpy as np
from deltalake import write_deltalake, WriterProperties, ColumnProperties, DeltaTable
from pathlib import Path
import pyarrow.parquet as pq
import pyarrow as pa

# Make some fake time series data
TOTAL_ROWS = 1_000_000
timestamps = pd.date_range(start=pd.Timestamp.now(), periods=TOTAL_ROWS, freq="5ms")
timeline = np.linspace(0, len(timestamps), len(timestamps))

print("Generating data...")
pat = pa.Table.from_pandas(
    pd.DataFrame(
        {
            # timestamp (auto-generated)
            "timestamp": timestamps,
            # Timeseries data as float32
            "timeseries_data": (10 * np.sin(2 * np.pi * 50 * timeline)).astype(
                np.float32
            ),
            "timeseries_int_data": (1000 * np.sin(2 * np.pi * 50 * timeline)).astype(
                np.int32
            ),
            # 1 minute partitions
            "partition_label": timestamps.strftime("%H%M"),
        }
    )
)
print("Data generated.")

output_path_normal = "example_deltalake"
write_deltalake(
    output_path_normal,
    data=pat,
    partition_by=["partition_label"],
    # Enabled compression for equivalent comparison, dictionary enabled leads to a larger file size
    writer_properties=WriterProperties(
        compression="ZSTD",
        compression_level=22,
        default_column_properties=ColumnProperties(dictionary_enabled=True),
    ),
    mode="overwrite",
    # Can't specify per-column encoding
)
print("Wrote normal delta table.")

output_path_encoded = "encoded_example_deltalake"
write_deltalake(
    output_path_encoded,
    data=pat,
    partition_by=["partition_label"],
    # Enabled compression for equivalent comparison, per-column encodings specified below
    writer_properties=WriterProperties(
        compression="ZSTD",
        compression_level=22,
        column_properties={
            "timestamp": ColumnProperties(dictionary_enabled=False, encoding="DELTA_BINARY_PACKED"),
            "timeseries_data": ColumnProperties(dictionary_enabled=False, encoding="BYTE_STREAM_SPLIT"),
            "timeseries_int_data": ColumnProperties(dictionary_enabled=False, encoding="DELTA_BINARY_PACKED"),
            "partition_label": ColumnProperties(dictionary_enabled=False, encoding="DELTA_BINARY_PACKED"),
        },
    ),
    mode="overwrite",
)
print("Wrote encoded delta table.")

output_path_default_encoded = "example_pyarrow_delta_default_encoding"
pq.write_to_dataset(
    pat,
    output_path_default_encoded,
    partition_cols=["partition_label"],
    use_dictionary=False,
    use_byte_stream_split=True,
    compression="ZSTD",
    compression_level=22,
)

output_path_delta_specifc_encoded = "example_pyarrow_delta_specifc_col_encoding"
pq.write_to_dataset(
    pat,
    output_path_delta_specifc_encoded,
    partition_cols=["partition_label"],
    # Ability to specify column encodings here
    use_dictionary=False,
    use_byte_stream_split=False,
    column_encoding={
        "timestamp": "DELTA_BINARY_PACKED",
        "timeseries_data": "BYTE_STREAM_SPLIT",
        "timeseries_int_data": "DELTA_BINARY_PACKED",
        "partition_label": "DELTA_BINARY_PACKED",
    },
    compression="ZSTD",
    compression_level=22,
)
print("Wrote delta table with pyarrow column encodings.")


class ByteSize(int):
    _KB = 1024
    _suffixes = "B", "KB", "MB", "GB", "TB", "PB"

    def __new__(cls, *args, **kwargs):
        return super().__new__(cls, *args, **kwargs)

    def __init__(self, *args, **kwargs):
        self.bytes = self.B = int(self)
        self.kilobytes = self.KB = self / self._KB**1
        self.megabytes = self.MB = self / self._KB**2
        self.gigabytes = self.GB = self / self._KB**3
        self.terabytes = self.TB = self / self._KB**4
        self.petabytes = self.PB = self / self._KB**5
        *suffixes, last = self._suffixes
        suffix = next(
            (suffix for suffix in suffixes if 1 < getattr(self, suffix) < self._KB),
            last,
        )
        self.readable = suffix, getattr(self, suffix)
        super().__init__()

    def __str__(self):
        return self.__format__(".2f")


def get_folder_size(folder):
    return ByteSize(
        sum(file.stat().st_size for file in Path(folder).rglob("*"))
    ).megabytes


print(DeltaTable(output_path_encoded).to_pandas())
print(f"The File size of delta table is {get_folder_size(output_path_normal)} MB")
print(f"The File size of delta table with parquet encoding is {get_folder_size(output_path_encoded)} MB")
print(
    f"The File size of delta table with pyarrow default column encodings is {get_folder_size(output_path_default_encoded)} MB"
)
print(
    f"The File size of delta table with pyarrow specific column encodings is {get_folder_size(output_path_delta_specifc_encoded)} MB"
)

print("Deleting the folders now...")
import shutil

shutil.rmtree(output_path_normal)
shutil.rmtree(output_path_encoded)
shutil.rmtree(output_path_default_encoded)
shutil.rmtree(output_path_delta_specifc_encoded)
print("Deleted the folders.")
@ion-elgreco Opened PR #3737, let me know what's needed (I can repoint it to a version release branch). This is my first contribution to a public Rust project, so things might be lacking, but I'm happy to get them fixed. Also, how long do version releases that contain new features usually take?