Skip to content

Commit

Permalink
modifying bufpool to provide closingBuffer
Browse files Browse the repository at this point in the history
bufpool provided buffers result in race condition when used with a
buffer user that expects to take ownership of the buffer (like http
client appears to).  Returning an extended version of Buffer that allows
it to be closed, where the closing the buffer performs a Reset() on the
buffer and returns it to the pool it came from.
  • Loading branch information
sputnik13 committed Oct 9, 2017
1 parent 1b35d9a commit ae75528
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 17 deletions.
33 changes: 22 additions & 11 deletions bufpool/bufpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,39 @@ package bufpool

import (
"bytes"
"io"
"sync"
)

type Pool struct {
p sync.Pool
p *sync.Pool
}

func New() *Pool {
syncPool := sync.Pool{}
syncPool.New = func() interface{} {
return &closingBuffer{
pool: &syncPool,
}
}

return &Pool{
p: sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
},
p: &syncPool,
}
}

func (p *Pool) Get() *bytes.Buffer {
return p.p.Get().(*bytes.Buffer)
func (p *Pool) Get() *closingBuffer {
return p.p.Get().(*closingBuffer)
}

type closingBuffer struct {
bytes.Buffer
io.Closer
pool *sync.Pool
}

func (p *Pool) Put(b *bytes.Buffer) {
b.Reset()
p.p.Put(b)
func (cb *closingBuffer) Close() error {
cb.Reset()
cb.pool.Put(cb)
return nil
}
4 changes: 2 additions & 2 deletions http_post.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kapacitor

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
Expand All @@ -10,6 +9,7 @@ import (
"sync"
"time"

"context"
"github.com/influxdata/kapacitor/bufpool"
"github.com/influxdata/kapacitor/edge"
"github.com/influxdata/kapacitor/keyvalue"
Expand All @@ -26,6 +26,7 @@ type HTTPPostNode struct {
mu sync.RWMutex
bp *bufpool.Pool
timeout time.Duration
hc *http.Client
}

// Create a new HTTPPostNode which submits received items via POST to an HTTP endpoint
Expand Down Expand Up @@ -164,7 +165,6 @@ func (n *HTTPPostNode) doPost(row *models.Row) int {

func (n *HTTPPostNode) postRow(row *models.Row) (*http.Response, error) {
body := n.bp.Get()
defer n.bp.Put(body)

var contentType string
if n.endpoint.RowTemplate() != nil {
Expand Down
4 changes: 2 additions & 2 deletions services/alert/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func NewExecHandler(c ExecHandlerConfig, d HandlerDiagnostic) alert.Handler {

func (h *execHandler) Handle(event alert.Event) {
buf := h.bp.Get()
defer h.bp.Put(buf)
defer buf.Close()
ad := event.AlertData()

err := json.NewEncoder(buf).Encode(ad)
Expand Down Expand Up @@ -156,7 +156,7 @@ func NewTCPHandler(c TCPHandlerConfig, d HandlerDiagnostic) alert.Handler {

func (h *tcpHandler) Handle(event alert.Event) {
buf := h.bp.Get()
defer h.bp.Put(buf)
defer buf.Close()
ad := event.AlertData()

err := json.NewEncoder(buf).Encode(ad)
Expand Down
5 changes: 3 additions & 2 deletions services/httppost/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package httppost

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
Expand All @@ -14,6 +13,7 @@ import (

"time"

"context"
"github.com/influxdata/kapacitor/alert"
"github.com/influxdata/kapacitor/bufpool"
"github.com/influxdata/kapacitor/keyvalue"
Expand Down Expand Up @@ -253,6 +253,8 @@ type handler struct {
diag Diagnostic

timeout time.Duration

hc *http.Client
}

func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) alert.Handler {
Expand Down Expand Up @@ -289,7 +291,6 @@ func (h *handler) Handle(event alert.Event) {

// Construct the body of the HTTP request
body := h.bp.Get()
defer h.bp.Put(body)
ad := event.AlertData()

var contentType string
Expand Down

0 comments on commit ae75528

Please sign in to comment.