ReactivePlusPlus icon indicating copy to clipboard operation
ReactivePlusPlus copied to clipboard

Create pool based schedulers: IO and Computation

Open victimsnino opened this issue 2 years ago • 5 comments

victimsnino avatar Dec 01 '22 20:12 victimsnino

+1, right now I have to write something cumbersome like:

https://github.com/RobinQu/instinct.cpp/blob/sprint/v0.1.1/modules/instinct-retrieval/include/retrieval/MultiVectorRetriever.hpp

void Ingest(const AsyncIterator<Document>& input) override {
            static int BUFFER_SIZE = 10;
            static ThreadPool WORKER_POOL;

            Futures<void> tasks;
            input
            | rpp::operators::as_blocking()
            | rpp::operators::subscribe([&](const Document& parent_doc) {
                auto f = WORKER_POOL.submit_task([&,parent_doc]() {
                    Document copied_doc = parent_doc;
                    doc_store_->AddDocument(copied_doc);
                    auto sub_docs = std::invoke(guidance_, copied_doc);
                        LOG_DEBUG("{} guidance doc(s) generated for parent doc with id {}", sub_docs.size(), copied_doc.id());
                    UpdateResult update_result;
                    vector_store_->AddDocuments(sub_docs, update_result);
                    assert_true(update_result.failed_documents_size()==0, "all sub docs should be inserted successfully");
                });
                tasks.push_back(std::move(f));
            });

            tasks.wait();
        }
    };

PS: ThreadPool and Futures are aliases to BS::thread_pool and [BS::multi_future<T> in bshoshany/thread-pool

RobinQu avatar Apr 18 '24 01:04 RobinQu

@RobinQu , added local-based thread_pool + global computational scheduler. Is it what you've exptected?

victimsnino avatar Apr 21 '24 20:04 victimsnino

@RobinQu , added local-based thread_pool + global computational scheduler. Is it what you've exptected?

I think it's a yes.

Thanks for you quick response. it's good enough for common IO-bound tasks.

RobinQu avatar Apr 22 '24 15:04 RobinQu

@RobinQu , added local-based thread_pool + global computational scheduler. Is it what you've exptected?

I think it's a yes.

Thanks for you quick response. it's good enough for common IO-bound tasks.

You are welcome.

BTW: I've seen your another point in email notification: to have some specific background thread to schedule background job you can use thread_pool(1) as global/static variable and schedule all necessary things to it :)

victimsnino avatar Apr 22 '24 17:04 victimsnino

BTW: I've seen your another point in email notification: to have some specific background thread to schedule background job you can use thread_pool(1) as global/static variable and schedule all necessary things to it :)

Yes indeed.

RobinQu avatar Apr 23 '24 00:04 RobinQu