
Async all resources

Open joepio opened this issue 3 years ago • 10 comments

Introduces the use of crossbeam_channel to add concurrent / parallel resource processing in db.all_resources.

Contains commits from #396 #394, credits to @AlexMikhalev. Rebased onto main, conflicts resolved. Seems to work.

PR Checklist:

  • [x] Link to related issue
  • [x] Add changelog entry linking to issue
  • [ ] Added tests (if needed)
  • [ ] (If new feature) added in description / readme

joepio avatar Jun 03 '22 07:06 joepio

For some reason tauri won't build.

https://github.com/atomicdata-dev/atomic-data-rust/runs/6723553931?check_suite_focus=true#step:7:487

I guess I should try older versions

joepio avatar Jun 03 '22 10:06 joepio

@AlexMikhalev If I try to run atomic-server --rebuild-index (which uses all_resources), I get issues:

2022-06-05T08:35:39.570694Z ERROR atomic_lib::db: Error sending resource in pool in all_resources: sending on a disconnected channel

However, the test seems to work just fine.

I'm assuming the issue is that atomic-server uses a tokio async runtime, and the two don't mix.

Maybe we need to use tokio in atomic-lib, too. I think this means we can get rid of these dependencies:

crossbeam-channel = "0.5.4"
threadpool = "1.3"
num_cpus = "1.4"

What are your thoughts on this?

joepio avatar Jun 05 '22 08:06 joepio

I think we can safely remove threadpool and num_cpus; I was under the impression I had already done so. There is a method in the standard library for that. Crossbeam is another matter: I currently use a special case, channels with a zero-size buffer, and it seems the consumer dies before all tasks complete. How can I replicate the crash?
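A minimal sketch of the failure mode described above, using only the standard library: `std::sync::mpsc::sync_channel(0)` stands in for a zero-capacity crossbeam channel, and `std::thread::available_parallelism` is presumably the standard-library method meant as a replacement for num_cpus (that is an assumption). The names `worker_count` and `send_until_disconnected` are hypothetical and not taken from atomic-lib.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Number of worker threads to use, taken from the standard library.
/// Assumption: this is the std method meant as a num_cpus replacement.
fn worker_count() -> usize {
    thread::available_parallelism().map(|n| n.get()).unwrap_or(1)
}

/// Sends `n` items over a zero-capacity (rendezvous) channel to a consumer
/// that gives up after `consumer_limit` items. Returns how many sends succeeded.
fn send_until_disconnected(n: usize, consumer_limit: usize) -> usize {
    let (tx, rx) = sync_channel::<usize>(0);

    // The consumer stops early and drops `rx`, which disconnects the channel.
    let consumer = thread::spawn(move || {
        for _ in 0..consumer_limit {
            if rx.recv().is_err() {
                break; // sender is gone, nothing more to receive
            }
        }
    });

    let mut sent = 0;
    for i in 0..n {
        match tx.send(i) {
            // With capacity 0, `send` only returns Ok after a receiver took the item.
            Ok(()) => sent += 1,
            Err(e) => {
                // The receiver was dropped before all items were handed over.
                eprintln!("error sending item {i}: {e}");
                break;
            }
        }
    }

    drop(tx);
    consumer.join().expect("consumer thread panicked");
    sent
}

fn main() {
    println!("workers available: {}", worker_count());
    println!("sends that succeeded: {}", send_until_disconnected(10, 3));
}
```

If the consumer exits before the producers are done, every later send fails, which matches the shape of the logged error. Whether that is what actually happens in all_resources is not established here.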

AlexMikhalev avatar Jun 05 '22 22:06 AlexMikhalev

Thanks @AlexMikhalev. Could you perhaps try to checkout this branch locally and try getting rid of these dependencies? I'm not entirely sure how!

joepio avatar Jun 06 '22 13:06 joepio

OK, I managed to replicate it. I think the issue is that actix spawns new processes, and then inside the spawned future (which is not an actual process) we spawn a number of threads per CPU. Before fixing this, a question: does the index rebuild need to be an async future? As far as I can tell it doesn't actually do anything until await is called, and I can't find an await in the code. I mean:

actix_web::rt::spawn(async move {

from here https://github.com/atomicdata-dev/atomic-data-rust/blob/f36e2b066bd0b55001de18a9f0971505dbeb15f9/server/src/serve.rs#L10

Shouldn't there be an await somewhere?

AlexMikhalev avatar Jun 07 '22 21:06 AlexMikhalev

Oh dear. I just read tokio docs - there is a lot of magic in those few lines:

cfg_rt! {
    /// An owned permission to join on a task (await its termination).
    ///
    /// This can be thought of as the equivalent of [`std::thread::JoinHandle`] for
    /// a task rather than a thread.
    ///
    /// A `JoinHandle` *detaches* the associated task when it is dropped, which
    /// means that there is no longer any handle to the task, and no way to `join`
    /// on it.
    ///
    /// This `struct` is created by the [`task::spawn`] and [`task::spawn_blocking`]
    /// functions.
    ///
    /// # Examples
    ///
    /// Creation from [`task::spawn`]:
    ///
    /// ```
    /// use tokio::task;
    ///
    /// # async fn doc() {
    /// let join_handle: task::JoinHandle<_> = task::spawn(async {
    ///     // some work here
    /// });
    /// # }
    /// ```
    ///
    /// Creation from [`task::spawn_blocking`]:
    ///
    /// ```
    /// use tokio::task;
    ///
    /// # async fn doc() {
    /// let join_handle: task::JoinHandle<_> = task::spawn_blocking(|| {
    ///     // some blocking work here
    /// });
    /// # }
    /// ```
    ///
    /// The generic parameter `T` in `JoinHandle<T>` is the return type of the spawned task.
    /// If the return value is an i32, the join handle has type `JoinHandle<i32>`:
    ///
    /// ```
    /// use tokio::task;
    ///
    /// # async fn doc() {
    /// let join_handle: task::JoinHandle<i32> = task::spawn(async {
    ///     5 + 3
    /// });
    /// # }
    ///
    /// ```
    ///
    /// If the task does not have a return value, the join handle has type `JoinHandle<()>`:
    ///
    /// ```
    /// use tokio::task;
    ///
    /// # async fn doc() {
    /// let join_handle: task::JoinHandle<()> = task::spawn(async {
    ///     println!("I return nothing.");
    /// });
    /// # }
    /// ```
    ///
    /// Note that `handle.await` doesn't give you the return type directly. It is wrapped in a
    /// `Result` because panics in the spawned task are caught by Tokio. The `?` operator has
    /// to be double chained to extract the returned value:
    ///
    /// ```
    /// use tokio::task;
    /// use std::io;
    ///
    /// #[tokio::main]
    /// async fn main() -> io::Result<()> {
    ///     let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async {
    ///         Ok(5 + 3)
    ///     });
    ///
    ///     let result = join_handle.await??;
    ///     assert_eq!(result, 8);
    ///     Ok(())
    /// }
    /// ```
    ///
    /// If the task panics, the error is a [`JoinError`] that contains the panic:
    ///
    /// ```
    /// use tokio::task;
    /// use std::io;
    /// use std::panic;
    ///
    /// #[tokio::main]
    /// async fn main() -> io::Result<()> {
    ///     let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async {
    ///         panic!("boom");
    ///     });
    ///
    ///     let err = join_handle.await.unwrap_err();
    ///     assert!(err.is_panic());
    ///     Ok(())
    /// }
    ///
    /// ```
    /// Child being detached and outliving its parent:
    ///
    /// ```no_run
    /// use tokio::task;
    /// use tokio::time;
    /// use std::time::Duration;
    ///
    /// # #[tokio::main] async fn main() {
    /// let original_task = task::spawn(async {
    ///     let _detached_task = task::spawn(async {
    ///         // Here we sleep to make sure that the first task returns before.
    ///         time::sleep(Duration::from_millis(10)).await;
    ///         // This will be called, even though the JoinHandle is dropped.
    ///         println!("♫ Still alive ♫");
    ///     });
    /// });
    ///
    /// original_task.await.expect("The task being joined has panicked");
    /// println!("Original task is joined.");
    ///
    /// // We make sure that the new task has time to run, before the main
    /// // task returns.
    ///
    /// time::sleep(Duration::from_millis(1000)).await;
    /// # }
    /// ```
    ///
    /// [`task::spawn`]: crate::task::spawn()
    /// [`task::spawn_blocking`]: crate::task::spawn_blocking
    /// [`std::thread::JoinHandle`]: std::thread::JoinHandle
    /// [`JoinError`]: crate::task::JoinError
    pub struct JoinHandle<T> {
        raw: Option<RawTask>,
        id: Id,
        _p: PhantomData<T>,
    }
}
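
The quoted docs compare tokio's `JoinHandle` to `std::thread::JoinHandle`, so the detach-on-drop behaviour can be sketched with plain threads. This is only an analogy under that assumption, not tokio or actix code, and `run_detached` is a hypothetical name: the spawned work still runs even though nothing ever joins the handle.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Spawns a thread, drops its handle right away, and still receives its message.
fn run_detached() -> &'static str {
    let (tx, rx) = channel();

    let handle = thread::spawn(move || {
        // Runs even though the handle is dropped below.
        let _ = tx.send("still alive");
    });

    // Dropping the handle detaches the thread; it does not cancel it.
    drop(handle);

    // Block until the detached thread has reported back.
    rx.recv().expect("detached thread exited without sending")
}

fn main() {
    println!("{}", run_detached());
}
```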

I don't think we can marry the two. I think we have to choose between tokio and channels. I read that tokio is less performant on this kind of task, hence I chose channels. Another option would be to define Sync on the atomic data type and use rayon's par_iter. Obviously! :) Tokio also has its own channels: https://docs.rs/tokio/1.19.2/tokio/sync/mpsc/fn.channel.html I will have a go at using them tomorrow.

AlexMikhalev avatar Jun 07 '22 22:06 AlexMikhalev

Currently, even without any changes, the tests are failing:


failures:

---- db::test::query_include_external stdout ----
0
0
thread 'db::test::query_include_external' panicked at 'Amount of results should be higher for include_external', lib/src/db/test.rs:416:5

---- db::test::test_db_resources_all stdout ----
thread 'db::test::test_db_resources_all' panicked at 'Amount of results should be higher for include_external', lib/src/db/test.rs:428:5

---- db::test::index_invalidate_cache stdout ----
cache_invalidation test for https://atomicdata.dev/properties/filename
thread 'db::test::index_invalidate_cache' panicked at 'assertion failed: `(left == right)`
  left: `0`,
 right: `10`: Not the right amount of members in this collection', lib/src/db/test.rs:501:5


failures:
    db::test::index_invalidate_cache
    db::test::query_include_external
    db::test::test_db_resources_all

test result: FAILED. 59 passed; 3 failed; 2 ignored; 0 measured; 0 filtered out; finished in 4.32s

error: test failed, to rerun pass '-p atomic_lib --lib'

Is it intentional?

AlexMikhalev avatar Jun 07 '22 23:06 AlexMikhalev

@AlexMikhalev The failing tests are not intentional! I may have messed up while rebasing, or maybe the issue is with the new prefix logic?

https://github.com/atomicdata-dev/atomic-data-rust/pull/423/files#diff-e2b27b838cec3a19690473ec7f15767684e120c268132189d9183f24e442b208R516

joepio avatar Jun 08 '22 08:06 joepio

That's strange. The prefix logic worked when I committed it; test_db_resources_all() was written to cover exactly that, and now it's failing. I can't figure out why yet. It seems the boolean parameter is simply ignored: the basic example returns 105 and 105 for both cases.

AlexMikhalev avatar Jun 08 '22 08:06 AlexMikhalev

For some reason, the crossbeam try_iter makes the resources vector in all_resources not long enough. It often has only something like 8 items, instead of hundreds.

If we use iter instead, the performance becomes abysmal.
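
A small sketch of the difference between the two iterators, using `std::sync::mpsc` as a stand-in for crossbeam_channel (its `Receiver` offers `try_iter` and `iter` with the same documented semantics). The function names are hypothetical and this is not the code from all_resources. `try_iter` never blocks and yields only what is already in the channel, so it can return a short result while workers are still producing. `iter` blocks until every sender has been dropped.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread::{self, JoinHandle};

/// Starts `workers` producer threads that each send `per_worker` items.
fn spawn_producers(workers: usize, per_worker: usize) -> (Receiver<usize>, Vec<JoinHandle<()>>) {
    let (tx, rx) = channel();
    let handles = (0..workers)
        .map(|w| {
            let tx = tx.clone();
            thread::spawn(move || {
                for i in 0..per_worker {
                    if tx.send(w * per_worker + i).is_err() {
                        return; // receiver is gone
                    }
                }
            })
        })
        .collect();
    // The original `tx` is dropped here, so the channel disconnects
    // as soon as all workers have finished.
    (rx, handles)
}

/// Non-blocking: counts only the items that have already arrived.
fn collect_non_blocking(workers: usize, per_worker: usize) -> usize {
    let (rx, handles) = spawn_producers(workers, per_worker);
    let early = rx.try_iter().count(); // may be anywhere from 0 to the full total
    for h in handles {
        h.join().expect("producer panicked");
    }
    early
}

/// Blocking: waits until all senders are dropped, so nothing is missed.
fn collect_blocking(workers: usize, per_worker: usize) -> usize {
    let (rx, handles) = spawn_producers(workers, per_worker);
    let all = rx.iter().count();
    for h in handles {
        h.join().expect("producer panicked");
    }
    all
}

fn main() {
    println!("try_iter saw {} items", collect_non_blocking(4, 25));
    println!("iter saw {} items", collect_blocking(4, 25));
}
```

The count from `try_iter` varies between runs, while `iter` always sees every item once the producers are done. This says nothing about why `iter` was slow in the real code.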

joepio avatar Jun 09 '22 07:06 joepio