Skip to content

Commit

Permalink
storage: decouple the region storage from LevelDB backend (#7859)
Browse files Browse the repository at this point in the history
ref #3453

In the previous implementation, LevelDB and `RegionStorage` were strongly coupled.
This PR decouples region storage from it to further optimize code abstraction, so we can
get a more reusable `LevelDBBackend` and a more specialized `RegionStorage` implementation.

Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato authored Mar 4, 2024
1 parent 5f5fdb2 commit 1833ce2
Show file tree
Hide file tree
Showing 10 changed files with 423 additions and 91 deletions.
102 changes: 41 additions & 61 deletions pkg/storage/leveldb_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ import (
"context"
"time"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/syndtr/goleveldb/leveldb"
"github.com/tikv/pd/pkg/encryption"
Expand All @@ -32,25 +30,27 @@ import (
)

const (
// DefaultFlushRegionRate is the ttl to sync the regions to region storage.
defaultFlushRegionRate = 3 * time.Second
// DefaultBatchSize is the batch size to save the regions to region storage.
// defaultFlushRate is the default interval to flush the data into the local storage.
defaultFlushRate = 3 * time.Second
// defaultBatchSize is the default batch size to save the data to the local storage.
defaultBatchSize = 100
// defaultDirtyFlushTick
defaultDirtyFlushTick = time.Second
)

// levelDBBackend is a storage backend that stores data in LevelDB,
// which is mainly used by the PD region storage.
// which is mainly used to store the PD Region meta information.
type levelDBBackend struct {
*endpoint.StorageEndpoint
ekm *encryption.Manager
mu syncutil.RWMutex
batchRegions map[string]*metapb.Region
batchSize int
cacheSize int
flushRate time.Duration
flushTime time.Time
regionStorageCtx context.Context
regionStorageCancel context.CancelFunc
ekm *encryption.Manager
mu syncutil.RWMutex
batch map[string][]byte
batchSize int
cacheSize int
flushRate time.Duration
flushTime time.Time
ctx context.Context
cancel context.CancelFunc
}

// newLevelDBBackend is used to create a new LevelDB backend.
Expand All @@ -63,120 +63,100 @@ func newLevelDBBackend(
if err != nil {
return nil, err
}
regionStorageCtx, regionStorageCancel := context.WithCancel(ctx)
lb := &levelDBBackend{
StorageEndpoint: endpoint.NewStorageEndpoint(levelDB, ekm),
ekm: ekm,
batchSize: defaultBatchSize,
flushRate: defaultFlushRegionRate,
batchRegions: make(map[string]*metapb.Region, defaultBatchSize),
flushTime: time.Now().Add(defaultFlushRegionRate),
regionStorageCtx: regionStorageCtx,
regionStorageCancel: regionStorageCancel,
StorageEndpoint: endpoint.NewStorageEndpoint(levelDB, ekm),
ekm: ekm,
batchSize: defaultBatchSize,
flushRate: defaultFlushRate,
batch: make(map[string][]byte, defaultBatchSize),
flushTime: time.Now().Add(defaultFlushRate),
}
lb.ctx, lb.cancel = context.WithCancel(ctx)
go lb.backgroundFlush()
return lb, nil
}

var dirtyFlushTick = time.Second

func (lb *levelDBBackend) backgroundFlush() {
defer logutil.LogPanic()

var (
isFlush bool
err error
)
ticker := time.NewTicker(dirtyFlushTick)
ticker := time.NewTicker(defaultDirtyFlushTick)
defer ticker.Stop()
for {
select {
case <-ticker.C:
lb.mu.RLock()
isFlush = lb.flushTime.Before(time.Now())
failpoint.Inject("regionStorageFastFlush", func() {
failpoint.Inject("levelDBStorageFastFlush", func() {
isFlush = true
})
lb.mu.RUnlock()
if !isFlush {
continue
}
if err = lb.Flush(); err != nil {
log.Error("flush regions meet error", errs.ZapError(err))
log.Error("flush data meet error", errs.ZapError(err))
}
case <-lb.regionStorageCtx.Done():
case <-lb.ctx.Done():
return
}
}
}

func (lb *levelDBBackend) SaveRegion(region *metapb.Region) error {
region, err := encryption.EncryptRegion(region, lb.ekm)
if err != nil {
return err
}
// SaveIntoBatch saves the key-value pair into the batch cache, and it will
// only be saved to the underlying storage when the `Flush` method is
// called or the cache is full.
func (lb *levelDBBackend) SaveIntoBatch(key string, value []byte) error {
lb.mu.Lock()
defer lb.mu.Unlock()
if lb.cacheSize < lb.batchSize-1 {
lb.batchRegions[endpoint.RegionPath(region.GetId())] = region
lb.batch[key] = value
lb.cacheSize++

lb.flushTime = time.Now().Add(lb.flushRate)
return nil
}
lb.batchRegions[endpoint.RegionPath(region.GetId())] = region
err = lb.flushLocked()

if err != nil {
return err
}
return nil
}

func (lb *levelDBBackend) DeleteRegion(region *metapb.Region) error {
return lb.Remove(endpoint.RegionPath(region.GetId()))
lb.batch[key] = value
return lb.flushLocked()
}

// Flush saves the cache region to the underlying storage.
// Flush saves the batch cache to the underlying storage.
func (lb *levelDBBackend) Flush() error {
lb.mu.Lock()
defer lb.mu.Unlock()
return lb.flushLocked()
}

func (lb *levelDBBackend) flushLocked() error {
if err := lb.saveRegions(lb.batchRegions); err != nil {
if err := lb.saveBatchLocked(); err != nil {
return err
}
lb.cacheSize = 0
lb.batchRegions = make(map[string]*metapb.Region, lb.batchSize)
lb.batch = make(map[string][]byte, lb.batchSize)
return nil
}

func (lb *levelDBBackend) saveRegions(regions map[string]*metapb.Region) error {
func (lb *levelDBBackend) saveBatchLocked() error {
batch := new(leveldb.Batch)

for key, r := range regions {
value, err := proto.Marshal(r)
if err != nil {
return errs.ErrProtoMarshal.Wrap(err).GenWithStackByCause()
}
for key, value := range lb.batch {
batch.Put([]byte(key), value)
}

if err := lb.Base.(*kv.LevelDBKV).Write(batch, nil); err != nil {
return errs.ErrLevelDBWrite.Wrap(err).GenWithStackByCause()
}
return nil
}

// Close closes the LevelDB kv. It will call Flush() once before closing.
// Close will gracefully close the LevelDB backend and flush the data to the underlying storage before closing.
func (lb *levelDBBackend) Close() error {
err := lb.Flush()
if err != nil {
log.Error("meet error before close the region storage", errs.ZapError(err))
log.Error("meet error before closing the leveldb storage", errs.ZapError(err))
}
lb.regionStorageCancel()
lb.cancel()
err = lb.Base.(*kv.LevelDBKV).Close()
if err != nil {
return errs.ErrLevelDBClose.Wrap(err).GenWithStackByArgs()
Expand Down
121 changes: 121 additions & 0 deletions pkg/storage/leveldb_backend_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestLevelDBBackend(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
backend, err := newLevelDBBackend(ctx, t.TempDir(), nil)
re.NoError(err)
re.NotNil(backend)
key, value := "k1", "v1"
// Save without flush.
err = backend.SaveIntoBatch(key, []byte(value))
re.NoError(err)
val, err := backend.Load(key)
re.NoError(err)
re.Empty(val)
// Flush and load.
err = backend.Flush()
re.NoError(err)
val, err = backend.Load(key)
re.NoError(err)
re.Equal(value, val)
// Delete and load.
err = backend.Remove(key)
re.NoError(err)
val, err = backend.Load(key)
re.NoError(err)
re.Empty(val)
// Save twice without flush.
err = backend.SaveIntoBatch(key, []byte(value))
re.NoError(err)
val, err = backend.Load(key)
re.NoError(err)
re.Empty(val)
value = "v2"
err = backend.SaveIntoBatch(key, []byte(value))
re.NoError(err)
val, err = backend.Load(key)
re.NoError(err)
re.Empty(val)
// Delete before flush.
err = backend.Remove(key)
re.NoError(err)
val, err = backend.Load(key)
re.NoError(err)
re.Empty(val)
// Flush and load.
err = backend.Flush()
re.NoError(err)
val, err = backend.Load(key)
re.NoError(err)
re.Equal(value, val)
// Delete and load.
err = backend.Remove(key)
re.NoError(err)
val, err = backend.Load(key)
re.NoError(err)
re.Empty(val)
// Test the background flush.
backend.flushRate = defaultDirtyFlushTick
err = backend.SaveIntoBatch(key, []byte(value))
re.NoError(err)
val, err = backend.Load(key)
re.NoError(err)
re.Empty(val)
time.Sleep(defaultDirtyFlushTick * 2)
val, err = backend.Load(key)
re.NoError(err)
re.Equal(value, val)
err = backend.Remove(key)
re.NoError(err)
val, err = backend.Load(key)
re.NoError(err)
re.Empty(val)
backend.flushRate = defaultFlushRate
// Test the flush when the cache is full.
backend.flushRate = time.Minute
for i := 0; i < backend.batchSize; i++ {
key, value = fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)
err = backend.SaveIntoBatch(key, []byte(value))
re.NoError(err)
if i < backend.batchSize-1 {
// The cache is not full yet.
val, err = backend.Load(key)
re.NoError(err)
re.Empty(val)
} else {
// The cache is full, and the flush is triggered.
val, err = backend.Load(key)
re.NoError(err)
re.Equal(value, val)
}
}
backend.flushRate = defaultFlushRate
// Close the backend.
err = backend.Close()
re.NoError(err)
}
79 changes: 79 additions & 0 deletions pkg/storage/region_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
"context"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/encryption"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
)

// RegionStorage is a storage for the PD region meta information based on LevelDB,
// which will override the default implementation of the `endpoint.RegionStorage`.
type RegionStorage struct {
kv.Base
backend *levelDBBackend
}

var _ endpoint.RegionStorage = (*RegionStorage)(nil)

func newRegionStorage(backend *levelDBBackend) *RegionStorage {
return &RegionStorage{Base: backend.Base, backend: backend}
}

// LoadRegion implements the `endpoint.RegionStorage` interface.
func (s *RegionStorage) LoadRegion(regionID uint64, region *metapb.Region) (bool, error) {
return s.backend.LoadRegion(regionID, region)
}

// LoadRegions implements the `endpoint.RegionStorage` interface.
func (s *RegionStorage) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error {
return s.backend.LoadRegions(ctx, f)
}

// SaveRegion implements the `endpoint.RegionStorage` interface.
// Instead of saving the region directly, it will encrypt the region and then save it in batch.
func (s *RegionStorage) SaveRegion(region *metapb.Region) error {
encryptedRegion, err := encryption.EncryptRegion(region, s.backend.ekm)
if err != nil {
return err
}
value, err := proto.Marshal(encryptedRegion)
if err != nil {
return errs.ErrProtoMarshal.Wrap(err).GenWithStackByCause()
}
return s.backend.SaveIntoBatch(endpoint.RegionPath(region.GetId()), value)
}

// DeleteRegion implements the `endpoint.RegionStorage` interface.
func (s *RegionStorage) DeleteRegion(region *metapb.Region) error {
return s.backend.Remove((endpoint.RegionPath(region.GetId())))
}

// Flush implements the `endpoint.RegionStorage` interface.
func (s *RegionStorage) Flush() error {
return s.backend.Flush()
}

// Close implements the `endpoint.RegionStorage` interface.
func (s *RegionStorage) Close() error {
return s.backend.Close()
}
Loading

0 comments on commit 1833ce2

Please sign in to comment.