don't block batch endpoint on most queues #146

Merged 3 commits on Sep 21, 2020

4 changes: 3 additions & 1 deletion app/app_test.go
@@ -128,7 +128,9 @@ func newStartedApp(
metricsr := &metrics.MockMetrics{}
metricsr.Start()

collector := &collect.InMemCollector{}
collector := &collect.InMemCollector{
BlockOnAddSpan: true,
}

peerList, err := peers.GetPeers()
assert.NoError(t, err)
1 change: 0 additions & 1 deletion cmd/samproxy/main.go
@@ -143,7 +143,6 @@ func main() {
PendingWorkCapacity: uint(c.GetPeerBufferSize()),
UserAgentAddition: userAgentAddition,
Transport: peerTransport,
BlockOnSend: true,
// gzip compression is expensive, and peers are most likely close to each other
// so we can turn off gzip when forwarding to peers
DisableGzipCompression: true,
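Removing BlockOnSend here changes how the peer transmission behaves under pressure: with BlockOnSend left at its false default, libhoney-go (as I understand its documented semantics) drops events once PendingWorkCapacity is exhausted and reports the overflow on its asynchronous responses channel, instead of stalling the caller. That matches this PR's goal of never blocking the batch endpoint on a full peer queue.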
34 changes: 26 additions & 8 deletions collect/collect.go
@@ -1,6 +1,7 @@
package collect

import (
"errors"
"fmt"
"os"
"runtime"
@@ -18,12 +19,14 @@ import (
"github.com/honeycombio/samproxy/types"
)

var ErrWouldBlock = errors.New("not adding span, channel buffer is full")

type Collector interface {
	// AddSpan adds a span to be collected, buffered, and merged into a trace.
// Once the trace is "complete", it'll be passed off to the sampler then
// scheduled for transmission.
AddSpan(*types.Span)
AddSpanFromPeer(*types.Span)
AddSpan(*types.Span) error
AddSpanFromPeer(*types.Span) error
}

func GetCollectorImplementation(c config.Config) Collector {
@@ -51,6 +54,9 @@ type InMemCollector struct {
Metrics metrics.Metrics `inject:""`
SamplerFactory *sample.SamplerFactory `inject:""`

// For test use only
BlockOnAddSpan bool

// mutex must be held whenever non-channel internal fields are accessed.
// This exists to avoid data races in tests and startup/shutdown.
mutex sync.RWMutex
@@ -211,15 +217,27 @@ func (i *InMemCollector) checkAlloc() {
}

// AddSpan adds the incoming span to a queue and returns immediately
func (i *InMemCollector) AddSpan(sp *types.Span) {
// TODO protect against sending on a closed channel during shutdown
i.incoming <- sp
func (i *InMemCollector) AddSpan(sp *types.Span) error {
return i.add(sp, i.incoming)
}

// AddSpanFromPeer adds a span from a peer to a queue and returns immediately
func (i *InMemCollector) AddSpanFromPeer(sp *types.Span) {
// TODO protect against sending on a closed channel during shutdown
i.fromPeer <- sp
func (i *InMemCollector) AddSpanFromPeer(sp *types.Span) error {
return i.add(sp, i.fromPeer)
}

func (i *InMemCollector) add(sp *types.Span, ch chan<- *types.Span) error {
if i.BlockOnAddSpan {
ch <- sp
return nil
}

select {
case ch <- sp:
return nil
default:
return ErrWouldBlock
}
}

// collect handles both accepting spans that have been handed to it and sending
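For readers less familiar with the pattern: select with a default case is Go's idiomatic non-blocking channel send, and it is what lets add return ErrWouldBlock instead of stalling. A minimal, self-contained sketch of the pattern (names are illustrative, not from this PR):

package main

import (
	"errors"
	"fmt"
)

var errWouldBlock = errors.New("buffer full")

// trySend attempts a non-blocking send: the default case is chosen
// immediately whenever the channel's buffer has no free slot.
func trySend(ch chan<- int, v int) error {
	select {
	case ch <- v:
		return nil
	default:
		return errWouldBlock
	}
}

func main() {
	ch := make(chan int, 2) // two-slot buffer, like a tiny incoming queue
	for v := 0; v < 3; v++ {
		fmt.Printf("send %d: %v\n", v, trySend(ch, v)) // the third send reports "buffer full"
	}
}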
1 change: 1 addition & 0 deletions collect/collect_benchmark_test.go
@@ -47,6 +47,7 @@ func BenchmarkCollect(b *testing.B) {
Config: conf,
Logger: log,
},
BlockOnAddSpan: true,
cache: cache.NewInMemCache(3, metric, log),
incoming: make(chan *types.Span, 500),
fromPeer: make(chan *types.Span, 500),
49 changes: 49 additions & 0 deletions collect/collect_test.go
@@ -514,5 +514,54 @@ func TestMaxAlloc(t *testing.T) {
}

transmission.Mux.Unlock()
}

func TestAddSpanNoBlock(t *testing.T) {
transmission := &transmit.MockTransmission{}
transmission.Start()
conf := &config.MockConfig{
GetSendDelayVal: 0,
GetTraceTimeoutVal: 10 * time.Minute,
GetSamplerTypeVal: &config.DeterministicSamplerConfig{},
SendTickerVal: 2 * time.Millisecond,
}
coll := &InMemCollector{
Config: conf,
Logger: &logger.NullLogger{},
Transmission: transmission,
Metrics: &metrics.NullMetrics{},
SamplerFactory: &sample.SamplerFactory{
Config: conf,
Logger: &logger.NullLogger{},
},
}
c := cache.NewInMemCache(10, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
stc, err := lru.New(15)
assert.NoError(t, err, "lru cache should start")
coll.sentTraceCache = stc

coll.incoming = make(chan *types.Span, 3)
coll.fromPeer = make(chan *types.Span, 3)
coll.datasetSamplers = make(map[string]sample.Sampler)

// Don't start collect(), so the queues are never drained
span := &types.Span{
TraceID: "1",
Event: types.Event{
Dataset: "aoeu",
},
}

for i := 0; i < 3; i++ {
err := coll.AddSpan(span)
assert.NoError(t, err)
err = coll.AddSpanFromPeer(span)
assert.NoError(t, err)
}

err = coll.AddSpan(span)
assert.Error(t, err)
err = coll.AddSpanFromPeer(span)
assert.Error(t, err)
}
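
Note the test's design: with collect() never started, nothing drains the two three-slot channels, so the loop fills both buffers exactly and the fourth send on each channel must take the select's default branch and return ErrWouldBlock.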
78 changes: 40 additions & 38 deletions route/route.go
@@ -120,6 +120,7 @@ func (r *Router) LnS(incomingOrPeer string) {
r.Metrics.Register(r.incomingOrPeer+"_router_batch", "counter")
r.Metrics.Register(r.incomingOrPeer+"_router_span", "counter")
r.Metrics.Register(r.incomingOrPeer+"_router_peer", "counter")
r.Metrics.Register(r.incomingOrPeer+"_router_dropped", "counter")

muxxer := mux.NewRouter()

@@ -410,69 +411,70 @@ func (r *Router) batch(w http.ResponseWriter, req *http.Request) {
r.UpstreamTransmission.EnqueueEvent(ev)
continue
}
// ok, we're a span. Figure out if we should handle locally or pass on
// to a peer. We won't pass anything that came from a peer; if there's
// confusion about who owns what, this avoids loops.
ev, err := r.batchedEventToEvent(req, bev)
if err != nil {
batchedResponses = append(
batchedResponses,
&BatchResponse{
Status: http.StatusBadRequest,
Error: fmt.Sprintf("failed to process event: %s", err.Error()),
},
)
continue
}

debugLog := debugLog.WithString("api_host", ev.APIHost).
WithString("dataset", ev.Dataset).
WithString("trace_id", traceID)

// ok, we're a span. Figure out if we should handle locally or pass on to a peer
targetShard := r.Sharder.WhichShard(traceID)
if r.incomingOrPeer == "incoming" && !targetShard.Equals(r.Sharder.MyShard()) {
r.Metrics.IncrementCounter("incoming_router_peer")
ev, err := r.batchedEventToEvent(req, bev)
if err != nil {
batchedResponses = append(
batchedResponses,
&BatchResponse{
Status: http.StatusBadRequest,
Error: fmt.Sprintf("failed to process event: %s", err.Error()),
},
)
continue
}
debugLog.WithFields(map[string]interface{}{
"api_host": ev.APIHost,
"dataset": ev.Dataset,
"trace_id": traceID,
"peer": targetShard.GetAddress(),
}).Logf("Sending span from batch to my peer")
r.Metrics.IncrementCounter(r.incomingOrPeer + "_router_peer")
debugLog.WithString("peer", targetShard.GetAddress()).
Logf("Sending span from batch to my peer")
batchedResponses = append(
batchedResponses,
&BatchResponse{Status: http.StatusAccepted},
)
ev.APIHost = targetShard.GetAddress()

// Unfortunately this doesn't tell us if the event was actually
// enqueued; we need to watch the response channel to find out, at
// which point it's too late to tell the client.
r.PeerTransmission.EnqueueEvent(ev)
continue
}
// we're supposed to handle it
ev, err := r.batchedEventToEvent(req, bev)
span := &types.Span{
Event: *ev,
TraceID: traceID,
}
if r.incomingOrPeer == "incoming" {
err = r.Collector.AddSpan(span)
} else {
err = r.Collector.AddSpanFromPeer(span)
}

if err != nil {
r.Metrics.IncrementCounter(r.incomingOrPeer + "_router_dropped")
debugLog.Logf("Dropping span from batch, channel full")
batchedResponses = append(
batchedResponses,
&BatchResponse{
Status: http.StatusBadRequest,
Error: fmt.Sprintf("failed to process event: %s", err.Error()),
Error: err.Error(),
Status: http.StatusTooManyRequests,
},
)
continue
}

span := &types.Span{
Event: *ev,
TraceID: traceID,
}
r.Metrics.IncrementCounter(r.incomingOrPeer + "_router_span")
debugLog.WithFields(map[string]interface{}{
"api_host": ev.APIHost,
"dataset": ev.Dataset,
"trace_id": span.TraceID,
}).Logf("Accepting span from batch for collection into a trace")
debugLog.Logf("Accepting span from batch for collection into a trace")
batchedResponses = append(
batchedResponses,
&BatchResponse{Status: http.StatusAccepted},
)
if r.incomingOrPeer == "incoming" {
r.Collector.AddSpan(span)
} else {
r.Collector.AddSpanFromPeer(span)
}
}
response, err := json.Marshal(batchedResponses)
if err != nil {
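The comment above notes that EnqueueEvent cannot tell the handler whether the event was actually enqueued; the outcome only arrives later on the transmission's asynchronous responses channel. A hedged sketch of watching that channel, assuming libhoney-go's Client.TxResponses() accessor (this listener is illustrative context, not code from this PR):

package main

import (
	"log"

	libhoney "github.com/honeycombio/libhoney-go"
)

// watchResponses drains the asynchronous responses channel. By the time a
// failure or queue overflow surfaces here, the batch handler has already
// answered the client with 202, which is the trade-off the comment describes.
func watchResponses(client *libhoney.Client) {
	for resp := range client.TxResponses() {
		if resp.Err != nil {
			log.Printf("event not delivered: err=%v status=%d", resp.Err, resp.StatusCode)
		}
	}
}

func main() {
	client, err := libhoney.NewClient(libhoney.ClientConfig{})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
	go watchResponses(client)
	// ... enqueue events via client.NewEvent() and event.Send() ...
}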
30 changes: 15 additions & 15 deletions transmit/transmit.go
@@ -81,13 +81,13 @@ func (d *DefaultTransmission) reloadTransmissionBuilder() {
}

func (d *DefaultTransmission) EnqueueEvent(ev *types.Event) {
d.Logger.Debug().WithFields(map[string]interface{}{
"request_id": ev.Context.Value(types.RequestIDContextKey{}),
"api_host": ev.APIHost,
"dataset": ev.Dataset,
"type": ev.Type,
"target": ev.Target,
}).Logf("transmit sending event")
d.Logger.Debug().
WithField("request_id", ev.Context.Value(types.RequestIDContextKey{})).
WithString("api_host", ev.APIHost).
WithString("dataset", ev.Dataset).
WithString("type", ev.Type.String()).
WithString("target", ev.Target.String()).
Logf("transmit sending event")
libhEv := d.builder.NewEvent()
libhEv.APIHost = ev.APIHost
libhEv.WriteKey = ev.APIKey
@@ -108,14 +108,14 @@ func (d *DefaultTransmission) EnqueueEvent(ev *types.Event) {
err := libhEv.SendPresampled()
if err != nil {
d.Metrics.IncrementCounter(d.Name + counterEnqueueErrors)
d.Logger.Error().WithFields(map[string]interface{}{
"error": err.Error(),
"request_id": ev.Context.Value(types.RequestIDContextKey{}),
"dataset": ev.Dataset,
"api_host": ev.APIHost,
"type": ev.Type.String(),
"target": ev.Target.String(),
}).Logf("failed to enqueue event")
d.Logger.Error().
WithString("error", err.Error()).
WithField("request_id", ev.Context.Value(types.RequestIDContextKey{})).
WithString("dataset", ev.Dataset).
WithString("api_host", ev.APIHost).
WithString("type", ev.Type.String()).
WithString("target", ev.Target.String()).
Logf("failed to enqueue event")
}
}
