Unstable: Async NIFs and Tasks
Async NIF Support with Tokio Runtime
Adds support for async Rust functions in NIFs, allowing long-running operations without blocking the BEAM scheduler. Async NIFs spawn tasks onto a Tokio runtime and send results via message passing.
Key Features
1. Async Functions
The #[rustler::nif] macro detects async fn and generates wrapper code that:
- Returns :ok immediately (non-blocking)
- Spawns task onto global Tokio runtime
- Sends result via enif_send when complete
- Requires owned types only (no Env, Term, or references)
use std::time::Duration;

#[rustler::nif]
async fn async_operation(input: String) -> String {
    tokio::time::sleep(Duration::from_secs(1)).await;
    input.to_uppercase()
}
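To make the "return immediately, deliver the result as a message" shape concrete, here is a minimal std-only sketch. It is not the generated wrapper: a plain thread stands in for the Tokio task, an mpsc channel stands in for the caller's mailbox, and the names `Immediate`, `spawn_and_notify`, and `demo` are made up for illustration.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Stand-in for the `:ok` that the NIF call returns right away.
#[derive(Debug, PartialEq)]
pub enum Immediate {
    Ok,
}

/// Hypothetical model of the wrapper: take owned arguments, start the work
/// elsewhere, return at once, and deliver the result as a message later.
pub fn spawn_and_notify<A, R, F>(arg: A, mailbox: Sender<R>, work: F) -> Immediate
where
    A: Send + 'static,
    R: Send + 'static,
    F: FnOnce(A) -> R + Send + 'static,
{
    thread::spawn(move || {
        let result = work(arg);
        // The receiver may already be gone; this sketch ignores a failed send.
        let _ = mailbox.send(result);
    });
    Immediate::Ok
}

/// Caller side: the call returns first, the result arrives afterwards.
pub fn demo() -> (Immediate, String) {
    let (tx, rx) = channel();
    let returned = spawn_and_notify(String::from("data"), tx, |s: String| s.to_uppercase());
    let message = rx.recv().expect("worker sends exactly one result");
    (returned, message)
}
```

The `Send + 'static` bounds are the same reason the macro insists on owned argument types: whatever crosses into the spawned work cannot borrow from the calling environment.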
2. CallerPid for Intermediate Messages
Optional CallerPid first parameter for sending progress updates. Doesn't count toward NIF arity.
#[rustler::nif]
async fn with_progress(caller: CallerPid, items: i64) -> i64 {
    for i in 0..items {
        let mut env = OwnedEnv::new();
        env.send_and_clear(caller.as_pid(), |e| ("progress", i).encode(e));
    }
    items
}
3. Configurable Runtime
Application developers configure via standard Elixir config:
# config/config.exs
config :my_nif_library, MyNifLibrary,
load_data: [worker_threads: 4, thread_name: "myapp-async"]
NIF authors decode in load callback:
fn load(_env: Env, load_info: Term) -> bool {
    #[cfg(feature = "tokio_rt")]
    {
        if let Ok(config) = load_info.decode::<rustler::tokio::RuntimeConfig>() {
            rustler::tokio::configure(config).ok();
        }
    }
    true
}
Implementation Details
Code Generation
- Sync NIFs: Standard wrapper (existing behavior)
- Async NIFs: Detected via sig.asyncness.is_some()
- Decodes arguments before spawning (owned types requirement)
- Injects CallerPid if first parameter (doesn't count toward arity)
- Spawns on runtime_handle(), sends result via OwnedEnv
Runtime Management
- Lazy-initialized global runtime (OnceCell<Arc<Runtime>>)
- Falls back to current runtime if already in Tokio context
- RuntimeConfig: Decodable struct with worker_threads, thread_name, thread_stack_size
- configure(RuntimeConfig): Configure from Elixir term
- configure_runtime(|builder|): Programmatic configuration
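The configure-once, lazily-initialize pattern described in this list can be sketched with the standard library alone. This is an illustration under assumptions, not the PR's code: `std::sync::OnceLock` replaces `once_cell`, the stored value is just the config rather than a Tokio runtime, and the field types are guesses based on the field names above.

```rust
use std::sync::OnceLock;

/// Field names follow the description above; the types are assumptions.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct RuntimeConfig {
    pub worker_threads: Option<usize>,
    pub thread_name: Option<String>,
    pub thread_stack_size: Option<usize>,
}

/// Global slot that can be written exactly once.
static CONFIG: OnceLock<RuntimeConfig> = OnceLock::new();

/// Succeeds only if nothing has been stored yet; otherwise hands the
/// rejected config back to the caller.
pub fn configure(config: RuntimeConfig) -> Result<(), RuntimeConfig> {
    CONFIG.set(config)
}

/// Lazily falls back to defaults when `configure` was never called.
pub fn active_config() -> &'static RuntimeConfig {
    CONFIG.get_or_init(RuntimeConfig::default)
}
```

One consequence of this shape is that configuration has to happen in `load`, before the first async NIF touches the runtime; later calls to `configure` are rejected rather than silently replacing a running runtime.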
CallerPid Type
New wrapper type around LocalPid that the macro detects for special handling.
Design Decisions
- Global runtime: Single lazy-initialized runtime for simpler resource management, falls back to current runtime if in Tokio context
- Message passing: Return :ok immediately, send result via message (non-blocking, BEAM-idiomatic)
- Owned types: Arguments decoded before spawning (Env/Term not Send)
- CallerPid parameter: Optional first parameter, doesn't affect arity
- Application config: Leverage existing use Rustler config merging (standard Elixir pattern)
Testing
- Basic async operations (computation, owned types, tuples)
- Concurrent execution validation
- CallerPid intermediate messages
- Runtime configuration via Application config
- All 175 tests pass
Dependencies
Under tokio_rt feature: tokio = "1" (rt, rt-multi-thread, sync), once_cell = "1"
Backward Compatibility
Fully backward-compatible, gated behind tokio_rt feature flag.
Usage Example
# config/config.exs
config :my_app, MyApp.NIF,
load_data: [worker_threads: 4, thread_name: "my-app"]
# lib/my_app/nif.ex
defmodule MyApp.NIF do
use Rustler, otp_app: :my_app, crate: :my_nif
def heavy_computation(_), do: :erlang.nif_error(:nif_not_loaded)
end
# Usage
:ok = MyApp.NIF.heavy_computation("data")
receive do
result -> IO.puts(result)
after
5000 -> :timeout
end
#[rustler::nif]
async fn heavy_computation(input: String) -> String {
    tokio::time::sleep(Duration::from_secs(2)).await;
    input.to_uppercase()
}

fn load(_env: Env, load_info: Term) -> bool {
    // The cfg attribute goes on a block: attributes on a bare `if` expression are rejected by rustc.
    #[cfg(feature = "tokio_rt")]
    {
        if let Ok(config) = load_info.decode::<rustler::tokio::RuntimeConfig>() {
            rustler::tokio::configure(config).ok();
        }
    }
    true
}
rustler::init!("Elixir.MyApp.NIF", load = load);
This behaviour of the async NIFs is not quite what I was after. Users could implement something like this right now with just a few more lines of code. My intention was to make the async NIFs "block" from the BEAM side, in that they only return once the NIF has run fully through, but yield at all .awaits. That would allow users to implement IO-constrained NIFs without having to rely on dirty scheduling.
I still think that what you have built here has merit:
- It should use a new macro, something like #[rustler::nif_task] to clearly separate it from NIFs that return what the function returns
- If we can do without the user implementing load, we should. Maybe lift this RustlerConfig to a "primary" feature that is always tried on the load_info term?
- Not necessarily for the first attempt, but you are using an extremely small part of Tokio directly, to the point that users could simply bring their own spawn function in the initial configuration. We could have our own AsyncRuntime trait with feature-activated implementations.
- IMO, everything that can be called fully asynchronously has to return a ref that is included in the result message
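For discussion, one possible reading of the "bring your own spawn function" idea, sketched with std only. Nothing here exists in Rustler: the trait shape, `BoxedTask`, and the toy `ThreadPerTask` implementation are all hypothetical, and a Tokio-backed implementation would presumably just forward to its own spawn.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

/// Type-erased task handed to whichever runtime is plugged in.
pub type BoxedTask = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// Hypothetical trait: the only capability the task machinery would need.
pub trait AsyncRuntime: Send + Sync {
    fn spawn(&self, task: BoxedTask);
}

/// Toy implementation: one OS thread per task, woken via park/unpark.
pub struct ThreadPerTask;

struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

impl AsyncRuntime for ThreadPerTask {
    fn spawn(&self, mut task: BoxedTask) {
        thread::spawn(move || {
            let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
            let mut cx = Context::from_waker(&waker);
            loop {
                match task.as_mut().poll(&mut cx) {
                    Poll::Ready(()) => break,
                    // Sleep until the waker unparks this thread, then poll again.
                    Poll::Pending => thread::park(),
                }
            }
        });
    }
}
```

With a trait like this, the feature flags would only select which implementation is registered at load time; the macro output would not need to name Tokio at all.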
My intention was to make the async NIFs "block" from the BEAM side, in that they only return once the NIF has run fully through, but yield at all .awaits.
Without knowing all previous discussion, this is what I thought about a bit when reading the description as well. From an ergonomics perspective, I think NIFs should handle the same (if possible!) from within Elixir, as a NIF might be exposed to users through libraries. As a user of such a NIF, I'd probably not want to know that I need to expect a message some time in the future, but just block for the long running work.
@evnu
As a user of such a NIF, I'd probably not want to know that I need to expect a message some time in the future, but just block for the long running work.
The intention is that the library author of the NIF library should have some public API that exposes it as a "sync" function.
So users of the NIF library wouldn't know any difference.
@filmor
Users could implement something like this right now with just a few more lines of code.
Yes, that's a fair point. However, I'm tired of implementing this every time I want this functionality.
My intention was to make the async NIFs "block" from the BEAM side, in that they only return once the NIF has run fully through, but yield at all .awaits.
How would you handle a NIF which streams results back to the caller? If the call blocks, the caller can't handle intermediate messages as they arrive.
That would allow users to implement IO-constrained NIFs without having to rely on dirty scheduling.
I need to hear more about what you have in mind here. As I see it, with an async NIF the way it's implemented right now, you don't need dirty scheduling because the work is immediately spawned onto the async runtime.
- If we can do without the user implementing load, we should. Maybe lift this RustlerConfig to a "primary" feature that is always tried on the load_info term?
I think this is doable. I was thinking about doing this but I held off to reduce scope.
- It should use a new macro, something like #[rustler::nif_task] to clearly separate it from NIFs that return what the function returns
sure, maybe #[rustler::task]. I need to understand what you mean about the return value. In this case, the return value is whatever the NIF sends to the caller.
- Not necessarily for the first attempt, but you are using an extremely small part of Tokio directly, to the point that users could simply bring their own spawn function in the initial configuration. We could have our own AsyncRuntime trait with feature-activated implementations.
Say more? Can you expand on an example of how you'd use what you are thinking here? Are you talking about file system and networking APIs?
IMO, everything that can be called fully asynchronously has to return a ref that is included in the result message
agreed 💯
@filmor @evnu I've updated the functionality based on feedback.
Channel API for Async Tasks
Status: Experimental (requires rustler_unstable cfg flag)
Overview
The Channel API provides type-safe, bidirectional communication between Elixir and Rust async tasks. It replaces the need for manual message handling with a clean, ergonomic interface.
Enabling
Create .cargo/config.toml in your NIF crate directory:
[build]
rustflags = ["--cfg", "rustler_unstable"]
Basic Examples
Example 1: One-Way Communication with Progress Updates
Send progress updates back to Elixir while processing work:
use std::time::Duration;

use rustler::runtime::Channel;

#[rustler::task]
async fn process_items(channel: Channel<(), String>, items: Vec<String>) {
    for (i, item) in items.iter().enumerate() {
        tokio::time::sleep(Duration::from_millis(50)).await;
        // Send progress update
        channel.send(format!("Processing {}/{}: {}", i + 1, items.len(), item));
    }
    // Send final result
    channel.finish(format!("Completed {} items", items.len()));
}
Elixir usage:
ref = MyNif.process_items(["task1", "task2", "task3"])
# Receive all messages
receive do
{^ref, "Completed " <> _ = final} ->
IO.puts("Done: #{final}")
{^ref, progress} ->
IO.puts(progress)
# Continue receiving...
end
Example 2: Bidirectional Communication with Commands
Build interactive workers that receive commands and send responses:
use rustler::runtime::Channel;
#[derive(rustler::NifTaggedEnum, Clone)]
enum Command {
Add { value: i64 },
Multiply { value: i64 },
GetCurrent,
Shutdown,
}
#[derive(rustler::NifTaggedEnum, Clone)]
enum Response {
Updated { old_value: i64, new_value: i64 },
Current { value: i64 },
ShuttingDown { final_value: i64 },
}
#[rustler::task]
async fn stateful_worker(channel: Channel<Command, Response>) {
    let mut current_value = 0i64;
    while let Some(cmd) = channel.next().await {
        let response = match cmd {
            Command::Add { value } => {
                let old = current_value;
                current_value += value;
                Response::Updated { old_value: old, new_value: current_value }
            }
            Command::Multiply { value } => {
                let old = current_value;
                current_value *= value;
                Response::Updated { old_value: old, new_value: current_value }
            }
            Command::GetCurrent => {
                Response::Current { value: current_value }
            }
            Command::Shutdown => {
                channel.send(Response::ShuttingDown { final_value: current_value });
                break;
            }
        };
        channel.send(response);
    }
    channel.finish(Response::ShuttingDown { final_value: current_value });
}
// Helper NIF for sending commands
#[rustler::nif]
fn worker_send_command(
env: rustler::Env,
sender: rustler::runtime::ChannelSender<Command>,
command: rustler::Term,
) -> rustler::NifResult<rustler::types::Atom> {
rustler::runtime::channel::send(env, sender, command)
}
Elixir usage:
# Start worker
worker = MyNif.stateful_worker()
# Send commands
MyNif.worker_send_command(worker, {:add, %{value: 10}})
receive do
{^worker, {:updated, %{new_value: value}}} ->
IO.puts("New value: #{value}")
end
MyNif.worker_send_command(worker, {:multiply, %{value: 2}})
receive do
{^worker, {:updated, %{new_value: value}}} ->
IO.puts("New value: #{value}")
end
MyNif.worker_send_command(worker, :get_current)
receive do
{^worker, {:current, %{value: value}}} ->
IO.puts("Current: #{value}")
end
MyNif.worker_send_command(worker, :shutdown)
receive do
{^worker, {:shutting_down, %{final_value: value}}} ->
IO.puts("Final value: #{value}")
end
Key Concepts
Channel Types
- Channel<(), Response> - One-way: task sends responses to Elixir
- Channel<Request, Response> - Bidirectional: task receives requests and sends responses
Message Format
All messages are tuples: {channel_sender, payload}
- channel_sender - The reference returned by the task
- payload - Your data (type-checked)
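As a rough model of why every message carries the reference, here is a std-only sketch of the `{channel_sender, payload}` envelope. It does not use the Channel API: `TaskRef`, `Responder`, `start_task`, and `receive_for` are hypothetical names, and an mpsc channel stands in for the Elixir process mailbox.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{Receiver, Sender};

/// Stand-in for the reference returned to Elixir when a task starts.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TaskRef(u64);

static NEXT_REF: AtomicU64 = AtomicU64::new(1);

/// Every message in the mailbox is a (reference, payload) pair.
pub type Mailbox<P> = Sender<(TaskRef, P)>;

/// Task-side handle that stamps each payload with its own reference.
pub struct Responder<P> {
    task_ref: TaskRef,
    mailbox: Mailbox<P>,
}

impl<P> Responder<P> {
    pub fn send(&self, payload: P) {
        // A closed mailbox is ignored in this sketch.
        let _ = self.mailbox.send((self.task_ref, payload));
    }
}

/// Starting a task yields the reference for the caller and a responder for the task.
pub fn start_task<P>(mailbox: Mailbox<P>) -> (TaskRef, Responder<P>) {
    let task_ref = TaskRef(NEXT_REF.fetch_add(1, Ordering::Relaxed));
    (task_ref, Responder { task_ref, mailbox })
}

/// Next payload for `wanted`. Unlike an Erlang selective receive, messages
/// for other references are dropped here to keep the sketch short.
pub fn receive_for<P>(inbox: &Receiver<(TaskRef, P)>, wanted: TaskRef) -> Option<P> {
    inbox.iter().find(|(r, _)| *r == wanted).map(|(_, p)| p)
}
```

With two tasks sharing one mailbox, the reference is the only thing that lets the caller tell their messages apart, which is what the `{^ref, ...}` patterns in the Elixir examples rely on.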
Channel Methods
// Receive next request (bidirectional only)
channel.next().await -> Option<Request>
// Send response
channel.send(response)
// Send final response and close
channel.finish(response)
// Get cloneable sender for spawned tasks
channel.responder() -> ResponseSender
Helper for Sending from Elixir
#[rustler::nif]
fn send_to_channel(
env: rustler::Env,
sender: rustler::runtime::ChannelSender<YourRequestType>,
message: rustler::Term,
) -> rustler::NifResult<rustler::types::Atom> {
rustler::runtime::channel::send(env, sender, message)
}
Runtime Configuration
Configure the Tokio runtime in your load function:
fn load(_env: rustler::Env, _load_info: rustler::Term) -> bool {
rustler::runtime::builder(|builder| {
builder
.worker_threads(4)
.thread_name("my-nif-worker")
.thread_stack_size(2 * 1024 * 1024);
}).is_ok()
}
rustler::init!("Elixir.MyNif", load = load);
Examples
See working examples in rustler_tests/native/rustler_test/src/test_async.rs
Limitations
- Channel parameter must be first in function signature
- Tasks with Channel don't need explicit return types
- All parameters must be owned types (no Env or Term)
- Requires rustler_unstable cfg flag
Feedback
This is experimental. Let me know what you think.
@filmor @evnu ok...I got yielding async NIF support with enif_schedule_nif.
Cooperative Yielding NIFs
What is it?
A new way to write long-running NIFs that cooperate with the BEAM scheduler using enif_schedule_nif. They appear synchronous to Elixir while yielding internally.
Usage
use rustler::runtime::yield_now;

#[rustler::nif]
async fn process_large_dataset(items: i64) -> i64 {
    let mut sum = 0;
    for i in 0..items {
        sum += i;
        if i % 100 == 0 {
            yield_now().await; // Yield to scheduler
        }
    }
    sum
}
# Appears synchronous - blocks until complete
result = MyNif.process_large_dataset(10_000)
Key Differences
| | #[rustler::nif] async fn | #[rustler::task] |
|---|---|---|
| Syntax | result = nif() | ref = nif() → receive {^ref, result} |
| Appearance | Synchronous | Asynchronous |
| Return | Direct value | Reference + message |
| Mechanism | enif_schedule_nif | Tokio spawn + enif_send |
| Use case | CPU-bound work that needs to yield | I/O-bound or background tasks |
How it works
- NIF polls async function
- If pending, reschedules itself via enif_schedule_nif
- BEAM scheduler calls continuation when ready
- Repeats until complete
- Returns result directly (no messages)
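The poll-and-reschedule loop above can be modelled without the BEAM. In this std-only sketch each loop iteration stands for one scheduled NIF invocation, and a `Pending` poll stands for a call to enif_schedule_nif. `YieldNow`, `yield_now`, and `run_with_rescheduling` are local stand-ins written for this example, not the items in rustler::runtime.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Returns Pending on the first poll and Ready on the second.
pub struct YieldNow {
    yielded: bool,
}

pub fn yield_now() -> YieldNow {
    YieldNow { yielded: false }
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            Poll::Pending
        }
    }
}

/// Waker that does nothing: this driver polls again unconditionally.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls the future to completion and counts how often it was "rescheduled".
pub fn run_with_rescheduling<F: Future>(future: F) -> (F::Output, usize) {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut reschedules = 0;
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return (value, reschedules),
            // In the real mechanism control returns to the scheduler here.
            Poll::Pending => reschedules += 1,
        }
    }
}

pub async fn process_large_dataset(items: i64) -> i64 {
    let mut sum = 0;
    for i in 0..items {
        sum += i;
        if i % 100 == 0 {
            yield_now().await;
        }
    }
    sum
}
```

Running `process_large_dataset(1000)` through this driver gives the sum 499500 after 10 reschedules, one per yield point. Note that the driver never waits for a wake-up, so a future that depends on an external reactor (a timer or a socket) would simply be polled again in a tight loop under this model.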
I'll try to review this in detail this week.
Good idea to side-step the issues with Terms for now by just preventing compilation if Envs or Terms are present :+1:. A long term improvement could involve implementing https://github.com/rusterlium/rustler/pull/666 and hiding away the Env entirely. The yield could pass all "known" objects through, e.g. with something like let (a, b) = rustler::yield((a, b)).await;.
On reasonably recent versions of OTP (in particular, all that we support), you could also monitor the created process through the resource object.
@scrogson good work! :) Before diving into the code, I have a more general question regarding process life-cycles when using the task API.
When using a task, what is the expected way to handle a crash in the "owner" process on the Elixir side to avoid leaking an async thread? Example:
test "async_with_progress sends intermediate messages using Caller" do
  spawn(fn ->
    ref = RustlerTest.async_with_progress(300)
    exit(:die!)
  end)

  :timer.sleep(10000)
end
My assumption was that I should be able to use channel.is_closed() to abort when the owner is gone, but it seems that the channel might be kept alive (maybe because the Resource is shared between two distinct threads?).
Follow-up question: When waiting in a bidirectional channel and the owning process dies, how would this need to be handled by the task?
A long term improvement could involve implementing #666 and hiding away the Env entirely.
@filmor yes, agreed. I think this would be quite nice.
On reasonably recent versions of OTP (in particular, all that we support), you could also monitor the created process through the resource object.
@filmor yes, I was thinking about this as well.
good work! :)
@evnu thanks...although the AI overlords were a big help 😉
When using a task, what is the expected way to handle a crash in the "owner" process on the Elixir side to avoid leaking an async thread?
@evnu great question, I should probably write a test and go from there. But, since the channel is a resource I imagine that from the elixir side if you try to use it by calling a NIF again, you will get some sort of error when using it to send the message to a channel with no receiver.
When waiting in a bidirectional channel and the owning process dies, how would this need to be handled by the task?
Another great question. I'll need to make sure that we have some tests which show exactly how this works. But I assume that since it's a resource, when the elixir side drops the resource and there are no more channel senders...the NIF side will receive a None and do some clean up. We might need to add process monitoring...but maybe it doesn't matter.
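To pin down what I'd expect (not yet verified against this branch), here is the same shape with a plain std mpsc channel: once every sender is dropped, the receiving loop sees the channel close and can clean up. `Command`, `worker`, and `demo_owner_exit` are illustrative names only; whether the resource-backed channel behaves like this when the owning process dies is exactly what the tests need to show.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

pub enum Command {
    Add(i64),
}

/// Worker loop: keeps going until the command channel is closed.
pub fn worker(commands: Receiver<Command>) -> i64 {
    let mut total = 0;
    // `recv` returns Err once every Sender has been dropped, which plays the
    // role of `channel.next().await` returning None.
    while let Ok(Command::Add(n)) = commands.recv() {
        total += n;
    }
    // Cleanup would go here.
    total
}

/// The "owner" sends two commands and then goes away by dropping its sender.
pub fn demo_owner_exit() -> i64 {
    let (tx, rx) = channel();
    let handle = thread::spawn(move || worker(rx));
    tx.send(Command::Add(2)).unwrap();
    tx.send(Command::Add(3)).unwrap();
    drop(tx);
    handle.join().unwrap()
}
```

The catch in the real case is that the sender lives in a resource: if anything else still holds that resource, the channel stays open even though the owner is gone, which may be what the failing is_closed() check is hitting.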
Examples
I've got a branch of my franz example working with this branch here: https://github.com/scrogson/franz/tree/async-nifs
Hey, this is cool work!
One thing that needs to be confirmed, I recall talking to some people from the OTP team back in the day asking whether it is safe to hold terms across enif_schedule_nif schedules. Back then at least they did not want to guarantee that invariant.
The consequence is that any terms that need to be held across to a subsequent call to enif_schedule_nif needs to be passed along through argv.
Since terms are held within the future, there is no way to accomplish this with the current API design.
This may very well have changed since then, but if it hasn't then I don't think this API design is sound
Hey, this is cool work!
Thanks @hansihe !
One thing that needs to be confirmed, I recall talking to some people from the OTP team back in the day asking whether it is safe to hold terms across enif_schedule_nif schedules. Back then at least they did not want to guarantee that invariant.
The consequence is that any terms that need to be held across to a subsequent call to enif_schedule_nif needs to be passed along through argv.
Since terms are held within the future, there is no way to accomplish this with the current API design.
This may very well have changed since then, but if it hasn't then I don't think this API design is sound
Yeah, we should be good! The design specifically avoids holding terms across schedule boundaries by using resources for state and creating fresh terms for passing through argv.
Yeah, we should be good! The design specifically avoids holding terms across schedule boundaries by using resources for state and creating fresh terms for passing through argv.
How does it deal with terms held inside the future?
How does it deal with terms held inside the future?
https://github.com/rusterlium/rustler/blob/bdcb343d958ca0673d9cdcf313b4249efc0ab215/rustler_codegen/src/nif.rs#L509-L515
The macro explicitly forbids Term and Env parameters in async NIFs. It forces you to use decodable types - so if you write:
#[rustler::nif]
async fn my_nif(data: String, count: i64) -> String {
// ...
}
The codegen decodes String and i64 from the terms before creating the future. The future only captures owned Rust values, not terms.
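A simplified picture of that decode-then-capture step, using stand-ins rather than Rustler types: the hypothetical `Term<'env>` below borrows from its environment, so it cannot be moved into anything that requires 'static, while the decoded String and i64 can. A spawned thread plays the role of the future here.

```rust
use std::thread;

/// Hypothetical stand-in for a term that borrows from the calling environment.
pub struct Term<'env> {
    pub raw: &'env str,
}

impl<'env> Term<'env> {
    pub fn decode_string(&self) -> String {
        self.raw.to_owned()
    }

    pub fn decode_i64(&self) -> Option<i64> {
        self.raw.parse().ok()
    }
}

/// Models the generated wrapper: decode first, then hand only owned values
/// to the long-lived work.
pub fn call_my_nif(data: Term<'_>, count: Term<'_>) -> Option<thread::JoinHandle<String>> {
    let data: String = data.decode_string();
    let count: i64 = count.decode_i64()?;
    // Moving a `Term` into this closure would not compile, because the
    // closure must be 'static and `Term` borrows from the environment.
    Some(thread::spawn(move || data.repeat(count.max(0) as usize)))
}
```

A decode failure is reported before any work starts, which mirrors returning an argument error from the NIF call itself instead of from inside the future.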
There might be other types we might need to exclude here (maybe Binary?)
Excellent questions...keep it coming.
Well, one can generate a process-independent environment (OwnedEnv), create terms from there and keep those around across .await points. But that shouldn't be an issue, right?
Great! Yeah if term and env is not exposed then this should not be a problem.
There might be other types we might need to exclude here (maybe Binary?)
I think Binary should be fine as is, from what I recall it is independently reference counted.
Well, one can generate a process-independent environment (OwnedEnv), create terms from there and keep those around across .await points. But that shouldn't be an issue, right?
Yep, OwnedEnv shouldn't be an issue.
I'll find some time ASAP to review this properly
maybe a hot take, but i think anything involving external async runtimes should be left out, since there isn't actually any integration with e.g. tokio being provided (most notably with wakers...) i think it gives a false sense of what is actually happening to someone writing code with rustler. (in fact i'd say building efficient code that interfaces with async rust probably should not use rescheduled nifs at all, but use messages with e.g. enif_send instead...)