Skip to content

Commit

Permalink
pump: Add support to stop write at a limit avaliable space (#647)
Browse files Browse the repository at this point in the history
Default set it 10 gib
  • Loading branch information
july2993 authored Jun 26, 2019
1 parent 061e321 commit 0aec623
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 34 deletions.
6 changes: 6 additions & 0 deletions cmd/pump/pump.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ pd-urls = "http://127.0.0.1:2379"
# [storage]
# Set to `true` (default) for best reliability, which prevents data loss when there is a power failure.
# sync-log = true

# stop write when disk available space less then the configured size
# 42 MB -> 42000000, 42 mib -> 44040192
# default: 10 gib
# stop-write-at-available-space = "10 gib"

#
# we suggest using the default config of the embedded LSM DB now, do not change it useless you know what you are doing
# [storage.kv]
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/Shopify/toxiproxy v2.1.3+incompatible // indirect
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect
github.com/coreos/etcd v3.3.0-rc.0.0.20180530235116-2b3aa7e1d49d+incompatible
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/dustin/go-humanize v1.0.0
github.com/eapache/go-resiliency v1.1.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v0.0.0-20180227141424-093482f3f8ce // indirect
Expand Down
7 changes: 1 addition & 6 deletions pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func NewServer(cfg *Config) (*Server, error) {
options = options.WithSync(cfg.Storage.GetSyncLog())
options = options.WithKVChanCapacity(cfg.Storage.GetKVChanCapacity())
options = options.WithSlowWriteThreshold(cfg.Storage.GetSlowWriteThreshold())
options = options.WithStopWriteAtAvailableSpace(cfg.Storage.GetStopWriteAtAvailableSpace())

storage, err := storage.NewAppendWithResolver(cfg.DataDir, options, tiStore, lockResolver)
if err != nil {
Expand Down Expand Up @@ -179,12 +180,6 @@ func NewServer(cfg *Config) (*Server, error) {

// WriteBinlog implements the gRPC interface of pump server
func (s *Server) WriteBinlog(ctx context.Context, in *binlog.WriteBinlogReq) (*binlog.WriteBinlogResp, error) {
// pump client will write some empty Payload to detect whether pump is working, should avoid this
if in.Payload == nil {
ret := new(binlog.WriteBinlogResp)
return ret, nil
}

atomic.AddInt64(&s.writeBinlogCount, 1)
return s.writeBinlog(ctx, in, false)
}
Expand Down
8 changes: 0 additions & 8 deletions pump/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,6 @@ type writeBinlogSuite struct{}

var _ = Suite(&writeBinlogSuite{})

func (s *writeBinlogSuite) TestIgnoreEmptyRequest(c *C) {
server := &Server{}
resp, err := server.WriteBinlog(context.Background(), &binlog.WriteBinlogReq{})
c.Assert(resp, NotNil)
c.Assert(err, IsNil)
c.Assert(server.writeBinlogCount, Equals, int64(0))
}

func (s *writeBinlogSuite) TestReturnErrIfClusterIDMismatched(c *C) {
server := &Server{clusterID: 42}
req := &binlog.WriteBinlogReq{}
Expand Down
3 changes: 3 additions & 0 deletions pump/storage/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,7 @@ import "github.com/pingcap/errors"
var (
// ErrWrongMagic means the magic number mismatch
ErrWrongMagic = errors.New("wrong magic")

// ErrNoAvailableSpace means no available space
ErrNoAvailableSpace = errors.New("no available space")
)
79 changes: 64 additions & 15 deletions pump/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ const (
maxTxnTimeoutSecond int64 = 600
chanCapacity = 1 << 20
// if pump takes a long time to write binlog, pump will display the binlog meta information (unit: Second)
slowWriteThreshold = 1.0
slowWriteThreshold = 1.0
defaultStopWriteAtAvailableSpace = 10 * (1 << 30)
)

var (
Expand Down Expand Up @@ -81,8 +82,9 @@ var _ Storage = &Append{}

// Append implement the Storage interface
type Append struct {
dir string
vlog *valueLog
dir string
vlog *valueLog
storageSize storageSize

metadata *leveldb.DB
sorter *sorter
Expand Down Expand Up @@ -224,6 +226,11 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL
}

append.wg.Add(1)
err = append.updateSize()
if err != nil {
return nil, errors.Trace(err)
}

go append.updateStatus()
return
}
Expand Down Expand Up @@ -307,6 +314,20 @@ func (a *Append) handleSortItem(items <-chan sortItem) (quit chan struct{}) {
return quit
}

func (a *Append) updateSize() error {
size, err := getStorageSize(a.dir)
if err != nil {
return errors.Annotatef(err, "update storage size failed, dir: %s", a.dir)
}

storageSizeGauge.WithLabelValues("capacity").Set(float64(size.capacity))
storageSizeGauge.WithLabelValues("available").Set(float64(size.available))

atomic.StoreUint64(&a.storageSize.available, size.available)
atomic.StoreUint64(&a.storageSize.capacity, size.capacity)
return nil
}

func (a *Append) updateStatus() {
defer a.wg.Done()

Expand Down Expand Up @@ -336,12 +357,9 @@ func (a *Append) updateStatus() {
atomic.StoreInt64(&a.latestTS, ts)
}
case <-updateSize:
size, err := getStorageSize(a.dir)
err := a.updateSize()
if err != nil {
log.Error("update sotrage size failed", zap.Error(err))
} else {
storageSizeGauge.WithLabelValues("capacity").Set(float64(size.capacity))
storageSizeGauge.WithLabelValues("available").Set(float64(size.available))
log.Error("update size failed", zap.Error(err))
}
case <-logStatsTicker.C:
var stats leveldb.DBStats
Expand All @@ -358,6 +376,10 @@ func (a *Append) updateStatus() {
}
}

func (a *Append) writableOfSpace() bool {
return atomic.LoadUint64(&a.storageSize.available) > a.options.StopWriteAtAvailableSpace
}

// Write a commit binlog myself if the status is committed,
// otherwise we can just ignore it, we will not get the commit binlog while iterating the kv by ts
func (a *Append) writeCBinlog(pbinlog *pb.Binlog, commitTS int64) error {
Expand Down Expand Up @@ -683,8 +705,25 @@ func (a *Append) MaxCommitTS() int64 {
return atomic.LoadInt64(&a.maxCommitTS)
}

func isFakeBinlog(binlog *pb.Binlog) bool {
return binlog.StartTs > 0 && binlog.StartTs == binlog.CommitTs
}

// WriteBinlog implement Storage.WriteBinlog
func (a *Append) WriteBinlog(binlog *pb.Binlog) error {
if !a.writableOfSpace() {
// still accept fake binlog, so will not block drainer if fake binlog writes success
if !isFakeBinlog(binlog) {
return ErrNoAvailableSpace
}
}

// pump client will write some empty Payload to detect whether pump is working, should avoid this
// Unmarshal(nil) will success...
if binlog.StartTs == 0 && binlog.CommitTs == 0 {
return nil
}

return errors.Trace(a.writeBinlog(binlog).err)
}

Expand Down Expand Up @@ -1067,8 +1106,8 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte
}

type storageSize struct {
capacity int
available int
capacity uint64
available uint64
}

func getStorageSize(dir string) (size storageSize, err error) {
Expand All @@ -1095,8 +1134,8 @@ func getStorageSize(dir string) (size storageSize, err error) {
}

// Available blocks * size per block = available space in bytes
size.available = int(stat.Bavail) * int(bSize)
size.capacity = int(stat.Blocks) * int(bSize)
size.available = stat.Bavail * bSize
size.capacity = stat.Blocks * bSize

return
}
Expand All @@ -1105,9 +1144,10 @@ func getStorageSize(dir string) (size storageSize, err error) {
type Config struct {
SyncLog *bool `toml:"sync-log" json:"sync-log"`
// the channel to buffer binlog meta, pump will block write binlog request if the channel is full
KVChanCapacity int `toml:"kv_chan_cap" json:"kv_chan_cap"`
SlowWriteThreshold float64 `toml:"slow_write_threshold" json:"slow_write_threshold"`
KV *KVConfig `toml:"kv" json:"kv"`
KVChanCapacity int `toml:"kv_chan_cap" json:"kv_chan_cap"`
SlowWriteThreshold float64 `toml:"slow_write_threshold" json:"slow_write_threshold"`
KV *KVConfig `toml:"kv" json:"kv"`
StopWriteAtAvailableSpace *HumanizeBytes `toml:"stop-write-at-available-space" json:"stop-write-at-available-space"`
}

// GetKVChanCapacity return kv_chan_cap config option
Expand All @@ -1128,6 +1168,15 @@ func (c *Config) GetSlowWriteThreshold() float64 {
return c.SlowWriteThreshold
}

// GetStopWriteAtAvailableSpace return stop write available space
func (c *Config) GetStopWriteAtAvailableSpace() uint64 {
if c.StopWriteAtAvailableSpace == nil {
return defaultStopWriteAtAvailableSpace
}

return c.StopWriteAtAvailableSpace.Uint64()
}

// GetSyncLog return sync-log config option
func (c *Config) GetSyncLog() bool {
if c.SyncLog == nil {
Expand Down
29 changes: 29 additions & 0 deletions pump/storage/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ package storage
import (
"encoding/binary"
"sync/atomic"

"github.com/dustin/go-humanize"
"github.com/pingcap/errors"
)

var tsKeyPrefix = []byte("ts:")
Expand All @@ -38,6 +41,32 @@ func encodeTSKey(ts int64) []byte {
return buf
}

// HumanizeBytes is used for humanize configure
type HumanizeBytes uint64

// Uint64 return bytes
func (b HumanizeBytes) Uint64() uint64 {
return uint64(b)
}

// UnmarshalText implements UnmarshalText
func (b *HumanizeBytes) UnmarshalText(text []byte) error {
var err error

if len(text) == 0 {
*b = 0
return nil
}

n, err := humanize.ParseBytes(string(text))
if err != nil {
return errors.Annotatef(err, "text: %s", string(text))
}

*b = HumanizeBytes(n)
return nil
}

// test helper
type memOracle struct {
ts int64
Expand Down
20 changes: 20 additions & 0 deletions pump/storage/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sort"
"testing"

"github.com/BurntSushi/toml"
"github.com/pingcap/check"
)

Expand Down Expand Up @@ -52,3 +53,22 @@ func (e *EncodeTSKeySuite) TestEncodeTSKey(c *check.C) {

c.Assert(sorted, check.IsTrue)
}

type UtilSuite struct{}

var _ = check.Suite(&UtilSuite{})

func (u *UtilSuite) TestHumanizeBytes(c *check.C) {
var s = struct {
DiskSize HumanizeBytes `toml:"disk_size" json:"disk_size"`
}{}

tomlData := `
disk_size = "42 MB"
`

_, err := toml.Decode(tomlData, &s)
c.Assert(err, check.IsNil)
c.Assert(s.DiskSize.Uint64(), check.Equals, uint64(42*1000*1000))
}
15 changes: 11 additions & 4 deletions pump/storage/vlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ const (

// Options is the config options of Append and vlog
type Options struct {
ValueLogFileSize int64
Sync bool
KVChanCapacity int
SlowWriteThreshold float64
ValueLogFileSize int64
Sync bool
KVChanCapacity int
SlowWriteThreshold float64
StopWriteAtAvailableSpace uint64

KVConfig *KVConfig
}
Expand All @@ -66,6 +67,12 @@ func (o *Options) WithKVConfig(kvConfig *KVConfig) *Options {
return o
}

// WithStopWriteAtAvailableSpace set the Config
func (o *Options) WithStopWriteAtAvailableSpace(bytes uint64) *Options {
o.StopWriteAtAvailableSpace = bytes
return o
}

// WithSlowWriteThreshold set the Config
func (o *Options) WithSlowWriteThreshold(threshold float64) *Options {
o.SlowWriteThreshold = threshold
Expand Down

0 comments on commit 0aec623

Please sign in to comment.