Skip to content

Commit

Permalink
Merge branch 'master' into issue-31569
Browse files Browse the repository at this point in the history
  • Loading branch information
zyguan authored Aug 4, 2022
2 parents b6c73e9 + fff7483 commit 3b6487e
Show file tree
Hide file tree
Showing 102 changed files with 1,303 additions and 782 deletions.
5 changes: 3 additions & 2 deletions .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ run --color=yes
build:release --workspace_status_command=./build/print-workspace-status.sh --stamp
build:release --config=ci
build --incompatible_strict_action_env --incompatible_enable_cc_toolchain_resolution
build:ci --remote_cache=http://172.16.4.21:8080/tidb --remote_timeout="10s" --experimental_remote_cache_compression
build:ci --experimental_remote_cache_compression
test:ci --verbose_failures
test:ci --test_env=GO_TEST_WRAP_TESTV=1 --test_verbose_timeout_warnings
test:ci --remote_cache=http://172.16.4.21:8080/tidb --remote_timeout="10s"
test:ci --test_env=TZ=Asia/Shanghai --test_output=errors --experimental_ui_max_stdouterr_bytes=104857600

try-import /data/bazel
23 changes: 23 additions & 0 deletions .cilinter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# golangci-lint configuration passed via --config by the bazel_golangcilinter Makefile target.
run:
timeout: 10m
linters:
# Start from an empty linter set and enable only the ones listed below.
disable-all: true
enable:
- typecheck
- varcheck
- unused
- structcheck
- deadcode
- bodyclose
- rowserrcheck
- prealloc

issues:
exclude-rules:
# For paths matching _test\.go, suppress findings from the linters listed here.
- path: _test\.go
linters:
- errcheck
- gosec
- rowserrcheck
- makezero

1 change: 1 addition & 0 deletions .github/licenserc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ header:
- 'br/'
- '.gitignore'
- '.gitattributes'
- '.cilinter.yaml'
- '.golangci.yml'
- '.golangci_br.yml'
- 'LICENSES/'
Expand Down
27 changes: 12 additions & 15 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,14 @@ dev: checklist check explaintest gogenerate br_unit_test test_part_parser_dev ut
@>&2 echo "Great, all tests passed."

# Install the check tools.
check-setup:tools/bin/revive tools/bin/goword
check-setup:tools/bin/revive

check: check-parallel lint tidy testSuite check-static errdoc
check: check-parallel lint tidy testSuite errdoc bazel_golangcilinter bazel_all_build

# fmt runs `gofmt -s -l -w` on $(FILES) (simplify, list, rewrite in place) and
# pipes the combined output into $(FAIL_ON_STDOUT).
fmt:
@echo "gofmt (simplify)"
@gofmt -s -l -w $(FILES) 2>&1 | $(FAIL_ON_STDOUT)

goword:tools/bin/goword
tools/bin/goword $(FILES) 2>&1 | $(FAIL_ON_STDOUT)

# check-static runs tools/bin/golangci-lint with the .golangci.yml config over
# the directories printed by $(PACKAGE_DIRECTORIES), with CGO disabled.
check-static: tools/bin/golangci-lint
GO111MODULE=on CGO_ENABLED=0 tools/bin/golangci-lint run -v $$($(PACKAGE_DIRECTORIES)) --config .golangci.yml

Expand Down Expand Up @@ -332,15 +329,6 @@ ifeq ("$(GOOS)", "freebsd")
GOBUILD = CGO_ENABLED=0 GO111MODULE=on go build -trimpath -ldflags '$(LDFLAGS)'
endif

br_coverage:
tools/bin/gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|.*__failpoint_binding__.go" > "$(TEST_DIR)/all_cov.out"
ifeq ("$(JenkinsCI)", "1")
tools/bin/goveralls -coverprofile=$(TEST_DIR)/all_cov.out -service=jenkins-ci -repotoken $(COVERALLS_TOKEN)
else
go tool cover -html "$(TEST_DIR)/all_cov.out" -o "$(TEST_DIR)/all_cov.html"
grep -F '<option' "$(TEST_DIR)/all_cov.html"
endif

# TODO: adjust bins when br integration tests are reformatted.
br_bins:
@which bin/tidb-server
Expand Down Expand Up @@ -413,9 +401,13 @@ bazel_coverage_test: failpoint-enable bazel_ci_prepare
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test

