Skip to content

Commit

Permalink
Use cmux, use stream in GetTraceResponse, regenerate *pb.go and swagg…
Browse files Browse the repository at this point in the history
…er doc

Signed-off-by: Annanay <annanay.a@media.net>
  • Loading branch information
Annanay committed Feb 26, 2019
1 parent d19f54b commit 2d276fa
Show file tree
Hide file tree
Showing 7 changed files with 603 additions and 1,233 deletions.
48 changes: 16 additions & 32 deletions cmd/query/app/grpc_handler.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2019 Jaegertracing Authors.
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -24,58 +24,42 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

// GRPCHandler implements the GRPC endpoint of the query service.
type GRPCHandler struct {
spanReader spanstore.Reader
archiveSpanReader spanstore.Reader
dependencyReader dependencystore.Reader
logger *zap.Logger
queryService querysvc.QueryService
logger *zap.Logger
tracer opentracing.Tracer
}

// NewGRPCHandler returns a GRPCHandler
func NewGRPCHandler(spanReader spanstore.Reader, dependencyReader dependencystore.Reader, options ...HandlerOption) *GRPCHandler {
func NewGRPCHandler(queryService querysvc.QueryService, logger *zap.Logger, tracer opentracing.Tracer) *GRPCHandler {
gH := &GRPCHandler{
spanReader: spanReader,
dependencyReader: dependencyReader,
}

if gH.logger == nil {
gH.logger = zap.NewNop()
queryService: queryService,
logger: logger,
tracer: tracer,
}

return gH
}

// NewCombinedHandler returns a handler where GRPC and HTTP are multiplexed.
func NewCombinedHandler(grpcServer *grpc.Server, recoveryHandler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
grpcServer.ServeHTTP(w, r)
} else {
recoveryHandler.ServeHTTP(w, r)
}
})
}

// GetTrace is the GRPC handler to fetch traces.
func (g *GRPCHandler) GetTrace(ctx context.Context, r *api_v2.GetTraceRequest) (*api_v2.GetTraceResponse, error) {
ID := r.GetId()

trace, err := g.spanReader.GetTrace(ctx, ID)
trace, err := g.queryService.GetTrace(ctx, ID)
if err == spanstore.ErrTraceNotFound {
if g.archiveSpanReader == nil {
return &api_v2.GetTraceResponse{}, err
}

trace, err = g.archiveSpanReader.GetTrace(ctx, traceID)
if err != nil {
g.logger.Error("Could not fetch spans from backend", zap.Error(err))
}
g.logger.Error("trace not found", zap.Error(err))
return nil, err
}
if err != nil {
g.logger.Error("Could not fetch spans from backend", zap.Error(err))
return nil, err
}

return &api_v2.GetTraceResponse{Trace: trace}, nil
Expand Down
53 changes: 32 additions & 21 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/gorilla/handlers"
"github.com/opentracing/opentracing-go"
"github.com/soheilhy/cmux"
"github.com/spf13/cobra"
"github.com/spf13/viper"
jaegerClientConfig "github.com/uber/jaeger-client-go/config"
Expand Down Expand Up @@ -144,43 +145,53 @@ func main() {
r.Handle(mBldr.HTTPRoute, h)
}

portStr := ":" + strconv.Itoa(queryOpts.Port)
compressHandler := handlers.CompressHandler(r)
recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true)

// Create HTTP Server
httpServer := &http.Server{
Handler: recoveryHandler(compressHandler),
}

// Create GRPC Server.
grpcServer := grpc.NewServer()

// Add logging.
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))

// Register GRPC Handler.
spanReaderGRPC, err := storageFactory.CreateSpanReader()
if err != nil {
logger.Fatal("Failed to create span reader", zap.Error(err))
}
spanReaderGRPC = storageMetrics.NewReadMetricsDecorator(spanReaderGRPC, baseFactory.Namespace(metrics.NSOptions{Name: "query-grpc", Tags: nil}))
dependencyReaderGRPC, err := storageFactory.CreateDependencyReader()
grpcHandler := app.NewGRPCHandler(queryService, logger, tracer)
api_v2.RegisterQueryServiceHandler(grpcServer, grpcHandler)

// Prepare cmux conn.
conn, err := net.Listen("tcp", fmt.Sprintf(":%d", queryOpts.Port))
if err != nil {
logger.Fatal("Failed to create dependency reader", zap.Error(err))
logger.Fatal("Could not start listener", zap.Error(err))
}

grpcHandler := NewGRPCHandler(spanReaderGRPC, dependencyReaderGRPC, apiHandlerOptions)
api_v2.RegisterQueryServiceHandler(grpcServer, grpcHandler)
// Create cmux server.
// cmux will reverse-proxy between HTTP and GRPC backends.
s := cmux.New(conn)

// Add GRPC and HTTP listeners.
grpcL := s.Match(
cmux.HTTP2HeaderField("content-type", "application/grpc"),
cmux.HTTP2HeaderField("content-type", "application/grpc+proto"))
httpL := s.Match(cmux.Any())

go func() {
logger.Info("Starting HTTP+GRPC server", zap.Int("port", queryOpts.Port))
conn, err := net.Listen("tcp", fmt.Sprintf(":%d", queryOpts.Port))
if err != nil {
logger.Fatal("Could not launch service", zap.Error(err))
}

// Multiplexed server.
// Use CombinedHandler here, which will "reverse-proxy" between HTTP and GRPC backends.
srv.Serve(conn, NewCombinedHandler(grpcServer, recoveryHandler(compressHandler)))
logger.Info("Starting HTTP server", zap.Int("port", queryOpts.Port))
httpServer.Serve(httpL)
hc.Set(healthcheck.Unavailable)
}()

go func() {
logger.Info("Starting GRPC server", zap.Int("port", queryOpts.Port))
grpcServer.Serve(grpcL)
hc.Set(healthcheck.Unavailable)
}()

// Start cmux server.
s.Serve()

hc.Ready()
<-serverChannel
logger.Info("Shutdown complete")
Expand Down
Loading

0 comments on commit 2d276fa

Please sign in to comment.