Question: low inference performance with `tbb::flow::graph` — possible data-affinity issue

Open xxf1ow opened this issue 1 month ago • 1 comments

Hi, I'm seeing lower-than-expected throughput and higher-than-expected latency in the inference stage when using tbb::flow::graph, and I would like advice from maintainers/experts.

Background / observed behavior

  • I have a video-stream model pipeline where inference time is much larger than preprocessing and postprocessing.
  • I run inference with the OpenVINO backend on an Intel CPU — all data, including model instances, live on the CPU.
  • I suspect the performance issue is caused by data/memory affinity. Model instances come from a pool and the instance index is assigned at preprocess time, so the inference node may execute on a different worker thread from the one that last touched that model instance.

Questions

  1. For a video-stream model inference scenario like mine, is tbb::flow::graph an appropriate choice?

  2. If inference performance is below expectations, what are the likely causes and remedies? Specifically:

    • Is this likely a problem in my implementation?
    • Is it likely a data-affinity (cache / NUMA / thread-locality) issue due to how model instances are assigned and used across threads?
    • Or could there be other causes (TBB scheduling, contention, model internals, memory allocation, etc.)? Any suggestions on how to diagnose and fix this would be appreciated.

My implementation

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <stdexcept>
#include <thread>
#include <tuple>
#include <vector>

#include <tbb/flow_graph.h>

template <typename ModelHandlerType>
class PipelineNodeTbb
{
public:
    using ItemType = typename ModelHandlerType::ItemType;
    using ModelType = typename ModelHandlerType::ModelType;
    using InputType = std::optional<typename ModelHandlerType::InputType>;
    using OutputType = typename ModelHandlerType::OutputType;
    using ModelParams = const std::shared_ptr<typename ModelType::Params> &;
    using DataPacket = std::tuple<std::shared_ptr<ItemType>, uint64_t>;
    using InNode = tbb::flow::multifunction_node<InputType, std::tuple<DataPacket>>;
    using ExNode = tbb::flow::function_node<DataPacket, DataPacket>;
    using OuNode = tbb::flow::multifunction_node<DataPacket, std::tuple<OutputType>>;

    tbb::flow::limiter_node<InputType> &get_input() { return *_limiter; } // Expose node for external connection

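    // output_port<0> returns the node's output port, not the node itself, so the return type is deduced
    auto &get_output() { return tbb::flow::output_port<0>(*_postprocess); } // Expose output port for external connection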

    void start() { _stop_requested.store(false, std::memory_order_release); }

    void stop()
    {
        _stop_requested.store(true, std::memory_order_release);
        _preprocess->try_put(std::nullopt);
    }


    explicit PipelineNodeTbb(tbb::flow::graph &g, ModelParams model_cfg,
                             std::function<void(std::shared_ptr<ItemType>, typename OuNode::output_ports_type &)> fun,
                             int num_thread = 1, int token_capacity = 64)
        : _batch_size(std::max<int>(1, model_cfg->batch_size_)), _unpack(std::move(fun))
    {
        if (!_unpack)
            throw std::runtime_error("Invalid unpack function ...");
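        // hardware_concurrency() may return 0 when the value is unknown, so clamp it to at least 1
        const uint32_t hw_threads = std::max<uint32_t>(1, std::thread::hardware_concurrency());
        num_thread = static_cast<int>(std::min(static_cast<uint32_t>(std::max(num_thread, 1)), hw_threads)); // NOLINT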
        token_capacity = std::max(token_capacity, 1);
        // Initialize model pool
        _models.reserve(num_thread);
        for (int i = 0; i < num_thread; ++i)
        {
            auto model = std::make_unique<ModelType>(model_cfg);
            if (!model || !model->initialize() || !model->is_loaded())
                throw std::runtime_error("Failed to create model instance.");
            _models.emplace_back(std::move(model));
        }
        // preprocess
        _preprocess = std::make_unique<InNode>(g, tbb::flow::serial, [this](auto &&inp_, auto &&outp_) { // NOLINT
            std::shared_ptr<ItemType> item = nullptr;
            if (_stop_requested.load(std::memory_order_acquire))
            {
                if (_batch_collector) // Directly push incomplete batch to avoid dropping frames
                    item = std::move(_batch_collector);
            }
            else
            {
                if (!inp_.has_value())
                    return;
                if (!_batch_collector)
                    _batch_collector = std::make_shared<ItemType>();
                ModelHandlerType::collect(_batch_collector, *inp_);
                if (ModelHandlerType::get_batch_count(_batch_collector) == _batch_size)
                    item = std::move(_batch_collector);
            }
            if (!item)
                return;
            _batch_collector.reset();
            const uint64_t index = _model_idx.fetch_add(1, std::memory_order_relaxed) % _models.size();
            item->success = ModelHandlerType::preprocess(*_models[index], item);
            std::get<0>(outp_).try_put(std::make_tuple(std::move(item), index));
        });
        // inference
        _inference = std::make_unique<ExNode>(g, num_thread, [this](auto &&inp_) { // NOLINT
            const auto [item, index] = inp_;
            if (item && item->success)
                item->success = ModelHandlerType::inference(*_models[index], item); // Only the inference step needs to be bound to a model instance
            return inp_;
        });
        // postprocess
        _postprocess = std::make_unique<OuNode>(g, tbb::flow::serial, [this](auto &&inp_, auto &&outp_) { // NOLINT
            const auto [item, index] = inp_;
            if (item && item->success)
                item->success = ModelHandlerType::postprocess(*_models[index], item);
            _unpack(item, outp_);
            _limiter->decrementer().try_put(tbb::flow::continue_msg()); // Notify limiter_node to release one token
        });
        // make_edge
        _limiter = std::make_unique<tbb::flow::limiter_node<InputType>>(g, token_capacity);
        tbb::flow::make_edge(*_limiter, *_preprocess);
        tbb::flow::make_edge(tbb::flow::output_port<0>(*_preprocess), *_inference);
        tbb::flow::make_edge(*_inference, *_postprocess);
    }

private:
    std::atomic<bool> _stop_requested = false;
    int _batch_size = 1;
    std::shared_ptr<ItemType> _batch_collector = nullptr;
    std::unique_ptr<tbb::flow::limiter_node<InputType>> _limiter = nullptr;
    std::unique_ptr<InNode> _preprocess = nullptr;
    std::unique_ptr<ExNode> _inference = nullptr;
    std::unique_ptr<OuNode> _postprocess = nullptr;
    std::atomic<uint64_t> _model_idx = 0;            // avoid data races when assigning model index
    std::vector<std::unique_ptr<ModelType>> _models; // model pool (length == parallelism)
    std::function<void(std::shared_ptr<ItemType>, typename OuNode::output_ports_type &)> _unpack = nullptr;
};

Thanks in advance for any guidance.

xxf1ow avatar Nov 17 '25 15:11 xxf1ow

@xxf1ow

Yes, the TBB Flow Graph component can be used to parallelize video-stream scenarios efficiently.

I noticed that you designed your PipelineNodeTbb as a graph node with an input and an output (likely connected to other Flow Graph nodes). You might consider defining it as a composite node (tbb::flow::composite_node). This approach allows PipelineNodeTbb to be used in any context that expects a flow graph node, such as tbb::flow::make_edge.

Also, note that your pipeline starts with a limiter_node. It rejects new inputs once the number of in-flight pipeline tokens reaches its threshold. To handle this, either:

  • Make the submitter responsible for buffering and retrying submissions later, or
  • Insert a buffering graph node before the limiter. If the limiter rejects an input, the buffer stores it, and once a slot becomes available the limiter pulls the input from the buffer automatically.

Regarding the performance issue, your suspicion seems correct. The Flow Graph runs in the implicit, unrestricted arena, where all available worker threads can operate. When an input moves from the preprocess node to the inference node, a task that runs the inference body is spawned. This task can be picked up by any available worker in the arena, even one on a different NUMA node. Similarly, postprocessing may run on yet another node. In the worst case, preprocessing happens on one NUMA node, inference on another, and postprocessing back on the first, causing two extra cross-node data transfers.

Unfortunately, oneTBB currently has no direct mechanism to optimize this. The only workaround is to constrain the entire Flow Graph to workers from a single NUMA node by executing it inside a constrained arena, i.e. a tbb::task_arena created with NUMA constraints.

We are actively working on NUMA-related improvements for oneTBB components and will consider your use case as additional motivation. If you'd like to contribute, please provide more details about your scenario or issues by submitting an RFC that describes the problem and the desired behavior without proposing a specific solution.

kboyarinov avatar Dec 02 '25 13:12 kboyarinov