Optimize performance: Daft COUNT(DISTINCT) on Lance
Is your feature request related to a problem?
SQL:

```sql
select count(distinct(__data_item_id)) from dataset
```

- Number of rows: 10 million
- Total data volume: 1.62 GB

Timings (repro sketch of the query shape below):

- Daft: 8s
- DuckDB: 3s
- Rust DataFusion: 600ms
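For reference, a minimal sketch of the query shape as I run it from Daft. This is an illustration, not the exact code: it assumes `daft.read_lance` is available in your Daft version, and `"dataset.lance"` is a placeholder path, not a dataset attached to this issue.

```python
# Sketch only: reproduces the reported query shape against a Lance table.
# Assumes daft.read_lance is available; "dataset.lance" is a placeholder path.
import daft

dataset = daft.read_lance("dataset.lance")  # daft.sql resolves `dataset` from scope
result = daft.sql("SELECT COUNT(DISTINCT __data_item_id) FROM dataset")
print(result.to_pandas())
```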
Describe the solution you'd like
As above: bring Daft's COUNT(DISTINCT) performance on this query closer to DuckDB and DataFusion.
Describe alternatives you've considered
No response
Additional Context
No response
Would you like to implement a fix?
No
Do you happen to have more information on why this takes so long? For example, a Lance dataset we can use to test with?
@srilman It seems this is not specific to the Lance data source. I originally thought the projection pushdown wasn't pruning the column, but it turns out that's not the case.
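A quick way to check that column pruning is actually happening (a sketch; it uses the parquet file produced by the reproduce script below) is to print the optimized plan and confirm the scan only reads `__data_item_id`:

```python
# Sketch: inspect the optimized/physical plan. If projection pushdown works,
# the scan node should list only __data_item_id in its projection.
import daft

dataset = daft.read_parquet("distinct_count_test.parquet")
daft.sql("SELECT COUNT(DISTINCT __data_item_id) FROM dataset").explain(show_all=True)
```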
Reproduce script:

```python
def test_sql_read_pushdowns_distinct_count(tmp_path):
    import time

    import daft
    import duckdb
    import numpy as np
    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    # Create a test dataset (10,000 rows).
    test_parquet_path = tmp_path / "distinct_count_test.parquet"
    np.random.seed(42)

    # Generate data with 50% duplicates for __data_item_id.
    distinct_ids = [7337737632543232000 + i for i in range(5000)]
    duplicate_ids = distinct_ids * 2  # 10,000 rows with 5,000 distinct values
    data = {
        "__data_item_id": duplicate_ids,
        "project": np.random.choice(["A", "B", "C"], 10000),
        "value": np.random.randint(1, 100, 10000),
    }
    df = pd.DataFrame(data)
    pq.write_table(pa.Table.from_pandas(df), str(test_parquet_path))

    # Benchmark Daft.
    daft_times = {}

    start = time.perf_counter()
    dataset = daft.read_parquet(str(test_parquet_path))
    distinct_df = daft.sql("SELECT COUNT(DISTINCT __data_item_id) FROM dataset")
    result = distinct_df.to_pandas().iloc[0, 0]
    daft_times["distinct"] = time.perf_counter() - start
    assert result == 5000

    start = time.perf_counter()
    dataset = daft.read_parquet(str(test_parquet_path))
    count_df = daft.sql("SELECT COUNT(__data_item_id) FROM dataset")
    result = count_df.to_pandas().iloc[0, 0]
    daft_times["count"] = time.perf_counter() - start
    assert result == 10000

    # Benchmark DuckDB.
    duckdb_times = {}
    conn = duckdb.connect()

    start = time.perf_counter()
    distinct_result = conn.execute(f"""
        SELECT COUNT(DISTINCT __data_item_id)
        FROM '{str(test_parquet_path)}'
    """).fetchone()[0]
    duckdb_times["distinct"] = time.perf_counter() - start
    assert distinct_result == 5000

    start = time.perf_counter()
    count_result = conn.execute(f"""
        SELECT COUNT(__data_item_id)
        FROM '{str(test_parquet_path)}'
    """).fetchone()[0]
    duckdb_times["count"] = time.perf_counter() - start
    assert count_result == 10000

    # Print results.
    print(f"\nDaft COUNT(DISTINCT): {daft_times['distinct']:.4f}s")
    print(f"DuckDB COUNT(DISTINCT): {duckdb_times['distinct']:.4f}s")
    print(f"\nDaft COUNT(): {daft_times['count']:.4f}s")
    print(f"DuckDB COUNT(): {duckdb_times['count']:.4f}s")

    # Print execution plans.
    print("\nDaft COUNT(DISTINCT) plan:")
    distinct_df.explain()
    print("\nDuckDB COUNT(DISTINCT) plan:")
    print(conn.execute(f"EXPLAIN SELECT COUNT(DISTINCT __data_item_id) FROM '{str(test_parquet_path)}'").df())
```
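In case it helps while this is open, the same query can also be expressed through the DataFrame API, which may plan differently from the SQL path. This is an untested sketch, not a verified speedup:

```python
# Untested sketch: express COUNT(DISTINCT) as project -> distinct -> count.
import daft

df = daft.read_parquet("distinct_count_test.parquet")  # file from the script above
n_distinct = df.select("__data_item_id").distinct().count_rows()
print(n_distinct)  # expected: 5000 for the generated data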
@colin-ho When it's convenient for you, please also take a look at this issue. It seems to be mainly related to streaming reads; currently, the COUNT(DISTINCT) path doesn't appear to stream at all.
@Jay-ju There are certainly things we can do to make aggregations faster in general, and count-distinct in particular, but I'm not sure that's something we can get to soon, because it requires a larger rewrite.
Where do you see that it's related to reads instead?
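For readers following along, the kind of rewrite being discussed is roughly a two-phase aggregation: deduplicate within each partition first, then merge the much smaller partial results, instead of funneling every raw row through a single global dedup. A toy illustration in plain Python (not Daft internals):

```python
# Toy sketch of two-phase COUNT(DISTINCT): local dedup per partition,
# then a single merge of the (much smaller) partial sets.
from concurrent.futures import ThreadPoolExecutor

def count_distinct(partitions):
    # Phase 1: deduplicate each partition independently (parallelizable).
    with ThreadPoolExecutor() as pool:
        partial_sets = list(pool.map(set, partitions))
    # Phase 2: union the partial sets and count once.
    return len(set().union(*partial_sets))

partitions = [[1, 2, 2, 3], [3, 4, 4, 5], [5, 1]]
assert count_distinct(partitions) == 5
```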