Merge pull request cockroachdb#63361 from miretskiy/backport21.1-63039
release-21.1: changefeedccl: Make it possible to configure kafka behavior.
miretskiy authored Apr 9, 2021
2 parents 14e902c + 842b0e3 commit 69122e0
Showing 3 changed files with 122 additions and 37 deletions.
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -84,6 +84,9 @@ const (
OptFormatAvro FormatType = `experimental_avro`
OptFormatNative FormatType = `native`

// OptKafkaSinkConfig is a JSON configuration for the kafka sink (kafkaSinkConfig).
OptKafkaSinkConfig = `kafka_sink_config`

SinkParamCACert = `ca_cert`
SinkParamClientCert = `client_cert`
SinkParamClientKey = `client_key`
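For orientation (not part of the diff): the value of the new `kafka_sink_config` option is a JSON document whose shape mirrors the saramaConfig struct added in sink.go below, with durations written as Go duration strings. A hypothetical entry in the changefeed options map consumed by getSaramaConfig might look like the following sketch; the particular values are illustrative, not taken from this commit.

    opts := map[string]string{
        changefeedbase.OptKafkaSinkConfig: `{"Flush": {"Messages": 1, "Frequency": "500ms", "MaxMessages": 1000}}`,
    }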
136 changes: 99 additions & 37 deletions pkg/ccl/changefeedccl/sink.go
@@ -14,6 +14,7 @@ import (
"crypto/x509"
gosql "database/sql"
"encoding/base64"
"encoding/json"
"fmt"
"hash"
"hash/fnv"
@@ -206,7 +207,7 @@ func getSink(
}

makeSink = func() (Sink, error) {
return makeKafkaSink(cfg, u.Host, targets)
return makeKafkaSink(cfg, u.Host, targets, opts)
}
case isCloudStorageSink(u):
fileSizeParam := q.Get(changefeedbase.SinkParamFileSize)
@@ -369,8 +370,99 @@ func (s *kafkaSink) setTargets(targets jobspb.ChangefeedTargets) {
}
}

type jsonDuration time.Duration

func (j *jsonDuration) UnmarshalJSON(b []byte) error {
var s string
if err := json.Unmarshal(b, &s); err != nil {
return err
}
dur, err := time.ParseDuration(s)
if err != nil {
return err
}
*j = jsonDuration(dur)
return nil
}

type saramaConfig struct {
// These settings mirror the ones in the sarama config; we just tag them
// with JSON annotations.
// Flush describes settings specific to producer flushing.
// See sarama.Config.Producer.Flush
Flush struct {
Bytes int `json:",omitempty"`
Messages int `json:",omitempty"`
Frequency jsonDuration `json:",omitempty"`
MaxMessages int `json:",omitempty"`
}
}

// Apply applies this configuration to the provided sarama kafka
// configuration struct.
func (c *saramaConfig) Apply(kafka *sarama.Config) {
kafka.Producer.Flush.Bytes = c.Flush.Bytes
kafka.Producer.Flush.Messages = c.Flush.Messages
kafka.Producer.Flush.Frequency = time.Duration(c.Flush.Frequency)
kafka.Producer.Flush.MaxMessages = c.Flush.MaxMessages
}

var defaultSaramaConfig = func() *saramaConfig {
config := &saramaConfig{}

// When we emit messages to sarama, they're placed in a queue (as does any
// reasonable kafka producer client). When our sink's Flush is called, we
// have to wait for all buffered and inflight requests to be sent and then
// acknowledged. Quite unfortunately, we have no way to hint to the producer
// that it should immediately send out whatever is buffered. This
// configuration can have a dramatic impact on how quickly this happens
// naturally (and some configurations will block forever!).
//
// We can configure the producer to send out its batches based on number of
// messages and/or total buffered message size and/or time. If none of them
// are set, it uses some defaults, but if any of the three are set, it does
// no defaulting. Which means that if `Flush.Messages` is set to 10 and
// nothing else is set, then 9/10 times `Flush` will block forever. We can
// work around this by also setting `Flush.Frequency` but a cleaner way is
// to set `Flush.Messages` to 1. In the steady state, this sends a request
// with some messages, buffers any messages that come in while it is in
// flight, then sends those out.
config.Flush.Messages = 1

// This works around what seems to be a bug in sarama where it isn't
// computing the right value to compare against `Producer.MaxMessageBytes`
// and the server sends it back with a "Message was too large, server
// rejected it to avoid allocation" error. The other flush tunings are
// hints, but this one is a hard limit, so it's useful here as a workaround.
//
// This workaround should probably be something like setting
// `Producer.MaxMessageBytes` to 90% of its value for some headroom, but
// this workaround is the one that's been running in roachtests and I'd want
// to test this one more before changing it.
config.Flush.MaxMessages = 1000

// Flush.Messages is set to 1 above, so we don't need this, but
// sarama prints scary things to the logs if we don't.
config.Flush.Frequency = jsonDuration(time.Hour)

return config
}()

func getSaramaConfig(opts map[string]string) (config *saramaConfig, err error) {
if configStr, haveOverride := opts[changefeedbase.OptKafkaSinkConfig]; haveOverride {
config = &saramaConfig{}
err = json.Unmarshal([]byte(configStr), config)
} else {
config = defaultSaramaConfig
}
return
}

func makeKafkaSink(
cfg kafkaSinkConfig, bootstrapServers string, targets jobspb.ChangefeedTargets,
cfg kafkaSinkConfig,
bootstrapServers string,
targets jobspb.ChangefeedTargets,
opts map[string]string,
) (Sink, error) {
sink := &kafkaSink{cfg: cfg}
sink.setTargets(targets)
@@ -431,42 +523,12 @@ func makeKafkaSink(
}
}

// When we emit messages to sarama, they're placed in a queue (as does any
// reasonable kafka producer client). When our sink's Flush is called, we
// have to wait for all buffered and inflight requests to be sent and then
// acknowledged. Quite unfortunately, we have no way to hint to the producer
// that it should immediately send out whatever is buffered. This
// configuration can have a dramatic impact on how quickly this happens
// naturally (and some configurations will block forever!).
//
// We can configure the producer to send out its batches based on number of
// messages and/or total buffered message size and/or time. If none of them
// are set, it uses some defaults, but if any of the three are set, it does
// no defaulting. Which means that if `Flush.Messages` is set to 10 and
// nothing else is set, then 9/10 times `Flush` will block forever. We can
// work around this by also setting `Flush.Frequency` but a cleaner way is
// to set `Flush.Messages` to 1. In the steady state, this sends a request
// with some messages, buffers any messages that come in while it is in
// flight, then sends those out.
config.Producer.Flush.Messages = 1

// This works around what seems to be a bug in sarama where it isn't
// computing the right value to compare against `Producer.MaxMessageBytes`
// and the server sends it back with a "Message was too large, server
// rejected it to avoid allocation" error. The other flush tunings are
// hints, but this one is a hard limit, so it's useful here as a workaround.
//
// This workaround should probably be something like setting
// `Producer.MaxMessageBytes` to 90% of it's value for some headroom, but
// this workaround is the one that's been running in roachtests and I'd want
// to test this one more before changing it.
config.Producer.Flush.MaxMessages = 1000

// config.Producer.Flush.Messages is set to 1 so we don't need this, but
// sarama prints scary things to the logs if we don't.
config.Producer.Flush.Frequency = time.Hour
saramaCfg, err := getSaramaConfig(opts)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse sarama config; check changefeed.experimental_kafka_config setting")
}
saramaCfg.Apply(config)

var err error
sink.client, err = sarama.NewClient(strings.Split(bootstrapServers, `,`), config)
if err != nil {
err = pgerror.Wrapf(err, pgcode.CannotConnectNow,
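To make the new flow concrete, here is a self-contained sketch, outside the diff, of what the added code path does end to end: unmarshal a kafka_sink_config JSON override into the saramaConfig mirror struct, then copy its Flush fields onto a sarama producer config. The two types are re-declared locally so the snippet compiles on its own; the sarama import path and the Producer.Flush fields match what this file already uses, but the values and output are illustrative only.

    package main

    import (
    	"encoding/json"
    	"fmt"
    	"time"

    	"github.com/Shopify/sarama"
    )

    // jsonDuration mirrors the type added above: a duration that
    // unmarshals from a JSON string such as "500ms".
    type jsonDuration time.Duration

    func (j *jsonDuration) UnmarshalJSON(b []byte) error {
    	var s string
    	if err := json.Unmarshal(b, &s); err != nil {
    		return err
    	}
    	dur, err := time.ParseDuration(s)
    	if err != nil {
    		return err
    	}
    	*j = jsonDuration(dur)
    	return nil
    }

    // saramaConfig mirrors the struct added above.
    type saramaConfig struct {
    	Flush struct {
    		Bytes       int          `json:",omitempty"`
    		Messages    int          `json:",omitempty"`
    		Frequency   jsonDuration `json:",omitempty"`
    		MaxMessages int          `json:",omitempty"`
    	}
    }

    func main() {
    	// The same JSON shape the kafka_sink_config option accepts.
    	const override = `{"Flush": {"Messages": 1, "Frequency": "500ms", "MaxMessages": 1000}}`

    	cfg := &saramaConfig{}
    	if err := json.Unmarshal([]byte(override), cfg); err != nil {
    		panic(err)
    	}

    	// Equivalent of saramaConfig.Apply in the diff: copy the parsed
    	// values onto the sarama producer configuration.
    	kafka := sarama.NewConfig()
    	kafka.Producer.Flush.Bytes = cfg.Flush.Bytes
    	kafka.Producer.Flush.Messages = cfg.Flush.Messages
    	kafka.Producer.Flush.Frequency = time.Duration(cfg.Flush.Frequency)
    	kafka.Producer.Flush.MaxMessages = cfg.Flush.MaxMessages

    	fmt.Println(kafka.Producer.Flush.Messages, kafka.Producer.Flush.Frequency) // 1 500ms
    }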
20 changes: 20 additions & 0 deletions pkg/ccl/changefeedccl/sink_test.go
@@ -17,6 +17,7 @@ import (

"github.com/Shopify/sarama"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
@@ -299,3 +300,22 @@ func TestSQLSink(t *testing.T) {
},
)
}

func TestSaramaConfigOptionParsing(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

opts := make(map[string]string)
cfg, err := getSaramaConfig(opts)
require.NoError(t, err)
require.Equal(t, defaultSaramaConfig, cfg)

expected := &saramaConfig{}
expected.Flush.MaxMessages = 1000
expected.Flush.Frequency = jsonDuration(time.Second)

opts[changefeedbase.OptKafkaSinkConfig] = `{"Flush": {"MaxMessages": 1000, "Frequency": "1s"}}`
cfg, err = getSaramaConfig(opts)
require.NoError(t, err)
require.Equal(t, expected, cfg)
}
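
A natural companion check, not included in this commit, would assert that a malformed override (bad JSON or an unparseable duration string) surfaces an error rather than being silently ignored. A minimal sketch reusing the same helpers, with the test name and input chosen here purely for illustration:

    func TestSaramaConfigOptionParsingRejectsInvalidInput(t *testing.T) {
    	defer leaktest.AfterTest(t)()
    	defer log.Scope(t).Close(t)

    	opts := map[string]string{
    		changefeedbase.OptKafkaSinkConfig: `{"Flush": {"Frequency": "not-a-duration"}}`,
    	}
    	// time.ParseDuration rejects "not-a-duration", so getSaramaConfig
    	// should propagate the unmarshal error.
    	_, err := getSaramaConfig(opts)
    	require.Error(t, err)
    }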
