*: calculate GC safe point based on global min start timestamp #12223

Merged · 20 commits · Sep 30, 2019
8 changes: 0 additions & 8 deletions config/config.go
@@ -276,9 +276,6 @@ type TiKVClient struct {
 	// CommitTimeout is the max time which command 'commit' will wait.
 	CommitTimeout string `toml:"commit-timeout" json:"commit-timeout"`
 
-	// MaxTxnTimeUse is the max time a Txn may use (in seconds) from its startTS to commitTS.
-	MaxTxnTimeUse uint `toml:"max-txn-time-use" json:"max-txn-time-use"`
-
 	// MaxBatchSize is the max batch size when calling batch commands API.
 	MaxBatchSize uint `toml:"max-batch-size" json:"max-batch-size"`
 	// If TiKV load is greater than this, TiDB will wait for a while to avoid little batch.
@@ -407,8 +404,6 @@ var defaultConf = Config{
 		GrpcKeepAliveTimeout: 3,
 		CommitTimeout: "41s",
 
-		MaxTxnTimeUse: 590,
-
 		MaxBatchSize: 128,
 		OverloadThreshold: 200,
 		MaxBatchWaitTime: 0,
@@ -595,9 +590,6 @@ func (c *Config) Valid() error {
 	if c.TiKVClient.GrpcConnectionCount == 0 {
 		return fmt.Errorf("grpc-connection-count should be greater than 0")
 	}
-	if c.TiKVClient.MaxTxnTimeUse == 0 {
-		return fmt.Errorf("max-txn-time-use should be greater than 0")
-	}
 	if c.PessimisticTxn.TTL != "" {
 		dur, err := time.ParseDuration(c.PessimisticTxn.TTL)
 		if err != nil {
5 changes: 0 additions & 5 deletions config/config.toml.example
@@ -271,11 +271,6 @@ grpc-keepalive-timeout = 3
 # Max time for commit command, must be twice bigger than raft election timeout.
 commit-timeout = "41s"
 
-# The max time a Txn may use (in seconds) from its startTS to commitTS.
-# We use it to guarantee GC worker will not influence any active txn. Please make sure that this
-# value is less than gc_life_time - 10s.
-max-txn-time-use = 590
-
 # Max batch size in gRPC.
 max-batch-size = 128
 # Overload threshold of TiKV.
2 changes: 1 addition & 1 deletion domain/domain.go
@@ -425,7 +425,7 @@ func (do *Domain) infoSyncerKeeper() {
 	for {
 		select {
 		case <-ticker.C:
-			do.info.ReportMinStartTS()
+			do.info.ReportMinStartTS(do.Store())
 		case <-do.info.Done():
 			logutil.BgLogger().Info("server info syncer need to restart")
 			if err := do.info.Restart(context.Background()); err != nil {
47 changes: 47 additions & 0 deletions domain/domain_test.go
@@ -16,6 +16,7 @@ package domain
 import (
 	"context"
 	"crypto/tls"
+	"math"
 	"testing"
 	"time"
 
@@ -34,6 +35,7 @@ import (
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/store/mockstore"
 	"github.com/pingcap/tidb/store/tikv/oracle"
+	"github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/mock"
 	"github.com/pingcap/tidb/util/testleak"
 	dto "github.com/prometheus/client_model/go"
@@ -184,6 +186,29 @@ func TestInfo(t *testing.T) {
 	}
 }
 
+type mockSessionManager struct {
+	PS []*util.ProcessInfo
+}
+
+func (msm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo {
+	ret := make(map[uint64]*util.ProcessInfo)
+	for _, item := range msm.PS {
+		ret[item.ID] = item
+	}
+	return ret
+}
+
+func (msm *mockSessionManager) GetProcessInfo(id uint64) (*util.ProcessInfo, bool) {
+	for _, item := range msm.PS {
+		if item.ID == id {
+			return item, true
+		}
+	}
+	return &util.ProcessInfo{}, false
+}
+
+func (msm *mockSessionManager) Kill(cid uint64, query bool) {}
+
 func (*testSuite) TestT(c *C) {
 	defer testleak.AfterTest(c)()
 	store, err := mockstore.NewMockTikvStore()
@@ -338,6 +363,28 @@ func (*testSuite) TestT(c *C) {
 	c.Assert(err.Error(), Equals, ErrInfoSchemaExpired.Error())
 	dom.SchemaValidator.Reset()
 
+	// Test for reporting min start timestamp.
+	infoSyncer := dom.InfoSyncer()
+	sm := &mockSessionManager{
+		PS: make([]*util.ProcessInfo, 0),
+	}
+	infoSyncer.SetSessionManager(sm)
+	beforeTS := variable.GoTimeToTS(time.Now())
+	infoSyncer.ReportMinStartTS(dom.Store())
+	afterTS := variable.GoTimeToTS(time.Now())
+	c.Assert(infoSyncer.minStartTS > beforeTS && infoSyncer.minStartTS < afterTS, IsFalse)
+	lowerLimit := time.Now().Add(-time.Duration(kv.MaxTxnTimeUse) * time.Millisecond)
+	validTS := variable.GoTimeToTS(lowerLimit.Add(time.Minute))
+	sm.PS = []*util.ProcessInfo{
+		{CurTxnStartTS: 0},
+		{CurTxnStartTS: math.MaxUint64},
+		{CurTxnStartTS: variable.GoTimeToTS(lowerLimit)},
+		{CurTxnStartTS: validTS},
+	}
+	infoSyncer.SetSessionManager(sm)
+	infoSyncer.ReportMinStartTS(dom.Store())
+	c.Assert(infoSyncer.minStartTS == validTS, IsTrue)
+
 	err = store.Close()
 	c.Assert(err, IsNil)
 	isClose := dom.isClose()
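For readers skimming the new test: the four CurTxnStartTS values exercise the filter that ReportMinStartTS (in domain/info.go below) applies. A minimal sketch of that selection rule, with illustrative names (pickMinStartTS and lowerLimitTS are not from the patch):

```go
package main

import "fmt"

// pickMinStartTS models the selection in ReportMinStartTS: start from "now",
// and only accept a candidate that is newer than the lower limit
// (now - kv.MaxTxnTimeUse) and older than the current minimum.
func pickMinStartTS(nowTS, lowerLimitTS uint64, candidates []uint64) uint64 {
	minStartTS := nowTS
	for _, ts := range candidates {
		if ts > lowerLimitTS && ts < minStartTS {
			minStartTS = ts
		}
	}
	return minStartTS
}

func main() {
	// Shaped like the test above: 0 and the lower limit itself are too old,
	// math.MaxUint64 is newer than "now", so only validTS can win.
	const lowerLimitTS, validTS, nowTS = 100, 150, 200
	fmt.Println(pickMinStartTS(nowTS, lowerLimitTS, []uint64{0, ^uint64(0), lowerLimitTS, validTS})) // 150
}
```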
25 changes: 19 additions & 6 deletions domain/info.go
@@ -17,7 +17,6 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"math"
 	"strconv"
 	"time"
 
@@ -27,7 +26,10 @@ import (
 	"github.com/pingcap/parser/mysql"
 	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/ddl/util"
+	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/owner"
+	"github.com/pingcap/tidb/sessionctx/variable"
+	"github.com/pingcap/tidb/store/tikv/oracle"
 	util2 "github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/hack"
 	"github.com/pingcap/tidb/util/logutil"
@@ -180,27 +182,38 @@ func (is *InfoSyncer) RemoveMinStartTS() {
 }
 
 // ReportMinStartTS reports self server min start timestamp to ETCD.
-func (is *InfoSyncer) ReportMinStartTS() {
+func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) {
 	if is.manager == nil {
 		// Server may not start in time.
 		return
 	}
 	pl := is.manager.ShowProcessList()
-	var minStartTS uint64 = math.MaxUint64
+
+	// Calculate the lower limit of the start timestamp to avoid extremely old transaction delaying GC.
+	currentVer, err := store.CurrentVersion()
+	if err != nil {
+		logutil.BgLogger().Error("update minStartTS failed", zap.Error(err))
+		return
+	}
+	now := time.Unix(0, oracle.ExtractPhysical(currentVer.Ver)*1e6)
+	startTSLowerLimit := variable.GoTimeToTS(now.Add(-time.Duration(kv.MaxTxnTimeUse) * time.Millisecond))
+
+	minStartTS := variable.GoTimeToTS(now)
 	for _, info := range pl {
-		if info.CurTxnStartTS < minStartTS {
+		if info.CurTxnStartTS > startTSLowerLimit && info.CurTxnStartTS < minStartTS {
[Review thread on the line above]
Contributor: Is there any test that covers these changes?
Author: Nope... Any suggestions? Maybe we need some integration tests.
Contributor: Sorry, but I'm not familiar with that part of the code. I just think it would be nice if this were covered by tests.
Author: Test case added, PTAL @MyonKeminta.

 			minStartTS = info.CurTxnStartTS
 		}
 	}
+
 	is.minStartTS = minStartTS
-	err := is.storeMinStartTS(context.Background())
+	err = is.storeMinStartTS(context.Background())
 	if err != nil {
 		logutil.BgLogger().Error("update minStartTS failed", zap.Error(err))
 	}
 }
 
 // Done returns a channel that closes when the info syncer is no longer being refreshed.
-func (is InfoSyncer) Done() <-chan struct{} {
+func (is *InfoSyncer) Done() <-chan struct{} {
 	if is.etcdCli == nil {
 		return make(chan struct{}, 1)
 	}
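The conversions above lean on TiDB's TSO layout: the physical component is a Unix timestamp in milliseconds stored above 18 logical bits. A rough round-trip sketch under that assumption (goTimeToTS and extractPhysical below are stand-ins for variable.GoTimeToTS and oracle.ExtractPhysical, not the real implementations):

```go
package main

import (
	"fmt"
	"time"
)

// Assumed TSO layout: physical milliseconds shifted above 18 logical bits.
const logicalBits = 18

// goTimeToTS stands in for variable.GoTimeToTS.
func goTimeToTS(t time.Time) uint64 {
	return uint64(t.UnixNano()/int64(time.Millisecond)) << logicalBits
}

// extractPhysical stands in for oracle.ExtractPhysical.
func extractPhysical(ts uint64) int64 {
	return int64(ts >> logicalBits)
}

func main() {
	now := time.Now()
	ts := goTimeToTS(now)
	// The same conversion ReportMinStartTS uses to turn a storage version
	// back into wall-clock time: physical milliseconds * 1e6 nanoseconds.
	back := time.Unix(0, extractPhysical(ts)*1e6)
	fmt.Println(back.Equal(now.Truncate(time.Millisecond))) // true
}
```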
4 changes: 4 additions & 0 deletions kv/kv.go
@@ -71,6 +71,10 @@ const (
 // Append UnCommitIndexKVFlag to the value indicate the index key/value is no need to commit.
 const UnCommitIndexKVFlag byte = '1'
 
+// MaxTxnTimeUse is the max time a Txn may use (in ms) from its begin to commit.
+// We use it to abort the transaction to guarantee GC worker will not influence it.
+const MaxTxnTimeUse = 24 * 60 * 60 * 1000
+
 // IsoLevel is the transaction's isolation level.
 type IsoLevel int
 
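The constant above is expressed in milliseconds; callers in this patch convert it with time.Duration(kv.MaxTxnTimeUse) * time.Millisecond. A quick sanity check (not part of the patch) showing that it amounts to 24 hours:

```go
package main

import (
	"fmt"
	"time"
)

// Mirrors kv.MaxTxnTimeUse from the hunk above.
const maxTxnTimeUse = 24 * 60 * 60 * 1000 // milliseconds

func main() {
	fmt.Println(time.Duration(maxTxnTimeUse) * time.Millisecond) // 24h0m0s
}
```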
20 changes: 2 additions & 18 deletions store/tikv/2pc.go
@@ -24,10 +24,8 @@ import (
 
 	"github.com/opentracing/opentracing-go"
 	"github.com/pingcap/errors"
-	"github.com/pingcap/failpoint"
 	pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/parser/terror"
-	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/metrics"
 	"github.com/pingcap/tidb/sessionctx/binloginfo"
@@ -95,11 +93,7 @@ type twoPhaseCommitter struct {
 	priority pb.CommandPri
 	connID   uint64 // connID is used for log.
 	cleanWg  sync.WaitGroup
-	// maxTxnTimeUse represents max time a Txn may use (in ms) from its startTS to commitTS.
-	// We use it to guarantee GC worker will not influence any active txn. The value
-	// should be less than GC life time.
-	maxTxnTimeUse uint64
-	detail        unsafe.Pointer
+	detail   unsafe.Pointer
 
 	primaryKey  []byte
 	forUpdateTS uint64
@@ -314,9 +308,6 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
 			zap.Uint64("txnStartTS", txn.startTS))
 	}
 
-	// Convert from sec to ms
-	maxTxnTimeUse := uint64(config.GetGlobalConfig().TiKVClient.MaxTxnTimeUse) * 1000
-
 	// Sanity check for startTS.
 	if txn.StartTS() == math.MaxUint64 {
 		err = errors.Errorf("try to commit with invalid txnStartTS: %d", txn.StartTS())
@@ -329,7 +320,6 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
 	commitDetail := &execdetails.CommitDetails{WriteSize: size, WriteKeys: len(keys)}
 	metrics.TiKVTxnWriteKVCountHistogram.Observe(float64(commitDetail.WriteKeys))
 	metrics.TiKVTxnWriteSizeHistogram.Observe(float64(commitDetail.WriteSize))
-	c.maxTxnTimeUse = maxTxnTimeUse
 	c.keys = keys
 	c.mutations = mutations
 	c.lockTTL = txnLockTTL(txn.startTime, size)
@@ -1034,13 +1024,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
 		return errors.Trace(err)
 	}
 
-	failpoint.Inject("tmpMaxTxnTime", func(val failpoint.Value) {
-		if tmpMaxTxnTime := uint64(val.(int)); tmpMaxTxnTime > 0 {
-			c.maxTxnTimeUse = tmpMaxTxnTime
-		}
-	})
-
-	if c.store.oracle.IsExpired(c.startTS, c.maxTxnTimeUse) {
+	if c.store.oracle.IsExpired(c.startTS, kv.MaxTxnTimeUse) {
 		err = errors.Errorf("conn %d txn takes too much time, txnStartTS: %d, comm: %d",
 			c.connID, c.startTS, c.commitTS)
 		return err
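With the failpoint and the per-committer maxTxnTimeUse field gone, the commit path relies on the single global bound. A sketch of the check that oracle.IsExpired is assumed to perform with these arguments (isTxnTooOld is an illustrative model, not the real oracle code):

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/store/tikv/oracle"
)

// isTxnTooOld models IsExpired(startTS, kv.MaxTxnTimeUse): a transaction whose
// physical start time lies more than MaxTxnTimeUse milliseconds in the past
// is refused at commit time instead of racing with the GC worker.
func isTxnTooOld(startTS uint64, nowPhysicalMs int64) bool {
	const maxTxnTimeUse = 24 * 60 * 60 * 1000 // kv.MaxTxnTimeUse, in ms
	return oracle.ExtractPhysical(startTS)+maxTxnTimeUse <= nowPhysicalMs
}

func main() {
	startTS := oracle.ComposeTS(1000, 0) // physical part = 1,000 ms
	fmt.Println(isTxnTooOld(startTS, 1000+24*60*60*1000)) // true: exactly at the bound
}
```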
44 changes: 31 additions & 13 deletions store/tikv/gcworker/gc_worker.go
@@ -29,12 +29,13 @@ import (
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/parser/terror"
 	pd "github.com/pingcap/pd/client"
-	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/ddl/util"
+	"github.com/pingcap/tidb/domain"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/metrics"
 	"github.com/pingcap/tidb/privilege"
 	"github.com/pingcap/tidb/session"
+	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/store/tikv"
 	"github.com/pingcap/tidb/store/tikv/oracle"
 	"github.com/pingcap/tidb/store/tikv/tikvrpc"
@@ -320,6 +321,30 @@ func (w *GCWorker) checkPrepare(ctx context.Context) (bool, uint64, error) {
 	return true, oracle.ComposeTS(oracle.GetPhysical(*newSafePoint), 0), nil
 }
 
+// calculateNewSafePoint uses the current global transaction min start timestamp to calculate the new safe point.
+func (w *GCWorker) calSafePointByMinStartTS(safePoint time.Time) time.Time {
+	kvs, err := w.store.GetSafePointKV().GetWithPrefix(domain.ServerMinStartTSPath)
+	if err != nil {
+		logutil.BgLogger().Warn("get all minStartTS failed", zap.Error(err))
+		return safePoint
+	}
+
+	safePointTS := variable.GoTimeToTS(safePoint)
[Review thread on the line above — a short sketch comparing the two conversions follows this hunk]
Contributor: Why not use oracle.ComposeTS? The same applies to the other usages of variable.GoTimeToTS.
Author: It does not need a separate GetPhysical() call.

+	for _, v := range kvs {
[Review thread on the line above]
Contributor: Is it possible that kvs is empty?
Author: Yes.

+		minStartTS, err := strconv.ParseUint(string(v.Value), 10, 64)
+		if err != nil {
+			logutil.BgLogger().Warn("parse minStartTS failed", zap.Error(err))
+			continue
+		}
+		if minStartTS < safePointTS {
+			safePointTS = minStartTS
+		}
+	}
+	safePoint = time.Unix(0, oracle.ExtractPhysical(safePointTS)*1e6)
+	logutil.BgLogger().Debug("calSafePointByMinStartTS", zap.Time("safePoint", safePoint))
+	return safePoint
[Review thread on the line above]
Member: It is possible that the diff > maxTxnTimeUse while another transaction's startTS is before the safe point, so we need to check maxTxnTimeUse during the kvs iteration.
Contributor: I don't think we need to, since transactions that run longer than maxTxnTimeUse will eventually fail anyway.
+}
+
 func (w *GCWorker) getOracleTime() (time.Time, error) {
 	currentVer, err := w.store.CurrentVersion()
 	if err != nil {
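On the oracle.ComposeTS question in the thread above: both spellings build a TSO whose physical part is the time in Unix milliseconds and whose logical part is zero, so variable.GoTimeToTS simply saves the explicit GetPhysical step. A minimal sketch of the equivalence (assuming both helpers share the same TSO layout):

```go
package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tidb/sessionctx/variable"
	"github.com/pingcap/tidb/store/tikv/oracle"
)

func main() {
	t := time.Now()
	a := variable.GoTimeToTS(t)                     // one step
	b := oracle.ComposeTS(oracle.GetPhysical(t), 0) // physical first, then compose
	fmt.Println(a == b)                             // expected: true
}
```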
@@ -419,23 +444,16 @@ func (w *GCWorker) checkGCInterval(now time.Time) (bool, error) {
 
 // validateGCLiftTime checks whether life time is small than min gc life time.
 func (w *GCWorker) validateGCLiftTime(lifeTime time.Duration) (time.Duration, error) {
-	minLifeTime := gcMinLifeTime
-	// max-txn-time-use value is less than gc_life_time - 10s.
-	maxTxnTime := time.Duration(config.GetGlobalConfig().TiKVClient.MaxTxnTimeUse+10) * time.Second
-	if minLifeTime < maxTxnTime {
-		minLifeTime = maxTxnTime
-	}
-
-	if lifeTime >= minLifeTime {
+	if lifeTime >= gcMinLifeTime {
 		return lifeTime, nil
 	}
 
 	logutil.BgLogger().Info("[gc worker] invalid gc life time",
 		zap.Duration("get gc life time", lifeTime),
-		zap.Duration("min gc life time", minLifeTime))
+		zap.Duration("min gc life time", gcMinLifeTime))
 
-	err := w.saveDuration(gcLifeTimeKey, minLifeTime)
-	return minLifeTime, err
+	err := w.saveDuration(gcLifeTimeKey, gcMinLifeTime)
+	return gcMinLifeTime, err
 }
 
 func (w *GCWorker) calculateNewSafePoint(now time.Time) (*time.Time, error) {
@@ -452,7 +470,7 @@ func (w *GCWorker) calculateNewSafePoint(now time.Time) (*time.Time, error) {
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	safePoint := now.Add(-*lifeTime)
+	safePoint := w.calSafePointByMinStartTS(now.Add(-*lifeTime))
 	// We should never decrease safePoint.
 	if lastSafePoint != nil && safePoint.Before(*lastSafePoint) {
 		logutil.BgLogger().Info("[gc worker] last safe point is later than current one."+
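Putting both halves of the patch together: every TiDB server periodically reports the minimum start timestamp of its live transactions (already floored at now - kv.MaxTxnTimeUse), and the GC worker clamps the candidate safe point to the oldest reported value. A condensed model of the resulting rule (newSafePoint is illustrative, not the patch's code):

```go
package gcsketch

import (
	"time"

	"github.com/pingcap/tidb/sessionctx/variable"
	"github.com/pingcap/tidb/store/tikv/oracle"
)

// newSafePoint condenses calculateNewSafePoint + calSafePointByMinStartTS:
// start from now - gc_life_time, then pull the safe point back to the oldest
// reported min-start-ts, but never move it forward.
func newSafePoint(now time.Time, gcLifeTime time.Duration, reportedMinStartTS []uint64) time.Time {
	safePoint := now.Add(-gcLifeTime)
	safePointTS := variable.GoTimeToTS(safePoint)
	for _, ts := range reportedMinStartTS {
		if ts < safePointTS {
			safePointTS = ts
		}
	}
	return time.Unix(0, oracle.ExtractPhysical(safePointTS)*1e6)
}
```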