From 598fde642c5a947c6aee16dc620c48bae2973629 Mon Sep 17 00:00:00 2001 From: Min Pae Date: Thu, 26 Oct 2017 12:25:14 -0700 Subject: [PATCH] reverting bufpool --- bufpool/bufpool.go | 33 +++++++++++---------------------- http_post.go | 6 ++---- services/alert/handlers.go | 4 ++-- services/httppost/service.go | 7 ++----- 4 files changed, 17 insertions(+), 33 deletions(-) diff --git a/bufpool/bufpool.go b/bufpool/bufpool.go index e827ef711..261af9003 100644 --- a/bufpool/bufpool.go +++ b/bufpool/bufpool.go @@ -2,39 +2,28 @@ package bufpool import ( "bytes" - "io" "sync" ) type Pool struct { - p *sync.Pool + p sync.Pool } func New() *Pool { - syncPool := sync.Pool{} - syncPool.New = func() interface{} { - return &closingBuffer{ - pool: &syncPool, - } - } - return &Pool{ - p: &syncPool, + p: sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, + }, } } -func (p *Pool) Get() *closingBuffer { - return p.p.Get().(*closingBuffer) -} - -type closingBuffer struct { - bytes.Buffer - io.Closer - pool *sync.Pool +func (p *Pool) Get() *bytes.Buffer { + return p.p.Get().(*bytes.Buffer) } -func (cb *closingBuffer) Close() error { - cb.Reset() - cb.pool.Put(cb) - return nil +func (p *Pool) Put(b *bytes.Buffer) { + b.Reset() + p.p.Put(b) } diff --git a/http_post.go b/http_post.go index 92883e225..5ad0d2c9b 100644 --- a/http_post.go +++ b/http_post.go @@ -9,8 +9,8 @@ import ( "sync" "time" + "bytes" "context" - "github.com/influxdata/kapacitor/bufpool" "github.com/influxdata/kapacitor/edge" "github.com/influxdata/kapacitor/keyvalue" "github.com/influxdata/kapacitor/models" @@ -24,7 +24,6 @@ type HTTPPostNode struct { c *pipeline.HTTPPostNode endpoint *httppost.Endpoint mu sync.RWMutex - bp *bufpool.Pool timeout time.Duration hc *http.Client } @@ -35,7 +34,6 @@ func newHTTPPostNode(et *ExecutingTask, n *pipeline.HTTPPostNode, d NodeDiagnost hn := &HTTPPostNode{ node: node{Node: n, et: et, diag: d}, c: n, - bp: bufpool.New(), timeout: n.Timeout, } @@ -164,7 +162,7 @@ func (n *HTTPPostNode) doPost(row *models.Row) int { } func (n *HTTPPostNode) postRow(row *models.Row) (*http.Response, error) { - body := n.bp.Get() + body := new(bytes.Buffer) var contentType string if n.endpoint.RowTemplate() != nil { diff --git a/services/alert/handlers.go b/services/alert/handlers.go index fe57c5871..dc5ed92a7 100644 --- a/services/alert/handlers.go +++ b/services/alert/handlers.go @@ -110,7 +110,7 @@ func NewExecHandler(c ExecHandlerConfig, d HandlerDiagnostic) alert.Handler { func (h *execHandler) Handle(event alert.Event) { buf := h.bp.Get() - defer buf.Close() + defer h.bp.Put(buf) ad := event.AlertData() err := json.NewEncoder(buf).Encode(ad) @@ -156,7 +156,7 @@ func NewTCPHandler(c TCPHandlerConfig, d HandlerDiagnostic) alert.Handler { func (h *tcpHandler) Handle(event alert.Event) { buf := h.bp.Get() - defer buf.Close() + defer h.bp.Put(buf) ad := event.AlertData() err := json.NewEncoder(buf).Encode(ad) diff --git a/services/httppost/service.go b/services/httppost/service.go index db80f6403..87edf3184 100644 --- a/services/httppost/service.go +++ b/services/httppost/service.go @@ -15,7 +15,6 @@ import ( "context" "github.com/influxdata/kapacitor/alert" - "github.com/influxdata/kapacitor/bufpool" "github.com/influxdata/kapacitor/keyvalue" "github.com/pkg/errors" ) @@ -242,8 +241,7 @@ type HandlerConfig struct { } type handler struct { - s *Service - bp *bufpool.Pool + s *Service endpoint *Endpoint headers map[string]string @@ -264,7 +262,6 @@ func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) alert.Handler { } return &handler{ s: s, - bp: bufpool.New(), endpoint: e, diag: s.diag.WithContext(ctx...), headers: c.Headers, @@ -290,7 +287,7 @@ func (h *handler) Handle(event alert.Event) { var err error // Construct the body of the HTTP request - body := h.bp.Get() + body := new(bytes.Buffer) ad := event.AlertData() var contentType string