bazel_build: bazel_ci_prepare
# bazel_all_build creates ./bin and builds every Bazel target (//...) with the
# ci config and --//build:with_nogo_flag=true.
bazel_all_build: bazel_ci_prepare
mkdir -p bin
bazel --output_user_root=/home/jenkins/.tidb/tmp build --config=ci //... --//build:with_nogo_flag=true

bazel_build: bazel_ci_prepare
mkdir -p bin
bazel --output_user_root=/home/jenkins/.tidb/tmp build --config=ci //cmd/importer:importer //tidb-server:tidb-server //tidb-server:tidb-server-check --//build:with_nogo_flag=true
cp bazel-out/k8-fastbuild/bin/tidb-server/tidb-server_/tidb-server ./bin
cp bazel-out/k8-fastbuild/bin/cmd/importer/importer_/importer ./bin
cp bazel-out/k8-fastbuild/bin/tidb-server/tidb-server-check_/tidb-server-check ./bin
Expand All @@ -430,3 +422,8 @@ bazel_junit:
bazel_collect
@mkdir -p $(TEST_COVERAGE_DIR)
mv ./junit.xml `$(TEST_COVERAGE_DIR)/junit.xml`

# bazel_golangcilinter runs the Bazel-built golangci-lint binary from $(CURDIR)
# (via --run_under) over the directories printed by $(PACKAGE_DIRECTORIES),
# using the ./.cilinter.yaml configuration.
bazel_golangcilinter:
bazel --output_user_root=/home/jenkins/.tidb/tmp run --config=ci --run_under="cd $(CURDIR) && " \
@com_github_golangci_golangci_lint//cmd/golangci-lint:golangci-lint \
-- run $$($(PACKAGE_DIRECTORIES)) --config ./.cilinter.yaml
15 changes: 15 additions & 0 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ type AbstractBackend interface {
// ResolveDuplicateRows resolves duplicated rows by deleting/inserting data
// according to the required algorithm.
ResolveDuplicateRows(ctx context.Context, tbl table.Table, tableName string, algorithm config.DuplicateResolutionAlgorithm) error

// TotalMemoryConsume counts total memory usage. This is only used for the local backend.
TotalMemoryConsume() int64
}

// Backend is the delivery target for Lightning
Expand Down Expand Up @@ -286,6 +289,10 @@ func (be Backend) FlushAll(ctx context.Context) error {
return be.abstract.FlushAllEngines(ctx)
}

// TotalMemoryConsume returns the memory usage reported by the wrapped
// backend implementation (be.abstract).
func (be Backend) TotalMemoryConsume() int64 {
	consumed := be.abstract.TotalMemoryConsume()
	return consumed
}

// CheckDiskQuota verifies if the total engine file size is below the given
// quota. If the quota is exceeded, this method returns an array of engines,
// which after importing can decrease the total size below quota.
Expand Down Expand Up @@ -411,6 +418,10 @@ func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterCon
return &LocalEngineWriter{writer: w, tableName: engine.tableName}, nil
}

// TotalMemoryConsume returns the memory usage reported by the backend that
// this opened engine is attached to.
func (engine *OpenedEngine) TotalMemoryConsume() int64 {
	consumed := engine.engine.backend.TotalMemoryConsume()
	return consumed
}

