FluxPipe is an experimental stand-alone Flux API for serverless workers and embedded datasources
Execute your Flux scripts locally, in serverless functions or anywhere else - decoupled from the data and database.
Fluxpipe runs at 141,6Km/h* and is compatible with InfluxDB 3.0 / IOx, ClickHouse, Grafana and beyond!
InfluxDB Flux is a lightweight scripting language for querying databases and working with data. 1
Need a Flux introduction? Check out the official page, documentation or "3 Minutes to Flux" guide.
Try our serverless demo or launch your own instance to instantly fall in love with flux
Download a binary release, docker or build from source
curl -fsSL github.com/metrico/fluxpipe/releases/latest/download/fluxpipe-server -O \
&& chmod +x fluxpipe-server
./fluxpipe-server -port 8086
Run with -h
for a full list of parameters
docker pull ghcr.io/metrico/fluxpipe:latest
docker run -ti --rm -p 8086:8086 ghcr.io/metrico/fluxpipe:latest
💡 Check out the scripts folder for working examples
Fluxpipe embeds a playground interface to instantly execute queries (borrowed from ClickHouse 2)
Fluxpipe serves a simple REST API loosely compatible with existing flux integrations and clients
Grafana Flux 1
Fluxpipe is compatible with the native Grafana InfluxDB/Flux datasource (url + organization fields are required!)
You can query InfluxDB 3.0 IOx with raw SQL using the native sql.from
handler
import "sql"
sql.from(
driverName: "influxdb-iox",
dataSourceName: "iox://iox-server:443/qryn_logs",
query: "SELECT level, sender, body FROM logs WHERE body LIKE '%DELETE%' limit 10",
)
You can query InfluxDB 3.0 IOx with Flux using the iox.from
handler
import "contrib/qxip/iox"
iox.from(
bucket: "test",
host: "eu-central-1-1.aws.cloud2.influxdata.com:443",
token: "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
limit: "10",
columns: "time, level, sender",
table: "logs",
start: -100d,
)
_time:time level:string sender:string
------------------------------ ---------------------- ----------------------
2023-08-31T00:00:00.091362490Z info logtest
2023-08-31T00:00:00.091380034Z info logtest
2023-08-31T00:00:00.091381374Z info logtest
2023-08-31T00:00:00.091382470Z info logtest
2023-08-31T00:00:00.091383354Z info logtest
...
You write data back to InfluxDB 3.0 IOx using the to
and wideTo
functions
import "contrib/qxip/iox"
import "influxdata/influxdb"
iox.from(
bucket: "qxip",
host: "eu-central-1-1.aws.cloud2.influxdata.com:443",
token: "",
limit: "10",
table: "machine_data",
start: -2d,
)
|> range(start: -2d)
|> aggregateWindow(every: 5s, fn: mean, column: "load", createEmpty: false)
|> set(key: "_measurement", value: "downsampled")
|> wideTo(
bucket: "qxip",
host: "https://eu-central-1-1.aws.cloud2.influxdata.com",
token: "",
orgID: "6a841c0c08328fb1"
)
import "contrib/qxip/clickhouse"
clickhouse.query(
url: "https://play@play.clickhouse.com",
query: "SELECT database, total_rows FROM tables WHERE total_rows > 0"
)
|> rename(columns: {database: "_value", total_rows: "_data"})
|> keep(columns: ["_value","_data"])
import "contrib/qxip/logql"
option logql.defaultURL = "http://qryn:3100"
logql.query_range(
query: "rate({job=\"dummy-server\"}[5m])",
start: v.timeRangeStart,
end: v.timeRangeStop
)
|> map(fn: (r) => ({r with _time: time(v: uint(v: r.timestamp_ns)), _value: float(v: r.value) }))
|> drop(columns: ["timestamp_ns", "value"])
|> sort(columns: ["_time"])
|> group(columns: ["labels"])
Usage with curl
curl -XPOST localhost:8086/api/v2/query -sS \
-H 'Accept:application/csv' \
-H 'Content-type:application/vnd.flux' \
-d 'import g "generate" g.from(start: 2022-04-01T00:00:00Z, stop: 2022-04-01T00:03:00Z, count: 3, fn: (n) => n)'
Flux builds using EnvironmentSecretService
accessing system environment variables from flux scripts.
import "influxdata/influxdb/secrets"
key = secrets.get(key: "ENV_SECRET")
Fluxpipe can be used as a command-line tool and stdin pipeline processor
echo 'import g "generate" g.from(start: 2022-04-01T00:00:00Z, stop: 2022-04-01T00:03:00Z, count: 5, fn: (n) => 1)' \
| ./fluxpipe-server -stdin
cat scripts/csv.flux | ./fluxpipe-server -stdin
cat scripts/sql.flux | ./fluxpipe-server -stdin
Configure your Grafana instance with our public demo endpoint (limited resources)
Flux(pipe) is built using the InfluxCommunity/flux fork which contains additional features and contributions.
All the standard and additional functions available in Fluxpipe are included in the Flux community documentation
- Fluxlib
- parser
- executor
- Contribs
- contrib/qxip/clickhouse
- contrib/qxip/logql
- contrib/qxip/hash
- ENV secrets
- STDIN pipeline
- HTTP api
- plaintext
- json support
- web playground