Skip to content

Commit

Permalink
pump: Add support to stop write at a limit avaliable space (#647) (#659)
Browse files Browse the repository at this point in the history
Default set it 10 gib
  • Loading branch information
july2993 authored and ericsyh committed Jul 2, 2019
1 parent 4808e90 commit 5ba5f7b
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 24 deletions.
6 changes: 6 additions & 0 deletions cmd/pump/pump.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ pd-urls = "http://127.0.0.1:2379"
# [storage]
# Set to `true` (default) for best reliability, which prevents data loss when there is a power failure.
# sync-log = true

# Stop writing when the available disk space falls to or below the configured size.
# 42 MB -> 42000000, 42 mib -> 44040192
# default: 10 gib
# stop-write-at-available-space = "10 gib"

#
# We suggest using the default config of the embedded LSM DB for now; do not change it unless you know what you are doing.
# [storage.kv]
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/Shopify/toxiproxy v2.1.3+incompatible // indirect
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect
github.com/coreos/etcd v3.3.0-rc.0.0.20180530235116-2b3aa7e1d49d+incompatible
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/dustin/go-humanize v1.0.0
github.com/eapache/go-resiliency v1.1.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v0.0.0-20180227141424-093482f3f8ce // indirect
Expand Down
7 changes: 1 addition & 6 deletions pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func NewServer(cfg *Config) (*Server, error) {
options := storage.DefaultOptions()
options = options.WithKVConfig(cfg.Storage.KV)
options = options.WithSync(cfg.Storage.GetSyncLog())
options = options.WithStopWriteAtAvailableSpace(cfg.Storage.GetStopWriteAtAvailableSpace())

storage, err := storage.NewAppendWithResolver(cfg.DataDir, options, tiStore, lockResolver)
if err != nil {
Expand Down Expand Up @@ -184,12 +185,6 @@ func getPdClient(cfg *Config) (pd.Client, error) {

// WriteBinlog implements the gRPC interface of pump server
func (s *Server) WriteBinlog(ctx context.Context, in *binlog.WriteBinlogReq) (*binlog.WriteBinlogResp, error) {
// pump client will write some empty Payload to detect weather pump is working, should avoid this
if in.Payload == nil {
ret := new(binlog.WriteBinlogResp)
return ret, nil
}

atomic.AddInt64(&s.writeBinlogCount, 1)
return s.writeBinlog(ctx, in, false)
}
Expand Down
3 changes: 3 additions & 0 deletions pump/storage/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ import "github.com/pingcap/errors"
// Sentinel errors of the storage package.
var (
	// ErrWrongMagic reports a magic number mismatch.
	ErrWrongMagic = errors.New("wrong magic")

	// ErrNoAvailableSpace reports that there is no available space left
	// for writing.
	ErrNoAvailableSpace = errors.New("no available space")
)
79 changes: 64 additions & 15 deletions pump/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import (
)

const (
maxTxnTimeoutSecond int64 = 600
chanSize = 1 << 20
maxTxnTimeoutSecond int64 = 600
chanSize = 1 << 20
defaultStopWriteAtAvailableSpace = 10 * (1 << 30)
)

var (
Expand Down Expand Up @@ -65,8 +66,9 @@ var _ Storage = &Append{}

// Append implement the Storage interface
type Append struct {
dir string
vlog *valueLog
dir string
vlog *valueLog
storageSize storageSize

metadata *leveldb.DB
sorter *sorter
Expand Down Expand Up @@ -212,6 +214,11 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL
}

append.wg.Add(1)
err = append.updateSize()
if err != nil {
return nil, errors.Trace(err)
}

go append.updateStatus()
return
}
Expand Down Expand Up @@ -293,6 +300,20 @@ func (a *Append) handleSortItem(items <-chan sortItem) (quit chan struct{}) {
return quit
}

// updateSize queries the storage size of a.dir, publishes the capacity and
// available values through storageSizeGauge, and atomically records them in
// a.storageSize.
//
// If the size cannot be read, an annotated error is returned and neither the
// gauge nor the recorded values are touched.
func (a *Append) updateSize() error {
	current, err := getStorageSize(a.dir)
	if err != nil {
		return errors.Annotatef(err, "update storage size failed, dir: %s", a.dir)
	}

	capacity, available := current.capacity, current.available

	// Export the fresh numbers as metrics first ...
	storageSizeGauge.WithLabelValues("capacity").Set(float64(capacity))
	storageSizeGauge.WithLabelValues("available").Set(float64(available))

	// ... then record them with atomic stores, matching the atomic load in
	// writableOfSpace.
	atomic.StoreUint64(&a.storageSize.available, available)
	atomic.StoreUint64(&a.storageSize.capacity, capacity)
	return nil
}

func (a *Append) updateStatus() {
defer a.wg.Done()

Expand Down Expand Up @@ -322,12 +343,9 @@ func (a *Append) updateStatus() {
atomic.StoreInt64(&a.latestTS, ts)
}
case <-updateSize:
size, err := getStorageSize(a.dir)
err := a.updateSize()
if err != nil {
log.Error("update sotrage size err: ", err)
} else {
storageSizeGauge.WithLabelValues("capacity").Set(float64(size.capacity))
storageSizeGauge.WithLabelValues("available").Set(float64(size.available))
log.Error("update size failed", zap.Error(err))
}
case <-logStatsTicker.C:
var stats leveldb.DBStats
Expand All @@ -344,6 +362,10 @@ func (a *Append) updateStatus() {
}
}

// writableOfSpace reports whether the most recently recorded available space
// is strictly greater than the configured StopWriteAtAvailableSpace limit.
func (a *Append) writableOfSpace() bool {
	free := atomic.LoadUint64(&a.storageSize.available)
	limit := a.options.StopWriteAtAvailableSpace
	return free > limit
}

func (a *Append) resolve(startTS int64) bool {
latestTS := atomic.LoadInt64(&a.latestTS)
if latestTS <= 0 {
Expand Down Expand Up @@ -618,8 +640,25 @@ func (a *Append) MaxCommitTS() int64 {
return atomic.LoadInt64(&a.maxCommitTS)
}

// isFakeBinlog reports whether binlog has a positive StartTs that is equal
// to its CommitTs.
func isFakeBinlog(binlog *pb.Binlog) bool {
	if binlog.StartTs <= 0 {
		return false
	}
	return binlog.CommitTs == binlog.StartTs
}

// WriteBinlog implements Storage.WriteBinlog.
//
// It returns ErrNoAvailableSpace when the storage is not writable for lack of
// space and binlog is not a fake binlog, returns nil without writing when both
// StartTs and CommitTs are zero, and otherwise returns the (traced) error of
// the underlying write.
func (a *Append) WriteBinlog(binlog *pb.Binlog) error {
	// Even when space is short, fake binlogs are still accepted so that the
	// drainer is not blocked as long as fake binlog writes succeed.
	if !a.writableOfSpace() && !isFakeBinlog(binlog) {
		return ErrNoAvailableSpace
	}

	// The pump client writes empty payloads to detect whether pump is
	// working. Unmarshal(nil) succeeds, so such a request shows up here with
	// zero timestamps; there is nothing to store for it.
	if binlog.StartTs == 0 && binlog.CommitTs == 0 {
		return nil
	}

	result := a.writeBinlog(binlog)
	return errors.Trace(result.err)
}

Expand Down Expand Up @@ -1028,8 +1067,8 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte
}

type storageSize struct {
capacity int
available int
capacity uint64
available uint64
}

func getStorageSize(dir string) (size storageSize, err error) {
Expand All @@ -1056,16 +1095,26 @@ func getStorageSize(dir string) (size storageSize, err error) {
}

// Available blocks * size per block = available space in bytes
size.available = int(stat.Bavail) * int(bSize)
size.capacity = int(stat.Blocks) * int(bSize)
size.available = stat.Bavail * bSize
size.capacity = stat.Blocks * bSize

return
}

// Config holds the configuration of storage
type Config struct {
SyncLog *bool `toml:"sync-log" json:"sync-log"`
KV *KVConfig `toml:"kv" json:"kv"`
SyncLog *bool `toml:"sync-log" json:"sync-log"`
KV *KVConfig `toml:"kv" json:"kv"`
StopWriteAtAvailableSpace *HumanizeBytes `toml:"stop-write-at-available-space" json:"stop-write-at-available-space"`
}

// GetStopWriteAtAvailableSpace returns the configured stop-write threshold in
// bytes, or defaultStopWriteAtAvailableSpace when the option is not set.
func (c *Config) GetStopWriteAtAvailableSpace() uint64 {
	if limit := c.StopWriteAtAvailableSpace; limit != nil {
		return limit.Uint64()
	}

	return defaultStopWriteAtAvailableSpace
}

// GetSyncLog return sync-log config option
Expand Down
29 changes: 29 additions & 0 deletions pump/storage/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package storage
import (
"encoding/binary"
"sync/atomic"

"github.com/dustin/go-humanize"
"github.com/pingcap/errors"
)

var tsKeyPrefix = []byte("ts:")
Expand All @@ -25,6 +28,32 @@ func encodeTSKey(ts int64) []byte {
return buf
}

// HumanizeBytes is a size in bytes, intended for configuration values that
// are written in a human-readable form.
type HumanizeBytes uint64

// Uint64 returns the size in bytes as a plain uint64.
func (b HumanizeBytes) Uint64() uint64 {
	return uint64(b)
}

// UnmarshalText implements encoding.TextUnmarshaler.
//
// Empty text sets the value to 0. Any other text is parsed with
// humanize.ParseBytes; on failure the parse error is returned annotated with
// the offending text and the receiver is left unchanged.
func (b *HumanizeBytes) UnmarshalText(text []byte) error {
	raw := string(text)
	if raw == "" {
		*b = 0
		return nil
	}

	parsed, err := humanize.ParseBytes(raw)
	if err != nil {
		return errors.Annotatef(err, "text: %s", raw)
	}

	*b = HumanizeBytes(parsed)
	return nil
}

// test helper
type memOracle struct {
ts int64
Expand Down
20 changes: 20 additions & 0 deletions pump/storage/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sort"
"testing"

"github.com/BurntSushi/toml"
"github.com/pingcap/check"
)

Expand Down Expand Up @@ -39,3 +40,22 @@ func (e *EncodeTSKeySuite) TestEncodeTSKey(c *check.C) {

c.Assert(sorted, check.IsTrue)
}

// UtilSuite groups the tests for the helpers in util.go.
type UtilSuite struct{}

var _ = check.Suite(&UtilSuite{})

// TestHumanizeBytes checks that the TOML string "42 MB" decodes into a
// HumanizeBytes value of 42 * 1000 * 1000 bytes.
func (u *UtilSuite) TestHumanizeBytes(c *check.C) {
	const tomlData = `
disk_size = "42 MB"
`

	var decoded struct {
		DiskSize HumanizeBytes `toml:"disk_size" json:"disk_size"`
	}

	_, err := toml.Decode(tomlData, &decoded)
	c.Assert(err, check.IsNil)
	c.Assert(decoded.DiskSize.Uint64(), check.Equals, uint64(42*1000*1000))
}
11 changes: 9 additions & 2 deletions pump/storage/vlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ const (

// Options is the config options of Append and vlog
type Options struct {
ValueLogFileSize int64
Sync bool
ValueLogFileSize int64
Sync bool
StopWriteAtAvailableSpace uint64

KVConfig *KVConfig
}
Expand All @@ -46,6 +47,12 @@ func (o *Options) WithKVConfig(kvConfig *KVConfig) *Options {
return o
}

// WithStopWriteAtAvailableSpace sets the StopWriteAtAvailableSpace option to
// bytes and returns o so that option setters can be chained.
func (o *Options) WithStopWriteAtAvailableSpace(bytes uint64) *Options {
	o.StopWriteAtAvailableSpace = bytes
	return o
}

// WithValueLogFileSize set the ValueLogFileSize
func (o *Options) WithValueLogFileSize(size int64) *Options {
o.ValueLogFileSize = size
Expand Down

0 comments on commit 5ba5f7b

Please sign in to comment.