From 1833ce27098b2238bc4b9a2c3d7f5368e159c4a6 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 4 Mar 2024 15:15:33 +0800 Subject: [PATCH] storage: decouple the region storage from LevelDB backend (#7859) ref tikv/pd#3453 In the previous implementation, LevelDB and `RegionStorage` were strongly coupled. This PR decouples region storage from it to further optimize code abstraction, so we can get a more reusable `LevelDBBackend` and a more specialized `RegionStorage` implementation. Signed-off-by: JmPotato --- pkg/storage/leveldb_backend.go | 102 ++++++--------- pkg/storage/leveldb_backend_test.go | 121 ++++++++++++++++++ pkg/storage/region_storage.go | 79 ++++++++++++ pkg/storage/region_storage_test.go | 95 ++++++++++++++ pkg/storage/storage.go | 22 ++-- pkg/storage/storage_test.go | 65 +++++++++- pkg/syncer/client_test.go | 4 +- pkg/syncer/server.go | 13 +- server/server.go | 9 +- .../region_syncer/region_syncer_test.go | 4 +- 10 files changed, 423 insertions(+), 91 deletions(-) create mode 100644 pkg/storage/leveldb_backend_test.go create mode 100644 pkg/storage/region_storage.go create mode 100644 pkg/storage/region_storage_test.go diff --git a/pkg/storage/leveldb_backend.go b/pkg/storage/leveldb_backend.go index d25044e9c20..8fb1db196c1 100644 --- a/pkg/storage/leveldb_backend.go +++ b/pkg/storage/leveldb_backend.go @@ -18,9 +18,7 @@ import ( "context" "time" - "github.com/gogo/protobuf/proto" "github.com/pingcap/failpoint" - "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/syndtr/goleveldb/leveldb" "github.com/tikv/pd/pkg/encryption" @@ -32,25 +30,27 @@ import ( ) const ( - // DefaultFlushRegionRate is the ttl to sync the regions to region storage. - defaultFlushRegionRate = 3 * time.Second - // DefaultBatchSize is the batch size to save the regions to region storage. + // defaultFlushRate is the default interval to flush the data into the local storage. + defaultFlushRate = 3 * time.Second + // defaultBatchSize is the default batch size to save the data to the local storage. defaultBatchSize = 100 + // defaultDirtyFlushTick + defaultDirtyFlushTick = time.Second ) // levelDBBackend is a storage backend that stores data in LevelDB, -// which is mainly used by the PD region storage. +// which is mainly used to store the PD Region meta information. type levelDBBackend struct { *endpoint.StorageEndpoint - ekm *encryption.Manager - mu syncutil.RWMutex - batchRegions map[string]*metapb.Region - batchSize int - cacheSize int - flushRate time.Duration - flushTime time.Time - regionStorageCtx context.Context - regionStorageCancel context.CancelFunc + ekm *encryption.Manager + mu syncutil.RWMutex + batch map[string][]byte + batchSize int + cacheSize int + flushRate time.Duration + flushTime time.Time + ctx context.Context + cancel context.CancelFunc } // newLevelDBBackend is used to create a new LevelDB backend. @@ -63,23 +63,19 @@ func newLevelDBBackend( if err != nil { return nil, err } - regionStorageCtx, regionStorageCancel := context.WithCancel(ctx) lb := &levelDBBackend{ - StorageEndpoint: endpoint.NewStorageEndpoint(levelDB, ekm), - ekm: ekm, - batchSize: defaultBatchSize, - flushRate: defaultFlushRegionRate, - batchRegions: make(map[string]*metapb.Region, defaultBatchSize), - flushTime: time.Now().Add(defaultFlushRegionRate), - regionStorageCtx: regionStorageCtx, - regionStorageCancel: regionStorageCancel, + StorageEndpoint: endpoint.NewStorageEndpoint(levelDB, ekm), + ekm: ekm, + batchSize: defaultBatchSize, + flushRate: defaultFlushRate, + batch: make(map[string][]byte, defaultBatchSize), + flushTime: time.Now().Add(defaultFlushRate), } + lb.ctx, lb.cancel = context.WithCancel(ctx) go lb.backgroundFlush() return lb, nil } -var dirtyFlushTick = time.Second - func (lb *levelDBBackend) backgroundFlush() { defer logutil.LogPanic() @@ -87,14 +83,14 @@ func (lb *levelDBBackend) backgroundFlush() { isFlush bool err error ) - ticker := time.NewTicker(dirtyFlushTick) + ticker := time.NewTicker(defaultDirtyFlushTick) defer ticker.Stop() for { select { case <-ticker.C: lb.mu.RLock() isFlush = lb.flushTime.Before(time.Now()) - failpoint.Inject("regionStorageFastFlush", func() { + failpoint.Inject("levelDBStorageFastFlush", func() { isFlush = true }) lb.mu.RUnlock() @@ -102,42 +98,32 @@ func (lb *levelDBBackend) backgroundFlush() { continue } if err = lb.Flush(); err != nil { - log.Error("flush regions meet error", errs.ZapError(err)) + log.Error("flush data meet error", errs.ZapError(err)) } - case <-lb.regionStorageCtx.Done(): + case <-lb.ctx.Done(): return } } } -func (lb *levelDBBackend) SaveRegion(region *metapb.Region) error { - region, err := encryption.EncryptRegion(region, lb.ekm) - if err != nil { - return err - } +// SaveIntoBatch saves the key-value pair into the batch cache, and it will +// only be saved to the underlying storage when the `Flush` method is +// called or the cache is full. +func (lb *levelDBBackend) SaveIntoBatch(key string, value []byte) error { lb.mu.Lock() defer lb.mu.Unlock() if lb.cacheSize < lb.batchSize-1 { - lb.batchRegions[endpoint.RegionPath(region.GetId())] = region + lb.batch[key] = value lb.cacheSize++ lb.flushTime = time.Now().Add(lb.flushRate) return nil } - lb.batchRegions[endpoint.RegionPath(region.GetId())] = region - err = lb.flushLocked() - - if err != nil { - return err - } - return nil -} - -func (lb *levelDBBackend) DeleteRegion(region *metapb.Region) error { - return lb.Remove(endpoint.RegionPath(region.GetId())) + lb.batch[key] = value + return lb.flushLocked() } -// Flush saves the cache region to the underlying storage. +// Flush saves the batch cache to the underlying storage. func (lb *levelDBBackend) Flush() error { lb.mu.Lock() defer lb.mu.Unlock() @@ -145,38 +131,32 @@ func (lb *levelDBBackend) Flush() error { } func (lb *levelDBBackend) flushLocked() error { - if err := lb.saveRegions(lb.batchRegions); err != nil { + if err := lb.saveBatchLocked(); err != nil { return err } lb.cacheSize = 0 - lb.batchRegions = make(map[string]*metapb.Region, lb.batchSize) + lb.batch = make(map[string][]byte, lb.batchSize) return nil } -func (lb *levelDBBackend) saveRegions(regions map[string]*metapb.Region) error { +func (lb *levelDBBackend) saveBatchLocked() error { batch := new(leveldb.Batch) - - for key, r := range regions { - value, err := proto.Marshal(r) - if err != nil { - return errs.ErrProtoMarshal.Wrap(err).GenWithStackByCause() - } + for key, value := range lb.batch { batch.Put([]byte(key), value) } - if err := lb.Base.(*kv.LevelDBKV).Write(batch, nil); err != nil { return errs.ErrLevelDBWrite.Wrap(err).GenWithStackByCause() } return nil } -// Close closes the LevelDB kv. It will call Flush() once before closing. +// Close will gracefully close the LevelDB backend and flush the data to the underlying storage before closing. func (lb *levelDBBackend) Close() error { err := lb.Flush() if err != nil { - log.Error("meet error before close the region storage", errs.ZapError(err)) + log.Error("meet error before closing the leveldb storage", errs.ZapError(err)) } - lb.regionStorageCancel() + lb.cancel() err = lb.Base.(*kv.LevelDBKV).Close() if err != nil { return errs.ErrLevelDBClose.Wrap(err).GenWithStackByArgs() diff --git a/pkg/storage/leveldb_backend_test.go b/pkg/storage/leveldb_backend_test.go new file mode 100644 index 00000000000..f727dd69ee3 --- /dev/null +++ b/pkg/storage/leveldb_backend_test.go @@ -0,0 +1,121 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestLevelDBBackend(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + backend, err := newLevelDBBackend(ctx, t.TempDir(), nil) + re.NoError(err) + re.NotNil(backend) + key, value := "k1", "v1" + // Save without flush. + err = backend.SaveIntoBatch(key, []byte(value)) + re.NoError(err) + val, err := backend.Load(key) + re.NoError(err) + re.Empty(val) + // Flush and load. + err = backend.Flush() + re.NoError(err) + val, err = backend.Load(key) + re.NoError(err) + re.Equal(value, val) + // Delete and load. + err = backend.Remove(key) + re.NoError(err) + val, err = backend.Load(key) + re.NoError(err) + re.Empty(val) + // Save twice without flush. + err = backend.SaveIntoBatch(key, []byte(value)) + re.NoError(err) + val, err = backend.Load(key) + re.NoError(err) + re.Empty(val) + value = "v2" + err = backend.SaveIntoBatch(key, []byte(value)) + re.NoError(err) + val, err = backend.Load(key) + re.NoError(err) + re.Empty(val) + // Delete before flush. + err = backend.Remove(key) + re.NoError(err) + val, err = backend.Load(key) + re.NoError(err) + re.Empty(val) + // Flush and load. + err = backend.Flush() + re.NoError(err) + val, err = backend.Load(key) + re.NoError(err) + re.Equal(value, val) + // Delete and load. + err = backend.Remove(key) + re.NoError(err) + val, err = backend.Load(key) + re.NoError(err) + re.Empty(val) + // Test the background flush. + backend.flushRate = defaultDirtyFlushTick + err = backend.SaveIntoBatch(key, []byte(value)) + re.NoError(err) + val, err = backend.Load(key) + re.NoError(err) + re.Empty(val) + time.Sleep(defaultDirtyFlushTick * 2) + val, err = backend.Load(key) + re.NoError(err) + re.Equal(value, val) + err = backend.Remove(key) + re.NoError(err) + val, err = backend.Load(key) + re.NoError(err) + re.Empty(val) + backend.flushRate = defaultFlushRate + // Test the flush when the cache is full. + backend.flushRate = time.Minute + for i := 0; i < backend.batchSize; i++ { + key, value = fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i) + err = backend.SaveIntoBatch(key, []byte(value)) + re.NoError(err) + if i < backend.batchSize-1 { + // The cache is not full yet. + val, err = backend.Load(key) + re.NoError(err) + re.Empty(val) + } else { + // The cache is full, and the flush is triggered. + val, err = backend.Load(key) + re.NoError(err) + re.Equal(value, val) + } + } + backend.flushRate = defaultFlushRate + // Close the backend. + err = backend.Close() + re.NoError(err) +} diff --git a/pkg/storage/region_storage.go b/pkg/storage/region_storage.go new file mode 100644 index 00000000000..11bc6a7cc21 --- /dev/null +++ b/pkg/storage/region_storage.go @@ -0,0 +1,79 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + + "github.com/gogo/protobuf/proto" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/encryption" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/storage/kv" +) + +// RegionStorage is a storage for the PD region meta information based on LevelDB, +// which will override the default implementation of the `endpoint.RegionStorage`. +type RegionStorage struct { + kv.Base + backend *levelDBBackend +} + +var _ endpoint.RegionStorage = (*RegionStorage)(nil) + +func newRegionStorage(backend *levelDBBackend) *RegionStorage { + return &RegionStorage{Base: backend.Base, backend: backend} +} + +// LoadRegion implements the `endpoint.RegionStorage` interface. +func (s *RegionStorage) LoadRegion(regionID uint64, region *metapb.Region) (bool, error) { + return s.backend.LoadRegion(regionID, region) +} + +// LoadRegions implements the `endpoint.RegionStorage` interface. +func (s *RegionStorage) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error { + return s.backend.LoadRegions(ctx, f) +} + +// SaveRegion implements the `endpoint.RegionStorage` interface. +// Instead of saving the region directly, it will encrypt the region and then save it in batch. +func (s *RegionStorage) SaveRegion(region *metapb.Region) error { + encryptedRegion, err := encryption.EncryptRegion(region, s.backend.ekm) + if err != nil { + return err + } + value, err := proto.Marshal(encryptedRegion) + if err != nil { + return errs.ErrProtoMarshal.Wrap(err).GenWithStackByCause() + } + return s.backend.SaveIntoBatch(endpoint.RegionPath(region.GetId()), value) +} + +// DeleteRegion implements the `endpoint.RegionStorage` interface. +func (s *RegionStorage) DeleteRegion(region *metapb.Region) error { + return s.backend.Remove((endpoint.RegionPath(region.GetId()))) +} + +// Flush implements the `endpoint.RegionStorage` interface. +func (s *RegionStorage) Flush() error { + return s.backend.Flush() +} + +// Close implements the `endpoint.RegionStorage` interface. +func (s *RegionStorage) Close() error { + return s.backend.Close() +} diff --git a/pkg/storage/region_storage_test.go b/pkg/storage/region_storage_test.go new file mode 100644 index 00000000000..f6670f8c82e --- /dev/null +++ b/pkg/storage/region_storage_test.go @@ -0,0 +1,95 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "testing" + + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/storage/endpoint" +) + +func TestRegionStorage(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + var ( + regionStorage endpoint.RegionStorage + err error + ) + regionStorage, err = NewRegionStorageWithLevelDBBackend(ctx, t.TempDir(), nil) + re.NoError(err) + re.NotNil(regionStorage) + // Load regions from the storage. + regions := make([]*core.RegionInfo, 0) + appendRegionFunc := func(region *core.RegionInfo) []*core.RegionInfo { + regions = append(regions, region) + return nil + } + err = regionStorage.LoadRegions(ctx, appendRegionFunc) + re.NoError(err) + re.Empty(regions) + // Save regions to the storage. + region1 := newTestRegionMeta(1) + err = regionStorage.SaveRegion(region1) + re.NoError(err) + region2 := newTestRegionMeta(2) + err = regionStorage.SaveRegion(region2) + re.NoError(err) + regions = make([]*core.RegionInfo, 0) + err = regionStorage.LoadRegions(ctx, appendRegionFunc) + re.NoError(err) + re.Empty(regions) + // Flush and load. + err = regionStorage.Flush() + re.NoError(err) + regions = make([]*core.RegionInfo, 0) + err = regionStorage.LoadRegions(ctx, appendRegionFunc) + re.NoError(err) + re.Len(regions, 2) + re.Equal(region1, regions[0].GetMeta()) + re.Equal(region2, regions[1].GetMeta()) + newRegion := &metapb.Region{} + ok, err := regionStorage.LoadRegion(3, newRegion) + re.NoError(err) + re.False(ok) + ok, err = regionStorage.LoadRegion(1, newRegion) + re.NoError(err) + re.True(ok) + re.Equal(region1, newRegion) + ok, err = regionStorage.LoadRegion(2, newRegion) + re.NoError(err) + re.True(ok) + re.Equal(region2, newRegion) + // Delete and load. + err = regionStorage.DeleteRegion(region1) + re.NoError(err) + regions = make([]*core.RegionInfo, 0) + err = regionStorage.LoadRegions(ctx, appendRegionFunc) + re.NoError(err) + re.Len(regions, 1) + re.Equal(region2, regions[0].GetMeta()) + ok, err = regionStorage.LoadRegion(2, newRegion) + re.NoError(err) + re.True(ok) + re.Equal(region2, newRegion) + re.Equal(regions[0].GetMeta(), newRegion) + // Close the storage. + err = regionStorage.Close() + re.NoError(err) +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index aba01dfa806..5e006133d22 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -57,13 +57,18 @@ func NewStorageWithEtcdBackend(client *clientv3.Client, rootPath string) Storage return newEtcdBackend(client, rootPath) } -// NewStorageWithLevelDBBackend creates a new storage with LevelDB backend. -func NewStorageWithLevelDBBackend( +// NewRegionStorageWithLevelDBBackend will create a specialized storage to +// store region meta information based on a LevelDB backend. +func NewRegionStorageWithLevelDBBackend( ctx context.Context, filePath string, ekm *encryption.Manager, -) (Storage, error) { - return newLevelDBBackend(ctx, filePath, ekm) +) (*RegionStorage, error) { + levelDBBackend, err := newLevelDBBackend(ctx, filePath, ekm) + if err != nil { + return nil, err + } + return newRegionStorage(levelDBBackend), nil } // TODO: support other KV storage backends like BadgerDB in the future. @@ -88,15 +93,14 @@ func NewCoreStorage(defaultStorage Storage, regionStorage endpoint.RegionStorage } } -// TryGetLocalRegionStorage gets the local region storage. Returns nil if not present. -func TryGetLocalRegionStorage(s Storage) endpoint.RegionStorage { +// RetrieveRegionStorage retrieve the region storage from the given storage. +// If it's a `coreStorage`, it will return the regionStorage inside, otherwise it will return the original storage. +func RetrieveRegionStorage(s Storage) endpoint.RegionStorage { switch ps := s.(type) { case *coreStorage: return ps.regionStorage - case *levelDBBackend, *memoryStorage: - return ps default: - return nil + return ps } } diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index dbb5a03b264..4525ec6091c 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -209,6 +209,57 @@ func TestLoadMinServiceGCSafePoint(t *testing.T) { re.Equal(uint64(2), ssp.SafePoint) } +func TestTryGetLocalRegionStorage(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Memory backend integrated into core storage. + defaultStorage := NewStorageWithMemoryBackend() + var regionStorage endpoint.RegionStorage = NewStorageWithMemoryBackend() + coreStorage := NewCoreStorage(defaultStorage, regionStorage) + storage := RetrieveRegionStorage(coreStorage) + re.NotNil(storage) + re.Equal(regionStorage, storage) + // RegionStorage with LevelDB backend integrated into core storage. + defaultStorage = NewStorageWithMemoryBackend() + regionStorage, err := NewRegionStorageWithLevelDBBackend(ctx, t.TempDir(), nil) + re.NoError(err) + coreStorage = NewCoreStorage(defaultStorage, regionStorage) + storage = RetrieveRegionStorage(coreStorage) + re.NotNil(storage) + re.Equal(regionStorage, storage) + // Raw LevelDB backend integrated into core storage. + defaultStorage = NewStorageWithMemoryBackend() + regionStorage, err = newLevelDBBackend(ctx, t.TempDir(), nil) + re.NoError(err) + coreStorage = NewCoreStorage(defaultStorage, regionStorage) + storage = RetrieveRegionStorage(coreStorage) + re.NotNil(storage) + re.Equal(regionStorage, storage) + defaultStorage = NewStorageWithMemoryBackend() + regionStorage, err = newLevelDBBackend(ctx, t.TempDir(), nil) + re.NoError(err) + coreStorage = NewCoreStorage(defaultStorage, regionStorage) + storage = RetrieveRegionStorage(coreStorage) + re.NotNil(storage) + re.Equal(regionStorage, storage) + // Without core storage. + defaultStorage = NewStorageWithMemoryBackend() + storage = RetrieveRegionStorage(defaultStorage) + re.NotNil(storage) + re.Equal(defaultStorage, storage) + defaultStorage, err = newLevelDBBackend(ctx, t.TempDir(), nil) + re.NoError(err) + storage = RetrieveRegionStorage(defaultStorage) + re.NotNil(storage) + re.Equal(defaultStorage, storage) + defaultStorage, err = newLevelDBBackend(ctx, t.TempDir(), nil) + re.NoError(err) + storage = RetrieveRegionStorage(defaultStorage) + re.NotNil(storage) + re.Equal(defaultStorage, storage) +} + func TestLoadRegions(t *testing.T) { re := require.New(t) storage := NewStorageWithMemoryBackend() @@ -367,7 +418,7 @@ func randomMerge(regions []*metapb.Region, n int, ratio int) { } } -func saveRegions(lb *levelDBBackend, n int, ratio int) error { +func saveRegions(storage endpoint.RegionStorage, n int, ratio int) error { keys := generateKeys(n) regions := make([]*metapb.Region, 0, n) for i := uint64(0); i < uint64(n); i++ { @@ -398,36 +449,36 @@ func saveRegions(lb *levelDBBackend, n int, ratio int) error { } for _, region := range regions { - err := lb.SaveRegion(region) + err := storage.SaveRegion(region) if err != nil { return err } } - return lb.Flush() + return storage.Flush() } func benchmarkLoadRegions(b *testing.B, n int, ratio int) { re := require.New(b) ctx := context.Background() dir := b.TempDir() - lb, err := newLevelDBBackend(ctx, dir, nil) + regionStorage, err := NewRegionStorageWithLevelDBBackend(ctx, dir, nil) if err != nil { b.Fatal(err) } cluster := core.NewBasicCluster() - err = saveRegions(lb, n, ratio) + err = saveRegions(regionStorage, n, ratio) if err != nil { b.Fatal(err) } defer func() { - err = lb.Close() + err = regionStorage.Close() if err != nil { b.Fatal(err) } }() b.ResetTimer() - err = lb.LoadRegions(context.Background(), cluster.CheckAndPutRegion) + err = regionStorage.LoadRegions(ctx, cluster.CheckAndPutRegion) re.NoError(err) } diff --git a/pkg/syncer/client_test.go b/pkg/syncer/client_test.go index ba389b5de6d..6770fae44ac 100644 --- a/pkg/syncer/client_test.go +++ b/pkg/syncer/client_test.go @@ -34,7 +34,7 @@ import ( func TestLoadRegion(t *testing.T) { re := require.New(t) tempDir := t.TempDir() - rs, err := storage.NewStorageWithLevelDBBackend(context.Background(), tempDir, nil) + rs, err := storage.NewRegionStorageWithLevelDBBackend(context.Background(), tempDir, nil) re.NoError(err) server := &mockServer{ @@ -62,7 +62,7 @@ func TestLoadRegion(t *testing.T) { func TestErrorCode(t *testing.T) { re := require.New(t) tempDir := t.TempDir() - rs, err := storage.NewStorageWithLevelDBBackend(context.Background(), tempDir, nil) + rs, err := storage.NewRegionStorageWithLevelDBBackend(context.Background(), tempDir, nil) re.NoError(err) server := &mockServer{ ctx: context.Background(), diff --git a/pkg/syncer/server.go b/pkg/syncer/server.go index 4fb38614de0..ccc32b13303 100644 --- a/pkg/syncer/server.go +++ b/pkg/syncer/server.go @@ -88,19 +88,16 @@ type RegionSyncer struct { streamingRunning atomic.Bool } -// NewRegionSyncer returns a region syncer. -// The final consistency is ensured by the heartbeat. -// Strong consistency is not guaranteed. -// Usually open the region syncer in huge cluster and the server -// no longer etcd but go-leveldb. +// NewRegionSyncer returns a region syncer that ensures final consistency through the heartbeat, +// but it does not guarantee strong consistency. Using the same storage backend of the region storage. func NewRegionSyncer(s Server) *RegionSyncer { - localRegionStorage := storage.TryGetLocalRegionStorage(s.GetStorage()) - if localRegionStorage == nil { + regionStorage := storage.RetrieveRegionStorage(s.GetStorage()) + if regionStorage == nil { return nil } syncer := &RegionSyncer{ server: s, - history: newHistoryBuffer(defaultHistoryBufferSize, localRegionStorage.(kv.Base)), + history: newHistoryBuffer(defaultHistoryBufferSize, regionStorage.(kv.Base)), limit: ratelimit.NewRateLimiter(defaultBucketRate, defaultBucketCapacity), tlsConfig: s.GetTLSConfig(), } diff --git a/server/server.go b/server/server.go index ab69c2a3ad7..4c1632f634a 100644 --- a/server/server.go +++ b/server/server.go @@ -449,11 +449,16 @@ func (s *Server) startServer(ctx context.Context) error { Label: idAllocLabel, Member: s.member.MemberValue(), }) - regionStorage, err := storage.NewStorageWithLevelDBBackend(ctx, filepath.Join(s.cfg.DataDir, "region-meta"), s.encryptionKeyManager) + // Initialize an etcd storage as the default storage. + defaultStorage := storage.NewStorageWithEtcdBackend(s.client, s.rootPath) + // Initialize a specialized LevelDB storage to store the region-related meta info independently. + regionStorage, err := storage.NewRegionStorageWithLevelDBBackend( + ctx, + filepath.Join(s.cfg.DataDir, "region-meta"), + s.encryptionKeyManager) if err != nil { return err } - defaultStorage := storage.NewStorageWithEtcdBackend(s.client, s.rootPath) s.storage = storage.NewCoreStorage(defaultStorage, regionStorage) s.tsoDispatcher = tsoutil.NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize) s.tsoProtoFactory = &tsoutil.TSOProtoFactory{} diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go index cb93c988e11..1470173e0ed 100644 --- a/tests/server/region_syncer/region_syncer_test.go +++ b/tests/server/region_syncer/region_syncer_test.go @@ -36,7 +36,7 @@ func TestMain(m *testing.M) { func TestRegionSyncer(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/storage/regionStorageFastFlush", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/storage/levelDBStorageFastFlush", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/syncer/noFastExitSync", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/syncer/disableClientStreaming", `return(true)`)) @@ -156,7 +156,7 @@ func TestRegionSyncer(t *testing.T) { re.Equal(region.GetBuckets(), r.GetBuckets()) } re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/syncer/noFastExitSync")) - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/storage/regionStorageFastFlush")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/storage/levelDBStorageFastFlush")) } func TestFullSyncWithAddMember(t *testing.T) {