From 12d96441449322614fef28fc36fc53fdcdbf9eeb Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 4 Nov 2022 09:36:00 +0800 Subject: [PATCH] *: support mysql compatible auto_increment, the client side changes (#38809) --- ddl/schematracker/checker.go | 25 +++- infoschema/builder.go | 3 +- meta/autoid/BUILD.bazel | 6 + meta/autoid/autoid.go | 52 ++++++++ meta/autoid/autoid_service.go | 236 ++++++++++++++++++++++++++++++++++ metrics/metrics.go | 1 + metrics/session.go | 9 ++ server/server_test.go | 16 +-- server/tidb_test.go | 2 +- 9 files changed, 339 insertions(+), 11 deletions(-) create mode 100644 meta/autoid/autoid_service.go diff --git a/ddl/schematracker/checker.go b/ddl/schematracker/checker.go index 3a09db2f19c36..b1533d0246fb1 100644 --- a/ddl/schematracker/checker.go +++ b/ddl/schematracker/checker.go @@ -17,6 +17,7 @@ package schematracker import ( "bytes" "context" + "crypto/tls" "fmt" "strings" "time" @@ -553,15 +554,37 @@ func (d Checker) MoveJobFromTable2Queue() error { // StorageDDLInjector wraps kv.Storage to inject checker to domain's DDL in bootstrap time. type StorageDDLInjector struct { kv.Storage + kv.EtcdBackend Injector func(ddl.DDL) *Checker } +var _ kv.EtcdBackend = StorageDDLInjector{} + +// EtcdAddrs implements the kv.EtcdBackend interface. +func (s StorageDDLInjector) EtcdAddrs() ([]string, error) { + return s.EtcdBackend.EtcdAddrs() +} + +// TLSConfig implements the kv.EtcdBackend interface. +func (s StorageDDLInjector) TLSConfig() *tls.Config { + return s.EtcdBackend.TLSConfig() +} + +// StartGCWorker implements the kv.EtcdBackend interface. +func (s StorageDDLInjector) StartGCWorker() error { + return s.EtcdBackend.StartGCWorker() +} + // NewStorageDDLInjector creates a new StorageDDLInjector to inject Checker. 
func NewStorageDDLInjector(s kv.Storage) kv.Storage { - return StorageDDLInjector{ + ret := StorageDDLInjector{ Storage: s, Injector: NewChecker, } + if ebd, ok := s.(kv.EtcdBackend); ok { + ret.EtcdBackend = ebd + } + return ret } // UnwrapStorage unwraps StorageDDLInjector for one level. diff --git a/infoschema/builder.go b/infoschema/builder.go index 8bc907516c0d3..8ff50a8b50435 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -699,7 +699,8 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i tblVer := autoid.AllocOptionTableInfoVersion(tblInfo.Version) switch tp { case model.ActionRebaseAutoID, model.ActionModifyTableAutoIdCache: - newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), autoid.RowIDAllocType, tblVer) + idCacheOpt := autoid.CustomAutoIncCacheOption(tblInfo.AutoIdCache) + newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), autoid.RowIDAllocType, tblVer, idCacheOpt) allocs = append(allocs, newAlloc) case model.ActionRebaseAutoRandomBase: newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType, tblVer) diff --git a/meta/autoid/BUILD.bazel b/meta/autoid/BUILD.bazel index f5bb574907665..7490d65691e4c 100644 --- a/meta/autoid/BUILD.bazel +++ b/meta/autoid/BUILD.bazel @@ -4,12 +4,14 @@ go_library( name = "autoid", srcs = [ "autoid.go", + "autoid_service.go", "errors.go", "memid.go", ], importpath = "github.com/pingcap/tidb/meta/autoid", visibility = ["//visibility:public"], deps = [ + "//autoid_service", "//errno", "//kv", "//meta", @@ -24,8 +26,12 @@ go_library( "@com_github_opentracing_opentracing_go//:opentracing-go", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/autoid", "@com_github_tikv_client_go_v2//txnkv/txnsnapshot", "@com_github_tikv_client_go_v2//util", + 
"@io_etcd_go_etcd_client_v3//:client", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//credentials/insecure", "@org_uber_go_zap//:zap", ], ) diff --git a/meta/autoid/autoid.go b/meta/autoid/autoid.go index e20c0ed898f13..def3245bb2da3 100644 --- a/meta/autoid/autoid.go +++ b/meta/autoid/autoid.go @@ -26,6 +26,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + autoid "github.com/pingcap/tidb/autoid_service" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/metrics" @@ -38,6 +39,7 @@ import ( "github.com/pingcap/tidb/util/mathutil" "github.com/tikv/client-go/v2/txnkv/txnsnapshot" tikvutil "github.com/tikv/client-go/v2/util" + clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -533,6 +535,47 @@ func NextStep(curStep int64, consumeDur time.Duration) int64 { return res } +func newSinglePointAlloc(store kv.Storage, dbID, tblID int64, isUnsigned bool) *singlePointAlloc { + ebd, ok := store.(kv.EtcdBackend) + if !ok { + // newSinglePointAlloc fails because the store is not an etcd backend + // This could happen in the server package unit test + return nil + } + + addrs, err := ebd.EtcdAddrs() + if err != nil { + panic(err) + } + spa := &singlePointAlloc{ + dbID: dbID, + tblID: tblID, + isUnsigned: isUnsigned, + } + if len(addrs) > 0 { + etcdCli, err := clientv3.New(clientv3.Config{ + Endpoints: addrs, + TLS: ebd.TLSConfig(), + }) + if err != nil { + logutil.BgLogger().Error("[autoid client] fail to connect etcd, fallback to default", zap.Error(err)) + return nil + } + spa.clientDiscover = clientDiscover{etcdCli: etcdCli} + } else { + spa.clientDiscover = clientDiscover{} + spa.mu.AutoIDAllocClient = autoid.MockForTest(store) + } + + // mockAutoIDChange failpoint is not implemented in this allocator, so fallback to use the default one.
+ failpoint.Inject("mockAutoIDChange", func(val failpoint.Value) { + if val.(bool) { + spa = nil + } + }) + return spa +} + // NewAllocator returns a new auto increment id generator on the store. func NewAllocator(store kv.Storage, dbID, tbID int64, isUnsigned bool, allocType AllocatorType, opts ...AllocOption) Allocator { @@ -548,6 +591,15 @@ func NewAllocator(store kv.Storage, dbID, tbID int64, isUnsigned bool, for _, fn := range opts { fn.ApplyOn(alloc) } + + // Use the MySQL compatible AUTO_INCREMENT mode. + if allocType == RowIDAllocType && alloc.customStep && alloc.step == 1 { + alloc1 := newSinglePointAlloc(store, dbID, tbID, isUnsigned) + if alloc1 != nil { + return alloc1 + } + } + return alloc } diff --git a/meta/autoid/autoid_service.go b/meta/autoid/autoid_service.go new file mode 100644 index 0000000000000..6133dfdfc3cb2 --- /dev/null +++ b/meta/autoid/autoid_service.go @@ -0,0 +1,236 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package autoid + +import ( + "context" + "strings" + "sync" + "time" + + "github.com/opentracing/opentracing-go" + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/autoid" + "github.com/pingcap/tidb/metrics" + "github.com/pingcap/tidb/util/logutil" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +var _ Allocator = &singlePointAlloc{} + +type singlePointAlloc struct { + dbID int64 + tblID int64 + lastAllocated int64 + isUnsigned bool + clientDiscover +} + +type clientDiscover struct { + // This is the etcd client for service discovery + etcdCli *clientv3.Client + // This is the real client for the AutoIDAlloc service + mu struct { + sync.RWMutex + autoid.AutoIDAllocClient + // Release the client conn to avoid resource leak! + // See https://github.com/grpc/grpc-go/issues/5321 + *grpc.ClientConn + } +} + +const ( + autoIDLeaderPath = "tidb/autoid/leader" +) + +func (d *clientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClient, error) { + d.mu.RLock() + cli := d.mu.AutoIDAllocClient + if cli != nil { + d.mu.RUnlock() + return cli, nil + } + d.mu.RUnlock() + + d.mu.Lock() + defer d.mu.Unlock() + if d.mu.AutoIDAllocClient != nil { + return d.mu.AutoIDAllocClient, nil + } + + resp, err := d.etcdCli.Get(ctx, autoIDLeaderPath, clientv3.WithFirstCreate()...) + if err != nil { + return nil, errors.Trace(err) + } + + if len(resp.Kvs) == 0 { + return nil, errors.New("autoid service leader not found") + } + + addr := string(resp.Kvs[0].Value) + grpcConn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, errors.Trace(err) + } + cli = autoid.NewAutoIDAllocClient(grpcConn) + d.mu.AutoIDAllocClient = cli + d.mu.ClientConn = grpcConn + return cli, nil +} + +// Alloc allocs N consecutive autoID for table with tableID, returning (min, max] of the allocated autoID batch.
+// The consecutive feature is used to insert multiple rows in a statement. +// increment & offset is used to validate the start position (the allocator's base is not always the last allocated id). +// The returned range is (min, max]: +// case increment=1 & offset=1: you can derive the ids like min+1, min+2... max. +// case increment=x & offset=y: you firstly need to seek to firstID by `SeekToFirstAutoIDXXX`, then derive the IDs like firstID, firstID + increment * 2... in the caller. +func (sp *singlePointAlloc) Alloc(ctx context.Context, n uint64, increment, offset int64) (min int64, max int64, _ error) { + if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("autoid.Alloc", opentracing.ChildOf(span.Context())) + defer span1.Finish() + ctx = opentracing.ContextWithSpan(ctx, span1) + } + + if !validIncrementAndOffset(increment, offset) { + return 0, 0, errInvalidIncrementAndOffset.GenWithStackByArgs(increment, offset) + } + +retry: + cli, err := sp.GetClient(ctx) + if err != nil { + return 0, 0, errors.Trace(err) + } + + start := time.Now() + resp, err := cli.AllocAutoID(ctx, &autoid.AutoIDRequest{ + DbID: sp.dbID, + TblID: sp.tblID, + N: n, + Increment: increment, + Offset: offset, + IsUnsigned: sp.isUnsigned, + }) + if err != nil { + if strings.Contains(err.Error(), "rpc error") { + time.Sleep(backoffDuration) + sp.resetConn() + goto retry + } + return 0, 0, errors.Trace(err) + } + + du := time.Since(start) + metrics.AutoIDReqDuration.Observe(du.Seconds()) + sp.lastAllocated = resp.Min + return resp.Min, resp.Max, err +} + +const backoffDuration = 200 * time.Millisecond + +func (sp *singlePointAlloc) resetConn() { + var grpcConn *grpc.ClientConn + sp.mu.Lock() + grpcConn = sp.mu.ClientConn + sp.mu.AutoIDAllocClient = nil + sp.mu.ClientConn = nil + sp.mu.Unlock() + // Close grpc.ClientConn to release resource. 
+ if grpcConn != nil { + err := grpcConn.Close() + logutil.BgLogger().Info("[autoid client] AllocAutoID grpc error, reconnect", zap.Error(err)) + } +} + +// AllocSeqCache allocs sequence batch value cached in table level(rather than in alloc), the returned range covering +// the size of sequence cache with its increment. The returned round indicates the sequence cycle times if it is with +// cycle option. +func (*singlePointAlloc) AllocSeqCache() (a int64, b int64, c int64, err error) { + return 0, 0, 0, errors.New("AllocSeqCache not implemented") +} + +// Rebase rebases the autoID base for table with tableID and the new base value. +// The allocIDs argument is ignored by this allocator; +// the rebase request is always forwarded to the AutoID service. +func (sp *singlePointAlloc) Rebase(ctx context.Context, newBase int64, _ bool) error { + if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("autoid.Rebase", opentracing.ChildOf(span.Context())) + defer span1.Finish() + ctx = opentracing.ContextWithSpan(ctx, span1) + } + + return sp.rebase(ctx, newBase, false) +} + +func (sp *singlePointAlloc) rebase(ctx context.Context, newBase int64, force bool) error { +retry: + cli, err := sp.GetClient(ctx) + if err != nil { + return errors.Trace(err) + } + _, err = cli.Rebase(ctx, &autoid.RebaseRequest{ + DbID: sp.dbID, + TblID: sp.tblID, + Base: newBase, + Force: force, + IsUnsigned: sp.isUnsigned, + }) + if err != nil { + if strings.Contains(err.Error(), "rpc error") { + time.Sleep(backoffDuration) + sp.resetConn() + goto retry + } + return errors.Trace(err) + } + sp.lastAllocated = newBase + return err +} + +// ForceRebase sets the next global auto ID to newBase.
+func (sp *singlePointAlloc) ForceRebase(newBase int64) error { + if newBase == -1 { + return ErrAutoincReadFailed.GenWithStack("Cannot force rebase the next global ID to '0'") + } + return sp.rebase(context.Background(), newBase, true) +} + +// RebaseSeq rebases the sequence value in number axis with tableID and the new base value. +func (*singlePointAlloc) RebaseSeq(_ int64) (int64, bool, error) { + return 0, false, errors.New("RebaseSeq not implemented") +} + +// Base returns the current base of Allocator. +func (sp *singlePointAlloc) Base() int64 { + return sp.lastAllocated +} + +// End is only used for test. +func (sp *singlePointAlloc) End() int64 { + return sp.lastAllocated +} + +// NextGlobalAutoID returns the next global autoID. +// Used by 'show create table', 'alter table auto_increment = xxx' +func (sp *singlePointAlloc) NextGlobalAutoID() (int64, error) { + _, max, err := sp.Alloc(context.Background(), 0, 1, 1) + return max + 1, err +} + +func (*singlePointAlloc) GetType() AllocatorType { + return RowIDAllocType +} diff --git a/metrics/metrics.go b/metrics/metrics.go index 21f7b7ab09db8..706da97f356b7 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -205,6 +205,7 @@ func RegisterMetrics() { prometheus.MustRegister(GetCheckpointBatchSize) prometheus.MustRegister(RegionCheckpointRequest) prometheus.MustRegister(RegionCheckpointFailure) + prometheus.MustRegister(AutoIDReqDuration) prometheus.MustRegister(RCCheckTSWriteConfilictCounter) tikvmetrics.InitMetrics(TiDB, TiKVClient) diff --git a/metrics/session.go b/metrics/session.go index 5f1079c9cc6ac..483ebe1826790 100644 --- a/metrics/session.go +++ b/metrics/session.go @@ -18,6 +18,15 @@ import "github.com/prometheus/client_golang/prometheus" // Session metrics.
var ( + AutoIDReqDuration = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "tidb", + Subsystem: "meta", + Name: "autoid_duration_seconds", + Help: "Bucketed histogram of processing time (s) in parse SQL.", + Buckets: prometheus.ExponentialBuckets(0.00004, 2, 28), // 40us ~ 1.5h + }) + SessionExecuteParseDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "tidb", diff --git a/server/server_test.go b/server/server_test.go index 8639f98076232..623a4d3313628 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -626,8 +626,8 @@ func (cli *testServerClient) runTestLoadDataForListPartition(t *testing.T) { dbt.MustExec(fmt.Sprintf("load data local infile %q into table t", path)) rows = dbt.MustQuery("show warnings") cli.checkRows(t, rows, - "Warning 1062 Duplicate entry '1' for key 'idx'", - "Warning 1062 Duplicate entry '2' for key 'idx'") + "Warning 1062 Duplicate entry '1' for key 't.idx'", + "Warning 1062 Duplicate entry '2' for key 't.idx'") require.NoError(t, rows.Close()) rows = dbt.MustQuery("select * from t order by id") cli.checkRows(t, rows, "1 a", "2 b", "3 c", "4 e", "7 a") @@ -680,8 +680,8 @@ func (cli *testServerClient) runTestLoadDataForListPartition2(t *testing.T) { dbt.MustExec(fmt.Sprintf("load data local infile %q into table t (id,name)", path)) rows = dbt.MustQuery("show warnings") cli.checkRows(t, rows, - "Warning 1062 Duplicate entry '1-2' for key 'idx'", - "Warning 1062 Duplicate entry '2-2' for key 'idx'") + "Warning 1062 Duplicate entry '1-2' for key 't.idx'", + "Warning 1062 Duplicate entry '2-2' for key 't.idx'") require.NoError(t, rows.Close()) rows = dbt.MustQuery("select id,name from t order by id") cli.checkRows(t, rows, "1 a", "2 b", "3 c", "4 e", "7 a") @@ -735,8 +735,8 @@ func (cli *testServerClient) runTestLoadDataForListColumnPartition(t *testing.T) dbt.MustExec(fmt.Sprintf("load data local infile %q into table t", path)) rows = dbt.MustQuery("show warnings") cli.checkRows(t, rows, 
- "Warning 1062 Duplicate entry '1' for key 'idx'", - "Warning 1062 Duplicate entry '2' for key 'idx'") + "Warning 1062 Duplicate entry '1' for key 't.idx'", + "Warning 1062 Duplicate entry '2' for key 't.idx'") require.NoError(t, rows.Close()) rows = dbt.MustQuery("select * from t order by id") cli.checkRows(t, rows, "1 a", "2 b", "3 c", "4 e", "7 a") @@ -789,7 +789,7 @@ func (cli *testServerClient) runTestLoadDataForListColumnPartition2(t *testing.T require.NoError(t, err) require.NoError(t, rows.Close()) rows = dbt.MustQuery("show warnings") - cli.checkRows(t, rows, "Warning 1062 Duplicate entry 'w-1' for key 'idx'") + cli.checkRows(t, rows, "Warning 1062 Duplicate entry 'w-1' for key 't.idx'") require.NoError(t, rows.Close()) rows = dbt.MustQuery("select * from t order by id") cli.checkRows(t, rows, "w 1 1", "w 2 2", "e 5 5", "n 9 9") @@ -807,7 +807,7 @@ func (cli *testServerClient) runTestLoadDataForListColumnPartition2(t *testing.T rows = dbt.MustQuery("show warnings") cli.checkRows(t, rows, "Warning 1526 Table has no partition for value from column_list", - "Warning 1062 Duplicate entry 'w-1' for key 'idx'") + "Warning 1062 Duplicate entry 'w-1' for key 't.idx'") require.NoError(t, rows.Close()) rows = dbt.MustQuery("select * from t order by id") cli.checkRows(t, rows, "w 1 1", "w 2 2", "w 3 3", "e 5 5", "e 8 8", "n 9 9") diff --git a/server/tidb_test.go b/server/tidb_test.go index 633dab3cef8af..fc176613768ba 100644 --- a/server/tidb_test.go +++ b/server/tidb_test.go @@ -1671,7 +1671,7 @@ func TestTopSQLCPUProfile(t *testing.T) { dbt.MustExec("alter table t drop index if exists idx_b") _, err := db.Exec(addIndexStr) require.NotNil(t, err) - require.Equal(t, "Error 1062: Duplicate entry '1' for key 'idx_b'", err.Error()) + require.Equal(t, "Error 1062: Duplicate entry '1' for key 't.idx_b'", err.Error()) } check = func() { checkFn(addIndexStr, "")