Skip to content

Commit

Permalink
Adding timeout to alert.post and http_post
Browse files Browse the repository at this point in the history
Adding timeout to alert.post and http_post nodes.  Currently
http.DefaultClient is used, which blocks indefinitely when the endpoint
accepts the connection but does not respond to the HTTP request.
Switching to using a non-default http.Client with a timeout defined per
node.
  • Loading branch information
sputnik13 committed Sep 23, 2017
1 parent eacb373 commit 0416cef
Show file tree
Hide file tree
Showing 10 changed files with 225 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Features

- [#1461](https://github.com/influxdata/kapacitor/issues/1461): alert.post and https_post timeouts needed.
- [#1413](https://github.com/influxdata/kapacitor/issues/1413): Add subscriptions modes to InfluxDB subscriptions.
- [#1436](https://github.com/influxdata/kapacitor/issues/1436): Add linear fill support for QueryNode.
- [#1345](https://github.com/influxdata/kapacitor/issues/1345): Add MQTT Alert Handler
Expand Down
1 change: 1 addition & 0 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (a
Endpoint: p.Endpoint,
Headers: p.Headers,
CaptureResponse: p.CaptureResponseFlag,
Timeout: p.Timeout,
}
h := et.tm.HTTPPostService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
Expand Down
19 changes: 16 additions & 3 deletions http_post.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package kapacitor

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"sync"
"time"

"github.com/influxdata/kapacitor/bufpool"
"github.com/influxdata/kapacitor/edge"
Expand All @@ -23,15 +25,17 @@ type HTTPPostNode struct {
endpoint *httppost.Endpoint
mu sync.RWMutex
bp *bufpool.Pool
timeout time.Duration
}

// Create a new HTTPPostNode which submits received items via POST to an HTTP endpoint
func newHTTPPostNode(et *ExecutingTask, n *pipeline.HTTPPostNode, d NodeDiagnostic) (*HTTPPostNode, error) {

hn := &HTTPPostNode{
node: node{Node: n, et: et, diag: d},
c: n,
bp: bufpool.New(),
node: node{Node: n, et: et, diag: d},
c: n,
bp: bufpool.New(),
timeout: n.Timeout,
}

// Should only ever be 0 or 1 from validation of n
Expand Down Expand Up @@ -184,12 +188,21 @@ func (n *HTTPPostNode) postRow(row *models.Row) (*http.Response, error) {
return nil, errors.Wrap(err, "failed to marshal row data json")
}

// Set content type and other headers
if contentType != "" {
req.Header.Set("Content-Type", contentType)
}
for k, v := range n.c.Headers {
req.Header.Set(k, v)
}

// Set timeout
if n.timeout > 0 {
ctx, cancel := context.WithTimeout(req.Context(), n.timeout)
defer cancel()
req = req.WithContext(ctx)
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
Expand Down
171 changes: 171 additions & 0 deletions integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2966,6 +2966,177 @@ batch
testBatcherWithOutput(t, "TestBatch_HttpPost", script, 30*time.Second, er, false)
}

func TestBatch_HttpPost_Timeout(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
result := models.Result{}
dec := json.NewDecoder(r.Body)
err := dec.Decode(&result)
if err != nil {
t.Fatal(err)
}
time.Sleep(100 * time.Millisecond)
}))
defer ts.Close()

var script = `
batch
|query('''
SELECT mean("value")
FROM "telegraf"."default".cpu_usage_idle
WHERE "host" = 'serverA' AND "cpu" != 'cpu-total'
''')
.period(10s)
.every(10s)
.groupBy(time(2s), 'cpu')
|httpPost('` + ts.URL + `').timeout(1ms)
|httpOut('TestBatch_HttpPost_Timeout')
`

er := models.Result{
Series: models.Rows{
{
Name: "cpu_usage_idle",
Tags: map[string]string{"cpu": "cpu-total"},
Columns: []string{"time", "mean"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 20, 0, time.UTC),
91.06416290101595,
},
{
time.Date(1971, 1, 1, 0, 0, 22, 0, time.UTC),
85.9694442394385,
},
{
time.Date(1971, 1, 1, 0, 0, 24, 0, time.UTC),
90.62985736134186,
},
{
time.Date(1971, 1, 1, 0, 0, 26, 0, time.UTC),
86.45443196005628,
},
{
time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC),
88.97243107764031,
},
},
},
{
Name: "cpu_usage_idle",
Tags: map[string]string{"cpu": "cpu0"},
Columns: []string{"time", "mean"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 20, 0, time.UTC),
85.08910891088406,
},
{
time.Date(1971, 1, 1, 0, 0, 22, 0, time.UTC),
78.00000000002001,
},
{
time.Date(1971, 1, 1, 0, 0, 24, 0, time.UTC),
84.23607066586464,
},
{
time.Date(1971, 1, 1, 0, 0, 26, 0, time.UTC),
80.85858585861834,
},
{
time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC),
80.61224489791657,
},
},
},
{
Name: "cpu_usage_idle",
Tags: map[string]string{"cpu": "cpu1"},
Columns: []string{"time", "mean"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 20, 0, time.UTC),
96.49999999996908,
},
{
time.Date(1971, 1, 1, 0, 0, 22, 0, time.UTC),
93.46464646468584,
},
{
time.Date(1971, 1, 1, 0, 0, 24, 0, time.UTC),
95.00950095007724,
},
{
time.Date(1971, 1, 1, 0, 0, 26, 0, time.UTC),
92.99999999998636,
},
{
time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC),
90.99999999998545,
},
},
},
},
}

