[Enh]: TODO: ArrowSeriesCatNamespace has an operation that is potentially expensive
Please describe the purpose of the new feature or describe the problem to solve.
See this snippet:

```python
class ArrowSeriesCatNamespace:
    def __init__(self, series: ArrowSeries) -> None:
        self._arrow_series = series

    def get_categories(self) -> ArrowSeries:
        pa = get_pyarrow()
        ca = self._arrow_series._native_series
        # TODO(Unassigned): this looks potentially expensive - is there no better way?
        out = pa.chunked_array(
            [pa.concat_arrays([x.dictionary for x in ca.chunks]).unique()]
        )
        return self._arrow_series._from_native_series(out)
```
Would `ca.combine_chunks().dictionary.unique()` be any better?
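For context on why the current approach is already fairly cheap: each chunk of a dictionary-encoded `ChunkedArray` carries its own (typically small) dictionary, so only those arrays ever get concatenated, never the full data. A minimal standalone illustration (the variable names here are mine, not from the codebase):

```python
import pyarrow as pa

# A dictionary-encoded chunked array with two chunks.
ca = pa.chunked_array(
    [["a", "b", "a"], ["b", "c"]],
    type=pa.dictionary(pa.int64(), pa.utf8()),
)

# Each chunk has its own dictionary; values may repeat across chunks.
print([chunk.dictionary.to_pylist() for chunk in ca.chunks])
# [['a', 'b'], ['b', 'c']]

# Concatenating the per-chunk dictionaries and deduplicating yields the categories.
print(pa.concat_arrays([chunk.dictionary for chunk in ca.chunks]).unique().to_pylist())
# ['a', 'b', 'c']
```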
Dunno... on SO, someone suggested this: https://stackoverflow.com/questions/78697270/get-categories-of-arrow-chunkedarray/78698208#78698208

```python
pa.chunked_array(x.dictionary for x in ca.chunks).unique()
```
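One wrinkle with that suggestion (my observation, not from the thread): `ChunkedArray.unique()` returns a plain `Array`, not a `ChunkedArray`, so `get_categories` would still need to wrap the result back up; that is what the `generator` variant in the benchmark script below does. A quick check:

```python
import pyarrow as pa

ca = pa.chunked_array(
    [["a", "b"], ["b", "c"]],
    type=pa.dictionary(pa.int64(), pa.utf8()),
)

uniq = pa.chunked_array(x.dictionary for x in ca.chunks).unique()
print(isinstance(uniq, pa.ChunkedArray))  # False - it's a plain Array
out = pa.chunked_array([uniq])            # wrap back into a one-chunk ChunkedArray
```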
If it hasn't already been benchmarked and implemented, I'd like to take it.
Sure, if this interests you, a data-driven decision would be good!
I ran some benchmarks; the TL;DR is the following:

- The idea I proposed (combining chunks first) is ~~absolute garbage~~ orders of magnitude slower.
- Changing our implementation from a list comprehension to a generator seems slightly faster (~5%) and has significantly less variability.

```diff
 out = pa.chunked_array(
-    [pa.concat_arrays([x.dictionary for x in ca.chunks]).unique()]
+    [pa.concat_arrays(x.dictionary for x in ca.chunks).unique()]
 )
```
Timings with 34M rows (a chunked array with 3 chunks of 4M, 4M, and 26M rows), 26 distinct categories, 100 runs (times in ns):

```
current:       mean=9811.13,     std=2007.93,     min=8883,     max=28028
generator:     mean=9309.68,     std=702.47,      min=8593,     max=14887
combine_first: mean=81949949.25, std=15852923.98, min=63757292, max=192054893
```
Script to replicate:

```python
from time import perf_counter_ns as perf_counter
from functools import wraps
import string

import pyarrow as pa
import numpy as np

REPEAT = 100
IGNORE_FIRST = True
SIZE = 1_000_000


def timeit(repeat=REPEAT, ignore_first=IGNORE_FIRST):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            total_times = []
            for i in range(repeat):
                start_time = perf_counter()  # Record start time
                _ = func(*args, **kwargs)
                end_time = perf_counter()  # Record end time
                if ignore_first and i == 0:
                    # Ignore the first run if ignore_first is True
                    continue
                # Keep a list of all total times
                total_times.append(end_time - start_time)
            return total_times

        return wrapper

    return decorator


table = pa.table(
    {
        "a": pa.chunked_array(
            [
                ["a", "b", None, "d"] * SIZE,
                ["b", "c", None, "d"] * SIZE,
                list(string.ascii_lowercase) * SIZE,
            ],
            pa.dictionary(pa.int64(), pa.utf8()),
        )
    }
)
ca = table["a"]


@timeit()
def current(ca):
    return pa.chunked_array(
        [pa.concat_arrays([x.dictionary for x in ca.chunks]).unique()]
    )


@timeit()
def generator(ca):
    return pa.chunked_array(
        [pa.chunked_array(x.dictionary for x in ca.chunks).unique()]
    )


@timeit()
def combine_first(ca):
    return pa.chunked_array(ca.combine_chunks().dictionary.unique())


t1 = current(ca)
t2 = generator(ca)
t3 = combine_first(ca)

print(f"current: mean={np.mean(t1)}, std={np.std(t1)}, min={np.min(t1)}, max={np.max(t1)}")
print(f"generator: mean={np.mean(t2)}, std={np.std(t2)}, min={np.min(t2)}, max={np.max(t2)}")
print(f"combine_first: mean={np.mean(t3)}, std={np.std(t3)}, min={np.min(t3)}, max={np.max(t3)}")
```
```
$ python t.py
current: mean=9811.13, std=2007.93, min=8883, max=28028
generator: mean=9309.68, std=702.47, min=8593, max=14887
combine_first: mean=81949949.25, std=15852923.98, min=63757292, max=192054893
```
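For reference, this is what `get_categories` would look like with the winning change applied; it is just the diff above folded back into the original snippet, so treat it as a sketch:

```python
def get_categories(self) -> ArrowSeries:
    pa = get_pyarrow()
    ca = self._arrow_series._native_series
    # Concatenate the (small) per-chunk dictionaries via a generator, then deduplicate.
    out = pa.chunked_array(
        [pa.concat_arrays(x.dictionary for x in ca.chunks).unique()]
    )
    return self._arrow_series._from_native_series(out)
```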