Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow influxdb_listener to keep the database name from the query string if supplied #6257

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugins/inputs/influxdb_listener/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ submits data to InfluxDB determines the destination database.
tls_cert = "/etc/telegraf/cert.pem"
tls_key = "/etc/telegraf/key.pem"

## Optional tag name used to store the database name.
## If the write has a database in the query string then it will be kept in this tag name.
## This tag can be used in downstream outputs.
## The default value of nothing means it will be off and the database will not be recorded.
# database_tag = ""

## Optional username and password to accept for HTTP basic authentication.
## You probably want to make sure you have TLS configured above for this.
# basic_username = "foobar"
Expand Down
43 changes: 31 additions & 12 deletions plugins/inputs/influxdb_listener/http_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,18 @@ const (
type TimeFunc func() time.Time

type HTTPListener struct {
ServiceAddress string
ReadTimeout internal.Duration
WriteTimeout internal.Duration
MaxBodySize internal.Size
MaxLineSize internal.Size
Port int

ServiceAddress string `toml:"service_address"`
// Port gets pulled out of ServiceAddress
Port int
tlsint.ServerConfig

BasicUsername string
BasicPassword string
ReadTimeout internal.Duration `toml:"read_timeout"`
WriteTimeout internal.Duration `toml:"write_timeout"`
MaxBodySize internal.Size `toml:"max_body_size"`
MaxLineSize internal.Size `toml:"max_line_size"`
BasicUsername string `toml:"basic_username"`
BasicPassword string `toml:"basic_password"`
DatabaseTag string `toml:"database_tag"`

TimeFunc

Expand Down Expand Up @@ -93,6 +94,13 @@ const sampleConfig = `
## Maximum line size allowed to be sent in bytes.
## 0 means to use the default of 65536 bytes (64 kibibytes)
max_line_size = "64KiB"


## Optional tag name used to store the database.
## If the write has a database in the query string then it will be kept in this tag name.
## This tag can be used in downstream outputs.
## The default value of nothing means it will be off and the database will not be recorded.
# database_tag = ""

## Set one or more allowed client CA certificate file names to
## enable mutually authenticated TLS connections
Expand Down Expand Up @@ -258,6 +266,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
now := h.TimeFunc()

precision := req.URL.Query().Get("precision")
db := req.URL.Query().Get("db")

// Handle gzip request bodies
body := req.Body
Expand Down Expand Up @@ -315,7 +324,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {

if err == io.ErrUnexpectedEOF {
// finished reading the request body
err = h.parse(buf[:n+bufStart], now, precision)
err = h.parse(buf[:n+bufStart], now, precision, db)
if err != nil {
log.Println("D! "+err.Error(), bufStart+n)
return400 = true
Expand Down Expand Up @@ -346,7 +355,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
bufStart = 0
continue
}
if err := h.parse(buf[:i+1], now, precision); err != nil {
if err := h.parse(buf[:i+1], now, precision, db); err != nil {
log.Println("D! " + err.Error())
return400 = true
}
Expand All @@ -359,7 +368,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
}
}

func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error {
func (h *HTTPListener) parse(b []byte, t time.Time, precision, db string) error {
h.mu.Lock()
defer h.mu.Unlock()

Expand All @@ -371,6 +380,16 @@ func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error {
}

for _, m := range metrics {
// Do we need to keep the database name in the query string
if h.DatabaseTag != "" {
// Did we get a database argument. If we didn't get it. We can't set it.
if db != "" {
// Is there already a database set. If not use the database in the query string.
if _, ok := m.Tags()[h.DatabaseTag]; !ok {
morfien101 marked this conversation as resolved.
Show resolved Hide resolved
morfien101 marked this conversation as resolved.
Show resolved Hide resolved
m.AddTag(h.DatabaseTag, db)
}
}
}
h.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}

Expand Down
19 changes: 4 additions & 15 deletions plugins/inputs/influxdb_listener/http_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,9 @@ func TestWriteHTTPBasicAuth(t *testing.T) {
require.EqualValues(t, http.StatusNoContent, resp.StatusCode)
}

func TestWriteHTTP(t *testing.T) {
func TestWriteHTTPKeepDatabase(t *testing.T) {
listener := newTestHTTPListener()
listener.DatabaseTag = "database"

acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
Expand All @@ -162,7 +163,7 @@ func TestWriteHTTP(t *testing.T) {
acc.Wait(1)
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"},
map[string]string{"host": "server01", "database": "mydb"},
)

// post multiple message to listener
Expand All @@ -177,21 +178,9 @@ func TestWriteHTTP(t *testing.T) {
for _, hostTag := range hostTags {
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": hostTag},
map[string]string{"host": hostTag, "database": "mydb"},
)
}

// Post a gigantic metric to the listener and verify that an error is returned:
resp, err = http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(hugeMetric)))
require.NoError(t, err)
resp.Body.Close()
require.EqualValues(t, 400, resp.StatusCode)

acc.Wait(3)
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"},
)
}

// http listener should add a newline at the end of the buffer if it's not there
Expand Down