Merge remote-tracking branch 'upstream/master' into fix-socket-auth
morgo committed Nov 29, 2021
2 parents 454a6f5 + 443f15e commit 909e64e
Showing 64 changed files with 1,747 additions and 368 deletions.
10 changes: 8 additions & 2 deletions br/pkg/lightning/backend/tidb/tidb.go
@@ -351,7 +351,10 @@ func EncodeRowForRecord(encTable table.Table, sqlMode mysql.SQLMode, row []types
}
resRow, err := enc.Encode(log.L(), row, 0, columnPermutation, "", 0)
if err != nil {
return fmt.Sprintf("/* ERROR: %s */", err)
// if encoding fails, fall back to recording the raw input strings
// ignore the error: it can only occur for an unknown datum type, which cannot happen here
datumStr, _ := types.DatumsToString(row, true)
return datumStr
}
return resRow.(tidbRow).insertStmt
}
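The fallback above trades the old `/* ERROR: ... */` placeholder for the raw input datums, so callers record the original row contents instead. A minimal sketch of that path, assuming only the public `types` package (this program is illustrative and not part of the commit):

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/types"
)

func main() {
	// The datums a failed Encode would leave us with.
	row := []types.Datum{
		types.NewIntDatum(5),
		types.NewStringDatum("test test"),
	}
	// DatumsToString is the same helper the fallback calls; per the comment
	// above, it only errors on an unknown datum type, which the caller rules out.
	datumStr, err := types.DatumsToString(row, true)
	if err != nil {
		fmt.Printf("/* ERROR: %s */\n", err) // the pre-change placeholder
		return
	}
	fmt.Println(datumStr) // e.g. (5, "test test")
}
```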
@@ -435,14 +438,17 @@ rowLoop:
continue rowLoop
case utils.IsRetryableError(err):
// retry next loop
default:
case be.errorMgr.TypeErrorsRemain() > 0:
// WriteBatchRowsToDB failed in batch mode and cannot be retried,
// so redo the write row-by-row to locate the failing row (and skip it correctly in the future).
if err = be.WriteRowsToDB(ctx, tableName, columnNames, r); err != nil {
// A non-nil error means we reached the max error count in non-batch mode.
// For now we treat maxErrorCount as always 0, so return as soon as any error occurs.
return errors.Annotatef(err, "[%s] write rows reach max error count %d", tableName, 0)
}
continue rowLoop
default:
return err
}
}
return errors.Annotatef(err, "[%s] batch write rows reach max retry %d and still failed", tableName, writeRowsMaxRetryTimes)
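The reworked switch orders its checks deliberately: a nil error finishes the row batch, retryable errors stay in the batch-retry loop, a remaining type-error budget triggers the row-by-row downgrade, and anything else now fails fast through the new `default` branch. A standalone sketch of that ordering (hypothetical helper names, not Lightning's API):

```go
package main

import (
	"errors"
	"fmt"
)

var errRetryable = errors.New("retryable")

// classify mirrors the ordering of the switch above: the first matching
// case wins, so the budget is only consulted for non-retryable errors.
func classify(err error, typeErrBudget int64) string {
	switch {
	case err == nil:
		return "done"
	case errors.Is(err, errRetryable):
		return "retry batch"
	case typeErrBudget > 0:
		return "downgrade to row-by-row"
	default:
		return "fail fast"
	}
}

func main() {
	fmt.Println(classify(nil, 0))                    // done
	fmt.Println(classify(errRetryable, 0))           // retry batch
	fmt.Println(classify(errors.New("bad type"), 5)) // downgrade to row-by-row
	fmt.Println(classify(errors.New("bad type"), 0)) // fail fast
}
```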
144 changes: 131 additions & 13 deletions br/pkg/lightning/backend/tidb/tidb_test.go
@@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/errormanager"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/parser/charset"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
@@ -408,7 +409,33 @@ func TestFetchRemoteTableModels_4_x_auto_random(t *testing.T) {
}, tableInfos)
}

func TestWriteRowsErrorDowngrading(t *testing.T) {
func TestWriteRowsErrorNoRetry(t *testing.T) {
t.Parallel()
nonRetryableError := sql.ErrNoRows
s := createMysqlSuite(t)
defer s.TearDownTest(t)

// the batch insert fails and is rolled back.
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1),(2),(3),(4),(5)\\E").
WillReturnError(nonRetryableError)

// error recording is disabled, so no row-by-row retry statements are expected.
ignoreBackend := tidb.NewTiDBBackend(s.dbHandle, config.ErrorOnDup,
errormanager.New(s.dbHandle, &config.Config{}),
)
dataRows := encodeRowsTiDB(t, ignoreBackend, s.tbl)
ctx := context.Background()
engine, err := ignoreBackend.OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
require.NoError(t, err)
writer, err := engine.LocalWriter(ctx, nil)
require.NoError(t, err)
err = writer.WriteRows(ctx, []string{"a"}, dataRows)
require.Error(t, err)
require.False(t, utils.IsRetryableError(err), "err: %v", err)
}

func TestWriteRowsErrorDowngradingAll(t *testing.T) {
t.Parallel()
nonRetryableError := sql.ErrNoRows
s := createMysqlSuite(t)
@@ -439,13 +466,77 @@ func TestWriteRowsErrorDowngrading(t *testing.T) {
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "9.csv", int64(0), nonRetryableError.Error(), "(3)").
WillReturnResult(driver.ResultNoRows)
// rows 4 and 5 also fail; the threshold (10) is not reached, so their errors are still recorded
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(4)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "10.csv", int64(0), nonRetryableError.Error(), "(4)").
WillReturnResult(driver.ResultNoRows)
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(5)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "11.csv", int64(0), nonRetryableError.Error(), "(5)").
WillReturnResult(driver.ResultNoRows)

// enable error recording with a threshold (10) above the row count, so every failing row is retried individually and its error recorded.
ignoreBackend := tidb.NewTiDBBackend(s.dbHandle, config.ErrorOnDup,
errormanager.New(s.dbHandle, &config.Config{
App: config.Lightning{
TaskInfoSchemaName: "tidb_lightning_errors",
MaxError: config.MaxError{
Type: *atomic.NewInt64(10),
},
},
}),
)
dataRows := encodeRowsTiDB(t, ignoreBackend, s.tbl)
ctx := context.Background()
logger := log.L()
engine, err := ignoreBackend.OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
require.NoError(t, err)
writer, err := engine.LocalWriter(ctx, nil)
require.NoError(t, err)
err = writer.WriteRows(ctx, []string{"a"}, dataRows)
require.NoError(t, err)
}

func TestWriteRowsErrorDowngradingExceedThreshold(t *testing.T) {
t.Parallel()
nonRetryableError := sql.ErrNoRows
s := createMysqlSuite(t)
defer s.TearDownTest(t)
// First, the batch insert fails and is rolled back.
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1),(2),(3),(4),(5)\\E").
WillReturnError(nonRetryableError)
// Then, insert row-by-row due to the non-retryable error.
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "7.csv", int64(0), nonRetryableError.Error(), "(1)").
WillReturnResult(driver.ResultNoRows)
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(2)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "8.csv", int64(0), nonRetryableError.Error(), "(2)").
WillReturnResult(driver.ResultNoRows)
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(3)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "9.csv", int64(0), nonRetryableError.Error(), "(3)").
WillReturnResult(driver.ResultNoRows)
// the fourth row exceeds the error threshold, so its error is not recorded
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(4)\\E").
WillReturnError(nonRetryableError)

ignoreBackend := tidb.NewTiDBBackend(s.dbHandle, config.ErrorOnDup,
errormanager.New(s.dbHandle, &config.Config{
@@ -457,15 +548,27 @@ func TestWriteRowsErrorDowngrading(t *testing.T) {
},
}),
)
dataRows := encodeRowsTiDB(t, ignoreBackend, s.tbl)
ctx := context.Background()
engine, err := ignoreBackend.OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
require.NoError(t, err)
writer, err := engine.LocalWriter(ctx, nil)
require.NoError(t, err)
err = writer.WriteRows(ctx, []string{"a"}, dataRows)
require.Error(t, err)
st, err := writer.Close(ctx)
require.NoError(t, err)
require.Nil(t, st)
}

dataRows := ignoreBackend.MakeEmptyRows()
func encodeRowsTiDB(t *testing.T, b backend.Backend, tbl table.Table) kv.Rows {
dataRows := b.MakeEmptyRows()
dataChecksum := verification.MakeKVChecksum(0, 0, 0)
indexRows := ignoreBackend.MakeEmptyRows()
indexRows := b.MakeEmptyRows()
indexChecksum := verification.MakeKVChecksum(0, 0, 0)
logger := log.L()

encoder, err := ignoreBackend.NewEncoder(s.tbl, &kv.SessionOptions{})
encoder, err := b.NewEncoder(tbl, &kv.SessionOptions{})
require.NoError(t, err)
row, err := encoder.Encode(logger, []types.Datum{
types.NewIntDatum(1),
Expand Down Expand Up @@ -501,12 +604,27 @@ func TestWriteRowsErrorDowngrading(t *testing.T) {
require.NoError(t, err)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)
return dataRows
}

writer, err := engine.LocalWriter(ctx, nil)
require.NoError(t, err)
err = writer.WriteRows(ctx, []string{"a"}, dataRows)
require.Error(t, err)
st, err := writer.Close(ctx)
require.NoError(t, err)
require.Nil(t, st)
func TestEncodeRowForRecord(t *testing.T) {
t.Parallel()
s := createMysqlSuite(t)

// for a correct row, encoding produces the expected insert values
row := tidb.EncodeRowForRecord(s.tbl, mysql.ModeStrictTransTables, []types.Datum{
types.NewIntDatum(5),
types.NewStringDatum("test test"),
types.NewBinaryLiteralDatum(types.NewBinaryLiteralFromUint(0xabcdef, 6)),
}, []int{0, -1, -1, -1, -1, -1, -1, -1, 1, 2, -1, -1, -1, -1})
require.Equal(t, row, "(5,'test test',x'000000abcdef')")

// the following row results in a column count mismatch error, therefore the encode
// result falls back to a comma-separated string list.
row = tidb.EncodeRowForRecord(s.tbl, mysql.ModeStrictTransTables, []types.Datum{
types.NewIntDatum(5),
types.NewStringDatum("test test"),
types.NewBinaryLiteralDatum(types.NewBinaryLiteralFromUint(0xabcdef, 6)),
}, []int{0, -1, -1, -1, -1, -1, -1, -1, 1, 2, 3, -1, -1, -1})
require.Equal(t, row, "(5, \"test test\", \x00\x00\x00\xab\xcd\xef)")
}
4 changes: 4 additions & 0 deletions br/pkg/lightning/errormanager/errormanager.go
@@ -112,6 +112,10 @@ type ErrorManager struct {
dupResolution config.DuplicateResolutionAlgorithm
}

func (em *ErrorManager) TypeErrorsRemain() int64 {
return em.remainingError.Type.Load()
}

// New creates a new error manager.
func New(db *sql.DB, cfg *config.Config) *ErrorManager {
em := &ErrorManager{
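`TypeErrorsRemain` exposes the remaining type-error budget that the new `case` in tidb.go consults before downgrading to row-by-row writes. A minimal sketch of the underlying counter pattern, using go.uber.org/atomic as the config does (the `budget` type here is illustrative, not the real ErrorManager):

```go
package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// budget mimics the remaining-error counter: it starts at the configured
// max-error value and is decremented each time a type error is recorded.
type budget struct {
	typeErrs atomic.Int64
}

func (b *budget) typeErrorsRemain() int64 { return b.typeErrs.Load() }

func main() {
	b := &budget{}
	b.typeErrs.Store(3) // e.g. max-error type = 3
	for i := 1; i <= 5; i++ {
		if b.typeErrs.Dec() < 0 {
			fmt.Println("type error budget exhausted, abort")
			return
		}
		fmt.Printf("recorded type error %d, remaining %d\n", i, b.typeErrorsRemain())
	}
}
```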
24 changes: 16 additions & 8 deletions br/pkg/lightning/restore/check_info.go
@@ -38,7 +38,6 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/table"
@@ -179,6 +178,15 @@ func (rc *Controller) ClusterIsAvailable(ctx context.Context) error {
return nil
}

func isTiFlash(store *api.MetaStore) bool {
for _, label := range store.Labels {
if label.Key == "engine" && label.Value == "tiflash" {
return true
}
}
return false
}

func (rc *Controller) checkEmptyRegion(ctx context.Context) error {
passed := true
message := "Cluster doesn't have too many empty regions"
@@ -206,7 +214,7 @@ func (rc *Controller) checkEmptyRegion(ctx context.Context) error {
}
}
for _, store := range storeInfo.Stores {
stores[store.Store.Id] = store
stores[store.Store.StoreID] = store
}
tableCount := 0
for _, db := range rc.dbMetas {
@@ -224,10 +232,10 @@
)
for storeID, regionCnt := range regions {
if store, ok := stores[storeID]; ok {
if store.Store.State != metapb.StoreState_Up {
if metapb.StoreState(metapb.StoreState_value[store.Store.StateName]) != metapb.StoreState_Up {
continue
}
if version.IsTiFlash(store.Store.Store) {
if isTiFlash(store.Store) {
continue
}
if regionCnt > errorThrehold {
@@ -269,10 +277,10 @@ func (rc *Controller) checkRegionDistribution(ctx context.Context) error {
}
stores := make([]*api.StoreInfo, 0, len(result.Stores))
for _, store := range result.Stores {
if store.Store.State != metapb.StoreState_Up {
if metapb.StoreState(metapb.StoreState_value[store.Store.StateName]) != metapb.StoreState_Up {
continue
}
if version.IsTiFlash(store.Store.Store) {
if isTiFlash(store.Store) {
continue
}
stores = append(stores, store)
@@ -302,11 +310,11 @@ func (rc *Controller) checkRegionDistribution(ctx context.Context) error {
passed = false
message = fmt.Sprintf("Region distribution is unbalanced, the ratio of the regions count of the store(%v) "+
"with least regions(%v) to the store(%v) with most regions(%v) is %v, but we expect it must not be less than %v",
minStore.Store.Id, minStore.Status.RegionCount, maxStore.Store.Id, maxStore.Status.RegionCount, ratio, errorRegionCntMinMaxRatio)
minStore.Store.StoreID, minStore.Status.RegionCount, maxStore.Store.StoreID, maxStore.Status.RegionCount, ratio, errorRegionCntMinMaxRatio)
} else if ratio < warnRegionCntMinMaxRatio {
message = fmt.Sprintf("Region distribution is unbalanced, the ratio of the regions count of the store(%v) "+
"with least regions(%v) to the store(%v) with most regions(%v) is %v, but we expect it should not be less than %v",
minStore.Store.Id, minStore.Status.RegionCount, maxStore.Store.Id, maxStore.Status.RegionCount, ratio, warnRegionCntMinMaxRatio)
minStore.Store.StoreID, minStore.Status.RegionCount, maxStore.Store.StoreID, maxStore.Status.RegionCount, ratio, warnRegionCntMinMaxRatio)
}
return nil
}
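With the typed `State` field gone from the store payload, both checks recover a `metapb.StoreState` from the `StateName` string via the protobuf-generated `StoreState_value` map. A standalone sketch of that conversion (assuming only the kvproto metapb package):

```go
package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
)

func main() {
	// StoreState_value is the generated name-to-enum map; unknown names
	// yield 0, so real code should only trust names PD actually returns.
	for _, name := range []string{"Up", "Offline", "Tombstone"} {
		state := metapb.StoreState(metapb.StoreState_value[name])
		fmt.Printf("%-9s -> %v (up=%v)\n", name, state, state == metapb.StoreState_Up)
	}
}
```

One caveat worth noting: a missing or misspelled name maps to the zero value, which is `StoreState_Up`, so the conversion is only as trustworthy as PD's state-name field.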
21 changes: 10 additions & 11 deletions br/pkg/lightning/restore/restore_test.go
@@ -1489,7 +1489,6 @@ func (s *chunkRestoreSuite) TestEncodeLoopColumnsMismatch(c *C) {
}

func (s *chunkRestoreSuite) TestEncodeLoopIgnoreColumnsCSV(c *C) {
log.InitLogger(&log.Config{}, "error")
cases := []struct {
s string
ignoreColumns []*config.IgnoreColumns
@@ -1979,7 +1978,7 @@ func (s *tableRestoreSuite) TestCheckClusterRegion(c *C) {
testCases := []testCase{
{
stores: api.StoresInfo{Stores: []*api.StoreInfo{
{Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 200}},
{Store: &api.MetaStore{StoreID: 1}, Status: &api.StoreStatus{RegionCount: 200}},
}},
emptyRegions: api.RegionsInfo{
Regions: append([]api.RegionInfo(nil), makeRegions(100, 1)...),
Expand All @@ -1990,9 +1989,9 @@ func (s *tableRestoreSuite) TestCheckClusterRegion(c *C) {
},
{
stores: api.StoresInfo{Stores: []*api.StoreInfo{
{Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 2000}},
{Store: &api.MetaStore{Store: &metapb.Store{Id: 2}}, Status: &api.StoreStatus{RegionCount: 3100}},
{Store: &api.MetaStore{Store: &metapb.Store{Id: 3}}, Status: &api.StoreStatus{RegionCount: 2500}},
{Store: &api.MetaStore{StoreID: 1}, Status: &api.StoreStatus{RegionCount: 2000}},
{Store: &api.MetaStore{StoreID: 2}, Status: &api.StoreStatus{RegionCount: 3100}},
{Store: &api.MetaStore{StoreID: 3}, Status: &api.StoreStatus{RegionCount: 2500}},
}},
emptyRegions: api.RegionsInfo{
Regions: append(append(append([]api.RegionInfo(nil),
@@ -2010,19 +2009,19 @@
},
{
stores: api.StoresInfo{Stores: []*api.StoreInfo{
{Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 1200}},
{Store: &api.MetaStore{Store: &metapb.Store{Id: 2}}, Status: &api.StoreStatus{RegionCount: 3000}},
{Store: &api.MetaStore{Store: &metapb.Store{Id: 3}}, Status: &api.StoreStatus{RegionCount: 2500}},
{Store: &api.MetaStore{StoreID: 1}, Status: &api.StoreStatus{RegionCount: 1200}},
{Store: &api.MetaStore{StoreID: 2}, Status: &api.StoreStatus{RegionCount: 3000}},
{Store: &api.MetaStore{StoreID: 3}, Status: &api.StoreStatus{RegionCount: 2500}},
}},
expectMsgs: []string{".*Region distribution is unbalanced.*but we expect it must not be less than 0.5.*"},
expectResult: false,
expectErrorCnt: 1,
},
{
stores: api.StoresInfo{Stores: []*api.StoreInfo{
{Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 0}},
{Store: &api.MetaStore{Store: &metapb.Store{Id: 2}}, Status: &api.StoreStatus{RegionCount: 2800}},
{Store: &api.MetaStore{Store: &metapb.Store{Id: 3}}, Status: &api.StoreStatus{RegionCount: 2500}},
{Store: &api.MetaStore{StoreID: 1}, Status: &api.StoreStatus{RegionCount: 0}},
{Store: &api.MetaStore{StoreID: 2}, Status: &api.StoreStatus{RegionCount: 2800}},
{Store: &api.MetaStore{StoreID: 3}, Status: &api.StoreStatus{RegionCount: 2500}},
}},
expectMsgs: []string{".*Region distribution is unbalanced.*but we expect it must not be less than 0.5.*"},
expectResult: false,
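The updated fixtures exercise the same ratio rule as before: the region count of the emptiest store divided by that of the fullest must not fall below the error threshold. A standalone sketch of the arithmetic behind the multi-store cases above (local helper; the constant value is taken from the "not be less than 0.5" expected messages):

```go
package main

import "fmt"

// balanceRatio returns min(regionCounts) / max(regionCounts).
func balanceRatio(regionCounts []int) float64 {
	min, max := regionCounts[0], regionCounts[0]
	for _, c := range regionCounts[1:] {
		if c < min {
			min = c
		}
		if c > max {
			max = c
		}
	}
	return float64(min) / float64(max)
}

func main() {
	const errorRegionCntMinMaxRatio = 0.5 // same constant name as the check above
	for _, counts := range [][]int{
		{2000, 3100, 2500}, // ratio ~0.65: balanced
		{1200, 3000, 2500}, // ratio 0.40: triggers the error message
		{0, 2800, 2500},    // ratio 0.00: also unbalanced
	} {
		r := balanceRatio(counts)
		fmt.Printf("counts=%v ratio=%.2f ok=%v\n", counts, r, r >= errorRegionCntMinMaxRatio)
	}
}
```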