atomic-server
atomic-server copied to clipboard
Async all resources
Introduces the use of crossbeam_channel to add concurrent / parallel resource processing in db.all_resources.
Contains commits from #396 #394, credits to @AlexMikhalev. Rebased onto main, conflicts resolved. Seems to work.
PR Checklist:
- [x] Link to related issue
- [x] Add changelog entry linking to issue
- [ ] Added tests (if needed)
- [ ] (If new feature) added in description / readme
For some reason tauri won't build.
https://github.com/atomicdata-dev/atomic-data-rust/runs/6723553931?check_suite_focus=true#step:7:487
I guess I should try older versions
@AlexMikhalev If I try to run atomic-server --rebuild-index (which uses all_resources), I get issues:
2022-06-05T08:35:39.570694Z ERROR atomic_lib::db: Error sending resource in pool in all_resources: sending on a disconnected channel
However, the test seems to work just fine.
I'm assuming the issue is that atomic-server uses a tokio async runtime, and the two don't mix.
Maybe we need to use tokio in atomic-lib, too. I think this means we can get rid of these dependencies:
crossbeam-channel = "0.5.4"
threadpool = "1.3"
num_cpus = "1.4"
What are your thoughts on this?
I think we can safely remove threadpool and num_cpu, I was under impression I have done it. There is a method part of standard library. Crossbeam is another thing, currently I use special case - channels with zero buffer, it seems consumer dies before all tasks complete. How can I replicate crash?
Thanks @AlexMikhalev. Could you perhaps try to checkout this branch locally and try getting rid of these dependencies? I'm not entirely sure how!
Ok. I managed to replicate it. I think the issue is that actix spawns new processes then inside spawned future - not an actual process we spawn a number of threads per CPU. Before fixing, the question does the rebuild index need to be async future? Cause it's not actually doing anything until await is called - which I can't find in the code, I mean:
actix_web::rt::spawn(async move {
from here https://github.com/atomicdata-dev/atomic-data-rust/blob/f36e2b066bd0b55001de18a9f0971505dbeb15f9/server/src/serve.rs#L10
should have await? somewhere?
Oh dear. I just read tokio docs - there is a lot of magic in those few lines:
cfg_rt! {
/// An owned permission to join on a task (await its termination).
///
/// This can be thought of as the equivalent of [`std::thread::JoinHandle`] for
/// a task rather than a thread.
///
/// A `JoinHandle` *detaches* the associated task when it is dropped, which
/// means that there is no longer any handle to the task, and no way to `join`
/// on it.
///
/// This `struct` is created by the [`task::spawn`] and [`task::spawn_blocking`]
/// functions.
///
/// # Examples
///
/// Creation from [`task::spawn`]:
///
/// ```
/// use tokio::task;
///
/// # async fn doc() {
/// let join_handle: task::JoinHandle<_> = task::spawn(async {
/// // some work here
/// });
/// # }
/// ```
///
/// Creation from [`task::spawn_blocking`]:
///
/// ```
/// use tokio::task;
///
/// # async fn doc() {
/// let join_handle: task::JoinHandle<_> = task::spawn_blocking(|| {
/// // some blocking work here
/// });
/// # }
/// ```
///
/// The generic parameter `T` in `JoinHandle<T>` is the return type of the spawned task.
/// If the return value is an i32, the join handle has type `JoinHandle<i32>`:
///
/// ```
/// use tokio::task;
///
/// # async fn doc() {
/// let join_handle: task::JoinHandle<i32> = task::spawn(async {
/// 5 + 3
/// });
/// # }
///
/// ```
///
/// If the task does not have a return value, the join handle has type `JoinHandle<()>`:
///
/// ```
/// use tokio::task;
///
/// # async fn doc() {
/// let join_handle: task::JoinHandle<()> = task::spawn(async {
/// println!("I return nothing.");
/// });
/// # }
/// ```
///
/// Note that `handle.await` doesn't give you the return type directly. It is wrapped in a
/// `Result` because panics in the spawned task are caught by Tokio. The `?` operator has
/// to be double chained to extract the returned value:
///
/// ```
/// use tokio::task;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async {
/// Ok(5 + 3)
/// });
///
/// let result = join_handle.await??;
/// assert_eq!(result, 8);
/// Ok(())
/// }
/// ```
///
/// If the task panics, the error is a [`JoinError`] that contains the panic:
///
/// ```
/// use tokio::task;
/// use std::io;
/// use std::panic;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async {
/// panic!("boom");
/// });
///
/// let err = join_handle.await.unwrap_err();
/// assert!(err.is_panic());
/// Ok(())
/// }
///
/// ```
/// Child being detached and outliving its parent:
///
/// ```no_run
/// use tokio::task;
/// use tokio::time;
/// use std::time::Duration;
///
/// # #[tokio::main] async fn main() {
/// let original_task = task::spawn(async {
/// let _detached_task = task::spawn(async {
/// // Here we sleep to make sure that the first task returns before.
/// time::sleep(Duration::from_millis(10)).await;
/// // This will be called, even though the JoinHandle is dropped.
/// println!("♫ Still alive ♫");
/// });
/// });
///
/// original_task.await.expect("The task being joined has panicked");
/// println!("Original task is joined.");
///
/// // We make sure that the new task has time to run, before the main
/// // task returns.
///
/// time::sleep(Duration::from_millis(1000)).await;
/// # }
/// ```
///
/// [`task::spawn`]: crate::task::spawn()
/// [`task::spawn_blocking`]: crate::task::spawn_blocking
/// [`std::thread::JoinHandle`]: std::thread::JoinHandle
/// [`JoinError`]: crate::task::JoinError
pub struct JoinHandle<T> {
raw: Option<RawTask>,
id: Id,
_p: PhantomData<T>,
}
}
I don't think we can marry both - I think we have to choose tokio or channels, I read tokio is less performant on those tasks, hence I chose channels. Another option would be to define Sync on atomic data type and use rayon par_iter. Obviously! :) Tokio have it's own channels https://docs.rs/tokio/1.19.2/tokio/sync/mpsc/fn.channel.html I will have a go at trying to use them tomorrow.
Currently even without changes tests are failing:
failures:
---- db::test::query_include_external stdout ----
0
0
thread 'db::test::query_include_external' panicked at 'Amount of results should be higher for include_external', lib/src/db/test.rs:416:5
---- db::test::test_db_resources_all stdout ----
thread 'db::test::test_db_resources_all' panicked at 'Amount of results should be higher for include_external', lib/src/db/test.rs:428:5
---- db::test::index_invalidate_cache stdout ----
cache_invalidation test for https://atomicdata.dev/properties/filename
thread 'db::test::index_invalidate_cache' panicked at 'assertion failed: `(left == right)`
left: `0`,
right: `10`: Not the right amount of members in this collection', lib/src/db/test.rs:501:5
failures:
db::test::index_invalidate_cache
db::test::query_include_external
db::test::test_db_resources_all
test result: FAILED. 59 passed; 3 failed; 2 ignored; 0 measured; 0 filtered out; finished in 4.32s
error: test failed, to rerun pass '-p atomic_lib --lib'
Is it intentional?
@AlexMikhalev The failing tests are not intentional! I may have messed up while rebasing, or maybe the issue is with the new prefix logic?
https://github.com/atomicdata-dev/atomic-data-rust/pull/423/files#diff-e2b27b838cec3a19690473ec7f15767684e120c268132189d9183f24e442b208R516
That's strange - prefix worked when I committed it, "test_db_resources_all()" was covering that very purpose and now it's failing. I can't figure out why yet - seems the boolean parameter was just ignored. basic example returns 105 & 105 for both.
For some reason, the crossbeam try_iter makes the resources vector in all_resources not long enough. It often has only something like 8 items, instead of hundreds.
If we try iter instead, the performance becomes abysmal