From e784f3318d59553287b86cad5aa3e85242cdf261 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Sat, 22 Jun 2019 17:21:36 +0800 Subject: [PATCH 1/4] pump: Add support to stop write at a limit avaliable space --- cmd/pump/pump.toml | 6 +++ go.mod | 2 +- pump/server.go | 7 +--- pump/storage/errors.go | 3 ++ pump/storage/storage.go | 78 +++++++++++++++++++++++++++++++-------- pump/storage/util.go | 26 +++++++++++++ pump/storage/util_test.go | 20 ++++++++++ pump/storage/vlog.go | 15 ++++++-- 8 files changed, 131 insertions(+), 26 deletions(-) diff --git a/cmd/pump/pump.toml b/cmd/pump/pump.toml index 65a906f0e..a0af5727f 100644 --- a/cmd/pump/pump.toml +++ b/cmd/pump/pump.toml @@ -30,6 +30,12 @@ pd-urls = "http://127.0.0.1:2379" # [storage] # Set to `true` (default) for best reliability, which prevents data loss when there is a power failure. # sync-log = true + +# stop write when disk available space less then the configured size +# 42 MB -> 42000000, 42 mib -> 44040192 +# default: 1 gib +# stop-write-at-available-space = "1 gib" + # # we suggest using the default config of the embedded LSM DB now, do not change it useless you know what you are doing # [storage.kv] diff --git a/go.mod b/go.mod index ff68bda7e..76a16f497 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/Shopify/toxiproxy v2.1.3+incompatible // indirect github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect github.com/coreos/etcd v3.3.0-rc.0.0.20180530235116-2b3aa7e1d49d+incompatible - github.com/dustin/go-humanize v1.0.0 // indirect + github.com/dustin/go-humanize v1.0.0 github.com/eapache/go-resiliency v1.1.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect github.com/eapache/queue v0.0.0-20180227141424-093482f3f8ce // indirect diff --git a/pump/server.go b/pump/server.go index e09bdfbe8..9074ab126 100644 --- a/pump/server.go +++ b/pump/server.go @@ -146,6 +146,7 @@ func NewServer(cfg *Config) (*Server, error) { options = options.WithSync(cfg.Storage.GetSyncLog()) options = options.WithKVChanCapacity(cfg.Storage.GetKVChanCapacity()) options = options.WithSlowWriteThreshold(cfg.Storage.GetSlowWriteThreshold()) + options = options.WithStopWriteAtAvailableSpace(cfg.Storage.GetStopWriteAtAvailableSpace()) storage, err := storage.NewAppendWithResolver(cfg.DataDir, options, tiStore, lockResolver) if err != nil { @@ -177,12 +178,6 @@ func NewServer(cfg *Config) (*Server, error) { // WriteBinlog implements the gRPC interface of pump server func (s *Server) WriteBinlog(ctx context.Context, in *binlog.WriteBinlogReq) (*binlog.WriteBinlogResp, error) { - // pump client will write some empty Payload to detect whether pump is working, should avoid this - if in.Payload == nil { - ret := new(binlog.WriteBinlogResp) - return ret, nil - } - atomic.AddInt64(&s.writeBinlogCount, 1) return s.writeBinlog(ctx, in, false) } diff --git a/pump/storage/errors.go b/pump/storage/errors.go index 7b675b832..0c462244b 100644 --- a/pump/storage/errors.go +++ b/pump/storage/errors.go @@ -18,4 +18,7 @@ import "github.com/pingcap/errors" var ( // ErrWrongMagic means the magic number mismatch ErrWrongMagic = errors.New("wrong magic") + + // ErrNoAvailableSpace means no available space + ErrNoAvailableSpace = errors.New("no available space") ) diff --git a/pump/storage/storage.go b/pump/storage/storage.go index 8e5d0c293..ff07b5246 100644 --- a/pump/storage/storage.go +++ b/pump/storage/storage.go @@ -43,7 +43,8 @@ const ( maxTxnTimeoutSecond int64 = 600 chanCapacity = 1 << 20 // if pump takes a long time to write binlog, pump will display the binlog meta information (unit: Second) - slowWriteThreshold = 1.0 + slowWriteThreshold = 1.0 + defaultStopWriteAtAvailableSpace = 1 << 30 ) var ( @@ -80,8 +81,9 @@ var _ Storage = &Append{} // Append implement the Storage interface type Append struct { - dir string - vlog *valueLog + dir string + vlog *valueLog + storageSize storageSize metadata *leveldb.DB sorter *sorter @@ -223,6 +225,11 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL } append.wg.Add(1) + err = append.updateSize() + if err != nil { + return nil, errors.Trace(err) + } + go append.updateStatus() return } @@ -306,6 +313,19 @@ func (a *Append) handleSortItem(items <-chan sortItem) (quit chan struct{}) { return quit } +func (a *Append) updateSize() error { + size, err := getStorageSize(a.dir) + if err != nil { + return errors.Annotate(err, "update storage size failed") + } else { + storageSizeGauge.WithLabelValues("capacity").Set(float64(size.capacity)) + storageSizeGauge.WithLabelValues("available").Set(float64(size.available)) + } + atomic.StoreUint64(&a.storageSize.available, size.available) + atomic.StoreUint64(&a.storageSize.capacity, size.capacity) + return nil +} + func (a *Append) updateStatus() { defer a.wg.Done() @@ -335,12 +355,9 @@ func (a *Append) updateStatus() { atomic.StoreInt64(&a.latestTS, ts) } case <-updateSize: - size, err := getStorageSize(a.dir) + err := a.updateSize() if err != nil { - log.Error("update sotrage size failed", zap.Error(err)) - } else { - storageSizeGauge.WithLabelValues("capacity").Set(float64(size.capacity)) - storageSizeGauge.WithLabelValues("available").Set(float64(size.available)) + log.Error("update size failed", zap.Error(err)) } case <-logStatsTicker.C: var stats leveldb.DBStats @@ -357,6 +374,10 @@ func (a *Append) updateStatus() { } } +func (a *Append) writableOfSpace() bool { + return atomic.LoadUint64(&a.storageSize.available) > a.options.StopWriteAtAvailableSpace +} + // Write a commit binlog myself if the status is committed, // otherwise we can just ignore it, we will not get the commit binlog while iterating the kv by ts func (a *Append) writeCBinlog(pbinlog *pb.Binlog, commitTS int64) error { @@ -665,8 +686,25 @@ func (a *Append) MaxCommitTS() int64 { return atomic.LoadInt64(&a.maxCommitTS) } +func isFakeBinlog(binlog *pb.Binlog) bool { + return binlog.StartTs > 0 && binlog.StartTs == binlog.CommitTs +} + // WriteBinlog implement Storage.WriteBinlog func (a *Append) WriteBinlog(binlog *pb.Binlog) error { + if !a.writableOfSpace() { + // still accept fake binlog, so will not block drainer if fake binlog writes success + if !isFakeBinlog(binlog) { + return ErrNoAvailableSpace + } + } + + // pump client will write some empty Payload to detect whether pump is working, should avoid this + // Unmarshal(nil) will success... + if binlog.StartTs == 0 && binlog.CommitTs == 0 { + return nil + } + return errors.Trace(a.writeBinlog(binlog).err) } @@ -1049,8 +1087,8 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte } type storageSize struct { - capacity int - available int + capacity uint64 + available uint64 } func getStorageSize(dir string) (size storageSize, err error) { @@ -1077,8 +1115,8 @@ func getStorageSize(dir string) (size storageSize, err error) { } // Available blocks * size per block = available space in bytes - size.available = int(stat.Bavail) * int(bSize) - size.capacity = int(stat.Blocks) * int(bSize) + size.available = stat.Bavail * bSize + size.capacity = stat.Blocks * bSize return } @@ -1087,9 +1125,10 @@ func getStorageSize(dir string) (size storageSize, err error) { type Config struct { SyncLog *bool `toml:"sync-log" json:"sync-log"` // the channel to buffer binlog meta, pump will block write binlog request if the channel is full - KVChanCapacity int `toml:"kv_chan_cap" json:"kv_chan_cap"` - SlowWriteThreshold float64 `toml:"slow_write_threshold" json:"slow_write_threshold"` - KV *KVConfig `toml:"kv" json:"kv"` + KVChanCapacity int `toml:"kv_chan_cap" json:"kv_chan_cap"` + SlowWriteThreshold float64 `toml:"slow_write_threshold" json:"slow_write_threshold"` + KV *KVConfig `toml:"kv" json:"kv"` + StopWriteAtAvailableSpace *HumanizeBytes `toml:"stop-write-at-available-space" json:"stop-write-at-available-space"` } // GetKVChanCapacity return kv_chan_cap config option @@ -1110,6 +1149,15 @@ func (c *Config) GetSlowWriteThreshold() float64 { return c.SlowWriteThreshold } +// GetStopWriteAtAvailableSpace return stop write available space +func (c *Config) GetStopWriteAtAvailableSpace() uint64 { + if c.StopWriteAtAvailableSpace == nil { + return defaultStopWriteAtAvailableSpace + } + + return c.StopWriteAtAvailableSpace.Uint64() +} + // GetSyncLog return sync-log config option func (c *Config) GetSyncLog() bool { if c.SyncLog == nil { diff --git a/pump/storage/util.go b/pump/storage/util.go index a2d28f3de..068f06df9 100644 --- a/pump/storage/util.go +++ b/pump/storage/util.go @@ -16,6 +16,8 @@ package storage import ( "encoding/binary" "sync/atomic" + + "github.com/dustin/go-humanize" ) var tsKeyPrefix = []byte("ts:") @@ -38,6 +40,30 @@ func encodeTSKey(ts int64) []byte { return buf } +type HumanizeBytes uint64 + +func (b HumanizeBytes) Uint64() uint64 { + return uint64(b) +} + +// UnmarshalText implements UnmarshalText +func (b *HumanizeBytes) UnmarshalText(text []byte) error { + var err error + + if len(text) == 0 { + *b = 0 + return nil + } + + n, err := humanize.ParseBytes(string(text)) + if err != nil { + return err + } + + *b = HumanizeBytes(n) + return nil +} + // test helper type memOracle struct { ts int64 diff --git a/pump/storage/util_test.go b/pump/storage/util_test.go index b9b17d003..b2e7512e6 100644 --- a/pump/storage/util_test.go +++ b/pump/storage/util_test.go @@ -18,6 +18,7 @@ import ( "sort" "testing" + "github.com/BurntSushi/toml" "github.com/pingcap/check" ) @@ -52,3 +53,22 @@ func (e *EncodeTSKeySuite) TestEncodeTSKey(c *check.C) { c.Assert(sorted, check.IsTrue) } + +type UtilSuite struct{} + +var _ = check.Suite(&UtilSuite{}) + +func (u *UtilSuite) TestHumanizeBytes(c *check.C) { + var s = struct { + DiskSize HumanizeBytes `toml:"disk_size" json:"disk_size"` + }{} + + tomlData := ` +disk_size = "42 MB" + + ` + + _, err := toml.Decode(tomlData, &s) + c.Assert(err, check.IsNil) + c.Assert(s.DiskSize.Uint64(), check.Equals, uint64(42*1000*1000)) +} diff --git a/pump/storage/vlog.go b/pump/storage/vlog.go index 840d568fd..88c735414 100644 --- a/pump/storage/vlog.go +++ b/pump/storage/vlog.go @@ -42,10 +42,11 @@ const ( // Options is the config options of Append and vlog type Options struct { - ValueLogFileSize int64 - Sync bool - KVChanCapacity int - SlowWriteThreshold float64 + ValueLogFileSize int64 + Sync bool + KVChanCapacity int + SlowWriteThreshold float64 + StopWriteAtAvailableSpace uint64 KVConfig *KVConfig } @@ -66,6 +67,12 @@ func (o *Options) WithKVConfig(kvConfig *KVConfig) *Options { return o } +// WithStopWriteAtAvailableSpace set the Config +func (o *Options) WithStopWriteAtAvailableSpace(bytes uint64) *Options { + o.StopWriteAtAvailableSpace = bytes + return o +} + // WithSlowWriteThreshold set the Config func (o *Options) WithSlowWriteThreshold(threshold float64) *Options { o.SlowWriteThreshold = threshold From 8d52bbf963d6d5164cbdffd7a48fd8d5abaa22fa Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Sat, 22 Jun 2019 18:00:52 +0800 Subject: [PATCH 2/4] Fix check and test --- pump/server_test.go | 8 -------- pump/storage/storage.go | 7 ++++--- pump/storage/util.go | 2 ++ 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/pump/server_test.go b/pump/server_test.go index 137c8c0e5..15d2757a7 100644 --- a/pump/server_test.go +++ b/pump/server_test.go @@ -46,14 +46,6 @@ type writeBinlogSuite struct{} var _ = Suite(&writeBinlogSuite{}) -func (s *writeBinlogSuite) TestIgnoreEmptyRequest(c *C) { - server := &Server{} - resp, err := server.WriteBinlog(context.Background(), &binlog.WriteBinlogReq{}) - c.Assert(resp, NotNil) - c.Assert(err, IsNil) - c.Assert(server.writeBinlogCount, Equals, int64(0)) -} - func (s *writeBinlogSuite) TestReturnErrIfClusterIDMismatched(c *C) { server := &Server{clusterID: 42} req := &binlog.WriteBinlogReq{} diff --git a/pump/storage/storage.go b/pump/storage/storage.go index ff07b5246..00797c8eb 100644 --- a/pump/storage/storage.go +++ b/pump/storage/storage.go @@ -317,10 +317,11 @@ func (a *Append) updateSize() error { size, err := getStorageSize(a.dir) if err != nil { return errors.Annotate(err, "update storage size failed") - } else { - storageSizeGauge.WithLabelValues("capacity").Set(float64(size.capacity)) - storageSizeGauge.WithLabelValues("available").Set(float64(size.available)) } + + storageSizeGauge.WithLabelValues("capacity").Set(float64(size.capacity)) + storageSizeGauge.WithLabelValues("available").Set(float64(size.available)) + atomic.StoreUint64(&a.storageSize.available, size.available) atomic.StoreUint64(&a.storageSize.capacity, size.capacity) return nil diff --git a/pump/storage/util.go b/pump/storage/util.go index 068f06df9..fad831593 100644 --- a/pump/storage/util.go +++ b/pump/storage/util.go @@ -40,8 +40,10 @@ func encodeTSKey(ts int64) []byte { return buf } +// HumanizeBytes is used for humanize configure type HumanizeBytes uint64 +// Uint64 return bytes func (b HumanizeBytes) Uint64() uint64 { return uint64(b) } From ac51fc0d36c528f1b232887bd9f04700792ee0b2 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Wed, 26 Jun 2019 17:02:53 +0800 Subject: [PATCH 3/4] update limit to 10 gib and refine log --- cmd/pump/pump.toml | 4 ++-- pump/storage/storage.go | 4 ++-- pump/storage/util.go | 3 ++- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/cmd/pump/pump.toml b/cmd/pump/pump.toml index a0af5727f..28ffa75ea 100644 --- a/cmd/pump/pump.toml +++ b/cmd/pump/pump.toml @@ -33,8 +33,8 @@ pd-urls = "http://127.0.0.1:2379" # stop write when disk available space less then the configured size # 42 MB -> 42000000, 42 mib -> 44040192 -# default: 1 gib -# stop-write-at-available-space = "1 gib" +# default: 10 gib +# stop-write-at-available-space = "10 gib" # # we suggest using the default config of the embedded LSM DB now, do not change it useless you know what you are doing diff --git a/pump/storage/storage.go b/pump/storage/storage.go index 00797c8eb..9ba47384f 100644 --- a/pump/storage/storage.go +++ b/pump/storage/storage.go @@ -44,7 +44,7 @@ const ( chanCapacity = 1 << 20 // if pump takes a long time to write binlog, pump will display the binlog meta information (unit: Second) slowWriteThreshold = 1.0 - defaultStopWriteAtAvailableSpace = 1 << 30 + defaultStopWriteAtAvailableSpace = 10 * (1 << 30) ) var ( @@ -316,7 +316,7 @@ func (a *Append) handleSortItem(items <-chan sortItem) (quit chan struct{}) { func (a *Append) updateSize() error { size, err := getStorageSize(a.dir) if err != nil { - return errors.Annotate(err, "update storage size failed") + return errors.Annotatef(err, "update storage size failed, dir: %s", a.dir) } storageSizeGauge.WithLabelValues("capacity").Set(float64(size.capacity)) diff --git a/pump/storage/util.go b/pump/storage/util.go index fad831593..4f0d6521e 100644 --- a/pump/storage/util.go +++ b/pump/storage/util.go @@ -18,6 +18,7 @@ import ( "sync/atomic" "github.com/dustin/go-humanize" + "github.com/pingcap/errors" ) var tsKeyPrefix = []byte("ts:") @@ -59,7 +60,7 @@ func (b *HumanizeBytes) UnmarshalText(text []byte) error { n, err := humanize.ParseBytes(string(text)) if err != nil { - return err + return errors.Annotatef(err, "test: %s", string(text)) } *b = HumanizeBytes(n) From 42c172063a82bec7e5738e7a83fd7213767bf894 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Wed, 26 Jun 2019 17:16:26 +0800 Subject: [PATCH 4/4] fix typo --- pump/storage/util.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pump/storage/util.go b/pump/storage/util.go index 4f0d6521e..e4540beb5 100644 --- a/pump/storage/util.go +++ b/pump/storage/util.go @@ -60,7 +60,7 @@ func (b *HumanizeBytes) UnmarshalText(text []byte) error { n, err := humanize.ParseBytes(string(text)) if err != nil { - return errors.Annotatef(err, "test: %s", string(text)) + return errors.Annotatef(err, "text: %s", string(text)) } *b = HumanizeBytes(n)