fluXpipe
TLDR: Flux for InfluxDB 3.0. Stand-alone, Serverless Flux API/Pipeline for querying, analyzing and interacting with remote data.
FluxPipe is an experimental stand-alone Flux API for serverless workers and embedded datasources.
Execute your Flux scripts locally, in serverless functions, or anywhere else - decoupled from the data and the database.
Fluxpipe runs at 141,6 km/h* and is compatible with InfluxDB 3.0 / IOx, ClickHouse, Grafana and beyond!
InfluxDB Flux is a lightweight scripting language for querying databases and working with data. [^1]
Need a Flux introduction? Check out the official page, documentation or "3 Minutes to Flux" guide.
Demo
Try our serverless demo or launch your own instance to instantly fall in love with Flux.
Instructions
Download a binary release, use Docker, or build from source.
📦 Download Binary
```shell
curl -fsSL https://github.com/metrico/fluxpipe/releases/latest/download/fluxpipe-server -O \
  && chmod +x fluxpipe-server
```
🔌 Start Server w/ Options
```shell
./fluxpipe-server -port 8086
```
Run with `-h` for a full list of parameters.
🐋 Using Docker
```shell
docker pull ghcr.io/metrico/fluxpipe:latest
docker run -ti --rm -p 8086:8086 ghcr.io/metrico/fluxpipe:latest
```
🐛 Usage
💡 Check out the scripts folder for working examples
Playground
Fluxpipe embeds a playground interface for instantly executing queries (borrowed from ClickHouse [^2]).
HTTP API
Fluxpipe serves a simple REST API loosely compatible with existing Flux integrations and clients.
Grafana Flux [^1]
Fluxpipe is compatible with the native Grafana InfluxDB/Flux datasource (the URL and organization fields are required!).
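To verify the datasource end-to-end without touching any database, you can paste a query built on the `generate` package into Grafana's query editor - a minimal sketch:

```flux
// Self-contained test query: synthesizes three points, no database required.
import g "generate"

g.from(start: 2022-04-01T00:00:00Z, stop: 2022-04-01T00:03:00Z, count: 3, fn: (n) => n)
```

If the datasource is wired up correctly, Grafana should render the generated series.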
⭐ FlightSQL
SQL
You can query InfluxDB 3.0 IOx with raw SQL using the native `sql.from` handler:
import "sql"
sql.from(
driverName: "influxdb-iox",
dataSourceName: "iox://iox-server:443/qryn_logs",
query: "SELECT level, sender, body FROM logs WHERE body LIKE '%DELETE%' limit 10",
)
Flux IOx
You can query InfluxDB 3.0 IOx with Flux using the `iox.from` handler:
import "contrib/qxip/iox"
iox.from(
bucket: "test",
host: "eu-central-1-1.aws.cloud2.influxdata.com:443",
token: "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
limit: "10",
columns: "time, level, sender",
table: "logs",
start: -100d,
)
```text
_time:time                      level:string  sender:string
------------------------------  ------------  -------------
2023-08-31T00:00:00.091362490Z  info          logtest
2023-08-31T00:00:00.091380034Z  info          logtest
2023-08-31T00:00:00.091381374Z  info          logtest
2023-08-31T00:00:00.091382470Z  info          logtest
2023-08-31T00:00:00.091383354Z  info          logtest
...
```
You can write data back to InfluxDB 3.0 IOx using the `to` and `wideTo` functions:
import "contrib/qxip/iox"
import "influxdata/influxdb"
iox.from(
bucket: "qxip",
host: "eu-central-1-1.aws.cloud2.influxdata.com:443",
token: "",
limit: "10",
table: "machine_data",
start: -2d,
)
|> range(start: -2d)
|> aggregateWindow(every: 5s, fn: mean, column: "load", createEmpty: false)
|> set(key: "_measurement", value: "downsampled")
|> wideTo(
bucket: "qxip",
host: "https://eu-central-1-1.aws.cloud2.influxdata.com",
token: "",
orgID: "6a841c0c08328fb1"
)
⭐ ClickHouse SQL
import "contrib/qxip/clickhouse"
clickhouse.query(
url: "https://[email protected]",
query: "SELECT database, total_rows FROM tables WHERE total_rows > 0"
)
|> rename(columns: {database: "_value", total_rows: "_data"})
|> keep(columns: ["_value","_data"])
⭐ LogQL
import "contrib/qxip/logql"
option logql.defaultURL = "http://qryn:3100"
logql.query_range(
query: "rate({job=\"dummy-server\"}[5m])",
start: v.timeRangeStart,
end: v.timeRangeStop
)
|> map(fn: (r) => ({r with _time: time(v: uint(v: r.timestamp_ns)), _value: float(v: r.value) }))
|> drop(columns: ["timestamp_ns", "value"])
|> sort(columns: ["_time"])
|> group(columns: ["labels"])
⭐ CURL POST
Usage with `curl`:
```shell
curl -XPOST localhost:8086/api/v2/query -sS \
  -H 'Accept:application/csv' \
  -H 'Content-type:application/vnd.flux' \
  -d 'import g "generate" g.from(start: 2022-04-01T00:00:00Z, stop: 2022-04-01T00:03:00Z, count: 3, fn: (n) => n)'
```
Secrets
Fluxpipe builds Flux with the `EnvironmentSecretService`, giving Flux scripts access to system environment variables:
import "influxdata/influxdb/secrets"
key = secrets.get(key: "ENV_SECRET")
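A secret read this way can be spliced into a connection string using Flux string interpolation, keeping credentials out of the script itself. A minimal sketch, assuming a `CLICKHOUSE_PASS` variable is exported in the server's environment (the variable name and URL are illustrative, not part of Fluxpipe):

```flux
import "influxdata/influxdb/secrets"
import "contrib/qxip/clickhouse"

// CLICKHOUSE_PASS is a hypothetical environment variable, set before
// starting fluxpipe-server, e.g. `export CLICKHOUSE_PASS=...`
pass = secrets.get(key: "CLICKHOUSE_PASS")

clickhouse.query(
    url: "https://default:${pass}@my-clickhouse:8123",
    query: "SELECT 1",
)
```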
STDIN CMD
Fluxpipe can be used as a command-line tool and stdin pipeline processor.
Generate CSV
```shell
echo 'import g "generate" g.from(start: 2022-04-01T00:00:00Z, stop: 2022-04-01T00:03:00Z, count: 5, fn: (n) => 1)' \
  | ./fluxpipe-server -stdin
```
Parse CSV
```shell
cat scripts/csv.flux | ./fluxpipe-server -stdin
```
Query SQL
```shell
cat scripts/sql.flux | ./fluxpipe-server -stdin
```
Public Demo
Grafana Datasource
Configure your Grafana instance with our public demo endpoint (limited resources).
Documentation
Flux(pipe) is built using the InfluxCommunity/flux fork which contains additional features and contributions.
All the standard and additional functions available in Fluxpipe are covered in the Flux community documentation.
Status
- [x] Fluxlib
- [x] parser
- [x] executor
- [x] Contribs
- [x] contrib/qxip/clickhouse
- [x] contrib/qxip/logql
- [x] contrib/qxip/hash
- [x] ENV secrets
- [x] STDIN pipeline
- [x] HTTP API
- [x] plaintext
- [x] json support
- [x] web playground
[^1]: This project is not affiliated with or endorsed by InfluxData or Grafana Labs. All rights belong to their respective owners.
[^2]: Used under Apache 2.0 terms. This project is not affiliated with or endorsed by ClickHouse Inc. All rights belong to their respective owners.