
Simple example of stream with Asyncio operations

Open massyah opened this issue 4 years ago • 2 comments

Hi,

I've been following the docs and reading through the tests, and I cannot get streamz working with Asyncio :/

Here's a very minimal example of a stream consisting of two async operations and one sync operation:

  1. we retrieve content via an aiohttp call
  2. return content length
  3. simulate DB write with an Asyncio sleep
  4. sink to stdout

import asyncio

import aiohttp
from streamz import Stream


async def fetch(url):
    print("fetching url {}".format(url))
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print(resp.status)
            body = await resp.text()
    print("Finished w/ url {}".format(url))
    return body


def count(x):
    print("I", x)
    return len(x)


async def write(x):
    await asyncio.sleep(0.2)
    print("O", x)
    return x

async def f():
    print("Starting stream")
    source = Stream(asynchronous=True)
    source.map(fetch).map(count).map(write).sink(print)
    urls = [
        'https://httpstatus.io/?i=1',
        'https://httpstatus.io/?i=2',
        'https://httpstatus.io/?i=3',
        'https://httpstatus.io/?i=4',
        'https://httpstatus.io/?i=5',
        'https://httpstatus.io/?i=6',
    ]
    for u in urls:
        await source.emit(u)


if __name__ == '__main__':
    asyncio.run(f())

I've tried a lot of combinations using tornado event loop etc. but didn't manage to get anything working. Is this supposed to be possible or is the Asyncio support still behind? Am I missing something obvious?

Thanks for the help

massyah avatar Sep 11 '21 09:09 massyah

Streamz uses asyncio/coroutines as a way to manage backpressure, i.e., the emitting process must wait until there is room in the pipeline before adding more data. Your model is the opposite: you await some async process in order to put data into the pipeline. We could very well have a source that does what you want, something like

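from asyncio import as_completed

from streamz import Source


class from_coroutines(Source):
    def __init__(self, coroutines, **kwargs):
        self.coos = coroutines
        super().__init__(**kwargs)

    async def _run(self):
        # as_completed yields awaitables in the order they finish
        for coro in as_completed(self.coos):
            res = await coro
            await self._emit(res)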
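
The `as_completed` loop in the sketch above can be tried on its own with plain asyncio, with no streamz involved. This is only an illustration of the iteration pattern: `fake_fetch`, `collect_in_completion_order`, and the delays are made-up names and values, and the sleep stands in for a real network call.

```python
import asyncio


async def fake_fetch(name, delay):
    # Hypothetical stand-in for a network call: sleep, then return a payload.
    await asyncio.sleep(delay)
    return name


async def collect_in_completion_order():
    coros = [
        fake_fetch("slow", 0.3),
        fake_fetch("fast", 0.1),
        fake_fetch("medium", 0.2),
    ]
    results = []
    # as_completed yields awaitables in the order the underlying work finishes,
    # not in the order the coroutines were passed in.
    for fut in asyncio.as_completed(coros):
        results.append(await fut)
    return results


if __name__ == "__main__":
    print(asyncio.run(collect_in_completion_order()))
```

In a source like the one sketched above, each awaited result would be handed to `_emit` instead of being appended to a list.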

martindurant avatar Sep 11 '21 18:09 martindurant

@massyah, did you have a chance to do something with my suggestion? It would make a nice example for the docs, I think, although you don't really need streamz for this particular linear workflow.
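
For comparison, here is a minimal sketch of the linear workflow from the question written with plain asyncio and no streamz. It rests on some assumptions: `process` and `main` are hypothetical helper names, and `fetch` sleeps instead of calling aiohttp so that the example runs without network access.

```python
import asyncio


async def fetch(url):
    # Stand-in for the aiohttp request in the question; no network access here.
    await asyncio.sleep(0.01)
    return "body of " + url


def count(body):
    # Synchronous step: return the content length.
    return len(body)


async def write(n):
    # Simulated DB write, as in the question.
    await asyncio.sleep(0.01)
    return n


async def process(url):
    # One URL goes through fetch -> count -> write in sequence.
    body = await fetch(url)
    return await write(count(body))


async def main(urls):
    # gather runs the per-URL pipelines concurrently and keeps input order.
    return await asyncio.gather(*(process(u) for u in urls))


if __name__ == "__main__":
    urls = ["https://httpstatus.io/?i=%d" % i for i in range(1, 4)]
    print(asyncio.run(main(urls)))
```

Here each URL is processed independently, so `asyncio.gather` is enough; nothing in this sketch limits how many requests are in flight at once.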

martindurant avatar Oct 04 '21 17:10 martindurant