From 84191c335e698924f10f68392397fa191455ebea Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 7 Aug 2020 19:22:58 +0800 Subject: [PATCH 01/11] support go time package format gc time --- pkg/util/util.go | 14 ++++++++++++++ pkg/util/util_test.go | 18 ++++++++++++++++++ pump/config.go | 8 ++++---- pump/server.go | 7 ++++++- 4 files changed, 42 insertions(+), 5 deletions(-) diff --git a/pkg/util/util.go b/pkg/util/util.go index e3f8499cc..6ec3dd590 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "net" + "strconv" "strings" "time" @@ -301,3 +302,16 @@ func WaitUntilTimeout(name string, fn func(), timeout time.Duration) { case <-exited: } } + +// ParseGCDuration parses gc durations. The default unit is day. +func ParseGCDuration(gc string) (time.Duration, error) { + d, err := strconv.ParseUint(gc, 10, 64) + if err == nil { + return time.Duration(d) * 24 * time.Hour, nil + } + gcDuration, err := time.ParseDuration(gc) + if err != nil { + return 0, errors.Errorf("unsupported gc time %s, err: %s. please use 7 or 8h(max unit is hour) format gc time", err, gc) + } + return gcDuration, nil +} \ No newline at end of file diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index e09ee557d..ecd8e09d8 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -110,6 +110,24 @@ func (s *utilSuite) TestStdLogger(c *C) { c.Assert(entrys[2].Message, Matches, ".*hola:Goodbye!.*") } +func (s *utilSuite) TestParseGCDuration(c *C) { + gc := "7" + expectDuration := 7 * 24 * time.Hour + duration, err := ParseGCDuration(gc) + c.Assert(err, IsNil) + c.Assert(duration, Equals, expectDuration) + + gc = "30min" + expectDuration = 30 * time.Minute + duration, err = ParseGCDuration(gc) + c.Assert(err, IsNil) + c.Assert(duration, Equals, expectDuration) + + gc = "7d" + duration, err = ParseGCDuration(gc) + c.Assert(err, ErrorMatches, "unsupported gc time 7d, err: .* please use 7 or 8h(max unit is hour) format gc time") +} + type getAddrIPSuite struct{} var _ = Suite(&getAddrIPSuite{}) diff --git a/pump/config.go b/pump/config.go index 4838a4976..9046cb644 100644 --- a/pump/config.go +++ b/pump/config.go @@ -38,7 +38,7 @@ const ( defaultListenAddr = "127.0.0.1:8250" defautMaxMsgSize = math.MaxInt32 // max grpc message size defaultHeartbeatInterval = 2 - defaultGC = 7 + defaultGC = "7" defaultDataDir = "data.pump" // default interval time to generate fake binlog, the unit is second @@ -65,8 +65,8 @@ type Config struct { EtcdDialTimeout time.Duration DataDir string `toml:"data-dir" json:"data-dir"` HeartbeatInterval int `toml:"heartbeat-interval" json:"heartbeat-interval"` - // pump only stores binlog events whose ts >= current time - GC(day) - GC int `toml:"gc" json:"gc"` + // pump only stores binlog events whose ts >= current time - GC Time. The default unit is day + GC string `toml:"gc" json:"gc"` LogFile string `toml:"log-file" json:"log-file"` Security security.Config `toml:"security" json:"security"` @@ -100,7 +100,7 @@ func NewConfig() *Config { fs.StringVar(&cfg.EtcdURLs, "pd-urls", defaultEtcdURLs, "a comma separated list of the PD endpoints") fs.StringVar(&cfg.DataDir, "data-dir", "", "the path to store binlog data") fs.IntVar(&cfg.HeartbeatInterval, "heartbeat-interval", defaultHeartbeatInterval, "number of seconds between heartbeat ticks") - fs.IntVar(&cfg.GC, "gc", defaultGC, "recycle binlog files older than gc days") + fs.StringVar(&cfg.GC, "gc", defaultGC, "recycle binlog files older than gc time. default unit is day. also accept 8h format time(max unit is hour)") fs.StringVar(&cfg.LogLevel, "L", "info", "log level: debug, info, warn, error, fatal") fs.StringVar(&cfg.MetricsAddr, "metrics-addr", "", "prometheus pushgateway address, leaves it empty will disable prometheus push") fs.IntVar(&cfg.MetricsInterval, "metrics-interval", 15, "prometheus client push interval in second, set \"0\" to disable prometheus push") diff --git a/pump/server.go b/pump/server.go index 2e6d5034b..5023d3442 100644 --- a/pump/server.go +++ b/pump/server.go @@ -108,6 +108,11 @@ func init() { // NewServer returns a instance of pump server func NewServer(cfg *Config) (*Server, error) { + gcDuration, err := util.ParseGCDuration(cfg.GC) + if err != nil { + return nil, errors.Trace(err) + } + var metrics *util.MetricClient if cfg.MetricsAddr != "" && cfg.MetricsInterval != 0 { metrics = util.NewMetricClient( @@ -178,7 +183,7 @@ func NewServer(cfg *Config) (*Server, error) { cancel: cancel, metrics: metrics, tiStore: tiStore, - gcDuration: time.Duration(cfg.GC) * 24 * time.Hour, + gcDuration: gcDuration, pdCli: pdCli, cfg: cfg, triggerGC: make(chan time.Time), From e6c0248842528074fe006bd1a6d011c1641b79c2 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 7 Aug 2020 19:55:53 +0800 Subject: [PATCH 02/11] fix bug --- pkg/util/util.go | 2 +- pkg/util/util_test.go | 4 ++-- pump/config.go | 9 +++++++-- pump/config_test.go | 3 ++- pump/server.go | 7 +------ pump/server_test.go | 2 +- 6 files changed, 14 insertions(+), 13 deletions(-) diff --git a/pkg/util/util.go b/pkg/util/util.go index 6ec3dd590..aeddac1c1 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -311,7 +311,7 @@ func ParseGCDuration(gc string) (time.Duration, error) { } gcDuration, err := time.ParseDuration(gc) if err != nil { - return 0, errors.Errorf("unsupported gc time %s, err: %s. please use 7 or 8h(max unit is hour) format gc time", err, gc) + return 0, errors.Errorf("unsupported gc time %s, please use 7 or 8h(max unit is hour) format gc time. err: %s", gc, err) } return gcDuration, nil } \ No newline at end of file diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index ecd8e09d8..d32f671a6 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -117,7 +117,7 @@ func (s *utilSuite) TestParseGCDuration(c *C) { c.Assert(err, IsNil) c.Assert(duration, Equals, expectDuration) - gc = "30min" + gc = "30m" expectDuration = 30 * time.Minute duration, err = ParseGCDuration(gc) c.Assert(err, IsNil) @@ -125,7 +125,7 @@ func (s *utilSuite) TestParseGCDuration(c *C) { gc = "7d" duration, err = ParseGCDuration(gc) - c.Assert(err, ErrorMatches, "unsupported gc time 7d, err: .* please use 7 or 8h(max unit is hour) format gc time") + c.Assert(err, ErrorMatches, `unsupported gc time 7d, please use 7 or 8h\(max unit is hour\) format gc time.*`) } type getAddrIPSuite struct{} diff --git a/pump/config.go b/pump/config.go index 9046cb644..2ca232b63 100644 --- a/pump/config.go +++ b/pump/config.go @@ -76,6 +76,7 @@ type Config struct { MetricsInterval int configFile string printVersion bool + GCDuration time.Duration `tome:"-" json:"-"` tls *tls.Config Storage storage.Config `toml:"storage" json:"storage"` } @@ -176,6 +177,10 @@ func (cfg *Config) Parse(arguments []string) error { util.AdjustString(&cfg.DataDir, defaultDataDir) util.AdjustInt(&cfg.HeartbeatInterval, defaultHeartbeatInterval) + cfg.GCDuration, err = util.ParseGCDuration(cfg.GC) + if err != nil { + return err + } return cfg.validate() } @@ -186,8 +191,8 @@ func (cfg *Config) configFromFile(path string) error { // validate checks whether the configuration is valid func (cfg *Config) validate() error { // check GC - if cfg.GC <= 0 { - return errors.Errorf("GC is %d, must bigger than 0", cfg.GC) + if cfg.GCDuration <= 0 { + return errors.Errorf("GC is %s, must bigger than 0", cfg.GC) } // check ListenAddr diff --git a/pump/config_test.go b/pump/config_test.go index 76dc6a6d4..abc538d67 100644 --- a/pump/config_test.go +++ b/pump/config_test.go @@ -18,6 +18,7 @@ import ( "io/ioutil" "os" "path" + "time" "github.com/BurntSushi/toml" . "github.com/pingcap/check" @@ -29,7 +30,7 @@ type testConfigSuite struct{} func (s *testConfigSuite) TestValidate(c *C) { cfg := Config{} - cfg.GC = 1 + cfg.GCDuration = 24 * time.Hour cfg.ListenAddr = "http://:8250" cfg.EtcdURLs = "http://192.168.10.23:7777" diff --git a/pump/server.go b/pump/server.go index 5023d3442..1de1be331 100644 --- a/pump/server.go +++ b/pump/server.go @@ -108,11 +108,6 @@ func init() { // NewServer returns a instance of pump server func NewServer(cfg *Config) (*Server, error) { - gcDuration, err := util.ParseGCDuration(cfg.GC) - if err != nil { - return nil, errors.Trace(err) - } - var metrics *util.MetricClient if cfg.MetricsAddr != "" && cfg.MetricsInterval != 0 { metrics = util.NewMetricClient( @@ -183,7 +178,7 @@ func NewServer(cfg *Config) (*Server, error) { cancel: cancel, metrics: metrics, tiStore: tiStore, - gcDuration: gcDuration, + gcDuration: cfg.GCDuration, pdCli: pdCli, cfg: cfg, triggerGC: make(chan time.Time), diff --git a/pump/server_test.go b/pump/server_test.go index d2e116db6..9defbfb62 100644 --- a/pump/server_test.go +++ b/pump/server_test.go @@ -719,7 +719,7 @@ func (s *startServerSuite) TestStartPumpServer(c *C) { ctx: ctx, cancel: cancel, tiStore: nil, - gcDuration: time.Duration(cfg.GC) * 24 * time.Hour, + gcDuration: 24 * time.Hour, pdCli: nil, cfg: cfg, triggerGC: make(chan time.Time), From 17c24aadb8ccb20ff9bf80d35f4d6544398b975d Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 7 Aug 2020 20:00:04 +0800 Subject: [PATCH 03/11] fix make check --- pkg/util/util.go | 2 +- pump/config.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/util/util.go b/pkg/util/util.go index aeddac1c1..d526125aa 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -314,4 +314,4 @@ func ParseGCDuration(gc string) (time.Duration, error) { return 0, errors.Errorf("unsupported gc time %s, please use 7 or 8h(max unit is hour) format gc time. err: %s", gc, err) } return gcDuration, nil -} \ No newline at end of file +} diff --git a/pump/config.go b/pump/config.go index 2ca232b63..2d2b04a70 100644 --- a/pump/config.go +++ b/pump/config.go @@ -66,7 +66,7 @@ type Config struct { DataDir string `toml:"data-dir" json:"data-dir"` HeartbeatInterval int `toml:"heartbeat-interval" json:"heartbeat-interval"` // pump only stores binlog events whose ts >= current time - GC Time. The default unit is day - GC string `toml:"gc" json:"gc"` + GC string `toml:"gc" json:"gc"` LogFile string `toml:"log-file" json:"log-file"` Security security.Config `toml:"security" json:"security"` From 06b75bcd336ebc9d193a77fcd05ce44074d1e3a0 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 7 Aug 2020 20:04:01 +0800 Subject: [PATCH 04/11] fix check --- pkg/util/util_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index d32f671a6..bfa1f9fad 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -124,7 +124,7 @@ func (s *utilSuite) TestParseGCDuration(c *C) { c.Assert(duration, Equals, expectDuration) gc = "7d" - duration, err = ParseGCDuration(gc) + _, err = ParseGCDuration(gc) c.Assert(err, ErrorMatches, `unsupported gc time 7d, please use 7 or 8h\(max unit is hour\) format gc time.*`) } From 0ebd99ec168d6cfe23ce7ee2fb0dd73c64ef7fa6 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 7 Aug 2020 23:32:38 +0800 Subject: [PATCH 05/11] add marshal and related test --- pkg/util/duration.go | 80 +++++++++++++++++++++++++++++++++++++++ pkg/util/duration_test.go | 42 ++++++++++++++++++++ pkg/util/util.go | 14 ------- pkg/util/util_test.go | 18 --------- pump/config.go | 19 ++++++---- pump/config_test.go | 77 ++++++++++++++++++++++++++++++++++++- pump/server.go | 2 +- 7 files changed, 211 insertions(+), 41 deletions(-) create mode 100644 pkg/util/duration.go create mode 100644 pkg/util/duration_test.go diff --git a/pkg/util/duration.go b/pkg/util/duration.go new file mode 100644 index 000000000..80615b60a --- /dev/null +++ b/pkg/util/duration.go @@ -0,0 +1,80 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "fmt" + "strconv" + "time" + + "github.com/pingcap/errors" +) + +// Duration is a wrapper of time.Duration for TOML and JSON. +type Duration struct { + time.Duration +} + +// NewDuration creates a Duration from time.Duration. +func NewDuration(duration time.Duration) Duration { + return Duration{Duration: duration} +} + +// MarshalJSON returns the duration as a JSON string. +func (d *Duration) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf(`"%s"`, d.String())), nil +} + +// UnmarshalJSON parses a JSON string into the duration. +func (d *Duration) UnmarshalJSON(text []byte) error { + s, err := strconv.Unquote(string(text)) + if err != nil { + return errors.WithStack(err) + } + duration, err := time.ParseDuration(s) + if err != nil { + return errors.WithStack(err) + } + d.Duration = duration + return nil +} + +// UnmarshalText parses a TOML string into the duration. +func (d *Duration) UnmarshalText(text []byte) error { + var err error + duration, err := ParseDuration(string(text)) + if err != nil { + return errors.WithStack(err) + } + d.Duration = duration.Duration + return nil +} + +// MarshalText returns the duration as a JSON string. +func (d Duration) MarshalText() ([]byte, error) { + return []byte(d.String()), nil +} + +// ParseDuration parses gc durations. The default unit is day. +func ParseDuration(gc string) (Duration, error) { + d, err := strconv.ParseUint(gc, 10, 64) + if err == nil { + return Duration{time.Duration(d) * 24 * time.Hour}, nil + } + gcDuration, err := time.ParseDuration(gc) + if err != nil { + return Duration{0}, errors.Annotatef(err, "unsupported gc time %s, etc: use 7 for 7 day, 7h for 7 hour", gc) + } + return Duration{gcDuration}, nil +} \ No newline at end of file diff --git a/pkg/util/duration_test.go b/pkg/util/duration_test.go new file mode 100644 index 000000000..390bde499 --- /dev/null +++ b/pkg/util/duration_test.go @@ -0,0 +1,42 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "time" + + . "github.com/pingcap/check" +) + +type durationSuite struct{} + +var _ = Suite(&durationSuite{}) + +func (s *durationSuite) TestParseDuration(c *C) { + gc := "7" + expectDuration := NewDuration(7 * 24 * time.Hour) + duration, err := ParseDuration(gc) + c.Assert(err, IsNil) + c.Assert(duration, Equals, expectDuration) + + gc = "30m" + expectDuration = NewDuration(30 * time.Minute) + duration, err = ParseDuration(gc) + c.Assert(err, IsNil) + c.Assert(duration, Equals, expectDuration) + + gc = "7d" + _, err = ParseDuration(gc) + c.Assert(err, NotNil) +} diff --git a/pkg/util/util.go b/pkg/util/util.go index d526125aa..e3f8499cc 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -17,7 +17,6 @@ import ( "context" "fmt" "net" - "strconv" "strings" "time" @@ -302,16 +301,3 @@ func WaitUntilTimeout(name string, fn func(), timeout time.Duration) { case <-exited: } } - -// ParseGCDuration parses gc durations. The default unit is day. -func ParseGCDuration(gc string) (time.Duration, error) { - d, err := strconv.ParseUint(gc, 10, 64) - if err == nil { - return time.Duration(d) * 24 * time.Hour, nil - } - gcDuration, err := time.ParseDuration(gc) - if err != nil { - return 0, errors.Errorf("unsupported gc time %s, please use 7 or 8h(max unit is hour) format gc time. err: %s", gc, err) - } - return gcDuration, nil -} diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index bfa1f9fad..e09ee557d 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -110,24 +110,6 @@ func (s *utilSuite) TestStdLogger(c *C) { c.Assert(entrys[2].Message, Matches, ".*hola:Goodbye!.*") } -func (s *utilSuite) TestParseGCDuration(c *C) { - gc := "7" - expectDuration := 7 * 24 * time.Hour - duration, err := ParseGCDuration(gc) - c.Assert(err, IsNil) - c.Assert(duration, Equals, expectDuration) - - gc = "30m" - expectDuration = 30 * time.Minute - duration, err = ParseGCDuration(gc) - c.Assert(err, IsNil) - c.Assert(duration, Equals, expectDuration) - - gc = "7d" - _, err = ParseGCDuration(gc) - c.Assert(err, ErrorMatches, `unsupported gc time 7d, please use 7 or 8h\(max unit is hour\) format gc time.*`) -} - type getAddrIPSuite struct{} var _ = Suite(&getAddrIPSuite{}) diff --git a/pump/config.go b/pump/config.go index 2d2b04a70..d0ece3c61 100644 --- a/pump/config.go +++ b/pump/config.go @@ -66,7 +66,7 @@ type Config struct { DataDir string `toml:"data-dir" json:"data-dir"` HeartbeatInterval int `toml:"heartbeat-interval" json:"heartbeat-interval"` // pump only stores binlog events whose ts >= current time - GC Time. The default unit is day - GC string `toml:"gc" json:"gc"` + GC util.Duration `toml:"gc" json:"gc"` LogFile string `toml:"log-file" json:"log-file"` Security security.Config `toml:"security" json:"security"` @@ -76,7 +76,7 @@ type Config struct { MetricsInterval int configFile string printVersion bool - GCDuration time.Duration `tome:"-" json:"-"` + GCStr string tls *tls.Config Storage storage.Config `toml:"storage" json:"storage"` } @@ -101,7 +101,7 @@ func NewConfig() *Config { fs.StringVar(&cfg.EtcdURLs, "pd-urls", defaultEtcdURLs, "a comma separated list of the PD endpoints") fs.StringVar(&cfg.DataDir, "data-dir", "", "the path to store binlog data") fs.IntVar(&cfg.HeartbeatInterval, "heartbeat-interval", defaultHeartbeatInterval, "number of seconds between heartbeat ticks") - fs.StringVar(&cfg.GC, "gc", defaultGC, "recycle binlog files older than gc time. default unit is day. also accept 8h format time(max unit is hour)") + fs.StringVar(&cfg.GCStr, "gc", "", "recycle binlog files older than gc time. default unit is day. also accept 8h format time(max unit is hour)") fs.StringVar(&cfg.LogLevel, "L", "info", "log level: debug, info, warn, error, fatal") fs.StringVar(&cfg.MetricsAddr, "metrics-addr", "", "prometheus pushgateway address, leaves it empty will disable prometheus push") fs.IntVar(&cfg.MetricsInterval, "metrics-interval", 15, "prometheus client push interval in second, set \"0\" to disable prometheus push") @@ -177,9 +177,14 @@ func (cfg *Config) Parse(arguments []string) error { util.AdjustString(&cfg.DataDir, defaultDataDir) util.AdjustInt(&cfg.HeartbeatInterval, defaultHeartbeatInterval) - cfg.GCDuration, err = util.ParseGCDuration(cfg.GC) - if err != nil { - return err + if cfg.GC.Duration == 0 && cfg.GCStr == "" { + cfg.GCStr = defaultGC + } + if cfg.GCStr != "" { + cfg.GC, err = util.ParseDuration(cfg.GCStr) + if err != nil { + return err + } } return cfg.validate() } @@ -191,7 +196,7 @@ func (cfg *Config) configFromFile(path string) error { // validate checks whether the configuration is valid func (cfg *Config) validate() error { // check GC - if cfg.GCDuration <= 0 { + if cfg.GC.Duration <= 0 { return errors.Errorf("GC is %s, must bigger than 0", cfg.GC) } diff --git a/pump/config_test.go b/pump/config_test.go index abc538d67..6df6dec24 100644 --- a/pump/config_test.go +++ b/pump/config_test.go @@ -22,6 +22,7 @@ import ( "github.com/BurntSushi/toml" . "github.com/pingcap/check" + "github.com/pingcap/tidb-binlog/pkg/util" ) var _ = Suite(&testConfigSuite{}) @@ -30,7 +31,7 @@ type testConfigSuite struct{} func (s *testConfigSuite) TestValidate(c *C) { cfg := Config{} - cfg.GCDuration = 24 * time.Hour + cfg.GC = util.NewDuration(24 * time.Hour) cfg.ListenAddr = "http://:8250" cfg.EtcdURLs = "http://192.168.10.23:7777" @@ -156,6 +157,80 @@ func (s *testConfigSuite) TestConfigParsingFileWithInvalidArgs(c *C) { c.Assert(err, ErrorMatches, ".*contained unknown configuration options: unrecognized-option-test.*") } +func (s *testConfigSuite) TestConfigParsingDuration(c *C) { + yc := struct { + ListenAddr string `toml:"addr" json:"addr"` + AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"` + EtcdURLs string `toml:"pd-urls" json:"pd-urls"` + BinlogDir string `toml:"data-dir" json:"data-dir"` + GC int `toml:"gc" json:"gc"` + HeartbeatInterval uint `toml:"heartbeat-interval" json:"heartbeat-interval"` + }{ + "192.168.199.100:8260", + "192.168.199.100:8260", + "http://192.168.199.110:2379,http://hostname:2379", + "/tmp/pump", + 5, + 1500, + } + + var buf bytes.Buffer + e := toml.NewEncoder(&buf) + err := e.Encode(yc) + c.Assert(err, IsNil) + + configFilename := path.Join(c.MkDir(), "pump_config_gc_int.toml") + err = ioutil.WriteFile(configFilename, buf.Bytes(), 0644) + c.Assert(err, IsNil) + + args := []string{ + "--config", + configFilename, + "-L", "debug", + } + + cfg := NewConfig() + err = cfg.Parse(args) + c.Assert(cfg.GC, Equals, util.NewDuration(5 * 24 * time.Hour)) +} + +func (s *testConfigSuite) TestConfigParsingDurationStr(c *C) { + yc := struct { + ListenAddr string `toml:"addr" json:"addr"` + AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"` + EtcdURLs string `toml:"pd-urls" json:"pd-urls"` + BinlogDir string `toml:"data-dir" json:"data-dir"` + GC string `toml:"gc" json:"gc"` + HeartbeatInterval uint `toml:"heartbeat-interval" json:"heartbeat-interval"` + }{ + "192.168.199.100:8260", + "192.168.199.100:8260", + "http://192.168.199.110:2379,http://hostname:2379", + "/tmp/pump", + "30m", + 1500, + } + + var buf bytes.Buffer + e := toml.NewEncoder(&buf) + err := e.Encode(yc) + c.Assert(err, IsNil) + + configFilename := path.Join(c.MkDir(), "pump_config_gc_str.toml") + err = ioutil.WriteFile(configFilename, buf.Bytes(), 0644) + c.Assert(err, IsNil) + + args := []string{ + "--config", + configFilename, + "-L", "debug", + } + + cfg := NewConfig() + err = cfg.Parse(args) + c.Assert(cfg.GC, Equals, util.NewDuration(30 * time.Minute)) +} + func mustSuccess(c *C, err error) { c.Assert(err, IsNil) } diff --git a/pump/server.go b/pump/server.go index 1de1be331..380881986 100644 --- a/pump/server.go +++ b/pump/server.go @@ -178,7 +178,7 @@ func NewServer(cfg *Config) (*Server, error) { cancel: cancel, metrics: metrics, tiStore: tiStore, - gcDuration: cfg.GCDuration, + gcDuration: cfg.GC.Duration, pdCli: pdCli, cfg: cfg, triggerGC: make(chan time.Time), From 6dbbcd47c5f2ae1e33cb574c7171a8c529a99755 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 7 Aug 2020 23:35:34 +0800 Subject: [PATCH 06/11] pass make check --- pkg/util/duration.go | 2 +- pump/config_test.go | 28 ++++++++++++++-------------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/pkg/util/duration.go b/pkg/util/duration.go index 80615b60a..92836385f 100644 --- a/pkg/util/duration.go +++ b/pkg/util/duration.go @@ -77,4 +77,4 @@ func ParseDuration(gc string) (Duration, error) { return Duration{0}, errors.Annotatef(err, "unsupported gc time %s, etc: use 7 for 7 day, 7h for 7 hour", gc) } return Duration{gcDuration}, nil -} \ No newline at end of file +} diff --git a/pump/config_test.go b/pump/config_test.go index 6df6dec24..24e75b6b0 100644 --- a/pump/config_test.go +++ b/pump/config_test.go @@ -159,12 +159,12 @@ func (s *testConfigSuite) TestConfigParsingFileWithInvalidArgs(c *C) { func (s *testConfigSuite) TestConfigParsingDuration(c *C) { yc := struct { - ListenAddr string `toml:"addr" json:"addr"` - AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"` - EtcdURLs string `toml:"pd-urls" json:"pd-urls"` - BinlogDir string `toml:"data-dir" json:"data-dir"` - GC int `toml:"gc" json:"gc"` - HeartbeatInterval uint `toml:"heartbeat-interval" json:"heartbeat-interval"` + ListenAddr string `toml:"addr" json:"addr"` + AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"` + EtcdURLs string `toml:"pd-urls" json:"pd-urls"` + BinlogDir string `toml:"data-dir" json:"data-dir"` + GC int `toml:"gc" json:"gc"` + HeartbeatInterval uint `toml:"heartbeat-interval" json:"heartbeat-interval"` }{ "192.168.199.100:8260", "192.168.199.100:8260", @@ -191,17 +191,17 @@ func (s *testConfigSuite) TestConfigParsingDuration(c *C) { cfg := NewConfig() err = cfg.Parse(args) - c.Assert(cfg.GC, Equals, util.NewDuration(5 * 24 * time.Hour)) + c.Assert(cfg.GC, Equals, util.NewDuration(5*24*time.Hour)) } func (s *testConfigSuite) TestConfigParsingDurationStr(c *C) { yc := struct { - ListenAddr string `toml:"addr" json:"addr"` - AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"` - EtcdURLs string `toml:"pd-urls" json:"pd-urls"` - BinlogDir string `toml:"data-dir" json:"data-dir"` - GC string `toml:"gc" json:"gc"` - HeartbeatInterval uint `toml:"heartbeat-interval" json:"heartbeat-interval"` + ListenAddr string `toml:"addr" json:"addr"` + AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"` + EtcdURLs string `toml:"pd-urls" json:"pd-urls"` + BinlogDir string `toml:"data-dir" json:"data-dir"` + GC string `toml:"gc" json:"gc"` + HeartbeatInterval uint `toml:"heartbeat-interval" json:"heartbeat-interval"` }{ "192.168.199.100:8260", "192.168.199.100:8260", @@ -228,7 +228,7 @@ func (s *testConfigSuite) TestConfigParsingDurationStr(c *C) { cfg := NewConfig() err = cfg.Parse(args) - c.Assert(cfg.GC, Equals, util.NewDuration(30 * time.Minute)) + c.Assert(cfg.GC, Equals, util.NewDuration(30*time.Minute)) } func mustSuccess(c *C, err error) { From ec05017e142f98d1238aebe18d21969dfb1346a9 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 7 Aug 2020 23:37:20 +0800 Subject: [PATCH 07/11] pass check --- pump/config_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pump/config_test.go b/pump/config_test.go index 24e75b6b0..03d0265ed 100644 --- a/pump/config_test.go +++ b/pump/config_test.go @@ -191,6 +191,7 @@ func (s *testConfigSuite) TestConfigParsingDuration(c *C) { cfg := NewConfig() err = cfg.Parse(args) + c.Assert(err, IsNil) c.Assert(cfg.GC, Equals, util.NewDuration(5*24*time.Hour)) } @@ -228,6 +229,7 @@ func (s *testConfigSuite) TestConfigParsingDurationStr(c *C) { cfg := NewConfig() err = cfg.Parse(args) + c.Assert(err, IsNil) c.Assert(cfg.GC, Equals, util.NewDuration(30*time.Minute)) } From 12176d6aef50ac3dc317aa20afcaa89fa5a48ecf Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 10 Aug 2020 11:19:02 +0800 Subject: [PATCH 08/11] address comment --- cmd/pump/pump.toml | 2 +- pkg/util/duration.go | 32 ++++++++++++++------------------ pkg/util/duration_test.go | 12 ++++++------ pump/config.go | 20 +++++++------------- pump/config_test.go | 26 ++++++++++++++++++++++---- pump/server.go | 6 +++++- pump/server_test.go | 1 + 7 files changed, 56 insertions(+), 43 deletions(-) diff --git a/cmd/pump/pump.toml b/cmd/pump/pump.toml index def9ffbe9..5238aa396 100644 --- a/cmd/pump/pump.toml +++ b/cmd/pump/pump.toml @@ -8,7 +8,7 @@ advertise-addr = "" # a integer value to control expiry date of the binlog data, indicates for how long (in days) the binlog data would be stored. # must bigger than 0 -gc = 7 +gc = 3 # path to the data directory of pump's data data-dir = "data.pump" diff --git a/pkg/util/duration.go b/pkg/util/duration.go index 92836385f..9b1230b87 100644 --- a/pkg/util/duration.go +++ b/pkg/util/duration.go @@ -22,18 +22,16 @@ import ( ) // Duration is a wrapper of time.Duration for TOML and JSON. -type Duration struct { - time.Duration -} +type Duration string // NewDuration creates a Duration from time.Duration. func NewDuration(duration time.Duration) Duration { - return Duration{Duration: duration} + return Duration(duration.String()) } // MarshalJSON returns the duration as a JSON string. func (d *Duration) MarshalJSON() ([]byte, error) { - return []byte(fmt.Sprintf(`"%s"`, d.String())), nil + return []byte(fmt.Sprintf(`"%s"`, *d)), nil } // UnmarshalJSON parses a JSON string into the duration. @@ -42,39 +40,37 @@ func (d *Duration) UnmarshalJSON(text []byte) error { if err != nil { return errors.WithStack(err) } - duration, err := time.ParseDuration(s) - if err != nil { - return errors.WithStack(err) - } - d.Duration = duration + *d = Duration(s) return nil } // UnmarshalText parses a TOML string into the duration. func (d *Duration) UnmarshalText(text []byte) error { var err error - duration, err := ParseDuration(string(text)) + td := Duration(text) + _, err = td.ParseDuration() if err != nil { return errors.WithStack(err) } - d.Duration = duration.Duration + *d = Duration(text) return nil } // MarshalText returns the duration as a JSON string. func (d Duration) MarshalText() ([]byte, error) { - return []byte(d.String()), nil + return []byte(d), nil } // ParseDuration parses gc durations. The default unit is day. -func ParseDuration(gc string) (Duration, error) { - d, err := strconv.ParseUint(gc, 10, 64) +func (d Duration) ParseDuration() (time.Duration, error) { + gc := string(d) + t, err := strconv.ParseUint(gc, 10, 64) if err == nil { - return Duration{time.Duration(d) * 24 * time.Hour}, nil + return time.Duration(t) * 24 * time.Hour, nil } gcDuration, err := time.ParseDuration(gc) if err != nil { - return Duration{0}, errors.Annotatef(err, "unsupported gc time %s, etc: use 7 for 7 day, 7h for 7 hour", gc) + return 0, errors.Annotatef(err, "unsupported gc time %s, etc: use 7 for 7 day, 7h for 7 hour", gc) } - return Duration{gcDuration}, nil + return gcDuration, nil } diff --git a/pkg/util/duration_test.go b/pkg/util/duration_test.go index 390bde499..66728b832 100644 --- a/pkg/util/duration_test.go +++ b/pkg/util/duration_test.go @@ -24,19 +24,19 @@ type durationSuite struct{} var _ = Suite(&durationSuite{}) func (s *durationSuite) TestParseDuration(c *C) { - gc := "7" - expectDuration := NewDuration(7 * 24 * time.Hour) - duration, err := ParseDuration(gc) + gc := Duration("7") + expectDuration := 7 * 24 * time.Hour + duration, err := gc.ParseDuration() c.Assert(err, IsNil) c.Assert(duration, Equals, expectDuration) gc = "30m" - expectDuration = NewDuration(30 * time.Minute) - duration, err = ParseDuration(gc) + expectDuration = 30 * time.Minute + duration, err = gc.ParseDuration() c.Assert(err, IsNil) c.Assert(duration, Equals, expectDuration) gc = "7d" - _, err = ParseDuration(gc) + _, err = gc.ParseDuration() c.Assert(err, NotNil) } diff --git a/pump/config.go b/pump/config.go index d0ece3c61..978a62c37 100644 --- a/pump/config.go +++ b/pump/config.go @@ -76,7 +76,6 @@ type Config struct { MetricsInterval int configFile string printVersion bool - GCStr string tls *tls.Config Storage storage.Config `toml:"storage" json:"storage"` } @@ -101,7 +100,7 @@ func NewConfig() *Config { fs.StringVar(&cfg.EtcdURLs, "pd-urls", defaultEtcdURLs, "a comma separated list of the PD endpoints") fs.StringVar(&cfg.DataDir, "data-dir", "", "the path to store binlog data") fs.IntVar(&cfg.HeartbeatInterval, "heartbeat-interval", defaultHeartbeatInterval, "number of seconds between heartbeat ticks") - fs.StringVar(&cfg.GCStr, "gc", "", "recycle binlog files older than gc time. default unit is day. also accept 8h format time(max unit is hour)") + fs.StringVar((*string)(&cfg.GC), "gc", defaultGC, "recycle binlog files older than gc time. default unit is day. also accept 8h format time(max unit is hour)") fs.StringVar(&cfg.LogLevel, "L", "info", "log level: debug, info, warn, error, fatal") fs.StringVar(&cfg.MetricsAddr, "metrics-addr", "", "prometheus pushgateway address, leaves it empty will disable prometheus push") fs.IntVar(&cfg.MetricsInterval, "metrics-interval", 15, "prometheus client push interval in second, set \"0\" to disable prometheus push") @@ -177,15 +176,6 @@ func (cfg *Config) Parse(arguments []string) error { util.AdjustString(&cfg.DataDir, defaultDataDir) util.AdjustInt(&cfg.HeartbeatInterval, defaultHeartbeatInterval) - if cfg.GC.Duration == 0 && cfg.GCStr == "" { - cfg.GCStr = defaultGC - } - if cfg.GCStr != "" { - cfg.GC, err = util.ParseDuration(cfg.GCStr) - if err != nil { - return err - } - } return cfg.validate() } @@ -196,8 +186,12 @@ func (cfg *Config) configFromFile(path string) error { // validate checks whether the configuration is valid func (cfg *Config) validate() error { // check GC - if cfg.GC.Duration <= 0 { - return errors.Errorf("GC is %s, must bigger than 0", cfg.GC) + if duration, err := cfg.GC.ParseDuration(); err == nil { + if duration <= 0 { + return errors.Errorf("GC is %s, must bigger than 0", cfg.GC) + } + } else { + return errors.Errorf("parse GC time failed, err: %s", err) } // check ListenAddr diff --git a/pump/config_test.go b/pump/config_test.go index 03d0265ed..287ad6466 100644 --- a/pump/config_test.go +++ b/pump/config_test.go @@ -157,7 +157,7 @@ func (s *testConfigSuite) TestConfigParsingFileWithInvalidArgs(c *C) { c.Assert(err, ErrorMatches, ".*contained unknown configuration options: unrecognized-option-test.*") } -func (s *testConfigSuite) TestConfigParsingDuration(c *C) { +func (s *testConfigSuite) TestConfigParsingIntegerDuration(c *C) { yc := struct { ListenAddr string `toml:"addr" json:"addr"` AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"` @@ -192,10 +192,26 @@ func (s *testConfigSuite) TestConfigParsingDuration(c *C) { cfg := NewConfig() err = cfg.Parse(args) c.Assert(err, IsNil) - c.Assert(cfg.GC, Equals, util.NewDuration(5*24*time.Hour)) + duration, err := cfg.GC.ParseDuration() + c.Assert(err, IsNil) + c.Assert(duration, Equals, 5*24*time.Hour) + + // test whether gc config can be covered by command lines + args = []string{ + "--config", + configFilename, + "-L", "debug", + "--gc", "3", + } + cfg = NewConfig() + err = cfg.Parse(args) + c.Assert(err, IsNil) + duration, err = cfg.GC.ParseDuration() + c.Assert(err, IsNil) + c.Assert(duration, Equals, 3*24*time.Hour) } -func (s *testConfigSuite) TestConfigParsingDurationStr(c *C) { +func (s *testConfigSuite) TestConfigParsingStringDuration(c *C) { yc := struct { ListenAddr string `toml:"addr" json:"addr"` AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"` @@ -230,7 +246,9 @@ func (s *testConfigSuite) TestConfigParsingDurationStr(c *C) { cfg := NewConfig() err = cfg.Parse(args) c.Assert(err, IsNil) - c.Assert(cfg.GC, Equals, util.NewDuration(30*time.Minute)) + duration, err := cfg.GC.ParseDuration() + c.Assert(err, IsNil) + c.Assert(duration, Equals, 30*time.Minute) } func mustSuccess(c *C, err error) { diff --git a/pump/server.go b/pump/server.go index 380881986..482f418be 100644 --- a/pump/server.go +++ b/pump/server.go @@ -108,6 +108,10 @@ func init() { // NewServer returns a instance of pump server func NewServer(cfg *Config) (*Server, error) { + gcDuration, err := cfg.GC.ParseDuration() + if err != nil { + return nil, errors.Trace(err) + } var metrics *util.MetricClient if cfg.MetricsAddr != "" && cfg.MetricsInterval != 0 { metrics = util.NewMetricClient( @@ -178,7 +182,7 @@ func NewServer(cfg *Config) (*Server, error) { cancel: cancel, metrics: metrics, tiStore: tiStore, - gcDuration: cfg.GC.Duration, + gcDuration: gcDuration, pdCli: pdCli, cfg: cfg, triggerGC: make(chan time.Time), diff --git a/pump/server_test.go b/pump/server_test.go index 9defbfb62..a89ea6019 100644 --- a/pump/server_test.go +++ b/pump/server_test.go @@ -585,6 +585,7 @@ func (s *newServerSuite) SetUpTest(c *C) { LogLevel: "debug", MetricsAddr: "192.168.199.100:5000", MetricsInterval: 15, + GC: "7", Security: security.Config{ SSLCA: "/path/to/ca.pem", SSLCert: "/path/to/drainer.pem", From 14b0adfcb38cbf35aa017eede182e0981b9552a3 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 10 Aug 2020 11:25:06 +0800 Subject: [PATCH 09/11] fix check --- pump/server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pump/server_test.go b/pump/server_test.go index a89ea6019..a22a0f13c 100644 --- a/pump/server_test.go +++ b/pump/server_test.go @@ -585,7 +585,7 @@ func (s *newServerSuite) SetUpTest(c *C) { LogLevel: "debug", MetricsAddr: "192.168.199.100:5000", MetricsInterval: 15, - GC: "7", + GC: "7", Security: security.Config{ SSLCA: "/path/to/ca.pem", SSLCert: "/path/to/drainer.pem", From 7fa52450bb04994100cf49be6fcc43812244cdad Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 10 Aug 2020 11:26:45 +0800 Subject: [PATCH 10/11] revert toml --- cmd/pump/pump.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/pump/pump.toml b/cmd/pump/pump.toml index 5238aa396..def9ffbe9 100644 --- a/cmd/pump/pump.toml +++ b/cmd/pump/pump.toml @@ -8,7 +8,7 @@ advertise-addr = "" # a integer value to control expiry date of the binlog data, indicates for how long (in days) the binlog data would be stored. # must bigger than 0 -gc = 3 +gc = 7 # path to the data directory of pump's data data-dir = "data.pump" From 7c92fa1ec98665d82ed33493d521685791f767c8 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 10 Aug 2020 13:54:40 +0800 Subject: [PATCH 11/11] address comment --- pkg/util/duration.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/pkg/util/duration.go b/pkg/util/duration.go index 9b1230b87..20141c1c4 100644 --- a/pkg/util/duration.go +++ b/pkg/util/duration.go @@ -14,13 +14,21 @@ package util import ( + "encoding/json" "fmt" "strconv" "time" + "github.com/BurntSushi/toml" "github.com/pingcap/errors" ) +var empty = "" +var _ toml.TextMarshaler = Duration(empty) +var _ toml.TextUnmarshaler = (*Duration)(&empty) +var _ json.Marshaler = Duration(empty) +var _ json.Unmarshaler = (*Duration)(&empty) + // Duration is a wrapper of time.Duration for TOML and JSON. type Duration string @@ -30,8 +38,8 @@ func NewDuration(duration time.Duration) Duration { } // MarshalJSON returns the duration as a JSON string. -func (d *Duration) MarshalJSON() ([]byte, error) { - return []byte(fmt.Sprintf(`"%s"`, *d)), nil +func (d Duration) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf(`"%s"`, d)), nil } // UnmarshalJSON parses a JSON string into the duration. @@ -40,6 +48,11 @@ func (d *Duration) UnmarshalJSON(text []byte) error { if err != nil { return errors.WithStack(err) } + td := Duration(s) + _, err = td.ParseDuration() + if err != nil { + return errors.WithStack(err) + } *d = Duration(s) return nil }