c := make(chan bool, 1)
go func() {
testBatcherWithOutput(t, "TestBatch_HttpPost_Timeout", script, 30*time.Second, er, false)
c <- true
}()
select {
case <-c:
case <-time.After(time.Second):
t.Fatal("Test timeout reached, httpPost().timeout() may not be functioning")
}
}

func TestBatch_AlertPost_Timeout(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ad := alert.Data{}
dec := json.NewDecoder(r.Body)
err := dec.Decode(&ad)
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
}))
defer ts.Close()
var script = `
batch
|query('''
SELECT mean("value")
FROM "telegraf"."default".cpu_usage_idle
WHERE "host" = 'serverA' AND "cpu" != 'cpu-total'
''')
.period(10s)
.every(10s)
.groupBy(time(2s), 'cpu')
|alert()
.crit(lambda:"mean" > 90)
.stateChangesOnly()
.levelField('level')
.details('')
.post('` + ts.URL + `').timeout(1ms)
`

c := make(chan bool, 1)
go func() {
clock, et, replayErr, tm := testBatcher(t, "TestBatch_AlertPostTimeout", script)
defer tm.Close()

err := fastForwardTask(clock, et, replayErr, tm, 40*time.Second)
if err != nil {
t.Error(err)
}
c <- true
}()
select {
case <-c:
case <-time.After(time.Second):
t.Fatal("Test timeout reached, alert().post().timeout() may not be functioning")
}
}

