Skip to content

Commit

Permalink
reverting bufpool
Browse files Browse the repository at this point in the history
  • Loading branch information
sputnik13 committed Oct 26, 2017
1 parent bf4cc98 commit 598fde6
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 33 deletions.
33 changes: 11 additions & 22 deletions bufpool/bufpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,28 @@ package bufpool

import (
"bytes"
"io"
"sync"
)

type Pool struct {
p *sync.Pool
p sync.Pool
}

func New() *Pool {
syncPool := sync.Pool{}
syncPool.New = func() interface{} {
return &closingBuffer{
pool: &syncPool,
}
}

return &Pool{
p: &syncPool,
p: sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
},
}
}

func (p *Pool) Get() *closingBuffer {
return p.p.Get().(*closingBuffer)
}

type closingBuffer struct {
bytes.Buffer
io.Closer
pool *sync.Pool
func (p *Pool) Get() *bytes.Buffer {
return p.p.Get().(*bytes.Buffer)
}

func (cb *closingBuffer) Close() error {
cb.Reset()
cb.pool.Put(cb)
return nil
func (p *Pool) Put(b *bytes.Buffer) {
b.Reset()
p.p.Put(b)
}
6 changes: 2 additions & 4 deletions http_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"sync"
"time"

"bytes"
"context"
"github.com/influxdata/kapacitor/bufpool"
"github.com/influxdata/kapacitor/edge"
"github.com/influxdata/kapacitor/keyvalue"
"github.com/influxdata/kapacitor/models"
Expand All @@ -24,7 +24,6 @@ type HTTPPostNode struct {
c *pipeline.HTTPPostNode
endpoint *httppost.Endpoint
mu sync.RWMutex
bp *bufpool.Pool
timeout time.Duration
hc *http.Client
}
Expand All @@ -35,7 +34,6 @@ func newHTTPPostNode(et *ExecutingTask, n *pipeline.HTTPPostNode, d NodeDiagnost
hn := &HTTPPostNode{
node: node{Node: n, et: et, diag: d},
c: n,
bp: bufpool.New(),
timeout: n.Timeout,
}

Expand Down Expand Up @@ -164,7 +162,7 @@ func (n *HTTPPostNode) doPost(row *models.Row) int {
}

func (n *HTTPPostNode) postRow(row *models.Row) (*http.Response, error) {
body := n.bp.Get()
body := new(bytes.Buffer)

var contentType string
if n.endpoint.RowTemplate() != nil {
Expand Down
4 changes: 2 additions & 2 deletions services/alert/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func NewExecHandler(c ExecHandlerConfig, d HandlerDiagnostic) alert.Handler {

func (h *execHandler) Handle(event alert.Event) {
buf := h.bp.Get()
defer buf.Close()
defer h.bp.Put(buf)
ad := event.AlertData()

err := json.NewEncoder(buf).Encode(ad)
Expand Down Expand Up @@ -156,7 +156,7 @@ func NewTCPHandler(c TCPHandlerConfig, d HandlerDiagnostic) alert.Handler {

func (h *tcpHandler) Handle(event alert.Event) {
buf := h.bp.Get()
defer buf.Close()
defer h.bp.Put(buf)
ad := event.AlertData()

err := json.NewEncoder(buf).Encode(ad)
Expand Down
7 changes: 2 additions & 5 deletions services/httppost/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"context"
"github.com/influxdata/kapacitor/alert"
"github.com/influxdata/kapacitor/bufpool"
"github.com/influxdata/kapacitor/keyvalue"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -242,8 +241,7 @@ type HandlerConfig struct {
}

type handler struct {
s *Service
bp *bufpool.Pool
s *Service

endpoint *Endpoint
headers map[string]string
Expand All @@ -264,7 +262,6 @@ func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) alert.Handler {
}
return &handler{
s: s,
bp: bufpool.New(),
endpoint: e,
diag: s.diag.WithContext(ctx...),
headers: c.Headers,
Expand All @@ -290,7 +287,7 @@ func (h *handler) Handle(event alert.Event) {
var err error

// Construct the body of the HTTP request
body := h.bp.Get()
body := new(bytes.Buffer)
ad := event.AlertData()

var contentType string
Expand Down

0 comments on commit 598fde6

Please sign in to comment.