futures-rs icon indicating copy to clipboard operation
futures-rs copied to clipboard

add mapped_futures

Open StoicDeveloper opened this issue 2 years ago • 13 comments

EDIT: This implementation of this PR has been completely reworked, though the interface is the same. In short, the new version makes one very minor change to the existing FuturesUnordered implementation. All of the mapping features are instead introduced by wrapping FuturesUnordered in a struct that also includes a HashSet< HashTask<K, HashFut<K, Fut>>>, where HashTask { inner: *const Task, key: K } and HashFut exists to know the associated key of a completed future, so that the corresponding task can be removed from the set. See comment below for details.

Adds the functionality asked for in this issue: https://github.com/rust-lang/futures-rs/issues/2329. I wanted this feature, searched for it, found only people wishing that the feature existed, and so decided to write it myself. A previous attempt that wrapped FuturesUnordered didn't work; I wanted to avoid anything unsafe which I haven't used before. Having worked with it now though, I can see that unsafe rust is really just C with some guardrails.

The motivation is to have a HashMap that can run futures. Adds the MappedFutures struct, which supports inserting, get_muting, cancelling, removing, and replacing futures by key, and running them in an identical style as FuturesUnordered.

Internally it uses a HashSet and adds a hashable wrapper around the Task struct from FuturesUnordered. In terms of ergonomics, the API is perhaps a bit confused, including both the insert(key, fut) -> bool and replace(key, fut) -> Option<Fut> methods which are more similar to the methods of HashSet than HashMap. The source of this confusion is that some methods can only work if the Future is Unpin; ie. anything that would return an owned future or future reference.

