EPIC: scan function
After talking about the potential for a new/improved `stateChanges` function, the working group came up with a `scan` function proposal that we'd like to create. `scan` will have the following signature:

```flux
builtin scan : (<-tables: stream[A], fn: (accumulator: B, element: A) => B, init: B) => stream[B]
```

This function was heavily inspired by the `scanl` family of functions from Haskell. Here's how it could be used:
```flux
import "experimental/array"

testdata = array.from(rows: [
    {_time: 2021-01-01T00:00:00Z, _value: 100, state: "crit"},
    {_time: 2021-01-01T00:01:00Z, _value: 100, state: "crit"},
    {_time: 2021-01-01T00:02:00Z, _value: 80, state: "warn"},
    {_time: 2021-01-01T00:03:00Z, _value: 82, state: "warn"},
    {_time: 2021-01-01T00:04:00Z, _value: 80, state: "warn"},
    {_time: 2021-01-01T00:05:00Z, _value: 52, state: "ok"},
    {_time: 2021-01-01T00:06:00Z, _value: 50, state: "ok"},
])

testdata
    |> scan(fn: (acc, record) => ({record with stateChanged: acc.state != record.state}))
```
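As a sanity check on the intended semantics, here is a Python model (not Flux): a table is a list of dicts, and `scan` is a left fold that emits every intermediate accumulator. The `init` used here is an assumption, since the proposal leaves it open.

```python
def scan(rows, fn, init):
    # Emit fn(acc, row) for each row, where acc is the previous emitted row.
    out = []
    acc = init
    for row in rows:
        acc = fn(acc, row)
        out.append(acc)
    return out

testdata = [
    {"_value": 100, "state": "crit"},
    {"_value": 100, "state": "crit"},
    {"_value": 80, "state": "warn"},
    {"_value": 52, "state": "ok"},
]

result = scan(
    testdata,
    lambda acc, r: {**r, "stateChanged": acc["state"] != r["state"]},
    init={"state": "crit"},  # assumed: a seed matching the first row's state
)
print([r["stateChanged"] for r in result])
# [False, False, True, True]
```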
This would emit a table with a new boolean column `stateChanged` that is true whenever the state changes. This means a `stateChanges` function could look something like this, using `scan`:
```flux
stateChanges = (<-tables, value) => {
    input = if exists value then tables |> prepend(value: value) else tables

    return input
        |> scan(fn: (acc, ele) => ({ele with stateChanged: acc.state != ele.state}))
}
```
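The optional-prepend idea can be modeled in Python (not Flux, since neither `scan` nor `prepend` exists yet). Here `prepend` is simulated by consing a synthetic first row carrying the known previous state, and the very first row's `stateChanged` defaults to false, which is an assumption:

```python
def state_changes(rows, value=None):
    # Optionally prepend a synthetic row with the known previous state,
    # then scan for changes between consecutive rows.
    if value is not None:
        rows = [{"state": value}] + rows
    out = []
    prev = None
    for r in rows:
        changed = prev is not None and prev["state"] != r["state"]
        out.append({**r, "stateChanged": changed})
        prev = out[-1]
    return out

rows = [{"state": "crit"}, {"state": "crit"}, {"state": "warn"}]
print([r["stateChanged"] for r in state_changes(rows, value="ok")])
# [False, True, False, True]  (first entry is the prepended synthetic row)
```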
Questions:

- The `init` parameter's use is still unclear. To specify it would mean knowing a lot more about the data shape than one may know. What does the `init` look like? Does it have to be explicit? Could/should we create two functions, one that takes an `init` and one that doesn't?
- How does `scan` affect the group key of a table? Does it work like `reduce`, where it carries the group key with it?
+1
Another use case here would be detecting phases in data. For example, if a counter resets, increment the phase:
```flux
testdata = array.from(rows: [
    {_time: 2021-01-01T00:00:00Z, _value: 0},
    {_time: 2021-01-01T00:01:00Z, _value: 23},
    {_time: 2021-01-01T00:02:00Z, _value: 50},
    {_time: 2021-01-01T00:03:00Z, _value: 0},
    {_time: 2021-01-01T00:04:00Z, _value: 18},
    {_time: 2021-01-01T00:05:00Z, _value: 32},
    {_time: 2021-01-01T00:06:00Z, _value: 0},
])

testdata
    |> scan(fn: (accumulator, r) => ({r with phase: if accumulator._value <= r._value then accumulator.phase else accumulator.phase + 1}))
```
The expected output would look something like:
| _time | _value | phase |
| --- | --- | --- |
| 2021-01-01T00:00:00Z | 0 | 0 |
| 2021-01-01T00:01:00Z | 23 | 0 |
| 2021-01-01T00:02:00Z | 50 | 0 |
| 2021-01-01T00:03:00Z | 0 | 1 |
| 2021-01-01T00:04:00Z | 18 | 1 |
| 2021-01-01T00:05:00Z | 32 | 1 |
| 2021-01-01T00:06:00Z | 0 | 2 |
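The phase logic is an ordinary left fold. This Python model (an illustration, using a seed record in place of the still-undecided `init`) reproduces the expected column:

```python
def scan(rows, fn, acc):
    # Left fold that emits every intermediate accumulator.
    out = []
    for row in rows:
        acc = fn(acc, row)
        out.append(acc)
    return out

rows = [{"_value": v} for v in [0, 23, 50, 0, 18, 32, 0]]

result = scan(
    rows,
    lambda acc, r: {
        **r,
        # counter went down => the counter reset, so start a new phase
        "phase": acc["phase"] if acc["_value"] <= r["_value"] else acc["phase"] + 1,
    },
    acc={"_value": 0, "phase": 0},  # assumed seed, standing in for init
)
print([r["phase"] for r in result])
# [0, 0, 0, 1, 1, 1, 2]
```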
> The `init` parameter's use is still unclear. To specify it would mean knowing a lot more about the data shape than one may know. What does the `init` look like? Does it have to be explicit? Could/should we create two functions, one that takes an `init` and one that doesn't?
The `init`'s purpose is to act as the accumulator for the first row of each table. Without an `init`, there is nothing to compare the first row to. I'd love to have some way for the `init` to be implicit, but with the current type implementation, I don't know how it would work.

What if there was a concept of a "table record": a default record with all group key columns populated and all non-group-key columns set to null? You could then extend the table record for the `init`. Something like:

```flux
init: {tableRecord with _value: 0, phase: 0}
```
With that in place, the function call could look like this:

```flux
testdata
    |> scan(
        init: {tableRecord with _value: 0, phase: 0},
        fn: (accumulator, r) => ({
            r with
            phase: if accumulator._value <= r._value then accumulator.phase else accumulator.phase + 1,
            _value: r._value,
        }),
    )
```
> How does `scan` affect the group key of a table? Does it work like `reduce`, where it carries the group key with it?
I would say yes, it carries the group key with it. It would be up to the user to group by any columns added by the transformation.
To add to this, I think there should also be an `array.scan` function that provides this functionality for arrays. For example:

```flux
import "array"

a = [1, 2, 3, 4]
b = 20

c = array.scan(
    arr: a,
    init: b,
    fn: (x, acc) => x + acc,
)

// c = [21, 23, 26, 30]
```
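The proposed `array.scan` behaves like Python's `itertools.accumulate` seeded with an initial value, except that `accumulate` also yields the seed itself, which is dropped here, so the expected output can be checked:

```python
from itertools import accumulate

a = [1, 2, 3, 4]
b = 20

# accumulate yields the seed first; drop it so the output starts at the
# first element, matching the proposed array.scan result.
c = list(accumulate(a, lambda acc, x: acc + x, initial=b))[1:]
print(c)  # [21, 23, 26, 30]
```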
The signature of `array.scan` would be something like:

```flux
builtin scan : (<-arr: [A], fn: (acc: B, x: A) => B, init: B) => [B]
```
This proposal is urgently needed.
I have described this requirement here: https://community.influxdata.com/t/number-taxi-rides/28939
Maybe there is no need for a scan function? I asked ChatGPT for help, and this is our solution (ChatGPT pushed me in the right direction):
```flux
// Helper function to return an integer from a table
getFieldValue = (tables=<-, field) => {
    extract = tables
        |> last() // shrink the table to one row
        |> findColumn(fn: (key) => key._field == field, column: "_value")

    // return 0 if there is no table, else the last value
    return if length(arr: extract) == 0 then 0 else extract[0]
}

// We need the last tripId as an integer
lastId = from(bucket: "obd2")
    |> range(start: 2023-02-24T21:48:57.8Z, stop: 2023-02-24T22:54:46.955Z)
    |> filter(fn: (r) => r._measurement == "Taxi" and r._field == "tripId" and r._value != 0) // search for tripId
    |> last() // shrink the table to one row
    |> getFieldValue(field: "_field")

rpm_data = from(bucket: "obd2")
    |> range(start: 2023-02-24T21:48:57.8Z, stop: 2023-02-24T22:54:46.955Z)
    |> filter(fn: (r) => r._measurement == "Taxi" and r._field == "EngineRPM")
    |> map(fn: (r) => ({r with tmp: if r._value > 0 then 1 else 0})) // create helper field tmp
    |> derivative(unit: 1s, nonNegative: true, columns: ["tmp"]) // keep only the changes from 0 to non-zero
    |> map(fn: (r) => ({r with tripId: int(v: r.tmp)})) // create tripId as an integer of tmp
    |> drop(columns: ["tmp"]) // remove helper field tmp
    |> cumulativeSum(columns: ["tripId"])
    |> map(fn: (r) => ({r with tripId: if r._value > 0 then r.tripId + lastId else 0}))
    |> yield(name: "rpm")
```
I will turn this into a small trigger and run more tests.
This issue has had no recent activity and will be closed soon.
Any updates on this?
This can be closed, as I have found a solution.
I cannot reproduce this for our stateChanged use case. Can you explain your example further? If it works, there should be documentation for this use case (from my point of view).
Paste this into the InfluxDB 2 DataExplorer in the Script Editor Window.
```flux
import "array"

testdata = array.from(rows: [
    {_time: 2021-01-01T00:00:00Z, _value: 0},
    {_time: 2021-01-01T00:01:00Z, _value: 23},
    {_time: 2021-01-01T00:02:00Z, _value: 50},
    {_time: 2021-01-01T00:03:00Z, _value: 0},
    {_time: 2021-01-01T00:04:00Z, _value: 18},
    {_time: 2021-01-01T00:05:00Z, _value: 32},
    {_time: 2021-01-01T00:06:00Z, _value: 0},
])

testdata
    // TripId
    |> map(fn: (r) => ({r with TripId: if r._value > 0 then 1 else 0}))
    |> derivative(unit: 1m, nonNegative: true, columns: ["TripId"], initialZero: true)
    |> cumulativeSum(columns: ["TripId"])
    |> map(fn: (r) => ({r with TripId: if r._value > 0 then r.TripId else 0.0}))
    // |> filter(fn: (r) => r.TripId != 0)
    |> yield()
```
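For anyone wanting to check the trick outside of InfluxDB, this Python sketch mimics each pipeline step on the same values. Treating `derivative`'s non-negative clamping as zero is an assumption about nulls versus zeros, but it does not change the cumulative sum:

```python
values = [0, 23, 50, 0, 18, 32, 0]

# map: flag rows with a positive value
flags = [1 if v > 0 else 0 for v in values]

# derivative(nonNegative: true, initialZero: true): keep only 0 -> 1 edges
edges = [0] + [max(flags[i] - flags[i - 1], 0) for i in range(1, len(flags))]

# cumulativeSum: the running count of rising edges acts as the trip counter
trip_ids, total = [], 0
for e in edges:
    total += e
    trip_ids.append(total)

# final map: zero the id wherever the value itself is zero
trip_ids = [t if v > 0 else 0 for t, v in zip(trip_ids, values)]
print(trip_ids)  # [0, 1, 1, 0, 2, 2, 0]
```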
I guess this won't cover cases where you only want the values and their times when the value changes, e.g. statuses saved as booleans: get only the value changes, then determine which warnings were active within a timeframe.
That sounds easier. However, I don't know the exact requirement. That discussion should happen elsewhere, not here. Paul, if I can help, contact me directly.
Thanks very much for this @UlrichThiess It unblocked me 🍻