Skip to content

Commit

Permalink
Merge pull request pingcap#9 from tangenta/new_writer-1
Browse files Browse the repository at this point in the history
support remote backend
  • Loading branch information
wjhuang2016 authored Jun 28, 2023
2 parents 3cdecf4 + f139aa8 commit 6280dba
Show file tree
Hide file tree
Showing 5 changed files with 263 additions and 15 deletions.
144 changes: 144 additions & 0 deletions br/pkg/lightning/backend/remote/remote.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remote

import (
"context"
"fmt"
"path/filepath"
"strconv"
"sync"
"time"

"github.com/google/uuid"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/sharedisk"
"github.com/pingcap/tidb/br/pkg/storage"
)

const defaultRetryBackoffTime = 3 * time.Second

type Config struct {
Bucket string
Prefix string
AccessKey string
SecretAccessKey string
Host string
Port string
}

func NewRemoteBackend(ctx context.Context, cfg *Config) (backend.Backend, error) {
uri := fmt.Sprintf("s3://%s/%s?access-key=%s&secret-access-key=%s&endpoint=http://%s:%s&force-path-style=true",
cfg.Bucket, cfg.Prefix, cfg.AccessKey, cfg.SecretAccessKey, cfg.Host, cfg.Port)
bd, err := storage.ParseBackend(uri, nil)
if err != nil {
return nil, err
}
extStore, err := storage.New(ctx, bd, &storage.ExternalStorageOptions{})
return &remoteBackend{
externalStorage: extStore,
mu: struct {
sync.RWMutex
maxWriterID int
writersSeq map[int]int
}{},
}, nil
}

type remoteBackend struct {
externalStorage storage.ExternalStorage
mu struct {
sync.RWMutex
maxWriterID int
writersSeq map[int]int
}
}

func (r *remoteBackend) Close() {
}

func (r *remoteBackend) RetryImportDelay() time.Duration {
return defaultRetryBackoffTime
}

func (r *remoteBackend) ShouldPostProcess() bool {
return true
}

func (r *remoteBackend) OpenEngine(ctx context.Context, config *backend.EngineConfig, engineUUID uuid.UUID) error {
return nil
}

func (r *remoteBackend) CloseEngine(ctx context.Context, config *backend.EngineConfig, engineUUID uuid.UUID) error {
return nil
}

func (r *remoteBackend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
return nil
}

func (r *remoteBackend) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error {
for wid, seq := range r.mu.writersSeq {
filePrefix := fmt.Sprintf("%s_%d", engineUUID.String(), wid)
for i := 1; i <= seq; i++ {
dataPath := filepath.Join(filePrefix, strconv.Itoa(i))
err := r.externalStorage.DeleteFile(ctx, dataPath)
if err != nil {
return err
}
statPath := filepath.Join(filePrefix+"_stat", strconv.Itoa(i))
err = r.externalStorage.DeleteFile(ctx, statPath)
if err != nil {
return err
}
}
}
return nil
}

func (r *remoteBackend) FlushEngine(ctx context.Context, engineUUID uuid.UUID) error {
return nil
}

func (r *remoteBackend) FlushAllEngines(ctx context.Context) error {
return nil
}

