Skip to content

Commit

Permalink
*: compatible upgrade with previous versions (#46276)
Browse files Browse the repository at this point in the history
close #46275
  • Loading branch information
zimulala authored Aug 29, 2023
1 parent a06bcc6 commit d149379
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 48 deletions.
25 changes: 15 additions & 10 deletions ddl/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,43 +148,48 @@ func NewMockStateSyncer() syncer.StateSyncer {
return &MockStateSyncer{}
}

// clusterState mocks cluster state.
// We move it from MockStateSyncer to here. Because we want to make it unaffected by ddl close.
var clusterState *atomicutil.Pointer[syncer.StateInfo]

// MockStateSyncer is a mock state syncer, it is exported for testing.
type MockStateSyncer struct {
clusterState *atomicutil.Pointer[syncer.StateInfo]
globalVerCh chan clientv3.WatchResponse
mockSession chan struct{}
globalVerCh chan clientv3.WatchResponse
mockSession chan struct{}
}

// Init implements StateSyncer.Init interface.
func (s *MockStateSyncer) Init(context.Context) error {
s.globalVerCh = make(chan clientv3.WatchResponse, 1)
s.mockSession = make(chan struct{}, 1)
state := syncer.NewStateInfo(syncer.StateNormalRunning)
s.clusterState = atomicutil.NewPointer(state)
if clusterState == nil {
clusterState = atomicutil.NewPointer(state)
}
return nil
}

// UpdateGlobalState implements StateSyncer.UpdateGlobalState interface.
func (s *MockStateSyncer) UpdateGlobalState(_ context.Context, stateInfo *syncer.StateInfo) error {
failpoint.Inject("mockUpgradingState", func(val failpoint.Value) {
if val.(bool) {
s.clusterState.Store(stateInfo)
clusterState.Store(stateInfo)
failpoint.Return(nil)
}
})
s.globalVerCh <- clientv3.WatchResponse{}
s.clusterState.Store(stateInfo)
clusterState.Store(stateInfo)
return nil
}

// GetGlobalState implements StateSyncer.GetGlobalState interface.
func (s *MockStateSyncer) GetGlobalState(context.Context) (*syncer.StateInfo, error) {
return s.clusterState.Load(), nil
func (*MockStateSyncer) GetGlobalState(context.Context) (*syncer.StateInfo, error) {
return clusterState.Load(), nil
}

// IsUpgradingState implements StateSyncer.IsUpgradingState interface.
func (s *MockStateSyncer) IsUpgradingState() bool {
return s.clusterState.Load().State == syncer.StateUpgrading
func (*MockStateSyncer) IsUpgradingState() bool {
return clusterState.Load().State == syncer.StateUpgrading
}

// WatchChan implements StateSyncer.WatchChan interface.
Expand Down
10 changes: 6 additions & 4 deletions server/handler/upgrade_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ func (h ClusterUpgradeHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques
op := params[Operation]
switch op {
case "start":
hasDone, err = h.startUpgrade()
hasDone, err = h.StartUpgrade()
case "finish":
hasDone, err = h.finishUpgrade()
hasDone, err = h.FinishUpgrade()
default:
WriteError(w, errors.Errorf("wrong operation:%s", op))
return
Expand All @@ -74,7 +74,8 @@ func (h ClusterUpgradeHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques
zap.String("category", "upgrading"), zap.String("op", req.FormValue("op")), zap.Bool("hasDone", hasDone))
}

func (h ClusterUpgradeHandler) startUpgrade() (hasDone bool, err error) {
// StartUpgrade is used to start the upgrade.
func (h ClusterUpgradeHandler) StartUpgrade() (hasDone bool, err error) {
se, err := session.CreateSession(h.store)
if err != nil {
return false, err
Expand All @@ -93,7 +94,8 @@ func (h ClusterUpgradeHandler) startUpgrade() (hasDone bool, err error) {
return false, err
}

func (h ClusterUpgradeHandler) finishUpgrade() (hasDone bool, err error) {
// FinishUpgrade is used to finish the upgrade.
func (h ClusterUpgradeHandler) FinishUpgrade() (hasDone bool, err error) {
se, err := session.CreateSession(h.store)
if err != nil {
return false, err
Expand Down
50 changes: 42 additions & 8 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -1178,8 +1178,14 @@ func getTiDBVar(s Session, name string) (sVal string, isNull bool, e error) {
return row.GetString(0), false, nil
}

// SupportUpgradeStateVer is exported for testing.
var SupportUpgradeStateVer = version145
var (
// SupportUpgradeStateVer is exported for testing.
// The minimum version that can be upgraded by paused user DDL.
SupportUpgradeStateVer int64 = version145
// SupportUpgradeHTTPOpVer is exported for testing.
// The minimum version of the upgrade can be notified through the HTTP API.
SupportUpgradeHTTPOpVer int64 = version172
)

// upgrade function will do some upgrade works, when the system is bootstrapped by low version TiDB server
// For example, add new system variables into mysql.global_variables table.
Expand All @@ -1190,6 +1196,10 @@ func upgrade(s Session) {
// It is already bootstrapped/upgraded by a higher version TiDB server.
return
}
if ver >= SupportUpgradeStateVer {
checkOrSyncUpgrade(s, ver)
}

// Only upgrade from under version92 and this TiDB is not owner set.
// The owner in older tidb does not support concurrent DDL, we should add the internal DDL to job queue.
if ver < version92 {
Expand All @@ -1209,9 +1219,6 @@ func upgrade(s Session) {
logutil.BgLogger().Fatal("[upgrade] init metadata lock failed", zap.Error(err))
}

if ver >= int64(SupportUpgradeStateVer) {
terror.MustNil(SyncUpgradeState(s))
}
if isNull {
upgradeToVer99Before(s)
}
Expand All @@ -1224,9 +1231,6 @@ func upgrade(s Session) {
if isNull {
upgradeToVer99After(s)
}
if ver >= int64(SupportUpgradeStateVer) {
terror.MustNil(SyncNormalRunning(s))
}

variable.DDLForce2Queue.Store(false)
updateBootstrapVer(s)
Expand Down Expand Up @@ -1331,6 +1335,36 @@ func IsUpgradingClusterState(s Session) (bool, error) {
return stateInfo.State == syncer.StateUpgrading, nil
}

func checkOrSyncUpgrade(s Session, ver int64) {
if ver < SupportUpgradeHTTPOpVer {
terror.MustNil(SyncUpgradeState(s))
return
}

interval := 200 * time.Millisecond
retryTimes := int(time.Duration(internalSQLTimeout) * time.Second / interval)
for i := 0; i < retryTimes; i++ {
isUpgrading, err := IsUpgradingClusterState(s)
if err == nil {
if isUpgrading {
break
}
logutil.BgLogger().Fatal("global state isn't upgrading, please send a request to start the upgrade first",
zap.String("category", "upgrading"), zap.Error(err))
}

if i == retryTimes-1 {
logutil.BgLogger().Fatal("get global state failed", zap.String("category", "upgrading"), zap.Error(err))
}
if i%10 == 0 {
logutil.BgLogger().Warn("get global state failed", zap.String("category", "upgrading"), zap.Error(err))
}
time.Sleep(interval)
}
logutil.BgLogger().Info("global state is upgrading", zap.String("category", "upgrading"),
zap.Int64("old version", ver), zap.Int64("latest version", currentBootstrapVersion))
}

// checkOwnerVersion is used to wait the DDL owner to be elected in the cluster and check it is the same version as this TiDB.
func checkOwnerVersion(ctx context.Context, dom *domain.Domain) (bool, error) {
ticker := time.NewTicker(100 * time.Millisecond)
Expand Down
3 changes: 2 additions & 1 deletion session/bootstraptest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 10,
shard_count = 11,
deps = [
"//config",
"//ddl",
Expand All @@ -17,6 +17,7 @@ go_test(
"//meta",
"//parser/model",
"//parser/terror",
"//server/handler",
"//session", #keep
"//sessionctx",
"//testkit", #keep
Expand Down
76 changes: 76 additions & 0 deletions session/bootstraptest/bootstrap_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/server/handler"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/testkit"
Expand Down Expand Up @@ -256,10 +257,13 @@ func TestUpgradeVersionMockLatest(t *testing.T) {
require.NoError(t, err)
require.Equal(t, session.CurrentBootstrapVersion-1, ver)
dom.Close()
startUpgrade(store, session.CurrentBootstrapVersion-1)
domLatestV, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domLatestV.Close()

finishUpgrade(store)

seLatestV := session.CreateSessionAndSetID(t, store)
ver, err = session.GetBootstrapVersion(seLatestV)
require.NoError(t, err)
Expand Down Expand Up @@ -295,6 +299,54 @@ func TestUpgradeVersionMockLatest(t *testing.T) {
" PARTITION `p4` VALUES LESS THAN (7096))"))
}

// TestUpgradeVersionForUpgradeHTTPOp tests SupportUpgradeHTTPOpVer upgrade SupportUpgradeHTTPOpVer++.
func TestUpgradeVersionForUpgradeHTTPOp(t *testing.T) {
*session.WithMockUpgrade = true
session.MockUpgradeToVerLatestKind = session.MockSimpleUpgradeToVerLatest

store, dom := session.CreateStoreAndBootstrap(t)
defer func() { require.NoError(t, store.Close()) }()

seV := session.CreateSessionAndSetID(t, store)
txn, err := store.Begin()
require.NoError(t, err)
m := meta.NewMeta(txn)
err = m.FinishBootstrap(session.SupportUpgradeHTTPOpVer)
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
session.MustExec(t, seV, fmt.Sprintf("update mysql.tidb set variable_value='%d' where variable_name='tidb_server_version'", session.SupportUpgradeHTTPOpVer))
session.UnsetStoreBootstrapped(store.UUID())
ver, err := session.GetBootstrapVersion(seV)
require.NoError(t, err)
require.Equal(t, session.SupportUpgradeHTTPOpVer, ver)
dom.Close()

// Start the upgrade test.
// Current cluster state is normal.
isUpgrading, err := session.IsUpgradingClusterState(seV)
require.NoError(t, err)
require.Equal(t, false, isUpgrading)
upgradeHandler := handler.NewClusterUpgradeHandler(store)
upgradeHandler.StartUpgrade()
domLatestV, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domLatestV.Close()
seLatestV := session.CreateSessionAndSetID(t, store)
ver, err = session.GetBootstrapVersion(seLatestV)
require.NoError(t, err)
require.Equal(t, session.SupportUpgradeHTTPOpVer+1, ver)
// Current cluster state is upgrading.
isUpgrading, err = session.IsUpgradingClusterState(seLatestV)
require.NoError(t, err)
require.Equal(t, true, isUpgrading)
upgradeHandler.FinishUpgrade()
// Upgrading is finished and current cluster state is normal.
isUpgrading, err = session.IsUpgradingClusterState(seV)
require.NoError(t, err)
require.Equal(t, false, isUpgrading)
}

func TestUpgradeVersionForPausedJob(t *testing.T) {
store, dom := session.CreateStoreAndBootstrap(t)
defer func() { require.NoError(t, store.Close()) }()
Expand Down Expand Up @@ -334,6 +386,7 @@ func TestUpgradeVersionForPausedJob(t *testing.T) {
<-ch
dom.Close()
// Make sure upgrade is successful.
startUpgrade(store, session.CurrentBootstrapVersion-1)
domLatestV, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domLatestV.Close()
Expand All @@ -342,6 +395,8 @@ func TestUpgradeVersionForPausedJob(t *testing.T) {
require.NoError(t, err)
require.Equal(t, session.CurrentBootstrapVersion, ver)

finishUpgrade(store)

// Resume the DDL job, then add index operation can be executed successfully.
session.MustExec(t, seLatestV, fmt.Sprintf("admin resume ddl jobs %d", jobID))
checkDDLJobExecSucc(t, seLatestV, jobID)
Expand Down Expand Up @@ -416,6 +471,7 @@ func TestUpgradeVersionForSystemPausedJob(t *testing.T) {
<-ch
dom.Close()
// Make sure upgrade is successful.
startUpgrade(store, session.CurrentBootstrapVersion-1)
domLatestV, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domLatestV.Close()
Expand All @@ -424,6 +480,8 @@ func TestUpgradeVersionForSystemPausedJob(t *testing.T) {
require.NoError(t, err)
require.Equal(t, session.CurrentBootstrapVersion+1, ver)

finishUpgrade(store)

checkDDLJobExecSucc(t, seLatestV, jobID)
}

Expand Down Expand Up @@ -502,10 +560,12 @@ func TestUpgradeVersionForResumeJob(t *testing.T) {
<-ch
dom.Close()
// Make sure upgrade is successful.
startUpgrade(store, session.CurrentBootstrapVersion-1)
domLatestV, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domLatestV.Close()
domLatestV.DDL().SetHook(hook)
finishUpgrade(store)
seLatestV := session.CreateSessionAndSetID(t, store)
// Add a new DDL (an "add index" job uses a different table than the previous DDL job) to the DDL table.
session.MustExec(t, seLatestV, "alter table test.upgrade_tbl1 add index idx2(a)")
Expand Down Expand Up @@ -554,6 +614,20 @@ func execute(ctx context.Context, s sessionctx.Context, query string) ([]chunk.R
return rows, nil
}

func startUpgrade(store kv.Storage, currVer int64) {
// It's used for compatible tests upgraded from previous versions of SupportUpgradeHTTPOpVer.
if currVer < session.SupportUpgradeHTTPOpVer {
return
}
upgradeHandler := handler.NewClusterUpgradeHandler(store)
upgradeHandler.StartUpgrade()
}

func finishUpgrade(store kv.Storage) {
upgradeHandler := handler.NewClusterUpgradeHandler(store)
upgradeHandler.FinishUpgrade()
}

// TestUpgradeWithPauseDDL adds a user and a system DB's DDL operations, before every test bootstrap(DDL operation). It tests:
//
// 1.Before and after each test bootstrap, the DDL of the user DB is paused, but the DDL of the system DB is not paused.
Expand Down Expand Up @@ -665,6 +739,7 @@ func TestUpgradeWithPauseDDL(t *testing.T) {
require.NoError(t, err)
require.Equal(t, session.CurrentBootstrapVersion-1, ver)
dom.Close()
startUpgrade(store, session.CurrentBootstrapVersion-1)
domLatestV, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domLatestV.Close()
Expand All @@ -675,6 +750,7 @@ func TestUpgradeWithPauseDDL(t *testing.T) {
require.Equal(t, session.CurrentBootstrapVersion+1, ver)

wg.Wait()
finishUpgrade(store)

tk := testkit.NewTestKit(t, store)
var rows []chunk.Row
Expand Down
Loading

0 comments on commit d149379

Please sign in to comment.