From 192fc5be5c9fc2d36f34959ff96461e4bf1955a7 Mon Sep 17 00:00:00 2001 From: Spencer Kimball Date: Thu, 12 Jul 2018 14:37:48 -0400 Subject: [PATCH 1/2] storage: introduce a min idle time in the replica scanner This prevents busy looping in the scanner in the event there are more replicas on a store than can be processed in the scanner target interval (default = `10m`). This change also adjusts the default max idle time to `1s`; the previous value of `200ms` was unnecessarily short. Release note (performance improvement): on stores with 100s of thousands of replicas, this limits the scanner from running incessantly. --- pkg/base/test_server_args.go | 1 + pkg/server/admin_cluster_test.go | 1 + pkg/server/config.go | 13 ++++++---- pkg/server/config_test.go | 17 -------------- pkg/server/server.go | 1 + pkg/server/testserver.go | 3 +++ pkg/sql/tests/split_test.go | 1 + pkg/storage/consistency_queue_test.go | 1 + pkg/storage/scanner.go | 9 +++++-- pkg/storage/scanner_test.go | 34 ++++++++++++++++++++------- pkg/storage/store.go | 7 +++++- 11 files changed, 56 insertions(+), 32 deletions(-) diff --git a/pkg/base/test_server_args.go b/pkg/base/test_server_args.go index 2aec46dd2d08..462745cde8b8 100644 --- a/pkg/base/test_server_args.go +++ b/pkg/base/test_server_args.go @@ -81,6 +81,7 @@ type TestServerArgs struct { RetryOptions retry.Options SocketFile string ScanInterval time.Duration + ScanMinIdleTime time.Duration ScanMaxIdleTime time.Duration SSLCertsDir string TimeSeriesQueryWorkerMax int diff --git a/pkg/server/admin_cluster_test.go b/pkg/server/admin_cluster_test.go index 10d6bde64c9a..ec8adc39427d 100644 --- a/pkg/server/admin_cluster_test.go +++ b/pkg/server/admin_cluster_test.go @@ -40,6 +40,7 @@ func TestAdminAPITableStats(t *testing.T) { ReplicationMode: base.ReplicationAuto, ServerArgs: base.TestServerArgs{ ScanInterval: time.Millisecond, + ScanMinIdleTime: time.Millisecond, ScanMaxIdleTime: time.Millisecond, }, }) diff --git a/pkg/server/config.go 
b/pkg/server/config.go index 6e65a234a773..b89565c9d538 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -58,7 +58,8 @@ const ( DefaultCacheSize = 128 << 20 // 128 MB defaultSQLMemoryPoolSize = 128 << 20 // 128 MB defaultScanInterval = 10 * time.Minute - defaultScanMaxIdleTime = 200 * time.Millisecond + defaultScanMinIdleTime = 10 * time.Millisecond + defaultScanMaxIdleTime = 1 * time.Second // NB: this can't easily become a variable as the UI hard-codes it to 10s. // See https://github.com/cockroachdb/cockroach/issues/20310. DefaultMetricsSampleInterval = 10 * time.Second @@ -211,10 +212,14 @@ type Config struct { // Environment Variable: COCKROACH_SCAN_INTERVAL ScanInterval time.Duration + // ScanMinIdleTime is the minimum time the scanner will be idle between ranges. + // If enabled (> 0), the scanner may complete in more than ScanInterval for large + // stores. + ScanMinIdleTime time.Duration + // ScanMaxIdleTime is the maximum time the scanner will be idle between ranges. // If enabled (> 0), the scanner may complete in less than ScanInterval for small // stores. - // Environment Variable: COCKROACH_SCAN_MAX_IDLE_TIME ScanMaxIdleTime time.Duration // TestingKnobs is used for internal test controls only. 
@@ -381,6 +386,7 @@ func MakeConfig(ctx context.Context, st *cluster.Settings) Config { SQLMemoryPoolSize: defaultSQLMemoryPoolSize, SQLTableStatCacheSize: defaultSQLTableStatCacheSize, ScanInterval: defaultScanInterval, + ScanMinIdleTime: defaultScanMinIdleTime, ScanMaxIdleTime: defaultScanMaxIdleTime, EventLogEnabled: defaultEventLogEnabled, EnableWebSessionAuthentication: requireWebLogin, @@ -412,6 +418,7 @@ func (cfg *Config) String() string { fmt.Fprintln(w, "cache size\t", humanizeutil.IBytes(cfg.CacheSize)) fmt.Fprintln(w, "SQL memory pool size\t", humanizeutil.IBytes(cfg.SQLMemoryPoolSize)) fmt.Fprintln(w, "scan interval\t", cfg.ScanInterval) + fmt.Fprintln(w, "scan min idle time\t", cfg.ScanMinIdleTime) fmt.Fprintln(w, "scan max idle time\t", cfg.ScanMaxIdleTime) fmt.Fprintln(w, "event log enabled\t", cfg.EventLogEnabled) if cfg.Linearizable { @@ -610,8 +617,6 @@ func (cfg *Config) RequireWebSession() bool { // when NewContext is called. func (cfg *Config) readEnvironmentVariables() { cfg.Linearizable = envutil.EnvOrDefaultBool("COCKROACH_EXPERIMENTAL_LINEARIZABLE", cfg.Linearizable) - cfg.ScanInterval = envutil.EnvOrDefaultDuration("COCKROACH_SCAN_INTERVAL", cfg.ScanInterval) - cfg.ScanMaxIdleTime = envutil.EnvOrDefaultDuration("COCKROACH_SCAN_MAX_IDLE_TIME", cfg.ScanMaxIdleTime) } // parseGossipBootstrapResolvers parses list of gossip bootstrap resolvers. 
diff --git a/pkg/server/config_test.go b/pkg/server/config_test.go index 80e38b827617..a881399c1e7b 100644 --- a/pkg/server/config_test.go +++ b/pkg/server/config_test.go @@ -19,7 +19,6 @@ import ( "os" "reflect" "testing" - "time" "github.com/kr/pretty" @@ -93,12 +92,6 @@ func TestReadEnvironmentVariables(t *testing.T) { if err := os.Unsetenv("COCKROACH_EXPERIMENTAL_LINEARIZABLE"); err != nil { t.Fatal(err) } - if err := os.Unsetenv("COCKROACH_SCAN_INTERVAL"); err != nil { - t.Fatal(err) - } - if err := os.Unsetenv("COCKROACH_SCAN_MAX_IDLE_TIME"); err != nil { - t.Fatal(err) - } if err := os.Unsetenv("COCKROACH_CONSISTENCY_CHECK_INTERVAL"); err != nil { t.Fatal(err) } @@ -126,14 +119,6 @@ func TestReadEnvironmentVariables(t *testing.T) { t.Fatal(err) } cfgExpected.Linearizable = true - if err := os.Setenv("COCKROACH_SCAN_INTERVAL", "48h"); err != nil { - t.Fatal(err) - } - cfgExpected.ScanInterval = time.Hour * 48 - if err := os.Setenv("COCKROACH_SCAN_MAX_IDLE_TIME", "100ns"); err != nil { - t.Fatal(err) - } - cfgExpected.ScanMaxIdleTime = time.Nanosecond * 100 envutil.ClearEnvCache() cfg.readEnvironmentVariables() @@ -143,8 +128,6 @@ func TestReadEnvironmentVariables(t *testing.T) { for _, envVar := range []string{ "COCKROACH_EXPERIMENTAL_LINEARIZABLE", - "COCKROACH_SCAN_INTERVAL", - "COCKROACH_SCAN_MAX_IDLE_TIME", } { t.Run("invalid", func(t *testing.T) { if err := os.Setenv(envVar, "abcd"); err != nil { diff --git a/pkg/server/server.go b/pkg/server/server.go index 5d8cf55b05b3..c478ecc01f60 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -413,6 +413,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { NodeDialer: s.nodeDialer, RPCContext: s.rpcContext, ScanInterval: s.cfg.ScanInterval, + ScanMinIdleTime: s.cfg.ScanMinIdleTime, ScanMaxIdleTime: s.cfg.ScanMaxIdleTime, TimestampCachePageSize: s.cfg.TimestampCachePageSize, HistogramWindowInterval: s.cfg.HistogramWindowInterval(), diff --git a/pkg/server/testserver.go 
b/pkg/server/testserver.go index 1239538f546b..30f8afa003a2 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -128,6 +128,9 @@ func makeTestConfigFromParams(params base.TestServerArgs) Config { if params.ScanInterval != 0 { cfg.ScanInterval = params.ScanInterval } + if params.ScanMinIdleTime != 0 { + cfg.ScanMinIdleTime = params.ScanMinIdleTime + } if params.ScanMaxIdleTime != 0 { cfg.ScanMaxIdleTime = params.ScanMaxIdleTime } diff --git a/pkg/sql/tests/split_test.go b/pkg/sql/tests/split_test.go index c6378a333156..74c25d1170fd 100644 --- a/pkg/sql/tests/split_test.go +++ b/pkg/sql/tests/split_test.go @@ -73,6 +73,7 @@ func TestSplitOnTableBoundaries(t *testing.T) { params, _ := tests.CreateTestServerParams() // We want fast scan. params.ScanInterval = time.Millisecond + params.ScanMinIdleTime = time.Millisecond params.ScanMaxIdleTime = time.Millisecond s, sqlDB, kvDB := serverutils.StartServer(t, params) defer s.Stopper().Stop(context.TODO()) diff --git a/pkg/storage/consistency_queue_test.go b/pkg/storage/consistency_queue_test.go index 18b86cf9559b..80efaefde9f8 100644 --- a/pkg/storage/consistency_queue_test.go +++ b/pkg/storage/consistency_queue_test.go @@ -210,6 +210,7 @@ func TestConsistencyQueueRecomputeStats(t *testing.T) { // Set scanner timings that minimize waiting in this test. tsArgs := base.TestServerArgs{ ScanInterval: time.Second, + ScanMinIdleTime: 0, ScanMaxIdleTime: 100 * time.Millisecond, } nodeZeroArgs := tsArgs diff --git a/pkg/storage/scanner.go b/pkg/storage/scanner.go index 9e5f81a53d72..db7d5696c561 100644 --- a/pkg/storage/scanner.go +++ b/pkg/storage/scanner.go @@ -65,8 +65,9 @@ type replicaScanner struct { clock *hlc.Clock targetInterval time.Duration // Target duration interval for scan loop + minIdleTime time.Duration // Min idle time for scan loop maxIdleTime time.Duration // Max idle time for scan loop - waitTimer timeutil.Timer // Shared timer to avoid allocations. 
+ waitTimer timeutil.Timer // Shared timer to avoid allocations replicas replicaSet // Replicas to be scanned queues []replicaQueue // Replica queues managed by this scanner removed chan *Replica // Replicas to remove from queues @@ -90,7 +91,7 @@ type replicaScanner struct { func newReplicaScanner( ambient log.AmbientContext, clock *hlc.Clock, - targetInterval, maxIdleTime time.Duration, + targetInterval, minIdleTime, maxIdleTime time.Duration, replicas replicaSet, ) *replicaScanner { if targetInterval < 0 { @@ -100,6 +101,7 @@ func newReplicaScanner( AmbientContext: ambient, clock: clock, targetInterval: targetInterval, + minIdleTime: minIdleTime, maxIdleTime: maxIdleTime, replicas: replicas, removed: make(chan *Replica, 10), @@ -190,6 +192,9 @@ func (rs *replicaScanner) paceInterval(start, now time.Time) time.Duration { count = 1 } interval := time.Duration(remainingNanos / int64(count)) + if rs.minIdleTime > 0 && interval < rs.minIdleTime { + interval = rs.minIdleTime + } if rs.maxIdleTime > 0 && interval > rs.maxIdleTime { interval = rs.maxIdleTime } diff --git a/pkg/storage/scanner_test.go b/pkg/storage/scanner_test.go index b9e383212313..9b7e13105997 100644 --- a/pkg/storage/scanner_test.go +++ b/pkg/storage/scanner_test.go @@ -197,7 +197,7 @@ func TestScannerAddToQueues(t *testing.T) { q2.setDisabled(true) mc := hlc.NewManualClock(123) clock := hlc.NewClock(mc.UnixNano, time.Nanosecond) - s := newReplicaScanner(makeAmbCtx(), clock, 1*time.Millisecond, 0, ranges) + s := newReplicaScanner(makeAmbCtx(), clock, 1*time.Millisecond, 0, 0, ranges) s.AddQueues(q1, q2) stopper := stop.NewStopper() @@ -249,7 +249,7 @@ func TestScannerTiming(t *testing.T) { q := &testQueue{} mc := hlc.NewManualClock(123) clock := hlc.NewClock(mc.UnixNano, time.Nanosecond) - s := newReplicaScanner(makeAmbCtx(), clock, duration, 0, ranges) + s := newReplicaScanner(makeAmbCtx(), clock, duration, 0, 0, ranges) s.AddQueues(q) stopper := stop.NewStopper() s.Start(stopper) @@ -287,22 +287,40 
@@ func TestScannerPaceInterval(t *testing.T) { for _, duration := range durations { startTime := timeutil.Now() ranges := newTestRangeSet(count, t) - s := newReplicaScanner(makeAmbCtx(), nil, duration, 0, ranges) + s := newReplicaScanner(makeAmbCtx(), nil, duration, 0, 0, ranges) interval := s.paceInterval(startTime, startTime) logErrorWhenNotCloseTo(duration/count, interval) // The range set is empty ranges = newTestRangeSet(0, t) - s = newReplicaScanner(makeAmbCtx(), nil, duration, 0, ranges) + s = newReplicaScanner(makeAmbCtx(), nil, duration, 0, 0, ranges) interval = s.paceInterval(startTime, startTime) logErrorWhenNotCloseTo(duration, interval) ranges = newTestRangeSet(count, t) - s = newReplicaScanner(makeAmbCtx(), nil, duration, 0, ranges) + s = newReplicaScanner(makeAmbCtx(), nil, duration, 0, 0, ranges) // Move the present to duration time into the future interval = s.paceInterval(startTime, startTime.Add(duration)) logErrorWhenNotCloseTo(0, interval) } } +// TestScannerMinMaxIdleTime verifies that the pace interval will not +// be less than the specified min idle time or greater than the +// specified max idle time. +func TestScannerMinMaxIdleTime(t *testing.T) { + defer leaktest.AfterTest(t)() + const targetInterval = 100 * time.Millisecond + const minIdleTime = 10 * time.Millisecond + const maxIdleTime = 15 * time.Millisecond + for _, count := range []int{1, 10, 20, 100} { + startTime := timeutil.Now() + ranges := newTestRangeSet(count, t) + s := newReplicaScanner(makeAmbCtx(), nil, targetInterval, minIdleTime, maxIdleTime, ranges) + if interval := s.paceInterval(startTime, startTime); interval < minIdleTime || interval > maxIdleTime { + t.Errorf("expected interval %s <= %s <= %s", minIdleTime, interval, maxIdleTime) + } + } +} + // TestScannerDisabled verifies that disabling a scanner prevents // replicas from being added to queues. 
func TestScannerDisabled(t *testing.T) { @@ -312,7 +330,7 @@ func TestScannerDisabled(t *testing.T) { q := &testQueue{} mc := hlc.NewManualClock(123) clock := hlc.NewClock(mc.UnixNano, time.Nanosecond) - s := newReplicaScanner(makeAmbCtx(), clock, 1*time.Millisecond, 0, ranges) + s := newReplicaScanner(makeAmbCtx(), clock, 1*time.Millisecond, 0, 0, ranges) s.AddQueues(q) stopper := stop.NewStopper() defer stopper.Stop(context.TODO()) @@ -362,7 +380,7 @@ func TestScannerDisabled(t *testing.T) { func TestScannerDisabledWithZeroInterval(t *testing.T) { defer leaktest.AfterTest(t)() ranges := newTestRangeSet(1, t) - s := newReplicaScanner(makeAmbCtx(), nil, 0*time.Millisecond, 0, ranges) + s := newReplicaScanner(makeAmbCtx(), nil, 0*time.Millisecond, 0, 0, ranges) if !s.GetDisabled() { t.Errorf("expected scanner to be disabled") } @@ -375,7 +393,7 @@ func TestScannerEmptyRangeSet(t *testing.T) { q := &testQueue{} mc := hlc.NewManualClock(123) clock := hlc.NewClock(mc.UnixNano, time.Nanosecond) - s := newReplicaScanner(makeAmbCtx(), clock, time.Hour, 0, ranges) + s := newReplicaScanner(makeAmbCtx(), clock, time.Hour, 0, 0, ranges) s.AddQueues(q) stopper := stop.NewStopper() defer stopper.Stop(context.TODO()) diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 0a10842b9268..3bb2ca270607 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -598,6 +598,11 @@ type StoreConfig struct { // ScanInterval is the default value for the scan interval ScanInterval time.Duration + // ScanMinIdleTime is the minimum time the scanner will be idle between ranges. + // If enabled (> 0), the scanner may complete in more than ScanInterval for + // stores with many ranges. + ScanMinIdleTime time.Duration + // ScanMaxIdleTime is the maximum time the scanner will be idle between ranges. // If enabled (> 0), the scanner may complete in less than ScanInterval for small // stores. 
@@ -925,7 +930,7 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript // Add range scanner and configure with queues. s.scanner = newReplicaScanner( s.cfg.AmbientCtx, s.cfg.Clock, cfg.ScanInterval, - cfg.ScanMaxIdleTime, newStoreReplicaVisitor(s), + cfg.ScanMinIdleTime, cfg.ScanMaxIdleTime, newStoreReplicaVisitor(s), ) s.gcQueue = newGCQueue(s, s.cfg.Gossip) s.splitQueue = newSplitQueue(s, s.db, s.cfg.Gossip) From d96c0a96ec665e8c7adb80e94b10957f6764a1bf Mon Sep 17 00:00:00 2001 From: Nikhil Benesch Date: Thu, 12 Jul 2018 16:53:34 -0400 Subject: [PATCH 2/2] storage: skip TestStoreRangeMergeWithData again The chaos added in 2ed1515 to force lease renewals while a merge is in progress is too disruptive and makes the test occasionally time out. See issue #27442 for an analysis. Skip the test for now. Release note: None --- pkg/storage/client_merge_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/storage/client_merge_test.go b/pkg/storage/client_merge_test.go index a4a4473b4adb..dd070da5930a 100644 --- a/pkg/storage/client_merge_test.go +++ b/pkg/storage/client_merge_test.go @@ -186,6 +186,8 @@ func TestStoreRangeMergeMetadataCleanup(t *testing.T) { func TestStoreRangeMergeWithData(t *testing.T) { defer leaktest.AfterTest(t)() + t.Skip("#27442") + for _, colocate := range []bool{false, true} { for _, retries := range []int64{0, 3} { t.Run(fmt.Sprintf("colocate=%v/retries=%d", colocate, retries), func(t *testing.T) {