
Convert async iterator to Observable

Open samfrances opened this issue 5 years ago • 5 comments

Does RxPY provide a facility for converting async iterators into Observables?

For example, if I have:

async def foo():
    for x in range(5):
        result = await get_data_from_somewhere(x)
        yield result

Is there a straightforward way of converting this into an Observable? (Obviously, I would only expect this to work when running using the AsyncIOScheduler).

If this isn't available, can I suggest that this would be a welcome feature?

samfrances avatar Jun 07 '19 12:06 samfrances

Yes, that would be a nice addition because it is very common when working with asyncio. @dbrattli @jcafhe do we already have some operators specific to a scheduler? I mean, such an operator only makes sense when using an async/await-compatible scheduler, so it should somehow be tied to one (it may be the AsyncIOScheduler, but potentially other ones).

MainRo avatar Jul 23 '19 22:07 MainRo

@MainRo I don't think there's currently an operator with a static/forced scheduler, but some creation operators don't use the subscription scheduler anyway, so it should be OK to have something like from_coroutine with AsyncIOScheduler regardless of which scheduler is declared in subscribe.

However I feel that @dbrattli @erikkemperman may have a different opinion since this "breaks" the scheduler/subscribe contract in some ways.

If we want to use schedulers other than AsyncIOScheduler, we need to detect the capability of a scheduler, possibly by creating a specialized interface (as we did with periodic schedulers). IMHO, it feels a bit complicated and it would be easier to stick with AsyncIOScheduler only.

jcafhe avatar Jul 25 '19 08:07 jcafhe

What I had in mind is that this operator should use the provided scheduler (to ensure that the loop is explicitly provided), but would fail if that scheduler is not an AsyncIOScheduler (or a subclass of it). Ideally it should accept any scheduler compatible with asyncio, but I am not sure there are valid use cases. So it would be an operator limited to the asyncio environment, but not breaking the scheduler/subscribe API: it would use the provided scheduler, but fail in case of an unexpected scheduler type.
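
A minimal sketch of that behaviour, using only asyncio so it does not depend on any RxPY internals. Everything here is an assumption made for illustration: `subscribe_aiter` is a hypothetical name, the observer is modelled as three plain callbacks, and the explicitly provided event loop stands in for the scheduler. It is not an existing RxPY operator.

```python
import asyncio


def subscribe_aiter(aiter, loop, on_next, on_error, on_completed):
    """Drive `aiter` on `loop`, forwarding items to observer-style callbacks.

    Hypothetical helper, not RxPY API. Returns a callable that cancels
    the subscription.
    """
    # Fail early on an unexpected loop type, mirroring the idea of failing
    # on an unexpected scheduler type.
    if not isinstance(loop, asyncio.AbstractEventLoop):
        raise TypeError("subscribe_aiter requires an asyncio event loop")

    async def pump():
        try:
            async for item in aiter:
                on_next(item)
        except asyncio.CancelledError:
            # Cancellation is how the subscription is disposed; let it propagate.
            raise
        except Exception as exc:
            on_error(exc)
        else:
            on_completed()

    task = loop.create_task(pump())
    return task.cancel


async def foo():
    for x in range(5):
        await asyncio.sleep(0)  # stand-in for get_data_from_somewhere(x)
        yield x


def demo():
    loop = asyncio.new_event_loop()
    received = []
    done = loop.create_future()
    subscribe_aiter(
        foo(),
        loop,
        on_next=received.append,
        on_error=done.set_exception,
        on_completed=lambda: done.set_result(None),
    )
    loop.run_until_complete(done)
    loop.close()
    return received
```

Calling `demo()` collects the items emitted by `foo()` in order. A real operator would presumably take the loop from the scheduler passed at subscription time rather than as a separate argument.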

MainRo avatar Jul 30 '19 18:07 MainRo

I was about to ask about this :-). Would it be possible to include the contents of this blog post in the documentation?

https://blog.oakbits.com/rxpy-and-asyncio.html

It explains very well and very concisely how to create an Observable from an asynchronous iterable. It would be a great addition to the docs imo. Maybe the blog post author will agree to having it added verbatim. Otherwise, just a link would be helpful.

mnieber avatar Dec 17 '20 08:12 mnieber

Thanks for the comment, this is actually my blog :). Rather than adding this article verbatim, we should add the from_aiter and to_agen operators to RxPY (with some fixes). This was my initial plan, but it is still on the todo list.

MainRo avatar Dec 20 '20 22:12 MainRo