Skip to content

Commit

Permalink
dumpling: add recent speed to dump status (#38456)
Browse files Browse the repository at this point in the history
close #38455
  • Loading branch information
okJiang authored Oct 17, 2022
1 parent 287f5c9 commit aa49a4c
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 1 deletion.
3 changes: 3 additions & 0 deletions dumpling/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type Dumper struct {
selectTiDBTableRegionFunc func(tctx *tcontext.Context, conn *BaseConn, meta TableMeta) (pkFields []string, pkVals [][]string, err error)
totalTables int64
charsetAndDefaultCollationMap map[string]string

speedRecorder *SpeedRecorder
}

// NewDumper returns a new Dumper
Expand All @@ -78,6 +80,7 @@ func NewDumper(ctx context.Context, conf *Config) (*Dumper, error) {
conf: conf,
cancelCtx: cancelFn,
selectTiDBTableRegionFunc: selectTiDBTableRegion,
speedRecorder: NewSpeedRecorder(),
}

var err error
Expand Down
46 changes: 46 additions & 0 deletions dumpling/export/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package export

import (
"fmt"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -48,6 +49,7 @@ type DumpStatus struct {
FinishedRows float64
EstimateTotalRows float64
TotalTables int64
CurrentSpeedBPS int64
}

// GetStatus returns the status of dumping by reading metrics.
Expand All @@ -58,6 +60,7 @@ func (d *Dumper) GetStatus() *DumpStatus {
ret.FinishedBytes = ReadGauge(d.metrics.finishedSizeGauge)
ret.FinishedRows = ReadGauge(d.metrics.finishedRowsGauge)
ret.EstimateTotalRows = ReadCounter(d.metrics.estimateTotalRowsCounter)
ret.CurrentSpeedBPS = d.speedRecorder.GetSpeed(int64(ret.FinishedBytes))
return ret
}

Expand All @@ -72,3 +75,46 @@ func calculateTableCount(m DatabaseTables) int {
}
return cnt
}

// SpeedRecorder record the finished bytes and calculate its speed.
type SpeedRecorder struct {
mu sync.Mutex
lastFinished int64
lastUpdateTime time.Time
speedBPS int64
}

// NewSpeedRecorder new a SpeedRecorder.
func NewSpeedRecorder() *SpeedRecorder {
return &SpeedRecorder{
lastUpdateTime: time.Now(),
}
}

// GetSpeed calculate status speed.
func (s *SpeedRecorder) GetSpeed(finished int64) int64 {
s.mu.Lock()
defer s.mu.Unlock()

if finished <= s.lastFinished {
// for finished bytes does not get forwarded, use old speed to avoid
// display zero. We may find better strategy in future.
return s.speedBPS
}

now := time.Now()
elapsed := int64(now.Sub(s.lastUpdateTime).Seconds())
if elapsed == 0 {
elapsed = 1
}
currentSpeed := (finished - s.lastFinished) / elapsed
if currentSpeed == 0 {
currentSpeed = 1
}

s.lastFinished = finished
s.lastUpdateTime = now
s.speedBPS = currentSpeed

return currentSpeed
}
22 changes: 21 additions & 1 deletion dumpling/export/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ package export

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestGetParameters(t *testing.T) {
conf := defaultConfigForTest(t)
d := &Dumper{conf: conf}
d := &Dumper{conf: conf, speedRecorder: NewSpeedRecorder()}
d.metrics = newMetrics(conf.PromFactory, nil)

mid := d.GetStatus()
Expand All @@ -30,3 +31,22 @@ func TestGetParameters(t *testing.T) {
require.EqualValues(t, float64(30), mid.FinishedRows)
require.EqualValues(t, float64(40), mid.EstimateTotalRows)
}

func TestSpeedRecorder(t *testing.T) {
testCases := []struct {
spentTime int64
finished int64
expected int64
}{
{spentTime: 1, finished: 100, expected: 100},
{spentTime: 2, finished: 200, expected: 50},
// already finished, will return last speed
{spentTime: 3, finished: 200, expected: 50},
}
speedRecorder := NewSpeedRecorder()
for _, tc := range testCases {
time.Sleep(time.Duration(tc.spentTime) * time.Second)
recentSpeed := speedRecorder.GetSpeed(tc.finished)
require.Equal(t, tc.expected, recentSpeed)
}
}

0 comments on commit aa49a4c

Please sign in to comment.