Question: low inference performance with `tbb::flow::graph` — possible data-affinity issue
Hi — I'm seeing lower-than-expected throughput/latency for the inference stage when using tbb::flow::graph, and I would like advice from maintainers/experts.
Background / observed behavior
- I have a video-stream model pipeline where inference time is much larger than preprocessing and postprocessing.
- I run inference with the OpenVINO backend on an Intel CPU — all data, including model instances, live on the CPU.
- I suspect the performance issue is caused by data/memory affinity, because model instances (from a pool) are effectively used across different threads, i.e. the model instance index is assigned at preprocess time and then the inference node executes on a worker thread that may be different from the one that handled the model earlier.
Questions
- For a video-stream model inference scenario like mine, is `tbb::flow::graph` an appropriate choice?
- If inference performance is below expectations, what are the likely causes and remedies? Specifically:
- Is this likely a problem in my implementation?
- Is it likely a data-affinity (cache / NUMA / thread-locality) issue due to how model instances are assigned and used across threads?
- Or could there be other causes (TBB scheduling, contention, model internals, memory allocation, etc.)? Any suggestions on how to diagnose and fix this would be appreciated.
My implementation
```cpp
template <typename ModelHandlerType>
class PipelineNodeTbb
{
public:
    using ItemType = typename ModelHandlerType::ItemType;
    using ModelType = typename ModelHandlerType::ModelType;
    using InputType = std::optional<typename ModelHandlerType::InputType>;
    using OutputType = typename ModelHandlerType::OutputType;
    using ModelParams = const std::shared_ptr<typename ModelType::Params> &;
    using DataPacket = std::tuple<std::shared_ptr<ItemType>, uint64_t>;
    using InNode = tbb::flow::multifunction_node<InputType, std::tuple<DataPacket>>;
    using ExNode = tbb::flow::function_node<DataPacket, DataPacket>;
    using OuNode = tbb::flow::multifunction_node<DataPacket, std::tuple<OutputType>>;

    tbb::flow::limiter_node<InputType> &get_input() { return *_limiter; }               // Expose node for external connection
    auto &get_output() { return tbb::flow::output_port<0>(*_postprocess); }             // Expose output port for external connection

    void start() { _stop_requested.store(false, std::memory_order_release); }

    void stop()
    {
        _stop_requested.store(true, std::memory_order_release);
        _preprocess->try_put(std::nullopt);
    }

    explicit PipelineNodeTbb(tbb::flow::graph &g, ModelParams model_cfg,
                             std::function<void(std::shared_ptr<ItemType>, typename OuNode::output_ports_type &)> fun,
                             int num_thread = 1, int token_capacity = 64)
        : _batch_size(std::max<int>(1, model_cfg->batch_size_)), _unpack(std::move(fun))
    {
        if (!_unpack)
            throw std::runtime_error("Invalid unpack function ...");
        num_thread = std::min(std::max<uint32_t>(1, num_thread), std::thread::hardware_concurrency()); // NOLINT
        token_capacity = std::max(token_capacity, 1);

        // Initialize model pool
        _models.reserve(num_thread);
        for (int i = 0; i < num_thread; ++i)
        {
            auto model = std::make_unique<ModelType>(model_cfg);
            if (!model || !model->initialize() || !model->is_loaded())
                throw std::runtime_error("Failed to create model instance.");
            _models.emplace_back(std::move(model));
        }

        // preprocess
        _preprocess = std::make_unique<InNode>(g, tbb::flow::serial, [this](auto &&inp_, auto &&outp_) { // NOLINT
            std::shared_ptr<ItemType> item = nullptr;
            if (_stop_requested.load(std::memory_order_acquire))
            {
                if (_batch_collector) // Directly push incomplete batch to avoid dropping frames
                    item = std::move(_batch_collector);
            }
            else
            {
                if (!inp_.has_value())
                    return;
                if (!_batch_collector)
                    _batch_collector = std::make_shared<ItemType>();
                ModelHandlerType::collect(_batch_collector, *inp_);
                if (ModelHandlerType::get_batch_count(_batch_collector) == _batch_size)
                    item = std::move(_batch_collector);
            }
            if (!item)
                return;
            _batch_collector.reset();
            const uint64_t index = _model_idx.fetch_add(1, std::memory_order_relaxed) % _models.size();
            item->success = ModelHandlerType::preprocess(*_models[index], item);
            std::get<0>(outp_).try_put(std::make_tuple(std::move(item), index));
        });

        // inference
        _inference = std::make_unique<ExNode>(g, num_thread, [this](auto &&inp_) { // NOLINT
            const auto [item, index] = inp_;
            if (item && item->success)
                item->success = ModelHandlerType::inference(*_models[index], item); // Only the inference step needs to be bound to a model instance
            return inp_;
        });

        // postprocess
        _postprocess = std::make_unique<OuNode>(g, tbb::flow::serial, [this](auto &&inp_, auto &&outp_) { // NOLINT
            const auto [item, index] = inp_;
            if (item && item->success)
                item->success = ModelHandlerType::postprocess(*_models[index], item);
            _unpack(item, outp_);
            _limiter->decrementer().try_put(tbb::flow::continue_msg()); // Notify limiter_node to release one token
        });

        // make_edge
        _limiter = std::make_unique<tbb::flow::limiter_node<InputType>>(g, token_capacity);
        tbb::flow::make_edge(*_limiter, *_preprocess);
        tbb::flow::make_edge(tbb::flow::output_port<0>(*_preprocess), *_inference);
        tbb::flow::make_edge(*_inference, *_postprocess);
    }

private:
    std::atomic<bool> _stop_requested = false;
    int _batch_size = 1;
    std::shared_ptr<ItemType> _batch_collector = nullptr;
    std::unique_ptr<tbb::flow::limiter_node<InputType>> _limiter = nullptr;
    std::unique_ptr<InNode> _preprocess = nullptr;
    std::unique_ptr<ExNode> _inference = nullptr;
    std::unique_ptr<OuNode> _postprocess = nullptr;
    std::atomic<uint64_t> _model_idx = 0;             // avoid data races when assigning model index
    std::vector<std::unique_ptr<ModelType>> _models;  // model pool (length == parallelism)
    std::function<void(std::shared_ptr<ItemType>, typename OuNode::output_ports_type &)> _unpack = nullptr;
};
```
Thanks in advance for any guidance.
@xxf1ow
Yes, the TBB Flow Graph component can be used for efficient parallelization of video-stream scenarios.
I noticed that you designed your PipelineNodeTbb as a graph node with an input and an output (likely connected to other flow graph nodes). You might consider defining it as a composite node (tbb::flow::composite_node). That approach allows PipelineNodeTbb to be used in any context that expects a flow graph node, such as with tbb::flow::make_edge; a sketch of the pattern follows.
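Here is a minimal, self-contained sketch of the composite_node pattern. It is not your pipeline: the types are reduced to int and the internal nodes are placeholders, so only the shape of the idea carries over.

```cpp
#include <tbb/flow_graph.h>
#include <iostream>

// The internal limiter -> work chain is hidden behind a single node with one
// input port and one output port, so outside code can use make_edge on it.
class PipelineComposite
    : public tbb::flow::composite_node<std::tuple<int>, std::tuple<int>>
{
    using base_type = tbb::flow::composite_node<std::tuple<int>, std::tuple<int>>;

    tbb::flow::limiter_node<int> _limiter;
    tbb::flow::function_node<int, int> _work;

public:
    explicit PipelineComposite(tbb::flow::graph &g)
        : base_type(g),
          _limiter(g, /*threshold=*/64),
          _work(g, tbb::flow::unlimited, [](int v) { return v * 2; })
    {
        tbb::flow::make_edge(_limiter, _work);
        // Publish the internal entry and exit points as the composite's ports.
        base_type::set_external_ports(base_type::input_ports_type(_limiter),
                                      base_type::output_ports_type(_work));
    }
};

int main()
{
    tbb::flow::graph g;
    PipelineComposite pipeline(g);
    tbb::flow::function_node<int, int> sink(
        g, tbb::flow::serial, [](int v) { std::cout << v << '\n'; return v; });

    // The composite behaves like any other node in make_edge calls.
    tbb::flow::make_edge(tbb::flow::output_port<0>(pipeline), sink);

    tbb::flow::input_port<0>(pipeline).try_put(21);
    g.wait_for_all();
    return 0;
}
```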
Also, note that your pipeline starts with a limiter_node. Inputs will be rejected once the number of in-flight pipeline tokens has reached the limit. To handle this, either:
- Make the submitter responsible for buffering and retrying rejected submissions later, or
- Insert a buffering graph node before the limiter. If the limiter rejects an input, the buffer stores it, and once a slot becomes available the limiter pulls the next input from the buffer automatically (see the sketch after this list).
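A minimal sketch of the second option, with int standing in for your InputType and a single hypothetical work node in place of the preprocess/inference/postprocess chain. A queue_node preserves submission order, which usually matters for video frames; a plain buffer_node would not.

```cpp
#include <tbb/flow_graph.h>

int main()
{
    tbb::flow::graph g;

    tbb::flow::queue_node<int> input_buffer(g);           // holds rejected frames
    tbb::flow::limiter_node<int> limiter(g, /*threshold=*/64);

    tbb::flow::function_node<int, int> work(
        g, tbb::flow::unlimited,
        [&limiter](int frame) {
            // ... process the frame ...
            // Return a token, mirroring the decrementer try_put in the
            // postprocess node of the pipeline above.
            limiter.decrementer().try_put(tbb::flow::continue_msg());
            return frame;
        });

    tbb::flow::make_edge(input_buffer, limiter);
    tbb::flow::make_edge(limiter, work);

    // Submitters feed the buffer. When the limiter is at its threshold it
    // rejects the push, the edge switches to pull mode, and the limiter
    // fetches the next item from the buffer as soon as a token is returned.
    for (int frame = 0; frame < 1000; ++frame)
        input_buffer.try_put(frame);

    g.wait_for_all();
    return 0;
}
```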
Regarding the performance issue, your suspicion seems correct. The Flow Graph runs in the implicit, unrestricted arena, where all available worker threads can operate. When an input moves from the preprocess node to the inference node, a task that executes the inference body is spawned. This task can be picked up by any available worker in the arena, even one on a different NUMA node. Similarly, postprocessing may happen on yet another node. In the worst case, preprocessing happens on one NUMA node, inference on another, and postprocessing back on the first, causing two extra cross-node data transfers.
Unfortunately, oneTBB currently has no direct mechanism to optimize this. The only workaround is to constrain the entire Flow Graph to workers from a single NUMA node by executing it inside a constrained arena.
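A minimal sketch of that workaround: it assumes oneTBB 2021+ built with the hwloc-based TBBBind support (otherwise the NUMA query reports a single node and no actual binding happens), and uses a placeholder inference node in place of your pipeline.

```cpp
#include <oneapi/tbb/flow_graph.h>
#include <oneapi/tbb/task_arena.h>
#include <oneapi/tbb/info.h>
#include <vector>

int main()
{
    // Query the NUMA topology.
    std::vector<tbb::numa_node_id> numa_nodes = tbb::info::numa_nodes();

    // Arena whose worker threads are constrained to the first NUMA node.
    tbb::task_arena numa_arena(tbb::task_arena::constraints(numa_nodes[0]));

    numa_arena.execute([&] {
        // Constructing the graph inside the arena attaches it to that arena,
        // so all node bodies execute on threads bound to numa_nodes[0].
        tbb::flow::graph g;
        tbb::flow::function_node<int, int> inference(
            g, tbb::flow::unlimited,
            [](int frame) { /* run the model here */ return frame; });

        for (int frame = 0; frame < 100; ++frame)
            inference.try_put(frame);
        g.wait_for_all();
    });
    return 0;
}
```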
We are actively working on NUMA-related improvements for oneTBB components and will consider your use case as additional motivation. If you'd like to contribute, please provide more details about your scenario or issues by submitting an RFC that describes the problem and the desired behavior, without necessarily proposing a specific solution.