Some justifications:

  • I set the output for the stream as (key, fut_output) to be similar to a LinkedHashMap, which in some ways this library is similar to (but with next().await in place of pop_front(). I think it will be typical for a consumer to want to know the key of whichever future has completed, though of course any future could be mapped as such prior to insertion.
  • an earlier version used HashMap internally, which was the obvious solution for a struct which is "A hashmap that runs futures", but returning the key with the output required having a copy of the key in both the HashMap and in the Task, and so the Key had to be Clone. I added a wrapper HashTask for Arc<Task> which is placed in a HashSet to get around this requirement. Its possible that a different solution would have worked (like pointers to and from a Key wrapper and Task), but this method seemed the most expedient.

Potential improvements:

  • Allow selecting which hasher to use, like how HashSet does. This should be a pretty easy change.

I did already publish these changes in a crate which I will yank if this gets merged. https://crates.io/crates/mapped_futures

StoicDeveloper avatar Jun 11 '23 13:06 StoicDeveloper

I'm also considering further additions for more complex data structure support, such as:

  • MultiMapFutures which allows a key to index for one or more futures
  • BiMapFutures which allows futures to be accessed from either of two key types (really the BiMap is between Key-Value pairs, neither of which is a future, and each pair is associated with a future) so that each key, value, or key-value pair will index to 0 or 1 futures.
  • BiMultiMapFutures which is like BiMapFutures, but allows duplicates among keys and values, so that each key or value may index to 1 or more futures, though each key-value pair will index to only 0 or 1 futures.

StoicDeveloper avatar Jun 12 '23 21:06 StoicDeveloper

Thanks for the PR! It looks like a lot of the code added here is a copy from FutureUnordered. Is it possible to replace some of them with re-exports or wrappers for the types and functions used in FutureUnordered?

taiki-e avatar Jun 18 '23 19:06 taiki-e

Thanks for the PR! It looks like a lot of the code added here is a copy from FutureUnordered. Is it possible to replace some of them with re-exports or wrappers for the types and functions used in FutureUnordered?

This was my first approach, but it didn't work out. This PR required several small internal changes to FuturesUnordered, such that I don't think it would be possible to implement this as a simple wrapper. However, it may be possible to do so by making changes within the actual FuturesUnordered module that would make it easier to reuse. I'll look into that.

StoicDeveloper avatar Jun 18 '23 22:06 StoicDeveloper

However, it may be possible to do so by making changes within the actual FuturesUnordered module that would make it easier to reuse. I'll look into that.

Yeah, I guess making mapped_futures a submodule of the futures_unordered module or creating a new module for the code used in both can remove a lot of duplication.

taiki-e avatar Jul 19 '23 15:07 taiki-e

@taiki-e

I've implemented a new module futures_keyed which contains most of the code that was in futures_unordered. futures_unordered and mapped_futures now consume the API of futures_keyed. There is more work to do, and there are some things about this implementation that are not ideal

Work remaining:

  • implement more mapping types, like mapped_streams, bi_multi_map_futures, and bi_multi_map_streams, which I have implemented in the mapped_futures crate, but I wanted to check whether what I have so far is acceptable to this crate's maintainers before going further
  • other housekeeping, remove commented code, add more comments and documentation, implement Debug on some structs to silence compiler warnings

Things that aren't ideal

  • The API for futures_keyed adds the ReleasesTask trait, and consumers that contain KeyedFutures must provide an implementor of this trait. I personally find this to be very unergonomic, but it was the only way I could think of to make this change. The API for futures_unordered is of course unchanged, but internally it now creates a DummyStruct that implements ReleasesTask using logic that does nothing, since the existing FuturesKeyed::release_task() contains all necessary logic. This trait was necessary so that a consumer can remove residual elements from its internal data structure if release_task is called. Ex. MappedFutures uses a HashSet of Tasks, but calling release_task on the Task will not remove the corresponding struct from the HashSet; now release_task() will handle that by calling code from the trait.
  • I couldn't think of a better name than futures_keyed; perhaps futures_unordered_internal. It might be better to not say "keyed" since there is the potential to use the new key field of Task for things other than keys.
    • For example, a MultiMapFutures struct, in which a single key maps to 0 or more futures (such that a getter function could have the signature MultiMapFutures<K,F>::get(&self, key: K) -> &Vec<Entry<Task<K,F>>>), but each future does not have a unique key, could work if Task contains a pointer to the Entry instead of a key. Such a struct could be implemented as a consumer of FuturesKeyed without further modification, but the naming scheme wouldn't make sense.

Please let me know of any problems with this implementation; if you find none then I will proceed with implementing more consumers of futures_keyed

StoicDeveloper avatar Sep 30 '23 18:09 StoicDeveloper

I've implemented a new module futures_keyed which contains most of the code that was in futures_unordered.

Thanks.

  • implement more mapping types, like mapped_streams, bi_multi_map_futures, and bi_multi_map_streams, which I have implemented in the mapped_futures crate, but I wanted to check whether what I have so far is acceptable to this crate's maintainers before going further

For now, I would like to add a minimum (mapped_futures and mapped_streams; other ones are implemented on top of them, right?).

I couldn't think of a better name than futures_keyed; perhaps futures_unordered_internal. It might be better to not say "keyed" since there is the potential to use the new key field of Task for things other than keys.

Yeah, futures_unordered_internal would be better.

taiki-e avatar Oct 08 '23 13:10 taiki-e

@taiki-e I've addressed your comments and some merge conflicts resulting from work over the past few months. Let me know if there's anything else; I should be able to address comments more promptly in the near future.

StoicDeveloper avatar Jun 08 '24 12:06 StoicDeveloper

Also, the crate I published evidently has unsound code that causes this bug in tokio: https://github.com/StoicDeveloper/mapped_futures/issues/4 That issue will obviously have to be resolved here before this PR can proceed.

EDIT: This issue has been resolved.

StoicDeveloper avatar Jul 18 '24 18:07 StoicDeveloper

I found unsoundness by running this PR's tests through miri. The unsoundness comes from the potential for a dropped task to still have a pointer in the ReadyToRunQueue, which will get dereferenced during polling. Solutions may include:

  • ~~add to Task Option<PrevReady> and Option<NextReady> fields that can be used to remove an item from the queue if it gets cancelled~~
  • ~~drop the future on cancellation, but don't drop the task. Instead check during ReadyToRunQueue dequeue() whether the future was dropped. If so, dequeue again and drop the current task.~~

~~I'm leaning towards the latter as the simpler option, and its runtime cost should be negligible, but I'm open to suggestions.~~

The Miri output can be found in the MappedFutures crate's issue: https://github.com/StoicDeveloper/mapped_futures/issues/5

EDIT: The real cause was that I wasn't calling release_task() during future cancellation and removal. With that issue fixed, Miri no longer finds UB in this PR's tests.

StoicDeveloper avatar Jul 18 '24 22:07 StoicDeveloper