Have close/flush return a Future for callers to wait on
Currently, calling flush triggers an asynchronous flush but doesn't
provide the caller with a mechanism to wait for all queued events to
be flushed.
This PR changes the Client::flush and Client::close interfaces to be
async and fixes a few concurrency issues I ran into while making this
change:
- Replaces the non-async
crossbeam_channelwithasync_channel. Thecrossbeam_channelcrate blocks the current thread and isn't safe to use within an async executor. The timeout behavior is preserved usingfuture::timeout. - Avoids creating a new runtime within each
process_worktask. Instead, tasks are spawned within theTransmission's runtime. - Removes the
Mutexaround the runtime, which causes problems sharing the guards across threads and doesn't appear to be needed. - When processing the stop event, flushes queued events before breaking
out of the
process_workloop (#65).
Resolves #66 and fixes #65.
Sorry, this turned into a more involved PR than I expected!
Will take a look as soon as I can.
@ramosbugs apologies for such a long delay. I promise I'll look at this as soon as I'm not swamped. I appreciate the contribution!
no worries, thanks!
Although I didn't have the intention to expose async outside of the client, I kind of like the approach here.
Would you be so kind as to run cargo readme >! README.md and push it here? That would save me an update commit just to sync the README.md with what's now in lib.rs.
The main thing I'd like to change before I merge (if you're up for it!) is to make async-std and tokio features that can be toggled. Your change here adds async-std on top of tokio which isn't great. See https://github.com/nlopes/libhoney-rust/pull/61 for what I mean.
If you're not up for it (and I totally understand if you aren't), I'll bring it myself in another PR with this one as base. Just let me know what you'd prefer and I'm happy to oblige.
I really appreciate your contribution.
Although I didn't have the intention to expose async outside of the client, I kind of like the approach here. Would you be so kind as to run
cargo readme >! README.mdand push it here? That would save me an update commit just to sync theREADME.mdwith what's now inlib.rs.
done!
The main thing I'd like to change before I merge (if you're up for it!) is to make
async-stdandtokiofeatures that can be toggled. Your change here addsasync-stdon top oftokiowhich isn't great. See #61 for what I mean.
It looks like there are a few complexities around tokio vs. async_std here:
- Using
tokio::time::timeoutinstead ofasync_std::future::timeoutrequires a tokio runtime to avoid athere is no timer running, must be called from the context of Tokio runtimepanic. By contrast,async_std::future::timeoutseems to work for any runtime. tokio::sync::Mutexlooks like a potential alternative toasync_std::sync::Mutex; I'm not sure if it also requires a tokio runtime- calling
Transmission::stopcauses the tokio runtime created byTransmission::new_runtimeto be dropped. if the caller invokes this from inside another tokio runtime, tokio panics withCannot drop a runtime in a context where blocking is not allowed. This happens when a runtime is dropped from within an asynchronous context.. this was my original motivation for usingasync_std::testin the tests andasync_std::mainin the example. however, it's very easy for users of this crate to accidentally callstopinside of a tokio async context, so a more robust solution is probably needed. maybe we should just pass in aspawnclosure instead of creating the runtime inside this crate at all?
also, is the objection to adding async_std due to the additional dependency cost, or is it specifically about mixing the tokio and async_std runtimes? it feels like runtime-agnostic primitives like async_std::future::timeout and async_std::sync::Mutex are preferable to runtime-specific ones like tokio::time::timeout. using a spawn closure like I suggested above could potentially remove the tokio dependency altogether
it feels like runtime-agnostic primitives like async_std::future::timeout and async_std::sync::Mutex are preferable
You hit exactly what my message was intended to deliver and which I failed to deliver 😆 That's basically what I'd want: we either use runtime-agnostic primitives or we put different runtimes behind features.
I think given the issues around dropping runtimes at various points in the code, the runtime-agnostic approach would be preferable. I'll try to incorporate that change :-)
looks like the async_executors crate provides some nice adapters for different runtimes to be able to leverage the futures::task::Spawn trait. the commit I just pushed takes in an Arc<dyn Spawn> and avoids creating any runtimes directly. I also updated the unit tests to be run easily on all supported runtimes (currently just Tokio).
as reflected in #61, my change isn't sufficient to support runtimes other than Tokio because of Reqwest's dependency on Tokio, but it fixes the task management piece. switching to surf (in a follow-up change) would finish adding support for other runtimes.
there's also a caveat with my latest change: it relies on async_std's unstable feature flag in order to make use of an async condition variable (Condvar). I couldn't find any other suitable async-friendly primitives that wouldn't block the executor thread. note that this has nothing to do with "unstable" Rust and doesn't depend on nightly or anything. I think it just means that Condvar may not respect SemVer, so I pinned to the latest async-std 1.9.0 version. that means clients won't be able to integrate other versions easily, but that restriction can be removed once async-rs/async-std#217 is completed.
This is looking great. I'm going to do a deeper review this weekend and get this out there most likely.
I can't thank you enough for the contributions and patience on my late reviews.