Skip to content

Commit

Permalink
Add support for graceful shutdown of collector. (#2076)
Browse files Browse the repository at this point in the history
Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>
  • Loading branch information
jpkrohling authored Feb 26, 2020
1 parent d5036f4 commit 5fb7d72
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 37 deletions.
37 changes: 23 additions & 14 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/cmd/collector/app/server"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
Expand All @@ -41,6 +42,8 @@ type Collector struct {
spanWriter spanstore.Writer
strategyStore strategystore.StrategyStore
hCheck *healthcheck.HealthCheck
spanProcessor processor.SpanProcessor
spanHandlers *SpanHandlers

// state, read only
hServer *http.Server
Expand Down Expand Up @@ -79,57 +82,59 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error {
Logger: c.logger,
MetricsFactory: c.metricsFactory,
}
zipkinSpansHandler, jaegerBatchesHandler, grpcHandler := handlerBuilder.BuildHandlers()

c.spanProcessor = handlerBuilder.BuildSpanProcessor()
c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor)
recoveryHandler := recoveryhandler.NewRecoveryHandler(c.logger, true)

if tchServer, err := server.StartThriftServer(&server.ThriftServerParams{
ServiceName: c.serviceName,
Port: builderOpts.CollectorPort,
JaegerBatchesHandler: jaegerBatchesHandler,
ZipkinSpansHandler: zipkinSpansHandler,
JaegerBatchesHandler: c.spanHandlers.JaegerBatchesHandler,
ZipkinSpansHandler: c.spanHandlers.ZipkinSpansHandler,
StrategyStore: c.strategyStore,
Logger: c.logger,
}); err != nil {
c.logger.Fatal("Could not start Thrift collector", zap.Error(err))
c.logger.Fatal("could not start Thrift collector", zap.Error(err))
} else {
c.tchServer = tchServer
}

if grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{
Port: builderOpts.CollectorGRPCPort,
Handler: grpcHandler,
Handler: c.spanHandlers.GRPCHandler,
TLSConfig: builderOpts.TLS,
SamplingStore: c.strategyStore,
Logger: c.logger,
}); err != nil {
c.logger.Fatal("Could not start gRPC collector", zap.Error(err))
c.logger.Fatal("could not start gRPC collector", zap.Error(err))
} else {
c.grpcServer = grpcServer
}

if httpServer, err := server.StartHTTPServer(&server.HTTPServerParams{
Port: builderOpts.CollectorHTTPPort,
Handler: jaegerBatchesHandler,
Handler: c.spanHandlers.JaegerBatchesHandler,
RecoveryHandler: recoveryHandler,
HealthCheck: c.hCheck,
MetricsFactory: c.metricsFactory,
SamplingStore: c.strategyStore,
Logger: c.logger,
}); err != nil {
c.logger.Fatal("Could not start the HTTP server", zap.Error(err))
c.logger.Fatal("could not start the HTTP server", zap.Error(err))
} else {
c.hServer = httpServer
}

if zkServer, err := server.StartZipkinServer(&server.ZipkinServerParams{
Port: builderOpts.CollectorZipkinHTTPPort,
Handler: zipkinSpansHandler,
Handler: c.spanHandlers.ZipkinSpansHandler,
RecoveryHandler: recoveryHandler,
AllowedHeaders: builderOpts.CollectorZipkinAllowedHeaders,
AllowedOrigins: builderOpts.CollectorZipkinAllowedOrigins,
Logger: c.logger,
}); err != nil {
c.logger.Fatal("Could not start the Zipkin server", zap.Error(err))
c.logger.Fatal("could not start the Zipkin server", zap.Error(err))
} else {
c.zkServer = zkServer
}
Expand All @@ -154,7 +159,7 @@ func (c *Collector) Close() error {
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := c.hServer.Shutdown(timeout)
if err != nil {
c.logger.Error("Failed to stop the main HTTP server", zap.Error(err))
c.logger.Error("failed to stop the main HTTP server", zap.Error(err))
}
defer cancel()
}
Expand All @@ -164,17 +169,21 @@ func (c *Collector) Close() error {
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := c.zkServer.Shutdown(timeout)
if err != nil {
c.logger.Error("Failed to stop the Zipkin server", zap.Error(err))
c.logger.Error("failed to stop the Zipkin server", zap.Error(err))
}
defer cancel()
}

// by now, we shouldn't have any in-flight requests anymore
if err := c.spanProcessor.Close(); err != nil {
c.logger.Error("failed to close span processor.", zap.Error(err))
}

