Unintuitive grouping inside of windows
Summary
Creating a query that windows and groups by a tag within each window is unintuitive and should be addressed somehow.
Background
I've been working with @rawkode to build a query that windows data and aggregates a count of unique values for a specific tag within each window. At the outset, it seemed pretty simple, but it took some experimenting to finally get the desired results.
The initial query was something like this:
from(bucket: "crime-san-francisco")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == "incidents")
|> filter(fn: (r) => r._field == "incident_id")
|> group(columns: ["incident_category"], mode: "by")
|> aggregateWindow(every: 1d, fn: count)
When run locally, I'd get an index out of range error and a panic in the influxd output (shown below). I tried this next:
from(bucket: "crime-san-francisco")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == "incidents")
|> filter(fn: (r) => r._field == "incident_id")
|> window(every: 24h)
|> group(columns: ["incident_category"])
|> count()
While this returned a result, it wasn't exactly what I was looking for and it couldn't be visualized. What I realized is that window() is essentially a special application of group(). It defines upper and lower bounds for windows of time and assigns those as _start and _stop for each point that falls within those bounds. It then groups by _start and _stop so all points that share common bounds are in the same table.
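But if you window(every: 24h) then group(columns: ["incident_category"]), you actually negate the windowing. group() replaces the group key rather than adding to it, so _start and _stop are no longer part of the key and every row with the same incident_category lands in a single table again.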
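To make the effect concrete, here is a minimal Python sketch that models the behavior described above. It is not Flux and not how the engine is implemented: the `window` and `group` helpers, the sample rows, and the epoch-aligned window bounds are all assumptions made for illustration.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def window(rows, every):
    """Toy model of window(): stamp each row with the bounds of its window."""
    out = []
    for r in rows:
        start = EPOCH + ((r["_time"] - EPOCH) // every) * every
        out.append({**r, "_start": start, "_stop": start + every})
    return out

def group(rows, columns):
    """Toy model of group(): partition rows into tables keyed ONLY by `columns`."""
    tables = defaultdict(list)
    for r in rows:
        tables[tuple(r[c] for c in columns)].append(r)
    return dict(tables)

# Hypothetical sample data: three assaults over two days, one burglary.
rows = [
    {"_time": datetime(2019, 1, 1, 8), "incident_category": "Assault"},
    {"_time": datetime(2019, 1, 1, 20), "incident_category": "Burglary"},
    {"_time": datetime(2019, 1, 2, 9), "incident_category": "Assault"},
    {"_time": datetime(2019, 1, 2, 15), "incident_category": "Assault"},
]

windowed = window(rows, timedelta(hours=24))

# Grouping by category alone merges both days into one table per category.
by_category = group(windowed, ["incident_category"])

# Including _start and _stop keeps one table per (window, category) pair.
by_window_and_category = group(windowed, ["_start", "_stop", "incident_category"])
```

In this model, `by_category` holds two tables (the "Assault" table spans both days), while `by_window_and_category` holds three, one for each window and category combination that has data.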
To preserve both the windowing AND the grouping by incident_category, I had to group by _start, _stop, and incident_category. I then counted the values in each table, duplicated the _start column as _time so the points could be graphed by time, then regrouped all the counts by incident_category (similar to what aggregateWindow() does). This is what the final query looks like:
Final working query
from(bucket: "crime-san-francisco")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == "incidents")
|> filter(fn: (r) => r._field == "incident_id")
|> window(every: 24h)
|> group(columns: ["_start", "_stop", "incident_category"])
|> count()
|> duplicate(column: "_start", as: "_time")
|> group(columns: ["incident_category"])
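The steps of the final query can be traced in a small Python sketch. This is only a model of the data flow under stated assumptions: `count_in_windows` is a hypothetical helper, the sample rows are invented, and windows are aligned to the Unix epoch for simplicity.

```python
from collections import Counter, defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def count_in_windows(rows, every, by_column):
    """Count rows per (window, by_column value), then return one series per value."""
    # window() + group(_start, _stop, by_column) + count():
    # one counter per (window start, window stop, column value).
    counts = Counter()
    for r in rows:
        start = EPOCH + ((r["_time"] - EPOCH) // every) * every
        counts[(start, start + every, r[by_column])] += 1

    # duplicate(_start as _time) + group(by_column):
    # each count becomes a point in time, collected into one series per value.
    series = defaultdict(list)
    for (start, stop, value), n in sorted(counts.items()):
        series[value].append(
            {"_time": start, "_start": start, "_stop": stop, "_value": n}
        )
    return dict(series)

# Hypothetical sample data.
rows = [
    {"_time": datetime(2019, 1, 1, 8), "incident_category": "Assault"},
    {"_time": datetime(2019, 1, 1, 20), "incident_category": "Burglary"},
    {"_time": datetime(2019, 1, 2, 9), "incident_category": "Assault"},
    {"_time": datetime(2019, 1, 2, 15), "incident_category": "Assault"},
]

result = count_in_windows(rows, timedelta(hours=24), "incident_category")
```

Each entry in `result` is a per-category series of daily counts keyed by `_time`, which is the shape a time-series graph needs.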
Proposal
This seems like it could be a fairly common use case and could be addressed with a higher level function. Maybe something like:
aggregateInWindows = (tables=<-, every, byColumn, fn, aggregateColumn="_value") =>
tables
|> window(every: every)
|> group(columns: ["_start", "_stop", byColumn])
|> fn(column: aggregateColumn)
|> duplicate(column: "_start", as: "_time")
|> group(columns: [byColumn])
With this defined, the above query would be:
from(bucket: "crime-san-francisco")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == "incidents" and r._field == "incident_id")
|> aggregateInWindows(every: 24h, byColumn: "incident_category", fn: count)
Panic output
goroutine 7268 [running]:
runtime/debug.Stack(0xc043bd5680, 0x2870e00, 0x2437386)
/usr/local/Cellar/go/1.12.1/libexec/src/runtime/debug/stack.go:24 +0x9d
github.com/influxdata/flux/execute.(*poolDispatcher).Start.func1.1(0xc043bd56e0)
/Users/scott/go/pkg/mod/github.com/influxdata/[email protected]/execute/dispatcher.go:75 +0x270
panic(0x21f4280, 0x50483f0)
/usr/local/Cellar/go/1.12.1/libexec/src/runtime/panic.go:522 +0x1b5
github.com/apache/arrow/go/arrow/array.(*Int64).Value(...)
/Users/scott/go/pkg/mod/github.com/apache/arrow/go/[email protected]/array/numeric.gen.go:41
github.com/influxdata/flux/stdlib/universe.(*fixedWindowTransformation).Process.func1(0x28c3660, 0xc0cbca7ae0, 0x0, 0x0)
/Users/scott/go/pkg/mod/github.com/influxdata/[email protected]/stdlib/universe/window.go:309 +0x5e3
github.com/influxdata/influxdb/storage/reads.(*integerGroupTable).Do(0xc025cff980, 0xc04cf6a700, 0x0, 0x0)
/Users/scott/Projects/influxdb/storage/reads/table.gen.go:384 +0xc3
github.com/influxdata/flux/stdlib/universe.(*fixedWindowTransformation).Process(0xc03b89c820, 0x6552906ed608c849, 0xfaea7ee306af8890, 0x53c00498, 0xc025cff980, 0xc043bade01, 0x10412b3)
/Users/scott/go/pkg/mod/github.com/influxdata/[email protected]/stdlib/universe/window.go:306 +0x981
github.com/influxdata/flux/execute.processMessage(0x28ab140, 0xc03b89c820, 0x2889440, 0xc0cbc5fbe0, 0xc043badf68, 0xc06ee19e60, 0xc043bd55c0)
/Users/scott/go/pkg/mod/github.com/influxdata/[email protected]/execute/transport.go:199 +0x1e4
github.com/influxdata/flux/execute.(*consecutiveTransport).processMessages(0xc043bd5740, 0xa)
/Users/scott/go/pkg/mod/github.com/influxdata/[email protected]/execute/transport.go:156 +0xa1
github.com/influxdata/flux/execute.(*poolDispatcher).run(0xc043bd56e0, 0x28a41c0, 0xc072ee0080)
/Users/scott/go/pkg/mod/github.com/influxdata/[email protected]/execute/dispatcher.go:126 +0x4b
github.com/influxdata/flux/execute.(*poolDispatcher).Start.func1(0xc043bd56e0, 0x28a41c0, 0xc072ee0080)
/Users/scott/go/pkg/mod/github.com/influxdata/[email protected]/execute/dispatcher.go:80 +0x95
created by github.com/influxdata/flux/execute.(*poolDispatcher).Start
/Users/scott/go/pkg/mod/github.com/influxdata/[email protected]/execute/dispatcher.go:61 +0x7e
@sanderson can you get me a dump of the data table? downloading the CSV for
from(bucket: "crime-san-francisco")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == "incidents")
|> filter(fn: (r) => r._field == "incident_id")
would work.
@rawkode Do you still have this ^ dataset? I deleted the bucket and can't find the original set.
@aanthony1243 Since creating this issue, I've worked with a handful of people who all had similar use cases, and I think there's a more general high-level function we could add. All of them need to group inside of windows, then run an aggregate on each group. I think if we added a groupInWindows function, users could shape the data how they needed, then run an aggregation on each group.
groupInWindows = (
tables=<-,
every,
period, // Not sure how to default this to the 'every' value in raw flux
offset=0s,
timeColumn="_time",
startColumn="_start",
stopColumn="_stop",
createEmpty=false,
columns=[*],
mode="by",
) => tables
|> window(every: every, period: period, offset: offset, timeColumn: timeColumn, startColumn: startColumn, stopColumn: stopColumn, createEmpty: createEmpty)
// We need a way to join the 'columns' array with the columns array below
|> group(columns: [startColumn, stopColumn, columns], mode:mode)
There are a few things in this function that wouldn't work in raw Flux, but I think you get the idea. What people have struggled with is that when you window, you assign _start and _stop values to each row based on the window it falls in, then group/partition by _start and _stop. Users would window the data, then group by a given set of columns that excludes _start and _stop, essentially undoing the windowing.
Maybe the solution is just documenting that you need to include _start and _stop when grouping to preserve windows rather than a new custom function...
@sanderson I think it's worth discussing as a possible inclusion. However, I'm more concerned with the panic, which is why I'd like to see the data if it's still available.
I'm driving for the next six hours, so I'll try to get you the line protocol tomorrow
If you have time, you can load manually: https://github.com/rawkode/influxdb-crime-san-francisco
ok, i tried and can't reproduce that panic on cloud 2.
re: your proposal.
|> group(columns: ["x", "y", "z"])
|> window(every:5m)
is the same as
|> window(every: 5m)
|> group(columns: ["x", "y","z", "_start", "_stop"])
similarly, you should get what you want if you do:
|> group(columns: ["x" , "y", "z"])
|> aggregateWindow(every: 5m, fn: mean)
it's low-cost to provide your proposal as a helper function, but I'm not convinced that it's the "best" way to train users to use flux, so I don't know if I would build it in. I'm open to feedback though.
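The equivalence claimed above can be checked in a toy Python model. It is a sketch under assumptions, not the engine's implementation: a stream is modeled as a (rows, group key) pair, `window` appends _start and _stop to whatever key already exists, `group` replaces the key outright, and the sample rows are invented. Real streams from from() arrive already grouped by series, which this model ignores.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def window(stream, every):
    """Toy window(): stamp rows with bounds and ADD _start/_stop to the group key."""
    rows, key = stream
    stamped = []
    for r in rows:
        start = EPOCH + ((r["_time"] - EPOCH) // every) * every
        stamped.append({**r, "_start": start, "_stop": start + every})
    extra = tuple(c for c in ("_start", "_stop") if c not in key)
    return stamped, key + extra

def group(stream, columns):
    """Toy group(): REPLACE the group key with `columns`."""
    rows, _ = stream
    return rows, tuple(columns)

def tables(stream):
    """Partition the rows into tables according to the current group key."""
    rows, key = stream
    out = {}
    for r in rows:
        out.setdefault(tuple(r[c] for c in key), []).append(r)
    return out

# Hypothetical sample data with a single tag column "x".
source = (
    [
        {"_time": datetime(2019, 1, 1, 0, 1), "x": "a"},
        {"_time": datetime(2019, 1, 1, 0, 7), "x": "a"},
        {"_time": datetime(2019, 1, 1, 0, 8), "x": "b"},
    ],
    (),
)
every = timedelta(minutes=5)

group_then_window = tables(window(group(source, ["x"]), every))
window_then_group = tables(group(window(source, every), ["x", "_start", "_stop"]))
window_then_group_x_only = tables(group(window(source, every), ["x"]))
```

In this model the first two results are identical (three tables), while grouping by "x" alone after windowing collapses them to two, which is the behavior the original post ran into.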
If you are able to reproduce that panic locally, I'd be very interested to pair up to see what's going on.
please re-open this if you are able to get a panic.
I think we lost track of the original problem here. Let's forget the panic for a moment and focus on the unintuitive grouping:
This "feels" like it should work, but instead we get an error: invalid partition key order
from(bucket: "crime-san-francisco")
|> range(start: -360d, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == "incidents")
|> filter(fn: (r) => r._field == "incident_id")
|> group(columns: ["incident_category"], mode: "by")
|> aggregateWindow(every: 1d, fn: count)
Instead, we need to do this:
from(bucket: "crime-san-francisco")
|> range(start: -360d, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == "incidents")
|> filter(fn: (r) => r._field == "incident_id")
|> window(every: 24h)
|> group(columns: ["_start", "_stop", "incident_category"])
|> count()
|> duplicate(column: "_start", as: "_time")
|> group(columns: ["incident_category"])
Line protocol attached:
invalid partition key order is an error message from the engine. what version of influxdb are you using?