From 5e8af60fa1d14ae471d7e3a5ef889d9045f555cf Mon Sep 17 00:00:00 2001 From: rjs211 Date: Thu, 13 Aug 2020 10:00:20 +0530 Subject: [PATCH 1/8] Adding methods to get port from address Signed-off-by: rjs211 --- ports/ports.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/ports/ports.go b/ports/ports.go index 79048b35a83..fb93ecdebda 100644 --- a/ports/ports.go +++ b/ports/ports.go @@ -52,6 +52,14 @@ func PortToHostPort(port int) string { return ":" + strconv.Itoa(port) } +// HostPortToPort converts the host:port address string intoto port int +func HostPortToPort(hostPort string) (int, error) { + s := strings.Split(hostPort, ":") + + return strconv.Atoi(s[len(s)-1]) + +} + // GetAddressFromCLIOptions gets listening address based on port (deprecated flags) or host:port (new flags) func GetAddressFromCLIOptions(port int, hostPort string) string { if port != 0 { From 424279077f16cf8ab5db017d5c6654fb330b5c38 Mon Sep 17 00:00:00 2001 From: rjs211 Date: Thu, 13 Aug 2020 10:06:06 +0530 Subject: [PATCH 2/8] separating ports Signed-off-by: rjs211 --- cmd/query/app/flags.go | 37 ++++++++++-- cmd/query/app/flags_test.go | 24 ++++++++ cmd/query/app/server.go | 107 ++++++++++++++++++++++++++++------- cmd/query/app/server_test.go | 53 +++++++++++++++-- 4 files changed, 193 insertions(+), 28 deletions(-) diff --git a/cmd/query/app/flags.go b/cmd/query/app/flags.go index 80f4e2ea152..f51b7ea329d 100644 --- a/cmd/query/app/flags.go +++ b/cmd/query/app/flags.go @@ -40,6 +40,8 @@ const ( queryHostPort = "query.host-port" queryPort = "query.port" queryPortWarning = "(deprecated, will be removed after 2020-08-31 or in release v1.20.0, whichever is later)" + queryHTTPHostPort = "query.http-server.host-port" + queryGRPCHostPort = "query.grpc-server.host-port" queryBasePath = "query.base-path" queryStaticFiles = "query.static-files" queryUIConfig = "query.ui-config" @@ -56,8 +58,12 @@ var tlsFlagsConfig = tlscfg.ServerFlagsConfig{ // QueryOptions holds configuration for query service type QueryOptions struct { - // HostPort is the host:port address that the query service listens o n + // HostPort is the host:port address that the query service listens on HostPort string + // HTTPHostPort is the host:port address that the query service listens in on for http requests + HTTPHostPort string + // GRPCHostPort is the host:port address that the query service listens in on for gRPC requests + GRPCHostPort string // BasePath is the prefix for all UI and API HTTP routes BasePath string // StaticAssets is the path for the static assets for the UI (https://github.com/uber/jaeger-ui) @@ -66,7 +72,7 @@ type QueryOptions struct { UIConfig string // BearerTokenPropagation activate/deactivate bearer token propagation to storage BearerTokenPropagation bool - // TLS configures secure transport + // TLS configures secure transport` TLS tlscfg.Options // AdditionalHeaders AdditionalHeaders http.Header @@ -77,7 +83,9 @@ type QueryOptions struct { // AddFlags adds flags for QueryOptions func AddFlags(flagSet *flag.FlagSet) { flagSet.Var(&config.StringSlice{}, queryAdditionalHeaders, `Additional HTTP response headers. Can be specified multiple times. Format: "Key: Value"`) - flagSet.String(queryHostPort, ports.PortToHostPort(ports.QueryHTTP), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the query's HTTP server") + flagSet.String(queryHostPort, ports.PortToHostPort(ports.QueryHTTP), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the query's HTTP server") //old default: ports.PortToHostPort(ports.QueryHTTP) + flagSet.String(queryHTTPHostPort, ports.PortToHostPort(ports.QueryHTTP), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the query's HTTP server") // TODO: change default value + flagSet.String(queryGRPCHostPort, ports.PortToHostPort(ports.QueryHTTP), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the query's HTTP server") // TODO : change after defining in "ports" module // TODO: change default value flagSet.Int(queryPort, 0, queryPortWarning+" see --"+queryHostPort) flagSet.String(queryBasePath, "/", "The base path for all HTTP routes, e.g. /jaeger; useful when running behind a reverse proxy") flagSet.String(queryStaticFiles, "", "The directory path override for the static assets for the UI") @@ -86,14 +94,33 @@ func AddFlags(flagSet *flag.FlagSet) { flagSet.Duration(queryMaxClockSkewAdjust, time.Second, "The maximum delta by which span timestamps may be adjusted in the UI due to clock skew; set to 0s to disable clock skew adjustments") } +// InitPortsConfigFromViper initializes the port numbers and TLS configuration of ports +func (qOpts *QueryOptions) InitPortsConfigFromViper(v *viper.Viper, logger *zap.Logger) *QueryOptions { + qOpts.HTTPHostPort = v.GetString(queryHTTPHostPort) + qOpts.GRPCHostPort = v.GetString(queryGRPCHostPort) + qOpts.HostPort = ports.GetAddressFromCLIOptions(v.GetInt(queryPort), v.GetString(queryHostPort)) + + qOpts.TLS = tlsFlagsConfig.InitFromViper(v) + + // query.host-port is not defined and atleast one of query.grpc-server.host-port or query.http-server.host-port is defined + // user intends to use the separate flags. + if !(v.IsSet(queryHostPort) || v.IsSet(queryPort)) && (v.IsSet(queryHTTPHostPort) || v.IsSet(queryGRPCHostPort)) { + return qOpts + } + logger.Warn(fmt.Sprintf("Use of %s and %s is deprecated. use %s and %s instead", queryPort, queryHostPort, queryHTTPHostPort, queryGRPCHostPort)) + qOpts.HTTPHostPort = qOpts.HostPort + qOpts.GRPCHostPort = qOpts.HostPort + return qOpts + +} + // InitFromViper initializes QueryOptions with properties from viper func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) *QueryOptions { - qOpts.HostPort = ports.GetAddressFromCLIOptions(v.GetInt(queryPort), v.GetString(queryHostPort)) + qOpts = qOpts.InitPortsConfigFromViper(v, logger) qOpts.BasePath = v.GetString(queryBasePath) qOpts.StaticAssets = v.GetString(queryStaticFiles) qOpts.UIConfig = v.GetString(queryUIConfig) qOpts.BearerTokenPropagation = v.GetBool(queryTokenPropagation) - qOpts.TLS = tlsFlagsConfig.InitFromViper(v) qOpts.MaxClockSkewAdjust = v.GetDuration(queryMaxClockSkewAdjust) stringSlice := v.GetStringSlice(queryAdditionalHeaders) diff --git a/cmd/query/app/flags_test.go b/cmd/query/app/flags_test.go index 22170ff8b24..8b44038270d 100644 --- a/cmd/query/app/flags_test.go +++ b/cmd/query/app/flags_test.go @@ -60,6 +60,30 @@ func TestQueryBuilderFlags(t *testing.T) { assert.Equal(t, 10*time.Second, qOpts.MaxClockSkewAdjust) } +func TestQueryBuilderSeparateFlags(t *testing.T) { + v, command := config.Viperize(AddFlags) + command.ParseFlags([]string{ + "--query.static-files=/dev/null", + "--query.ui-config=some.json", + "--query.base-path=/jaeger", + "--query.http-server.host-port=127.0.0.1:8080", + "--query.additional-headers=access-control-allow-origin:blerg", + "--query.additional-headers=whatever:thing", + "--query.max-clock-skew-adjustment=10s", + }) + qOpts := new(QueryOptions).InitFromViper(v, zap.NewNop()) + assert.Equal(t, "/dev/null", qOpts.StaticAssets) + assert.Equal(t, "some.json", qOpts.UIConfig) + assert.Equal(t, "/jaeger", qOpts.BasePath) + assert.Equal(t, "127.0.0.1:8080", qOpts.HTTPHostPort) + + assert.Equal(t, http.Header{ + "Access-Control-Allow-Origin": []string{"blerg"}, + "Whatever": []string{"thing"}, + }, qOpts.AdditionalHeaders) + assert.Equal(t, 10*time.Second, qOpts.MaxClockSkewAdjust) +} + func TestQueryBuilderBadHeadersFlags(t *testing.T) { v, command := config.Viperize(AddFlags) command.ParseFlags([]string{ diff --git a/cmd/query/app/server.go b/cmd/query/app/server.go index 3e4b6756462..df2a5fcf2f7 100644 --- a/cmd/query/app/server.go +++ b/cmd/query/app/server.go @@ -30,6 +30,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/healthcheck" "github.com/jaegertracing/jaeger/pkg/netutils" "github.com/jaegertracing/jaeger/pkg/recoveryhandler" + "github.com/jaegertracing/jaeger/ports" "github.com/jaegertracing/jaeger/proto-gen/api_v2" ) @@ -42,13 +43,27 @@ type Server struct { tracer opentracing.Tracer // TODO make part of flags.Service conn net.Listener + grpcConn net.Listener + httpConn net.Listener grpcServer *grpc.Server httpServer *http.Server + separatePorts bool unavailableChannel chan healthcheck.Status } // NewServer creates and initializes Server func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, options *QueryOptions, tracer opentracing.Tracer) (*Server, error) { + + httpPort, err := ports.HostPortToPort(options.HTTPHostPort) + if err != nil { + return nil, err + } + grpcPort, err := ports.HostPortToPort(options.GRPCHostPort) + if err != nil { + return nil, err + } + separatePorts := (grpcPort != httpPort) + grpcServer, err := createGRPCServer(querySvc, options, logger, tracer) if err != nil { return nil, err @@ -61,6 +76,7 @@ func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, options *Que tracer: tracer, grpcServer: grpcServer, httpServer: createHTTPServer(querySvc, options, tracer, logger), + separatePorts: separatePorts, unavailableChannel: make(chan healthcheck.Status), }, nil } @@ -117,11 +133,27 @@ func createHTTPServer(querySvc *querysvc.QueryService, queryOpts *QueryOptions, } } -// Start http, GRPC and cmux servers concurrently -func (s *Server) Start() error { +// initListener initialises listeners of the server +func (s *Server) initListener() (cmux.CMux, error) { + if s.separatePorts { // use separate ports and listeners each for gRPC and HTTP requests + var err error + s.grpcConn, err = net.Listen("tcp", s.queryOptions.GRPCHostPort) + if err != nil { + return nil, err + } + + s.httpConn, err = net.Listen("tcp", s.queryOptions.HTTPHostPort) + if err != nil { + return nil, err + } + s.logger.Info("Query server started") + return nil, nil + } + + // old behavior using cmux conn, err := net.Listen("tcp", s.queryOptions.HostPort) if err != nil { - return err + return nil, err } s.conn = conn @@ -138,16 +170,46 @@ func (s *Server) Start() error { // cmux server acts as a reverse-proxy between HTTP and GRPC backends. cmuxServer := cmux.New(s.conn) - grpcListener := cmuxServer.MatchWithWriters( + s.grpcConn = cmuxServer.MatchWithWriters( cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"), cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto"), ) - httpListener := cmuxServer.Match(cmux.Any()) + s.httpConn = cmuxServer.Match(cmux.Any()) + s.queryOptions.HTTPHostPort = s.queryOptions.HostPort + s.queryOptions.GRPCHostPort = s.queryOptions.HostPort + + return cmuxServer, nil + +} + +// Start http, GRPC and cmux servers concurrently +func (s *Server) Start() error { + cmuxServer, err := s.initListener() + if err != nil { + return err + } + + var tcpPort int + if !s.separatePorts { + if port, err := netutils.GetPort(s.conn.Addr()); err == nil { + tcpPort = port + } + + } + var httpPort int + if port, err := netutils.GetPort(s.httpConn.Addr()); err == nil { + httpPort = port + } + + var grpcPort int + if port, err := netutils.GetPort(s.grpcConn.Addr()); err == nil { + grpcPort = port + } go func() { - s.logger.Info("Starting HTTP server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HostPort)) + s.logger.Info("Starting HTTP server", zap.Int("port", httpPort), zap.String("addr", s.queryOptions.HTTPHostPort)) - switch err := s.httpServer.Serve(httpListener); err { + switch err := s.httpServer.Serve(s.httpConn); err { case nil, http.ErrServerClosed, cmux.ErrListenerClosed: // normal exit, nothing to do default: @@ -158,25 +220,27 @@ func (s *Server) Start() error { // Start GRPC server concurrently go func() { - s.logger.Info("Starting GRPC server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HostPort)) + s.logger.Info("Starting GRPC server", zap.Int("port", grpcPort), zap.String("addr", s.queryOptions.GRPCHostPort)) - if err := s.grpcServer.Serve(grpcListener); err != nil { + if err := s.grpcServer.Serve(s.grpcConn); err != nil { s.logger.Error("Could not start GRPC server", zap.Error(err)) } s.unavailableChannel <- healthcheck.Unavailable }() // Start cmux server concurrently. - go func() { - s.logger.Info("Starting CMUX server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HostPort)) + if !s.separatePorts { + go func() { + s.logger.Info("Starting CMUX server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HostPort)) - err := cmuxServer.Serve() - // TODO: Remove string comparison when https://github.com/soheilhy/cmux/pull/69 is merged - if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { - s.logger.Error("Could not start multiplexed server", zap.Error(err)) - } - s.unavailableChannel <- healthcheck.Unavailable - }() + err := cmuxServer.Serve() + // TODO: Remove string comparison when https://github.com/soheilhy/cmux/pull/69 is merged + if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { + s.logger.Error("Could not start multiplexed server", zap.Error(err)) + } + s.unavailableChannel <- healthcheck.Unavailable + }() + } return nil } @@ -185,5 +249,10 @@ func (s *Server) Start() error { func (s *Server) Close() { s.grpcServer.Stop() s.httpServer.Close() - s.conn.Close() + if s.separatePorts { + s.httpConn.Close() + s.grpcConn.Close() + } else { + s.conn.Close() + } } diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index 33f8a36b802..05d2b8da6b3 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -16,6 +16,7 @@ package app import ( "context" + "fmt" "testing" "time" @@ -61,7 +62,6 @@ func TestServer(t *testing.T) { flagsSvc := flags.NewService(ports.QueryAdminHTTP) flagsSvc.Logger = zap.NewNop() hostPort := ports.GetAddressFromCLIOptions(ports.QueryHTTP, "") - spanReader := &spanstoremocks.Reader{} dependencyReader := &depsmocks.Reader{} expectedServices := []string{"test"} @@ -70,7 +70,7 @@ func TestServer(t *testing.T) { querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{}) server, err := NewServer(flagsSvc.Logger, querySvc, - &QueryOptions{HostPort: hostPort, BearerTokenPropagation: true}, + &QueryOptions{HostPort: hostPort, GRPCHostPort: hostPort, HTTPHostPort: hostPort, BearerTokenPropagation: true}, opentracing.NoopTracer{}) assert.Nil(t, err) assert.NoError(t, server.Start()) @@ -100,6 +100,49 @@ func TestServer(t *testing.T) { assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) } +func TestServer2(t *testing.T) { + flagsSvc := flags.NewService(ports.QueryAdminHTTP) + flagsSvc.Logger = zap.NewNop() + // hostPort := ports.GetAddressFromCLIOptions(ports.QueryHTTP, "") + + spanReader := &spanstoremocks.Reader{} + dependencyReader := &depsmocks.Reader{} + expectedServices := []string{"test"} + spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil) + + querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{}) + + server, err := NewServer(flagsSvc.Logger, querySvc, + &QueryOptions{HTTPHostPort: "127.0.0.1:8080", GRPCHostPort: "127.0.0.1:8081", BearerTokenPropagation: true}, + opentracing.NoopTracer{}) + assert.Nil(t, err) + assert.NoError(t, server.Start()) + go func() { + for s := range server.HealthCheckStatus() { + flagsSvc.SetHealthCheckStatus(s) + } + }() + + client := newGRPCClient(t, "127.0.0.1:8081") + defer client.conn.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + res, err := client.GetServices(ctx, &api_v2.GetServicesRequest{}) + assert.NoError(t, err) + assert.Equal(t, expectedServices, res.Services) + + server.Close() + for i := 0; i < 10; i++ { + if flagsSvc.HC().Get() == healthcheck.Unavailable { + break + } + time.Sleep(1 * time.Millisecond) + } + assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) +} + func TestServerGracefulExit(t *testing.T) { flagsSvc := flags.NewService(ports.QueryAdminHTTP) @@ -107,10 +150,11 @@ func TestServerGracefulExit(t *testing.T) { assert.Equal(t, 0, logs.Len(), "Expected initial ObservedLogs to have zero length.") flagsSvc.Logger = zap.New(zapCore) + hostPort := ports.PortToHostPort(ports.QueryAdminHTTP) querySvc := &querysvc.QueryService{} tracer := opentracing.NoopTracer{} - server, err := NewServer(flagsSvc.Logger, querySvc, &QueryOptions{HostPort: ports.PortToHostPort(ports.QueryAdminHTTP)}, tracer) + server, err := NewServer(flagsSvc.Logger, querySvc, &QueryOptions{HostPort: hostPort, GRPCHostPort: hostPort, HTTPHostPort: hostPort}, tracer) assert.Nil(t, err) assert.NoError(t, server.Start()) go func() { @@ -137,7 +181,7 @@ func TestServerHandlesPortZero(t *testing.T) { querySvc := &querysvc.QueryService{} tracer := opentracing.NoopTracer{} - server, err := NewServer(flagsSvc.Logger, querySvc, &QueryOptions{HostPort: ":0"}, tracer) + server, err := NewServer(flagsSvc.Logger, querySvc, &QueryOptions{HostPort: ":0", GRPCHostPort: ":0", HTTPHostPort: ":0"}, tracer) assert.Nil(t, err) assert.NoError(t, server.Start()) server.Close() @@ -147,5 +191,6 @@ func TestServerHandlesPortZero(t *testing.T) { onlyEntry := message.All()[0] port := onlyEntry.ContextMap()["port"] + fmt.Println(port) assert.Greater(t, port, int64(0)) } From 33bfabfe05cc1d9483c664fb84826eef24988f85 Mon Sep 17 00:00:00 2001 From: rjs211 Date: Thu, 13 Aug 2020 23:21:13 +0530 Subject: [PATCH 3/8] Adding default GRPC port number Signed-off-by: rjs211 --- cmd/query/app/flags.go | 6 +++--- cmd/query/app/flags_test.go | 29 ++++++++++++++++++++++++++++- ports/ports.go | 2 ++ 3 files changed, 33 insertions(+), 4 deletions(-) diff --git a/cmd/query/app/flags.go b/cmd/query/app/flags.go index f51b7ea329d..9e452a16b77 100644 --- a/cmd/query/app/flags.go +++ b/cmd/query/app/flags.go @@ -83,9 +83,9 @@ type QueryOptions struct { // AddFlags adds flags for QueryOptions func AddFlags(flagSet *flag.FlagSet) { flagSet.Var(&config.StringSlice{}, queryAdditionalHeaders, `Additional HTTP response headers. Can be specified multiple times. Format: "Key: Value"`) - flagSet.String(queryHostPort, ports.PortToHostPort(ports.QueryHTTP), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the query's HTTP server") //old default: ports.PortToHostPort(ports.QueryHTTP) - flagSet.String(queryHTTPHostPort, ports.PortToHostPort(ports.QueryHTTP), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the query's HTTP server") // TODO: change default value - flagSet.String(queryGRPCHostPort, ports.PortToHostPort(ports.QueryHTTP), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the query's HTTP server") // TODO : change after defining in "ports" module // TODO: change default value + flagSet.String(queryHostPort, ports.PortToHostPort(ports.QueryHTTP), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the query's HTTP server") + flagSet.String(queryHTTPHostPort, ports.PortToHostPort(ports.QueryHTTP), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the query's HTTP server") + flagSet.String(queryGRPCHostPort, ports.PortToHostPort(ports.QueryGRPC), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the query's HTTP server") flagSet.Int(queryPort, 0, queryPortWarning+" see --"+queryHostPort) flagSet.String(queryBasePath, "/", "The base path for all HTTP routes, e.g. /jaeger; useful when running behind a reverse proxy") flagSet.String(queryStaticFiles, "", "The directory path override for the static assets for the UI") diff --git a/cmd/query/app/flags_test.go b/cmd/query/app/flags_test.go index 8b44038270d..716fa02e325 100644 --- a/cmd/query/app/flags_test.go +++ b/cmd/query/app/flags_test.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/config" + "github.com/jaegertracing/jaeger/ports" "github.com/jaegertracing/jaeger/storage/mocks" spanstore_mocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" ) @@ -60,7 +61,7 @@ func TestQueryBuilderFlags(t *testing.T) { assert.Equal(t, 10*time.Second, qOpts.MaxClockSkewAdjust) } -func TestQueryBuilderSeparateFlags(t *testing.T) { +func TestQueryBuilderFlagsSeparatePorts(t *testing.T) { v, command := config.Viperize(AddFlags) command.ParseFlags([]string{ "--query.static-files=/dev/null", @@ -76,6 +77,32 @@ func TestQueryBuilderSeparateFlags(t *testing.T) { assert.Equal(t, "some.json", qOpts.UIConfig) assert.Equal(t, "/jaeger", qOpts.BasePath) assert.Equal(t, "127.0.0.1:8080", qOpts.HTTPHostPort) + assert.Equal(t, ports.PortToHostPort(ports.QueryGRPC), qOpts.GRPCHostPort) + + assert.Equal(t, http.Header{ + "Access-Control-Allow-Origin": []string{"blerg"}, + "Whatever": []string{"thing"}, + }, qOpts.AdditionalHeaders) + assert.Equal(t, 10*time.Second, qOpts.MaxClockSkewAdjust) +} + +func TestQueryBuilderFlagsSeparateNoPorts(t *testing.T) { + v, command := config.Viperize(AddFlags) + command.ParseFlags([]string{ + "--query.static-files=/dev/null", + "--query.ui-config=some.json", + "--query.base-path=/jaeger", + "--query.additional-headers=access-control-allow-origin:blerg", + "--query.additional-headers=whatever:thing", + "--query.max-clock-skew-adjustment=10s", + }) + qOpts := new(QueryOptions).InitFromViper(v, zap.NewNop()) + assert.Equal(t, "/dev/null", qOpts.StaticAssets) + assert.Equal(t, "some.json", qOpts.UIConfig) + assert.Equal(t, "/jaeger", qOpts.BasePath) + assert.Equal(t, ports.PortToHostPort(ports.QueryHTTP), qOpts.HTTPHostPort) + assert.Equal(t, ports.PortToHostPort(ports.QueryHTTP), qOpts.GRPCHostPort) + assert.Equal(t, ports.PortToHostPort(ports.QueryHTTP), qOpts.HostPort) assert.Equal(t, http.Header{ "Access-Control-Allow-Origin": []string{"blerg"}, diff --git a/ports/ports.go b/ports/ports.go index fb93ecdebda..c8e1ae7310d 100644 --- a/ports/ports.go +++ b/ports/ports.go @@ -38,6 +38,8 @@ const ( // CollectorAdminHTTP is the default admin HTTP port (health check, metrics, etc.) CollectorAdminHTTP = 14269 + // QueryGRPC is the default port of GRPC requests for Query trace retrieval + QueryGRPC = 16685 // QueryHTTP is the default port for UI and Query API (e.g. /api/* endpoints) QueryHTTP = 16686 // QueryAdminHTTP is the default admin HTTP port (health check, metrics, etc.) From 50c362d91dd85d2b06af705354e62d47b8936912 Mon Sep 17 00:00:00 2001 From: rjs211 Date: Fri, 28 Aug 2020 09:33:51 +0530 Subject: [PATCH 4/8] remove redundant lines and code cleanup Signed-off-by: rjs211 --- cmd/query/app/flags.go | 7 ++++--- cmd/query/app/flags_test.go | 34 ++-------------------------------- cmd/query/app/server_test.go | 5 +---- 3 files changed, 7 insertions(+), 39 deletions(-) diff --git a/cmd/query/app/flags.go b/cmd/query/app/flags.go index 9e452a16b77..2a39fcd7d2b 100644 --- a/cmd/query/app/flags.go +++ b/cmd/query/app/flags.go @@ -40,6 +40,7 @@ const ( queryHostPort = "query.host-port" queryPort = "query.port" queryPortWarning = "(deprecated, will be removed after 2020-08-31 or in release v1.20.0, whichever is later)" + queryHOSTPortWarning = "(deprecated, will be removed after 2020-08-31 or in release v1.20.0, whichever is later)" queryHTTPHostPort = "query.http-server.host-port" queryGRPCHostPort = "query.grpc-server.host-port" queryBasePath = "query.base-path" @@ -72,7 +73,7 @@ type QueryOptions struct { UIConfig string // BearerTokenPropagation activate/deactivate bearer token propagation to storage BearerTokenPropagation bool - // TLS configures secure transport` + // TLS configures secure transport TLS tlscfg.Options // AdditionalHeaders AdditionalHeaders http.Header @@ -83,9 +84,9 @@ type QueryOptions struct { // AddFlags adds flags for QueryOptions func AddFlags(flagSet *flag.FlagSet) { flagSet.Var(&config.StringSlice{}, queryAdditionalHeaders, `Additional HTTP response headers. Can be specified multiple times. Format: "Key: Value"`) - flagSet.String(queryHostPort, ports.PortToHostPort(ports.QueryHTTP), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the query's HTTP server") + flagSet.String(queryHostPort, ports.PortToHostPort(ports.QueryHTTP), queryHOSTPortWarning+" see --"+queryHTTPHostPort+" and --"+queryGRPCHostPort) flagSet.String(queryHTTPHostPort, ports.PortToHostPort(ports.QueryHTTP), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the query's HTTP server") - flagSet.String(queryGRPCHostPort, ports.PortToHostPort(ports.QueryGRPC), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the query's HTTP server") + flagSet.String(queryGRPCHostPort, ports.PortToHostPort(ports.QueryGRPC), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the query's gRPC server") flagSet.Int(queryPort, 0, queryPortWarning+" see --"+queryHostPort) flagSet.String(queryBasePath, "/", "The base path for all HTTP routes, e.g. /jaeger; useful when running behind a reverse proxy") flagSet.String(queryStaticFiles, "", "The directory path override for the static assets for the UI") diff --git a/cmd/query/app/flags_test.go b/cmd/query/app/flags_test.go index 716fa02e325..c79fc2c66a8 100644 --- a/cmd/query/app/flags_test.go +++ b/cmd/query/app/flags_test.go @@ -64,51 +64,21 @@ func TestQueryBuilderFlags(t *testing.T) { func TestQueryBuilderFlagsSeparatePorts(t *testing.T) { v, command := config.Viperize(AddFlags) command.ParseFlags([]string{ - "--query.static-files=/dev/null", - "--query.ui-config=some.json", - "--query.base-path=/jaeger", "--query.http-server.host-port=127.0.0.1:8080", - "--query.additional-headers=access-control-allow-origin:blerg", - "--query.additional-headers=whatever:thing", - "--query.max-clock-skew-adjustment=10s", }) qOpts := new(QueryOptions).InitFromViper(v, zap.NewNop()) - assert.Equal(t, "/dev/null", qOpts.StaticAssets) - assert.Equal(t, "some.json", qOpts.UIConfig) - assert.Equal(t, "/jaeger", qOpts.BasePath) assert.Equal(t, "127.0.0.1:8080", qOpts.HTTPHostPort) assert.Equal(t, ports.PortToHostPort(ports.QueryGRPC), qOpts.GRPCHostPort) - - assert.Equal(t, http.Header{ - "Access-Control-Allow-Origin": []string{"blerg"}, - "Whatever": []string{"thing"}, - }, qOpts.AdditionalHeaders) - assert.Equal(t, 10*time.Second, qOpts.MaxClockSkewAdjust) } func TestQueryBuilderFlagsSeparateNoPorts(t *testing.T) { v, command := config.Viperize(AddFlags) - command.ParseFlags([]string{ - "--query.static-files=/dev/null", - "--query.ui-config=some.json", - "--query.base-path=/jaeger", - "--query.additional-headers=access-control-allow-origin:blerg", - "--query.additional-headers=whatever:thing", - "--query.max-clock-skew-adjustment=10s", - }) + command.ParseFlags([]string{}) qOpts := new(QueryOptions).InitFromViper(v, zap.NewNop()) - assert.Equal(t, "/dev/null", qOpts.StaticAssets) - assert.Equal(t, "some.json", qOpts.UIConfig) - assert.Equal(t, "/jaeger", qOpts.BasePath) + assert.Equal(t, ports.PortToHostPort(ports.QueryHTTP), qOpts.HTTPHostPort) assert.Equal(t, ports.PortToHostPort(ports.QueryHTTP), qOpts.GRPCHostPort) assert.Equal(t, ports.PortToHostPort(ports.QueryHTTP), qOpts.HostPort) - - assert.Equal(t, http.Header{ - "Access-Control-Allow-Origin": []string{"blerg"}, - "Whatever": []string{"thing"}, - }, qOpts.AdditionalHeaders) - assert.Equal(t, 10*time.Second, qOpts.MaxClockSkewAdjust) } func TestQueryBuilderBadHeadersFlags(t *testing.T) { diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index 05d2b8da6b3..ad22be6767d 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -16,7 +16,6 @@ package app import ( "context" - "fmt" "testing" "time" @@ -100,10 +99,9 @@ func TestServer(t *testing.T) { assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) } -func TestServer2(t *testing.T) { +func TestServerWithDedicatedPorts(t *testing.T) { flagsSvc := flags.NewService(ports.QueryAdminHTTP) flagsSvc.Logger = zap.NewNop() - // hostPort := ports.GetAddressFromCLIOptions(ports.QueryHTTP, "") spanReader := &spanstoremocks.Reader{} dependencyReader := &depsmocks.Reader{} @@ -191,6 +189,5 @@ func TestServerHandlesPortZero(t *testing.T) { onlyEntry := message.All()[0] port := onlyEntry.ContextMap()["port"] - fmt.Println(port) assert.Greater(t, port, int64(0)) } From 68878bd2ad1dfe367799057056dd5c30dbd4b105 Mon Sep 17 00:00:00 2001 From: rjs211 Date: Tue, 8 Sep 2020 21:31:15 +0530 Subject: [PATCH 5/8] Removing possibly flaky test Signed-off-by: rjs211 --- cmd/query/app/server_test.go | 44 ++++++++++++++++++++++++------------ 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index ad22be6767d..9d130953079 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -16,6 +16,7 @@ package app import ( "context" + "sync" "testing" "time" @@ -73,10 +74,23 @@ func TestServer(t *testing.T) { opentracing.NoopTracer{}) assert.Nil(t, err) assert.NoError(t, server.Start()) + + var wg sync.WaitGroup + wg.Add(1) + once := sync.Once{} + go func() { for s := range server.HealthCheckStatus() { - flagsSvc.SetHealthCheckStatus(s) + flagsSvc.HC().Set(s) + if s == healthcheck.Unavailable { + once.Do(func() { + wg.Done() + }) + } + } + wg.Done() + }() client := newGRPCClient(t, hostPort) @@ -90,12 +104,7 @@ func TestServer(t *testing.T) { assert.Equal(t, expectedServices, res.Services) server.Close() - for i := 0; i < 10; i++ { - if flagsSvc.HC().Get() == healthcheck.Unavailable { - break - } - time.Sleep(1 * time.Millisecond) - } + wg.Wait() assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) } @@ -115,9 +124,19 @@ func TestServerWithDedicatedPorts(t *testing.T) { opentracing.NoopTracer{}) assert.Nil(t, err) assert.NoError(t, server.Start()) + + var wg sync.WaitGroup + wg.Add(1) + once := sync.Once{} + go func() { for s := range server.HealthCheckStatus() { - flagsSvc.SetHealthCheckStatus(s) + flagsSvc.HC().Set(s) + if s == healthcheck.Unavailable { + once.Do(func() { + wg.Done() + }) + } } }() @@ -132,12 +151,7 @@ func TestServerWithDedicatedPorts(t *testing.T) { assert.Equal(t, expectedServices, res.Services) server.Close() - for i := 0; i < 10; i++ { - if flagsSvc.HC().Get() == healthcheck.Unavailable { - break - } - time.Sleep(1 * time.Millisecond) - } + wg.Wait() assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) } @@ -157,7 +171,7 @@ func TestServerGracefulExit(t *testing.T) { assert.NoError(t, server.Start()) go func() { for s := range server.HealthCheckStatus() { - flagsSvc.SetHealthCheckStatus(s) + flagsSvc.HC().Set(s) } }() From 1236e5d0a54e7cfbd9adde54e769aa000ab31b4f Mon Sep 17 00:00:00 2001 From: rjs211 Date: Wed, 9 Sep 2020 15:35:52 +0530 Subject: [PATCH 6/8] Adding minor changes for review Signed-off-by: rjs211 1. Added test cases for `HostPortToPort` method 2. Modified Depricated warning to reasonable date --- cmd/query/app/flags.go | 2 +- ports/ports_test.go | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/cmd/query/app/flags.go b/cmd/query/app/flags.go index 2a39fcd7d2b..82dc1219aaa 100644 --- a/cmd/query/app/flags.go +++ b/cmd/query/app/flags.go @@ -40,7 +40,7 @@ const ( queryHostPort = "query.host-port" queryPort = "query.port" queryPortWarning = "(deprecated, will be removed after 2020-08-31 or in release v1.20.0, whichever is later)" - queryHOSTPortWarning = "(deprecated, will be removed after 2020-08-31 or in release v1.20.0, whichever is later)" + queryHOSTPortWarning = "(deprecated, will be removed after 2020-11-30 or in release v1.20.0, whichever is later)" queryHTTPHostPort = "query.http-server.host-port" queryGRPCHostPort = "query.grpc-server.host-port" queryBasePath = "query.base-path" diff --git a/ports/ports_test.go b/ports/ports_test.go index 51385afec03..c284acf0760 100644 --- a/ports/ports_test.go +++ b/ports/ports_test.go @@ -39,3 +39,15 @@ func TestGetAddressFromCLIOptionOnlyPort(t *testing.T) { func TestGetAddressFromCLIOptionOnlyPortWithoutColon(t *testing.T) { assert.Equal(t, ":123", GetAddressFromCLIOptions(0, "123")) } + +func TestHostPortToPortOnlyPort(t *testing.T) { + port, err := HostPortToPort(":42") + assert.Nil(t, err) + assert.Equal(t, 42, port) +} + +func TestHostPortToPortFullHostPort(t *testing.T) { + port, err := HostPortToPort("127.0.0.1:123") + assert.Nil(t, err) + assert.Equal(t, 123, port) +} From 8bd7008a52740dc832515c1dbda6225adcbb538f Mon Sep 17 00:00:00 2001 From: rjs211 Date: Thu, 10 Sep 2020 16:17:00 +0530 Subject: [PATCH 7/8] Using available methods for host-port split, correcting nit-bits Signed-off-by: rjs211 --- cmd/query/app/flags.go | 8 ++++---- cmd/query/app/server.go | 10 ++++------ ports/ports.go | 8 -------- ports/ports_test.go | 12 ------------ 4 files changed, 8 insertions(+), 30 deletions(-) diff --git a/cmd/query/app/flags.go b/cmd/query/app/flags.go index 82dc1219aaa..2ced798262f 100644 --- a/cmd/query/app/flags.go +++ b/cmd/query/app/flags.go @@ -40,7 +40,7 @@ const ( queryHostPort = "query.host-port" queryPort = "query.port" queryPortWarning = "(deprecated, will be removed after 2020-08-31 or in release v1.20.0, whichever is later)" - queryHOSTPortWarning = "(deprecated, will be removed after 2020-11-30 or in release v1.20.0, whichever is later)" + queryHOSTPortWarning = "(deprecated, will be removed after 2020-12-31 or in release v1.21.0, whichever is later)" queryHTTPHostPort = "query.http-server.host-port" queryGRPCHostPort = "query.grpc-server.host-port" queryBasePath = "query.base-path" @@ -103,12 +103,12 @@ func (qOpts *QueryOptions) InitPortsConfigFromViper(v *viper.Viper, logger *zap. qOpts.TLS = tlsFlagsConfig.InitFromViper(v) - // query.host-port is not defined and atleast one of query.grpc-server.host-port or query.http-server.host-port is defined - // user intends to use the separate flags. + // query.host-port is not defined and at least one of query.grpc-server.host-port or query.http-server.host-port is defined. + // User intends to use separate GRPC and HTTP host:port flags if !(v.IsSet(queryHostPort) || v.IsSet(queryPort)) && (v.IsSet(queryHTTPHostPort) || v.IsSet(queryGRPCHostPort)) { return qOpts } - logger.Warn(fmt.Sprintf("Use of %s and %s is deprecated. use %s and %s instead", queryPort, queryHostPort, queryHTTPHostPort, queryGRPCHostPort)) + logger.Warn(fmt.Sprintf("Use of %s and %s is deprecated. Use %s and %s instead", queryPort, queryHostPort, queryHTTPHostPort, queryGRPCHostPort)) qOpts.HTTPHostPort = qOpts.HostPort qOpts.GRPCHostPort = qOpts.HostPort return qOpts diff --git a/cmd/query/app/server.go b/cmd/query/app/server.go index c007a5a78dc..1fa96f3c2c5 100644 --- a/cmd/query/app/server.go +++ b/cmd/query/app/server.go @@ -30,7 +30,6 @@ import ( "github.com/jaegertracing/jaeger/pkg/healthcheck" "github.com/jaegertracing/jaeger/pkg/netutils" "github.com/jaegertracing/jaeger/pkg/recoveryhandler" - "github.com/jaegertracing/jaeger/ports" "github.com/jaegertracing/jaeger/proto-gen/api_v2" ) @@ -54,15 +53,14 @@ type Server struct { // NewServer creates and initializes Server func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, options *QueryOptions, tracer opentracing.Tracer) (*Server, error) { - httpPort, err := ports.HostPortToPort(options.HTTPHostPort) + _, httpPort, err := net.SplitHostPort(options.HTTPHostPort) if err != nil { return nil, err } - grpcPort, err := ports.HostPortToPort(options.GRPCHostPort) + _, grpcPort, err := net.SplitHostPort(options.GRPCHostPort) if err != nil { return nil, err } - separatePorts := (grpcPort != httpPort) grpcServer, err := createGRPCServer(querySvc, options, logger, tracer) if err != nil { @@ -76,7 +74,7 @@ func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, options *Que tracer: tracer, grpcServer: grpcServer, httpServer: createHTTPServer(querySvc, options, tracer, logger), - separatePorts: separatePorts, + separatePorts: (grpcPort != httpPort), unavailableChannel: make(chan healthcheck.Status), }, nil } @@ -194,8 +192,8 @@ func (s *Server) Start() error { if port, err := netutils.GetPort(s.conn.Addr()); err == nil { tcpPort = port } - } + var httpPort int if port, err := netutils.GetPort(s.httpConn.Addr()); err == nil { httpPort = port diff --git a/ports/ports.go b/ports/ports.go index c8e1ae7310d..95cbee755c9 100644 --- a/ports/ports.go +++ b/ports/ports.go @@ -54,14 +54,6 @@ func PortToHostPort(port int) string { return ":" + strconv.Itoa(port) } -// HostPortToPort converts the host:port address string intoto port int -func HostPortToPort(hostPort string) (int, error) { - s := strings.Split(hostPort, ":") - - return strconv.Atoi(s[len(s)-1]) - -} - // GetAddressFromCLIOptions gets listening address based on port (deprecated flags) or host:port (new flags) func GetAddressFromCLIOptions(port int, hostPort string) string { if port != 0 { diff --git a/ports/ports_test.go b/ports/ports_test.go index c284acf0760..51385afec03 100644 --- a/ports/ports_test.go +++ b/ports/ports_test.go @@ -39,15 +39,3 @@ func TestGetAddressFromCLIOptionOnlyPort(t *testing.T) { func TestGetAddressFromCLIOptionOnlyPortWithoutColon(t *testing.T) { assert.Equal(t, ":123", GetAddressFromCLIOptions(0, "123")) } - -func TestHostPortToPortOnlyPort(t *testing.T) { - port, err := HostPortToPort(":42") - assert.Nil(t, err) - assert.Equal(t, 42, port) -} - -func TestHostPortToPortFullHostPort(t *testing.T) { - port, err := HostPortToPort("127.0.0.1:123") - assert.Nil(t, err) - assert.Equal(t, 123, port) -} From dfe0c0089e0c881603cad38376f6a6c3acd40b17 Mon Sep 17 00:00:00 2001 From: rjs211 Date: Thu, 10 Sep 2020 21:39:37 +0530 Subject: [PATCH 8/8] added tests to increase coverage Signed-off-by: rjs211 --- cmd/query/app/server_test.go | 39 ++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index 9d130953079..ca2473acdc0 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -16,6 +16,7 @@ package app import ( "context" + "net" "sync" "testing" "time" @@ -58,6 +59,44 @@ func TestCreateTLSServerError(t *testing.T) { assert.NotNil(t, err) } +func TestServerBadHostPort(t *testing.T) { + _, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, + &QueryOptions{HTTPHostPort: "8080", GRPCHostPort: "127.0.0.1:8081", BearerTokenPropagation: true}, + opentracing.NoopTracer{}) + + assert.NotNil(t, err) + _, err = NewServer(zap.NewNop(), &querysvc.QueryService{}, + &QueryOptions{HTTPHostPort: "127.0.0.1:8081", GRPCHostPort: "9123", BearerTokenPropagation: true}, + opentracing.NoopTracer{}) + + assert.NotNil(t, err) +} + +func TestServerInUseHostPort(t *testing.T) { + + for _, hostPort := range [2]string{":8080", ":8081"} { + conn, err := net.Listen("tcp", hostPort) + assert.NoError(t, err) + + server, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, + &QueryOptions{HTTPHostPort: "127.0.0.1:8080", GRPCHostPort: "127.0.0.1:8081", BearerTokenPropagation: true}, + opentracing.NoopTracer{}) + assert.NoError(t, err) + + err = server.Start() + assert.NotNil(t, err) + conn.Close() + if server.grpcConn != nil { + server.grpcConn.Close() + } + if server.httpConn != nil { + server.httpConn.Close() + } + + } + +} + func TestServer(t *testing.T) { flagsSvc := flags.NewService(ports.QueryAdminHTTP) flagsSvc.Logger = zap.NewNop()