EPIC: scan function

Open rockstar opened this issue 2 years ago • 18 comments

After talking about the potential for a new/improved stateChanges function, the working group came up with a scan function proposal that we'd like to create. scan will have the following signature:

builtin scan : (<-tables: stream[A], fn: (accumulator: B, element: A) => B, init: B) => stream[B]

This function was heavily inspired by the scanl (and family) functions from Haskell. Here's how it could be used:

import "experimental/array"

testdata = array.from(rows: [
  {_time: 2021-01-01T00:00:00Z, _value: 100, state: "crit"},
  {_time: 2021-01-01T00:01:00Z, _value: 100, state: "crit"},
  {_time: 2021-01-01T00:02:00Z, _value: 80, state: "warn"},
  {_time: 2021-01-01T00:03:00Z, _value: 82, state: "warn"},
  {_time: 2021-01-01T00:04:00Z, _value: 80, state: "warn"},
  {_time: 2021-01-01T00:05:00Z, _value: 52, state: "ok"},
  {_time: 2021-01-01T00:06:00Z, _value: 50, state: "ok"},
])

testdata
  |> scan(fn: (acc, record) => ({record with stateChanged: acc.state != record.state}))

This would emit a table with a new boolean field stateChanged that is true whenever the state changes. This would mean that a stateChanges function could look something like this, using scan.

stateChanges = (tables=<-, value) => {
  // prepend() is hypothetical here: it would add `value` as a leading row
  seeded = if exists value then tables |> prepend(value: value) else tables

  return seeded
    |> scan(fn: (acc, ele) => ({ele with stateChanged: acc.state != ele.state}))
}
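
To make the intended semantics concrete, here is a minimal Python model of the proposal. It is only a sketch: `scan` and `state_changes` are hypothetical stand-ins, not Flux functions, and because the proposal leaves `init` open, this sketch assumes the first row acts as the initial accumulator when no seed is given. The `_time` column is left out for brevity.

```python
def scan(rows, fn, init):
    """Fold over rows, emitting every intermediate accumulator (not init itself)."""
    acc = init
    out = []
    for row in rows:
        acc = fn(acc, row)
        out.append(acc)
    return out


def state_changes(rows, seed=None):
    """Flag each row whose state differs from the previous row's state."""
    if not rows:
        return []
    # Assumption: without a seed, the first row is compared against itself.
    init = seed if seed is not None else rows[0]
    return scan(
        rows,
        lambda acc, row: {**row, "stateChanged": acc["state"] != row["state"]},
        init,
    )


rows = [
    {"_value": 100, "state": "crit"},
    {"_value": 100, "state": "crit"},
    {"_value": 80, "state": "warn"},
    {"_value": 82, "state": "warn"},
    {"_value": 80, "state": "warn"},
    {"_value": 52, "state": "ok"},
    {"_value": 50, "state": "ok"},
]

flags = [r["stateChanged"] for r in state_changes(rows)]
print(flags)  # [False, False, True, False, False, True, False]
```

Passing a seed such as `{"state": "ok"}` makes the first row report a change, which is roughly what the optional `value` parameter is for.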

Questions:

  • The init parameter's use is still unclear. To specify it would mean knowing a lot more about data shape than one may know. What does the init look like? Does it have to be explicit? Could/should we create two functions, one that takes an init and one that doesn't?
  • How does scan affect the group key of a table? Does it work like reduce, where it carries the group key with it?

rockstar avatar Apr 13 '22 20:04 rockstar

+1

samhld avatar Jun 07 '22 21:06 samhld

Another use case here would be detecting phases in data. For example, if a counter resets, increment the phase:

testdata = array.from(rows: [
  {_time: 2021-01-01T00:00:00Z, _value: 0},
  {_time: 2021-01-01T00:01:00Z, _value: 23},
  {_time: 2021-01-01T00:02:00Z, _value: 50},
  {_time: 2021-01-01T00:03:00Z, _value: 0},
  {_time: 2021-01-01T00:04:00Z, _value: 18},
  {_time: 2021-01-01T00:05:00Z, _value: 32},
  {_time: 2021-01-01T00:06:00Z, _value: 0},
])

testdata
  |> scan(fn: (accumulator, r) => ({r with phase: if accumulator._value <= r._value then accumulator.phase else accumulator.phase + 1 }))

The expected output would look something like:

_time                 _value  phase
2021-01-01T00:00:00Z  0       0
2021-01-01T00:01:00Z  23      0
2021-01-01T00:02:00Z  50      0
2021-01-01T00:03:00Z  0       1
2021-01-01T00:04:00Z  18      1
2021-01-01T00:05:00Z  32      1
2021-01-01T00:06:00Z  0       2
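
As a sanity check on that table, the same fold can be modelled in Python. This is a sketch, not Flux: `scan` and `next_phase` are hypothetical helpers, and the seed accumulator `{"_value": 0, "phase": 0}` is an assumption, since the call above does not specify one.

```python
def scan(rows, fn, init):
    """Fold over rows, emitting every intermediate accumulator (not init itself)."""
    acc = init
    out = []
    for row in rows:
        acc = fn(acc, row)
        out.append(acc)
    return out


def next_phase(acc, r):
    """Keep the phase while the counter grows; bump it when the counter drops."""
    phase = acc["phase"] if acc["_value"] <= r["_value"] else acc["phase"] + 1
    return {**r, "phase": phase}


values = [0, 23, 50, 0, 18, 32, 0]
rows = [{"_value": v} for v in values]

# Assumed seed: a zero counter in phase 0.
phases = [r["phase"] for r in scan(rows, next_phase, {"_value": 0, "phase": 0})]
print(phases)  # [0, 0, 0, 1, 1, 1, 2]
```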

The init parameter's use is still unclear. To specify it would mean knowing a lot more about data shape than one may know. What does the init look like? Does it have to be explicit? Could/should we create two functions, one that takes an init and one that doesn't?

The init's purpose is to act as the accumulator for the first row of each table. Without an init, there is nothing to compare the first row to. I'd love to have some way for the init to be implicit, but with the current type implementation, I don't know how it would work.

What if there was a concept of a "table record"—a default record with all group key columns populated and all non-group-key columns set as null. You could then extend the table record for the init. Something like:

init: {tableRecord with _value: 0, phase: 0}

With that in place, the function call could look like this:

testdata
    |> scan(
        init: {tableRecord with _value: 0, phase: 0},
        fn: (accumulator, r) => ({
            r with
            phase: if accumulator._value <= r._value then accumulator.phase else accumulator.phase + 1,
            _value: r._value,
        })
    )

How does scan affect the group key of a table? Does it work like reduce, where it carries the group key with it?

I would say yes, it carries the group key with it. It would be up to the user to group by any columns added by the transformation.
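
Roughly, the two ideas together (a per-table init built from a "table record", and the group key carried through) could behave like the Python sketch below. Everything here is hypothetical: `table_record` and `scan_grouped` are illustrative names, a dict keyed by group stands in for a stream of tables, and the column set is assumed to come from the first row of each group.

```python
def table_record(group_key, columns):
    """Hypothetical 'table record': group key columns filled in, all others None."""
    record = {column: None for column in columns}
    record.update(group_key)
    return record


def scan_grouped(rows, key_cols, fn, init_overrides):
    """Run an independent scan per group; the accumulator never crosses groups."""
    groups = {}
    for row in rows:
        key = tuple(row[c] for c in key_cols)
        groups.setdefault(key, []).append(row)

    result = {}
    for key, group_rows in groups.items():
        group_key = dict(zip(key_cols, key))
        # Equivalent of: init: {tableRecord with _value: 0, phase: 0}
        acc = {**table_record(group_key, group_rows[0].keys()), **init_overrides}
        scanned = []
        for row in group_rows:
            acc = fn(acc, row)
            scanned.append(acc)
        result[key] = scanned
    return result


def next_phase(acc, r):
    bump = 0 if acc["_value"] <= r["_value"] else 1
    return {**r, "phase": acc["phase"] + bump}


rows = [
    {"host": "a", "_value": 0},
    {"host": "a", "_value": 5},
    {"host": "a", "_value": 1},
    {"host": "b", "_value": 7},
    {"host": "b", "_value": 3},
]

by_host = scan_grouped(rows, ["host"], next_phase, {"_value": 0, "phase": 0})
print([r["phase"] for r in by_host[("a",)]])  # [0, 0, 1]
print([r["phase"] for r in by_host[("b",)]])  # [0, 1]
```

Each group starts again from its own init, and the `host` column survives on every output row because `fn` extends the incoming record.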

sanderson avatar Oct 26 '22 22:10 sanderson

To add to this, I think there should also be an array.scan function that provides this functionality for arrays. For example:

import "array"

a = [1, 2, 3, 4]
b = 20

c = array.scan(
    arr: a,
    init: b,
    fn: (x, acc) => x + acc
)

// c = [21,23,26,30]

The signature of array.scan would be something like:

builtin scan : (<-arr: [A], fn: (acc: B, x: A) => B, init: B) => [B]
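
For comparison, Python's `itertools.accumulate` already behaves much like this. The sketch below uses a hypothetical `array_scan` wrapper to mirror the example; note that `accumulate(..., initial=...)` emits the initial value first (as Haskell's `scanl` does), while the expected output above does not include it, so the wrapper drops that first element.

```python
from itertools import accumulate


def array_scan(arr, fn, init):
    """Running fold over arr, returning one output per input element."""
    # accumulate yields init first; the proposed array.scan example omits it.
    return list(accumulate(arr, fn, initial=init))[1:]


a = [1, 2, 3, 4]
b = 20

c = array_scan(a, lambda acc, x: x + acc, b)
print(c)  # [21, 23, 26, 30]
```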

sanderson avatar Nov 02 '22 21:11 sanderson

This proposal is urgently needed.

I have described this requirement here: https://community.influxdata.com/t/number-taxi-rides/28939

UlrichThiess avatar Mar 10 '23 16:03 UlrichThiess

Maybe there is no need for a scan function?

I asked ChatGPT for help, and it pushed me in the right direction. This is the solution we arrived at:

// Helper function to return an integer from a table
getFieldValue = (tables=<-, field) => {
  extract = tables
    |> last() // shrink table to one row
    |> findColumn(fn: (key) => key._field == field, column: "_value")

  return if length(arr: extract) == 0 then 0 else extract[0] // return 0 if there is no table else last value
}

// We need the last tripId as integer
lastId = from(bucket: "obd2")
  |> range(start: 2023-02-24T21:48:57.8Z, stop: 2023-02-24T22:54:46.955Z)
  |> filter(fn: (r) => r._measurement == "Taxi" and r._field == "tripId" and r._value != 0) // search for tripId
  |> last() // shrink table to one row
  |> getFieldValue(field: "tripId")

rpm_data = from(bucket: "obd2")
  |> range(start: 2023-02-24T21:48:57.8Z, stop: 2023-02-24T22:54:46.955Z)
  |> filter(fn: (r) => r._measurement == "Taxi" and r._field == "EngineRPM")
  |> map(fn: (r) => ({ r with tmp: if r._value > 0 then 1 else 0 })) // create helperfield tmp
  |> derivative(unit: 1s, nonNegative: true, columns: ["tmp"]) // only the changes from 0 to not 0
  |> map(fn: (r) => ({ r with tripId: int(v: r.tmp) })) // create tripId as the integer value of tmp
  |> drop(columns: ["tmp"]) // remove helperfield tmp
  |> cumulativeSum(columns: ["tripId"])
  |> map(fn: (r) => ({ r with tripId: if r._value > 0 then r.tripId + lastId else 0 }))
  |> yield(name: "rpm")

I will make a short trigger of that and do more tests.

UlrichThiess avatar Mar 16 '23 22:03 UlrichThiess

This issue has had no recent activity and will be closed soon.

github-actions[bot] avatar May 31 '23 01:05 github-actions[bot]

Any updates on this?

paulwer avatar Jun 27 '23 07:06 paulwer

This can be closed, as I have found a solution.

UlrichThiess avatar Jun 27 '23 07:06 UlrichThiess

I cannot reproduce it for our stateChanged use case. Could you explain your example further? If it works, this use case should be documented (from my point of view).

paulwer avatar Jun 27 '23 07:06 paulwer

Paste this into the Script Editor window of the InfluxDB 2 Data Explorer.

import "array"

testdata = array.from(rows: [
  {_time: 2021-01-01T00:00:00Z, _value: 0},
  {_time: 2021-01-01T00:01:00Z, _value: 23},
  {_time: 2021-01-01T00:02:00Z, _value: 50},
  {_time: 2021-01-01T00:03:00Z, _value: 0},
  {_time: 2021-01-01T00:04:00Z, _value: 18},
  {_time: 2021-01-01T00:05:00Z, _value: 32},
  {_time: 2021-01-01T00:06:00Z, _value: 0}
])

testdata

  // TripId
  |> map(fn: (r) => ({r with TripId: if r._value > 0 then 1 else 0}))
  |> derivative(unit: 1m, nonNegative: true, columns: ["TripId"], initialZero: true)
  |> cumulativeSum(columns: ["TripId"])
  |> map(fn: (r) => ({r with TripId: if r._value > 0 then r.TripId else 0.0}))
//  |> filter(fn: (r) => r.TripId != 0)

  |> yield()
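
The arithmetic behind this pipeline, stripped of Flux, is: mark rows as active, detect rising edges, and take a running sum of those edges. The Python sketch below mirrors only that idea. It assumes the first row is dropped because it has no predecessor and that negative changes count as zero; Flux's `derivative()` also scales by `unit`, which makes no difference here because the rows are exactly one minute apart.

```python
from itertools import accumulate

values = [0, 23, 50, 0, 18, 32, 0]

# Step 1: helper flag, 1 while the value is above zero.
active = [1 if v > 0 else 0 for v in values]

# Step 2: rising edges only (0 -> 1); falling edges are clamped to 0.
edges = [max(cur - prev, 0) for prev, cur in zip(active, active[1:])]

# Step 3: the running sum of rising edges numbers each trip.
trip_counter = list(accumulate(edges))

# Step 4: rows with no activity get TripId 0.
trip_ids = [tid if v > 0 else 0 for v, tid in zip(values[1:], trip_counter)]
print(trip_ids)  # [1, 1, 0, 2, 2, 0]
```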

UlrichThiess avatar Jun 27 '23 09:06 UlrichThiess

I guess this won't cover cases where you only want the values and their timestamps when the value changes. For example, statuses saved as booleans: get only the value changes, then determine which warnings were active within a timeframe.
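
In Python terms, the behaviour I mean would look something like the sketch below. `changes_only` is a hypothetical helper and the sample rows are made up for illustration; it assumes the first row is always kept, since there is nothing earlier to compare it to.

```python
_NO_PREVIOUS = object()  # sentinel: no row seen yet


def changes_only(rows, column="_value"):
    """Keep only the rows whose value differs from the previous row's value."""
    out = []
    prev = _NO_PREVIOUS
    for row in rows:
        if prev is _NO_PREVIOUS or row[column] != prev:
            out.append(row)
        prev = row[column]
    return out


rows = [
    {"_time": "2021-01-01T00:00:00Z", "_value": False},
    {"_time": "2021-01-01T00:01:00Z", "_value": False},
    {"_time": "2021-01-01T00:02:00Z", "_value": True},
    {"_time": "2021-01-01T00:03:00Z", "_value": True},
    {"_time": "2021-01-01T00:04:00Z", "_value": False},
]

changed = changes_only(rows)
print([r["_time"] for r in changed])
```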

paulwer avatar Jun 27 '23 11:06 paulwer

That sounds easier. However, I don't know the exact requirement. That discussion should be elsewhere, not here. Paul, if I can help, speak to me directly.

UlrichThiess avatar Jun 27 '23 12:06 UlrichThiess

Thanks very much for this @UlrichThiess It unblocked me 🍻

jgladch avatar Oct 03 '24 01:10 jgladch