
Allow specifying per-column encoding when writing delta lake tables

Open ghost opened this issue 8 months ago • 3 comments

Allow specifying per-column encoding to achieve ~95% disk space reduction

According to the Parquet specification docs there are many encodings available, e.g. PLAIN, RLE / dictionary, DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY, and BYTE_STREAM_SPLIT.

A user should be able to override the default dictionary & RLE encoding currently used by delta-rs and pick an encoding better suited to their data. Alternatively, or in addition, auto-detecting the encoding from a sample of the data would be quite nifty.
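
Something along these lines on the Python side would cover it (purely a sketch of a possible API; the `encoding` argument on ColumnProperties does not exist today):

import pyarrow as pa
from deltalake import write_deltalake, WriterProperties, ColumnProperties

tbl = pa.table({"timestamp": [1, 2, 3], "timeseries_data": [10, 11, 12]})

# Hypothetical: ColumnProperties gains an `encoding` field that is passed
# through to the underlying parquet writer's per-column properties.
write_deltalake(
    "example_encoded_table",
    data=tbl,
    writer_properties=WriterProperties(
        column_properties={
            "timestamp": ColumnProperties(
                dictionary_enabled=False, encoding="DELTA_BINARY_PACKED"
            ),
            "timeseries_data": ColumnProperties(
                dictionary_enabled=False, encoding="DELTA_BINARY_PACKED"
            ),
        },
    ),
)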

Use Case

Large time-series data sets often have numeric (int) columns that minimally change between rows and benefit greatly from DELTA_BINARY_PACKED encoding.
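
For intuition on why this helps: the raw values in such a column are large, but consecutive differences are tiny and near-constant, which is exactly what DELTA_BINARY_PACKED exploits. A small illustration:

import numpy as np

# Microsecond timestamps sampled every 5 µs: the absolute values are ~16-digit
# integers, but every consecutive difference is exactly 5, so each delta can be
# bit-packed into just a few bits per value.
ts = np.arange(1_700_000_000_000_000, 1_700_000_000_000_000 + 5 * 50_000, 5)
print(ts[:3])           # [1700000000000000 1700000000000005 1700000000000010]
print(np.diff(ts)[:3])  # [5 5 5]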

Example

Make some fake time-series data and write datasets with both deltalake and pyarrow:

import pandas as pd
import numpy as np

from deltalake import write_deltalake

import pyarrow.parquet as pq
import pyarrow as pa

# Make some fake time series data
TOTAL_ROWS = 100_000_000
timestamps = pd.date_range(start=pd.Timestamp.now(), periods=TOTAL_ROWS, freq="5us")
timeline = np.linspace(0, len(timestamps), len(timestamps))
pat = pa.Table.from_pandas(
    pd.DataFrame(
        {
            # Microsecond timestamp
            "timestamp": (timestamps.astype("int") / 1000).astype("int"),
            # 3 decimals of precision, stored as int
            "timeseries_data": (
                np.round(
                    10 * np.sin(2 * np.pi * 50 * timeline),
                    3,
                )
                * 1000
            ).astype("int"),
            # 1 minute partitions
            "partition_label": timestamps.strftime("%Y%m%d_%H%M"),
        }
    )
)

output_path_normal = "example_deltalake"
write_deltalake(
    output_path_normal,
    data=pat,
    partition_by=["partition_label"],
    engine="rust",
    # Can't specify per-column encoding
)


output_path_delta_binary_packed_encoded = "example_pyarrow_delta_binary_packed_encoding"
pq.write_to_dataset(
    pat,
    output_path_delta_binary_packed_encoded,
    partition_cols=["partition_label"],
    # Ability to specify column encodings here
    column_encoding={
        "timestamp": "DELTA_BINARY_PACKED",
        "timeseries_data": "DELTA_BINARY_PACKED",
        "partition_label": "RLE",
    },
    use_dictionary=False,
)

The above produces parquet & delta datasets:

6.4M	example_pyarrow_delta_binary_packed_encoding
423M	example_deltalake

The dataset written with DELTA_BINARY_PACKED encoding is 98.5% smaller!
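
To double-check which encodings actually end up in the files, the parquet row-group metadata can be inspected with pyarrow (paths from the script above):

import pyarrow.parquet as pq
from pathlib import Path

# Print the encodings recorded for each column in the first row group of every file.
for path in Path(output_path_delta_binary_packed_encoded).rglob("*.parquet"):
    rg = pq.ParquetFile(path).metadata.row_group(0)
    for i in range(rg.num_columns):
        col = rg.column(i)
        print(path.name, col.path_in_schema, col.encodings)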

Related Issues & PRs

  • https://github.com/delta-io/delta-rs/issues/1772
  • https://github.com/delta-io/delta-rs/issues/3212
  • https://github.com/delta-io/delta-rs/pull/3214

ghost avatar Mar 11 '25 21:03 ghost

It's a matter of exposing it on the column properties and writer properties, and then setting it in the binding layer.

Wanna put a PR up for this?

ion-elgreco avatar Mar 11 '25 22:03 ion-elgreco

I must admit I have never used Rust before but I will poke around and try :)

ghost avatar Mar 11 '25 22:03 ghost

@ion-elgreco here is my attempt: https://github.com/delta-io/delta-rs/pull/3320

Feel free to run with it

ghost avatar Mar 12 '25 13:03 ghost

Hey, looks like the account that made it got deleted :( I was really interested in this as well. @ion-elgreco, should I create a new feature request for this and try to get a PR open?

niltecedu avatar Sep 05 '25 11:09 niltecedu

Here is a more realistic example, closer to what we would actually use; it still shows a large reduction.

import pandas as pd
import numpy as np

from deltalake import write_deltalake, WriterProperties, ColumnProperties

import pyarrow.parquet as pq
import pyarrow as pa

# Make some fake time series data
TOTAL_ROWS = 10_000_000
timestamps = pd.date_range(start=pd.Timestamp.now(), periods=TOTAL_ROWS, freq="5ms")
timeline = np.linspace(0, len(timestamps), len(timestamps))
print("Generating data...")
pat = pa.Table.from_pandas(
    pd.DataFrame(
        {
            # timestamp (auto-generated)
            "timestamp": timestamps,
            # Timeseries data as float32
            "timeseries_data": (10 * np.sin(2 * np.pi * 50 * timeline)).astype(
                np.float32
            ),
            "timeseries_int_data": (1000 * np.sin(2 * np.pi * 50 * timeline)).astype(
                np.int32
            ),
            # 1 minute partitions
            "partition_label": timestamps.strftime("%H%M"),
        }
    )
)
print("Data generated.")
output_path_normal = "example_deltalake"
write_deltalake(
    output_path_normal,
    data=pat,
    partition_by=["partition_label"],
    # Compression enabled for an equivalent comparison; enabling the dictionary leads to a larger file size
    writer_properties=WriterProperties(
        compression="ZSTD",
        compression_level=1,
        default_column_properties=ColumnProperties(dictionary_enabled=False),
    ),
    mode="overwrite",
    # Can't specify per-column encoding
)
print("Wrote normal delta table.")


output_path_default_encoded = "example_pyarrow_delta_default_encoding"
pq.write_to_dataset(
    pat,
    output_path_default_encoded,
    partition_cols=["partition_label"],
    use_dictionary=False,
    use_byte_stream_split=True,
    compression="ZSTD",
    compression_level=1,
)


output_path_delta_specifc_encoded = "example_pyarrow_delta_specifc_col_encoding"
pq.write_to_dataset(
    pat,
    output_path_delta_specifc_encoded,
    partition_cols=["partition_label"],
    # Ability to specify column encodings here
    use_dictionary=False,
    use_byte_stream_split=False,
    column_encoding={
        "timestamp": "DELTA_BINARY_PACKED",
        "timeseries_data": "BYTE_STREAM_SPLIT",
        "timeseries_int_data": "DELTA_BINARY_PACKED",
        "partition_label": "DELTA_BINARY_PACKED",
    },
    compression="ZSTD",
    compression_level=1,
)
print("Wrote delta table with pyarrow column encodings.")

This comes to the following:

The File size of delta table is 85.5853796005249 MB
The File size of delta table with pyarrow default column encodings is 22.02027130126953 MB
The File size of delta table with pyarrow specific column encodings is 17.961445808410645 MB

That is roughly a 79% reduction versus the plain delta-rs output, and about 18% versus pyarrow's BYTE_STREAM_SPLIT default.

Package versions used: deltalake = "1.1.4", pyarrow = "17.0.0", pandas = "2.2.2", numpy = "1.26.3"

Environment: Windows 11, Python 3.11.13

niltecedu avatar Sep 05 '25 12:09 niltecedu

@niltecedu feel free to make a PR :)

ion-elgreco avatar Sep 05 '25 12:09 ion-elgreco

Thanks @ion-elgreco Let me get something open!

niltecedu avatar Sep 05 '25 12:09 niltecedu

take

niltecedu avatar Sep 05 '25 12:09 niltecedu

Initial testing with the updated code is

The File size of delta table is 13.744157791137695 MB
The File size of delta table with parquet encoding is 3.4160261154174805 MB
The File size of delta table with pyarrow default column encodings is 3.4160261154174805 MB
The File size of delta table with pyarrow specific column encodings is 2.8036413192749023 MB

Environment: WSL 2, Ubuntu 24.04, Python 3.12.3

PR on the way. I will look into why the pyarrow encoding is more efficient with the same parameters; maybe I am being blind here. Posting the entire testing code down below, as I wasn't sure where in the directory structure I should put it.

import os
import pandas as pd
import numpy as np

from deltalake import write_deltalake, WriterProperties, ColumnProperties, DeltaTable
from pathlib import Path

import pyarrow.parquet as pq
import pyarrow as pa

# Make some fake time series data
TOTAL_ROWS = 1_000_000
timestamps = pd.date_range(start=pd.Timestamp.now(), periods=TOTAL_ROWS, freq="5ms")
timeline = np.linspace(0, len(timestamps), len(timestamps))
print("Generating data...")
pat = pa.Table.from_pandas(
    pd.DataFrame(
        {
            # timestamp (auto-generated)
            "timestamp": timestamps,
            # Timeseries data as float32
            "timeseries_data": (10 * np.sin(2 * np.pi * 50 * timeline)).astype(
                np.float32
            ),
            "timeseries_int_data": (1000 * np.sin(2 * np.pi * 50 * timeline)).astype(
                np.int32
            ),
            # 1 minute partitions
            "partition_label": timestamps.strftime("%H%M"),
        }
    )
)
print("Data generated.")
output_path_normal = "example_deltalake"
write_deltalake(
    output_path_normal,
    data=pat,
    partition_by=["partition_label"],
    # Enabled compression for equivalent comparison, dictionary enabled leads to a larger file size
    writer_properties=WriterProperties(
        compression="ZSTD",
        compression_level=22,
        default_column_properties=ColumnProperties(dictionary_enabled=True),
    ),
    mode="overwrite",
    # Can't specify per-column encoding
)
print("Wrote encoded delta table.")



output_path_encoded = "encoded_example_deltalake"
write_deltalake(
    output_path_encoded,
    data=pat,
    partition_by=["partition_label"],
    # Enabled compression for equivalent comparison, dictionary enabled leads to a larger file size
    writer_properties=WriterProperties(
        compression="ZSTD",
        compression_level=22,
        column_properties={
            "timestamp": ColumnProperties(dictionary_enabled=False,encoding="DELTA_BINARY_PACKED"),
            "timeseries_data": ColumnProperties(dictionary_enabled=False,encoding="BYTE_STREAM_SPLIT"),
            "timeseries_int_data": ColumnProperties(dictionary_enabled=False,encoding="DELTA_BINARY_PACKED"),
            "partition_label": ColumnProperties(dictionary_enabled=False,encoding="DELTA_BINARY_PACKED"),
        },
    ),
    mode="overwrite",
    # Per-column encoding specified via column_properties above
)
print("Wrote normal delta table.")

output_path_default_encoded = "example_pyarrow_delta_default_encoding"
pq.write_to_dataset(
    pat,
    output_path_default_encoded,
    partition_cols=["partition_label"],
    use_dictionary=False,
    use_byte_stream_split=True,
    compression="ZSTD",
    compression_level=22,
)


output_path_delta_specifc_encoded = "example_pyarrow_delta_specifc_col_encoding"
pq.write_to_dataset(
    pat,
    output_path_delta_specifc_encoded,
    partition_cols=["partition_label"],
    # Ability to specify column encodings here
    use_dictionary=False,
    use_byte_stream_split=False,
    column_encoding={
        "timestamp": "DELTA_BINARY_PACKED",
        "timeseries_data": "BYTE_STREAM_SPLIT",
        "timeseries_int_data": "DELTA_BINARY_PACKED",
        "partition_label": "DELTA_BINARY_PACKED",
    },
    compression="ZSTD",
    compression_level=22,
)
print("Wrote delta table with pyarrow column encodings.")


def get_folder_size(folder):
    return ByteSize(
        sum(file.stat().st_size for file in Path(folder).rglob("*"))
    ).megabytes


class ByteSize(int):
    _KB = 1024
    _suffixes = "B", "KB", "MB", "GB", "PB"

    def __new__(cls, *args, **kwargs):
        return super().__new__(cls, *args, **kwargs)

    def __init__(self, *args, **kwargs):
        self.bytes = self.B = int(self)
        self.kilobytes = self.KB = self / self._KB**1
        self.megabytes = self.MB = self / self._KB**2
        self.gigabytes = self.GB = self / self._KB**3
        self.terabytes = self.TB = self / self._KB**4
        *suffixes, last = self._suffixes
        suffix = next(
            (suffix for suffix in suffixes if 1 < getattr(self, suffix) < self._KB),
            last,
        )
        self.readable = suffix, getattr(self, suffix)

        super().__init__()

    def __str__(self):
        return self.__format__(".2f")

print(DeltaTable(output_path_encoded).to_pandas())

print(f"The File size of delta table is {get_folder_size(output_path_normal)} MB")
print(f"The File size of delta table with parquet encoding is {get_folder_size(output_path_default_encoded)} MB")
print(
    f"The File size of delta table with pyarrow default column encodings is {get_folder_size(output_path_default_encoded)} MB"
)
print(
    f"The File size of delta table with pyarrow specific column encodings is {get_folder_size(output_path_delta_specifc_encoded)} MB"
)


print("Deleting the folders now...")
import shutil

shutil.rmtree(output_path_normal)
shutil.rmtree(output_path_encoded)
shutil.rmtree(output_path_default_encoded)
shutil.rmtree(output_path_delta_specifc_encoded)
print("Deleted the folders.")

niltecedu avatar Sep 05 '25 15:09 niltecedu

@ion-elgreco Opened the PR #3737, let me know what's needed (I can repoint it to a version release branch). This is my first public Rust project contribution, so things might be lacking, but I'm happy to get them fixed. Also, how long do version releases that contain new features usually take?

niltecedu avatar Sep 05 '25 16:09 niltecedu