Daft icon indicating copy to clipboard operation
Daft copied to clipboard

optimize performace : Daft count(distinct) on lance

Open Jay-ju opened this issue 6 months ago • 4 comments

Is your feature request related to a problem?

SQL:

select count(distinct(__data_item_id)) from dataset

-- Number of lines: 10 million -- Total data volume: 1.62 GB

  • Daft: 8s
  • Duckdb: 3s
  • Rust Datafusion: 600ms

Describe the solution you'd like

as above

Describe alternatives you've considered

No response

Additional Context

No response

Would you like to implement a fix?

No

Jay-ju avatar Jun 30 '25 13:06 Jay-ju

Do you happen to have more information as to why this takes a while? Such as the Lance dataset we can use to test with?

srilman avatar Jun 30 '25 17:06 srilman

@srilman It seems that this is not just the datasource of Lance. I originally thought that the pushdown didn't push the column down, but it turns out that's not the case.

reproduce script

def test_sql_read_pushdowns_distinct_count(tmp_path):
    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq
    import time
    import duckdb
    import numpy as np
    import daft
    
    # Create larger test dataset (10,000 rows)
    test_parquet_path = tmp_path / "distinct_count_test.parquet"
    np.random.seed(42)
    
    # Generate data with 50% duplicates for distinct_id
    distinct_ids = [7337737632543232000 + i for i in range(5000)]
    duplicate_ids = distinct_ids * 2  # 10,000 items with 5,000 distinct values
    
    data = {
        "__data_item_id": duplicate_ids,
        "project": np.random.choice(['A', 'B', 'C'], 10000),
        "value": np.random.randint(1, 100, 10000)
    }
    
    df = pd.DataFrame(data)
    pq.write_table(pa.Table.from_pandas(df), str(test_parquet_path))

    # Benchmark Daft
    daft_times = {}
    
    start = time.perf_counter()
    dataset = daft.read_parquet(str(test_parquet_path))
    distinct_df = daft.sql("SELECT COUNT(DISTINCT __data_item_id) FROM dataset")
    result = distinct_df.to_pandas().iloc[0, 0]
    daft_times['distinct'] = time.perf_counter() - start
    assert result == 5000
    
    start = time.perf_counter()
    dataset = daft.read_parquet(str(test_parquet_path))
    count_df = daft.sql("SELECT COUNT(__data_item_id) FROM dataset")
    result = count_df.to_pandas().iloc[0, 0]
    daft_times['count'] = time.perf_counter() - start
    assert result == 10000

    # Benchmark DuckDB
    duckdb_times = {}
    
    conn = duckdb.connect()
    
    start = time.perf_counter()
    distinct_result = conn.execute(f"""
        SELECT COUNT(DISTINCT __data_item_id) 
        FROM '{str(test_parquet_path)}'
    """).fetchone()[0]
    duckdb_times['distinct'] = time.perf_counter() - start
    assert distinct_result == 5000
    
    start = time.perf_counter()
    count_result = conn.execute(f"""
        SELECT COUNT(__data_item_id) 
        FROM '{str(test_parquet_path)}'
    """).fetchone()[0]
    duckdb_times['count'] = time.perf_counter() - start
    assert count_result == 10000

    # Print results
    print(f"\nDaft COUNT(DISTINCT): {daft_times['distinct']:.4f}s")
    print(f"DuckDB COUNT(DISTINCT): {duckdb_times['distinct']:.4f}s")
    print(f"\nDaft COUNT(): {daft_times['count']:.4f}s")
    print(f"DuckDB COUNT(): {duckdb_times['count']:.4f}s")
    
    # Print execution plans
    print("\nDaft COUNT(DISTINCT) plan:")
    distinct_df.explain()
    
    print("\nDuckDB COUNT(DISTINCT) plan:")
    print(conn.execute(f"EXPLAIN SELECT COUNT(DISTINCT __data_item_id) FROM '{str(test_parquet_path)}'").df())


Jay-ju avatar Jul 01 '25 07:07 Jay-ju

@colin-ho colin, When it's convenient for you, please also pay attention to this issue. It seems that it is mainly related to streaming reads. Currently, it doesn't seem to have streaming either.

Jay-ju avatar Jul 02 '25 06:07 Jay-ju

@Jay-ju There are certainly some things we can do to make aggregations in general faster, and in particular count-distinct, but I'm not sure that's something we can get to soon because it requires a larger rewrite.

Where do you see that its related to reads instead?

srilman avatar Jul 07 '25 19:07 srilman