Skip to content

Commit

Permalink
ddl/ingest: add the management infra for lightning struct (#37521)
Browse files Browse the repository at this point in the history
ref #35983
  • Loading branch information
tangenta authored Sep 6, 2022
1 parent bf02d25 commit 878ac8e
Show file tree
Hide file tree
Showing 17 changed files with 1,165 additions and 44 deletions.
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,6 @@ type EngineWriter interface {
Close(ctx context.Context) (ChunkFlushStatus, error)
}

func (engine *OpenedEngine) GetEngineUuid() uuid.UUID {
// GetEngineUUID returns the UUID stored in the opened engine.
func (engine *OpenedEngine) GetEngineUUID() uuid.UUID {
return engine.uuid
}
58 changes: 58 additions & 0 deletions ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

# Library target for the Go package github.com/pingcap/tidb/ddl/ingest.
# It lists the package sources and the Bazel labels the package depends on,
# and is visible to every other package in the workspace.
go_library(
name = "ingest",
srcs = [
"backend.go",
"backend_mgr.go",
"config.go",
"disk_root.go",
"engine.go",
"engine_mgr.go",
"env.go",
"mem_root.go",
"message.go",
],
importpath = "github.com/pingcap/tidb/ddl/ingest",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/backend/local",
"//br/pkg/lightning/checkpoints",
"//br/pkg/lightning/common",
"//br/pkg/lightning/config",
"//br/pkg/lightning/errormanager",
"//br/pkg/lightning/glue",
"//br/pkg/lightning/log",
"//config",
"//kv",
"//parser",
"//parser/model",
"//parser/mysql",
"//sessionctx/variable",
"//table",
"//util/generic",
"//util/logutil",
"//util/mathutil",
"//util/size",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pkg_errors//:errors",
"@org_uber_go_zap//:zap",
],
)

# Test target for the ingest library; it builds env_test.go and mem_root_test.go
# against the ":ingest" target defined in this file.
go_test(
name = "ingest_test",
srcs = [
"env_test.go",
"mem_root_test.go",
],
# Marked as flaky, so Bazel may re-run the test before reporting a failure.
flaky = True,
deps = [
":ingest",
"//config",
"@com_github_stretchr_testify//require",
],
)
121 changes: 121 additions & 0 deletions ddl/ingest/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ingest

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/config"
tikv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

// BackendContext stores the backend info for an add index reorg task.
type BackendContext struct {
// jobID is the ID of the job this backend context was registered for.
jobID int64
// backend is the lightning backend used to import data and to check remote duplicates.
backend *backend.Backend
// ctx is the context passed to the backend calls made by this struct's methods.
ctx context.Context
// cfg is the lightning configuration this backend context was created with.
cfg *config.Config
// EngMgr holds the engines of this backend; methods look them up by index ID.
EngMgr engineManager
// sysVars is forwarded as SessionOptions.SysVars to the remote duplicate check.
sysVars map[string]string
// diskRoot provides the disk usage and quota consulted by Flush.
diskRoot DiskRoot
// done is the flag set by SetDone and reported by Done.
done bool
}

// FinishImport imports all the key-values in the engine of the given index into the storage
// (through the engine's ImportAndClean), and for a unique index collects the remote duplicate
// rows afterwards.
//
// It returns an error when no engine is registered for indexID, when ImportAndClean fails,
// or when the remote duplicate check fails; in the last case the returned error is annotated
// with LitInfoRemoteDupCheck and keeps the underlying cause. When the duplicate check finds
// duplicated rows, tikv.ErrKeyExists is returned.
func (bc *BackendContext) FinishImport(indexID int64, unique bool, tbl table.Table) error {
	ei, exist := bc.EngMgr.Load(indexID)
	if !exist {
		// Log the missing engine the same way Flush does, so both paths are traceable.
		logutil.BgLogger().Error(LitErrGetEngineFail, zap.Int64("index ID", indexID))
		return errors.New(LitErrGetEngineFail)
	}

	if err := ei.ImportAndClean(); err != nil {
		return err
	}

	// Only unique indexes need the remote duplicate check.
	if !unique {
		return nil
	}

	// Check remote duplicate value for the index.
	hasDupe, err := bc.backend.CollectRemoteDuplicateRows(bc.ctx, tbl, tbl.Meta().Name.L, &kv.SessionOptions{
		SQLMode: mysql.ModeStrictAllTables,
		SysVars: bc.sysVars,
		IndexID: ei.indexID,
	})
	if err != nil {
		logutil.BgLogger().Error(LitInfoRemoteDupCheck, zap.Error(err),
			zap.String("table", tbl.Meta().Name.O), zap.Int64("index ID", indexID))
		// Keep the root cause in the returned error instead of replacing it with a bare message.
		return errors.Annotate(err, LitInfoRemoteDupCheck)
	}
	if hasDupe {
		logutil.BgLogger().Error(LitErrRemoteDupExistErr,
			zap.String("table", tbl.Meta().Name.O), zap.Int64("index ID", indexID))
		return tikv.ErrKeyExists
	}
	return nil
}

// importThreshold is the fraction of the max disk quota at which Flush starts
// importing the engine data into the storage.
const importThreshold = 0.85

// Flush checks the disk quota and imports the current key-values in engine to the storage.
//
// It refreshes the disk usage and quota first. If the current usage is below
// importThreshold of the max quota nothing else happens. Otherwise the engine of the
// given index is flushed and its data is imported via UnsafeImportAndReset.
//
// It returns an error when no engine is registered for indexID, when the disk statistics
// cannot be updated, or when flushing or importing fails.
func (bc *BackendContext) Flush(indexID int64) error {
	ei, exist := bc.EngMgr.Load(indexID)
	if !exist {
		logutil.BgLogger().Error(LitErrGetEngineFail, zap.Int64("index ID", indexID))
		return errors.New(LitErrGetEngineFail)
	}

	if err := bc.diskRoot.UpdateUsageAndQuota(); err != nil {
		// Include the cause in the log entry; the message alone does not say why it failed.
		logutil.BgLogger().Error(LitErrUpdateDiskStats, zap.Int64("index ID", indexID), zap.Error(err))
		return err
	}

	// Nothing to import while the disk usage stays below the threshold.
	if bc.diskRoot.CurrentUsage() < uint64(importThreshold*float64(bc.diskRoot.MaxQuota())) {
		return nil
	}

	// TODO: it should be changed according checkpoint solution.
	// Flush writer cached data into local disk for engine first.
	if err := ei.Flush(); err != nil {
		return err
	}
	logutil.BgLogger().Info(LitInfoUnsafeImport, zap.Int64("index ID", indexID),
		zap.Uint64("current disk usage", bc.diskRoot.CurrentUsage()),
		zap.Uint64("max disk quota", bc.diskRoot.MaxQuota()))
	err := bc.backend.UnsafeImportAndReset(bc.ctx, ei.uuid, int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio), int64(config.SplitRegionKeys))
	if err != nil {
		logutil.BgLogger().Error(LitErrIngestDataErr, zap.Int64("index ID", indexID),
			zap.Error(err), zap.Uint64("current disk usage", bc.diskRoot.CurrentUsage()),
			zap.Uint64("max disk quota", bc.diskRoot.MaxQuota()))
		return err
	}
	return nil
}

