Skip to content

Commit

Permalink
*: support mysql compatible auto_increment, the client side changes (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Nov 4, 2022
1 parent f9c7bbc commit 12d9644
Show file tree
Hide file tree
Showing 9 changed files with 339 additions and 11 deletions.
25 changes: 24 additions & 1 deletion ddl/schematracker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package schematracker
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"strings"
"time"
Expand Down Expand Up @@ -553,15 +554,37 @@ func (d Checker) MoveJobFromTable2Queue() error {
// StorageDDLInjector wraps kv.Storage to inject checker to domain's DDL in bootstrap time.
type StorageDDLInjector struct {
kv.Storage
kv.EtcdBackend
Injector func(ddl.DDL) *Checker
}

var _ kv.EtcdBackend = StorageDDLInjector{}

// EtcdAddrs implements the kv.EtcdBackend interface.
func (s StorageDDLInjector) EtcdAddrs() ([]string, error) {
return s.EtcdBackend.EtcdAddrs()
}

// TLSConfig implements the kv.EtcdBackend interface.
func (s StorageDDLInjector) TLSConfig() *tls.Config {
return s.EtcdBackend.TLSConfig()
}

// StartGCWorker implements the kv.EtcdBackend interface.
func (s StorageDDLInjector) StartGCWorker() error {
return s.EtcdBackend.StartGCWorker()
}

// NewStorageDDLInjector creates a new StorageDDLInjector to inject Checker.
func NewStorageDDLInjector(s kv.Storage) kv.Storage {
return StorageDDLInjector{
ret := StorageDDLInjector{
Storage: s,
Injector: NewChecker,
}
if ebd, ok := s.(kv.EtcdBackend); ok {
ret.EtcdBackend = ebd
}
return ret
}

// UnwrapStorage unwraps StorageDDLInjector for one level.
Expand Down
3 changes: 2 additions & 1 deletion infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,8 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i
tblVer := autoid.AllocOptionTableInfoVersion(tblInfo.Version)
switch tp {
case model.ActionRebaseAutoID, model.ActionModifyTableAutoIdCache:
newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), autoid.RowIDAllocType, tblVer)
idCacheOpt := autoid.CustomAutoIncCacheOption(tblInfo.AutoIdCache)
newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), autoid.RowIDAllocType, tblVer, idCacheOpt)
allocs = append(allocs, newAlloc)
case model.ActionRebaseAutoRandomBase:
newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType, tblVer)
Expand Down
6 changes: 6 additions & 0 deletions meta/autoid/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ go_library(
name = "autoid",
srcs = [
"autoid.go",
"autoid_service.go",
"errors.go",
"memid.go",
],
importpath = "github.com/pingcap/tidb/meta/autoid",
visibility = ["//visibility:public"],
deps = [
"//autoid_service",
"//errno",
"//kv",
"//meta",
Expand All @@ -24,8 +26,12 @@ go_library(
"@com_github_opentracing_opentracing_go//:opentracing-go",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/autoid",
"@com_github_tikv_client_go_v2//txnkv/txnsnapshot",
"@com_github_tikv_client_go_v2//util",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//credentials/insecure",
"@org_uber_go_zap//:zap",
],
)
Expand Down
52 changes: 52 additions & 0 deletions meta/autoid/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
autoid "github.com/pingcap/tidb/autoid_service"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/pingcap/tidb/util/mathutil"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
tikvutil "github.com/tikv/client-go/v2/util"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -533,6 +535,47 @@ func NextStep(curStep int64, consumeDur time.Duration) int64 {
return res
}

func newSinglePointAlloc(store kv.Storage, dbID, tblID int64, isUnsigned bool) *singlePointAlloc {
ebd, ok := store.(kv.EtcdBackend)
if !ok {
// newSinglePointAlloc fail because not etcd background
// This could happen in the server package unit test
return nil
}

addrs, err := ebd.EtcdAddrs()
if err != nil {
panic(err)
}
spa := &singlePointAlloc{
dbID: dbID,
tblID: tblID,
isUnsigned: isUnsigned,
}
if len(addrs) > 0 {
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: addrs,
TLS: ebd.TLSConfig(),
})
if err != nil {
logutil.BgLogger().Error("[autoid client] fail to connect etcd, fallback to default", zap.Error(err))
return nil
}
spa.clientDiscover = clientDiscover{etcdCli: etcdCli}
} else {
spa.clientDiscover = clientDiscover{}
spa.mu.AutoIDAllocClient = autoid.MockForTest(store)
}

// mockAutoIDChange failpoint is not implemented in this allocator, so fallback to use the default one.
failpoint.Inject("mockAutoIDChange", func(val failpoint.Value) {
if val.(bool) {
spa = nil
}
})
return spa
}

// NewAllocator returns a new auto increment id generator on the store.
func NewAllocator(store kv.Storage, dbID, tbID int64, isUnsigned bool,
allocType AllocatorType, opts ...AllocOption) Allocator {
Expand All @@ -548,6 +591,15 @@ func NewAllocator(store kv.Storage, dbID, tbID int64, isUnsigned bool,
for _, fn := range opts {
fn.ApplyOn(alloc)
}

// Use the MySQL compatible AUTO_INCREMENT mode.
if allocType == RowIDAllocType && alloc.customStep && alloc.step == 1 {
alloc1 := newSinglePointAlloc(store, dbID, tbID, isUnsigned)
if alloc1 != nil {
return alloc1
}
}

return alloc
}

Expand Down
Loading

0 comments on commit 12d9644

Please sign in to comment.