TBB may not handle RAII struct very well in parallel_reduce

LXYan2333 opened this issue · 7 comments

Hello, I have an OpenMP-parallelized program, and I want to migrate it to TBB.

Basically, it creates 3 matrices on each thread and does a lot of calculation. At the end, all these matrices on the different threads are added up into 3 final matrices.

So I chose tbb::parallel_reduce to parallelize it:

struct res_struct {
    std::vector<double> a;
    std::vector<double> b;
    std::vector<double> c;
};

auto res = tbb::parallel_reduce(
        tbb::blocked_range{0, ij_end_index * (ij_end_index + 1) / 2, 1024},
        res_struct{std::vector<double>(m_size * m_size * batch_size, 0),
                   std::vector<double>(m_size * m_size * batch_size, 0),
                   std::vector<double>(m_size * m_size * batch_size, 0)},
        [&](const auto& range, res_struct res) {
            // do a lot work about res
            return res;
        },
        [&](res_struct res1, res_struct res2) {
            std::transform(res1.a.begin(),
                           res1.a.end(),
                           res2.a.begin(),
                           res1.a.begin(),
                           std::plus<double>());
            std::transform(res1.b.begin(),
                           res1.b.end(),
                           res2.b.begin(),
                           res1.b.begin(),
                           std::plus<double>());
            std::transform(res1.c.begin(),
                           res1.c.end(),
                           res2.c.begin(),
                           res1.c.begin(),
                           std::plus<double>());
            return res1;
        });

After the migration the program works as before, but the performance dropped: the run time rose from 23.125s to 26.932s.

I did some tests using VTune and found that memory access is significantly higher than before.

OpenMP (23s)

[screenshot]

TBB parallel_reduce (26s)

[screenshot]

and in the thread test, there is a lot of spin time:

Spin time of TBB parallel_reduce

[screenshot]

I guessed it might be caused by TBB copying these matrices (instead of moving them) in some cases, so I wrote a little demo:

#include <chrono>
#include <iostream>
#include <tbb/tbb.h>
#include <thread>

tbb::spin_mutex cout_mutex;

tbb::spin_mutex pig_count_mutex;
static int pig_count = 0;

struct Pig {

    int order;

    Pig() : order(pig_count) {
        tbb::spin_mutex::scoped_lock lock(pig_count_mutex);
        pig_count++;
        std::cout << "I'm the " << order << " pig, "
                  << "construct!" << std::endl;
    };

    Pig(const Pig& other) {
        {
            tbb::spin_mutex::scoped_lock lock(pig_count_mutex);
            order = pig_count;
            pig_count++;
        }
        std::cout << "I'm the " << order << " pig, "
                  << "copy construct from" << other.order << std::endl;
    };
    Pig(Pig&& other) noexcept {
        {
            tbb::spin_mutex::scoped_lock lock(pig_count_mutex);
            order = pig_count;
            pig_count++;
        }
        std::cout << "I'm the " << order << " pig, "
                  << "move construct from" << other.order << std::endl;
    };

    Pig& operator=(const Pig& other) {
        std::cout << "I'm the " << order << " pig, "
                  << "copy assignment from " << other.order << std::endl;
        return *this;
    };

    Pig& operator=(Pig&& other) noexcept {
        std::cout << "I'm the " << order << " pig, "
                  << "move assignment! from" << other.order << std::endl;
        return *this;
    };

    ~Pig() {
        std::cout << "I'm the " << order << " pig, "
                  << "destruct!" << std::endl;
    };

    void do_something() const {
        std::cout << "something is done to pig " << order << std::endl;
    }
};

int main() {
    {
        tbb::parallel_reduce(
            tbb::blocked_range<int>(0, 2),
            Pig{},
            [](const tbb::blocked_range<int>& range, Pig pig) {
                {
                    tbb::spin_mutex::scoped_lock lock(cout_mutex);
                    std::cout << "real body! range " << range.begin() << " "
                              << range.end() << std::endl;
                    pig.do_something();
                }
                std::this_thread::sleep_for(std::chrono::seconds(2));
                return pig;
            },
            [](Pig left, const Pig&) {
                {
                    tbb::spin_mutex::scoped_lock lock(cout_mutex);
                    std::cout << "reduction" << std::endl;
                    left.do_something();
                }
                std::this_thread::sleep_for(std::chrono::seconds(2));
                return left;
            });
    }
    std::cout << "reduce complete!" << std::endl;
}

run it:

$ taskset -c 0,1 ./pig
I'm the 0 pig, construct!
I'm the 1 pig, copy construct from0                  # why? since pig 1 is not used later, and is move assigned by pig 5
I'm the 2 pig, copy construct from1
real body! range 0 1
something is done to pig 2
I'm the 3 pig, copy construct from0                 # why? pig 4 can directly copy/move from pig 0
I'm the 4 pig, copy construct from3                 # since it is the last reduce element, I guess it can be directly move from pig 0?
real body! range 1 2
something is done to pig 4
I'm the 5 pig, move construct from2
I'm the 2 pig, destruct!
I'm the 1 pig, move assignment! from5
I'm the 5 pig, destruct!
I'm the 6 pig, move construct from4
I'm the 4 pig, destruct!
I'm the 3 pig, move assignment! from6
I'm the 6 pig, destruct!
I'm the 7 pig, copy construct from1                # why don't move from pig 5?
reduction
something is done to pig 7
I'm the 8 pig, move construct from7
I'm the 7 pig, destruct!
I'm the 1 pig, move assignment! from8
I'm the 8 pig, destruct!
I'm the 3 pig, destruct!
I'm the 9 pig, copy construct from1              # why? just move pig 1
I'm the 1 pig, destruct!
I'm the 9 pig, destruct!
I'm the 0 pig, destruct!
reduce complete!

In the ideal case, Pig{} would need to be copied only once, so that the 2 starting threads each have their own identity Pig{}. However, the pig is copied 6 times instead. This may lead to an observable performance penalty.

I guess I can always use TLS to address this issue.
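
For reference, here is a minimal sketch of the TLS idea using tbb::enumerable_thread_specific (m_size, batch_size and ij_end_index are placeholders standing in for the variables from my real code, and the per-range work is omitted):

#include <tbb/tbb.h>
#include <vector>

struct res_struct {
    std::vector<double> a;
    std::vector<double> b;
    std::vector<double> c;
};

int main() {
    // Placeholder sizes; in the real code these come from the problem setup.
    const int m_size = 8, batch_size = 4, ij_end_index = 100;
    const int n = m_size * m_size * batch_size;

    // One res_struct per thread, created lazily on first access; the
    // algorithm itself never copies the big vectors.
    tbb::enumerable_thread_specific<res_struct> tls([n] {
        return res_struct{std::vector<double>(n, 0),
                          std::vector<double>(n, 0),
                          std::vector<double>(n, 0)};
    });

    tbb::parallel_for(
        tbb::blocked_range<int>(0, ij_end_index * (ij_end_index + 1) / 2, 1024),
        [&](const tbb::blocked_range<int>& range) {
            res_struct& res = tls.local();  // thread-private accumulator
            // do a lot of work on res over `range`
            (void)range;
            (void)res;
        });

    // Sequential combine of the per-thread results.
    res_struct total{std::vector<double>(n, 0), std::vector<double>(n, 0),
                     std::vector<double>(n, 0)};
    tls.combine_each([&](const res_struct& r) {
        for (int i = 0; i < n; ++i) {
            total.a[i] += r.a[i];
            total.b[i] += r.b[i];
            total.c[i] += r.c[i];
        }
    });
}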

LXYan2333 · Nov 09 '23

To further test my guess, I tried using a POD struct instead of std containers and managed the resource allocation and freeing manually. The code looks like this:

struct res_struct {
    double* a = nullptr;
    double* b = nullptr;
    double* c = nullptr;
    bool is_initialized = false;
};
auto res = tbb::parallel_reduce(
        tbb::blocked_range{0, ij_end_index * (ij_end_index + 1) / 2, 1024},
        res_struct{},
        [&](const auto& range, res_struct res) {
            if (res.is_initialized == false) {
                res.a = (double*)malloc(sizeof(double) * m_size
                                                   * m_size * batch_size);
                res.b = (double*)malloc(sizeof(double) * m_size
                                                   * m_size * batch_size);
                res.c = (double*)malloc(sizeof(double) * m_size
                                                     * m_size * batch_size);
                memset(res.a,
                       0,
                       sizeof(double) * m_size * m_size * batch_size);
                memset(res.b,
                       0,
                       sizeof(double) * m_size * m_size * batch_size);
                memset(res.c,
                       0,
                       sizeof(double) * m_size * m_size * batch_size);
                res.is_initialized = true;
            }
            // do a lot work about res
            return res;
        },
        [&](res_struct res1, res_struct res2) {
            if (res1.is_initialized == false) {
                return res2;
            }
            if (res2.is_initialized == false) {
                return res1;
            }
            for (int m = 0; m < batch_size; m++) {
                for (int i = 0; i < m_size * m_size; i++) {
                    res1.a[m * m_size * m_size + i] +=
                        res2.a[m * m_size * m_size + i];
                    res1.b[m * m_size * m_size + i] +=
                        res2.b[m * m_size * m_size + i];
                    res1.c[m * m_size * m_size + i] +=
                        res2.c[m * m_size * m_size + i];
                }
            }
            free(res2.a);
            free(res2.b);
            free(res2.c);
            return res1;
        });

Now TBB performs better than OpenMP:

[screenshot]

Please fix it……

LXYan2333 · Nov 10 '23

I found this in the documentation:

Use oneapi::tbb::parallel_reduce when the objects are inexpensive to construct. It works even if the reduction operation is not commutative. Use oneapi::tbb::parallel_for and oneapi::tbb::combinable if the reduction operation is commutative and instances of the type are expensive.
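
A simplified sketch of that parallel_for + combinable pattern (using a plain std::vector<double> as the per-thread value; n is a placeholder size and the per-element work is a stand-in):

#include <cstddef>
#include <tbb/tbb.h>
#include <vector>

int main() {
    const std::size_t n = 1 << 20;

    // One vector per thread, created lazily on the first call to local().
    tbb::combinable<std::vector<double>> acc(
        [n] { return std::vector<double>(n, 0.0); });

    tbb::parallel_for(
        tbb::blocked_range<std::size_t>(0, n),
        [&](const tbb::blocked_range<std::size_t>& range) {
            std::vector<double>& local = acc.local();
            for (std::size_t i = range.begin(); i != range.end(); ++i)
                local[i] += 1.0;  // stand-in for the real work
        });

    // combine() walks the per-thread values one after another.
    std::vector<double> result = acc.combine(
        [](std::vector<double> lhs, const std::vector<double>& rhs) {
            for (std::size_t i = 0; i < lhs.size(); ++i)
                lhs[i] += rhs[i];
            return lhs;
        });
    (void)result;
}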

However, the combine() function of the combinable class is still sequential according to Pro TBB (and my own test). So what is the appropriate method to parallel-reduce large vectors?

LXYan2333 · Nov 13 '23

Hi @LXYan2333, what is the performance with oneapi::tbb::parallel_for and oneapi::tbb::combinable? Basically, the approach with pointers is the right workaround to avoid excessive copies. Could you elaborate more on your use case? I see you didn't use static_partitioner in your example, which might result in more tasks than actual threads (and therefore more copies).
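
For reference, the partitioner can be passed as the last argument of parallel_reduce; a minimal sketch (just summing integers to keep it short):

#include <tbb/tbb.h>

int main() {
    long long sum = tbb::parallel_reduce(
        tbb::blocked_range<int>(0, 1000000),
        0LL,
        [](const tbb::blocked_range<int>& r, long long acc) {
            for (int i = r.begin(); i != r.end(); ++i)
                acc += i;
            return acc;
        },
        [](long long a, long long b) { return a + b; },
        // static_partitioner splits the range evenly across the available
        // threads, so fewer tasks (and fewer identity copies) are created.
        tbb::static_partitioner{});
    (void)sum;
}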

pavelkumbrasev · Nov 13 '23

Hello, after benchmarking several strategies, parallel_for with tbb::combinable performs best. However, the combine method of tbb::combinable is actually serial.

I used ITTAPI and VTune to track the whole process:

[screenshot]

The yellow task is the parallel parallel_for part, and the blue task is the serial combine method.

Now my question is: is there any parallel way to reduce several large vectors/matrices stored on different threads?

  1. tbb::parallel_reduce can reduce them in parallel, but the performance is poor due to unnecessary copies of the vectors (instead of moves).
  2. tbb::parallel_for + tbb::combinable performs better than tbb::parallel_reduce, but the reduction step is serial.

LXYan2333 · Nov 13 '23

BTW, directly using a vector as the reduction identity is actually an example from the book Pro TBB:

https://github.com/Apress/pro-TBB/blob/54f27a680e540ab66a3ffbe881d056904109feb4/ch05/fig_5_27.cpp#L69

on page 170.

LXYan2333 · Nov 13 '23

I'm wondering whether there is a chance to do the reduction on the original vector without creating a copy of the structure per thread, since that introduces a lot of overhead (just working with ranges). You can also try to parallelize the reduction with ETS (enumerable_thread_specific) instead of tbb::combinable, since ETS can provide a range that might be used for parallelization.
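
A rough sketch of the ETS idea (treat it only as an illustration; n is a placeholder size and the per-element work is a stand-in):

#include <cstddef>
#include <tbb/tbb.h>
#include <vector>

int main() {
    const std::size_t n = 1 << 22;

    // One vector per thread, created lazily on the first call to local().
    tbb::enumerable_thread_specific<std::vector<double>> tls(
        [n] { return std::vector<double>(n, 0.0); });

    // Phase 1: each thread accumulates into its own vector.
    tbb::parallel_for(
        tbb::blocked_range<std::size_t>(0, n),
        [&](const tbb::blocked_range<std::size_t>& range) {
            std::vector<double>& local = tls.local();
            for (std::size_t i = range.begin(); i != range.end(); ++i)
                local[i] += 1.0;  // stand-in for the real work
        });

    // Phase 2: reduce the per-thread copies in parallel over tls.range()
    // instead of the sequential combine()/combine_each().
    std::vector<double> result = tbb::parallel_reduce(
        tls.range(),
        std::vector<double>(n, 0.0),
        [n](const auto& r, std::vector<double> acc) {
            for (const std::vector<double>& local : r)
                for (std::size_t i = 0; i < n; ++i)
                    acc[i] += local[i];
            return acc;
        },
        [](std::vector<double> a, const std::vector<double>& b) {
            for (std::size_t i = 0; i < a.size(); ++i)
                a[i] += b[i];
            return a;
        });
    (void)result;
}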

pavelkumbrasev · Nov 15 '23

Maybe this should be natively and efficiently supported by TBB, just like in OpenMP?

https://github.com/OpenMP/Examples/blob/075683d57463d9251d483badd944e1a60e15192f/data_environment/sources/reduction.7.c#L22
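
For comparison, a simplified C++ sketch (not the exact linked example) of the kind of array-section reduction OpenMP supports natively; the runtime privatizes the array section per thread and combines the copies itself:

#include <vector>

int main() {
    const int n = 1 << 20;
    std::vector<double> input(n, 1.0);
    std::vector<double> res(n, 0.0);
    double* out = res.data();

    // OpenMP creates a private zero-initialized copy of out[0:n] per thread
    // and adds all the copies back into the original array at the end.
#pragma omp parallel for reduction(+ : out[:n])
    for (int i = 0; i < n; ++i)
        out[i] += input[i];  // stand-in for the real work
}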

LXYan2333 · Nov 17 '23

> Hi @LXYan2333, what is the performance with oneapi::tbb::parallel_for and oneapi::tbb::combinable? Basically, the approach with pointers is the right workaround to avoid excessive copies. Could you elaborate more on your use case? I see you didn't use static_partitioner in your example, which might result in more tasks than actual threads (and therefore more copies).

hello:

I wrote a demo:

#include <benchmark/benchmark.h>
#include <cmath>
#include <cstddef>
#include <iostream>
#include <random>
#include <tbb/tbb.h>
#include <valarray>

static constexpr size_t m_size = 8000000;

auto testset() {
    std::mt19937 gen(20240228);
    std::uniform_real_distribution<double> dist(0, 20.0);

    std::valarray<double> vec(m_size);

    for (size_t i = 0; i < m_size; ++i) {
        vec[i] = dist(gen);
    }

    return vec;
}

static void tbb_parallel_reduce(benchmark::State& state) {
    auto test_set = testset();
    for (auto _ : state) {
        std::valarray<double> res = tbb::parallel_reduce(
            tbb::blocked_range{0uz, m_size, 4},
            std::valarray<double>(m_size),
            [&](const auto& range, std::valarray<double> res) {
                for (size_t i = range.begin(); i < range.end(); ++i) {
                    res[i] += std::sin(std::pow(std::exp(test_set[i]), 8.0));
                }
                return res;
            },
            [&](std::valarray<double> res1, std::valarray<double> res2) {
                res1 += res2;
                return res1;
            });
        benchmark::DoNotOptimize(res);
    }
}

static void openmp_parallel_reduce(benchmark::State& state) {
    auto test_set = testset();
    for (auto _ : state) {
        std::valarray<double> res(m_size);
#pragma omp parallel
        {
            std::valarray<double> thread_private_res(m_size);
#pragma omp for schedule(dynamic, 4) nowait
            for (size_t i = 0; i < m_size; ++i) {
                thread_private_res[i] +=
                    std::sin(std::pow(std::exp(test_set[i]), 8.0));
            }
#pragma omp critical
            { res += thread_private_res; }
        }
        benchmark::DoNotOptimize(res);
    }
}

BENCHMARK(tbb_parallel_reduce);
BENCHMARK(openmp_parallel_reduce);

BENCHMARK_MAIN();

It creates a valarray on each thread and adds something to that valarray. Finally, all the per-thread valarrays are added into the res valarray.

I compiled it using Clang 17 with the flags "-O3 -ffast-math -DNDEBUG -march=native" and ran the program on my own PC, getting this result:

Run on (8 X 4000 MHz CPU s)
CPU Caches:
  L1 Data 48 KiB (x4)
  L1 Instruction 32 KiB (x4)
  L2 Unified 1280 KiB (x4)
  L3 Unified 8192 KiB (x1)
Load Average: 1.73, 1.79, 1.08
***WARNING*** CPU scaling is enabled, the benchmark real time measurements may be noisy and will incur extra overhead.
-----------------------------------------------------------------
Benchmark                       Time             CPU   Iterations
-----------------------------------------------------------------
tbb_parallel_reduce    7081146946 ns   6209464864 ns            1
openmp_parallel_reduce  504056713 ns    503715576 ns            1

TBB is significantly slower than OpenMP.

I've checked the outputs, and both versions produce the same result.

So the conclusion is: do not use anything that is expensive to copy as the identity within tbb::parallel_reduce? (I still feel this is a bug…)

Thank you pavelkumbrasev for your help and advice.

LXYan2333 · Feb 28 '24

Hi @LXYan2333, sorry for replying after the issue was closed.

Unfortunately, for the parallel_reduce algorithm we have to copy the identity element into each task to ensure that each task has its own value to reduce into without any additional synchronization. Regarding the unnecessary copies of the value, we have added a better rvalue-friendly API for parallel_reduce (see #1307). It is part of the master branch now and will be included (and documented) as part of one of the next releases of oneTBB. For your use case, I would propose rewriting it in this way:

struct res_struct {
    res_struct() = default;

    std::vector<double> a;
    std::vector<double> b;
    std::vector<double> c;
    
    bool empty() {
        return a.empty() && b.empty() && c.empty();
    }
};

auto res = tbb::parallel_reduce(
        tbb::blocked_range{0, ij_end_index * (ij_end_index + 1) / 2, 1024},
        res_struct{}, // using empty res_struct as inexpensive to copy

        [&](const auto& range, res_struct&& res) {
            if (res.empty()) {
                res.a.resize(m_size * m_size * batch_size, 0);
                // same for b and c
            }
            
            // do a lot of work about res
            // assuming res is rvalue
            return std::move(res);
        },         
        
        [&](res_struct&& res1, res_struct&& res2) {
            // combine res1 and res2
            // as rvalues
            return std::move(res1);
        });

If you are forced to use previous TBB versions, consider using the parallel_reduce interface with a complete Body structure (defining operator() and join()) instead of the lambda-friendly interface; it makes it possible to achieve the same goal, roughly as sketched below.
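
A rough sketch of what the Body form looks like (placeholder size; the per-element work is a stand-in):

#include <cstddef>
#include <tbb/tbb.h>
#include <vector>

struct SumBody {
    std::vector<double> acc;

    explicit SumBody(std::size_t n) : acc(n, 0.0) {}

    // Splitting constructor: each forked subtask gets its own zeroed
    // accumulator, so no result data is copied around.
    SumBody(SumBody& other, tbb::split) : acc(other.acc.size(), 0.0) {}

    void operator()(const tbb::blocked_range<std::size_t>& range) {
        for (std::size_t i = range.begin(); i != range.end(); ++i)
            acc[i] += 1.0;  // stand-in for the real work
    }

    // join() merges the right-hand body into this one in place.
    void join(SumBody& rhs) {
        for (std::size_t i = 0; i < acc.size(); ++i)
            acc[i] += rhs.acc[i];
    }
};

int main() {
    const std::size_t n = 1 << 20;
    SumBody body(n);
    tbb::parallel_reduce(tbb::blocked_range<std::size_t>(0, n), body);
    // body.acc now holds the combined result.
}

In case of any further questions, don't hesitate to contact us again.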

kboyarinov · Feb 28 '24