diff --git a/cmd/collector/app/builder_flags.go b/cmd/collector/app/builder_flags.go index 8599d864549..7693fdc64a9 100644 --- a/cmd/collector/app/builder_flags.go +++ b/cmd/collector/app/builder_flags.go @@ -26,15 +26,16 @@ import ( ) const ( - collectorDynQueueSizeMemory = "collector.queue-size-memory" - collectorGRPCHostPort = "collector.grpc-server.host-port" - collectorHTTPHostPort = "collector.http-server.host-port" - collectorNumWorkers = "collector.num-workers" - collectorQueueSize = "collector.queue-size" - collectorTags = "collector.tags" - collectorZipkinAllowedHeaders = "collector.zipkin.allowed-headers" - collectorZipkinAllowedOrigins = "collector.zipkin.allowed-origins" - collectorZipkinHTTPHostPort = "collector.zipkin.host-port" + collectorDynQueueSizeMemory = "collector.queue-size-memory" + collectorGRPCHostPort = "collector.grpc-server.host-port" + collectorHTTPHostPort = "collector.http-server.host-port" + collectorNumWorkers = "collector.num-workers" + collectorQueueSize = "collector.queue-size" + collectorTags = "collector.tags" + collectorZipkinAllowedHeaders = "collector.zipkin.allowed-headers" + collectorZipkinAllowedOrigins = "collector.zipkin.allowed-origins" + collectorZipkinHTTPHostPort = "collector.zipkin.host-port" + collectorGRPCMaxReceiveMessageLength = "collector.grpc-max-receive-message-length" ) var tlsGRPCFlagsConfig = tlscfg.ServerFlagsConfig{ @@ -69,12 +70,15 @@ type CollectorOptions struct { CollectorZipkinAllowedOrigins string // CollectorZipkinAllowedHeaders is a list of headers that the Zipkin collector service allowes the client to use with cross-domain requests CollectorZipkinAllowedHeaders string + // CollectorGRPCMaxReceiveMessageLength is the maximum message size receivable by the gRPC Collector. + CollectorGRPCMaxReceiveMessageLength int } // AddFlags adds flags for CollectorOptions func AddFlags(flags *flag.FlagSet) { flags.Int(collectorNumWorkers, DefaultNumWorkers, "The number of workers pulling items from the queue") flags.Int(collectorQueueSize, DefaultQueueSize, "The queue size of the collector") + flags.Int(collectorGRPCMaxReceiveMessageLength, DefaultGRPCMaxReceiveMessageLength, "The maximum receivable message size for the collector's GRPC server") flags.String(collectorGRPCHostPort, ports.PortToHostPort(ports.CollectorGRPC), "The host:port (e.g. 127.0.0.1:14250 or :14250) of the collector's GRPC server") flags.String(collectorHTTPHostPort, ports.PortToHostPort(ports.CollectorHTTP), "The host:port (e.g. 127.0.0.1:14268 or :14268) of the collector's HTTP server") flags.String(collectorTags, "", "One or more tags to be added to the Process tags of all spans passing through this collector. Ex: key1=value1,key2=${envVar:defaultValue}") @@ -100,6 +104,7 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper) *CollectorOptions { cOpts.QueueSize = v.GetInt(collectorQueueSize) cOpts.TLSGRPC = tlsGRPCFlagsConfig.InitFromViper(v) cOpts.TLSHTTP = tlsHTTPFlagsConfig.InitFromViper(v) + cOpts.CollectorGRPCMaxReceiveMessageLength = v.GetInt(collectorGRPCMaxReceiveMessageLength) return cOpts } diff --git a/cmd/collector/app/builder_flags_test.go b/cmd/collector/app/builder_flags_test.go index 0ffa0e8ec1b..3ec7d9200d3 100644 --- a/cmd/collector/app/builder_flags_test.go +++ b/cmd/collector/app/builder_flags_test.go @@ -51,3 +51,14 @@ func TestCollectorOptionsWithFlags_CheckFullHostPort(t *testing.T) { assert.Equal(t, "127.0.0.1:1234", c.CollectorGRPCHostPort) assert.Equal(t, "0.0.0.0:3456", c.CollectorZipkinHTTPHostPort) } + +func TestCollectorOptionsWithFlags_CheckMaxReceiveMessageLength(t *testing.T) { + c := &CollectorOptions{} + v, command := config.Viperize(AddFlags) + command.ParseFlags([]string{ + "--collector.grpc-max-receive-message-length=8388608", + }) + c.InitFromViper(v) + + assert.Equal(t, 8388608, c.CollectorGRPCMaxReceiveMessageLength) +} diff --git a/cmd/collector/app/collector.go b/cmd/collector/app/collector.go index f46f86a2f5d..550542bd49c 100644 --- a/cmd/collector/app/collector.go +++ b/cmd/collector/app/collector.go @@ -87,11 +87,12 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error { c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor) grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{ - HostPort: builderOpts.CollectorGRPCHostPort, - Handler: c.spanHandlers.GRPCHandler, - TLSConfig: builderOpts.TLSGRPC, - SamplingStore: c.strategyStore, - Logger: c.logger, + HostPort: builderOpts.CollectorGRPCHostPort, + Handler: c.spanHandlers.GRPCHandler, + TLSConfig: builderOpts.TLSGRPC, + SamplingStore: c.strategyStore, + Logger: c.logger, + MaxReceiveMessageLength: builderOpts.CollectorGRPCMaxReceiveMessageLength, }) if err != nil { return fmt.Errorf("could not start gRPC collector %w", err) diff --git a/cmd/collector/app/options.go b/cmd/collector/app/options.go index 53de30ef105..abc390869d3 100644 --- a/cmd/collector/app/options.go +++ b/cmd/collector/app/options.go @@ -29,6 +29,8 @@ const ( DefaultNumWorkers = 50 // DefaultQueueSize is the size of the processor's queue DefaultQueueSize = 2000 + // DefaultGRPCMaxReceiveMessageLength is the default max receivable message size for the gRPC Collector + DefaultGRPCMaxReceiveMessageLength = 4 * 1024 * 1024 ) type options struct { diff --git a/cmd/collector/app/server/grpc.go b/cmd/collector/app/server/grpc.go index 4e3849bd70f..8d6cdf69538 100644 --- a/cmd/collector/app/server/grpc.go +++ b/cmd/collector/app/server/grpc.go @@ -31,17 +31,21 @@ import ( // GRPCServerParams to construct a new Jaeger Collector gRPC Server type GRPCServerParams struct { - TLSConfig tlscfg.Options - HostPort string - Handler *handler.GRPCHandler - SamplingStore strategystore.StrategyStore - Logger *zap.Logger - OnError func(error) + TLSConfig tlscfg.Options + HostPort string + Handler *handler.GRPCHandler + SamplingStore strategystore.StrategyStore + Logger *zap.Logger + OnError func(error) + MaxReceiveMessageLength int } // StartGRPCServer based on the given parameters func StartGRPCServer(params *GRPCServerParams) (*grpc.Server, error) { var server *grpc.Server + var grpcOpts []grpc.ServerOption + + grpcOpts = append(grpcOpts, grpc.MaxRecvMsgSize(params.MaxReceiveMessageLength)) if params.TLSConfig.Enabled { // user requested a server with TLS, setup creds @@ -51,10 +55,12 @@ func StartGRPCServer(params *GRPCServerParams) (*grpc.Server, error) { } creds := credentials.NewTLS(tlsCfg) - server = grpc.NewServer(grpc.Creds(creds)) + grpcOpts = append(grpcOpts, grpc.Creds(creds)) + + server = grpc.NewServer(grpcOpts...) } else { // server without TLS - server = grpc.NewServer() + server = grpc.NewServer(grpcOpts...) } listener, err := net.Listen("tcp", params.HostPort)