Skip to content

Commit

Permalink
Merge #27441 #27449
Browse files Browse the repository at this point in the history
27441: storage: introduce a min idle time in the replica scanner r=spencerkimball a=spencerkimball

This prevents busy looping in the scanner in the event there are more
replicas on a store than can be processed in the scanner target
interval (default = `10m`). This change also adjusts the default max
idle time to `1s`; the previous value of `200ms` was unnecessarily
short.

Release note (performance improvement): on stores with hundreds of
thousands of replicas, this prevents the scanner from running incessantly.

27449: storage: skip TestStoreRangeMergeWithData again r=bdarnell,a-robinson a=benesch

The chaos added in 2ed1515 to force lease renewals while a merge is in
progress is too disruptive and makes the test occasionally time out. See
issue #27442 for an analysis. Skip the test for now.

Release note: None

Co-authored-by: Spencer Kimball <spencer.kimball@gmail.com>
Co-authored-by: Nikhil Benesch <nikhil.benesch@gmail.com>
  • Loading branch information
3 people committed Jul 12, 2018
3 parents 664ac6c + 192fc5b + d96c0a9 commit 7eb61b5
Show file tree
Hide file tree
Showing 12 changed files with 58 additions and 32 deletions.
1 change: 1 addition & 0 deletions pkg/base/test_server_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type TestServerArgs struct {
RetryOptions retry.Options
SocketFile string
ScanInterval time.Duration
ScanMinIdleTime time.Duration
ScanMaxIdleTime time.Duration
SSLCertsDir string
TimeSeriesQueryWorkerMax int
Expand Down
1 change: 1 addition & 0 deletions pkg/server/admin_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestAdminAPITableStats(t *testing.T) {
ReplicationMode: base.ReplicationAuto,
ServerArgs: base.TestServerArgs{
ScanInterval: time.Millisecond,
ScanMinIdleTime: time.Millisecond,
ScanMaxIdleTime: time.Millisecond,
},
})
Expand Down
13 changes: 9 additions & 4 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ const (
DefaultCacheSize = 128 << 20 // 128 MB
defaultSQLMemoryPoolSize = 128 << 20 // 128 MB
defaultScanInterval = 10 * time.Minute
defaultScanMaxIdleTime = 200 * time.Millisecond
defaultScanMinIdleTime = 10 * time.Millisecond
defaultScanMaxIdleTime = 1 * time.Second
// NB: this can't easily become a variable as the UI hard-codes it to 10s.
// See https://github.com/cockroachdb/cockroach/issues/20310.
DefaultMetricsSampleInterval = 10 * time.Second
Expand Down Expand Up @@ -211,10 +212,14 @@ type Config struct {
// Environment Variable: COCKROACH_SCAN_INTERVAL
ScanInterval time.Duration

// ScanMinIdleTime is the minimum time the scanner will be idle between ranges.
// If enabled (> 0), the scanner may complete in more than ScanInterval for large
// stores.
ScanMinIdleTime time.Duration

// ScanMaxIdleTime is the maximum time the scanner will be idle between ranges.
// If enabled (> 0), the scanner may complete in less than ScanInterval for small
// stores.
// Environment Variable: COCKROACH_SCAN_MAX_IDLE_TIME
ScanMaxIdleTime time.Duration

// TestingKnobs is used for internal test controls only.
Expand Down Expand Up @@ -381,6 +386,7 @@ func MakeConfig(ctx context.Context, st *cluster.Settings) Config {
SQLMemoryPoolSize: defaultSQLMemoryPoolSize,
SQLTableStatCacheSize: defaultSQLTableStatCacheSize,
ScanInterval: defaultScanInterval,
ScanMinIdleTime: defaultScanMinIdleTime,
ScanMaxIdleTime: defaultScanMaxIdleTime,
EventLogEnabled: defaultEventLogEnabled,
EnableWebSessionAuthentication: requireWebLogin,
Expand Down Expand Up @@ -412,6 +418,7 @@ func (cfg *Config) String() string {
fmt.Fprintln(w, "cache size\t", humanizeutil.IBytes(cfg.CacheSize))
fmt.Fprintln(w, "SQL memory pool size\t", humanizeutil.IBytes(cfg.SQLMemoryPoolSize))
fmt.Fprintln(w, "scan interval\t", cfg.ScanInterval)
fmt.Fprintln(w, "scan min idle time\t", cfg.ScanMinIdleTime)
fmt.Fprintln(w, "scan max idle time\t", cfg.ScanMaxIdleTime)
fmt.Fprintln(w, "event log enabled\t", cfg.EventLogEnabled)
if cfg.Linearizable {
Expand Down Expand Up @@ -610,8 +617,6 @@ func (cfg *Config) RequireWebSession() bool {
// when NewContext is called.
func (cfg *Config) readEnvironmentVariables() {
cfg.Linearizable = envutil.EnvOrDefaultBool("COCKROACH_EXPERIMENTAL_LINEARIZABLE", cfg.Linearizable)
cfg.ScanInterval = envutil.EnvOrDefaultDuration("COCKROACH_SCAN_INTERVAL", cfg.ScanInterval)
cfg.ScanMaxIdleTime = envutil.EnvOrDefaultDuration("COCKROACH_SCAN_MAX_IDLE_TIME", cfg.ScanMaxIdleTime)
}

// parseGossipBootstrapResolvers parses list of gossip bootstrap resolvers.
Expand Down
17 changes: 0 additions & 17 deletions pkg/server/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"os"
"reflect"
"testing"
"time"

"github.com/kr/pretty"

Expand Down Expand Up @@ -93,12 +92,6 @@ func TestReadEnvironmentVariables(t *testing.T) {
if err := os.Unsetenv("COCKROACH_EXPERIMENTAL_LINEARIZABLE"); err != nil {
t.Fatal(err)
}
if err := os.Unsetenv("COCKROACH_SCAN_INTERVAL"); err != nil {
t.Fatal(err)
}
if err := os.Unsetenv("COCKROACH_SCAN_MAX_IDLE_TIME"); err != nil {
t.Fatal(err)
}
if err := os.Unsetenv("COCKROACH_CONSISTENCY_CHECK_INTERVAL"); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -126,14 +119,6 @@ func TestReadEnvironmentVariables(t *testing.T) {
t.Fatal(err)
}
cfgExpected.Linearizable = true
if err := os.Setenv("COCKROACH_SCAN_INTERVAL", "48h"); err != nil {
t.Fatal(err)
}
cfgExpected.ScanInterval = time.Hour * 48
if err := os.Setenv("COCKROACH_SCAN_MAX_IDLE_TIME", "100ns"); err != nil {
t.Fatal(err)
}
cfgExpected.ScanMaxIdleTime = time.Nanosecond * 100

envutil.ClearEnvCache()
cfg.readEnvironmentVariables()
Expand All @@ -143,8 +128,6 @@ func TestReadEnvironmentVariables(t *testing.T) {

for _, envVar := range []string{
"COCKROACH_EXPERIMENTAL_LINEARIZABLE",
"COCKROACH_SCAN_INTERVAL",
"COCKROACH_SCAN_MAX_IDLE_TIME",
} {
t.Run("invalid", func(t *testing.T) {
if err := os.Setenv(envVar, "abcd"); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
NodeDialer: s.nodeDialer,
RPCContext: s.rpcContext,
ScanInterval: s.cfg.ScanInterval,
ScanMinIdleTime: s.cfg.ScanMinIdleTime,
ScanMaxIdleTime: s.cfg.ScanMaxIdleTime,
TimestampCachePageSize: s.cfg.TimestampCachePageSize,
HistogramWindowInterval: s.cfg.HistogramWindowInterval(),
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ func makeTestConfigFromParams(params base.TestServerArgs) Config {
if params.ScanInterval != 0 {
cfg.ScanInterval = params.ScanInterval
}
if params.ScanMinIdleTime != 0 {
cfg.ScanMinIdleTime = params.ScanMinIdleTime
}
if params.ScanMaxIdleTime != 0 {
cfg.ScanMaxIdleTime = params.ScanMaxIdleTime
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/tests/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func TestSplitOnTableBoundaries(t *testing.T) {
params, _ := tests.CreateTestServerParams()
// We want fast scan.
params.ScanInterval = time.Millisecond
params.ScanMinIdleTime = time.Millisecond
params.ScanMaxIdleTime = time.Millisecond
s, sqlDB, kvDB := serverutils.StartServer(t, params)
defer s.Stopper().Stop(context.TODO())
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ func TestStoreRangeMergeMetadataCleanup(t *testing.T) {
func TestStoreRangeMergeWithData(t *testing.T) {
defer leaktest.AfterTest(t)()

t.Skip("#27442")

for _, colocate := range []bool{false, true} {
for _, retries := range []int64{0, 3} {
t.Run(fmt.Sprintf("colocate=%v/retries=%d", colocate, retries), func(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/consistency_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func TestConsistencyQueueRecomputeStats(t *testing.T) {
// Set scanner timings that minimize waiting in this test.
tsArgs := base.TestServerArgs{
ScanInterval: time.Second,
ScanMinIdleTime: 0,
ScanMaxIdleTime: 100 * time.Millisecond,
}
nodeZeroArgs := tsArgs
Expand Down
9 changes: 7 additions & 2 deletions pkg/storage/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ type replicaScanner struct {
clock *hlc.Clock

targetInterval time.Duration // Target duration interval for scan loop
minIdleTime time.Duration // Min idle time for scan loop
maxIdleTime time.Duration // Max idle time for scan loop
waitTimer timeutil.Timer // Shared timer to avoid allocations.
waitTimer timeutil.Timer // Shared timer to avoid allocations
replicas replicaSet // Replicas to be scanned
queues []replicaQueue // Replica queues managed by this scanner
removed chan *Replica // Replicas to remove from queues
Expand All @@ -90,7 +91,7 @@ type replicaScanner struct {
func newReplicaScanner(
ambient log.AmbientContext,
clock *hlc.Clock,
targetInterval, maxIdleTime time.Duration,
targetInterval, minIdleTime, maxIdleTime time.Duration,
replicas replicaSet,
) *replicaScanner {
if targetInterval < 0 {
Expand All @@ -100,6 +101,7 @@ func newReplicaScanner(
AmbientContext: ambient,
clock: clock,
targetInterval: targetInterval,
minIdleTime: minIdleTime,
maxIdleTime: maxIdleTime,
replicas: replicas,
removed: make(chan *Replica, 10),
Expand Down Expand Up @@ -190,6 +192,9 @@ func (rs *replicaScanner) paceInterval(start, now time.Time) time.Duration {
count = 1
}
interval := time.Duration(remainingNanos / int64(count))
if rs.minIdleTime > 0 && interval < rs.minIdleTime {
interval = rs.minIdleTime
}
if rs.maxIdleTime > 0 && interval > rs.maxIdleTime {
interval = rs.maxIdleTime
}
Expand Down
34 changes: 26 additions & 8 deletions pkg/storage/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func TestScannerAddToQueues(t *testing.T) {
q2.setDisabled(true)
mc := hlc.NewManualClock(123)
clock := hlc.NewClock(mc.UnixNano, time.Nanosecond)
s := newReplicaScanner(makeAmbCtx(), clock, 1*time.Millisecond, 0, ranges)
s := newReplicaScanner(makeAmbCtx(), clock, 1*time.Millisecond, 0, 0, ranges)
s.AddQueues(q1, q2)
stopper := stop.NewStopper()

Expand Down Expand Up @@ -249,7 +249,7 @@ func TestScannerTiming(t *testing.T) {
q := &testQueue{}
mc := hlc.NewManualClock(123)
clock := hlc.NewClock(mc.UnixNano, time.Nanosecond)
s := newReplicaScanner(makeAmbCtx(), clock, duration, 0, ranges)
s := newReplicaScanner(makeAmbCtx(), clock, duration, 0, 0, ranges)
s.AddQueues(q)
stopper := stop.NewStopper()
s.Start(stopper)
Expand Down Expand Up @@ -287,22 +287,40 @@ func TestScannerPaceInterval(t *testing.T) {
for _, duration := range durations {
startTime := timeutil.Now()
ranges := newTestRangeSet(count, t)
s := newReplicaScanner(makeAmbCtx(), nil, duration, 0, ranges)
s := newReplicaScanner(makeAmbCtx(), nil, duration, 0, 0, ranges)
interval := s.paceInterval(startTime, startTime)
logErrorWhenNotCloseTo(duration/count, interval)
// The range set is empty
ranges = newTestRangeSet(0, t)
s = newReplicaScanner(makeAmbCtx(), nil, duration, 0, ranges)
s = newReplicaScanner(makeAmbCtx(), nil, duration, 0, 0, ranges)
interval = s.paceInterval(startTime, startTime)
logErrorWhenNotCloseTo(duration, interval)
ranges = newTestRangeSet(count, t)
s = newReplicaScanner(makeAmbCtx(), nil, duration, 0, ranges)
s = newReplicaScanner(makeAmbCtx(), nil, duration, 0, 0, ranges)
// Move the present to duration time into the future
interval = s.paceInterval(startTime, startTime.Add(duration))
logErrorWhenNotCloseTo(0, interval)
}
}

// TestScannerMinMaxIdleTime verifies that the pace interval will not
// be less than the specified min idle time or greater than the
// specified max idle time, across a spread of replica counts.
func TestScannerMinMaxIdleTime(t *testing.T) {
	defer leaktest.AfterTest(t)()
	const targetInterval = 100 * time.Millisecond
	const minIdleTime = 10 * time.Millisecond
	const maxIdleTime = 15 * time.Millisecond
	// Iterate over the slice values, not its indices: a single-variable
	// range would bind count to 0..3 and never exercise the larger replica
	// counts that are needed to push the pace interval below minIdleTime.
	for _, count := range []int{1, 10, 20, 100} {
		startTime := timeutil.Now()
		ranges := newTestRangeSet(count, t)
		s := newReplicaScanner(makeAmbCtx(), nil, targetInterval, minIdleTime, maxIdleTime, ranges)
		// start == now, so no part of the target interval has elapsed yet.
		if interval := s.paceInterval(startTime, startTime); interval < minIdleTime || interval > maxIdleTime {
			t.Errorf("count=%d: expected interval %s <= %s <= %s", count, minIdleTime, interval, maxIdleTime)
		}
	}
}

// TestScannerDisabled verifies that disabling a scanner prevents
// replicas from being added to queues.
func TestScannerDisabled(t *testing.T) {
Expand All @@ -312,7 +330,7 @@ func TestScannerDisabled(t *testing.T) {
q := &testQueue{}
mc := hlc.NewManualClock(123)
clock := hlc.NewClock(mc.UnixNano, time.Nanosecond)
s := newReplicaScanner(makeAmbCtx(), clock, 1*time.Millisecond, 0, ranges)
s := newReplicaScanner(makeAmbCtx(), clock, 1*time.Millisecond, 0, 0, ranges)
s.AddQueues(q)
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
Expand Down Expand Up @@ -362,7 +380,7 @@ func TestScannerDisabled(t *testing.T) {
func TestScannerDisabledWithZeroInterval(t *testing.T) {
defer leaktest.AfterTest(t)()
ranges := newTestRangeSet(1, t)
s := newReplicaScanner(makeAmbCtx(), nil, 0*time.Millisecond, 0, ranges)
s := newReplicaScanner(makeAmbCtx(), nil, 0*time.Millisecond, 0, 0, ranges)
if !s.GetDisabled() {
t.Errorf("expected scanner to be disabled")
}
Expand All @@ -375,7 +393,7 @@ func TestScannerEmptyRangeSet(t *testing.T) {
q := &testQueue{}
mc := hlc.NewManualClock(123)
clock := hlc.NewClock(mc.UnixNano, time.Nanosecond)
s := newReplicaScanner(makeAmbCtx(), clock, time.Hour, 0, ranges)
s := newReplicaScanner(makeAmbCtx(), clock, time.Hour, 0, 0, ranges)
s.AddQueues(q)
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
Expand Down
7 changes: 6 additions & 1 deletion pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,11 @@ type StoreConfig struct {
// ScanInterval is the default value for the scan interval
ScanInterval time.Duration

// ScanMinIdleTime is the minimum time the scanner will be idle between ranges.
// If enabled (> 0), the scanner may complete in more than ScanInterval for
// stores with many ranges.
ScanMinIdleTime time.Duration

// ScanMaxIdleTime is the maximum time the scanner will be idle between ranges.
// If enabled (> 0), the scanner may complete in less than ScanInterval for small
// stores.
Expand Down Expand Up @@ -937,7 +942,7 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript
// Add range scanner and configure with queues.
s.scanner = newReplicaScanner(
s.cfg.AmbientCtx, s.cfg.Clock, cfg.ScanInterval,
cfg.ScanMaxIdleTime, newStoreReplicaVisitor(s),
cfg.ScanMinIdleTime, cfg.ScanMaxIdleTime, newStoreReplicaVisitor(s),
)
s.gcQueue = newGCQueue(s, s.cfg.Gossip)
s.splitQueue = newSplitQueue(s, s.db, s.cfg.Gossip)
Expand Down

0 comments on commit 7eb61b5

Please sign in to comment.