From ae7552811ad9e73ffb785f6f76f4c212956c07cc Mon Sep 17 00:00:00 2001 From: Min Pae Date: Sat, 23 Sep 2017 16:48:30 -0700 Subject: [PATCH] modifying bufpool to provide closingBuffer bufpool provided buffers result in race condition when used with a buffer user that expects to take ownership of the buffer (like http client appears to). Returning an extended version of Buffer that allows it to be closed, where the closing the buffer performs a Reset() on the buffer and returns it to the pool it came from. --- bufpool/bufpool.go | 33 ++++++++++++++++++++++----------- http_post.go | 4 ++-- services/alert/handlers.go | 4 ++-- services/httppost/service.go | 5 +++-- 4 files changed, 29 insertions(+), 17 deletions(-) diff --git a/bufpool/bufpool.go b/bufpool/bufpool.go index 261af90038..e827ef7119 100644 --- a/bufpool/bufpool.go +++ b/bufpool/bufpool.go @@ -2,28 +2,39 @@ package bufpool import ( "bytes" + "io" "sync" ) type Pool struct { - p sync.Pool + p *sync.Pool } func New() *Pool { + syncPool := sync.Pool{} + syncPool.New = func() interface{} { + return &closingBuffer{ + pool: &syncPool, + } + } + return &Pool{ - p: sync.Pool{ - New: func() interface{} { - return new(bytes.Buffer) - }, - }, + p: &syncPool, } } -func (p *Pool) Get() *bytes.Buffer { - return p.p.Get().(*bytes.Buffer) +func (p *Pool) Get() *closingBuffer { + return p.p.Get().(*closingBuffer) +} + +type closingBuffer struct { + bytes.Buffer + io.Closer + pool *sync.Pool } -func (p *Pool) Put(b *bytes.Buffer) { - b.Reset() - p.p.Put(b) +func (cb *closingBuffer) Close() error { + cb.Reset() + cb.pool.Put(cb) + return nil } diff --git a/http_post.go b/http_post.go index 0a5f018a98..92883e2250 100644 --- a/http_post.go +++ b/http_post.go @@ -1,7 +1,6 @@ package kapacitor import ( - "context" "encoding/json" "fmt" "io/ioutil" @@ -10,6 +9,7 @@ import ( "sync" "time" + "context" "github.com/influxdata/kapacitor/bufpool" "github.com/influxdata/kapacitor/edge" "github.com/influxdata/kapacitor/keyvalue" @@ -26,6 +26,7 @@ type HTTPPostNode struct { mu sync.RWMutex bp *bufpool.Pool timeout time.Duration + hc *http.Client } // Create a new HTTPPostNode which submits received items via POST to an HTTP endpoint @@ -164,7 +165,6 @@ func (n *HTTPPostNode) doPost(row *models.Row) int { func (n *HTTPPostNode) postRow(row *models.Row) (*http.Response, error) { body := n.bp.Get() - defer n.bp.Put(body) var contentType string if n.endpoint.RowTemplate() != nil { diff --git a/services/alert/handlers.go b/services/alert/handlers.go index dc5ed92a7b..fe57c5871f 100644 --- a/services/alert/handlers.go +++ b/services/alert/handlers.go @@ -110,7 +110,7 @@ func NewExecHandler(c ExecHandlerConfig, d HandlerDiagnostic) alert.Handler { func (h *execHandler) Handle(event alert.Event) { buf := h.bp.Get() - defer h.bp.Put(buf) + defer buf.Close() ad := event.AlertData() err := json.NewEncoder(buf).Encode(ad) @@ -156,7 +156,7 @@ func NewTCPHandler(c TCPHandlerConfig, d HandlerDiagnostic) alert.Handler { func (h *tcpHandler) Handle(event alert.Event) { buf := h.bp.Get() - defer h.bp.Put(buf) + defer buf.Close() ad := event.AlertData() err := json.NewEncoder(buf).Encode(ad) diff --git a/services/httppost/service.go b/services/httppost/service.go index 57060dbea1..db80f6403d 100644 --- a/services/httppost/service.go +++ b/services/httppost/service.go @@ -2,7 +2,6 @@ package httppost import ( "bytes" - "context" "encoding/json" "fmt" "io" @@ -14,6 +13,7 @@ import ( "time" + "context" "github.com/influxdata/kapacitor/alert" "github.com/influxdata/kapacitor/bufpool" "github.com/influxdata/kapacitor/keyvalue" @@ -253,6 +253,8 @@ type handler struct { diag Diagnostic timeout time.Duration + + hc *http.Client } func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) alert.Handler { @@ -289,7 +291,6 @@ func (h *handler) Handle(event alert.Event) { // Construct the body of the HTTP request body := h.bp.Get() - defer h.bp.Put(body) ad := event.AlertData() var contentType string