Using partition "breaks" program logic
I am struggling to use partition in a streamz pipeline because it "breaks" the logic of my program, presumably because it introduces asynchronous processing.
As a simplified example, I have something that works along the lines of this:
import streamz

def main():
    state = {
        "cnt": 0,
    }

    stream = streamz.Stream()
    cntd = stream.accumulate(cnt,
                             returns_state=True,
                             start=state)
    cntd.sink(print)

    with open("many_lines.txt", "r") as fh:
        for line in fh:
            stream.emit(line)

    print(f"found {state.get('cnt')} lines")

def cnt(state, itm):
    state["cnt"] += 1
    return state, itm

if __name__ == "__main__":
    main()
This runs through all the lines in many_lines.txt, counts and prints them, and then reports
found 10000 lines
So far so good.
When I introduce partition now, like this:
import streamz

def main():
    state = {
        "cnt": 0,
    }

    stream = streamz.Stream()
    parted = stream.partition(10001, timeout=2)  # <= PARTITION HERE
    cntd = parted.accumulate(cnt,
                             returns_state=True,
                             start=state)
    cntd.sink(print)

    with open("many_lines.txt", "r") as fh:
        for line in fh:
            stream.emit(line)

    print(f"found {state.get('cnt')} lines")

def cnt(state, itm):
    state["cnt"] += 1
    return state, itm

if __name__ == "__main__":
    main()
I would expect to see basically the same result, but instead I see nothing for some time and then
found 0 lines
I know there are only 10,000 lines in many_lines.txt, so the partition will never fill up, but it should hit the timeout at some point and "release" the data, shouldn't it?
I suspect that the program terminates before the partition hits the timeout, so I tried (many variations of) awaiting stream.emit(line), inspired by the async def process_file(fn) function in the streamz documentation section Processing Time and Back Pressure.
For example like this:
import streamz

def main():
    state = {
        "cnt": 0,
    }

    stream = streamz.Stream()
    parted = stream.partition(10001, timeout=2)
    cntd = parted.accumulate(cnt,
                             returns_state=True,
                             start=state)
    cntd.sink(print)

    with open("many_lines.txt", "r") as fh:
        for line in fh:
            await stream.emit(line)  # <= USE AWAIT HERE

    print(f"found {state.get('cnt')} lines")

def cnt(state, itm):
    state["cnt"] += 1
    return state, itm

if __name__ == "__main__":
    main()
But this (obviously) does not work (SyntaxError: 'await' outside async function). And I also did not find a way to make it work.
(How) can I make sure that everything emitted in the for loop is fully processed before the print statement (or any remaining code, for that matter) is executed? Or am I getting this completely wrong?
My use case is to read all lines of pretty big files (too big to load into memory at once), send them through a streamz pipeline, and then continue with my program. "Then" meaning after all lines are processed, including those that might be "stuck" in a partition once no more lines are emitted because we reached EOF (which is why I need the timeout, I believe).
Found a closely related SO post, unfortunately not providing a clear answer:
"Is there a nice way to otherwise check if a Stream still contains elements being processed": not that I am aware of.
Is there none or is the person just "not aware of it"?
I made some progress. The following seems to achieve what I was hoping for, but I am not 100% sure whether it only works in this pathological toy example. Mainly, I am unsure whether the call to .wait() blocks the current (and only?) thread, so that no work would be done by streamz while wait()ing in a real application.
import streamz
import threading
from functools import partial

def main():
    # Event used to signal when processing is done
    signal_done = threading.Event()

    state = {
        "cnt": 0,
    }

    done_cb = partial(_signal_done_cb, evnt=signal_done)
    # create a RefCounter keeping track of the number of items in the stream
    ref_c = streamz.RefCounter(cb=done_cb)

    source = streamz.Stream()
    parted = source.partition(10001, timeout=2)  # <= PARTITION HERE
    cntd = parted.accumulate(cnt,
                             returns_state=True,
                             start=state)
    cntd.sink(dev_null)

    with open("many_lines.txt", "r") as fh:
        signal_done.clear()
        for line in fh:
            source.emit(line, metadata=[{"ref": ref_c}])

    signal_done.wait(timeout=5)
    print(f"found {state.get('cnt')} lines")

def cnt(state, itm):
    state["cnt"] += 1
    return state, itm

def dev_null(itm):
    return None

def _signal_done_cb(evnt=None):
    evnt.set()

if __name__ == "__main__":
    main()
This now returns
found 1 lines
after about 2 seconds, which is expected: the timeout releases a single (partial) partition, and cnt is called once per partition rather than once per line.