Real tail of stream (last n elements)

Open JulianWgs opened this issue 3 years ago • 3 comments

To my knowledge there is currently no way to show (or plot) the last n elements of a Streamz DataFrame.

I think this would be a very useful function for debugging, and also for slower streams, for example CFD results from OpenFOAM.

Here is a naive implementation:

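import pandas as pd

def tail(length):
    def tail_func(x, y):
        # DataFrame.append was removed in pandas 2.0; pd.concat is the replacement
        x = pd.concat([x, y], ignore_index=True)
        return x.iloc[-length:].reset_index(drop=True)
    return tail_func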
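
To see what this does outside of a stream, here is a minimal sketch that folds the function over a few batches by hand. It assumes `pd.concat` in place of `DataFrame.append` (which newer pandas releases no longer provide), and the batches are made-up data, not real stream output.

```python
from functools import reduce

import pandas as pd


def tail(length):
    def tail_func(x, y):
        # Assumption: pd.concat stands in for DataFrame.append
        x = pd.concat([x, y], ignore_index=True)
        return x.iloc[-length:].reset_index(drop=True)
    return tail_func


# Made-up batches standing in for what a stream would emit
batches = [
    pd.DataFrame({"name": ["Mike", "Tim"], "age": [20, 21]}),
    pd.DataFrame({"name": ["Anna", "Kim", "Andy"], "age": [22, 23, 24]}),
    pd.DataFrame({"name": ["Tim", "Mike"], "age": [25, 26]}),
]

# The same fold that accumulate(tail(4), start=pd.DataFrame()) applies batch by batch
last_rows = reduce(tail(4), batches, pd.DataFrame())
print(last_rows)
```

Each step concatenates the new batch onto the previous state and keeps only the final four rows, so the state never grows beyond the requested length.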

Here is an example: (thanks for the feedback @martindurant)

import pandas as pd
from streamz import Stream
import numpy as np

names = ["Mike", "Tim", "Anna", "Kim", "Andy"]

def emitter():
    n = np.random.randint(1, 5)
    return {"name": np.random.choice(names, n), "age": np.random.randint(18, 32, n)}

stream = Stream.from_periodic(emitter, 1)
df1 = pd.DataFrame({"name": ["test"], "age": [40]})
out = stream.map(pd.DataFrame).accumulate(tail(6), start=pd.DataFrame()).to_dataframe(df1)
# out.stream.sink(print)  # optional, or some other output

stream.start()

# if using JupyterLab, display the live dataframe
out

This could either be the bare representation of window, or it could replace the current tail function. I think the name tail suits this function better than the current one. What do you think?

Implementation-wise, one could be much more efficient by using .loc to replace values round-robin and storing the order separately, but for a first go I would stick with the naive version.
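
For comparison, here is a rough sketch of what a round-robin variant might look like. The `RingTail` class and its methods are hypothetical names invented for this illustration and are not part of streamz. It also simplifies the idea by keeping rows in a preallocated Python list rather than writing into a DataFrame with .loc.

```python
import pandas as pd


class RingTail:
    """Hypothetical fixed-size row buffer (illustration only, not a streamz API)."""

    def __init__(self, length, columns):
        self.length = length
        self.columns = list(columns)
        self.rows = [None] * length  # preallocated slots, overwritten in place
        self.pos = 0                 # index of the next slot to overwrite
        self.count = 0               # number of slots filled so far

    def update(self, df):
        # Write each incoming row over the oldest slot instead of concatenating
        for row in df[self.columns].itertuples(index=False, name=None):
            self.rows[self.pos] = row
            self.pos = (self.pos + 1) % self.length
            self.count = min(self.count + 1, self.length)
        return self.frame()

    def frame(self):
        # Restore chronological order: the oldest row sits at self.pos once full
        if self.count < self.length:
            ordered = self.rows[:self.count]
        else:
            ordered = self.rows[self.pos:] + self.rows[:self.pos]
        return pd.DataFrame(ordered, columns=self.columns)


buf = RingTail(3, ["name", "age"])
buf.update(pd.DataFrame({"name": ["a", "b"], "age": [1, 2]}))
latest = buf.update(pd.DataFrame({"name": ["c", "d"], "age": [3, 4]}))
print(latest)
```

In a pipeline, something like `stream.map(pd.DataFrame).map(buf.update)` could presumably take the place of the accumulate step. Whether this is actually faster than the naive version would need measuring, since the DataFrame is still rebuilt on every update.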

JulianWgs, Apr 20 '21 17:04

You may well be right, and I encourage others to offer an opinion. First, may I nit-pick and suggest this version as a simpler example:

import pandas as pd
from streamz import Stream
import numpy as np

names = ["Mike", "Tim", "Anna", "Kim", "Andy"]

def emitter():
    n = np.random.randint(1, 5)
    return {"name": np.random.choice(names, n), "age": np.random.randint(18, 32, n)}

stream = Stream.from_periodic(emitter, 1)
df1 = pd.DataFrame({"name": ["test"], "age": [40]})
out = stream.map(pd.DataFrame).accumulate(tail(6), start=pd.DataFrame()).to_dataframe(df1)
out.stream.sink(print)  # optional, or some other output

stream.start()

martindurant, Apr 20 '21 19:04

Hey @martindurant,

it seems that there are not many other opinions on this issue.

How should we proceed? For a pull request, two things must be discussed beforehand:

  1. Round-robin, naive, or another implementation?
  2. What would the API look like?

JulianWgs, Jan 15 '22 15:01

  1. I am fine with naive - we can always come up with something better in the future.
  2. Your API looks OK. Do you mean you would like to have a DataFrame.tail method that does the same thing? That would be OK with me.

> To my knowledge there is currently no way to show (or plot) the last n elements of a Streamz DataFrame.

I should have noted that hvplot on a dataframe has a backlog= argument (see here); the backlog can be displayed in a table or plot, and might have been what you wanted all along.

martindurant, Jan 18 '22 17:01