
Combining the streamz.Stream.filenames() and streamz.Stream.from_textfile() using dask scatter?

Open ayanb07 opened this issue 10 months ago • 2 comments

I want to use the streamz.Stream.filenames() API to feed filenames in parallel to streamz.Stream.from_textfile(), and run computations on the lines of each text file for as long as they keep arriving. However, the API seems to be designed so that you can use one or the other, but not both together. Below is the existing code, written for a single file only. Each signal consists of 1000 values, and I receive 4K signals per second per channel. This one file holds the data for one channel.

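import streamz

source = streamz.Stream.from_textfile(r"~\Channel_C.txt")

def split(line):
    # Each line holds whitespace-separated columns; keep the second one as a float.
    return float(line.strip().split()[1])

# Group the values into signals of 1000 samples each.
pulse = source.map(split).partition(1000).map(list)
# invert_signal, find_true_cfd and write_true_cfd_to_file are defined elsewhere.
pulse.map(invert_signal).map(find_true_cfd).map(write_true_cfd_to_file)
source.start()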
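
For readers without the data at hand, here is a minimal plain-Python sketch of what the split and partition(1000) steps compute, with no streamz dependency. The `partition` generator is a hypothetical stand-in written for illustration, not the streamz implementation, and it assumes that only complete groups are emitted.

```python
from itertools import islice


def split(line):
    # Same parsing as in the issue: take the second whitespace-separated column.
    return float(line.strip().split()[1])


def partition(values, n):
    """Yield lists of exactly n consecutive values (hypothetical helper).

    Assumption: an incomplete trailing group is held back rather than emitted.
    """
    it = iter(values)
    while True:
        chunk = list(islice(it, n))
        if len(chunk) < n:
            return
        yield chunk


# Tiny example with groups of 2 instead of 1000.
lines = ["0 0.0", "1 0.5", "2 1.0", "3 1.5", "4 2.0"]
signals = list(partition(map(split, lines), 2))
```

With five input lines and a group size of 2, the fifth value is left over and not emitted.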

If the two APIs could be combined, I would like to do the following: take the files I receive, one per channel (there will be 200 of them), and distribute them across dask worker threads.

from dask.distributed import LocalCluster
cluster = LocalCluster()
client = cluster.get_client()

source = streamz.Stream.filenames("*.txt").scatter().from_textfile().map(split).partition(1000).map(list).map(invert_signal).map(find_true_cfd).map(write_true_cfd_to_file)
source.start()

ayanb07, Feb 10 '25 19:02

I came up with the following example:

import streamz


dir = "<temp dir>"

def reset_textfile(source, fname):
    # Point the text source at the new file and drop any partially read line.
    source.file = open(fname, mode='r')
    source.buffer = ''


text_source = streamz.from_textfile(f"{dir}/firstfile", asynchronous=True)
files_source = streamz.filenames(dir, asynchronous=True)
files_source.sink(lambda x: reset_textfile(text_source, x))
files_source.sink(print)
text_source.sink(print)

files_source.start()
text_source.start()

Now, when you create a new file in the temporary directory, it will become the "current file" and new lines appearing in it will become events. If you make another new file, the state of the previously open file is forgotten. I think this is the pattern you were after.
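
To make the "current file" behaviour concrete, here is a minimal standard-library sketch of the same idea, without streamz. The class `CurrentFileTail` and its `poll` method are hypothetical names chosen for illustration; they are an assumption about how one might model the pattern, not how streamz implements it.

```python
import os


class CurrentFileTail:
    """Follow the most recently created file in a directory (hypothetical helper)."""

    def __init__(self, directory):
        self.directory = directory
        # Files that already exist at startup are ignored in this sketch.
        self.seen = set(os.listdir(directory))
        self.file = None
        self.buffer = ""

    def poll(self):
        """Return the complete lines that appeared since the last poll."""
        # Switch to any newly created file and forget the previous file's state.
        for name in sorted(set(os.listdir(self.directory)) - self.seen):
            self.seen.add(name)
            if self.file is not None:
                self.file.close()
            self.file = open(os.path.join(self.directory, name))
            self.buffer = ""
        if self.file is None:
            return []
        # Read whatever was appended and keep an unfinished last line buffered.
        self.buffer += self.file.read()
        *lines, self.buffer = self.buffer.split("\n")
        return lines
```

Calling `poll()` repeatedly (for example from a timer loop) yields new lines from the current file only; lines appended to an earlier file after a newer one appears are not reported.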

martindurant, Feb 24 '25 17:02

(Including the print call is, of course, just to show that something is happening. I should also have mentioned that this assumes the first file already exists at startup, but you could create a dummy empty file.)

martindurant, Feb 24 '25 18:02