Skip to content

Commit

Permalink
Add CLI Option for gRPC Max Receive Message Size
Browse files Browse the repository at this point in the history
Added a new CLI option for the gRPC Collector to override the default
max receive message size which is normally 4MB since some users of
Jaeger may want to record Spans which exceed the default max size.

Signed-off-by: Justin Stauffer <justin.stauffer@gmail.com>
  • Loading branch information
js8080 committed Aug 16, 2021
1 parent 8cd84e7 commit ee45f21
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 22 deletions.
23 changes: 14 additions & 9 deletions cmd/collector/app/builder_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,16 @@ import (
)

const (
collectorDynQueueSizeMemory = "collector.queue-size-memory"
collectorGRPCHostPort = "collector.grpc-server.host-port"
collectorHTTPHostPort = "collector.http-server.host-port"
collectorNumWorkers = "collector.num-workers"
collectorQueueSize = "collector.queue-size"
collectorTags = "collector.tags"
collectorZipkinAllowedHeaders = "collector.zipkin.allowed-headers"
collectorZipkinAllowedOrigins = "collector.zipkin.allowed-origins"
collectorZipkinHTTPHostPort = "collector.zipkin.host-port"
collectorDynQueueSizeMemory = "collector.queue-size-memory"
collectorGRPCHostPort = "collector.grpc-server.host-port"
collectorHTTPHostPort = "collector.http-server.host-port"
collectorNumWorkers = "collector.num-workers"
collectorQueueSize = "collector.queue-size"
collectorTags = "collector.tags"
collectorZipkinAllowedHeaders = "collector.zipkin.allowed-headers"
collectorZipkinAllowedOrigins = "collector.zipkin.allowed-origins"
collectorZipkinHTTPHostPort = "collector.zipkin.host-port"
collectorGRPCMaxReceiveMessageLength = "collector.grpc-max-receive-message-length"
)

var tlsGRPCFlagsConfig = tlscfg.ServerFlagsConfig{
Expand Down Expand Up @@ -69,12 +70,15 @@ type CollectorOptions struct {
CollectorZipkinAllowedOrigins string
// CollectorZipkinAllowedHeaders is a list of headers that the Zipkin collector service allowes the client to use with cross-domain requests
CollectorZipkinAllowedHeaders string
// CollectorGRPCMaxReceiveMessageLength is the maximum message size receivable by the gRPC Collector.
CollectorGRPCMaxReceiveMessageLength int
}

// AddFlags adds flags for CollectorOptions
func AddFlags(flags *flag.FlagSet) {
flags.Int(collectorNumWorkers, DefaultNumWorkers, "The number of workers pulling items from the queue")
flags.Int(collectorQueueSize, DefaultQueueSize, "The queue size of the collector")
flags.Int(collectorGRPCMaxReceiveMessageLength, DefaultGRPCMaxReceiveMessageLength, "The maximum receivable message size for the collector's GRPC server")
flags.String(collectorGRPCHostPort, ports.PortToHostPort(ports.CollectorGRPC), "The host:port (e.g. 127.0.0.1:14250 or :14250) of the collector's GRPC server")
flags.String(collectorHTTPHostPort, ports.PortToHostPort(ports.CollectorHTTP), "The host:port (e.g. 127.0.0.1:14268 or :14268) of the collector's HTTP server")
flags.String(collectorTags, "", "One or more tags to be added to the Process tags of all spans passing through this collector. Ex: key1=value1,key2=${envVar:defaultValue}")
Expand All @@ -100,6 +104,7 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper) *CollectorOptions {
cOpts.QueueSize = v.GetInt(collectorQueueSize)
cOpts.TLSGRPC = tlsGRPCFlagsConfig.InitFromViper(v)
cOpts.TLSHTTP = tlsHTTPFlagsConfig.InitFromViper(v)
cOpts.CollectorGRPCMaxReceiveMessageLength = v.GetInt(collectorGRPCMaxReceiveMessageLength)

return cOpts
}
11 changes: 11 additions & 0 deletions cmd/collector/app/builder_flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,14 @@ func TestCollectorOptionsWithFlags_CheckFullHostPort(t *testing.T) {
assert.Equal(t, "127.0.0.1:1234", c.CollectorGRPCHostPort)
assert.Equal(t, "0.0.0.0:3456", c.CollectorZipkinHTTPHostPort)
}

func TestCollectorOptionsWithFlags_CheckMaxReceiveMessageLength(t *testing.T) {
c := &CollectorOptions{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--collector.grpc-max-receive-message-length=8388608",
})
c.InitFromViper(v)

assert.Equal(t, 8388608, c.CollectorGRPCMaxReceiveMessageLength)
}
11 changes: 6 additions & 5 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,12 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error {
c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor)

grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{
HostPort: builderOpts.CollectorGRPCHostPort,
Handler: c.spanHandlers.GRPCHandler,
TLSConfig: builderOpts.TLSGRPC,
SamplingStore: c.strategyStore,
Logger: c.logger,
HostPort: builderOpts.CollectorGRPCHostPort,
Handler: c.spanHandlers.GRPCHandler,
TLSConfig: builderOpts.TLSGRPC,
SamplingStore: c.strategyStore,
Logger: c.logger,
MaxReceiveMessageLength: builderOpts.CollectorGRPCMaxReceiveMessageLength,
})
if err != nil {
return fmt.Errorf("could not start gRPC collector %w", err)
Expand Down
2 changes: 2 additions & 0 deletions cmd/collector/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ const (
DefaultNumWorkers = 50
// DefaultQueueSize is the size of the processor's queue
DefaultQueueSize = 2000
// DefaultGRPCMaxReceiveMessageLength is the default max receivable message size for the gRPC Collector
DefaultGRPCMaxReceiveMessageLength = 4 * 1024 * 1024
)

type options struct {
Expand Down
22 changes: 14 additions & 8 deletions cmd/collector/app/server/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,21 @@ import (

// GRPCServerParams to construct a new Jaeger Collector gRPC Server
type GRPCServerParams struct {
TLSConfig tlscfg.Options
HostPort string
Handler *handler.GRPCHandler
SamplingStore strategystore.StrategyStore
Logger *zap.Logger
OnError func(error)
TLSConfig tlscfg.Options
HostPort string
Handler *handler.GRPCHandler
SamplingStore strategystore.StrategyStore
Logger *zap.Logger
OnError func(error)
MaxReceiveMessageLength int
}

// StartGRPCServer based on the given parameters
func StartGRPCServer(params *GRPCServerParams) (*grpc.Server, error) {
var server *grpc.Server
var grpcOpts []grpc.ServerOption

grpcOpts = append(grpcOpts, grpc.MaxRecvMsgSize(params.MaxReceiveMessageLength))

if params.TLSConfig.Enabled {
// user requested a server with TLS, setup creds
Expand All @@ -51,10 +55,12 @@ func StartGRPCServer(params *GRPCServerParams) (*grpc.Server, error) {
}

creds := credentials.NewTLS(tlsCfg)
server = grpc.NewServer(grpc.Creds(creds))
grpcOpts = append(grpcOpts, grpc.Creds(creds))

server = grpc.NewServer(grpcOpts...)
} else {
// server without TLS
server = grpc.NewServer()
server = grpc.NewServer(grpcOpts...)
}

listener, err := net.Listen("tcp", params.HostPort)
Expand Down

0 comments on commit ee45f21

Please sign in to comment.