ReactivePlusPlus
Create pool based schedulers: IO and Computation
+1, right now I have to write something cumbersome like:
https://github.com/RobinQu/instinct.cpp/blob/sprint/v0.1.1/modules/instinct-retrieval/include/retrieval/MultiVectorRetriever.hpp
void Ingest(const AsyncIterator<Document>& input) override {
    static int BUFFER_SIZE = 10;
    static ThreadPool WORKER_POOL;
    Futures<void> tasks;
    input
        | rpp::operators::as_blocking()
        | rpp::operators::subscribe([&](const Document& parent_doc) {
            auto f = WORKER_POOL.submit_task([&, parent_doc]() {
                Document copied_doc = parent_doc;
                doc_store_->AddDocument(copied_doc);
                auto sub_docs = std::invoke(guidance_, copied_doc);
                LOG_DEBUG("{} guidance doc(s) generated for parent doc with id {}", sub_docs.size(), copied_doc.id());
                UpdateResult update_result;
                vector_store_->AddDocuments(sub_docs, update_result);
                assert_true(update_result.failed_documents_size()==0, "all sub docs should be inserted successfully");
            });
            tasks.push_back(std::move(f));
        });
    tasks.wait();
}
};
PS: ThreadPool and Futures are aliases for BS::thread_pool and BS::multi_future<T> in bshoshany/thread-pool.
@RobinQu, I added a local thread_pool scheduler plus a global computational scheduler. Is this what you expected?
I think it's a yes.
Thanks for your quick response. It's good enough for common IO-bound tasks.
You are welcome.
BTW: I saw your other point in the email notification. To have a dedicated background thread for background jobs, you can keep a thread_pool(1) as a global/static variable and schedule everything you need on it :)
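For anyone reading along without the library at hand, the "one-thread pool kept as a global/static" idea can be sketched with the standard library alone. This is not RPP's thread_pool and says nothing about how RPP implements it; `SimplePool`, `submit`, and `background_pool` are hypothetical names made up for illustration. It assumes jobs are plain `void()` callables.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical stand-in for a pool-based scheduler (NOT RPP's thread_pool).
class SimplePool {
public:
    explicit SimplePool(std::size_t threads) {
        for (std::size_t i = 0; i < threads; ++i)
            workers_.emplace_back([this] { run(); });
    }

    // Drains the remaining jobs, then joins every worker.
    ~SimplePool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        cv_.notify_all();
        for (auto& worker : workers_) worker.join();
    }

    SimplePool(const SimplePool&) = delete;
    SimplePool& operator=(const SimplePool&) = delete;

    // Queue a job; the returned future becomes ready once the job has run.
    std::future<void> submit(std::function<void()> job) {
        assert(job && "job must be callable");
        auto task = std::make_shared<std::packaged_task<void()>>(std::move(job));
        std::future<void> done = task->get_future();
        {
            std::lock_guard<std::mutex> lock(mutex_);
            jobs_.push([task] { (*task)(); });
        }
        cv_.notify_one();
        return done;
    }

private:
    void run() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return stopping_ || !jobs_.empty(); });
                if (jobs_.empty()) return;  // stopping and nothing left to do
                job = std::move(jobs_.front());
                jobs_.pop();
            }
            job();  // run outside the lock so other submits are not blocked
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> jobs_;
    bool stopping_ = false;
    std::vector<std::thread> workers_;  // declared last: starts after the rest is ready
};

// One shared background thread, created on first use (function-local static).
SimplePool& background_pool() {
    static SimplePool pool(1);
    return pool;
}
```

With a single worker, jobs submitted from one thread run one at a time in submission order, so something like `background_pool().submit([] { /* background job */ });` behaves like a serial background queue. Calling `get()` or `wait()` on the returned futures gives the same "wait for everything" step as `tasks.wait()` in the snippet earlier in the thread.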
Yes indeed.