// Done returns true if the lightning backfill is done, i.e. SetDone has been called.
// NOTE(review): done is a plain bool read without synchronization — confirm that
// Done and SetDone are never called concurrently.
func (bc *BackendContext) Done() bool {
return bc.done
}

// SetDone sets the done flag to true; nothing in this type resets it afterwards.
func (bc *BackendContext) SetDone() {
bc.done = true
}
193 changes: 193 additions & 0 deletions ddl/ingest/backend_mgr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ingest

import (
"context"
"database/sql"
"fmt"
"math"

"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/errormanager"
"github.com/pingcap/tidb/br/pkg/lightning/glue"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/util/generic"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

// backendCtxManager keeps the registered backend contexts, keyed by job ID,
// together with the memory and disk trackers they share.
type backendCtxManager struct {
// SyncMap maps a job ID to its *BackendContext.
generic.SyncMap[int64, *BackendContext]
// memRoot is checked and charged when a backend is registered, and released on unregister.
memRoot MemRoot
// diskRoot is handed to every backend context created by Register.
diskRoot DiskRoot
}

// init prepares the manager for use: it records the given memory and disk
// trackers and creates an empty job-ID-to-backend-context map.
func (m *backendCtxManager) init(memRoot MemRoot, diskRoot DiskRoot) {
	m.memRoot, m.diskRoot = memRoot, diskRoot
	// 10 is the argument given to NewSyncMap — presumably an initial capacity; confirm in util/generic.
	m.SyncMap = generic.NewSyncMap[int64, *BackendContext](10)
}

// Register returns the backend context of the given job, creating and storing
// a new one when the job has none yet.
//
// Creating a backend context requires enough memory quota for StructSizeBackendCtx,
// a valid lightning config and a successfully created local backend; if any of these
// fails, a nil context and the error are returned. The SQL mode argument is ignored.
func (m *backendCtxManager) Register(ctx context.Context, unique bool, jobID int64, _ mysql.SQLMode) (*BackendContext, error) {
	// Reuse the context that is already registered for this job.
	if registered, found := m.Load(jobID); found {
		return registered, nil
	}

	m.memRoot.RefreshConsumption()
	if !m.memRoot.CheckConsume(StructSizeBackendCtx) {
		return nil, genBackendAllocMemFailedErr(m.memRoot, jobID)
	}

	litCfg, err := generateLightningConfig(m.memRoot, jobID, unique)
	if err != nil {
		logutil.BgLogger().Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
		return nil, err
	}

	localBackend, err := createLocalBackend(ctx, litCfg, glueLit{})
	if err != nil {
		logutil.BgLogger().Error(LitErrCreateBackendFail, zap.Int64("job ID", jobID), zap.Error(err))
		return nil, err
	}

	created := newBackendContext(ctx, jobID, &localBackend, litCfg, defaultImportantVariables, m.memRoot, m.diskRoot)
	m.Store(jobID, created)

	// Charge the memory only after the context has been stored.
	m.memRoot.Consume(StructSizeBackendCtx)
	logutil.BgLogger().Info(LitInfoCreateBackend, zap.Int64("job ID", jobID),
		zap.Int64("current memory usage", m.memRoot.CurrentUsage()),
		zap.Int64("max memory quota", m.memRoot.MaxMemoryQuota()),
		zap.Bool("is unique index", unique))
	return created, nil
}

// createLocalBackend builds a lightning local backend from the given config.
// It derives the TLS settings from cfg and creates an error manager that logs
// through the background logger. A zero backend.Backend and the error are
// returned when the TLS settings cannot be derived.
func createLocalBackend(ctx context.Context, cfg *config.Config, glue glue.Glue) (backend.Backend, error) {
	tlsCfg, tlsErr := cfg.ToTLS()
	if tlsErr != nil {
		logutil.BgLogger().Error(LitErrCreateBackendFail, zap.Error(tlsErr))
		return backend.Backend{}, tlsErr
	}

	bgLogger := log.Logger{Logger: logutil.BgLogger()}
	return local.NewLocalBackend(ctx, tlsCfg, cfg, glue, int(LitRLimit), errormanager.New(nil, cfg, bgLogger))
}

// newBackendContext assembles a BackendContext from the given parts and
// initializes its engine manager with the memory and disk trackers.
func newBackendContext(ctx context.Context, jobID int64, be *backend.Backend,
	cfg *config.Config, vars map[string]string, memRoot MemRoot, diskRoot DiskRoot) *BackendContext {
	created := new(BackendContext)
	created.jobID = jobID
	created.backend = be
	created.ctx = ctx
	created.cfg = cfg
	created.sysVars = vars
	created.diskRoot = diskRoot
	created.EngMgr.init(memRoot, diskRoot)
	return created
}

// Unregister removes the backend context of the given job from the manager.
// It unregisters all engines of the context, closes the backend, releases the
// memory accounted for the context (both the struct size and the job's tag)
// and deletes the map entry. It does nothing when the job is not registered.
func (m *backendCtxManager) Unregister(jobID int64) {
	registered, found := m.Load(jobID)
	if !found {
		return
	}

	registered.EngMgr.UnregisterAll(jobID)
	registered.backend.Close()

	m.memRoot.Release(StructSizeBackendCtx)
	m.Delete(jobID)
	m.memRoot.ReleaseWithTag(encodeBackendTag(jobID))

	logutil.BgLogger().Info(
		LitInfoCloseBackend,
		zap.Int64("job ID", jobID),
		zap.Int64("current memory usage", m.memRoot.CurrentUsage()),
		zap.Int64("max memory quota", m.memRoot.MaxMemoryQuota()),
	)
}

// TotalDiskUsage sums up the disk usage reported by every registered backend.
// For each backend it takes the third value returned by CheckDiskQuota as the
// disk size used; keys that are no longer present in the map are skipped.
func (m *backendCtxManager) TotalDiskUsage() uint64 {
	total := uint64(0)
	for _, jobID := range m.Keys() {
		registered, found := m.Load(jobID)
		if !found {
			continue
		}
		_, _, diskUsed, _ := registered.backend.CheckDiskQuota(math.MaxInt64)
		total += uint64(diskUsed)
	}
	return total
}

// UpdateMemoryUsage refreshes the memory accounted for each registered backend:
// the amount recorded under the backend's job tag is released and replaced
// with the backend's current total memory consumption.
func (m *backendCtxManager) UpdateMemoryUsage() {
	for _, jobID := range m.Keys() {
		registered, found := m.Load(jobID)
		if !found {
			continue
		}
		memUsed := registered.backend.TotalMemoryConsume()
		tag := encodeBackendTag(registered.jobID)
		m.memRoot.ReleaseWithTag(tag)
		m.memRoot.ConsumeWithTag(tag, memUsed)
	}
}

// glueLit is a placeholder glue.Glue passed to the local backend on initialization.
// All of its methods are stubs that return zero values or do nothing.
type glueLit struct{}

// OwnsSQLExecutor always returns false for the placeholder glue.
func (glueLit) OwnsSQLExecutor() bool {
return false
}

// GetSQLExecutor always returns nil; the placeholder glue provides no SQL executor.
func (glueLit) GetSQLExecutor() glue.SQLExecutor {
return nil
}

// GetDB always returns a nil database handle and a nil error.
func (glueLit) GetDB() (*sql.DB, error) {
return nil, nil
}

// GetParser always returns nil; the placeholder glue provides no parser.
func (glueLit) GetParser() *parser.Parser {
return nil
}

// GetTables ignores its arguments and always returns a nil slice and a nil error.
func (glueLit) GetTables(context.Context, string) ([]*model.TableInfo, error) {
return nil, nil
}

// GetSession ignores its argument and always returns a nil session and a nil error.
func (glueLit) GetSession(context.Context) (checkpoints.Session, error) {
return nil, nil
}

// OpenCheckpointsDB ignores its arguments and always returns a nil checkpoints DB and a nil error.
func (glueLit) OpenCheckpointsDB(context.Context, *config.Config) (checkpoints.DB, error) {
return nil, nil
}

// Record is meant to report some information (key, value) to the host TiDB, such as the
// current progress and stage. The placeholder glue discards the values and does nothing.
func (glueLit) Record(string, uint64) {
}

// encodeBackendTag returns the decimal string form of jobID, which is used as
// the tag when consuming and releasing a backend's memory in the memory tracker.
func encodeBackendTag(jobID int64) string {
	return fmt.Sprint(jobID)
}
Loading

0 comments on commit 878ac8e

Please sign in to comment.