
Using partition "breaks" program logic

Open scherand opened this issue 4 years ago • 2 comments

I am struggling to use partition in a pipeline because it "breaks" the logic of my program, presumably because it introduces asynchronous processing.

As a simplified example, I have something that works along the lines of this:

import streamz


def main():
    state = {
        "cnt": 0,
    }
    stream = streamz.Stream()
    cntd = stream.accumulate(cnt,
                             returns_state=True,
                             start=state)
    cntd.sink(print)

    with open("many_lines.txt", "r") as fh:
        for line in fh:
            stream.emit(line)
    print(f"found {state.get('cnt')} lines")


def cnt(state, itm):
    state["cnt"] += 1
    return state, itm


if __name__ == "__main__":
    main()

This basically runs through all the lines in the file many_lines.txt, counting and printing them, and then reports

found 10000 lines

So far so good.

When I introduce partition now, like this:

import streamz


def main():
    state = {
        "cnt": 0,
    }
    stream = streamz.Stream()
    parted = stream.partition(10001, timeout=2)  # <= PARTITION HERE
    cntd = parted.accumulate(cnt,
                             returns_state=True,
                             start=state)
    cntd.sink(print)

    with open("many_lines.txt", "r") as fh:
        for line in fh:
            stream.emit(line)
    print(f"found {state.get('cnt')} lines")


def cnt(state, itm):
    state["cnt"] += 1
    return state, itm


if __name__ == "__main__":
    main()

I would expect to see basically the same result. But I see nothing for some time, and then

found 0 lines

I know there are only 10'000 lines in many_lines.txt, so the partition will never fill up, but it should hit the timeout at some point and "release" the data, no?

I suspect that the program terminates before the partition hits the timeout, so I tried (many variations of) awaiting stream.emit(line). This was inspired by the async def process_file(fn) function in Processing Time and Back Pressure.

For example like this:

import streamz


def main():
    state = {
        "cnt": 0,
    }
    stream = streamz.Stream()
    parted = stream.partition(10001, timeout=2)
    cntd = parted.accumulate(cnt,
                             returns_state=True,
                             start=state)
    cntd.sink(print)

    with open("many_lines.txt", "r") as fh:
        for line in fh:
            await stream.emit(line)  # <= USE AWAIT HERE
    print(f"found {state.get('cnt')} lines")


def cnt(state, itm):
    state["cnt"] += 1
    return state, itm


if __name__ == "__main__":
    main()

But this (obviously) does not work (SyntaxError: 'await' outside async function), and I did not find a way to make it work either.
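For reference, based on the Stream(asynchronous=True) pattern from the docs, I imagine it would have to look roughly like the following untested sketch. The asyncio.run wrapper and the trailing asyncio.sleep(3) are my own guesses; the sleep is a crude way to keep the event loop alive past the 2 second partition timeout:

import asyncio

import streamz


async def process_file():
    state = {
        "cnt": 0,
    }
    # tie the stream to the running asyncio event loop, so that
    # emit() can be awaited and the partition timeout can fire
    stream = streamz.Stream(asynchronous=True)
    parted = stream.partition(10001, timeout=2)
    cntd = parted.accumulate(cnt,
                             returns_state=True,
                             start=state)
    cntd.sink(print)

    with open("many_lines.txt", "r") as fh:
        for line in fh:
            await stream.emit(line)

    # keep the loop alive long enough for the 2 second partition
    # timeout to release the last (partial) batch
    await asyncio.sleep(3)
    # partition emits tuples, so this counts batches of lines
    print(f"found {state.get('cnt')} batches")


def cnt(state, itm):
    state["cnt"] += 1
    return state, itm


if __name__ == "__main__":
    asyncio.run(process_file())

Even if something like this works, the trailing sleep feels like a hack: I would be guessing how long to wait instead of knowing that the pipeline has drained.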

(How) can I make sure that everything emitted in the for loop has been fully processed before the print statement (or any remaining code, for that matter) is executed? Or am I getting this completely wrong?

My use case is to read all the lines in pretty big files (too big to load into memory at once), send them through a streamz pipeline, and then continue with my program. "Then" meaning: after all lines are processed, including those that might be "stuck" in a partition once no more lines are emitted because we reached EOF (this is why I need the timeout, I believe).

scherand · Jul 14 '21, 22:07

I found a closely related SO post, which unfortunately does not provide a clear answer:

"Is there a nice way to otherwise check if a Stream still contains elements being processed": not that I am aware of.

Is there really none, or is the person just "not aware of it"?

scherand · Sep 29 '21, 11:09

I have made some progress now. The following seems to achieve what I was hoping for. I am, however, not 100% sure whether it only works in this pathological toy example. Mainly, I am unsure whether the call to .wait() blocks the current (and only?) thread, so that no work would be done by streamz while wait()ing in a real application.

import streamz
import threading

from functools import partial


def main():
    # Event used to signal when processing is done
    signal_done = threading.Event()

    state = {
        "cnt": 0,
    }

    done_cb = partial(_signal_done_cb, evnt=signal_done)
    # create a RefCounter keeping track of the number of items in the stream
    ref_c = streamz.RefCounter(cb=done_cb)

    source = streamz.Stream()
    parted = source.partition(10001, timeout=2)  # <= PARTITION HERE
    cntd = parted.accumulate(cnt,
                             returns_state=True,
                             start=state)
    cntd.sink(dev_null)

    with open("many_lines.txt", "r") as fh:
        signal_done.clear()
        for line in fh:
            source.emit(line, metadata=[{"ref": ref_c}])

    signal_done.wait(timeout=5)
    print(f"found {state.get('cnt')} lines")


def cnt(state, itm):
    state["cnt"] += 1
    return state, itm


def dev_null(itm):
    return None


def _signal_done_cb(evnt=None):
    evnt.set()


if __name__ == "__main__":
    main()

This now returns

found 1 lines

after about 2 seconds, which is expected: partition emits a single partition here, and the accumulator counts emitted partitions, not lines.
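As an aside: since partition emits tuples, a small (equally untested) variation of the accumulator would count the lines inside each batch instead of the batches themselves:

def cnt(state, batch):
    # each item arriving from partition is a tuple of lines
    state["cnt"] += len(batch)
    return state, batch

With that, the final print should report the actual line count again.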

scherand · Sep 29 '21, 12:09