From 842b0e346e36819ea02341d23aafed7538193071 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Fri, 2 Apr 2021 16:59:38 -0400 Subject: [PATCH] changefeedccl: Make it possible to configure kafka behavior. Make it possible to configure kafka behavior, in particular Flushing, via `kafka_sink_config` CHANGEFEED option. This option can be used to configure the underlying kafka library (sarama) to balance latency vs throughput configurations. The default is to optimize for latency. Clients can choose to configure for throughput instead. For example, the following will batch up to 1000 messages, or up to 1 second worth of messages. ``` CREATE CHANGEFEED ... WITH kafka_sink_config='{"Flush": {"MaxMessages": 1000, "Frequency": "1s"}}' ``` Release Notes: Make kafka library used in changefeeds configurable via `kafka_sink_config` option to enable latency vs throughput configurations. --- .../changefeedccl/changefeedbase/options.go | 3 + pkg/ccl/changefeedccl/sink.go | 136 +++++++++++++----- pkg/ccl/changefeedccl/sink_test.go | 20 +++ 3 files changed, 122 insertions(+), 37 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index bb1600eb0cc1..60763e06f149 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -84,6 +84,9 @@ const ( OptFormatAvro FormatType = `experimental_avro` OptFormatNative FormatType = `native` + // OptKafkaSinkConfig is a JSON configuration for kafka sink (saramaConfig). 
+ OptKafkaSinkConfig = `kafka_sink_config` + SinkParamCACert = `ca_cert` SinkParamClientCert = `client_cert` SinkParamClientKey = `client_key` diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index c2d52be3bc13..c4d0bedf529f 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -14,6 +14,7 @@ import ( "crypto/x509" gosql "database/sql" "encoding/base64" + "encoding/json" "fmt" "hash" "hash/fnv" @@ -206,7 +207,7 @@ func getSink( } makeSink = func() (Sink, error) { - return makeKafkaSink(cfg, u.Host, targets) + return makeKafkaSink(cfg, u.Host, targets, opts) } case isCloudStorageSink(u): fileSizeParam := q.Get(changefeedbase.SinkParamFileSize) @@ -369,8 +370,99 @@ func (s *kafkaSink) setTargets(targets jobspb.ChangefeedTargets) { } } +type jsonDuration time.Duration + +func (j *jsonDuration) UnmarshalJSON(b []byte) error { + var s string + if err := json.Unmarshal(b, &s); err != nil { + return err + } + dur, err := time.ParseDuration(s) + if err != nil { + return err + } + *j = jsonDuration(dur) + return nil +} + +type saramaConfig struct { + // These settings mirror ones in sarama config. + // We just tag them w/ JSON annotations. + // Flush describes settings specific to producer flushing. + // See sarama.Config.Producer.Flush + Flush struct { + Bytes int `json:",omitempty"` + Messages int `json:",omitempty"` + Frequency jsonDuration `json:",omitempty"` + MaxMessages int `json:",omitempty"` + } +} + +// Apply configures the provided kafka configuration struct based +// on this config. 
+func (c *saramaConfig) Apply(kafka *sarama.Config) { + kafka.Producer.Flush.Bytes = c.Flush.Bytes + kafka.Producer.Flush.Messages = c.Flush.Messages + kafka.Producer.Flush.Frequency = time.Duration(c.Flush.Frequency) + kafka.Producer.Flush.MaxMessages = c.Flush.MaxMessages +} + +var defaultSaramaConfig = func() *saramaConfig { + config := &saramaConfig{} + + // When we emit messages to sarama, they're placed in a queue (as does any + // reasonable kafka producer client). When our sink's Flush is called, we + // have to wait for all buffered and inflight requests to be sent and then + // acknowledged. Quite unfortunately, we have no way to hint to the producer + // that it should immediately send out whatever is buffered. This + // configuration can have a dramatic impact on how quickly this happens + // naturally (and some configurations will block forever!). + // + // We can configure the producer to send out its batches based on number of + // messages and/or total buffered message size and/or time. If none of them + // are set, it uses some defaults, but if any of the three are set, it does + // no defaulting. Which means that if `Flush.Messages` is set to 10 and + // nothing else is set, then 9/10 times `Flush` will block forever. We can + // work around this by also setting `Flush.Frequency` but a cleaner way is + // to set `Flush.Messages` to 1. In the steady state, this sends a request + // with some messages, buffers any messages that come in while it is in + // flight, then sends those out. + config.Flush.Messages = 1 + + // This works around what seems to be a bug in sarama where it isn't + // computing the right value to compare against `Producer.MaxMessageBytes` + // and the server sends it back with a "Message was too large, server + // rejected it to avoid allocation" error. The other flush tunings are + // hints, but this one is a hard limit, so it's useful here as a workaround. 
+ // + // This workaround should probably be something like setting + // `Producer.MaxMessageBytes` to 90% of its value for some headroom, but + // this workaround is the one that's been running in roachtests and I'd want + // to test this one more before changing it. + config.Flush.MaxMessages = 1000 + + // config.Flush.Messages is set to 1 so we don't need this, but + // sarama prints scary things to the logs if we don't. + config.Flush.Frequency = jsonDuration(time.Hour) + + return config +}() + +func getSaramaConfig(opts map[string]string) (config *saramaConfig, err error) { + if configStr, haveOverride := opts[changefeedbase.OptKafkaSinkConfig]; haveOverride { + config = &saramaConfig{} + err = json.Unmarshal([]byte(configStr), config) + } else { + config = defaultSaramaConfig + } + return +} + func makeKafkaSink( - cfg kafkaSinkConfig, bootstrapServers string, targets jobspb.ChangefeedTargets, + cfg kafkaSinkConfig, + bootstrapServers string, + targets jobspb.ChangefeedTargets, + opts map[string]string, ) (Sink, error) { sink := &kafkaSink{cfg: cfg} sink.setTargets(targets) @@ -431,42 +523,12 @@ func makeKafkaSink( } } - // When we emit messages to sarama, they're placed in a queue (as does any - // reasonable kafka producer client). When our sink's Flush is called, we - // have to wait for all buffered and inflight requests to be sent and then - // acknowledged. Quite unfortunately, we have no way to hint to the producer - // that it should immediately send out whatever is buffered. This - // configuration can have a dramatic impact on how quickly this happens - // naturally (and some configurations will block forever!). - // - // We can configure the producer to send out its batches based on number of - // messages and/or total buffered message size and/or time. If none of them - // are set, it uses some defaults, but if any of the three are set, it does - // no defaulting. 
Which means that if `Flush.Messages` is set to 10 and - // nothing else is set, then 9/10 times `Flush` will block forever. We can - // work around this by also setting `Flush.Frequency` but a cleaner way is - // to set `Flush.Messages` to 1. In the steady state, this sends a request - // with some messages, buffers any messages that come in while it is in - // flight, then sends those out. - config.Producer.Flush.Messages = 1 - - // This works around what seems to be a bug in sarama where it isn't - // computing the right value to compare against `Producer.MaxMessageBytes` - // and the server sends it back with a "Message was too large, server - // rejected it to avoid allocation" error. The other flush tunings are - // hints, but this one is a hard limit, so it's useful here as a workaround. - // - // This workaround should probably be something like setting - // `Producer.MaxMessageBytes` to 90% of it's value for some headroom, but - // this workaround is the one that's been running in roachtests and I'd want - // to test this one more before changing it. - config.Producer.Flush.MaxMessages = 1000 - - // config.Producer.Flush.Messages is set to 1 so we don't need this, but - // sarama prints scary things to the logs if we don't. 
- config.Producer.Flush.Frequency = time.Hour + saramaCfg, err := getSaramaConfig(opts) + if err != nil { + return nil, errors.Wrapf(err, "failed to parse sarama config; check %s option", changefeedbase.OptKafkaSinkConfig) + } + saramaCfg.Apply(config) - var err error sink.client, err = sarama.NewClient(strings.Split(bootstrapServers, `,`), config) if err != nil { err = pgerror.Wrapf(err, pgcode.CannotConnectNow, diff --git a/pkg/ccl/changefeedccl/sink_test.go b/pkg/ccl/changefeedccl/sink_test.go index a6180dcbec40..c1993d9eec13 100644 --- a/pkg/ccl/changefeedccl/sink_test.go +++ b/pkg/ccl/changefeedccl/sink_test.go @@ -17,6 +17,7 @@ import ( "github.com/Shopify/sarama" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -299,3 +300,22 @@ func TestSQLSink(t *testing.T) { }, ) } + +func TestSaramaConfigOptionParsing(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + opts := make(map[string]string) + cfg, err := getSaramaConfig(opts) + require.NoError(t, err) + require.Equal(t, defaultSaramaConfig, cfg) + + expected := &saramaConfig{} + expected.Flush.MaxMessages = 1000 + expected.Flush.Frequency = jsonDuration(time.Second) + + opts[changefeedbase.OptKafkaSinkConfig] = `{"Flush": {"MaxMessages": 1000, "Frequency": "1s"}}` + cfg, err = getSaramaConfig(opts) + require.NoError(t, err) + require.Equal(t, expected, cfg) +}