Proposal: user-defined stateful operators
One thing that Flux is missing right now is user-defined stateful operators. At the moment, we provide users with many stateful operators that hide the complexity of updating an internal state (`count`, `max`, `derivative`, ...). It would be great to give users the tools to build custom ones at the Flux language level. This would relieve us of the burden of implementing in Go every operator that requires state, and would also give Flux users huge flexibility. This is why stateful, user-defined functions are a feature that every modern stream processor provides to its users.
As an example, this is a user-defined counter:
```
from()
    |> filter(...)
    |> group(...)
    |> map(fn: {
        state: {c: 0},
        call: (s, r) => {
            s.c = s.c + 1
            return {v: r._value, c: s.c}
        },
    })
```
A stateful function is an object that has a `state` and a `call` property. The `call` function is invoked every time a new record must be processed, and the state is passed to it as the first parameter. Every transformation that accepts a function should also be able to accept a stateful function.
Note that `state` is per-group: multiple replicas of the `map` transformation must be able to run in parallel without ever touching the same piece of state; this is key to scaling out computation.
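To make the per-group semantics concrete, here is a minimal Python sketch of how a runtime might drive such a stateful function. It is not Flux and not an existing API: `stateful_map`, the `host` group key, and the sample records are all hypothetical, and the only thing taken from the proposal is the `state`/`call` shape.

```python
from copy import deepcopy


def stateful_map(records, fn, group_key):
    """Hypothetical driver: apply fn["call"] to every record,
    giving each group its own private copy of fn["state"]."""
    states = {}
    out = []
    for r in records:
        key = r[group_key]
        if key not in states:
            # Each group starts from a fresh copy of the initial state,
            # so two groups never share a piece of state.
            states[key] = deepcopy(fn["state"])
        out.append(fn["call"](states[key], r))
    return out


def count_call(s, r):
    # Mirrors the counter in the proposal: bump the state, emit a new record.
    s["c"] += 1
    return {"host": r["host"], "v": r["_value"], "c": s["c"]}


counter = {"state": {"c": 0}, "call": count_call}

records = [
    {"host": "a", "_value": 10},
    {"host": "b", "_value": 20},
    {"host": "a", "_value": 30},
]
result = stateful_map(records, counter, group_key="host")
```

Because the state is copied per group, the two `host` groups count independently, and the initial `state` declared on the function is never mutated.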
@affo This sounds powerful. How does it differ from the `reduce` function that we have?
@nathanielc `reduce` gives you the ability to aggregate, and it forces your output to be the accumulator.
Stateful UDFs would give you the tools to add state to any higher-order function and use it as you wish. This means that you can `filter`, `map`, and `flatMap` with state. The output is not forced to contain the internal state.
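The difference can be sketched in Python, used here only as a stand-in for Flux. The `above_mean_call` function and the sample values are hypothetical; the point is that the reduction returns nothing but its final accumulator, while the stateful map emits one record per input and keeps its state private.

```python
from functools import reduce

values = [3, 5, 2]

# reduce: the only thing that comes out is the final accumulator.
total = reduce(lambda acc, v: {"sum": acc["sum"] + v}, values, {"sum": 0})


def above_mean_call(s, v):
    # The state tracks a running sum and count, but the output
    # exposes neither: it only says how v compares to the running mean.
    s["sum"] += v
    s["n"] += 1
    return {"v": v, "above_mean": v > s["sum"] / s["n"]}


state = {"sum": 0, "n": 0}
flags = [above_mean_call(state, v) for v in values]
```

Here `total` is a single record, while `flags` has one entry per input value.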
For instance:
- suppose you want to filter out records when some internal state (some function of the input records) reaches some threshold;
- or you want to buffer records for some reason and output them if some condition is met (`flatMap` would allow outputting 0, 1, or many records for each input record);
- suppose that you want to align two streams, the first record of the first with the first of the second and so on: you can do it only with dedicated buffers;
- the state could be a connection to a DBMS that can load some data to enrich incoming records from a stream (a common use case; we can talk more about this and model it differently in Flux, but this could be a first step towards stream enrichment).
In general, this could be used in many different ways to achieve different results.
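The first two use cases above can be sketched as follows, again in Python rather than Flux. The helper names (`stateful_filter`, `stateful_flat_map`), the threshold of 10, and the batch size of 3 are assumptions made only for illustration.

```python
def stateful_filter(records, state, call):
    """Hypothetical driver: keep a record only if call(state, r) is true."""
    return [r for r in records if call(state, r)]


def stateful_flat_map(records, state, call):
    """Hypothetical driver: call may return 0, 1, or many records per input."""
    out = []
    for r in records:
        out.extend(call(state, r))
    return out


def under_budget(s, r):
    # Drop every record seen once the running total exceeds the threshold.
    s["total"] += r
    return s["total"] <= 10


def buffer_call(s, r):
    # Hold records back and release them in batches of three.
    s["buf"].append(r)
    if len(s["buf"]) < 3:
        return []
    flushed, s["buf"] = s["buf"], []
    return flushed


kept = stateful_filter([4, 5, 3, 1], {"total": 0}, under_budget)

buffer_state = {"buf": []}
emitted = stateful_flat_map(range(1, 8), buffer_state, buffer_call)
```

Note that the last record stays in the buffer at the end of the input; a real design would need to decide when leftover state gets flushed.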
In the future, we could also expose internal state updates in a dedicated output stream, so that one can inspect the internal state of operators in real time.
I like the idea. I ended up here because I simply want to get all the intermediate iteration steps/values of the reduce() function, and not only the final aggregated sum.
@wetterfrosch yes, you need a `map` with state 👍
@affo If you say so! Thanks ... but I assume that a "map with state" is not available yet either, right?
No, it's not, unfortunately.
As a workaround, can't you manage your case with `StateCount` or `StateDuration` (https://github.com/influxdata/flux/blob/master/docs/SPEC.md#statecount)?
Hm, that would indeed help with the first part of my problem! Thanks for that :) ... The main goal is to get consecutive iterations, in the sense of calculating a value from its own preceding value as an input.
The "phenomenon" described in the calculation is mortality. Think of it as "decrease the population by 2.5% every day", while adding the daily number of newborn bees (in this case).
So we start a "season" with an assumed winter population which has a "non-dynamic" mortality until the first egg hatches (the "egg-lay rate" is derived from temperature). If I used `stateCount`, I would somehow be able to calculate the correct population based on the winter population for every day "from scratch", but I don't see a way to integrate the sum of newborns into this "living bees/population" value before calculating the mortality for the day in question.
But maybe I am still missing the forest for the trees ... Fortunately I had the chance to meet @davidgs today (and hope to again tomorrow); he vaguely recalled that there might be a way when I mentioned the `accumulator` parameter of `reduce`.
Think I quite understood 👍
I see two ways of doing that:
- enable explicit triggering for users: if you could tune result triggering on a per-row basis, your `reduce` would output its result at every iteration;
- enable stateful functions.
Anyway, I do think that, even if you were able to work around that, a stateful `map` would be useful and fairly intuitive for your use case (more than explicit triggering, maybe 🤔)!
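As a rough illustration of the two options for the bee-population case, here is a Python sketch. It assumes the daily update is "add the day's newborns, then apply 2.5% mortality", which is one reading of the description above; the winter population and newborn counts are made-up numbers. `itertools.accumulate` stands in for a `reduce` that emits its accumulator at every row, and `population_call` stands in for a stateful `map`.

```python
from itertools import accumulate

SURVIVAL = 1 - 0.025  # 2.5% daily mortality, taken from the description above


def step(population, newborns):
    # Assumed update rule: newborns join the population before mortality applies.
    return (population + newborns) * SURVIVAL


winter_population = 10_000.0          # hypothetical starting value
newborns_per_day = [0, 0, 100, 200]   # hypothetical daily hatch counts

# Option 1: a reduce that is "triggered" on every row, i.e. a running reduction.
# accumulate yields the initial value first, so drop it to get one value per day.
by_trigger = list(accumulate(newborns_per_day, step, initial=winter_population))[1:]


# Option 2: a stateful map whose state is yesterday's population.
def population_call(s, r):
    s["population"] = step(s["population"], r["newborns"])
    return {"day": r["day"], "population": s["population"]}


state = {"population": winter_population}
rows = [{"day": d, "newborns": n} for d, n in enumerate(newborns_per_day, start=1)]
by_state = [population_call(state, r) for r in rows]
```

Both options produce the same daily series; the stateful map additionally lets each output row carry other fields, such as the day, alongside the computed value.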
I am curious to hear @nathanielc's thoughts about your use case.
Related to https://github.com/influxdata/flux/issues/2452