diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index da7f05ece65..3c433de4650 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -15,7 +15,6 @@ package cluster import ( - "bytes" "context" "fmt" "math" @@ -295,26 +294,26 @@ func (c *RaftCluster) runSyncConfig() { defer ticker.Stop() stores := c.GetStores() - syncConfig(c.storeConfigManager, stores) + syncConfig(c.ctx, c.storeConfigManager, stores) for { select { case <-c.ctx.Done(): log.Info("sync store config job is stopped") return case <-ticker.C: - if !syncConfig(c.storeConfigManager, stores) { + if !syncConfig(c.ctx, c.storeConfigManager, stores) { stores = c.GetStores() } } } } -func syncConfig(manager *config.StoreConfigManager, stores []*core.StoreInfo) bool { +func syncConfig(ctx context.Context, manager *config.StoreConfigManager, stores []*core.StoreInfo) bool { for index := 0; index < len(stores); index++ { select { - case <-c.ctx.Done(): + case <-ctx.Done(): log.Info("stop sync store config job due to server shutdown") - return + return false default: } // filter out the stores that are tiflash @@ -329,15 +328,11 @@ func syncConfig(manager *config.StoreConfigManager, stores []*core.StoreInfo) bo } // it will try next store if the current store is failed. address := netutil.ResolveLoopBackAddr(stores[index].GetStatusAddress(), stores[index].GetAddress()) -<<<<<<< HEAD - if err := manager.ObserveConfig(address); err != nil { -======= - switchRaftV2, err := c.observeStoreConfig(c.ctx, address) + err := manager.ObserveConfig(ctx, address) if err != nil { // delete the store if it is failed and retry next store. stores = append(stores[:index], stores[index+1:]...) index-- ->>>>>>> 38d087fec (config: sync store config in time (#6919)) storeSyncConfigEvent.WithLabelValues(address, "fail").Inc() log.Debug("sync store config failed, it will try next store", zap.Error(err)) continue @@ -346,78 +341,7 @@ func syncConfig(manager *config.StoreConfigManager, stores []*core.StoreInfo) bo // it will only try one store. return true } -<<<<<<< HEAD return false -======= - return false, false -} - -// observeStoreConfig is used to observe the store config changes and -// return whether if the new config changes the engine to raft-kv2. -func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (bool, error) { - cfg, err := c.fetchStoreConfigFromTiKV(ctx, address) - if err != nil { - return false, err - } - oldCfg := c.opt.GetStoreConfig() - if cfg == nil || oldCfg.Equal(cfg) { - return false, nil - } - log.Info("sync the store config successful", - zap.String("store-address", address), - zap.String("store-config", cfg.String()), - zap.String("old-config", oldCfg.String())) - return c.updateStoreConfig(oldCfg, cfg) -} - -// updateStoreConfig updates the store config. This is extracted for testing. -func (c *RaftCluster) updateStoreConfig(oldCfg, cfg *config.StoreConfig) (bool, error) { - cfg.Adjust() - c.opt.SetStoreConfig(cfg) - return oldCfg.Storage.Engine != config.RaftstoreV2 && cfg.Storage.Engine == config.RaftstoreV2, nil -} - -// fetchStoreConfigFromTiKV tries to fetch the config from the TiKV store URL. -func (c *RaftCluster) fetchStoreConfigFromTiKV(ctx context.Context, statusAddress string) (*config.StoreConfig, error) { - cfg := &config.StoreConfig{} - failpoint.Inject("mockFetchStoreConfigFromTiKV", func(val failpoint.Value) { - if regionMaxSize, ok := val.(string); ok { - cfg.RegionMaxSize = regionMaxSize - cfg.Storage.Engine = config.RaftstoreV2 - } - failpoint.Return(cfg, nil) - }) - if c.httpClient == nil { - return nil, fmt.Errorf("failed to get store config due to nil client") - } - var url string - if netutil.IsEnableHTTPS(c.httpClient) { - url = fmt.Sprintf("%s://%s/config", "https", statusAddress) - } else { - url = fmt.Sprintf("%s://%s/config", "http", statusAddress) - } - ctx, cancel := context.WithTimeout(ctx, clientTimeout) - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, bytes.NewBuffer(nil)) - if err != nil { - cancel() - return nil, fmt.Errorf("failed to create store config http request: %w", err) - } - resp, err := c.httpClient.Do(req) - if err != nil { - cancel() - return nil, err - } - defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - cancel() - if err != nil { - return nil, err - } - if err := json.Unmarshal(body, cfg); err != nil { - return nil, err - } - return cfg, nil ->>>>>>> 38d087fec (config: sync store config in time (#6919)) } // LoadClusterInfo loads cluster related info. diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index ae23e635685..be631a0f49a 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -16,15 +16,13 @@ package cluster import ( "context" + "encoding/json" "fmt" "math" "math/rand" -<<<<<<< HEAD - "strings" -======= "net/http" "net/http/httptest" ->>>>>>> 38d087fec (config: sync store config in time (#6919)) + "strings" "sync" "testing" "time" @@ -1178,6 +1176,40 @@ func (s *testClusterInfoSuite) TestOfflineAndMerge(c *C) { } } +func (s *testClusterInfoSuite) TestSyncConfigContext(c *C) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, opt, _ := newTestScheduleConfig() + tc := newTestCluster(ctx, opt) + tc.storeConfigManager = config.NewStoreConfigManager(http.DefaultClient) + tc.httpClient = &http.Client{} + + server := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + time.Sleep(time.Second * 100) + cfg := &config.StoreConfig{} + b, err := json.Marshal(cfg) + if err != nil { + res.WriteHeader(http.StatusInternalServerError) + res.Write([]byte(fmt.Sprintf("failed setting up test server: %s", err))) + return + } + + res.WriteHeader(http.StatusOK) + res.Write(b) + })) + stores := newTestStores(1, "2.0.0") + for _, s := range stores { + tc.putStoreLocked(s) + } + // trip schema header + now := time.Now() + stores[0].GetMeta().StatusAddress = server.URL[7:] + synced := syncConfig(tc.ctx, tc.storeConfigManager, stores) + c.Assert(synced, IsFalse) + c.Assert(time.Since(now), Less, clientTimeout*2) +} + func (s *testClusterInfoSuite) TestSyncConfig(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) @@ -1205,55 +1237,12 @@ func (s *testClusterInfoSuite) TestSyncConfig(c *C) { for _, v := range testdata { tc.storeConfigManager = config.NewTestStoreConfigManager(v.whiteList) c.Assert(tc.GetStoreConfig().GetRegionMaxSize(), Equals, uint64(144)) - c.Assert(syncConfig(tc.storeConfigManager, tc.GetStores()), Equals, v.updated) + c.Assert(syncConfig(context.Background(), tc.storeConfigManager, tc.GetStores()), Equals, v.updated) c.Assert(tc.GetStoreConfig().GetRegionMaxSize(), Equals, v.maxRegionSize) } } -<<<<<<< HEAD func (s *testClusterInfoSuite) TestUpdateStorePendingPeerCount(c *C) { -======= -func TestSyncConfigContext(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - _, opt, err := newTestScheduleConfig() - re.NoError(err) - tc := newTestCluster(ctx, opt) - tc.httpClient = &http.Client{} - - server := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { - time.Sleep(time.Second * 100) - cfg := &config.StoreConfig{} - b, err := json.Marshal(cfg) - if err != nil { - res.WriteHeader(http.StatusInternalServerError) - res.Write([]byte(fmt.Sprintf("failed setting up test server: %s", err))) - return - } - - res.WriteHeader(http.StatusOK) - res.Write(b) - })) - stores := newTestStores(1, "2.0.0") - for _, s := range stores { - re.NoError(tc.putStoreLocked(s)) - } - // trip schema header - now := time.Now() - stores[0].GetMeta().StatusAddress = server.URL[7:] - synced, _ := tc.syncStoreConfig(tc.GetStores()) - re.False(synced) - re.Less(time.Since(now), clientTimeout*2) -} - -func TestStoreConfigSync(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - ->>>>>>> 38d087fec (config: sync store config in time (#6919)) _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) tc := newTestCluster(s.ctx, opt) diff --git a/server/config/store_config.go b/server/config/store_config.go index 27fc456dd08..81a13a7e734 100644 --- a/server/config/store_config.go +++ b/server/config/store_config.go @@ -15,12 +15,15 @@ package config import ( + "bytes" + "context" "encoding/json" "fmt" - "io/ioutil" + "io" "net/http" "reflect" "sync/atomic" + "time" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" @@ -41,6 +44,8 @@ var ( defaultRegionMaxKey = uint64(1440000) // default region split key is 960000 defaultRegionSplitKey = uint64(960000) + + clientTimeout = 3 * time.Second ) // StoreConfig is the config of store like TiKV. @@ -182,8 +187,8 @@ func NewTestStoreConfigManager(whiteList []string) *StoreConfigManager { } // ObserveConfig is used to observe the config change. -func (m *StoreConfigManager) ObserveConfig(address string) error { - cfg, err := m.source.GetConfig(address) +func (m *StoreConfigManager) ObserveConfig(ctx context.Context, address string) error { + cfg, err := m.source.GetConfig(ctx, address) if err != nil { return err } @@ -206,7 +211,7 @@ func (m *StoreConfigManager) GetStoreConfig() *StoreConfig { // Source is used to get the store config. type Source interface { - GetConfig(statusAddress string) (*StoreConfig, error) + GetConfig(ctx context.Context, statusAddress string) (*StoreConfig, error) } // TiKVConfigSource is used to get the store config from TiKV. @@ -223,14 +228,22 @@ func newTiKVConfigSource(schema string, client *http.Client) *TiKVConfigSource { } // GetConfig returns the store config from TiKV. -func (s TiKVConfigSource) GetConfig(statusAddress string) (*StoreConfig, error) { +func (s TiKVConfigSource) GetConfig(ctx context.Context, statusAddress string) (*StoreConfig, error) { url := fmt.Sprintf("%s://%s/config", s.schema, statusAddress) - resp, err := s.client.Get(url) + ctx, cancel := context.WithTimeout(ctx, clientTimeout) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, bytes.NewBuffer(nil)) + if err != nil { + cancel() + return nil, fmt.Errorf("failed to create store config http request: %w", err) + } + resp, err := s.client.Do(req) if err != nil { + cancel() return nil, err } defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) + cancel() if err != nil { return nil, err } @@ -253,7 +266,7 @@ func newFakeSource(whiteList []string) *FakeSource { } // GetConfig returns the config. -func (f *FakeSource) GetConfig(url string) (*StoreConfig, error) { +func (f *FakeSource) GetConfig(_ context.Context, url string) (*StoreConfig, error) { if !slice.Contains(f.whiteList, url) { return nil, fmt.Errorf("[url:%s] is not in white list", url) } diff --git a/server/config/store_config_test.go b/server/config/store_config_test.go index 478e1ebb3d7..bf8f3cc7644 100644 --- a/server/config/store_config_test.go +++ b/server/config/store_config_test.go @@ -15,6 +15,7 @@ package config import ( + "context" "crypto/tls" "encoding/json" "net/http" @@ -62,9 +63,9 @@ func (t *testTiKVConfigSuite) TestTiKVConfig(c *C) { func (t *testTiKVConfigSuite) TestUpdateConfig(c *C) { manager := NewTestStoreConfigManager([]string{"tidb.com"}) - manager.ObserveConfig("tikv.com") + manager.ObserveConfig(context.Background(), "tikv.com") c.Assert(manager.GetStoreConfig().GetRegionMaxSize(), Equals, uint64(144)) - manager.ObserveConfig("tidb.com") + manager.ObserveConfig(context.Background(), "tidb.com") c.Assert(manager.GetStoreConfig().GetRegionMaxSize(), Equals, uint64(10)) client := &http.Client{