// the span processor is closed
if c.spanWriter != nil {
if closer, ok := c.spanWriter.(io.Closer); ok {
err := closer.Close() // SpanWriter
if err != nil {
c.logger.Error("Failed to close span writer", zap.Error(err))
c.logger.Error("failed to close span writer", zap.Error(err))
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/collector/app/handler/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func (p *mockSpanProcessor) reset() {
p.spans = nil
}

func (p *mockSpanProcessor) Close() error {
return nil
}

func initializeGRPCTestServer(t *testing.T, beforeServe func(s *grpc.Server)) (*grpc.Server, net.Addr) {
server := grpc.NewServer()
beforeServe(server)
Expand Down
3 changes: 3 additions & 0 deletions cmd/collector/app/handler/thrift_span_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func (s *shouldIErrorProcessor) ProcessSpans(mSpans []*model.Span, _ processor.S
}
return retMe, nil
}
func (s *shouldIErrorProcessor) Close() error {
return nil
}

func TestZipkinSpanHandler(t *testing.T) {
testChunks := []struct {
Expand Down
3 changes: 3 additions & 0 deletions cmd/collector/app/processor/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package processor

import (
"io"

"github.com/jaegertracing/jaeger/model"
)

Expand All @@ -28,6 +30,7 @@ type SpansOptions struct {
type SpanProcessor interface {
// ProcessSpans processes model spans and return with either a list of true/false success or an error
ProcessSpans(mSpans []*model.Span, options SpansOptions) ([]bool, error)
io.Closer
}

// InboundTransport identifies the transport used to receive spans.
Expand Down
4 changes: 4 additions & 0 deletions cmd/collector/app/server/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func (s mockSamplingStore) GetSamplingStrategy(serviceName string) (*sampling.Sa
type mockSpanProcessor struct {
}

func (p *mockSpanProcessor) Close() error {
return nil
}

func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, _ processor.SpansOptions) ([]bool, error) {
return []bool{}, nil
}
28 changes: 19 additions & 9 deletions cmd/collector/app/span_handler_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
zs "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
Expand All @@ -35,17 +36,20 @@ type SpanHandlerBuilder struct {
MetricsFactory metrics.Factory
}

// BuildHandlers builds span handlers (Zipkin, Jaeger)
func (b *SpanHandlerBuilder) BuildHandlers() (
handler.ZipkinSpansHandler,
handler.JaegerBatchesHandler,
*handler.GRPCHandler,
) {
// SpanHandlers holds instances to the span handlers built by the SpanHandlerBuilder
type SpanHandlers struct {
ZipkinSpansHandler handler.ZipkinSpansHandler
JaegerBatchesHandler handler.JaegerBatchesHandler
GRPCHandler *handler.GRPCHandler
}

// BuildSpanProcessor builds the span processor to be used with the handlers
func (b *SpanHandlerBuilder) BuildSpanProcessor() processor.SpanProcessor {
hostname, _ := os.Hostname()
svcMetrics := b.metricsFactory()
hostMetrics := svcMetrics.Namespace(metrics.NSOptions{Tags: map[string]string{"host": hostname}})

spanProcessor := NewSpanProcessor(
return NewSpanProcessor(
b.SpanWriter,
Options.ServiceMetrics(svcMetrics),
Options.HostMetrics(hostMetrics),
Expand All @@ -58,9 +62,15 @@ func (b *SpanHandlerBuilder) BuildHandlers() (
Options.DynQueueSizeMemory(b.CollectorOpts.DynQueueSizeMemory),
)

return handler.NewZipkinSpanHandler(b.Logger, spanProcessor, zs.NewChainedSanitizer(zs.StandardSanitizers...)),
}

// BuildHandlers builds span handlers (Zipkin, Jaeger)
func (b *SpanHandlerBuilder) BuildHandlers(spanProcessor processor.SpanProcessor) *SpanHandlers {
return &SpanHandlers{
handler.NewZipkinSpanHandler(b.Logger, spanProcessor, zs.NewChainedSanitizer(zs.StandardSanitizers...)),
handler.NewJaegerSpanHandler(b.Logger, spanProcessor),
handler.NewGRPCHandler(b.Logger, spanProcessor)
handler.NewGRPCHandler(b.Logger, spanProcessor),
}
}

func defaultSpanFilter(*model.Span) bool {
Expand Down
10 changes: 6 additions & 4 deletions cmd/collector/app/span_handler_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ func TestNewSpanHandlerBuilder(t *testing.T) {
MetricsFactory: metrics.NullFactory,
}

zipkin, jaeger, grpc := builder.BuildHandlers()
assert.NotNil(t, zipkin)
assert.NotNil(t, jaeger)
assert.NotNil(t, grpc)
spanProcessor := builder.BuildSpanProcessor()
spanHandlers := builder.BuildHandlers(spanProcessor)
assert.NotNil(t, spanHandlers.ZipkinSpansHandler)
assert.NotNil(t, spanHandlers.JaegerBatchesHandler)
assert.NotNil(t, spanHandlers.GRPCHandler)
assert.NotNil(t, spanProcessor)
}

func TestDefaultSpanFilter(t *testing.T) {
Expand Down
5 changes: 3 additions & 2 deletions cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,11 @@ func newSpanProcessor(spanWriter spanstore.Writer, opts ...Option) *spanProcesso
return &sp
}

// Stop halts the span processor and all its go-routines.
func (sp *spanProcessor) Stop() {
func (sp *spanProcessor) Close() error {
close(sp.stopCh)
sp.queue.Stop()

return nil
}

func (sp *spanProcessor) saveSpan(span *model.Span) {
Expand Down
20 changes: 12 additions & 8 deletions cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ import (
zc "github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)

var _ (io.Closer) = (*fakeSpanWriter)(nil)
var blackListedService = "zoidberg"
var (
_ io.Closer = (*fakeSpanWriter)(nil)
_ io.Closer = (*spanProcessor)(nil)

blackListedService = "zoidberg"
)

func TestBySvcMetrics(t *testing.T) {
allowedService := "bender"
Expand Down Expand Up @@ -215,7 +219,6 @@ func makeJaegerSpan(service string, rootSpan bool, debugEnabled bool) (*jaeger.S
func TestSpanProcessor(t *testing.T) {
w := &fakeSpanWriter{}
p := NewSpanProcessor(w, Options.QueueSize(1)).(*spanProcessor)
defer p.Stop()

res, err := p.ProcessSpans([]*model.Span{
{
Expand All @@ -226,6 +229,7 @@ func TestSpanProcessor(t *testing.T) {
}, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat})
assert.NoError(t, err)
assert.Equal(t, []bool{true}, res)
assert.NoError(t, p.Close())
}

func TestSpanProcessorErrors(t *testing.T) {
Expand All @@ -251,7 +255,7 @@ func TestSpanProcessorErrors(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, []bool{true}, res)

p.Stop()
assert.NoError(t, p.Close())

assert.Equal(t, map[string]string{
"level": "error",
Expand Down Expand Up @@ -282,7 +286,7 @@ func TestSpanProcessorBusy(t *testing.T) {
Options.QueueSize(1),
Options.ReportBusy(true),
).(*spanProcessor)
defer p.Stop()
defer assert.NoError(t, p.Close())

// block the writer so that the first span is read from the queue and blocks the processor,
// and eiher the second or the third span is rejected since the queue capacity is just 1.
Expand Down Expand Up @@ -317,7 +321,7 @@ func TestSpanProcessorWithNilProcess(t *testing.T) {

w := &fakeSpanWriter{}
p := NewSpanProcessor(w, Options.ServiceMetrics(serviceMetrics)).(*spanProcessor)
defer p.Stop()
defer assert.NoError(t, p.Close())

p.saveSpan(&model.Span{})

Expand All @@ -335,7 +339,7 @@ func TestSpanProcessorWithCollectorTags(t *testing.T) {

w := &fakeSpanWriter{}
p := NewSpanProcessor(w, Options.CollectorTags(testCollectorTags)).(*spanProcessor)
defer p.Stop()
defer assert.NoError(t, p.Close())

span := &model.Span{
Process: model.NewProcess("unit-test-service", []model.KeyValue{}),
Expand Down Expand Up @@ -363,7 +367,6 @@ func TestSpanProcessorCountSpan(t *testing.T) {
w := &fakeSpanWriter{}
p := NewSpanProcessor(w, Options.HostMetrics(m), Options.DynQueueSizeMemory(1000)).(*spanProcessor)
p.background(10*time.Millisecond, p.updateGauges)
defer p.Stop()

p.processSpan(&model.Span{})
assert.NotEqual(t, uint64(0), p.bytesProcessed)
Expand All @@ -378,6 +381,7 @@ func TestSpanProcessorCountSpan(t *testing.T) {
}

assert.Fail(t, "gauge hasn't been updated within a reasonable amount of time")
assert.NoError(t, p.Close())
}

func TestUpdateDynQueueSize(t *testing.T) {
Expand Down

0 comments on commit 5fb7d72

Please sign in to comment.