TLDR: Flux for InfluxDB 3.0. Stand-alone, Serverless Flux API/Pipeline for querying, analyzing and interacting with remote data.

metrico/fluXpipe

FluxPipe is an experimental stand-alone Flux API for serverless workers and embedded datasources.
Execute your Flux scripts locally, in serverless functions, or anywhere else, decoupled from the data and the database.

Fluxpipe runs at 141,6Km/h* and is compatible with InfluxDB 3.0 / IOx, ClickHouse, Grafana and beyond!


InfluxDB Flux is a lightweight scripting language for querying databases and working with data. [1]
Need a Flux introduction? Check out the official page, documentation or "3 Minutes to Flux" guide.

Demo

Try our serverless demo or launch your own instance to instantly fall in love with Flux.


Instructions

Download a binary release, pull the Docker image, or build from source.

📦 Download Binary

curl -fsSL github.com/metrico/fluxpipe/releases/latest/download/fluxpipe-server -O \
&& chmod +x fluxpipe-server
🔌 Start Server w/ Options
./fluxpipe-server -port 8086

Run with -h for a full list of parameters

🐋 Using Docker

docker pull ghcr.io/metrico/fluxpipe:latest
docker run -ti --rm -p 8086:8086 ghcr.io/metrico/fluxpipe:latest

🐛 Usage

💡 Check out the scripts folder for working examples

Playground

Fluxpipe embeds a playground interface to instantly execute queries (borrowed from ClickHouse [2]).


HTTP API

Fluxpipe serves a simple REST API that is loosely compatible with existing Flux integrations and clients.

Grafana Flux [1]

Fluxpipe is compatible with the native Grafana InfluxDB/Flux datasource (url + organization fields are required!)


⭐ FlightSQL
SQL

You can query InfluxDB 3.0 IOx with raw SQL using the native sql.from handler

import "sql"

sql.from(
    driverName: "influxdb-iox",
    dataSourceName: "iox://iox-server:443/qryn_logs",
    query: "SELECT level, sender, body FROM logs WHERE body LIKE '%DELETE%' limit 10",
)

Flux IOx

You can query InfluxDB 3.0 IOx with Flux using the iox.from handler

import "contrib/qxip/iox"
iox.from(
     bucket: "test",
     host: "eu-central-1-1.aws.cloud2.influxdata.com:443",
     token: "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
     limit: "10",
     columns: "time, level, sender",
     table: "logs",
     start: -100d,
)
                    _time:time            level:string           sender:string
------------------------------  ----------------------  ----------------------
2023-08-31T00:00:00.091362490Z                    info                 logtest
2023-08-31T00:00:00.091380034Z                    info                 logtest
2023-08-31T00:00:00.091381374Z                    info                 logtest
2023-08-31T00:00:00.091382470Z                    info                 logtest
2023-08-31T00:00:00.091383354Z                    info                 logtest
...

You can write data back to InfluxDB 3.0 IOx using the to and wideTo functions

import "contrib/qxip/iox"
import "influxdata/influxdb"
iox.from(
    bucket: "qxip",
    host: "eu-central-1-1.aws.cloud2.influxdata.com:443",
    token: "",
    limit: "10",
    table: "machine_data",
    start: -2d,
)
    |> range(start: -2d)
    |> aggregateWindow(every: 5s, fn: mean, column: "load", createEmpty: false)
    |> set(key: "_measurement", value: "downsampled")
    |> wideTo(
        bucket: "qxip",
        host: "https://eu-central-1-1.aws.cloud2.influxdata.com",
        token: "",
        orgID: "6a841c0c08328fb1",
    )

⭐ ClickHouse SQL
import "contrib/qxip/clickhouse"

clickhouse.query(
  url: "https://play@play.clickhouse.com",
  query: "SELECT database, total_rows FROM tables WHERE total_rows > 0"
)
|> rename(columns: {database: "_value", total_rows: "_data"})
|> keep(columns: ["_value","_data"])

⭐ LogQL
import "contrib/qxip/logql"

option logql.defaultURL = "http://qryn:3100"
logql.query_range(
     query: "rate({job=\"dummy-server\"}[5m])",
     start: v.timeRangeStart, 
     end: v.timeRangeStop
)
|> map(fn: (r) => ({r with _time: time(v: uint(v: r.timestamp_ns)), _value: float(v: r.value) }))
|> drop(columns: ["timestamp_ns", "value"])
|> sort(columns: ["_time"])
|> group(columns: ["labels"])

⭐ CURL POST

Usage with curl

curl -XPOST localhost:8086/api/v2/query -sS \
  -H 'Accept:application/csv' \
  -H 'Content-type:application/vnd.flux' \
  -d 'import g "generate" g.from(start: 2022-04-01T00:00:00Z, stop: 2022-04-01T00:03:00Z, count: 3, fn: (n) => n)'
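
The same request can be issued from a script. The following is a minimal Python sketch, not an official client. It assumes a Fluxpipe server listening on localhost:8086 and assumes the response is Flux annotated CSV (annotation rows starting with `#`, then a header row, with blank lines between tables). The names build_query_request, parse_annotated_csv and run_query are hypothetical helpers introduced for illustration.

```python
import csv
import io
import urllib.request

# Assumption: a local Fluxpipe server on the port used in the examples above.
FLUXPIPE_URL = "http://localhost:8086/api/v2/query"


def build_query_request(flux: str, url: str = FLUXPIPE_URL) -> urllib.request.Request:
    """Build the same POST request as the curl example, without sending it."""
    return urllib.request.Request(
        url,
        data=flux.encode("utf-8"),
        headers={
            "Accept": "application/csv",
            "Content-Type": "application/vnd.flux",
        },
        method="POST",
    )


def parse_annotated_csv(text: str) -> list:
    """Turn annotated CSV text into a list of row dicts (all values as strings)."""
    rows = []
    header = None
    for record in csv.reader(io.StringIO(text)):
        # A blank line or an annotation row means a new table is starting,
        # so the next ordinary row is treated as its header.
        if not record or all(cell == "" for cell in record):
            header = None
            continue
        if record[0].startswith("#"):
            header = None
            continue
        if header is None:
            header = record
            continue
        # Skip the unnamed leading column that annotated CSV reserves for annotations.
        rows.append({key: value for key, value in zip(header, record) if key})
    return rows


def run_query(flux: str, url: str = FLUXPIPE_URL, timeout: float = 10.0) -> list:
    """Send a Flux script to the server and return the parsed rows."""
    with urllib.request.urlopen(build_query_request(flux, url), timeout=timeout) as resp:
        return parse_annotated_csv(resp.read().decode("utf-8"))
```

With a server running, calling run_query with the generate script from the curl example should return one dict per generated row. Values stay as strings; converting them according to the `#datatype` annotation is left out of this sketch.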

Secrets

Fluxpipe builds Flux with the EnvironmentSecretService, which lets Flux scripts read system environment variables as secrets.

import "influxdata/influxdb/secrets"
key = secrets.get(key: "ENV_SECRET")

STDIN CMD

Fluxpipe can be used as a command-line tool and stdin pipeline processor

Generate CSV
echo 'import g "generate" g.from(start: 2022-04-01T00:00:00Z, stop: 2022-04-01T00:03:00Z, count: 5, fn: (n) => 1)' \
| ./fluxpipe-server -stdin
Parse CSV
cat scripts/csv.flux | ./fluxpipe-server -stdin
Query SQL
cat scripts/sql.flux | ./fluxpipe-server -stdin
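
The stdin mode also makes it easy to drive Fluxpipe from another program. Below is a minimal Python sketch that pipes a Flux script into the binary and passes secrets as environment variables, as described in the Secrets section. It assumes the binary sits at ./fluxpipe-server and writes its result to stdout; the helper name run_flux_stdin is hypothetical.

```python
import os
import subprocess

# Binary path and flag as used in the shell examples above.
FLUXPIPE_CMD = ["./fluxpipe-server", "-stdin"]


def run_flux_stdin(script: str, secrets=None, cmd=FLUXPIPE_CMD) -> str:
    """Pipe a Flux script to the given command on stdin and return its stdout.

    `secrets` is an optional dict of environment variables added on top of
    the current environment, so the script can read them via secrets.get().
    Raises subprocess.CalledProcessError if the command exits non-zero.
    """
    env = dict(os.environ)
    env.update(secrets or {})
    result = subprocess.run(
        list(cmd),
        input=script,
        capture_output=True,
        text=True,
        env=env,
        check=True,
    )
    return result.stdout
```

For example, run_flux_stdin(open("scripts/csv.flux").read(), secrets={"ENV_SECRET": "value"}) would run the CSV script with ENV_SECRET available to secrets.get(key: "ENV_SECRET").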

Public Demo

Grafana Datasource

Configure your Grafana instance with our public demo endpoint (limited resources).

Documentation

Flux(pipe) is built using the InfluxCommunity/flux fork, which contains additional features and contributions.
All the standard and additional functions available in Fluxpipe are covered in the Flux community documentation.


Status

  • Fluxlib
    • parser
    • executor
  • Contribs
    • contrib/qxip/clickhouse
    • contrib/qxip/logql
    • contrib/qxip/hash
    • ENV secrets
  • STDIN pipeline
  • HTTP API
    • plaintext
    • JSON support
    • web playground

Footnotes

  1. Project is not affiliated with or endorsed by InfluxData or Grafana Labs. All rights belong to their respective owners.

  2. Used under Apache 2.0 terms. Project is not affiliated with or endorsed by ClickHouse Inc. All rights belong to their respective owners.