Proposal: user-defined stateful operators

Open affo opened this issue 5 years ago • 9 comments

One thing that Flux is missing right now is user-defined stateful operators. At the moment, we provide users with many stateful operators that hide the complexity of updating an internal state (count, max, derivative, ...). It would be cool to give users the tools to build custom ones at the Flux language level. This would relieve us of the burden of implementing every operator that requires state in Go, and it would also give huge flexibility to Flux users. This is why stateful, user-defined functions are a feature that every modern stream processor provides.

As an example, this is a user-defined counter:

from()
  |> filter(...)
  |> group(...)
  |> map(fn: {
    state: {c: 0},
    call: (s, r) => {
      s.c = s.c + 1
      return {v: r._value, c: s.c}
    },
  })

A stateful function is an object that has a state property and a call property. The call function gets called every time a new record must be processed, and the state is passed to it as the first parameter. Every transformation that accepts a function should also be able to accept a stateful function.

Note that state is per-group: multiple replicas of the map transformation must be able to run in parallel without ever touching the same piece of state. This is key to scaling out computation.
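
To make the per-group semantics concrete, here is a minimal sketch in Python rather than Flux, since the syntax above is only a proposal. The names stateful_map, key, and initial_state are hypothetical and exist only for this illustration. Each group gets its own copy of the initial state, so no two groups ever share a counter:

```python
import copy
from collections import defaultdict


def stateful_map(records, key, initial_state, call):
    """Apply call(state, record) to every record, keeping one state per group.

    Hypothetical helper: it only models the proposed semantics.
    """
    # Each group key lazily receives its own deep copy of the initial state.
    states = defaultdict(lambda: copy.deepcopy(initial_state))
    out = []
    for record in records:
        state = states[key(record)]
        out.append(call(state, record))
    return out


def counter(s, r):
    # Mirrors the proposed example: bump the counter, emit value and count.
    s["c"] += 1
    return {"v": r["_value"], "c": s["c"]}


records = [
    {"host": "a", "_value": 10},
    {"host": "b", "_value": 20},
    {"host": "a", "_value": 30},
]
result = stateful_map(
    records,
    key=lambda r: r["host"],
    initial_state={"c": 0},
    call=counter,
)
```

Here the two records of group "a" get counts 1 and 2, while the single record of group "b" gets count 1, because the groups never touch each other's state.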

affo avatar May 08 '19 14:05 affo

@affo This sounds powerful, how does it differ from the reduce function that we have?

nathanielc avatar May 08 '19 15:05 nathanielc

@nathanielc reduce gives you the ability to aggregate and forces your output to be the accumulator.

Stateful UDFs would give you the tools to add state to any higher-order function and use it as you wish. This means that you can filter, map, and flatMap with state. The output is not forced to contain the internal state.

For instance:

  • suppose you want to filter out records when some internal state (some function of the input records) reaches a threshold;
  • or you want to buffer records and output them only when some condition is met (flatMap would let you output 0, 1, or many records for each input record);
  • suppose that you want to align two streams, pairing the first record of the first with the first record of the second and so on: you can do that only with dedicated buffers;
  • the state could be a connection to a DBMS that loads data to enrich incoming records from a stream (a common use case; we can talk more about this and model it differently in Flux, but this could be a first step towards stream enrichment).

In general, this could be used in many different ways to achieve different results.
In the future we could also expose internal state updates in a dedicated output stream, so that one can inspect the internal state of operators in real time.
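
As a rough illustration of the buffering case from the list above, here is a Python sketch of a stateful flatMap. The helper names stateful_flat_map and flush_when_sum_reaches are hypothetical, as is the flush condition. Unlike reduce, the output consists of records only and the accumulator stays internal:

```python
def stateful_flat_map(records, state, call):
    """Call call(state, record) per record; each call returns 0..n outputs."""
    out = []
    for record in records:
        out.extend(call(state, record))
    return out


def flush_when_sum_reaches(threshold):
    """Buffer records and release the whole buffer once their sum reaches threshold."""
    def call(s, r):
        s["buf"].append(r)
        s["total"] += r
        if s["total"] >= threshold:
            flushed = s["buf"]
            # Reset the internal state after flushing.
            s["buf"], s["total"] = [], 0
            return flushed
        return []  # Nothing is emitted for this input record.
    return call


state = {"buf": [], "total": 0}
emitted = stateful_flat_map([1, 2, 3, 4], state, flush_when_sum_reaches(5))
```

With this input, the first three records are emitted together once their sum reaches the threshold, and the last record stays in the buffer because the condition is not met again.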

affo avatar May 10 '19 10:05 affo

I like the idea. I ended up here because I simply want to get the values of all the intermediate iteration steps of the reduce() function, and not only the final aggregated sum.

wetterfrosch avatar Feb 15 '20 22:02 wetterfrosch

@wetterfrosch yes, you need a map with state 👍

affo avatar Feb 18 '20 08:02 affo

@affo If you say so! Thanks ... but I assume that a "map with state" is not available yet either, right?

wetterfrosch avatar Feb 18 '20 13:02 wetterfrosch

No, it's not, unfortunately.

As a workaround, can't you manage your case with stateCount or stateDuration (https://github.com/influxdata/flux/blob/master/docs/SPEC.md#statecount)?

affo avatar Feb 18 '20 15:02 affo

> As a workaround, can't you manage your case with stateCount or stateDuration (https://github.com/influxdata/flux/blob/master/docs/SPEC.md#statecount)?

Hm, that would indeed help with the first part of my problem! Thanks for that :) ... The main goal is to get consecutive iterations, in the sense of calculating a value using its own preceding value as an input.

The "phenomenon" described in the calculation is mortality. Think of it as "decrease the population by 2.5% every day", while adding the daily number of newborn bees (in this case).

So we start a "season" with an assumed winter population that has a "non-dynamic" mortality until the first egg hatches (the "egg-lay-rate" is derived from temperature). If I used stateCount, I would somehow be able to calculate the correct population based on the winter population for every day "from scratch", but I don't see a way to integrate the sum of newborns into this "living-bees/population" value before calculating the mortality for the current day.

But maybe I still can't see the forest for the trees ... Fortunately I had the chance to meet @davidgs today (and hope to again tomorrow). He vaguely recalled that there might be a way when I mentioned the accumulator parameter of reduce.

wetterfrosch avatar Feb 18 '20 21:02 wetterfrosch

I think I understand 👍

I see two ways of doing that:

  • enable explicit triggering for users: if you could tune result triggering on a per-row basis, your reduce would output its result at every iteration;
  • enable stateful functions.

Anyway, I do think that, even if you were able to work around that, a stateful map would be useful and fairly intuitive for your use case (more than explicit triggering, maybe 🤔)!
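
To illustrate the difference between the two behaviours, here is a small Python sketch (not Flux) of the bee population recurrence. The 2.5% daily mortality comes from the description above; the starting population, the newborn counts, and the choice to add newborns before applying mortality are assumptions made for the example. functools.reduce returns only the final value, while itertools.accumulate emits every intermediate step, which is what a stateful map or per-row triggering would provide:

```python
from functools import reduce
from itertools import accumulate

MORTALITY = 0.025  # Daily mortality rate, taken from the thread.


def step(population, newborns):
    # Assumption: newborns join the population before mortality is applied.
    return (population + newborns) * (1 - MORTALITY)


winter_population = 10000.0        # Hypothetical starting value.
daily_newborns = [0, 0, 100, 200]  # Hypothetical newborn counts per day.

# reduce: only the population after the last day is visible.
final_population = reduce(step, daily_newborns, winter_population)

# accumulate: one population value per day; drop the initial seed value.
daily_populations = list(
    accumulate(daily_newborns, step, initial=winter_population)
)[1:]
```

The last element of daily_populations equals final_population, so the per-day series strictly extends what reduce already computes.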

I am curious to hear @nathanielc's thoughts about your use case.

affo avatar Feb 19 '20 11:02 affo

Related to https://github.com/influxdata/flux/issues/2452

sanderson avatar Feb 19 '20 12:02 sanderson