1 change: 0 additions & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -12,7 +12,6 @@ changefeed.aggregator.flush_jitter float 0.1 jitter aggregator flushes as a frac
 changefeed.backfill.concurrent_scan_requests integer 0 number of concurrent scan requests per node issued during a backfill application
 changefeed.backfill.scan_request_size integer 524288 the maximum number of bytes returned by each scan request application
 changefeed.batch_reduction_retry.enabled (alias: changefeed.batch_reduction_retry_enabled) boolean false if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes application
-changefeed.default_range_distribution_strategy enumeration default configures how work is distributed among nodes for a given changefeed. for the most balanced distribution, use `balanced_simple`. changing this setting will not override locality restrictions [default = 0, balanced_simple = 1] application
 changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer application
 changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled application
 changefeed.fast_gzip.enabled boolean true use fast gzip implementation application
1 change: 0 additions & 1 deletion docs/generated/settings/settings.html
@@ -17,7 +17,6 @@
 <tr><td><div id="setting-changefeed-backfill-concurrent-scan-requests" class="anchored"><code>changefeed.backfill.concurrent_scan_requests</code></div></td><td>integer</td><td><code>0</code></td><td>number of concurrent scan requests per node issued during a backfill</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-backfill-scan-request-size" class="anchored"><code>changefeed.backfill.scan_request_size</code></div></td><td>integer</td><td><code>524288</code></td><td>the maximum number of bytes returned by each scan request</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-batch-reduction-retry-enabled" class="anchored"><code>changefeed.batch_reduction_retry.enabled<br />(alias: changefeed.batch_reduction_retry_enabled)</code></div></td><td>boolean</td><td><code>false</code></td><td>if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
-<tr><td><div id="setting-changefeed-default-range-distribution-strategy" class="anchored"><code>changefeed.default_range_distribution_strategy</code></div></td><td>enumeration</td><td><code>default</code></td><td>configures how work is distributed among nodes for a given changefeed. for the most balanced distribution, use `balanced_simple`. changing this setting will not override locality restrictions [default = 0, balanced_simple = 1]</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-event-consumer-worker-queue-size" class="anchored"><code>changefeed.event_consumer_worker_queue_size</code></div></td><td>integer</td><td><code>16</code></td><td>if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-event-consumer-workers" class="anchored"><code>changefeed.event_consumer_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-fast-gzip-enabled" class="anchored"><code>changefeed.fast_gzip.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
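
Taken together, the two generated-docs deletions above show the user-facing half of this change: the cluster-wide changefeed.default_range_distribution_strategy setting is removed, and the behavior is instead requested per changefeed through the range_distribution_strategy option added later in this diff. A minimal before/after sketch, not part of this PR, assuming a locally running cluster, the pgwire driver, and a table named x:

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the Postgres wire protocol.
)

func main() {
	// Hypothetical connection string; adjust for your cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Before this PR, the strategy was a cluster-wide default:
	//   SET CLUSTER SETTING changefeed.default_range_distribution_strategy = 'balanced_simple';
	// After this PR, it is requested per changefeed:
	if _, err := db.Exec(
		"CREATE CHANGEFEED FOR TABLE x INTO 'null://' WITH initial_scan = 'no', range_distribution_strategy = 'balanced_simple'",
	); err != nil {
		log.Fatal(err)
	}
}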
45 changes: 8 additions & 37 deletions pkg/ccl/changefeedccl/changefeed_dist.go
@@ -42,7 +42,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/envutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
-	"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
 	"github.com/cockroachdb/errors"
 )
 
@@ -345,37 +344,6 @@ func startDistChangefeed(
 // The bin packing choice gives preference to leaseholder replicas if possible.
 var replicaOracleChoice = replicaoracle.BinPackingChoice
 
-type rangeDistributionType int
-
-const (
-	// defaultDistribution employs no load balancing on the changefeed
-	// side. We defer to distsql to select nodes and distribute work.
-	defaultDistribution rangeDistributionType = 0
-	// balancedSimpleDistribution defers to distsql for selecting the
-	// set of nodes to distribute work to. However, changefeeds will try to
-	// distribute work evenly across this set of nodes.
-	balancedSimpleDistribution rangeDistributionType = 1
-	// TODO(jayant): add balancedFullDistribution which takes
-	// full control of node selection and distribution.
-)
-
-// RangeDistributionStrategy is used to determine how the changefeed balances
-// ranges between nodes.
-// TODO: deprecate this setting in favor of a changefeed option.
-var RangeDistributionStrategy = settings.RegisterEnumSetting(
-	settings.ApplicationLevel,
-	"changefeed.default_range_distribution_strategy",
-	"configures how work is distributed among nodes for a given changefeed. "+
-		"for the most balanced distribution, use `balanced_simple`. changing this setting "+
-		"will not override locality restrictions",
-	metamorphic.ConstantWithTestChoice("default_range_distribution_strategy",
-		"default", "balanced_simple"),
-	map[rangeDistributionType]string{
-		defaultDistribution:        "default",
-		balancedSimpleDistribution: "balanced_simple",
-	},
-	settings.WithPublic)
-
 var useBulkOracle = settings.RegisterBoolSetting(
 	settings.ApplicationLevel,
 	"changefeed.random_replica_selection.enabled",
@@ -410,7 +378,10 @@ func makePlan(
 		}
 	}
 
-	rangeDistribution := RangeDistributionStrategy.Get(sv)
+	rangeDistributionStrat, err := changefeedbase.MakeStatementOptions(details.Opts).GetRangeDistributionStrategy()
+	if err != nil {
+		return nil, nil, err
+	}
 	evalCtx := execCtx.ExtendedEvalContext()
 	oracle := replicaoracle.NewOracle(replicaOracleChoice, dsp.ReplicaOracleConfig(locFilter))
 	if useBulkOracle.Get(&evalCtx.Settings.SV) {
@@ -427,8 +398,8 @@
 		log.Changefeed.Infof(ctx, "spans returned by DistSQL: %v", spanPartitions)
 	}
 	switch {
-	case distMode == sql.LocalDistribution || rangeDistribution == defaultDistribution:
-	case rangeDistribution == balancedSimpleDistribution:
+	case distMode == sql.LocalDistribution || rangeDistributionStrat == changefeedbase.RangeDistributionStrategyDefault:
+	case rangeDistributionStrat == changefeedbase.RangeDistributionStrategyBalancedSimple:
 		log.Changefeed.Infof(ctx, "rebalancing ranges using balanced simple distribution")
 		sender := execCtx.ExecCfg().DB.NonTransactionalSender()
 		distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)
@@ -442,8 +413,8 @@
 			log.Changefeed.Infof(ctx, "spans after balanced simple distribution rebalancing: %v", spanPartitions)
 		}
 	default:
-		return nil, nil, errors.AssertionFailedf("unsupported dist strategy %d and dist mode %d",
-			rangeDistribution, distMode)
+		return nil, nil, errors.AssertionFailedf("unsupported dist strategy %s and dist mode %d",
+			rangeDistributionStrat, distMode)
 	}
 
 	if haveKnobs && maybeCfKnobs.FilterDrainingNodes != nil && len(drainingNodes) > 0 {
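
The balanced_simple branch above only rebalances the span partitions that distsql already chose; it does not change the node set. A self-contained toy sketch of that idea — even redistribution across a fixed node set — follows. It is illustrative only: the PR's actual code rebalances spans via the DistSender and honors rebalanceThreshold.

package main

import "fmt"

// partition models one node and the ranges currently assigned to it.
type partition struct {
	node   int
	ranges []int
}

// rebalance moves ranges from overloaded nodes to underloaded ones so each
// node ends up with at most ceil(total/len(parts)) ranges. The node set
// itself is left untouched, mirroring the "balanced_simple" contract.
func rebalance(parts []partition) []partition {
	total := 0
	for _, p := range parts {
		total += len(p.ranges)
	}
	target := (total + len(parts) - 1) / len(parts) // ceiling division
	var overflow []int
	for i := range parts {
		if len(parts[i].ranges) > target {
			overflow = append(overflow, parts[i].ranges[target:]...)
			parts[i].ranges = parts[i].ranges[:target]
		}
	}
	for i := range parts {
		for len(parts[i].ranges) < target && len(overflow) > 0 {
			parts[i].ranges = append(parts[i].ranges, overflow[0])
			overflow = overflow[1:]
		}
	}
	return parts
}

func main() {
	parts := []partition{
		{node: 1, ranges: make([]int, 10)},
		{node: 2, ranges: make([]int, 1)},
		{node: 3, ranges: make([]int, 1)},
	}
	for _, p := range rebalance(parts) {
		fmt.Printf("node %d: %d ranges\n", p.node, len(p.ranges)) // 4 / 4 / 4
	}
}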
75 changes: 49 additions & 26 deletions pkg/ccl/changefeedccl/changefeed_dist_test.go
@@ -500,7 +500,6 @@ func TestChangefeedWithNoDistributionStrategy(t *testing.T) {
 	tester := newRangeDistributionTester(t, noLocality)
 	defer tester.cleanup()
 
-	serverutils.SetClusterSetting(t, tester.tc, "changefeed.default_range_distribution_strategy", "default")
 	serverutils.SetClusterSetting(t, tester.tc, "changefeed.random_replica_selection.enabled", false)
 	tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no'")
 	partitions := tester.getPartitions()
@@ -527,10 +526,9 @@
 	// Check that we roughly assign (64 ranges / 6 nodes) ranges to each node.
 	tester := newRangeDistributionTester(t, noLocality)
 	defer tester.cleanup()
-	tester.sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.default_range_distribution_strategy = 'balanced_simple'")
 	// We need to disable the bulk oracle in order to ensure the leaseholder is selected.
 	tester.sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.random_replica_selection.enabled = false")
-	tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no'")
+	tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', range_distribution_strategy='balanced_simple'")
 	partitions := tester.getPartitions()
 	counts := tester.countRangesPerNode(partitions)
 	upper := int(math.Ceil((1 + rebalanceThreshold.Get(&tester.lastNode.ClusterSettings().SV)) * 64 / 6))
@@ -548,30 +546,56 @@ func TestChangefeedWithNoDistributionStrategyAndConstrainedLocality(t *testing.T) {
 	skip.UnderShort(t)
 	skip.UnderDuress(t)
 
-	// The replica oracle selects the leaseholder replica for each range. Then, distsql assigns the replica
-	// to the same node which stores it. However, node of these nodes don't pass the filter. The replicas assigned
-	// to these nodes are distributed arbitrarily to any nodes which pass the filter.
-	tester := newRangeDistributionTester(t, func(i int) []roachpb.Tier {
-		if i%2 == 1 {
-			return []roachpb.Tier{{Key: "y", Value: "1"}}
-		}
-		return []roachpb.Tier{}
-	})
-	defer tester.cleanup()
-	tester.sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.default_range_distribution_strategy = 'default'")
-	tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1'")
-	partitions := tester.getPartitions()
-	counts := tester.countRangesPerNode(partitions)
-
-	totalRanges := 0
-	for i, count := range counts {
-		if i%2 == 1 {
-			totalRanges += count
-		} else {
-			require.Equal(t, count, 0)
-		}
-	}
-	require.Equal(t, totalRanges, 64)
+	t.Run("default specified", func(t *testing.T) {
+		// The replica oracle selects the leaseholder replica for each range. Then, distsql assigns the replica
+		// to the same node which stores it. However, none of these nodes passes the filter. The replicas assigned
+		// to these nodes are distributed arbitrarily to any nodes which pass the filter.
+		tester := newRangeDistributionTester(t, func(i int) []roachpb.Tier {
+			if i%2 == 1 {
+				return []roachpb.Tier{{Key: "y", Value: "1"}}
+			}
+			return []roachpb.Tier{}
+		})
+		defer tester.cleanup()
+		tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1', range_distribution_strategy='default'")
+		partitions := tester.getPartitions()
+		counts := tester.countRangesPerNode(partitions)
+
+		totalRanges := 0
+		for i, count := range counts {
+			if i%2 == 1 {
+				totalRanges += count
+			} else {
+				require.Equal(t, count, 0)
+			}
+		}
+		require.Equal(t, totalRanges, 64)
+	})
+	t.Run("no distribution strategy specified", func(t *testing.T) {
+		// The replica oracle selects the leaseholder replica for each range. Then, distsql assigns the replica
+		// to the same node which stores it. However, none of these nodes passes the filter. The replicas assigned
+		// to these nodes are distributed arbitrarily to any nodes which pass the filter.
+		tester := newRangeDistributionTester(t, func(i int) []roachpb.Tier {
+			if i%2 == 1 {
+				return []roachpb.Tier{{Key: "y", Value: "1"}}
+			}
+			return []roachpb.Tier{}
+		})
+		defer tester.cleanup()
+		tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1'")
+		partitions := tester.getPartitions()
+		counts := tester.countRangesPerNode(partitions)
+
+		totalRanges := 0
+		for i, count := range counts {
+			if i%2 == 1 {
+				totalRanges += count
+			} else {
+				require.Equal(t, count, 0)
+			}
+		}
+		require.Equal(t, totalRanges, 64)
+	})
 }
 
 func TestChangefeedWithSimpleDistributionStrategyAndConstrainedLocality(t *testing.T) {
@@ -593,8 +617,7 @@ func TestChangefeedWithSimpleDistributionStrategyAndConstrainedLocality(t *testing.T) {
 		return []roachpb.Tier{}
 	})
 	defer tester.cleanup()
-	tester.sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.default_range_distribution_strategy = 'balanced_simple'")
-	tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1'")
+	tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1', range_distribution_strategy='balanced_simple'")
 	partitions := tester.getPartitions()
 	counts := tester.countRangesPerNode(partitions)
 
28 changes: 28 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -75,6 +75,19 @@
 	EnrichedPropertySchema EnrichedProperty = `schema`
 )
 
+// RangeDistributionStrategy configures how the changefeed balances ranges between nodes.
+type RangeDistributionStrategy string
+
+const (
+	// RangeDistributionStrategyDefault employs no load balancing on the changefeed
+	// side. We defer to distsql to select nodes and distribute work.
+	RangeDistributionStrategyDefault RangeDistributionStrategy = `default`
+	// RangeDistributionStrategyBalancedSimple defers to distsql for selecting the
+	// set of nodes to distribute work to. However, changefeeds will try to
+	// distribute work evenly across this set of nodes.
+	RangeDistributionStrategyBalancedSimple RangeDistributionStrategy = `balanced_simple`
+)
+
 // Constants for the initial scan types
 const (
 	InitialScan InitialScanType = iota
@@ -162,6 +175,8 @@
 
 	OptEnrichedProperties = `enriched_properties`
 
+	OptRangeDistributionStrategy = `range_distribution_strategy`
+
 	OptEnvelopeKeyOnly       EnvelopeType = `key_only`
 	OptEnvelopeRow           EnvelopeType = `row`
 	OptEnvelopeDeprecatedRow EnvelopeType = `deprecated_row`
@@ -412,6 +427,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
 	OptIgnoreDisableChangefeedReplication: flagOption,
 	OptEncodeJSONValueNullAsObject:        flagOption,
 	OptEnrichedProperties:                 csv(string(EnrichedPropertySource), string(EnrichedPropertySchema)),
+	OptRangeDistributionStrategy:          enum("default", "balanced_simple"),
 	OptHeadersJSONColumnName:              stringOption,
 	OptExtraHeaders:                       jsonOption,
 }
@@ -428,6 +444,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope,
 	OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics, OptExpirePTSAfter,
 	OptExecutionLocality, OptLaggingRangesThreshold, OptLaggingRangesPollingInterval,
 	OptIgnoreDisableChangefeedReplication, OptEncodeJSONValueNullAsObject, OptEnrichedProperties,
+	OptRangeDistributionStrategy,
 )
 
 // SQLValidOptions is options exclusive to SQL sink
@@ -805,6 +822,17 @@ func (s StatementOptions) IsInitialScanSpecified() bool {
 	return true
 }
 
+func (s StatementOptions) GetRangeDistributionStrategy() (RangeDistributionStrategy, error) {
+	v, err := s.getEnumValue(OptRangeDistributionStrategy)
+	if err != nil {
+		return "", err
+	}
+	if v == `` {
+		return RangeDistributionStrategyDefault, nil
+	}
+	return RangeDistributionStrategy(v), nil
+}
+
 // ShouldUseFullStatementTimeName returns true if references to the table should be in db.schema.table
 // format (e.g. in Kafka topics).
 func (s StatementOptions) ShouldUseFullStatementTimeName() bool {
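
The unset case in GetRangeDistributionStrategy is what keeps existing statements working: omitting the option resolves to the default strategy, so a plain CREATE CHANGEFEED behaves as before. A standalone sketch of that fallback follows — strategyFromOpts is a hypothetical helper, and the real method reads the already-validated enum via getEnumValue:

package main

import "fmt"

// RangeDistributionStrategy mirrors the string enum added in options.go.
type RangeDistributionStrategy string

const (
	RangeDistributionStrategyDefault        RangeDistributionStrategy = `default`
	RangeDistributionStrategyBalancedSimple RangeDistributionStrategy = `balanced_simple`
)

// strategyFromOpts mimics the getter's fallback: a missing or empty option
// resolves to the "default" strategy. (Validation against the permitted
// values is omitted here; options.go handles it via enum(...).)
func strategyFromOpts(opts map[string]string) RangeDistributionStrategy {
	if v := opts["range_distribution_strategy"]; v != "" {
		return RangeDistributionStrategy(v)
	}
	return RangeDistributionStrategyDefault
}

func main() {
	fmt.Println(strategyFromOpts(nil)) // default
	fmt.Println(strategyFromOpts(map[string]string{
		"range_distribution_strategy": "balanced_simple",
	})) // balanced_simple
}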
20 changes: 2 additions & 18 deletions pkg/cmd/roachtest/tests/cdc.go
@@ -512,9 +512,8 @@ func (f *enumFeatureFlag) enabled(r entropy, choose func(entropy) string) (strin
 // cdcFeatureFlags describes various cdc feature flags.
 // zero value cdcFeatureFlags uses metamorphic settings for features.
 type cdcFeatureFlags struct {
-	RangeFeedScheduler   featureFlag
-	SchemaLockTables     featureFlag
-	DistributionStrategy enumFeatureFlag
+	RangeFeedScheduler featureFlag
+	SchemaLockTables   featureFlag
 }
 
 func makeDefaultFeatureFlags() cdcFeatureFlags {
@@ -4445,11 +4444,6 @@ func (cfc *changefeedCreator) Args(args ...interface{}) *changefeedCreator {
 	return cfc
 }
 
-func chooseDistributionStrategy(r entropy) string {
-	vals := changefeedccl.RangeDistributionStrategy.GetAvailableValues()
-	return vals[r.Intn(len(vals))]
-}
-
 // applySettings aplies various settings to the cluster -- once per the
 // lifetime of changefeedCreator
 func (cfc *changefeedCreator) applySettings() error {
@@ -4471,16 +4465,6 @@
 		}
 	}
 
-	rangeDistribution, rangeDistributionEnabled := cfc.flags.DistributionStrategy.enabled(cfc.rng,
-		chooseDistributionStrategy)
-	if rangeDistributionEnabled == featureEnabled {
-		cfc.logger.Printf("Setting changefeed.default_range_distribution_strategy to %s", rangeDistribution)
-		if _, err := cfc.db.Exec(fmt.Sprintf(
-			"SET CLUSTER SETTING changefeed.default_range_distribution_strategy = '%s'", rangeDistribution)); err != nil {
-			return err
-		}
-	}
-
 	return nil
 }
 
14 changes: 0 additions & 14 deletions pkg/cmd/roachtest/tests/mixed_version_cdc.go
@@ -408,14 +408,6 @@ func (cmvt *cdcMixedVersionTester) createChangeFeed(
 		ff.RangeFeedScheduler.v = &featureUnset
 	}
 
-	distributionStrategySupported, err := cmvt.distributionStrategySupported(r, h)
-	if err != nil {
-		return err
-	}
-	if !distributionStrategySupported {
-		ff.DistributionStrategy.v = &featureUnset
-	}
-
 	jobID, err := newChangefeedCreator(db, systemDB, l, r, fmt.Sprintf("%s.%s", targetDB, targetTable),
 		cmvt.kafka.manager.sinkURL(ctx), ff).
 		With(options).
@@ -483,12 +475,6 @@ func (cmvt *cdcMixedVersionTester) rangefeedSchedulerSupported(
 	return h.ClusterVersionAtLeast(r, v232CV)
 }
 
-func (cmvt *cdcMixedVersionTester) distributionStrategySupported(
-	r *rand.Rand, h *mixedversion.Helper,
-) (bool, error) {
-	return h.ClusterVersionAtLeast(r, v241CV)
-}
-
 // canMixedVersionUseDeletedClusterSetting returns whether a
 // mixed-version cluster can use a deleted (system) cluster
 // setting. If it returns true, it will also return the subset of