adding timeout to alert.post and http_post #1462

Merged 3 commits on Oct 27, 2017
Changes from 2 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -4,6 +4,7 @@
 
 ### Features
 
+- [#1461](https://github.com/influxdata/kapacitor/issues/1461): alert.post and https_post timeouts needed.
 - [#1413](https://github.com/influxdata/kapacitor/issues/1413): Add subscriptions modes to InfluxDB subscriptions.
 - [#1436](https://github.com/influxdata/kapacitor/issues/1436): Add linear fill support for QueryNode.
 - [#1345](https://github.com/influxdata/kapacitor/issues/1345): Add MQTT Alert Handler
1 change: 1 addition & 0 deletions alert.go
@@ -360,6 +360,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (a
 			Endpoint:        p.Endpoint,
 			Headers:         p.Headers,
 			CaptureResponse: p.CaptureResponseFlag,
+			Timeout:         p.Timeout,
 		}
 		h := et.tm.HTTPPostService.Handler(c, ctx...)
 		an.handlers = append(an.handlers, h)
33 changes: 22 additions & 11 deletions bufpool/bufpool.go
@@ -2,28 +2,39 @@ package bufpool
 
 import (
 	"bytes"
+	"io"
 	"sync"
 )
 
 type Pool struct {
-	p sync.Pool
+	p *sync.Pool
 }
 
 func New() *Pool {
+	syncPool := sync.Pool{}
+	syncPool.New = func() interface{} {
+		return &closingBuffer{
+			pool: &syncPool,
+		}
+	}
+
 	return &Pool{
-		p: sync.Pool{
-			New: func() interface{} {
-				return new(bytes.Buffer)
-			},
-		},
+		p: &syncPool,
 	}
 }
 
-func (p *Pool) Get() *bytes.Buffer {
-	return p.p.Get().(*bytes.Buffer)
+func (p *Pool) Get() *closingBuffer {
+	return p.p.Get().(*closingBuffer)
 }
 
+type closingBuffer struct {
+	bytes.Buffer
+	io.Closer
+	pool *sync.Pool
+}
+
-func (p *Pool) Put(b *bytes.Buffer) {
-	b.Reset()
-	p.p.Put(b)
+func (cb *closingBuffer) Close() error {
+	cb.Reset()
+	cb.pool.Put(cb)
+	return nil
 }
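
For orientation, here is a minimal usage sketch of the reworked pool, mirroring how services/alert/handlers.go uses it after this change: callers defer Close() on the buffer instead of calling Put(), so the buffer returns to the pool on every exit path. The JSON payload and the standalone main are illustrative only.

package main

import (
	"encoding/json"
	"fmt"

	"github.com/influxdata/kapacitor/bufpool"
)

func main() {
	pool := bufpool.New()

	// Get hands out a buffer that knows which pool it came from.
	buf := pool.Get()
	// Closing the buffer resets it and puts it back into the pool; this
	// replaces the old pattern of defer pool.Put(buf).
	defer buf.Close()

	if err := json.NewEncoder(buf).Encode(map[string]string{"level": "CRITICAL"}); err != nil {
		fmt.Println("encode failed:", err)
		return // the deferred Close still returns the buffer to the pool
	}
	fmt.Print(buf.String())
}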
21 changes: 17 additions & 4 deletions http_post.go
@@ -7,7 +7,9 @@ import (
 	"net/http"
 	"strconv"
 	"sync"
+	"time"
 
+	"context"
 	"github.com/influxdata/kapacitor/bufpool"
 	"github.com/influxdata/kapacitor/edge"
 	"github.com/influxdata/kapacitor/keyvalue"
@@ -23,15 +25,18 @@ type HTTPPostNode struct {
 	endpoint *httppost.Endpoint
 	mu       sync.RWMutex
 	bp       *bufpool.Pool
+	timeout  time.Duration
 	hc       *http.Client
 }
 
 // Create a new HTTPPostNode which submits received items via POST to an HTTP endpoint
 func newHTTPPostNode(et *ExecutingTask, n *pipeline.HTTPPostNode, d NodeDiagnostic) (*HTTPPostNode, error) {
 
 	hn := &HTTPPostNode{
-		node: node{Node: n, et: et, diag: d},
-		c:    n,
-		bp:   bufpool.New(),
+		node:    node{Node: n, et: et, diag: d},
+		c:       n,
+		bp:      bufpool.New(),
+		timeout: n.Timeout,
 	}
 
 	// Should only ever be 0 or 1 from validation of n
@@ -160,7 +165,6 @@ func (n *HTTPPostNode) doPost(row *models.Row) int {
 
 func (n *HTTPPostNode) postRow(row *models.Row) (*http.Response, error) {
 	body := n.bp.Get()
-	defer n.bp.Put(body)

Contributor:
This change can leak byte buffers from the pool: if writing to the buffer fails and we take the early return, the buffer is never put back. That isn't terrible, since the pool will just allocate new buffers, but it would be best if it were not possible.

Maybe we should just ditch the idea of a buffer pool here and use a new buffer each time; we can address the GC pressure later if it's an issue. We should leave a comment, though, noting that using a buffer pool can cause a race when the request is canceled.

Contributor Author:
Good point: there are at least three places where the code may return without closing the buffer. It's too bad there's no way to cancel defers; that would make things a lot simpler. (See the sketch after this file's diff.)
 
 	var contentType string
 	if n.endpoint.RowTemplate() != nil {
@@ -184,12 +188,21 @@ func (n *HTTPPostNode) postRow(row *models.Row) (*http.Response, error) {
 		return nil, errors.Wrap(err, "failed to marshal row data json")
 	}
 
+	// Set content type and other headers
 	if contentType != "" {
 		req.Header.Set("Content-Type", contentType)
 	}
 	for k, v := range n.c.Headers {
 		req.Header.Set(k, v)
 	}
+
+	// Set timeout
+	if n.timeout > 0 {
+		ctx, cancel := context.WithTimeout(req.Context(), n.timeout)
+		defer cancel()
+		req = req.WithContext(ctx)
+	}
+
 	resp, err := http.DefaultClient.Do(req)
 	if err != nil {
 		return nil, err
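
The review thread above is about exactly this spot: with the deferred Put gone, early returns can leak buffers, while returning a buffer to the pool too early would race with a canceled request that is still reading it. Below is a minimal, self-contained sketch of one way the reworked pool could be used here: the request body closes itself back into the pool, and the timeout is applied through the request context. poolBuffer, post, and the localhost URL are illustrative stand-ins under stated assumptions, not code from this PR.

package main

import (
	"bytes"
	"context"
	"fmt"
	"net/http"
	"strings"
	"time"
)

// poolBuffer sketches the closingBuffer idea: a buffer that releases itself
// when whoever holds it last (here, the HTTP transport) calls Close.
type poolBuffer struct {
	bytes.Buffer
}

func (b *poolBuffer) Close() error {
	b.Reset()
	// In the real bufpool this would be cb.pool.Put(cb).
	return nil
}

func post(url, payload string, timeout time.Duration) (*http.Response, error) {
	body := &poolBuffer{}
	body.WriteString(payload)

	req, err := http.NewRequest("POST", url, body)
	if err != nil {
		body.Close() // early return: release the buffer ourselves
		return nil, err
	}

	if timeout > 0 {
		ctx, cancel := context.WithTimeout(req.Context(), timeout)
		defer cancel()
		req = req.WithContext(ctx)
	}

	// Because body implements io.ReadCloser, the transport closes it when the
	// request finishes or is canceled, so the buffer is not reused too early.
	return http.DefaultClient.Do(req)
}

func main() {
	_, err := post("http://localhost:3000/", strings.Repeat("x", 16), time.Millisecond)
	fmt.Println(err)
}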
171 changes: 171 additions & 0 deletions integrations/batcher_test.go
@@ -2966,6 +2966,177 @@ batch
testBatcherWithOutput(t, "TestBatch_HttpPost", script, 30*time.Second, er, false)
}

func TestBatch_HttpPost_Timeout(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
result := models.Result{}
dec := json.NewDecoder(r.Body)
err := dec.Decode(&result)
if err != nil {
t.Fatal(err)
}
time.Sleep(100 * time.Millisecond)
}))
defer ts.Close()

var script = `
batch
|query('''
SELECT mean("value")
FROM "telegraf"."default".cpu_usage_idle
WHERE "host" = 'serverA' AND "cpu" != 'cpu-total'
''')
.period(10s)
.every(10s)
.groupBy(time(2s), 'cpu')
|httpPost('` + ts.URL + `').timeout(1ms)
|httpOut('TestBatch_HttpPost_Timeout')
`

er := models.Result{
Series: models.Rows{
{
Name: "cpu_usage_idle",
Tags: map[string]string{"cpu": "cpu-total"},
Columns: []string{"time", "mean"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 20, 0, time.UTC),
91.06416290101595,
},
{
time.Date(1971, 1, 1, 0, 0, 22, 0, time.UTC),
85.9694442394385,
},
{
time.Date(1971, 1, 1, 0, 0, 24, 0, time.UTC),
90.62985736134186,
},
{
time.Date(1971, 1, 1, 0, 0, 26, 0, time.UTC),
86.45443196005628,
},
{
time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC),
88.97243107764031,
},
},
},
{
Name: "cpu_usage_idle",
Tags: map[string]string{"cpu": "cpu0"},
Columns: []string{"time", "mean"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 20, 0, time.UTC),
85.08910891088406,
},
{
time.Date(1971, 1, 1, 0, 0, 22, 0, time.UTC),
78.00000000002001,
},
{
time.Date(1971, 1, 1, 0, 0, 24, 0, time.UTC),
84.23607066586464,
},
{
time.Date(1971, 1, 1, 0, 0, 26, 0, time.UTC),
80.85858585861834,
},
{
time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC),
80.61224489791657,
},
},
},
{
Name: "cpu_usage_idle",
Tags: map[string]string{"cpu": "cpu1"},
Columns: []string{"time", "mean"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 20, 0, time.UTC),
96.49999999996908,
},
{
time.Date(1971, 1, 1, 0, 0, 22, 0, time.UTC),
93.46464646468584,
},
{
time.Date(1971, 1, 1, 0, 0, 24, 0, time.UTC),
95.00950095007724,
},
{
time.Date(1971, 1, 1, 0, 0, 26, 0, time.UTC),
92.99999999998636,
},
{
time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC),
90.99999999998545,
},
},
},
},
}

c := make(chan bool, 1)
go func() {
testBatcherWithOutput(t, "TestBatch_HttpPost_Timeout", script, 30*time.Second, er, false)
c <- true
}()
select {
case <-c:
case <-time.After(time.Second):
t.Fatal("Test timeout reached, httpPost().timeout() may not be functioning")
}
}

func TestBatch_AlertPost_Timeout(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ad := alert.Data{}
dec := json.NewDecoder(r.Body)
err := dec.Decode(&ad)
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
}))
defer ts.Close()
var script = `
batch
|query('''
SELECT mean("value")
FROM "telegraf"."default".cpu_usage_idle
WHERE "host" = 'serverA' AND "cpu" != 'cpu-total'
''')
.period(10s)
.every(10s)
.groupBy(time(2s), 'cpu')
|alert()
.crit(lambda:"mean" > 90)
.stateChangesOnly()
.levelField('level')
.details('')
.post('` + ts.URL + `').timeout(1ms)
`

c := make(chan bool, 1)
go func() {
clock, et, replayErr, tm := testBatcher(t, "TestBatch_AlertPostTimeout", script)
defer tm.Close()

err := fastForwardTask(clock, et, replayErr, tm, 40*time.Second)
if err != nil {
t.Error(err)
}
c <- true
}()
select {
case <-c:
case <-time.After(time.Second):
t.Fatal("Test timeout reached, alert().post().timeout() may not be functioning")
}
}

// Helper test function for batcher
func testBatcher(t *testing.T, name, script string) (clock.Setter, *kapacitor.ExecutingTask, <-chan error, *kapacitor.TaskMaster) {
if testing.Verbose() {
4 changes: 4 additions & 0 deletions integrations/data/TestBatch_AlertPostTimeout.0.brpl
@@ -0,0 +1,4 @@
{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":90.38281469458698},"time":"2015-10-30T17:14:12Z"},{"fields":{"mean":86.51447101892941},"time":"2015-10-30T17:14:14Z"},{"fields":{"mean":91.71877558217454},"time":"2015-10-30T17:14:16Z"},{"fields":{"mean":87.10524436107617},"time":"2015-10-30T17:14:18Z"},{"fields":{"mean":90.3900735196668},"time":"2015-10-30T17:14:20Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":90.8919959776013},"time":"2015-10-30T17:14:22Z"},{"fields":{"mean":86.54244306420236},"time":"2015-10-30T17:14:24Z"},{"fields":{"mean":91.01699558842134},"time":"2015-10-30T17:14:26Z"},{"fields":{"mean":85.66378399063848},"time":"2015-10-30T17:14:28Z"},{"fields":{"mean":89.90919811320221},"time":"2015-10-30T17:14:30Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":91.06416290101595},"time":"2015-10-30T17:14:32Z"},{"fields":{"mean":95.9694442394385},"time":"2015-10-30T17:14:34Z"},{"fields":{"mean":70.62985736134186},"time":"2015-10-30T17:14:36Z"},{"fields":{"mean":86.45443196005628},"time":"2015-10-30T17:14:38Z"},{"fields":{"mean":88.97243107764031},"time":"2015-10-30T17:14:40Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":71.06416290101595},"time":"2015-10-30T17:14:42Z"},{"fields":{"mean":85.9694442394385},"time":"2015-10-30T17:14:44Z"},{"fields":{"mean":70.62985736134186},"time":"2015-10-30T17:14:46Z"},{"fields":{"mean":86.45443196005628},"time":"2015-10-30T17:14:48Z"},{"fields":{"mean":88.97243107764031},"time":"2015-10-30T17:14:50Z"}]}
9 changes: 9 additions & 0 deletions integrations/data/TestBatch_HttpPost_Timeout.0.brpl
@@ -0,0 +1,9 @@
{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":90.38281469458698},"time":"2015-10-30T17:14:12Z"},{"fields":{"mean":86.51447101892941},"time":"2015-10-30T17:14:14Z"},{"fields":{"mean":91.71877558217454},"time":"2015-10-30T17:14:16Z"},{"fields":{"mean":87.10524436107617},"time":"2015-10-30T17:14:18Z"},{"fields":{"mean":90.3900735196668},"time":"2015-10-30T17:14:20Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu0"},"points":[{"fields":{"mean":83.56930693069836},"time":"2015-10-30T17:14:12Z"},{"fields":{"mean":79.12871287128638},"time":"2015-10-30T17:14:14Z"},{"fields":{"mean":88.99559823928229},"time":"2015-10-30T17:14:16Z"},{"fields":{"mean":85.50000000000182},"time":"2015-10-30T17:14:18Z"},{"fields":{"mean":86.02860286029956},"time":"2015-10-30T17:14:20Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu1"},"points":[{"fields":{"mean":93.49999999999409},"time":"2015-10-30T17:14:12Z"},{"fields":{"mean":91.44444444443974},"time":"2015-10-30T17:14:14Z"},{"fields":{"mean":93.44897959187637},"time":"2015-10-30T17:14:16Z"},{"fields":{"mean":95.99999999995998},"time":"2015-10-30T17:14:18Z"},{"fields":{"mean":97.00970097012197},"time":"2015-10-30T17:14:20Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":90.8919959776013},"time":"2015-10-30T17:14:22Z"},{"fields":{"mean":86.54244306420236},"time":"2015-10-30T17:14:24Z"},{"fields":{"mean":91.01699558842134},"time":"2015-10-30T17:14:26Z"},{"fields":{"mean":85.66378399063848},"time":"2015-10-30T17:14:28Z"},{"fields":{"mean":89.90919811320221},"time":"2015-10-30T17:14:30Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu0"},"points":[{"fields":{"mean":81.72501716191164},"time":"2015-10-30T17:14:22Z"},{"fields":{"mean":81.03810381037587},"time":"2015-10-30T17:14:24Z"},{"fields":{"mean":85.93434343435388},"time":"2015-10-30T17:14:26Z"},{"fields":{"mean":85.36734693878043},"time":"2015-10-30T17:14:28Z"},{"fields":{"mean":83.01320528210614},"time":"2015-10-30T17:14:30Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu1"},"points":[{"fields":{"mean":95.98484848485191},"time":"2015-10-30T17:14:22Z"},{"fields":{"mean":92.098039215696},"time":"2015-10-30T17:14:24Z"},{"fields":{"mean":92.99999999998363},"time":"2015-10-30T17:14:26Z"},{"fields":{"mean":86.54015887023496},"time":"2015-10-30T17:14:28Z"},{"fields":{"mean":95.48979591840603},"time":"2015-10-30T17:14:30Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":91.06416290101595},"time":"2015-10-30T17:14:32Z"},{"fields":{"mean":85.9694442394385},"time":"2015-10-30T17:14:34Z"},{"fields":{"mean":90.62985736134186},"time":"2015-10-30T17:14:36Z"},{"fields":{"mean":86.45443196005628},"time":"2015-10-30T17:14:38Z"},{"fields":{"mean":88.97243107764031},"time":"2015-10-30T17:14:40Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu0"},"points":[{"fields":{"mean":85.08910891088406},"time":"2015-10-30T17:14:32Z"},{"fields":{"mean":78.00000000002001},"time":"2015-10-30T17:14:34Z"},{"fields":{"mean":84.23607066586464},"time":"2015-10-30T17:14:36Z"},{"fields":{"mean":80.85858585861834},"time":"2015-10-30T17:14:38Z"},{"fields":{"mean":80.61224489791657},"time":"2015-10-30T17:14:40Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu1"},"points":[{"fields":{"mean":96.49999999996908},"time":"2015-10-30T17:14:32Z"},{"fields":{"mean":93.46464646468584},"time":"2015-10-30T17:14:34Z"},{"fields":{"mean":95.00950095007724},"time":"2015-10-30T17:14:36Z"},{"fields":{"mean":92.99999999998636},"time":"2015-10-30T17:14:38Z"},{"fields":{"mean":90.99999999998545},"time":"2015-10-30T17:14:40Z"}]}
3 changes: 3 additions & 0 deletions pipeline/alert.go
@@ -523,6 +523,9 @@ type AlertHTTPPostHandler struct {
 
 	// tick:ignore
 	CaptureResponseFlag bool `tick:"CaptureResponse"`
+
+	// Timeout for HTTP Post
+	Timeout time.Duration
 }
 
 // Set a header key and value on the post request.
4 changes: 4 additions & 0 deletions pipeline/http_post.go
@@ -4,6 +4,7 @@ import (
 	"errors"
 	"fmt"
 	"strings"
+	"time"
 )
 
 // An HTTPPostNode will take the incoming data stream and POST it to an HTTP endpoint.
@@ -47,6 +48,9 @@ type HTTPPostNode struct {
 
 	// tick:ignore
 	URLs []string
+
+	// Timeout for HTTP Post
+	Timeout time.Duration
 }
 
 func newHTTPPostNode(wants EdgeType, urls ...string) *HTTPPostNode {
1 change: 1 addition & 0 deletions server/server_test.go
@@ -8031,6 +8031,7 @@ func TestServer_ListServiceTests(t *testing.T) {
 				"endpoint": "example",
 				"url":      "http://localhost:3000/",
 				"headers":  map[string]interface{}{"Auth": "secret"},
+				"timeout":  float64(0),
 			},
 		},
 		{
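
A side note on the expected value: the service-test options appear to round-trip through JSON into a map[string]interface{}, and JSON numbers decode into interface{} as float64, which is why a zero timeout shows up as float64(0). A small, standalone illustration (not Kapacitor code):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	var opts map[string]interface{}
	// A zero timeout serialized by the server arrives as the JSON number 0...
	if err := json.Unmarshal([]byte(`{"timeout": 0}`), &opts); err != nil {
		panic(err)
	}
	// ...and JSON numbers decode as float64, hence float64(0) in the expected
	// options rather than 0 or time.Duration(0).
	fmt.Printf("%T %v\n", opts["timeout"], opts["timeout"] == float64(0)) // float64 true
}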
4 changes: 2 additions & 2 deletions services/alert/handlers.go
@@ -110,7 +110,7 @@ func NewExecHandler(c ExecHandlerConfig, d HandlerDiagnostic) alert.Handler {
 
 func (h *execHandler) Handle(event alert.Event) {
 	buf := h.bp.Get()
-	defer h.bp.Put(buf)
+	defer buf.Close()
 	ad := event.AlertData()
 
 	err := json.NewEncoder(buf).Encode(ad)
@@ -156,7 +156,7 @@ func NewTCPHandler(c TCPHandlerConfig, d HandlerDiagnostic) alert.Handler {
 
 func (h *tcpHandler) Handle(event alert.Event) {
 	buf := h.bp.Get()
-	defer h.bp.Put(buf)
+	defer buf.Close()
 	ad := event.AlertData()
 
 	err := json.NewEncoder(buf).Encode(ad)