
Feature Request: Integration of Delta Lake Kernel with Modin for Advanced DataFrame Operations

Open tlinkin opened this issue 2 years ago • 2 comments

While Modin lets us scale and easily develop data code, it currently lacks a mechanism for processing large, yet frequently updated, DataFrames within production pipelines. Delta Lake would also bring numerous other benefits that could make Modin a first-rate alternative to something like PySpark, e.g. ACID transactions, time travel, and concurrent reads/writes. More info: https://delta.io/

With the release of Delta 3.0.0rc1, the Delta Kernel implementation has been announced: a low-level library that would greatly help in building a Modin DataFrame API for the Delta Lake format, e.g. pd.to_deltalake() and pd.read_deltalake().
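To make the proposal concrete, here is a hypothetical sketch of how such an API could look from the user's side. None of this exists in Modin today; the function names, keyword arguments, and path are purely illustrative:

import modin.pandas as pd

# Hypothetical API sketch -- neither function exists in Modin yet.
df = pd.read_deltalake("/delta/table/path")                  # read the latest table version
df_old = pd.read_deltalake("/delta/table/path", version=5)   # time travel (illustrative kwarg)
df["amount"] = df["amount"] * 1.1                            # regular Modin operations
pd.to_deltalake(df, "/delta/table/path", mode="append")      # ACID append (illustrative kwarg)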

Delta Kernel Announcement taken from: https://github.com/delta-io/delta/releases/tag/v3.0.0rc1

Delta Kernel

The Delta Kernel project is a set of Java libraries (Rust will be coming soon) for building Delta connectors that can read (and soon, write to) Delta tables without the need to understand the Delta protocol details.

You can use this library to do the following:

  • Read data from small Delta tables in a single thread in a single process.
  • Read data from large Delta tables using multiple threads in a single process.
  • Build a complex connector for a distributed processing engine and read very large Delta tables.
  • [soon!] Write to Delta tables from multiple threads / processes / distributed engines.

Here is an example of a simple table scan with a filter:

TableClient myTableClient = DefaultTableClient.create();         // define a client (more details below)
Table myTable = Table.forPath("/delta/table/path");              // define what table to scan
Snapshot mySnapshot = myTable.getLatestSnapshot(myTableClient);  // define which version of table to scan
Scan myScan = mySnapshot.getScanBuilder(myTableClient)           // specify the scan details
        .withFilters(scanFilter)
        .build();
Scan.readData(...)                                               // returns the table data 

For more information, refer to the Delta Kernel GitHub docs.

tlinkin avatar Aug 25 '23 02:08 tlinkin

Thanks @tlinkin! Looks interesting!

cc @mvashishtha @Garra1980

anmyachev avatar Aug 25 '23 11:08 anmyachev

While researching alternatives, a possible solution when running on a Ray cluster is to leverage https://github.com/oap-project/raydp.

For example: load the DataFrame with Spark (which has full Delta Lake support) and pass it to Modin. Do some data magic, then pass the DataFrame back to Spark to write it to the Delta Lake.

If this works (I have not tested it yet, but will soon) and there are no drawbacks in terms of speed, apart from spinning up Spark instances on Ray, it could serve as a valid workaround. What do you think? A rough sketch of the round trip is below.
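Something along these lines (untested; the RayDP/Ray Data calls, the Delta package version, and the Spark configs are my assumptions and would need to match your cluster setup):

import ray
import raydp

ray.init(address="auto")

# Spin up Spark on the Ray cluster via RayDP, with Delta Lake enabled.
# The delta-core package/version below is an assumption -- pick the one
# matching your Spark version.
spark = raydp.init_spark(
    app_name="delta-modin-roundtrip",
    num_executors=2,
    executor_cores=2,
    executor_memory="4GB",
    configs={
        "spark.jars.packages": "io.delta:delta-core_2.12:2.4.0",
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    },
)

# Read the Delta table with Spark and hand it to Modin via Ray Data.
spark_df = spark.read.format("delta").load("/delta/table/path")
modin_df = ray.data.from_spark(spark_df).to_modin()

# ... do some data magic in Modin ...

# Hand the result back to Spark and write it to the Delta table.
result_df = ray.data.from_modin(modin_df).to_spark(spark)
result_df.write.format("delta").mode("overwrite").save("/delta/table/path")

raydp.stop_spark()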

tlinkin avatar Aug 25 '23 11:08 tlinkin