flint
Extending Window capabilities
I would like to extend the window capabilities and discuss how best to implement the extensions. With the existing functionality we can do:
val result = priceTSRdd.addWindows(Window.pastAbsoluteTime("1000ns"))
// time price window_past_1000ns
// ------------------------------------------------------
// 1000L 1.0 [[1000L, 1.0]]
// 1500L 2.0 [[1000L, 1.0], [1500L, 2.0]]
// 2000L 3.0 [[1000L, 1.0], [1500L, 2.0], [2000L, 3.0]]
// 2500L 4.0 [[1500L, 2.0], [2000L, 3.0], [2500L, 4.0]]
- Windows at predefined timestamps only. The call above creates a backward-looking window at every row. For very "dense" time series with samples at nanosecond scale, we might not want a window at each observation, but instead run some statistics or another discovery method to find the points at which to create a window.
- Windows of varying length. In the trading world we can imagine windows of varying time length, e.g. determined by a "volume clock".
- Windows of a fixed number of observations. I saw a count window, but it is not clear to me how to use it.
- Two-segment windows. A first segment (section) of a window could be used to calculate some online statistics, which are then consumed by a summary function applied over the second part of the window (adaptive summary stats, e.g. thresholds based on an online volatility estimator).
How would these more general features best be implemented? Any good advice on how to add these extensions? Happy to contribute as well.
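To make the two-segment idea concrete, here is a minimal plain-Python sketch (not Flint code). The function name `two_segment_summary`, its parameters, and the choice of "count price changes larger than k times the estimated volatility" are all assumptions made for illustration.

```python
import math

def two_segment_summary(rows, first_len, second_len, k=2.0):
    """rows: list of (time, price), sorted by time.

    For each row at time t (hypothetical semantics):
      first segment  [t - second_len - first_len, t - second_len):
          estimate the standard deviation of price changes;
      second segment [t - second_len, t]:
          count price changes whose size exceeds k times that estimate.
    Returns a list of (time, count), with count None when the first
    segment holds fewer than two price changes.
    """
    out = []
    for i, (t, _) in enumerate(rows):
        split = t - second_len
        start = split - first_len
        first, second = [], []
        for j in range(1, i + 1):
            tj = rows[j][0]
            change = rows[j][1] - rows[j - 1][1]
            if start <= tj < split:
                first.append(change)
            elif split <= tj <= t:
                second.append(change)
        if len(first) < 2:
            out.append((t, None))
            continue
        mean = sum(first) / len(first)
        vol = math.sqrt(sum((c - mean) ** 2 for c in first) / (len(first) - 1))
        out.append((t, sum(1 for c in second if abs(c) > k * vol)))
    return out

prices = [(0, 10.0), (1, 10.1), (2, 10.0), (3, 10.1), (4, 10.0), (5, 12.0)]
result = two_segment_summary(prices, first_len=3, second_len=2)
```

The quadratic scan keeps the sketch short; a real implementation would presumably maintain both segments incrementally.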
Hi - I believe some of these are good additions but also non-trivial to implement:
- KDB has something called a window join, a window operation over two tables: for each row in the left table, it associates a window of rows from the right table. This is currently not available in Flint. Another option may be to use http://spark.apache.org/docs/2.3.0/api/python/pyspark.sql.html#pyspark.sql.functions.window
- Can you explain what a "volume clock" is?
- https://github.com/twosigma/flint/issues/40
- Can you explain this a bit more? What is done in the first segment vs. the second?
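As a rough illustration of the window-join semantics described in the first point, here is a plain-Python sketch (neither KDB nor Flint code). The name `window_join` and the inclusive bounds `[t - length, t]` are assumptions, chosen to match the `pastAbsoluteTime` output shown earlier in the thread. Passing a hand-picked list of timestamps as the left side also gives "windows at predefined timestamps only".

```python
from bisect import bisect_left, bisect_right

def window_join(left_times, right_rows, length):
    """For each t in left_times, collect the right rows whose time lies
    in [t - length, t]. right_rows is a list of (time, value) sorted by time."""
    right_times = [t for t, _ in right_rows]
    joined = []
    for t in left_times:
        lo = bisect_left(right_times, t - length)   # first row with time >= t - length
        hi = bisect_right(right_times, t)           # one past the last row with time <= t
        joined.append((t, right_rows[lo:hi]))
    return joined

price_rows = [(1000, 1.0), (1500, 2.0), (2000, 3.0), (2500, 4.0)]
joined = window_join([1200, 2500], price_rows, length=1000)
# joined == [(1200, [(1000, 1.0)]),
#            (2500, [(1500, 2.0), (2000, 3.0), (2500, 4.0)])]
```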
I would also like to use a "volume clock". Basically, the idea is that instead of a global/constant shift, there's a second column which determines the size of the shift. This makes it possible to have essentially per-row time shifts. Is this possible to do in flint?
I believe this is possible. With the Python API, you just write a udf to reset the time, i.e.
from pyspark.sql.functions import udf

@udf('timestamp')
def set_new_time(time, other_column):
    new_time = ...  # left elided: derive the new timestamp from the two inputs
    return new_time

df = df.withColumn('time', set_new_time(df['time'], df['other_column']))
After this, though, you need to re-sort the dataframe, because time is no longer guaranteed to be in order:
df = flintContext.read.dataframe(df, is_sorted=False)
I see what you mean by resetting the times; however, what I want to calculate isn't quite like that. I want to implement a ShiftTimeWindow that isn't restricted to look at only the time column; I want it to also look at one or more additional columns when it invokes shift().
What do you think about extending that API?
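To pin down the semantics I have in mind, here is a plain-Python sketch (not Flint's ShiftTimeWindow API). The `lookback` column, the function name `per_row_windows`, and the inclusive window bounds are assumptions for illustration: each row carries its own shift, and the window for that row covers `[time - lookback, time]`.

```python
from bisect import bisect_left, bisect_right

def per_row_windows(rows):
    """rows: list of dicts with 'time', 'price' and 'lookback', sorted by time.
    Returns, for each row, the prices of all rows whose time falls in
    [time - lookback, time], where lookback is read from that row."""
    times = [r['time'] for r in rows]
    windows = []
    for r in rows:
        lo = bisect_left(times, r['time'] - r['lookback'])
        hi = bisect_right(times, r['time'])
        windows.append([x['price'] for x in rows[lo:hi]])
    return windows

rows = [
    {'time': 1000, 'price': 1.0, 'lookback': 0},
    {'time': 1500, 'price': 2.0, 'lookback': 500},
    {'time': 2000, 'price': 3.0, 'lookback': 1000},
    {'time': 2500, 'price': 4.0, 'lookback': 500},
]
windows = per_row_windows(rows)
# windows == [[1.0], [1.0, 2.0], [1.0, 2.0, 3.0], [3.0, 4.0]]
```

Because the shift is per row, the left edge of the window is no longer monotone in time, which is presumably what makes this hard to fit into a single-key ordered interface.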
Also @icexelloss, have you thought about how to make a KDB-style window join? I'd be happy to help implement it.
I see. I think it's a useful feature, but it seems quite tricky to extend the existing API to support it. It would probably be something like Window[K1, K2], where K1 is time and K2 is some other column, and we would need to do something with the OrderedRDD[K1, V] interface to incorporate K2. Do you have some ideas?
Have you seen an API similar to the extended window API that you described?
I think the way to do the join would be to drive it solely from a summarizer, since you can always just collect_list if you don’t want a summary join. I imagine a high-level API like a.leftWindowSummary(b, window, summarizer, key). You’d basically just run a window summarizer on b, but with the timestamps from a. Then you could follow up with a left join on the summarized df.
The part I’m not sure about is how to drive a window summarizer with a different OrderedRDD’s timestamps.
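On a single sorted partition, one way this could work is a two-pointer sweep: advance through b as the timestamps from a move forward, adding rows that enter the window and subtracting rows that leave it. The plain-Python sketch below is only an assumption about the semantics, not Flint code; `left_window_sum` is a hypothetical name, and a running sum per key stands in for a general summarizer.

```python
from collections import defaultdict

def left_window_sum(a_rows, b_rows, length):
    """a_rows: list of (time, key); b_rows: list of (time, key, value).
    Both must be sorted by time. For each a row, returns the sum of the
    b values with the same key and time in [t - length, t]."""
    sums = defaultdict(float)  # running sum per key for rows inside the window
    head = 0                   # next b row that has not entered the window yet
    tail = 0                   # oldest b row that is still inside the window
    out = []
    for t, key in a_rows:
        # Rows with time <= t enter the window.
        while head < len(b_rows) and b_rows[head][0] <= t:
            sums[b_rows[head][1]] += b_rows[head][2]
            head += 1
        # Rows with time < t - length leave the window.
        while tail < head and b_rows[tail][0] < t - length:
            sums[b_rows[tail][1]] -= b_rows[tail][2]
            tail += 1
        out.append((t, key, sums[key]))
    return out

b = [(1000, 'A', 1.0), (1500, 'B', 2.0), (2000, 'A', 3.0), (2500, 'A', 4.0)]
a = [(1200, 'A'), (2100, 'B'), (2600, 'A')]
summary = left_window_sum(a, b, length=1000)
# summary == [(1200, 'A', 1.0), (2100, 'B', 2.0), (2600, 'A', 7.0)]
```

The sweep relies on the summarizer supporting both add and subtract; how to handle windows that straddle partition boundaries is left open here.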