Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pump: Add support to stop write at a limit avaliable space #647

Merged
merged 6 commits into from
Jun 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/pump/pump.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ pd-urls = "http://127.0.0.1:2379"
# [storage]
# Set to `true` (default) for best reliability, which prevents data loss when there is a power failure.
# sync-log = true

# stop write when disk available space less then the configured size
# 42 MB -> 42000000, 42 mib -> 44040192
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one gib is too small, use 10% or 10 gib is better

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it may be annoying to set such a limit in a test or development env, how about default 0 and set
10 gib if deployed by ansible ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if so, use 10% is better, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update to be 10 gib in ac51fc0

# default: 10 gib
# stop-write-at-available-space = "10 gib"

#
# we suggest using the default config of the embedded LSM DB now, do not change it useless you know what you are doing
# [storage.kv]
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/Shopify/toxiproxy v2.1.3+incompatible // indirect
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect
github.com/coreos/etcd v3.3.0-rc.0.0.20180530235116-2b3aa7e1d49d+incompatible
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/dustin/go-humanize v1.0.0
github.com/eapache/go-resiliency v1.1.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v0.0.0-20180227141424-093482f3f8ce // indirect
Expand Down
7 changes: 1 addition & 6 deletions pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func NewServer(cfg *Config) (*Server, error) {
options = options.WithSync(cfg.Storage.GetSyncLog())
options = options.WithKVChanCapacity(cfg.Storage.GetKVChanCapacity())
options = options.WithSlowWriteThreshold(cfg.Storage.GetSlowWriteThreshold())
options = options.WithStopWriteAtAvailableSpace(cfg.Storage.GetStopWriteAtAvailableSpace())

storage, err := storage.NewAppendWithResolver(cfg.DataDir, options, tiStore, lockResolver)
if err != nil {
Expand Down Expand Up @@ -179,12 +180,6 @@ func NewServer(cfg *Config) (*Server, error) {

// WriteBinlog implements the gRPC interface of pump server
func (s *Server) WriteBinlog(ctx context.Context, in *binlog.WriteBinlogReq) (*binlog.WriteBinlogResp, error) {
// pump client will write some empty Payload to detect whether pump is working, should avoid this
if in.Payload == nil {
ret := new(binlog.WriteBinlogResp)
return ret, nil
}

atomic.AddInt64(&s.writeBinlogCount, 1)
return s.writeBinlog(ctx, in, false)
}
Expand Down
8 changes: 0 additions & 8 deletions pump/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,6 @@ type writeBinlogSuite struct{}

var _ = Suite(&writeBinlogSuite{})

func (s *writeBinlogSuite) TestIgnoreEmptyRequest(c *C) {
server := &Server{}
resp, err := server.WriteBinlog(context.Background(), &binlog.WriteBinlogReq{})
c.Assert(resp, NotNil)
c.Assert(err, IsNil)
c.Assert(server.writeBinlogCount, Equals, int64(0))
}

func (s *writeBinlogSuite) TestReturnErrIfClusterIDMismatched(c *C) {
server := &Server{clusterID: 42}
req := &binlog.WriteBinlogReq{}
Expand Down
3 changes: 3 additions & 0 deletions pump/storage/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,7 @@ import "github.com/pingcap/errors"
var (
// ErrWrongMagic means the magic number mismatch
ErrWrongMagic = errors.New("wrong magic")

// ErrNoAvailableSpace means no available space
ErrNoAvailableSpace = errors.New("no available space")
)
79 changes: 64 additions & 15 deletions pump/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ const (
maxTxnTimeoutSecond int64 = 600
chanCapacity = 1 << 20
// if pump takes a long time to write binlog, pump will display the binlog meta information (unit: Second)
slowWriteThreshold = 1.0
slowWriteThreshold = 1.0
defaultStopWriteAtAvailableSpace = 10 * (1 << 30)
)

var (
Expand Down Expand Up @@ -81,8 +82,9 @@ var _ Storage = &Append{}

// Append implement the Storage interface
type Append struct {
dir string
vlog *valueLog
dir string
vlog *valueLog
storageSize storageSize

metadata *leveldb.DB
sorter *sorter
Expand Down Expand Up @@ -224,6 +226,11 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL
}

append.wg.Add(1)
err = append.updateSize()
if err != nil {
return nil, errors.Trace(err)
}

go append.updateStatus()
return
}
Expand Down Expand Up @@ -307,6 +314,20 @@ func (a *Append) handleSortItem(items <-chan sortItem) (quit chan struct{}) {
return quit
}

func (a *Append) updateSize() error {
size, err := getStorageSize(a.dir)
if err != nil {
return errors.Annotatef(err, "update storage size failed, dir: %s", a.dir)
}

storageSizeGauge.WithLabelValues("capacity").Set(float64(size.capacity))
storageSizeGauge.WithLabelValues("available").Set(float64(size.available))

atomic.StoreUint64(&a.storageSize.available, size.available)
atomic.StoreUint64(&a.storageSize.capacity, size.capacity)
return nil
}

func (a *Append) updateStatus() {
defer a.wg.Done()

Expand Down Expand Up @@ -336,12 +357,9 @@ func (a *Append) updateStatus() {
atomic.StoreInt64(&a.latestTS, ts)
}
case <-updateSize:
size, err := getStorageSize(a.dir)
err := a.updateSize()
if err != nil {
log.Error("update sotrage size failed", zap.Error(err))
} else {
storageSizeGauge.WithLabelValues("capacity").Set(float64(size.capacity))
storageSizeGauge.WithLabelValues("available").Set(float64(size.available))
log.Error("update size failed", zap.Error(err))
}
case <-logStatsTicker.C:
var stats leveldb.DBStats
Expand All @@ -358,6 +376,10 @@ func (a *Append) updateStatus() {
}
}

func (a *Append) writableOfSpace() bool {
return atomic.LoadUint64(&a.storageSize.available) > a.options.StopWriteAtAvailableSpace
}

// Write a commit binlog myself if the status is committed,
// otherwise we can just ignore it, we will not get the commit binlog while iterating the kv by ts
func (a *Append) writeCBinlog(pbinlog *pb.Binlog, commitTS int64) error {
Expand Down Expand Up @@ -683,8 +705,25 @@ func (a *Append) MaxCommitTS() int64 {
return atomic.LoadInt64(&a.maxCommitTS)
}

func isFakeBinlog(binlog *pb.Binlog) bool {
return binlog.StartTs > 0 && binlog.StartTs == binlog.CommitTs
}

// WriteBinlog implement Storage.WriteBinlog
func (a *Append) WriteBinlog(binlog *pb.Binlog) error {
if !a.writableOfSpace() {
// still accept fake binlog, so will not block drainer if fake binlog writes success
if !isFakeBinlog(binlog) {
return ErrNoAvailableSpace
}
}

// pump client will write some empty Payload to detect whether pump is working, should avoid this
// Unmarshal(nil) will success...
if binlog.StartTs == 0 && binlog.CommitTs == 0 {
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

return errors.Trace(a.writeBinlog(binlog).err)
}

Expand Down Expand Up @@ -1067,8 +1106,8 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte
}

type storageSize struct {
capacity int
available int
capacity uint64
available uint64
}

func getStorageSize(dir string) (size storageSize, err error) {
Expand All @@ -1095,8 +1134,8 @@ func getStorageSize(dir string) (size storageSize, err error) {
}

// Available blocks * size per block = available space in bytes
size.available = int(stat.Bavail) * int(bSize)
size.capacity = int(stat.Blocks) * int(bSize)
size.available = stat.Bavail * bSize
size.capacity = stat.Blocks * bSize

return
}
Expand All @@ -1105,9 +1144,10 @@ func getStorageSize(dir string) (size storageSize, err error) {
type Config struct {
SyncLog *bool `toml:"sync-log" json:"sync-log"`
// the channel to buffer binlog meta, pump will block write binlog request if the channel is full
KVChanCapacity int `toml:"kv_chan_cap" json:"kv_chan_cap"`
SlowWriteThreshold float64 `toml:"slow_write_threshold" json:"slow_write_threshold"`
KV *KVConfig `toml:"kv" json:"kv"`
KVChanCapacity int `toml:"kv_chan_cap" json:"kv_chan_cap"`
SlowWriteThreshold float64 `toml:"slow_write_threshold" json:"slow_write_threshold"`
KV *KVConfig `toml:"kv" json:"kv"`
StopWriteAtAvailableSpace *HumanizeBytes `toml:"stop-write-at-available-space" json:"stop-write-at-available-space"`
}

// GetKVChanCapacity return kv_chan_cap config option
Expand All @@ -1128,6 +1168,15 @@ func (c *Config) GetSlowWriteThreshold() float64 {
return c.SlowWriteThreshold
}

// GetStopWriteAtAvailableSpace return stop write available space
func (c *Config) GetStopWriteAtAvailableSpace() uint64 {
if c.StopWriteAtAvailableSpace == nil {
return defaultStopWriteAtAvailableSpace
}

return c.StopWriteAtAvailableSpace.Uint64()
}

// GetSyncLog return sync-log config option
func (c *Config) GetSyncLog() bool {
if c.SyncLog == nil {
Expand Down
29 changes: 29 additions & 0 deletions pump/storage/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ package storage
import (
"encoding/binary"
"sync/atomic"

"github.com/dustin/go-humanize"
"github.com/pingcap/errors"
)

var tsKeyPrefix = []byte("ts:")
Expand All @@ -38,6 +41,32 @@ func encodeTSKey(ts int64) []byte {
return buf
}

// HumanizeBytes is used for humanize configure
type HumanizeBytes uint64

// Uint64 return bytes
func (b HumanizeBytes) Uint64() uint64 {
return uint64(b)
}

// UnmarshalText implements UnmarshalText
func (b *HumanizeBytes) UnmarshalText(text []byte) error {
var err error

if len(text) == 0 {
*b = 0
return nil
}

n, err := humanize.ParseBytes(string(text))
july2993 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Annotatef(err, "text: %s", string(text))
}

*b = HumanizeBytes(n)
return nil
}

// test helper
type memOracle struct {
ts int64
Expand Down
20 changes: 20 additions & 0 deletions pump/storage/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sort"
"testing"

"github.com/BurntSushi/toml"
"github.com/pingcap/check"
)

Expand Down Expand Up @@ -52,3 +53,22 @@ func (e *EncodeTSKeySuite) TestEncodeTSKey(c *check.C) {

c.Assert(sorted, check.IsTrue)
}

type UtilSuite struct{}

var _ = check.Suite(&UtilSuite{})

func (u *UtilSuite) TestHumanizeBytes(c *check.C) {
var s = struct {
DiskSize HumanizeBytes `toml:"disk_size" json:"disk_size"`
}{}

tomlData := `
disk_size = "42 MB"

`

_, err := toml.Decode(tomlData, &s)
c.Assert(err, check.IsNil)
c.Assert(s.DiskSize.Uint64(), check.Equals, uint64(42*1000*1000))
}
15 changes: 11 additions & 4 deletions pump/storage/vlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ const (

// Options is the config options of Append and vlog
type Options struct {
ValueLogFileSize int64
Sync bool
KVChanCapacity int
SlowWriteThreshold float64
ValueLogFileSize int64
Sync bool
KVChanCapacity int
SlowWriteThreshold float64
StopWriteAtAvailableSpace uint64

KVConfig *KVConfig
}
Expand All @@ -66,6 +67,12 @@ func (o *Options) WithKVConfig(kvConfig *KVConfig) *Options {
return o
}

// WithStopWriteAtAvailableSpace set the Config
func (o *Options) WithStopWriteAtAvailableSpace(bytes uint64) *Options {
o.StopWriteAtAvailableSpace = bytes
return o
}

// WithSlowWriteThreshold set the Config
func (o *Options) WithSlowWriteThreshold(threshold float64) *Options {
o.SlowWriteThreshold = threshold
Expand Down