Skip to content

Commit

Permalink
Merge branch 'master' into fix-skipDDL-attribute
Browse files Browse the repository at this point in the history
  • Loading branch information
joccau authored Nov 4, 2021
2 parents 7efad08 + 86caab9 commit 91cbade
Show file tree
Hide file tree
Showing 35 changed files with 427 additions and 159 deletions.
1 change: 1 addition & 0 deletions br/pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Glue interface {
// Session is an abstraction of the session.Session interface.
type Session interface {
Execute(ctx context.Context, sql string) error
ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error
CreateDatabase(ctx context.Context, schema *model.DBInfo) error
CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error
Close()
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ func (gs *tidbSession) Execute(ctx context.Context, sql string) error {
return errors.Trace(err)
}

func (gs *tidbSession) ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error {
_, err := gs.se.ExecuteInternal(ctx, sql, args...)
return errors.Trace(err)
}

// CreateDatabase implements glue.Session.
func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error {
d := domain.GetDomain(gs.se).DDL()
Expand Down
53 changes: 36 additions & 17 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ func (rc *Client) GoValidateChecksum(
// run the stat loader
go func() {
defer wg.Done()
rc.statLoader(ctx, loadStatCh)
rc.updateMetaAndLoadStats(ctx, loadStatCh)
}()
workers := utils.NewWorkerPool(defaultChecksumConcurrency, "RestoreChecksum")
go func() {
Expand Down Expand Up @@ -842,7 +842,13 @@ func (rc *Client) GoValidateChecksum(
return outCh
}

func (rc *Client) execChecksum(ctx context.Context, tbl CreatedTable, kvClient kv.Client, concurrency uint, loadStatCh chan<- *CreatedTable) error {
func (rc *Client) execChecksum(
ctx context.Context,
tbl CreatedTable,
kvClient kv.Client,
concurrency uint,
loadStatCh chan<- *CreatedTable,
) error {
logger := log.With(
zap.String("db", tbl.OldTable.DB.Name.O),
zap.String("table", tbl.OldTable.Info.Name.O),
Expand Down Expand Up @@ -891,13 +897,12 @@ func (rc *Client) execChecksum(ctx context.Context, tbl CreatedTable, kvClient k
)
return errors.Annotate(berrors.ErrRestoreChecksumMismatch, "failed to validate checksum")
}
if table.Stats != nil {
loadStatCh <- &tbl
}

loadStatCh <- &tbl
return nil
}

func (rc *Client) statLoader(ctx context.Context, input <-chan *CreatedTable) {
func (rc *Client) updateMetaAndLoadStats(ctx context.Context, input <-chan *CreatedTable) {
for {
select {
case <-ctx.Done():
Expand All @@ -906,19 +911,33 @@ func (rc *Client) statLoader(ctx context.Context, input <-chan *CreatedTable) {
if !ok {
return
}

// Not need to return err when failed because of update analysis-meta
restoreTS, err := rc.GetTS(ctx)
if err != nil {
log.Error("getTS failed", zap.Error(err))
} else {
err = rc.db.UpdateStatsMeta(ctx, tbl.Table.ID, restoreTS, tbl.OldTable.TotalKvs)
if err != nil {
log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(err))
}
}

table := tbl.OldTable
log.Info("start loads analyze after validate checksum",
zap.Int64("old id", tbl.OldTable.Info.ID),
zap.Int64("new id", tbl.Table.ID),
)
start := time.Now()
if err := rc.statsHandler.LoadStatsFromJSON(rc.dom.InfoSchema(), table.Stats); err != nil {
log.Error("analyze table failed", zap.Any("table", table.Stats), zap.Error(err))
if table.Stats != nil {
log.Info("start loads analyze after validate checksum",
zap.Int64("old id", tbl.OldTable.Info.ID),
zap.Int64("new id", tbl.Table.ID),
)
start := time.Now()
if err := rc.statsHandler.LoadStatsFromJSON(rc.dom.InfoSchema(), table.Stats); err != nil {
log.Error("analyze table failed", zap.Any("table", table.Stats), zap.Error(err))
}
log.Info("restore stat done",
zap.String("table", table.Info.Name.L),
zap.String("db", table.DB.Name.L),
zap.Duration("cost", time.Since(start)))
}
log.Info("restore stat done",
zap.String("table", table.Info.Name.L),
zap.String("db", table.DB.Name.L),
zap.Duration("cost", time.Since(start)))
}
}
}
Expand Down
23 changes: 23 additions & 0 deletions br/pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -92,6 +93,28 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error {
return errors.Trace(err)
}

// UpdateStatsMeta update count and snapshot ts in mysql.stats_meta
func (db *DB) UpdateStatsMeta(ctx context.Context, tableID int64, restoreTS uint64, count uint64) error {
sysDB := mysql.SystemDB
statsMetaTbl := "stats_meta"

// set restoreTS to snapshot and version which is used to update stats_meta
err := db.se.ExecuteInternal(
ctx,
"update %n.%n set snapshot = %?, version = %?, count = %? where table_id = %?",
sysDB,
statsMetaTbl,
restoreTS,
restoreTS,
count,
tableID,
)
if err != nil {
log.Error("execute update sql failed", zap.Error(err))
}
return nil
}

// CreateDatabase executes a CREATE DATABASE SQL.
func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error {
err := db.se.CreateDatabase(ctx, schema)
Expand Down
3 changes: 2 additions & 1 deletion dumpling/export/block_allow_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (

"github.com/pingcap/tidb-tools/pkg/filter"
tf "github.com/pingcap/tidb-tools/pkg/table-filter"
tcontext "github.com/pingcap/tidb/dumpling/context"
"github.com/stretchr/testify/require"

tcontext "github.com/pingcap/tidb/dumpling/context"
)

func TestFilterTables(t *testing.T) {
Expand Down
24 changes: 6 additions & 18 deletions dumpling/export/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"regexp"
"strconv"
"strings"
Expand All @@ -21,11 +20,12 @@ import (
"github.com/pingcap/errors"
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
"github.com/pingcap/tidb-tools/pkg/utils"
"github.com/pingcap/tidb/br/pkg/storage"
tcontext "github.com/pingcap/tidb/dumpling/context"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/pflag"
"go.uber.org/zap"

"github.com/pingcap/tidb/br/pkg/storage"
tcontext "github.com/pingcap/tidb/dumpling/context"
)

const (
Expand Down Expand Up @@ -531,21 +531,9 @@ func (conf *Config) createExternalStorage(ctx context.Context) (storage.External
if err != nil {
return nil, errors.Trace(err)
}
httpClient := http.DefaultClient
httpClient.Timeout = 30 * time.Second
maxIdleConnsPerHost := http.DefaultMaxIdleConnsPerHost
if conf.Threads > maxIdleConnsPerHost {
maxIdleConnsPerHost = conf.Threads
}
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.MaxIdleConnsPerHost = maxIdleConnsPerHost
httpClient.Transport = transport

return storage.New(ctx, b, &storage.ExternalStorageOptions{
HTTPClient: httpClient,
SkipCheckPath: true,
SendCredentials: false,
})

// TODO: support setting httpClient with certification later
return storage.New(ctx, b, &storage.ExternalStorageOptions{})
}

const (
Expand Down
3 changes: 2 additions & 1 deletion dumpling/export/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ package export
import (
"testing"

tcontext "github.com/pingcap/tidb/dumpling/context"
"github.com/stretchr/testify/require"

tcontext "github.com/pingcap/tidb/dumpling/context"
)

func TestCreateExternalStorage(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions dumpling/export/consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"database/sql"

"github.com/pingcap/errors"

"github.com/pingcap/tidb/br/pkg/utils"
tcontext "github.com/pingcap/tidb/dumpling/context"
)
Expand Down
6 changes: 3 additions & 3 deletions dumpling/export/consistency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
"errors"
"testing"

tcontext "github.com/pingcap/tidb/dumpling/context"
"github.com/stretchr/testify/require"

"github.com/DATA-DOG/go-sqlmock"
"github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/require"

tcontext "github.com/pingcap/tidb/dumpling/context"
)

func TestConsistencyController(t *testing.T) {
Expand Down
Loading

0 comments on commit 91cbade

Please sign in to comment.