// Helper test function for batcher
func testBatcher(t *testing.T, name, script string) (clock.Setter, *kapacitor.ExecutingTask, <-chan error, *kapacitor.TaskMaster) {
if testing.Verbose() {
Expand Down
4 changes: 4 additions & 0 deletions integrations/data/TestBatch_AlertPostTimeout.0.brpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":90.38281469458698},"time":"2015-10-30T17:14:12Z"},{"fields":{"mean":86.51447101892941},"time":"2015-10-30T17:14:14Z"},{"fields":{"mean":91.71877558217454},"time":"2015-10-30T17:14:16Z"},{"fields":{"mean":87.10524436107617},"time":"2015-10-30T17:14:18Z"},{"fields":{"mean":90.3900735196668},"time":"2015-10-30T17:14:20Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":90.8919959776013},"time":"2015-10-30T17:14:22Z"},{"fields":{"mean":86.54244306420236},"time":"2015-10-30T17:14:24Z"},{"fields":{"mean":91.01699558842134},"time":"2015-10-30T17:14:26Z"},{"fields":{"mean":85.66378399063848},"time":"2015-10-30T17:14:28Z"},{"fields":{"mean":89.90919811320221},"time":"2015-10-30T17:14:30Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":91.06416290101595},"time":"2015-10-30T17:14:32Z"},{"fields":{"mean":95.9694442394385},"time":"2015-10-30T17:14:34Z"},{"fields":{"mean":70.62985736134186},"time":"2015-10-30T17:14:36Z"},{"fields":{"mean":86.45443196005628},"time":"2015-10-30T17:14:38Z"},{"fields":{"mean":88.97243107764031},"time":"2015-10-30T17:14:40Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":71.06416290101595},"time":"2015-10-30T17:14:42Z"},{"fields":{"mean":85.9694442394385},"time":"2015-10-30T17:14:44Z"},{"fields":{"mean":70.62985736134186},"time":"2015-10-30T17:14:46Z"},{"fields":{"mean":86.45443196005628},"time":"2015-10-30T17:14:48Z"},{"fields":{"mean":88.97243107764031},"time":"2015-10-30T17:14:50Z"}]}
9 changes: 9 additions & 0 deletions integrations/data/TestBatch_HttpPost_Timeout.0.brpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":90.38281469458698},"time":"2015-10-30T17:14:12Z"},{"fields":{"mean":86.51447101892941},"time":"2015-10-30T17:14:14Z"},{"fields":{"mean":91.71877558217454},"time":"2015-10-30T17:14:16Z"},{"fields":{"mean":87.10524436107617},"time":"2015-10-30T17:14:18Z"},{"fields":{"mean":90.3900735196668},"time":"2015-10-30T17:14:20Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu0"},"points":[{"fields":{"mean":83.56930693069836},"time":"2015-10-30T17:14:12Z"},{"fields":{"mean":79.12871287128638},"time":"2015-10-30T17:14:14Z"},{"fields":{"mean":88.99559823928229},"time":"2015-10-30T17:14:16Z"},{"fields":{"mean":85.50000000000182},"time":"2015-10-30T17:14:18Z"},{"fields":{"mean":86.02860286029956},"time":"2015-10-30T17:14:20Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu1"},"points":[{"fields":{"mean":93.49999999999409},"time":"2015-10-30T17:14:12Z"},{"fields":{"mean":91.44444444443974},"time":"2015-10-30T17:14:14Z"},{"fields":{"mean":93.44897959187637},"time":"2015-10-30T17:14:16Z"},{"fields":{"mean":95.99999999995998},"time":"2015-10-30T17:14:18Z"},{"fields":{"mean":97.00970097012197},"time":"2015-10-30T17:14:20Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":90.8919959776013},"time":"2015-10-30T17:14:22Z"},{"fields":{"mean":86.54244306420236},"time":"2015-10-30T17:14:24Z"},{"fields":{"mean":91.01699558842134},"time":"2015-10-30T17:14:26Z"},{"fields":{"mean":85.66378399063848},"time":"2015-10-30T17:14:28Z"},{"fields":{"mean":89.90919811320221},"time":"2015-10-30T17:14:30Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu0"},"points":[{"fields":{"mean":81.72501716191164},"time":"2015-10-30T17:14:22Z"},{"fields":{"mean":81.03810381037587},"time":"2015-10-30T17:14:24Z"},{"fields":{"mean":85.93434343435388},"time":"2015-10-30T17:14:26Z"},{"fields":{"mean":85.36734693878043},"time":"2015-10-30T17:14:28Z"},{"fields":{"mean":83.01320528210614},"time":"2015-10-30T17:14:30Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu1"},"points":[{"fields":{"mean":95.98484848485191},"time":"2015-10-30T17:14:22Z"},{"fields":{"mean":92.098039215696},"time":"2015-10-30T17:14:24Z"},{"fields":{"mean":92.99999999998363},"time":"2015-10-30T17:14:26Z"},{"fields":{"mean":86.54015887023496},"time":"2015-10-30T17:14:28Z"},{"fields":{"mean":95.48979591840603},"time":"2015-10-30T17:14:30Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":91.06416290101595},"time":"2015-10-30T17:14:32Z"},{"fields":{"mean":85.9694442394385},"time":"2015-10-30T17:14:34Z"},{"fields":{"mean":90.62985736134186},"time":"2015-10-30T17:14:36Z"},{"fields":{"mean":86.45443196005628},"time":"2015-10-30T17:14:38Z"},{"fields":{"mean":88.97243107764031},"time":"2015-10-30T17:14:40Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu0"},"points":[{"fields":{"mean":85.08910891088406},"time":"2015-10-30T17:14:32Z"},{"fields":{"mean":78.00000000002001},"time":"2015-10-30T17:14:34Z"},{"fields":{"mean":84.23607066586464},"time":"2015-10-30T17:14:36Z"},{"fields":{"mean":80.85858585861834},"time":"2015-10-30T17:14:38Z"},{"fields":{"mean":80.61224489791657},"time":"2015-10-30T17:14:40Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu1"},"points":[{"fields":{"mean":96.49999999996908},"time":"2015-10-30T17:14:32Z"},{"fields":{"mean":93.46464646468584},"time":"2015-10-30T17:14:34Z"},{"fields":{"mean":95.00950095007724},"time":"2015-10-30T17:14:36Z"},{"fields":{"mean":92.99999999998636},"time":"2015-10-30T17:14:38Z"},{"fields":{"mean":90.99999999998545},"time":"2015-10-30T17:14:40Z"}]}
3 changes: 3 additions & 0 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,9 @@ type AlertHTTPPostHandler struct {

// tick:ignore
CaptureResponseFlag bool `tick:"CaptureResponse"`

// Timeout for HTTP Post
Timeout time.Duration
}

// Set a header key and value on the post request.
Expand Down
4 changes: 4 additions & 0 deletions pipeline/http_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"strings"
"time"
)

// An HTTPPostNode will take the incoming data stream and POST it to an HTTP endpoint.
Expand Down Expand Up @@ -47,6 +48,9 @@ type HTTPPostNode struct {

// tick:ignore
URLs []string

// Timeout for HTTP Post
Timeout time.Duration
}

func newHTTPPostNode(wants EdgeType, urls ...string) *HTTPPostNode {
Expand Down
1 change: 1 addition & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7437,6 +7437,7 @@ func TestServer_ListServiceTests(t *testing.T) {
"endpoint": "example",
"url": "http://localhost:3000/",
"headers": map[string]interface{}{"Auth": "secret"},
"timeout": float64(0),
},
},
{
Expand Down
15 changes: 15 additions & 0 deletions services/httppost/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package httppost

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
Expand All @@ -11,6 +12,8 @@ import (
"sync"
"text/template"

"time"

"github.com/influxdata/kapacitor/alert"
"github.com/influxdata/kapacitor/bufpool"
"github.com/influxdata/kapacitor/keyvalue"
Expand Down Expand Up @@ -185,6 +188,7 @@ type testOptions struct {
Endpoint string `json:"endpoint"`
URL string `json:"url"`
Headers map[string]string `json:"headers"`
Timeout time.Duration `json:"timeout"`
}

func (s *Service) TestOptions() interface{} {
Expand Down Expand Up @@ -234,6 +238,7 @@ type HandlerConfig struct {
Endpoint string `mapstructure:"endpoint"`
Headers map[string]string `mapstructure:"headers"`
CaptureResponse bool `mapstructure:"capture-response"`
Timeout time.Duration `mapstructure:"timeout"`
}

type handler struct {
Expand All @@ -246,6 +251,8 @@ type handler struct {
captureResponse bool

diag Diagnostic

timeout time.Duration
}

func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) alert.Handler {
Expand All @@ -260,6 +267,7 @@ func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) alert.Handler {
diag: s.diag.WithContext(ctx...),
headers: c.Headers,
captureResponse: c.CaptureResponse,
timeout: c.Timeout,
}
}

Expand Down Expand Up @@ -310,6 +318,13 @@ func (h *handler) Handle(event alert.Event) {
req.Header.Set("Content-Type", contentType)
}

// Set timeout
if h.timeout > 0 {
ctx, cancel := context.WithTimeout(req.Context(), h.timeout)
defer cancel()
req = req.WithContext(ctx)
}

// Execute the request
resp, err := http.DefaultClient.Do(req)
if err != nil {
Expand Down

0 comments on commit 0416cef

Please sign in to comment.