diff --git a/config/config.go b/config/config.go index 7d2e6a38bb028..23c6768b29f6f 100644 --- a/config/config.go +++ b/config/config.go @@ -276,9 +276,6 @@ type TiKVClient struct { // CommitTimeout is the max time which command 'commit' will wait. CommitTimeout string `toml:"commit-timeout" json:"commit-timeout"` - // MaxTxnTimeUse is the max time a Txn may use (in seconds) from its startTS to commitTS. - MaxTxnTimeUse uint `toml:"max-txn-time-use" json:"max-txn-time-use"` - // MaxBatchSize is the max batch size when calling batch commands API. MaxBatchSize uint `toml:"max-batch-size" json:"max-batch-size"` // If TiKV load is greater than this, TiDB will wait for a while to avoid little batch. @@ -407,8 +404,6 @@ var defaultConf = Config{ GrpcKeepAliveTimeout: 3, CommitTimeout: "41s", - MaxTxnTimeUse: 590, - MaxBatchSize: 128, OverloadThreshold: 200, MaxBatchWaitTime: 0, @@ -595,9 +590,6 @@ func (c *Config) Valid() error { if c.TiKVClient.GrpcConnectionCount == 0 { return fmt.Errorf("grpc-connection-count should be greater than 0") } - if c.TiKVClient.MaxTxnTimeUse == 0 { - return fmt.Errorf("max-txn-time-use should be greater than 0") - } if c.PessimisticTxn.TTL != "" { dur, err := time.ParseDuration(c.PessimisticTxn.TTL) if err != nil { diff --git a/config/config.toml.example b/config/config.toml.example index a5fd1c3a8c0f4..bc19320112710 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -271,11 +271,6 @@ grpc-keepalive-timeout = 3 # Max time for commit command, must be twice bigger than raft election timeout. commit-timeout = "41s" -# The max time a Txn may use (in seconds) from its startTS to commitTS. -# We use it to guarantee GC worker will not influence any active txn. Please make sure that this -# value is less than gc_life_time - 10s. -max-txn-time-use = 590 - # Max batch size in gRPC. max-batch-size = 128 # Overload threshold of TiKV. 
diff --git a/domain/domain.go b/domain/domain.go index e5124ba0d7e30..07a03f7ecaffd 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -425,7 +425,7 @@ func (do *Domain) infoSyncerKeeper() { for { select { case <-ticker.C: - do.info.ReportMinStartTS() + do.info.ReportMinStartTS(do.Store()) case <-do.info.Done(): logutil.BgLogger().Info("server info syncer need to restart") if err := do.info.Restart(context.Background()); err != nil { diff --git a/domain/domain_test.go b/domain/domain_test.go index a82bcb41e655b..17e0001e7d4be 100644 --- a/domain/domain_test.go +++ b/domain/domain_test.go @@ -16,6 +16,7 @@ package domain import ( "context" "crypto/tls" + "math" "testing" "time" @@ -34,6 +35,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/store/tikv/oracle" + "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tidb/util/testleak" dto "github.com/prometheus/client_model/go" @@ -184,6 +186,29 @@ func TestInfo(t *testing.T) { } } +type mockSessionManager struct { + PS []*util.ProcessInfo +} + +func (msm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo { + ret := make(map[uint64]*util.ProcessInfo) + for _, item := range msm.PS { + ret[item.ID] = item + } + return ret +} + +func (msm *mockSessionManager) GetProcessInfo(id uint64) (*util.ProcessInfo, bool) { + for _, item := range msm.PS { + if item.ID == id { + return item, true + } + } + return &util.ProcessInfo{}, false +} + +func (msm *mockSessionManager) Kill(cid uint64, query bool) {} + func (*testSuite) TestT(c *C) { defer testleak.AfterTest(c)() store, err := mockstore.NewMockTikvStore() @@ -338,6 +363,28 @@ func (*testSuite) TestT(c *C) { c.Assert(err.Error(), Equals, ErrInfoSchemaExpired.Error()) dom.SchemaValidator.Reset() + // Test for reporting min start timestamp. 
+ infoSyncer := dom.InfoSyncer() + sm := &mockSessionManager{ + PS: make([]*util.ProcessInfo, 0), + } + infoSyncer.SetSessionManager(sm) + beforeTS := variable.GoTimeToTS(time.Now()) + infoSyncer.ReportMinStartTS(dom.Store()) + afterTS := variable.GoTimeToTS(time.Now()) + c.Assert(infoSyncer.minStartTS > beforeTS && infoSyncer.minStartTS < afterTS, IsFalse) + lowerLimit := time.Now().Add(-time.Duration(kv.MaxTxnTimeUse) * time.Millisecond) + validTS := variable.GoTimeToTS(lowerLimit.Add(time.Minute)) + sm.PS = []*util.ProcessInfo{ + {CurTxnStartTS: 0}, + {CurTxnStartTS: math.MaxUint64}, + {CurTxnStartTS: variable.GoTimeToTS(lowerLimit)}, + {CurTxnStartTS: validTS}, + } + infoSyncer.SetSessionManager(sm) + infoSyncer.ReportMinStartTS(dom.Store()) + c.Assert(infoSyncer.minStartTS == validTS, IsTrue) + err = store.Close() c.Assert(err, IsNil) isClose := dom.isClose() diff --git a/domain/info.go b/domain/info.go index bf0e91acf7e88..32d632d870565 100644 --- a/domain/info.go +++ b/domain/info.go @@ -17,7 +17,6 @@ import ( "context" "encoding/json" "fmt" - "math" "strconv" "time" @@ -27,7 +26,10 @@ import ( "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl/util" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/owner" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/store/tikv/oracle" util2 "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/logutil" @@ -180,27 +182,38 @@ func (is *InfoSyncer) RemoveMinStartTS() { } // ReportMinStartTS reports self server min start timestamp to ETCD. -func (is *InfoSyncer) ReportMinStartTS() { +func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) { if is.manager == nil { // Server may not start in time. return } pl := is.manager.ShowProcessList() - var minStartTS uint64 = math.MaxUint64 + + // Calculate the lower limit of the start timestamp to avoid extremely old transaction delaying GC. 
+ currentVer, err := store.CurrentVersion() + if err != nil { + logutil.BgLogger().Error("update minStartTS failed", zap.Error(err)) + return + } + now := time.Unix(0, oracle.ExtractPhysical(currentVer.Ver)*1e6) + startTSLowerLimit := variable.GoTimeToTS(now.Add(-time.Duration(kv.MaxTxnTimeUse) * time.Millisecond)) + + minStartTS := variable.GoTimeToTS(now) for _, info := range pl { - if info.CurTxnStartTS < minStartTS { + if info.CurTxnStartTS > startTSLowerLimit && info.CurTxnStartTS < minStartTS { minStartTS = info.CurTxnStartTS } } + is.minStartTS = minStartTS - err := is.storeMinStartTS(context.Background()) + err = is.storeMinStartTS(context.Background()) if err != nil { logutil.BgLogger().Error("update minStartTS failed", zap.Error(err)) } } // Done returns a channel that closes when the info syncer is no longer being refreshed. -func (is InfoSyncer) Done() <-chan struct{} { +func (is *InfoSyncer) Done() <-chan struct{} { if is.etcdCli == nil { return make(chan struct{}, 1) } diff --git a/kv/kv.go b/kv/kv.go index 3ec7254038067..8413d86f684f9 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -71,6 +71,10 @@ const ( // Append UnCommitIndexKVFlag to the value indicate the index key/value is no need to commit. const UnCommitIndexKVFlag byte = '1' +// MaxTxnTimeUse is the max time a Txn may use (in ms) from its begin to commit. +// We use it to abort the transaction to guarantee GC worker will not influence it. +const MaxTxnTimeUse = 24 * 60 * 60 * 1000 + // IsoLevel is the transaction's isolation level. 
type IsoLevel int diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 2f8684d3ca22f..a64d835c155b2 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -24,10 +24,8 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/parser/terror" - "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx/binloginfo" @@ -95,11 +93,7 @@ type twoPhaseCommitter struct { priority pb.CommandPri connID uint64 // connID is used for log. cleanWg sync.WaitGroup - // maxTxnTimeUse represents max time a Txn may use (in ms) from its startTS to commitTS. - // We use it to guarantee GC worker will not influence any active txn. The value - // should be less than GC life time. - maxTxnTimeUse uint64 - detail unsafe.Pointer + detail unsafe.Pointer primaryKey []byte forUpdateTS uint64 @@ -314,9 +308,6 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error { zap.Uint64("txnStartTS", txn.startTS)) } - // Convert from sec to ms - maxTxnTimeUse := uint64(config.GetGlobalConfig().TiKVClient.MaxTxnTimeUse) * 1000 - // Sanity check for startTS. 
if txn.StartTS() == math.MaxUint64 { err = errors.Errorf("try to commit with invalid txnStartTS: %d", txn.StartTS()) @@ -329,7 +320,6 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error { commitDetail := &execdetails.CommitDetails{WriteSize: size, WriteKeys: len(keys)} metrics.TiKVTxnWriteKVCountHistogram.Observe(float64(commitDetail.WriteKeys)) metrics.TiKVTxnWriteSizeHistogram.Observe(float64(commitDetail.WriteSize)) - c.maxTxnTimeUse = maxTxnTimeUse c.keys = keys c.mutations = mutations c.lockTTL = txnLockTTL(txn.startTime, size) @@ -1034,13 +1024,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error { return errors.Trace(err) } - failpoint.Inject("tmpMaxTxnTime", func(val failpoint.Value) { - if tmpMaxTxnTime := uint64(val.(int)); tmpMaxTxnTime > 0 { - c.maxTxnTimeUse = tmpMaxTxnTime - } - }) - - if c.store.oracle.IsExpired(c.startTS, c.maxTxnTimeUse) { + if c.store.oracle.IsExpired(c.startTS, kv.MaxTxnTimeUse) { err = errors.Errorf("conn %d txn takes too much time, txnStartTS: %d, comm: %d", c.connID, c.startTS, c.commitTS) return err diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go index cdbfa57c35a43..1b42357941106 100644 --- a/store/tikv/gcworker/gc_worker.go +++ b/store/tikv/gcworker/gc_worker.go @@ -29,12 +29,13 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/parser/terror" pd "github.com/pingcap/pd/client" - "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl/util" + "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/store/tikv/tikvrpc" @@ -320,6 +321,30 @@ func (w *GCWorker) checkPrepare(ctx context.Context) (bool, uint64, error) { return true, oracle.ComposeTS(oracle.GetPhysical(*newSafePoint), 0), nil 
} +// calSafePointByMinStartTS uses the current global transaction min start timestamp to calculate the new safe point. +func (w *GCWorker) calSafePointByMinStartTS(safePoint time.Time) time.Time { + kvs, err := w.store.GetSafePointKV().GetWithPrefix(domain.ServerMinStartTSPath) + if err != nil { + logutil.BgLogger().Warn("get all minStartTS failed", zap.Error(err)) + return safePoint + } + + safePointTS := variable.GoTimeToTS(safePoint) + for _, v := range kvs { + minStartTS, err := strconv.ParseUint(string(v.Value), 10, 64) + if err != nil { + logutil.BgLogger().Warn("parse minStartTS failed", zap.Error(err)) + continue + } + if minStartTS < safePointTS { + safePointTS = minStartTS + } + } + safePoint = time.Unix(0, oracle.ExtractPhysical(safePointTS)*1e6) + logutil.BgLogger().Debug("calSafePointByMinStartTS", zap.Time("safePoint", safePoint)) + return safePoint +} + func (w *GCWorker) getOracleTime() (time.Time, error) { currentVer, err := w.store.CurrentVersion() if err != nil { @@ -419,23 +444,16 @@ func (w *GCWorker) checkGCInterval(now time.Time) (bool, error) { // validateGCLiftTime checks whether life time is small than min gc life time. func (w *GCWorker) validateGCLiftTime(lifeTime time.Duration) (time.Duration, error) { - minLifeTime := gcMinLifeTime - // max-txn-time-use value is less than gc_life_time - 10s. 
- maxTxnTime := time.Duration(config.GetGlobalConfig().TiKVClient.MaxTxnTimeUse+10) * time.Second - if minLifeTime < maxTxnTime { - minLifeTime = maxTxnTime - } - - if lifeTime >= minLifeTime { + if lifeTime >= gcMinLifeTime { return lifeTime, nil } logutil.BgLogger().Info("[gc worker] invalid gc life time", zap.Duration("get gc life time", lifeTime), - zap.Duration("min gc life time", minLifeTime)) + zap.Duration("min gc life time", gcMinLifeTime)) - err := w.saveDuration(gcLifeTimeKey, minLifeTime) - return minLifeTime, err + err := w.saveDuration(gcLifeTimeKey, gcMinLifeTime) + return gcMinLifeTime, err } func (w *GCWorker) calculateNewSafePoint(now time.Time) (*time.Time, error) { @@ -452,7 +470,7 @@ func (w *GCWorker) calculateNewSafePoint(now time.Time) (*time.Time, error) { if err != nil { return nil, errors.Trace(err) } - safePoint := now.Add(-*lifeTime) + safePoint := w.calSafePointByMinStartTS(now.Add(-*lifeTime)) // We should never decrease safePoint. if lastSafePoint != nil && safePoint.Before(*lastSafePoint) { logutil.BgLogger().Info("[gc worker] last safe point is later than current one."+ diff --git a/store/tikv/gcworker/gc_worker_test.go b/store/tikv/gcworker/gc_worker_test.go index ba7e2534faca9..68b9b41215c54 100644 --- a/store/tikv/gcworker/gc_worker_test.go +++ b/store/tikv/gcworker/gc_worker_test.go @@ -16,6 +16,7 @@ package gcworker import ( "bytes" "context" + "fmt" "math" "sort" "strconv" @@ -29,11 +30,11 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" pd "github.com/pingcap/pd/client" - "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/mockoracle" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/store/mockstore/mocktikv" @@ -188,6 +189,36 @@ func (s *testGCWorkerSuite) TestGetOracleTime(c 
*C) { s.timeEqual(c, t2, t1.Add(time.Second*10), time.Millisecond*10) } +func (s *testGCWorkerSuite) TestMinStartTS(c *C) { + spkv := s.store.GetSafePointKV() + err := spkv.Put(fmt.Sprintf("%s/%s", domain.ServerMinStartTSPath, "a"), strconv.FormatUint(math.MaxUint64, 10)) + c.Assert(err, IsNil) + now := time.Now() + sp := s.gcWorker.calSafePointByMinStartTS(now) + c.Assert(sp.Second(), Equals, now.Second()) + err = spkv.Put(fmt.Sprintf("%s/%s", domain.ServerMinStartTSPath, "a"), "0") + c.Assert(err, IsNil) + sp = s.gcWorker.calSafePointByMinStartTS(now) + zeroTime := time.Unix(0, oracle.ExtractPhysical(0)*1e6) + c.Assert(sp, Equals, zeroTime) + + err = spkv.Put(fmt.Sprintf("%s/%s", domain.ServerMinStartTSPath, "a"), "0") + c.Assert(err, IsNil) + err = spkv.Put(fmt.Sprintf("%s/%s", domain.ServerMinStartTSPath, "b"), "1") + c.Assert(err, IsNil) + sp = s.gcWorker.calSafePointByMinStartTS(now) + c.Assert(sp, Equals, zeroTime) + + err = spkv.Put(fmt.Sprintf("%s/%s", domain.ServerMinStartTSPath, "a"), + strconv.FormatUint(variable.GoTimeToTS(now), 10)) + c.Assert(err, IsNil) + err = spkv.Put(fmt.Sprintf("%s/%s", domain.ServerMinStartTSPath, "b"), + strconv.FormatUint(variable.GoTimeToTS(now.Add(-20*time.Second)), 10)) + c.Assert(err, IsNil) + sp = s.gcWorker.calSafePointByMinStartTS(now.Add(-10 * time.Second)) + c.Assert(sp.Second(), Equals, now.Add(-20*time.Second).Second()) +} + func (s *testGCWorkerSuite) TestPrepareGC(c *C) { now, err := s.gcWorker.getOracleTime() c.Assert(err, IsNil) @@ -278,19 +309,6 @@ func (s *testGCWorkerSuite) TestPrepareGC(c *C) { c.Assert(err, IsNil) c.Assert(*lifeTime, Equals, gcMinLifeTime) - // Check gc life time small than config.max-txn-use-time - s.oracle.AddOffset(time.Minute * 40) - config.GetGlobalConfig().TiKVClient.MaxTxnTimeUse = 20*60 - 10 // 20min - 10s - err = s.gcWorker.saveDuration(gcLifeTimeKey, time.Minute) - c.Assert(err, IsNil) - ok, _, err = s.gcWorker.prepare() - c.Assert(err, IsNil) - c.Assert(ok, IsTrue) - lifeTime, 
err = s.gcWorker.loadDuration(gcLifeTimeKey) - c.Assert(err, IsNil) - c.Assert(*lifeTime, Equals, 20*time.Minute) - - // check the tikv_gc_life_time more than config.max-txn-use-time situation. s.oracle.AddOffset(time.Minute * 40) err = s.gcWorker.saveDuration(gcLifeTimeKey, time.Minute*30) c.Assert(err, IsNil) diff --git a/store/tikv/safepoint.go b/store/tikv/safepoint.go index 006113c74a0ec..3aac6c132637c 100644 --- a/store/tikv/safepoint.go +++ b/store/tikv/safepoint.go @@ -17,10 +17,12 @@ import ( "context" "crypto/tls" "strconv" + "strings" "sync" "time" "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/mvcc/mvccpb" "github.com/pingcap/errors" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" @@ -43,6 +45,7 @@ const ( type SafePointKV interface { Put(k string, v string) error Get(k string) (string, error) + GetWithPrefix(k string) ([]*mvccpb.KeyValue, error) } // MockSafePointKV implements SafePointKV at mock test @@ -74,6 +77,19 @@ func (w *MockSafePointKV) Get(k string) (string, error) { return elem, nil } +// GetWithPrefix implements the GetWithPrefix method for SafePointKV +func (w *MockSafePointKV) GetWithPrefix(prefix string) ([]*mvccpb.KeyValue, error) { + w.mockLock.RLock() + defer w.mockLock.RUnlock() + kvs := make([]*mvccpb.KeyValue, 0, len(w.store)) + for k, v := range w.store { + if strings.HasPrefix(k, prefix) { + kvs = append(kvs, &mvccpb.KeyValue{Key: []byte(k), Value: []byte(v)}) + } + } + return kvs, nil +} + // EtcdSafePointKV implements SafePointKV at runtime type EtcdSafePointKV struct { cli *clientv3.Client @@ -113,6 +129,17 @@ func (w *EtcdSafePointKV) Get(k string) (string, error) { return "", nil } +// GetWithPrefix implements the GetWithPrefix method for SafePointKV +func (w *EtcdSafePointKV) GetWithPrefix(k string) ([]*mvccpb.KeyValue, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + resp, err := w.cli.Get(ctx, k, clientv3.WithPrefix()) + cancel() + if err != nil { + return nil, errors.Trace(err) + 
} + return resp.Kvs, nil +} + func saveSafePoint(kv SafePointKV, t uint64) error { s := strconv.FormatUint(t, 10) err := kv.Put(GcSavedSafePoint, s)