localbackend: fix resource leak when err on new local backend (pingca…
D3Hunter authored May 30, 2024
1 parent afd8de1 commit 29bf008
Showing 3 changed files with 66 additions and 29 deletions.
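In outline, the fix applies a standard Go cleanup-on-error pattern: NewBackend names its error return, declares every intermediate client up front, and a single deferred closure releases whatever was already constructed if the function exits with a non-nil error. A minimal, self-contained sketch of the pattern (conn, backend, and open are hypothetical stand-ins, not the actual TiDB types):

    package main

    import (
        "errors"
        "fmt"
    )

    type conn struct{ name string }

    func (c *conn) Close() { fmt.Println("closed", c.name) }

    // open simulates a constructor that can fail.
    func open(name string, fail bool) (*conn, error) {
        if fail {
            return nil, errors.New(name + ": dial failed")
        }
        return &conn{name: name}, nil
    }

    type backend struct{ pd, tikv *conn }

    // newBackend mirrors the commit's shape: resources are declared up front,
    // assigned with "=" rather than ":=", and a deferred closure inspects the
    // named return err to decide whether to release them.
    func newBackend(failSecond bool) (b *backend, err error) {
        var pd, tikv *conn
        defer func() {
            if err == nil {
                return // success: the backend now owns both conns
            }
            // Close in reverse creation order; nil checks skip resources
            // that were never created.
            if tikv != nil {
                tikv.Close()
            }
            if pd != nil {
                pd.Close()
            }
        }()

        if pd, err = open("pd-client", false); err != nil {
            return nil, err
        }
        if tikv, err = open("tikv-client", failSecond); err != nil {
            return nil, err // the defer closes pd-client here
        }
        return &backend{pd: pd, tikv: tikv}, nil
    }

    func main() {
        if _, err := newBackend(true); err != nil {
            fmt.Println("construction failed:", err) // pd-client was closed, not leaked
        }
    }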
6 changes: 4 additions & 2 deletions br/pkg/restore/split/split.go
@@ -6,6 +6,7 @@ import (
 	"bytes"
 	"context"
 	"encoding/hex"
+	goerrors "errors"
 	"time"
 
 	"github.com/pingcap/errors"
@@ -112,7 +113,7 @@ func PaginateScanRegion(
 		var batch []*RegionInfo
 		batch, err = client.ScanRegions(ctx, scanStartKey, endKey, limit)
 		if err != nil {
-			err = errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan regions from start-key:%s, err: %s",
+			err = errors.Annotatef(berrors.ErrPDBatchScanRegion.Wrap(err), "scan regions from start-key:%s, err: %s",
 				redact.Key(scanStartKey), err.Error())
 			return err
 		}
@@ -227,7 +228,8 @@ func NewWaitRegionOnlineBackoffer() *WaitRegionOnlineBackoffer {
 // NextBackoff returns a duration to wait before retrying again
 func (b *WaitRegionOnlineBackoffer) NextBackoff(err error) time.Duration {
 	// TODO(lance6716): why we only backoff when the error is ErrPDBatchScanRegion?
-	if berrors.ErrPDBatchScanRegion.Equal(err) {
+	var perr *errors.Error
+	if goerrors.As(err, &perr) && berrors.ErrPDBatchScanRegion.ID() == perr.ID() {
 		// it needs more time to wait splitting the regions that contains data in PITR.
 		// 2s * 150
 		delayTime := b.Stat.ExponentialBackoff()
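The split.go half of the change has a knock-on effect: once ScanRegions failures are attached with ErrPDBatchScanRegion.Wrap(err), callers receive a new wrapped instance rather than the sentinel itself, so the backoffer's old berrors.ErrPDBatchScanRegion.Equal(err) check is replaced by walking the error chain with goerrors.As and comparing stable error IDs. A standard-library sketch of the same matching idea (the classified type imitates pingcap/errors' *Error; it is not the real API):

    package main

    import (
        "errors"
        "fmt"
    )

    // classified mimics a sentinel error that carries a stable ID and can
    // wrap an underlying cause.
    type classified struct {
        id    string
        cause error
    }

    func (e *classified) Error() string {
        if e.cause == nil {
            return e.id
        }
        return e.id + ": " + e.cause.Error()
    }

    func (e *classified) Unwrap() error { return e.cause }

    var errBatchScan = &classified{id: "PD:BatchScanRegion"}

    // wrap returns a new instance carrying the sentinel's ID plus a cause,
    // which is why pointer equality against errBatchScan no longer works.
    func wrap(sentinel *classified, cause error) error {
        return &classified{id: sentinel.id, cause: cause}
    }

    func isBatchScanErr(err error) bool {
        var ce *classified
        // errors.As walks the Unwrap chain; compare stable IDs, not pointers.
        return errors.As(err, &ce) && ce.id == errBatchScan.id
    }

    func main() {
        err := fmt.Errorf("scan regions: %w", wrap(errBatchScan, errors.New("pd timeout")))
        fmt.Println(isBatchScanErr(err)) // true
    }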
84 changes: 60 additions & 24 deletions pkg/lightning/backend/local/local.go
@@ -516,6 +516,43 @@ func NewBackend(
 	config BackendConfig,
 	pdSvcDiscovery pd.ServiceDiscovery,
 ) (b *Backend, err error) {
+	var (
+		pdCli                pd.Client
+		spkv                 *tikvclient.EtcdSafePointKV
+		pdCliForTiKV         *tikvclient.CodecPDClient
+		rpcCli               tikvclient.Client
+		tikvCli              *tikvclient.KVStore
+		pdHTTPCli            pdhttp.Client
+		importClientFactory  *importClientFactoryImpl
+		multiIngestSupported bool
+	)
+	defer func() {
+		if err == nil {
+			return
+		}
+		if importClientFactory != nil {
+			importClientFactory.Close()
+		}
+		if pdHTTPCli != nil {
+			pdHTTPCli.Close()
+		}
+		if tikvCli != nil {
+			// tikvCli uses pdCliForTiKV (which wraps pdCli), spkv and rpcCli, so
+			// closing tikvCli will close all of them.
+			_ = tikvCli.Close()
+		} else {
+			if rpcCli != nil {
+				_ = rpcCli.Close()
+			}
+			if spkv != nil {
+				_ = spkv.Close()
+			}
+			// pdCliForTiKV wraps pdCli, so we only need to close pdCli
+			if pdCli != nil {
+				pdCli.Close()
+			}
+		}
+	}()
 	config.adjust()
 	var pdAddrs []string
 	if pdSvcDiscovery != nil {
@@ -525,7 +562,7 @@ func NewBackend(
 	} else {
 		pdAddrs = strings.Split(config.PDAddr, ",")
 	}
-	pdCli, err := pd.NewClientWithContext(
+	pdCli, err = pd.NewClientWithContext(
 		ctx, pdAddrs, tls.ToPDSecurityOption(),
 		pd.WithGRPCDialOptions(maxCallMsgSize...),
 		// If the time too short, we may scatter a region many times, because
@@ -537,12 +574,11 @@ func NewBackend(
 	}
 
 	// The following copies tikv.NewTxnClient without creating yet another pdClient.
-	spkv, err := tikvclient.NewEtcdSafePointKV(strings.Split(config.PDAddr, ","), tls.TLSConfig())
+	spkv, err = tikvclient.NewEtcdSafePointKV(strings.Split(config.PDAddr, ","), tls.TLSConfig())
 	if err != nil {
 		return nil, common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
 	}
 
-	var pdCliForTiKV *tikvclient.CodecPDClient
 	if config.KeyspaceName == "" {
 		pdCliForTiKV = tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCli)
 	} else {
@@ -553,18 +589,24 @@ func NewBackend(
 	}
 
 	tikvCodec := pdCliForTiKV.GetCodec()
-	rpcCli := tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig()), tikvclient.WithCodec(tikvCodec))
-	tikvCli, err := tikvclient.NewKVStore("lightning-local-backend", pdCliForTiKV, spkv, rpcCli)
+	rpcCli = tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig()), tikvclient.WithCodec(tikvCodec))
+	tikvCli, err = tikvclient.NewKVStore("lightning-local-backend", pdCliForTiKV, spkv, rpcCli)
 	if err != nil {
 		return nil, common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
 	}
-	pdHTTPCli := pdhttp.NewClientWithServiceDiscovery(
+	pdHTTPCli = pdhttp.NewClientWithServiceDiscovery(
 		"lightning",
 		pdCli.GetServiceDiscovery(),
 		pdhttp.WithTLSConfig(tls.TLSConfig()),
 	).WithBackoffer(retry.InitialBackoffer(time.Second, time.Second, pdutil.PDRequestRetryTime*time.Second))
 	splitCli := split.NewClient(pdCli, pdHTTPCli, tls.TLSConfig(), config.RegionSplitBatchSize, config.RegionSplitConcurrency)
-	importClientFactory := newImportClientFactoryImpl(splitCli, tls, config.MaxConnPerStore, config.ConnCompressType)
+	importClientFactory = newImportClientFactoryImpl(splitCli, tls, config.MaxConnPerStore, config.ConnCompressType)
+
+	multiIngestSupported, err = checkMultiIngestSupport(ctx, pdCli, importClientFactory)
+	if err != nil {
+		return nil, common.ErrCheckMultiIngest.Wrap(err).GenWithStackByArgs()
+	}
 
 	var writeLimiter StoreWriteLimiter
 	if config.StoreWriteBWLimit > 0 {
 		writeLimiter = newStoreWriteLimiter(config.StoreWriteBWLimit)
@@ -581,21 +623,18 @@ func NewBackend(
 
 		BackendConfig: config,
 
+		supportMultiIngest:  multiIngestSupported,
 		importClientFactory: importClientFactory,
 		writeLimiter:        writeLimiter,
 		logger:              log.FromContext(ctx),
 	}
-	engineMgr, err := newEngineManager(config, local, local.logger)
+	local.engineMgr, err = newEngineManager(config, local, local.logger)
 	if err != nil {
 		return nil, err
 	}
-	local.engineMgr = engineMgr
 	if m, ok := metric.GetCommonMetric(ctx); ok {
 		local.metrics = m
 	}
-	if err = local.checkMultiIngestSupport(ctx); err != nil {
-		return nil, common.ErrCheckMultiIngest.Wrap(err).GenWithStackByArgs()
-	}
 	local.tikvSideCheckFreeSpace(ctx)
 
 	return local, nil
@@ -627,10 +666,10 @@ func (local *Backend) TotalMemoryConsume() int64 {
 	return local.engineMgr.totalMemoryConsume()
 }
 
-func (local *Backend) checkMultiIngestSupport(ctx context.Context) error {
-	stores, err := local.pdCli.GetAllStores(ctx, pd.WithExcludeTombstone())
+func checkMultiIngestSupport(ctx context.Context, pdCli pd.Client, importClientFactory ImportClientFactory) (bool, error) {
+	stores, err := pdCli.GetAllStores(ctx, pd.WithExcludeTombstone())
 	if err != nil {
-		return errors.Trace(err)
+		return false, errors.Trace(err)
 	}
 
 	hasTiFlash := false
@@ -652,10 +691,10 @@ func (local *Backend) checkMultiIngestSupport(ctx context.Context) error {
 			select {
 			case <-time.After(100 * time.Millisecond):
 			case <-ctx.Done():
-				return ctx.Err()
+				return false, ctx.Err()
 			}
 		}
-		client, err1 := local.getImportClient(ctx, s.Id)
+		client, err1 := importClientFactory.Create(ctx, s.Id)
 		if err1 != nil {
 			err = err1
 			log.FromContext(ctx).Warn("get import client failed", zap.Error(err), zap.String("store", s.Address))
@@ -668,8 +707,7 @@ func (local *Backend) checkMultiIngestSupport(ctx context.Context) error {
 			if st, ok := status.FromError(err); ok {
 				if st.Code() == codes.Unimplemented {
 					log.FromContext(ctx).Info("multi ingest not support", zap.Any("unsupported store", s))
-					local.supportMultiIngest = false
-					return nil
+					return false, nil
 				}
 			}
 			log.FromContext(ctx).Warn("check multi ingest support failed", zap.Error(err), zap.String("store", s.Address),
@@ -679,17 +717,15 @@ func (local *Backend) checkMultiIngestSupport(ctx context.Context) error {
 			// if the cluster contains no TiFlash store, we don't need the multi-ingest feature,
 			// so in this condition, downgrade the logic instead of return an error.
 			if hasTiFlash {
-				return errors.Trace(err)
+				return false, errors.Trace(err)
 			}
 			log.FromContext(ctx).Warn("check multi failed all retry, fallback to false", log.ShortError(err))
-			local.supportMultiIngest = false
-			return nil
+			return false, nil
 		}
 	}
 
-	local.supportMultiIngest = true
 	log.FromContext(ctx).Info("multi ingest support")
-	return nil
+	return true, nil
 }
 
 func (local *Backend) tikvSideCheckFreeSpace(ctx context.Context) {
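Note the two branches in the deferred cleanup above: per the in-code comment, the KV store returned by tikvclient.NewKVStore owns pdCliForTiKV (which wraps pdCli), spkv, and rpcCli, so once tikvCli exists only the composite is closed, and the component clients are closed individually only if construction failed before that point. A generic sketch of that ownership rule (closer and store are hypothetical types, not the tikv client API):

    package main

    import "fmt"

    type closer struct{ name string }

    func (c *closer) Close() { fmt.Println("close", c.name) }

    // store is a composite that owns its parts: closing it closes them all.
    type store struct{ parts []*closer }

    func (s *store) Close() {
        for i := len(s.parts) - 1; i >= 0; i-- {
            s.parts[i].Close()
        }
    }

    // cleanup mirrors the defer in NewBackend: if the composite was built,
    // close only it; otherwise close whichever parts already exist.
    func cleanup(st *store, rpc, spkv, pd *closer) {
        if st != nil {
            st.Close() // also closes rpc, spkv, and pd
            return
        }
        if rpc != nil {
            rpc.Close()
        }
        if spkv != nil {
            spkv.Close()
        }
        if pd != nil {
            pd.Close()
        }
    }

    func main() {
        pd, spkv := &closer{"pd"}, &closer{"spkv"}
        // Failure before the composite store exists: close parts individually.
        cleanup(nil, nil, spkv, pd)

        // Failure after the store exists: close only the store.
        rpc := &closer{"rpc"}
        st := &store{parts: []*closer{pd, spkv, rpc}}
        cleanup(st, rpc, spkv, pd)
    }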
5 changes: 2 additions & 3 deletions pkg/lightning/backend/local/local_test.go
@@ -1081,13 +1081,12 @@ func TestMultiIngest(t *testing.T) {
 					return importCli
 				},
 			},
-			logger: log.L(),
 		}
-		err := local.checkMultiIngestSupport(context.Background())
+		supportMultiIngest, err := checkMultiIngestSupport(context.Background(), local.pdCli, local.importClientFactory)
 		if err != nil {
 			require.Contains(t, err.Error(), testCase.retErr)
 		} else {
-			require.Equal(t, testCase.supportMutliIngest, local.supportMultiIngest)
+			require.Equal(t, testCase.supportMutliIngest, supportMultiIngest)
 		}
 	}
 }
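The test change falls out of the same refactor: checkMultiIngestSupport is no longer a method that mutates local.supportMultiIngest but a free function returning (bool, error), so it can run before the Backend struct exists (keeping any failure inside the constructor's cleanup defer) and the test asserts on the returned value directly. A sketch of that method-to-function shape (prober and fakeStore are hypothetical stand-ins for pd.Client and ImportClientFactory):

    package main

    import (
        "context"
        "fmt"
    )

    // prober stands in for whatever the real check consults.
    type prober interface {
        supportsMultiIngest(ctx context.Context) (bool, error)
    }

    type fakeStore struct{ ok bool }

    func (f fakeStore) supportsMultiIngest(context.Context) (bool, error) { return f.ok, nil }

    // checkSupport is the "after" shape: a free function that returns the
    // answer instead of mutating a half-built backend, so the constructor
    // treats its failure like any other error, and tests assert on the
    // return value directly.
    func checkSupport(ctx context.Context, p prober) (bool, error) {
        return p.supportsMultiIngest(ctx)
    }

    type backend struct{ supportMultiIngest bool }

    func main() {
        supported, err := checkSupport(context.Background(), fakeStore{ok: true})
        if err != nil {
            fmt.Println("constructor would fail here:", err)
            return
        }
        b := backend{supportMultiIngest: supported} // plain field init, no mutation
        fmt.Println(b.supportMultiIngest)
    }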
