docs: Add an example argmax reducer using cccl
Hello @shwina!
I've been trying to profile this function that I made. Unfortunately, it runs slower than the one awkward currently has (which uses raw CUDA kernels). For example, here is the cccl_argmax function that I wrote, running on a randomly generated awkward array:
It takes ~65ms per run.
Here is the same array processed by the awkward.argmax() function:
If I profile cccl_argmax line by line, I see that most of the time is spent in the ak.local_index() function that I use to get the local indices of all the values in the array.
What is strange is that if I profile ak.local_index() by itself, it runs much faster (1.84ms, compared to 10.7ms inside cccl_argmax).
I've been trying to profile these functions using NVIDIA Nsight, but I don't see anything there. Maybe you can spot something that could be slowing the whole thing down. Any suggestions (including other tools I could use for profiling) would be greatly appreciated, as I'm a bit stuck here now.
The documentation preview is ready to be viewed at http://preview.awkward-array.org.s3-website.us-east-1.amazonaws.com/PR3763
@maxymnaumchyk - could you please post the profiling of ak.local_index by itself? I think you have shown it to me before. Thanks.
Thanks for the ping! Taking a look now and I'll get back to you soon.
Thanks @maxymnaumchyk - you've run into a known issue with CUB's DeviceSegmentedReduce, which is what segmented_reduce calls down to: https://github.com/NVIDIA/cccl/issues/6171.
We're going to prioritize this right away and get back to you here. In the meantime, let me see if we can suggest workarounds.
Thank you for the feedback @shwina! How did you pin down that the issue is related to the DeviceSegmentedReduce function?
Thanks, @maxymnaumchyk - here are some steps you can use to hopefully reproduce the results I'm seeing.
First, I modified your script a bit for benchmarking:
https://gist.github.com/shwina/9ef807626377c3839ca8266a7de84720#file-awkward_argmax_reduction-_script-py
Notably:
- I added some `cp.cuda.Device().synchronize()` calls to ensure that the asynchronous execution of CUDA kernels does not affect the benchmarking
- I added an NVTX annotation around the section of code we are interested in studying (see the sketch just below)
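In outline, the measured region looks something like this (a sketch, with cccl_argmax assumed to be in scope from your script):

```python
import awkward as ak
import cupy as cp
import nvtx

awkward_array = ak.to_backend(
    ak.from_parquet("random_listoffset_small.parquet"), "cuda"
)

# the NVTX range makes this region easy to find in the Nsight Systems timeline
with nvtx.annotate("cccl_argmax"):
    result = cccl_argmax(awkward_array)
    # block the host until all queued kernels finish, so the range
    # covers the actual GPU work rather than just the kernel launches
    cp.cuda.Device().synchronize()
```

The profile itself can then be collected with something like `nsys profile -o argmax python your_script.py` (the output name here is just a placeholder) and opened in the Nsight Systems GUI.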
Then, I ran this script under the Nsight Systems profiler and inspected the results in the Nsight Systems GUI. You can also use nsightful if you prefer to view the results in your browser. Here's a notebook that you can run to produce and view the profile with nsightful:
https://gist.github.com/shwina/9ef807626377c3839ca8266a7de84720#file-generate_profile-ipynb
Here's what the resulting profile looked like for me; you can see that most of the time is being spent in the DeviceSegmentedReduce kernel - and comparatively little time is being spent in the local_index kernel:
Now, regarding some of your previous observations --
> If I profile cccl_argmax line by line, I see that most of the time is spent in the ak.local_index() function that I use to get the local indices of all the values in the array.
My suspicion is that this is because there is no synchronize() after each kernel invocation within the cccl_argmax function. If you add that, do you see more consistent results?
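To make that concrete, here is a sketch of how I'd time a single step (assuming awkward_array from your script). Without the synchronize before reading the timer, the cost of still-queued kernels gets attributed to whichever later line happens to block:

```python
import time

import awkward as ak
import cupy as cp

cp.cuda.Device().synchronize()  # drain any previously queued work first
start = time.perf_counter()
local_indices = ak.local_index(awkward_array, axis=1)
cp.cuda.Device().synchronize()  # wait for the local_index kernels to finish
print(f"ak.local_index: {time.perf_counter() - start:.6f} s")
```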
> I've been trying to profile these functions using NVIDIA Nsight, but I don't see anything there.
You may have to expand the "CUDA HW" section of the profile to see the call to cub::DeviceSegmentedReduceKernel. Here's what it looks like for me:
Please let me know if you are still having trouble or not seeing results similar to mine! In the meantime, we're working to improve the performance of cub::DeviceSegmentedReduceKernel for small segment sizes.
Thanks a lot for the explanation @shwina! Yes, now after adding cp.cuda.Device().synchronize() calls, I get similar results to yours.
An unrelated question: the cudaDeviceSynchronize process that I see on host isn't actually doing anything? It just makes the host wait until the kernel is executed?
> An unrelated question: the cudaDeviceSynchronize process that I see on host isn't actually doing anything? It just makes the host wait until the kernel is executed?
Yes -- precisely!
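To illustrate with a toy CuPy example (nothing awkward-specific):

```python
import cupy as cp

x = cp.random.random(10_000_000)

y = cp.sqrt(x)  # returns to the host almost immediately; the kernel is only queued
cp.cuda.Device().synchronize()  # pure waiting: the host blocks until the GPU is done
```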
Hi @maxymnaumchyk - with the latest cuda-cccl (v0.4.4.4), you can define operations that have side effects. This allows using unary_transform to compute argmax, in the following way:
```python
import cupy as cp
import numpy as np
from cuda.compute import CountingIterator, unary_transform


def cccl_argmax_new(awkward_array):
    input_data = awkward_array.layout.content.data
    # Prepare the start and end offsets
    offsets = awkward_array.layout.offsets.data
    start_o = offsets[:-1]
    end_o = offsets[1:]
    # Prepare the output array
    n_segments = start_o.size
    output = cp.empty([n_segments], dtype=np.int64)

    # The op indexes into the arrays captured from the enclosing scope
    # and writes -1 for empty segments
    def segment_reduce_op(segment_id: np.int64) -> np.int64:
        start_idx = start_o[segment_id]
        end_idx = end_o[segment_id]
        segment = input_data[start_idx:end_idx]
        if len(segment) == 0:
            return -1
        return np.argmax(segment)

    segment_ids = CountingIterator(np.int64(0))
    unary_transform(segment_ids, output, segment_reduce_op, n_segments)
    return output
```
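For instance, on a small hand-built array (a quick sketch, assuming cccl_argmax_new as defined above is in scope and the cuda-cccl version mentioned earlier):

```python
import awkward as ak

arr = ak.to_backend(ak.Array([[1.0, 5.0, 2.0], [], [3.0, 3.5]]), "cuda")
print(cccl_argmax_new(arr))  # -> [ 1 -1  1]; empty segments map to -1
```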
The reason this is much faster than segmented_reduce is that the latter currently uses one block per segment. For small segment sizes like you have here, that's not very performant. In contrast, the unary_transform-based approach uses one thread per segment, keeping the GPU much busier.
On my machine:
```
Time taken for ak.argmax: 0.0037847493775188925 seconds
Time taken for cccl_argmax: 0.028108258079737426 seconds
Time taken for cccl_argmax_new: 0.0004994929768145084 seconds
```
Full Script
```python
import awkward as ak
import cupy as cp
import numpy as np
import nvtx
import timeit
from cuda.compute import unary_transform, CountingIterator, gpu_struct, ZipIterator, segmented_reduce


def cccl_argmax(awkward_array):
    @gpu_struct
    class ak_array:
        data: cp.float64
        local_index: cp.int64

    # compare the values of the arrays
    def max_op(a: ak_array, b: ak_array):
        return a if a.data > b.data else b

    input_data = awkward_array.layout.content.data
    # use an internal awkward function to get the local indices
    local_indices = ak.local_index(awkward_array, axis=1)
    local_indices = local_indices.layout.content.data
    # Combine data and their indices into a single structure
    # input_struct = cp.stack((input_data, parents), axis=1).view(ak_array.dtype)
    input_struct = ZipIterator(input_data, local_indices)
    # Prepare the start and end offsets
    offsets = awkward_array.layout.offsets.data
    start_o = offsets[:-1]
    end_o = offsets[1:]
    # Prepare the output array
    n_segments = start_o.size
    output = cp.zeros([n_segments], dtype=ak_array.dtype)
    # Initial value for the reduction
    h_init = ak_array(-1, -1)
    # Perform the segmented reduce
    segmented_reduce(
        input_struct, output, start_o, end_o, max_op, h_init, n_segments
    )
    return output


def cccl_argmax_new(awkward_array):
    input_data = awkward_array.layout.content.data
    # Prepare the start and end offsets
    offsets = awkward_array.layout.offsets.data
    start_o = offsets[:-1]
    end_o = offsets[1:]
    # Prepare the output array
    n_segments = start_o.size
    output = cp.empty([n_segments], dtype=np.int64)

    def segment_reduce_op(segment_id: np.int64) -> np.int64:
        start_idx = start_o[segment_id]
        end_idx = end_o[segment_id]
        segment = input_data[start_idx:end_idx]
        if len(segment) == 0:
            return -1
        return np.argmax(segment)

    segment_ids = CountingIterator(np.int64(0))
    unary_transform(segment_ids, output, segment_reduce_op, n_segments)
    return output


print("Loading the array...")
awkward_array = ak.to_backend(
    ak.from_parquet("random_listoffset_small.parquet"), "cuda"
)

# first, time ak.argmax:
_ = ak.argmax(awkward_array, axis=1)  # warmup
start_time = timeit.default_timer()
for i in range(10):
    expect = ak.argmax(awkward_array, axis=1)
    cp.cuda.Device().synchronize()
end_time = timeit.default_timer()
print(f"Time taken for ak.argmax: {(end_time - start_time) / 10} seconds")

# next, time cccl_argmax:
_ = cccl_argmax(awkward_array)  # warmup
start_time = timeit.default_timer()
for i in range(10):
    got = cccl_argmax(awkward_array)
    cp.cuda.Device().synchronize()
end_time = timeit.default_timer()
print(f"Time taken for cccl_argmax: {(end_time - start_time) / 10} seconds")

# check results
assert np.all(
    ak.to_numpy(ak.to_backend(ak.fill_none(expect, -1), "cpu"))
    == got.get()['local_index']
)

# next, time cccl_argmax_new:
_ = cccl_argmax_new(awkward_array)  # warmup
start_time = timeit.default_timer()
for i in range(10):
    got = cccl_argmax_new(awkward_array)
    cp.cuda.Device().synchronize()
end_time = timeit.default_timer()
print(f"Time taken for cccl_argmax_new: {(end_time - start_time) / 10} seconds")

# check results
assert np.all(
    ak.to_numpy(ak.to_backend(ak.fill_none(expect, -1), "cpu"))
    == got.get()
)
```