Set kafka message size and compression as configurable (#173)
* add max message bytes and compression type to router config init

* add kafka config for api server and cluster builder

* make kafka defaults available from router_default in Helm values

* fix spacing

* update test case

* update test case and add defaults when the router create API is called

* update test name

Co-authored-by: leonlnj <ningjie.lee@gojek.com>
leonlnj authored Mar 7, 2022
1 parent 538dce2 commit a3d788d
Showing 12 changed files with 239 additions and 2 deletions.
2 changes: 2 additions & 0 deletions api/turing/api/request/request.go
@@ -162,6 +162,8 @@ func (r CreateOrUpdateRouterRequest) BuildRouterVersion(
Brokers: r.Config.LogConfig.KafkaConfig.Brokers,
Topic: r.Config.LogConfig.KafkaConfig.Topic,
SerializationFormat: r.Config.LogConfig.KafkaConfig.SerializationFormat,
MaxMessageBytes: defaults.KafkaConfig.MaxMessageBytes,
CompressionType: defaults.KafkaConfig.CompressionType,
}
}
if rv.ExperimentEngine.Type != models.ExperimentEngineTypeNop {
94 changes: 94 additions & 0 deletions api/turing/api/request/request_test.go
@@ -149,6 +149,100 @@ func TestRequestBuildRouter(t *testing.T) {
assert.Equal(t, *expected, *got)
}

func TestRequestBuildRouterVersionLoggerConfiguration(t *testing.T) {

baseRequest := CreateOrUpdateRouterRequest{
Environment: "env",
Name: "router",
Config: &RouterConfig{
ExperimentEngine: &ExperimentEngineConfig{
Type: "nop",
},
},
}

projectID := models.ID(1)
routerDefault := config.RouterDefaults{
KafkaConfig: &config.KafkaConfig{
MaxMessageBytes: 1110,
CompressionType: "gzip",
},
}

tests := []struct {
testName string
logConfig *LogConfig
expectedLogConfig *models.LogConfig
}{
{
testName: "Test Kafka Logger",
logConfig: &LogConfig{
ResultLoggerType: "kafka",
KafkaConfig: &KafkaConfig{
Brokers: "10:11",
Topic: "2222",
SerializationFormat: "json",
},
},
expectedLogConfig: &models.LogConfig{
LogLevel: "",
CustomMetricsEnabled: false,
FiberDebugLogEnabled: false,
JaegerEnabled: false,
ResultLoggerType: "kafka",
KafkaConfig: &models.KafkaConfig{
Brokers: "10:11",
Topic: "2222",
SerializationFormat: "json",
MaxMessageBytes: 1110,
CompressionType: "gzip",
},
BigQueryConfig: nil,
},
},
{
testName: "Test BQ Logger",
logConfig: &LogConfig{
ResultLoggerType: "bigquery",
BigQueryConfig: &BigQueryConfig{
Table: "project.dataset.table",
ServiceAccountSecret: "service_account",
},
},
expectedLogConfig: &models.LogConfig{
LogLevel: "",
CustomMetricsEnabled: false,
FiberDebugLogEnabled: false,
JaegerEnabled: false,
ResultLoggerType: "bigquery",
KafkaConfig: nil,
BigQueryConfig: &models.BigQueryConfig{
Table: "project.dataset.table",
ServiceAccountSecret: "service_account",
BatchLoad: true,
},
},
},
}
for _, tt := range tests {
t.Run(tt.testName, func(t *testing.T) {
baseRequest.Config.LogConfig = tt.logConfig
router := baseRequest.BuildRouter(projectID)
// Set up mock Crypto service
cryptoSvc := &mocks.CryptoService{}
cryptoSvc.On("Encrypt", "dummy_passkey").Return("enc_passkey", nil)

// Set up mock Experiment service
expSvc := &mocks.ExperimentsService{}
expSvc.On("IsStandardExperimentManager", mock.Anything).Return(true)

got, err := baseRequest.BuildRouterVersion(router, &routerDefault, cryptoSvc, expSvc)
assert.NoError(t, err)
assert.Equal(t, got.LogConfig, tt.expectedLogConfig)
})
}
}

func TestRequestBuildRouterVersionWithDefaults(t *testing.T) {
defaults := config.RouterDefaults{
Image: "routerimage",
4 changes: 4 additions & 0 deletions api/turing/cluster/servicebuilder/router.go
@@ -45,6 +45,8 @@ const (
envKafkaBrokers = "APP_KAFKA_BROKERS"
envKafkaTopic = "APP_KAFKA_TOPIC"
envKafkaSerializationFormat = "APP_KAFKA_SERIALIZATION_FORMAT"
envKafkaMaxMessageBytes = "APP_KAFKA_MAX_MESSAGE_BYTES"
envKafkaCompressionType = "APP_KAFKA_COMPRESSION_TYPE"
envRouterConfigFile = "ROUTER_CONFIG_FILE"
envGoogleApplicationCredentials = "GOOGLE_APPLICATION_CREDENTIALS"
)
@@ -252,6 +254,8 @@ func (sb *clusterSvcBuilder) buildRouterEnvs(
{Name: envKafkaBrokers, Value: logConfig.KafkaConfig.Brokers},
{Name: envKafkaTopic, Value: logConfig.KafkaConfig.Topic},
{Name: envKafkaSerializationFormat, Value: string(logConfig.KafkaConfig.SerializationFormat)},
{Name: envKafkaMaxMessageBytes, Value: strconv.Itoa(logConfig.KafkaConfig.MaxMessageBytes)},
{Name: envKafkaCompressionType, Value: logConfig.KafkaConfig.CompressionType},
}...)
}

88 changes: 88 additions & 0 deletions api/turing/cluster/servicebuilder/router_test.go
@@ -553,3 +553,91 @@ func TestNewRouterEndpoint(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, expected, got)
}

func TestBuildRouterEnvsResultLogger(t *testing.T) {
type args struct {
namespace string
environmentType string
fluentdTag string
jaegerCollectorEndpoint string
sentryEnabled bool
sentryDSN string
secretName string
ver *models.RouterVersion
}
namespace := "testnamespace"
tests := []struct {
name string
args args
want []corev1.EnvVar
}{
{
name: "KafkaLogger",
args: args{
namespace: "testnamespace",
environmentType: "dev",
fluentdTag: "",
jaegerCollectorEndpoint: "",
sentryEnabled: false,
sentryDSN: "",
secretName: "",
ver: &models.RouterVersion{
Router: &models.Router{Name: "test1"},
Version: 1,
Timeout: "10s",
LogConfig: &models.LogConfig{
LogLevel: "DEBUG",
CustomMetricsEnabled: false,
FiberDebugLogEnabled: false,
JaegerEnabled: false,
ResultLoggerType: "kafka",
KafkaConfig: &models.KafkaConfig{
Brokers: "1.1.1.1:1111",
Topic: "kafkatopic",
SerializationFormat: "protobuf",
MaxMessageBytes: 123,
CompressionType: "gzip",
},
},
},
},
want: []corev1.EnvVar{
{Name: "APP_NAME", Value: "test1-1.testnamespace"},
{Name: "APP_ENVIRONMENT", Value: "dev"},
{Name: "ROUTER_TIMEOUT", Value: "10s"},
{Name: "APP_JAEGER_COLLECTOR_ENDPOINT", Value: ""},
{Name: "ROUTER_CONFIG_FILE", Value: "/app/config/fiber.yml"},
{Name: "APP_SENTRY_ENABLED", Value: "false"},
{Name: "APP_SENTRY_DSN", Value: ""},
{Name: "APP_LOGLEVEL", Value: "DEBUG"},
{Name: "APP_CUSTOM_METRICS", Value: "false"},
{Name: "APP_JAEGER_ENABLED", Value: "false"},
{Name: "APP_RESULT_LOGGER", Value: "kafka"},
{Name: "APP_FIBER_DEBUG_LOG", Value: "false"},
{Name: "APP_KAFKA_BROKERS", Value: "1.1.1.1:1111"},
{Name: "APP_KAFKA_TOPIC", Value: "kafkatopic"},
{Name: "APP_KAFKA_SERIALIZATION_FORMAT", Value: "protobuf"},
{Name: "APP_KAFKA_MAX_MESSAGE_BYTES", Value: "123"},
{Name: "APP_KAFKA_COMPRESSION_TYPE", Value: "gzip"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sb := &clusterSvcBuilder{
MaxCPU: resource.MustParse("2"),
MaxMemory: resource.MustParse("2Gi"),
}
got, _ := sb.buildRouterEnvs(
namespace,
tt.args.environmentType,
tt.args.fluentdTag,
tt.args.jaegerCollectorEndpoint,
tt.args.sentryEnabled,
tt.args.sentryDSN,
tt.args.secretName,
tt.args.ver)
assert.Equal(t, tt.want, got)
})
}
}
12 changes: 12 additions & 0 deletions api/turing/config/config.go
@@ -275,6 +275,8 @@ type RouterDefaults struct {
// blue-exp-engine:
// Image: ghcr.io/myproject/blue-exp-engine-plugin:v0.0.1
ExperimentEnginePlugins map[string]*ExperimentEnginePluginConfig `validate:"dive"`
// Kafka configuration, used when result logging is set to Kafka
KafkaConfig *KafkaConfig
}

// FluentdConfig captures the defaults used by the Turing Router when Fluentd is enabled
@@ -287,6 +289,14 @@ type FluentdConfig struct {
FlushIntervalSeconds int
}

// KafkaConfig captures the defaults used by the Turing Router when the result logger is set to Kafka
type KafkaConfig struct {
// Producer config - maximum message size, in bytes, that the producer will send to the broker
MaxMessageBytes int
// Producer config - compression type applied to produced messages
CompressionType string
}

// AuthorizationConfig captures the config for auth using mlp authz
type AuthorizationConfig struct {
Enabled bool
@@ -469,6 +479,8 @@ func setDefaultValues(v *viper.Viper) {
v.SetDefault("RouterDefaults::FluentdConfig::Tag", "turing-result.log")
v.SetDefault("RouterDefaults::FluentdConfig::FlushIntervalSeconds", "90")
v.SetDefault("RouterDefaults::Experiment", map[string]interface{}{})
v.SetDefault("RouterDefaults::KafkaConfig::MaxMessageBytes", "1048588")
v.SetDefault("RouterDefaults::KafkaConfig::CompressionType", "none")

v.SetDefault("Sentry::Enabled", "false")
v.SetDefault("Sentry::DSN", "")
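The `::`-delimited keys above suggest the API config is loaded through Viper with a custom key delimiter. Below is a minimal, self-contained sketch of how the two new producer defaults resolve under that assumption; only the keys and default values come from this diff, the rest is illustrative.

package main

import (
	"fmt"

	"github.com/spf13/viper"
)

func main() {
	// Use "::" as the key delimiter, matching the keys in setDefaultValues above.
	v := viper.NewWithOptions(viper.KeyDelimiter("::"))

	// Producer defaults added by this commit.
	v.SetDefault("RouterDefaults::KafkaConfig::MaxMessageBytes", "1048588")
	v.SetDefault("RouterDefaults::KafkaConfig::CompressionType", "none")

	// Values read back are cast to the target types, so the string defaults
	// behave as an int and a string respectively.
	fmt.Println(v.GetInt("RouterDefaults::KafkaConfig::MaxMessageBytes"))    // 1048588
	fmt.Println(v.GetString("RouterDefaults::KafkaConfig::CompressionType")) // none
}

Setting RouterDefaults.KafkaConfig in a loaded config file, as the test fixtures below do, overrides these defaults.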
16 changes: 16 additions & 0 deletions api/turing/config/config_test.go
@@ -146,6 +146,10 @@ func TestLoad(t *testing.T) {
Tag: "turing-result.log",
FlushIntervalSeconds: 90,
},
KafkaConfig: &config.KafkaConfig{
MaxMessageBytes: 1048588,
CompressionType: "none",
},
},
Sentry: sentry.Config{},
ClusterConfig: config.ClusterConfig{
@@ -208,6 +212,10 @@ func TestLoad(t *testing.T) {
Tag: "turing-result.log",
FlushIntervalSeconds: 60,
},
KafkaConfig: &config.KafkaConfig{
MaxMessageBytes: 1048588,
CompressionType: "none",
},
},
Sentry: sentry.Config{
Enabled: true,
@@ -294,6 +302,10 @@ func TestLoad(t *testing.T) {
"red": {Image: "ghcr.io/myproject/red-exp-engine-plugin:v0.0.1"},
"blue": {Image: "ghcr.io/myproject/blue-exp-engine-plugin:latest"},
},
KafkaConfig: &config.KafkaConfig{
MaxMessageBytes: 1234567,
CompressionType: "snappy",
},
},
Sentry: sentry.Config{
Enabled: true,
@@ -400,6 +412,10 @@ func TestLoad(t *testing.T) {
"red": {Image: "ghcr.io/myproject/red-exp-engine-plugin:v0.0.1"},
"blue": {Image: "ghcr.io/myproject/blue-exp-engine-plugin:latest"},
},
KafkaConfig: &config.KafkaConfig{
MaxMessageBytes: 1234567,
CompressionType: "snappy",
},
},
Sentry: sentry.Config{
Enabled: true,
3 changes: 3 additions & 0 deletions api/turing/config/testdata/config-2.yaml
@@ -16,6 +16,9 @@ RouterDefaults:
Image: ghcr.io/myproject/red-exp-engine-plugin:v0.0.1
blue:
Image: ghcr.io/myproject/blue-exp-engine-plugin:latest
KafkaConfig:
MaxMessageBytes: 1234567
CompressionType: snappy
Experiment:
qux:
quxkey1: quxval1-override
4 changes: 4 additions & 0 deletions api/turing/models/log_config.go
@@ -52,6 +52,10 @@ type KafkaConfig struct {
Topic string `json:"topic"`
// Serialization Format used for the messages
SerializationFormat SerializationFormat `json:"serialization_format"`
// Producer config - maximum message size, in bytes, that the producer will send to the broker
MaxMessageBytes int `json:"max_message_bytes"`
// Producer config - compression type applied to produced messages
CompressionType string `json:"compression_type"`
}

// LogConfig contains all log configuration necessary for a deployment
6 changes: 5 additions & 1 deletion api/turing/models/log_config_test.go
@@ -59,6 +59,8 @@ func TestLogConfigValue(t *testing.T) {
Brokers: "test-brokers",
Topic: "test-topic",
SerializationFormat: "test-serialization",
MaxMessageBytes: 10000,
CompressionType: "none",
},
},
expected: string(`{
@@ -70,7 +72,9 @@
"kafka_config": {
"brokers": "test-brokers",
"topic": "test-topic",
"serialization_format": "test-serialization"
"serialization_format": "test-serialization",
"max_message_bytes" : 10000,
"compression_type" : "none"
}
}`),
},
2 changes: 2 additions & 0 deletions engines/router/missionctl/config/config.go
@@ -95,6 +95,8 @@ type KafkaConfig struct {
Brokers string
Topic string
SerializationFormat SerializationFormat `split_words:"true"`
MaxMessageBytes int `split_words:"true" default:"1048588"`
CompressionType string `split_words:"true" default:"none"`
}

// JaegerConfig captures the settings for tracing using Jaeger client
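The `split_words` and `default` struct tags suggest the router's mission control config is populated from environment variables with a library such as kelseyhightower/envconfig, which would line these fields up with the APP_KAFKA_MAX_MESSAGE_BYTES and APP_KAFKA_COMPRESSION_TYPE variables set by the cluster service builder above. A rough sketch of that mapping under those assumptions; the struct and prefix below are illustrative stand-ins, not the actual missionctl types.

package main

import (
	"fmt"
	"log"

	"github.com/kelseyhightower/envconfig"
)

// Illustrative mirror of the Kafka section of the missionctl config.
type kafkaConfig struct {
	Brokers         string
	Topic           string
	MaxMessageBytes int    `split_words:"true" default:"1048588"`
	CompressionType string `split_words:"true" default:"none"`
}

type appConfig struct {
	Kafka kafkaConfig
}

func main() {
	// With the "app" prefix, the nested field resolves to env vars such as
	// APP_KAFKA_MAX_MESSAGE_BYTES and APP_KAFKA_COMPRESSION_TYPE; unset
	// variables fall back to the declared defaults.
	var cfg appConfig
	if err := envconfig.Process("app", &cfg); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%+v\n", cfg.Kafka)
}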
4 changes: 4 additions & 0 deletions engines/router/missionctl/config/config_test.go
@@ -110,6 +110,8 @@ func TestInitConfigDefaultEnvs(t *testing.T) {
Brokers: "",
Topic: "",
SerializationFormat: SerializationFormat(""),
MaxMessageBytes: 1048588,
CompressionType: "none",
},
CustomMetrics: false,
Jaeger: &JaegerConfig{
@@ -172,6 +174,8 @@ func TestInitConfigEnv(t *testing.T) {
Brokers: "localhost:9000",
Topic: "kafka_topic",
SerializationFormat: JSONSerializationFormat,
MaxMessageBytes: 1048588,
CompressionType: "none",
},
CustomMetrics: true,
Jaeger: &JaegerConfig{
6 changes: 5 additions & 1 deletion engines/router/missionctl/log/resultlog/kafka.go
@@ -53,7 +53,11 @@ func newKafkaLogger(cfg *config.KafkaConfig) (*KafkaLogger, error) {
}

func newKafkaProducer(cfg *config.KafkaConfig) (kafkaProducer, error) {
- producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": cfg.Brokers, "message.max.bytes": 10485760})
+ producer, err := kafka.NewProducer(
+ 	&kafka.ConfigMap{
+ 		"bootstrap.servers": cfg.Brokers,
+ 		"message.max.bytes": cfg.MaxMessageBytes,
+ 		"compression.type":  cfg.CompressionType})
if err != nil {
return nil, errors.Wrapf(err, "Error initializing Kafka Producer")
}
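`message.max.bytes` and `compression.type` are standard librdkafka producer properties, so the router now feeds its configured values to the producer instead of the previously hardcoded 10485760-byte limit. A minimal standalone sketch of building a producer with the same two properties using confluent-kafka-go; the broker address, topic, and values below are placeholders, not Turing defaults.

package main

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// Producer-level properties made configurable by this change.
	producer, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // placeholder broker
		"message.max.bytes": 1048588,          // maximum producer message size, in bytes
		"compression.type":  "gzip",           // none, gzip, snappy, lz4 or zstd
	})
	if err != nil {
		log.Fatalf("error initializing Kafka producer: %v", err)
	}
	defer producer.Close()

	topic := "turing-results"
	msg := &kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte(`{"example":"payload"}`),
	}
	// Messages larger than message.max.bytes are rejected by the client
	// before they ever reach the broker.
	if err := producer.Produce(msg, nil); err != nil {
		log.Fatalf("produce failed: %v", err)
	}
	producer.Flush(5000)
}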

