Skip to content

Commit

Permalink
import into: precheck and register to pd (#44313)
Browse files Browse the repository at this point in the history
ref #42930
  • Loading branch information
D3Hunter authored Jun 7, 2023
1 parent 6bab55c commit 20a442f
Show file tree
Hide file tree
Showing 15 changed files with 521 additions and 114 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,12 @@ br_compatibility_test:
mock_s3iface:
@mockgen -package mock github.com/aws/aws-sdk-go/service/s3/s3iface S3API > br/pkg/mock/s3iface.go

# mock interface for lightning and IMPORT INTO
mock_lightning:
@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend Backend,EngineWriter,TargetInfoGetter,ChunkFlushStatus > br/pkg/mock/backend.go
@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend/encode Encoder,EncodingBuilder,Rows,Row > br/pkg/mock/encode.go
@mockgen -package mocklocal github.com/pingcap/tidb/br/pkg/lightning/backend/local DiskUsage,TiKVModeSwitcher > br/pkg/mock/mocklocal/local.go
@mockgen -package mock github.com/pingcap/tidb/br/pkg/utils TaskRegister > br/pkg/mock/task_register.go

# There is no FreeBSD environment for GitHub actions. So cross-compile on Linux
# but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have
Expand Down
1 change: 1 addition & 0 deletions br/pkg/mock/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
"importer.go",
"mock_cluster.go",
"s3iface.go",
"task_register.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/mock",
visibility = ["//visibility:public"],
Expand Down
77 changes: 77 additions & 0 deletions br/pkg/mock/task_register.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion br/pkg/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ go_test(
],
embed = [":utils"],
flaky = True,
shard_count = 29,
shard_count = 30,
deps = [
"//br/pkg/errors",
"//br/pkg/metautil",
Expand Down
77 changes: 64 additions & 13 deletions br/pkg/utils/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type RegisterTaskType int
const (
RegisterRestore RegisterTaskType = iota
RegisterLightning
RegisterImportInto
)

func (tp RegisterTaskType) String() string {
Expand All @@ -42,20 +43,40 @@ func (tp RegisterTaskType) String() string {
return "restore"
case RegisterLightning:
return "lightning"
case RegisterImportInto:
return "import-into"
}
return "default"
}

// The key format should be {RegisterImportTaskPrefix}/{RegisterTaskType}/{taskName}
const (
// RegisterImportTaskPrefix is the prefix of the key for task register
// todo: remove "/import" suffix, it's confusing to have a key like "/tidb/brie/import/restore/restore-xxx"
RegisterImportTaskPrefix = "/tidb/brie/import"

RegisterRetryInternal = 10 * time.Second
defaultTaskRegisterTTL = 3 * time.Minute // 3 minutes
)

// TaskRegister can register the task to PD with a lease, and keepalive it in the background
type TaskRegister struct {
// TaskRegister can register the task to PD with a lease.
type TaskRegister interface {
// Close closes the background task if using RegisterTask
// and revoke the lease.
// NOTE: we don't close the etcd client here, call should do it.
Close(ctx context.Context) (err error)
// RegisterTask firstly put its key to PD with a lease,
// and start to keepalive the lease in the background.
// DO NOT mix calls to RegisterTask and RegisterTaskOnce.
RegisterTask(c context.Context) error
// RegisterTaskOnce put its key to PD with a lease if the key does not exist,
// else we refresh the lease.
// you have to call this method periodically to keep the lease alive.
// DO NOT mix calls to RegisterTask and RegisterTaskOnce.
RegisterTaskOnce(ctx context.Context) error
}

type taskRegister struct {
client *clientv3.Client
ttl time.Duration
secondTTL int64
Expand All @@ -68,8 +89,8 @@ type TaskRegister struct {
}

// NewTaskRegisterWithTTL build a TaskRegister with key format {RegisterTaskPrefix}/{RegisterTaskType}/{taskName}
func NewTaskRegisterWithTTL(client *clientv3.Client, ttl time.Duration, tp RegisterTaskType, taskName string) *TaskRegister {
return &TaskRegister{
func NewTaskRegisterWithTTL(client *clientv3.Client, ttl time.Duration, tp RegisterTaskType, taskName string) TaskRegister {
return &taskRegister{
client: client,
ttl: ttl,
secondTTL: int64(ttl / time.Second),
Expand All @@ -80,13 +101,16 @@ func NewTaskRegisterWithTTL(client *clientv3.Client, ttl time.Duration, tp Regis
}

// NewTaskRegister build a TaskRegister with key format {RegisterTaskPrefix}/{RegisterTaskType}/{taskName}
func NewTaskRegister(client *clientv3.Client, tp RegisterTaskType, taskName string) *TaskRegister {
func NewTaskRegister(client *clientv3.Client, tp RegisterTaskType, taskName string) TaskRegister {
return NewTaskRegisterWithTTL(client, defaultTaskRegisterTTL, tp, taskName)
}

// Close closes the background task of taskRegister
func (tr *TaskRegister) Close(ctx context.Context) (err error) {
tr.cancel()
// Close implements the TaskRegister interface
func (tr *taskRegister) Close(ctx context.Context) (err error) {
// not needed if using RegisterTaskOnce
if tr.cancel != nil {
tr.cancel()
}
tr.wg.Wait()
if tr.curLeaseID != clientv3.NoLease {
_, err = tr.client.Lease.Revoke(ctx, tr.curLeaseID)
Expand All @@ -97,7 +121,7 @@ func (tr *TaskRegister) Close(ctx context.Context) (err error) {
return err
}

func (tr *TaskRegister) grant(ctx context.Context) (*clientv3.LeaseGrantResponse, error) {
func (tr *taskRegister) grant(ctx context.Context) (*clientv3.LeaseGrantResponse, error) {
lease, err := tr.client.Lease.Grant(ctx, tr.secondTTL)
if err != nil {
return nil, err
Expand All @@ -108,9 +132,36 @@ func (tr *TaskRegister) grant(ctx context.Context) (*clientv3.LeaseGrantResponse
return lease, nil
}

// RegisterTask firstly put its key to PD with a lease,
// and start to keepalive the lease in the background.
func (tr *TaskRegister) RegisterTask(c context.Context) error {
// RegisterTaskOnce implements the TaskRegister interface
func (tr *taskRegister) RegisterTaskOnce(ctx context.Context) error {
resp, err := tr.client.Get(ctx, tr.key)
if err != nil {
return errors.Trace(err)
}
if len(resp.Kvs) == 0 {
lease, err2 := tr.grant(ctx)
if err2 != nil {
return errors.Annotatef(err2, "failed grant a lease")
}
tr.curLeaseID = lease.ID
_, err2 = tr.client.KV.Put(ctx, tr.key, "", clientv3.WithLease(lease.ID))
if err2 != nil {
return errors.Trace(err2)
}
} else {
// if the task is run distributively, like IMPORT INTO, we should refresh the lease ID,
// in case the owner changed during the registration, and the new owner create the key.
tr.curLeaseID = clientv3.LeaseID(resp.Kvs[0].Lease)
_, err2 := tr.client.Lease.KeepAliveOnce(ctx, tr.curLeaseID)
if err2 != nil {
return errors.Trace(err2)
}
}
return nil
}

// RegisterTask implements the TaskRegister interface
func (tr *taskRegister) RegisterTask(c context.Context) error {
cctx, cancel := context.WithCancel(c)
tr.cancel = cancel
lease, err := tr.grant(cctx)
Expand All @@ -133,7 +184,7 @@ func (tr *TaskRegister) RegisterTask(c context.Context) error {
return nil
}

func (tr *TaskRegister) keepaliveLoop(ctx context.Context, ch <-chan *clientv3.LeaseKeepAliveResponse) {
func (tr *taskRegister) keepaliveLoop(ctx context.Context, ch <-chan *clientv3.LeaseKeepAliveResponse) {
defer tr.wg.Done()
const minTimeLeftThreshold time.Duration = 20 * time.Second
var (
Expand Down
35 changes: 35 additions & 0 deletions br/pkg/utils/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,41 @@ func TestTaskRegister(t *testing.T) {
require.NoError(t, register.Close(ctx))
}

func TestTaskRegisterOnce(t *testing.T) {
integration.BeforeTestExternal(t)
testEtcdCluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer testEtcdCluster.Terminate(t)

// should not close the client manually, the test will fail, since Terminate will close it too.
client := testEtcdCluster.RandClient()

ctx := context.Background()
register := NewTaskRegisterWithTTL(client, 10*time.Second, RegisterImportInto, "test")
defer register.Close(ctx)
err := register.RegisterTaskOnce(ctx)
require.NoError(t, err)

// sleep 3 seconds to make sure the lease TTL is smaller.
time.Sleep(3 * time.Second)
list, err := GetImportTasksFrom(ctx, client)
require.NoError(t, err)
require.Len(t, list.Tasks, 1)
currTask := list.Tasks[0]
t.Log(currTask.MessageToUser())
require.Equal(t, "/tidb/brie/import/import-into/test", currTask.Key)

// then register again, this time will only refresh the lease, and left TTL will be larger.
err = register.RegisterTaskOnce(ctx)
require.NoError(t, err)
list, err = GetImportTasksFrom(ctx, client)
require.NoError(t, err)
require.Len(t, list.Tasks, 1)
thisTask := list.Tasks[0]
require.Equal(t, currTask.Key, thisTask.Key)
require.Equal(t, currTask.LeaseID, thisTask.LeaseID)
require.Greater(t, thisTask.TTL, currTask.TTL)
}

func TestTaskRegisterFailedGrant(t *testing.T) {
integration.BeforeTestExternal(t)
testEtcdCluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, GRPCKeepAliveInterval: time.Second, GRPCKeepAliveTimeout: 10 * time.Second})
Expand Down
2 changes: 2 additions & 0 deletions disttask/loaddata/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//br/pkg/lightning/config",
"//br/pkg/lightning/mydump",
"//br/pkg/lightning/verification",
"//br/pkg/utils",
"//disttask/framework/dispatcher",
"//disttask/framework/handle",
"//disttask/framework/proto",
Expand All @@ -32,6 +33,7 @@ go_library(
"//parser/mysql",
"//sessionctx",
"//table/tables",
"//util/etcd",
"//util/logutil",
"//util/sqlexec",
"@com_github_go_sql_driver_mysql//:mysql",
Expand Down
Loading

0 comments on commit 20a442f

Please sign in to comment.