diff --git a/bufpool/bufpool.go b/bufpool/bufpool.go index 261af90038..e827ef7119 100644 --- a/bufpool/bufpool.go +++ b/bufpool/bufpool.go @@ -2,28 +2,39 @@ package bufpool import ( "bytes" + "io" "sync" ) type Pool struct { - p sync.Pool + p *sync.Pool } func New() *Pool { + syncPool := sync.Pool{} + syncPool.New = func() interface{} { + return &closingBuffer{ + pool: &syncPool, + } + } + return &Pool{ - p: sync.Pool{ - New: func() interface{} { - return new(bytes.Buffer) - }, - }, + p: &syncPool, } } -func (p *Pool) Get() *bytes.Buffer { - return p.p.Get().(*bytes.Buffer) +func (p *Pool) Get() *closingBuffer { + return p.p.Get().(*closingBuffer) +} + +type closingBuffer struct { + bytes.Buffer + io.Closer + pool *sync.Pool } -func (p *Pool) Put(b *bytes.Buffer) { - b.Reset() - p.p.Put(b) +func (cb *closingBuffer) Close() error { + cb.Reset() + cb.pool.Put(cb) + return nil } diff --git a/http_post.go b/http_post.go index 0a5f018a98..92883e2250 100644 --- a/http_post.go +++ b/http_post.go @@ -1,7 +1,6 @@ package kapacitor import ( - "context" "encoding/json" "fmt" "io/ioutil" @@ -10,6 +9,7 @@ import ( "sync" "time" + "context" "github.com/influxdata/kapacitor/bufpool" "github.com/influxdata/kapacitor/edge" "github.com/influxdata/kapacitor/keyvalue" @@ -26,6 +26,7 @@ type HTTPPostNode struct { mu sync.RWMutex bp *bufpool.Pool timeout time.Duration + hc *http.Client } // Create a new HTTPPostNode which submits received items via POST to an HTTP endpoint @@ -164,7 +165,6 @@ func (n *HTTPPostNode) doPost(row *models.Row) int { func (n *HTTPPostNode) postRow(row *models.Row) (*http.Response, error) { body := n.bp.Get() - defer n.bp.Put(body) var contentType string if n.endpoint.RowTemplate() != nil { diff --git a/services/alert/handlers.go b/services/alert/handlers.go index dc5ed92a7b..fe57c5871f 100644 --- a/services/alert/handlers.go +++ b/services/alert/handlers.go @@ -110,7 +110,7 @@ func NewExecHandler(c ExecHandlerConfig, d HandlerDiagnostic) alert.Handler { func (h *execHandler) Handle(event alert.Event) { buf := h.bp.Get() - defer h.bp.Put(buf) + defer buf.Close() ad := event.AlertData() err := json.NewEncoder(buf).Encode(ad) @@ -156,7 +156,7 @@ func NewTCPHandler(c TCPHandlerConfig, d HandlerDiagnostic) alert.Handler { func (h *tcpHandler) Handle(event alert.Event) { buf := h.bp.Get() - defer h.bp.Put(buf) + defer buf.Close() ad := event.AlertData() err := json.NewEncoder(buf).Encode(ad) diff --git a/services/httppost/service.go b/services/httppost/service.go index 57060dbea1..db80f6403d 100644 --- a/services/httppost/service.go +++ b/services/httppost/service.go @@ -2,7 +2,6 @@ package httppost import ( "bytes" - "context" "encoding/json" "fmt" "io" @@ -14,6 +13,7 @@ import ( "time" + "context" "github.com/influxdata/kapacitor/alert" "github.com/influxdata/kapacitor/bufpool" "github.com/influxdata/kapacitor/keyvalue" @@ -253,6 +253,8 @@ type handler struct { diag Diagnostic timeout time.Duration + + hc *http.Client } func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) alert.Handler { @@ -289,7 +291,6 @@ func (h *handler) Handle(event alert.Event) { // Construct the body of the HTTP request body := h.bp.Get() - defer h.bp.Put(body) ad := event.AlertData() var contentType string