// WriteRows writes a collection of encoded rows into the engine.
func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string, rows kv.Rows) error {
return w.writer.AppendRows(ctx, w.tableName, columnNames, rows)
Expand Down Expand Up @@ -501,3 +512,7 @@ type EngineWriter interface {
IsSynced() bool
Close(ctx context.Context) (ChunkFlushStatus, error)
}

// GetEngineUuid returns the UUID stored on this opened engine.
func (engine *OpenedEngine) GetEngineUuid() uuid.UUID {
	id := engine.uuid
	return id
}
14 changes: 14 additions & 0 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,20 @@ func (e *Engine) unlock() {
e.mutex.Unlock()
}

// TotalMemorySize sums kvBuffer.TotalSize() over every writer registered in
// e.localWriters. Writers whose kvBuffer is nil contribute nothing.
func (e *Engine) TotalMemorySize() int64 {
	var total int64
	e.localWriters.Range(func(key, _ interface{}) bool {
		// The writer itself is the map key; the stored value is not used.
		writer := key.(*Writer)
		// NOTE(review): the nil check happens before the lock is taken —
		// confirm kvBuffer cannot be reset concurrently.
		if writer.kvBuffer == nil {
			return true
		}
		// Read the buffer size while holding the writer's lock.
		writer.Lock()
		total += writer.kvBuffer.TotalSize()
		writer.Unlock()
		return true
	})
	return total
}

type rangeOffsets struct {
Size uint64
Keys uint64
Expand Down
30 changes: 23 additions & 7 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,18 @@ func NewLocalBackend(
return backend.MakeBackend(local), nil
}

// TotalMemoryConsume adds up TotalMemorySize() for every non-nil engine stored
// in local.engines, then adds local.bufferPool.TotalSize().
func (local *local) TotalMemoryConsume() int64 {
	var enginesTotal int64
	local.engines.Range(func(_, value interface{}) bool {
		// Entries holding a nil *Engine are skipped.
		if engine := value.(*Engine); engine != nil {
			enginesTotal += engine.TotalMemorySize()
		}
		return true
	})
	poolTotal := local.bufferPool.TotalSize()
	return enginesTotal + poolTotal
}

func (local *local) checkMultiIngestSupport(ctx context.Context) error {
stores, err := local.pdCtl.GetPDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
Expand Down Expand Up @@ -1278,7 +1290,7 @@ WriteAndIngest:
err = local.writeAndIngestPairs(ctx, engine, region, pairStart, end, regionSplitSize, regionSplitKeys)
local.ingestConcurrency.Recycle(w)
if err != nil {
if !common.IsRetryableError(err) {
if !local.isRetryableImportTiKVError(err) {
return err
}
_, regionStart, _ := codec.DecodeBytes(region.Region.StartKey, []byte{})
Expand Down Expand Up @@ -1308,7 +1320,7 @@ const (
retryIngest
)

func (local *local) isRetryableTiKVWriteError(err error) bool {
func (local *local) isRetryableImportTiKVError(err error) bool {
err = errors.Cause(err)
// io.EOF is not retryable in normal case
// but on TiKV restart, if we're writing to TiKV(through GRPC)
Expand Down Expand Up @@ -1338,7 +1350,7 @@ loopWrite:
var rangeStats rangeStats
metas, finishedRange, rangeStats, err = local.WriteToTiKV(ctx, engine, region, start, end, regionSplitSize, regionSplitKeys)
if err != nil {
if !local.isRetryableTiKVWriteError(err) {
if !local.isRetryableImportTiKVError(err) {
return err
}

Expand Down Expand Up @@ -1481,7 +1493,7 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engine *Engine,
if err == nil || common.IsContextCanceledError(err) {
return
}
if !common.IsRetryableError(err) {
if !local.isRetryableImportTiKVError(err) {
break
}
log.FromContext(ctx).Warn("write and ingest by range failed",
Expand Down Expand Up @@ -1930,12 +1942,11 @@ func (local *local) isIngestRetryable(
}
return retryTy, newRegion, common.ErrKVEpochNotMatch.GenWithStack(errPb.GetMessage())
case strings.Contains(errPb.Message, "raft: proposal dropped"):
// TODO: we should change 'Raft raft: proposal dropped' to a error type like 'NotLeader'
newRegion, err = getRegion()
if err != nil {
return retryNone, nil, errors.Trace(err)
}
return retryWrite, newRegion, errors.New(errPb.GetMessage())
return retryWrite, newRegion, common.ErrKVRaftProposalDropped.GenWithStack(errPb.GetMessage())
case errPb.ServerIsBusy != nil:
return retryNone, nil, common.ErrKVServerIsBusy.GenWithStack(errPb.GetMessage())
case errPb.RegionNotFound != nil:
Expand All @@ -1952,8 +1963,13 @@ func (local *local) isIngestRetryable(
return retryNone, nil, errors.Trace(err)
}
return retryWrite, newRegion, common.ErrKVReadIndexNotReady.GenWithStack(errPb.GetMessage())
case errPb.DiskFull != nil:
return retryNone, nil, errors.Errorf("non-retryable error: %s", resp.GetError().GetMessage())
}
return retryNone, nil, errors.Errorf("non-retryable error: %s", resp.GetError().GetMessage())
// All other ingest errors (stale command, etc.) will be retried from writeAndIngestByRanges.
// A single named error, ErrKVIngestFailed, represents them all here;
// they can be separated later if needed.
return retryNone, nil, common.ErrKVIngestFailed.GenWithStack(resp.GetError().GetMessage())
}

// return the smallest []byte that is bigger than current bytes.
Expand Down
31 changes: 24 additions & 7 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/kvproto/pkg/errorpb"
sst "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
Expand Down Expand Up @@ -509,11 +510,6 @@ func TestIsIngestRetryable(t *testing.T) {
require.Equal(t, retryWrite, retryType)
require.Error(t, err)

resp.Error = &errorpb.Error{Message: "unknown error"}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
require.Equal(t, retryNone, retryType)
require.EqualError(t, err, "non-retryable error: unknown error")

resp.Error = &errorpb.Error{
ReadIndexNotReady: &errorpb.ReadIndexNotReady{
Reason: "test",
Expand All @@ -522,6 +518,27 @@ func TestIsIngestRetryable(t *testing.T) {
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
require.Equal(t, retryWrite, retryType)
require.Error(t, err)

resp.Error = &errorpb.Error{
Message: "raft: proposal dropped",
}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
require.Equal(t, retryWrite, retryType)
require.True(t, berrors.Is(err, common.ErrKVRaftProposalDropped))

resp.Error = &errorpb.Error{
DiskFull: &errorpb.DiskFull{},
}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
require.Equal(t, retryNone, retryType)
require.Contains(t, err.Error(), "non-retryable error")

resp.Error = &errorpb.Error{
StaleCommand: &errorpb.StaleCommand{},
}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
require.Equal(t, retryNone, retryType)
require.True(t, berrors.Is(err, common.ErrKVIngestFailed))
}

type testIngester struct{}
Expand Down Expand Up @@ -1253,6 +1270,6 @@ func TestGetRegionSplitSizeKeys(t *testing.T) {

// TestLocalIsRetryableTiKVWriteError verifies that io.EOF, both bare and
// wrapped by errors.Trace, is reported as retryable.
func TestLocalIsRetryableTiKVWriteError(t *testing.T) {
l := local{}
require.True(t, l.isRetryableTiKVWriteError(io.EOF))
require.True(t, l.isRetryableTiKVWriteError(errors.Trace(io.EOF)))
require.True(t, l.isRetryableImportTiKVError(io.EOF))
require.True(t, l.isRetryableImportTiKVError(errors.Trace(io.EOF)))
}
4 changes: 4 additions & 0 deletions br/pkg/lightning/backend/noop/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ func (b noopBackend) ResolveDuplicateRows(ctx context.Context, tbl table.Table,
return nil
}

// TotalMemoryConsume always returns 0 for the no-op backend.
func (b noopBackend) TotalMemoryConsume() int64 {
	const none int64 = 0
	return none
}

type noopEncoder struct{}

// Close the encoder.
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/lightning/backend/tidb/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,10 @@ rowLoop:
return nil
}

// TotalMemoryConsume always returns 0 for the TiDB backend.
func (be *tidbBackend) TotalMemoryConsume() int64 {
	var consumed int64 // stays at its zero value
	return consumed
}

type stmtTask struct {
rows tidbRows
stmt string
Expand Down
18 changes: 10 additions & 8 deletions br/pkg/lightning/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,16 @@ var (
ErrCreatePDClient = errors.Normalize("create pd client error", errors.RFCCodeText("Lightning:PD:ErrCreatePDClient"))
ErrPauseGC = errors.Normalize("pause gc error", errors.RFCCodeText("Lightning:PD:ErrPauseGC"))

ErrCheckKVVersion = errors.Normalize("check tikv version error", errors.RFCCodeText("Lightning:KV:ErrCheckKVVersion"))
ErrCreateKVClient = errors.Normalize("create kv client error", errors.RFCCodeText("Lightning:KV:ErrCreateKVClient"))
ErrCheckMultiIngest = errors.Normalize("check multi-ingest support error", errors.RFCCodeText("Lightning:KV:ErrCheckMultiIngest"))
ErrKVEpochNotMatch = errors.Normalize("epoch not match", errors.RFCCodeText("Lightning:KV:EpochNotMatch"))
ErrKVNotLeader = errors.Normalize("not leader", errors.RFCCodeText("Lightning:KV:NotLeader"))
ErrKVServerIsBusy = errors.Normalize("server is busy", errors.RFCCodeText("Lightning:KV:ServerIsBusy"))
ErrKVRegionNotFound = errors.Normalize("region not found", errors.RFCCodeText("Lightning:KV:RegionNotFound"))
ErrKVReadIndexNotReady = errors.Normalize("read index not ready", errors.RFCCodeText("Lightning:KV:ReadIndexNotReady"))
ErrCheckKVVersion = errors.Normalize("check tikv version error", errors.RFCCodeText("Lightning:KV:ErrCheckKVVersion"))
ErrCreateKVClient = errors.Normalize("create kv client error", errors.RFCCodeText("Lightning:KV:ErrCreateKVClient"))
ErrCheckMultiIngest = errors.Normalize("check multi-ingest support error", errors.RFCCodeText("Lightning:KV:ErrCheckMultiIngest"))
ErrKVEpochNotMatch = errors.Normalize("epoch not match", errors.RFCCodeText("Lightning:KV:EpochNotMatch"))
ErrKVNotLeader = errors.Normalize("not leader", errors.RFCCodeText("Lightning:KV:NotLeader"))
ErrKVServerIsBusy = errors.Normalize("server is busy", errors.RFCCodeText("Lightning:KV:ServerIsBusy"))
ErrKVRegionNotFound = errors.Normalize("region not found", errors.RFCCodeText("Lightning:KV:RegionNotFound"))
ErrKVReadIndexNotReady = errors.Normalize("read index not ready", errors.RFCCodeText("Lightning:KV:ReadIndexNotReady"))
ErrKVIngestFailed = errors.Normalize("ingest tikv failed", errors.RFCCodeText("Lightning:KV:ErrKVIngestFailed"))
ErrKVRaftProposalDropped = errors.Normalize("raft proposal dropped", errors.RFCCodeText("Lightning:KV:ErrKVRaftProposalDropped"))

ErrUnknownBackend = errors.Normalize("unknown backend %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownBackend"))
ErrCheckLocalFile = errors.Normalize("cannot find local file for table: %s engineDir: %s", errors.RFCCodeText("Lightning:Restore:ErrCheckLocalFile"))
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/lightning/common/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ func isSingleRetryableError(err error) bool {
case *errors.Error:
switch {
case berrors.Is(nerr, ErrKVEpochNotMatch), berrors.Is(nerr, ErrKVNotLeader),
berrors.Is(nerr, ErrKVRegionNotFound), berrors.Is(nerr, ErrKVServerIsBusy):
berrors.Is(nerr, ErrKVRegionNotFound), berrors.Is(nerr, ErrKVServerIsBusy),
berrors.Is(nerr, ErrKVReadIndexNotReady), berrors.Is(nerr, ErrKVIngestFailed),
berrors.Is(nerr, ErrKVRaftProposalDropped):
// common.ErrKVServerIsBusy is a little duplication with tmysql.ErrTiKVServerBusy
// it's because the response of sst.ingest gives us a sst.IngestResponse which doesn't contain error code,
// so we have to transform it into a defined code
Expand Down
6 changes: 6 additions & 0 deletions br/pkg/lightning/common/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,16 @@ func TestIsRetryableError(t *testing.T) {
require.True(t, IsRetryableError(ErrKVEpochNotMatch))
require.True(t, IsRetryableError(ErrKVServerIsBusy))
require.True(t, IsRetryableError(ErrKVRegionNotFound))
require.True(t, IsRetryableError(ErrKVReadIndexNotReady))
require.True(t, IsRetryableError(ErrKVIngestFailed))
require.True(t, IsRetryableError(ErrKVRaftProposalDropped))
require.True(t, IsRetryableError(ErrKVNotLeader.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVEpochNotMatch.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVServerIsBusy.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVRegionNotFound.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVReadIndexNotReady.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVIngestFailed.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVRaftProposalDropped.GenWithStack("test")))

// net: connection refused
_, err := net.Dial("tcp", "localhost:65533")
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/membuf/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ func (p *Pool) Destroy() {
}
}

// TotalSize reports the total memory size of this Pool, computed as the
// number of entries currently in blockCache multiplied by blockSize.
func (p *Pool) TotalSize() int64 {
	cached := len(p.blockCache)
	return int64(cached * p.blockSize)
}

// Buffer represents the reuse buffer.
type Buffer struct {
pool *Pool
Expand Down
Loading

0 comments on commit 3b6487e

Please sign in to comment.