func (r *remoteBackend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error {
return nil
}

func (r *remoteBackend) LocalWriter(ctx context.Context, cfg *backend.LocalWriterConfig, engineUUID uuid.UUID) (backend.EngineWriter, error) {
onClose := func(writerID, currentSeq int) {
r.saveWriterSeq(writerID, currentSeq)
}
return sharedisk.NewWriter(ctx, r.externalStorage, engineUUID, r.allocWriterID(), onClose), nil
}

func (r *remoteBackend) allocWriterID() int {
r.mu.Lock()
defer r.mu.Unlock()
seq := r.mu.maxWriterID
r.mu.maxWriterID++
return seq
}

// saveWriterSeq stores the current writer sequence for the given writer so that
// we can remove the correct file in external storage when cleaning up.
func (r *remoteBackend) saveWriterSeq(writerID int, currentSeq int) {
r.mu.Lock()
defer r.mu.Unlock()
r.mu.writersSeq[writerID] = currentSeq
}
74 changes: 71 additions & 3 deletions br/pkg/lightning/backend/sharedisk/sharedisk.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,24 @@ import (
"bytes"
"context"
"encoding/binary"
"fmt"
"path/filepath"
"strconv"
"sync"

"github.com/google/uuid"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/keyspace"
"github.com/pingcap/tidb/util/logutil"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"path/filepath"
"strconv"
"sync"
)

type rangeOffsets struct {
Expand Down Expand Up @@ -95,10 +100,46 @@ func Decode2RangeProperty(data []byte) (*RangeProperty, error) {
return rp, nil
}

func NewEngine(sizeDist, keyDist uint64) *Engine {
return &Engine{
rc: &RangePropertiesCollector{
// TODO(tangenta): decide the preserved size of props.
props: nil,
propSizeIdxDistance: sizeDist,
propKeysIdxDistance: keyDist,
},
}
}

type Engine struct {
rc *RangePropertiesCollector
}

func NewWriter(ctx context.Context, externalStorage storage.ExternalStorage, engineUUID uuid.UUID, writerID int, onClose func(int, int)) *Writer {
// TODO(tangenta): make it configurable.
engine := NewEngine(2048, 256)
pool := membuf.NewPool()
filePrefix := fmt.Sprintf("%s_%d", engineUUID.String(), writerID)
return &Writer{
ctx: ctx,
engine: engine,
memtableSizeLimit: 8 * 1024,
keyAdapter: &local.NoopKeyAdapter{},
exStorage: externalStorage,
memBufPool: pool,
kvBuffer: pool.NewBuffer(),
writeBatch: nil,
batchCount: 0,
batchSize: 0,
currentSeq: 0,
tikvCodec: keyspace.CodecV1,
filenamePrefix: filePrefix,
writerID: writerID,
kvStore: nil,
onClose: onClose,
}
}

// Writer is used to write data into external storage.
type Writer struct {
ctx context.Context
Expand All @@ -109,16 +150,19 @@ type Writer struct {
exStorage storage.ExternalStorage

// bytes buffer for writeBatch
memBufPool *membuf.Pool
kvBuffer *membuf.Buffer
writeBatch []common.KvPair

batchCount int
batchSize int64

currentSeq int
onClose func(writerID, currentSeq int)

tikvCodec tikv.Codec
filenamePrefix string
writerID int

kvStore *KeyValueStore
}
Expand Down Expand Up @@ -358,6 +402,30 @@ func (w *Writer) AppendRows(ctx context.Context, columnNames []string, rows enco
return nil
}

func (w *Writer) IsSynced() bool {
return false
}

func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
defer w.memBufPool.Destroy()
defer w.onClose(w.writerID, w.currentSeq)
err := w.flushKVs(ctx)
if err != nil {
return status(false), err
}
err = w.kvStore.Finish()
if err != nil {
return status(false), err
}
return status(true), nil
}

type status bool

func (s status) Flushed() bool {
return bool(s)
}

func (w *Writer) flushKVs(ctx context.Context) error {
if w.batchCount == 0 {
return nil
Expand Down
9 changes: 5 additions & 4 deletions br/pkg/lightning/backend/sharedisk/sharedisk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
"context"
"encoding/binary"
"fmt"
"math/rand"
"strconv"
"testing"

kv2 "github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/common"
Expand All @@ -27,16 +31,13 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"math/rand"
"strconv"
"testing"
)

func TestWriter(t *testing.T) {
bucket := "nfs"
prefix := "tools_test_data/sharedisk"
uri := fmt.Sprintf("s3://%s/%s?access-key=%s&secret-access-key=%s&endpoint=http://%s:%s&force-path-style=true",
bucket, prefix, ak, sak, hostname, port)
bucket, prefix, "minioadmin", "minioadmin", "127.0.0.1", "9000")
backend, err := storage2.ParseBackend(uri, nil)
require.NoError(t, err)
storage, err := storage2.New(context.Background(), backend, &storage2.ExternalStorageOptions{})
Expand Down
22 changes: 18 additions & 4 deletions ddl/ingest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
lightning "github.com/pingcap/tidb/br/pkg/lightning/config"
Expand Down Expand Up @@ -73,7 +74,7 @@ type litBackendCtx struct {
MemRoot MemRoot
DiskRoot DiskRoot
jobID int64
backend *local.Backend
backend backend.Backend
ctx context.Context
cfg *lightning.Config
sysVars map[string]string
Expand All @@ -92,7 +93,11 @@ func (bc *litBackendCtx) CollectRemoteDuplicateRows(indexID int64, tbl table.Tab
// backend must be a local backend.
// todo: when we can separate local backend completely from tidb backend, will remove this cast.
//nolint:forcetypeassert
dupeController := bc.backend.GetDupeController(bc.cfg.TikvImporter.RangeConcurrency*2, errorMgr)
localBackend, ok := bc.backend.(*local.Backend)
if !ok {
return nil
}
dupeController := localBackend.GetDupeController(bc.cfg.TikvImporter.RangeConcurrency*2, errorMgr)
hasDupe, err := dupeController.CollectRemoteDuplicateRows(bc.ctx, tbl, tbl.Meta().Name.L, &encode.SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
SysVars: bc.sysVars,
Expand Down Expand Up @@ -133,7 +138,11 @@ func (bc *litBackendCtx) FinishImport(indexID int64, unique bool, tbl table.Tabl
// backend must be a local backend.
// todo: when we can separate local backend completely from tidb backend, will remove this cast.
//nolint:forcetypeassert
dupeController := bc.backend.GetDupeController(bc.cfg.TikvImporter.RangeConcurrency*2, errorMgr)
localBackend, ok := bc.backend.(*local.Backend)
if !ok {
return nil
}
dupeController := localBackend.GetDupeController(bc.cfg.TikvImporter.RangeConcurrency*2, errorMgr)
hasDupe, err := dupeController.CollectRemoteDuplicateRows(bc.ctx, tbl, tbl.Meta().Name.L, &encode.SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
SysVars: bc.sysVars,
Expand Down Expand Up @@ -215,7 +224,12 @@ func (bc *litBackendCtx) Flush(indexID int64, mode FlushMode) (flushed, imported

logutil.BgLogger().Info(LitInfoUnsafeImport, zap.Int64("index ID", indexID),
zap.String("usage info", bc.diskRoot.UsageInfo()))
err = bc.backend.UnsafeImportAndReset(bc.ctx, ei.uuid, int64(lightning.SplitRegionSize)*int64(lightning.MaxSplitRegionSizeRatio), int64(lightning.SplitRegionKeys))
//nolint:forcetypeassert
localBackend, ok := bc.backend.(*local.Backend)
if !ok {
return false, false, nil
}
err = localBackend.UnsafeImportAndReset(bc.ctx, ei.uuid, int64(lightning.SplitRegionSize)*int64(lightning.MaxSplitRegionSizeRatio), int64(lightning.SplitRegionKeys))
if err != nil {
logutil.BgLogger().Error(LitErrIngestDataErr, zap.Int64("index ID", indexID),
zap.String("usage info", bc.diskRoot.UsageInfo()))
Expand Down
Loading

0 comments on commit 6280dba

Please sign in to comment.