Skip to content

Commit

Permalink
support go time package format gc time (#996) (#997)
Browse files Browse the repository at this point in the history
* support go time package format gc time
  • Loading branch information
ti-srebot authored Aug 10, 2020
1 parent 465037f commit 647b500
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 9 deletions.
89 changes: 89 additions & 0 deletions pkg/util/duration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"encoding/json"
"fmt"
"strconv"
"time"

"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
)

var empty = ""
var _ toml.TextMarshaler = Duration(empty)
var _ toml.TextUnmarshaler = (*Duration)(&empty)
var _ json.Marshaler = Duration(empty)
var _ json.Unmarshaler = (*Duration)(&empty)

// Duration is a wrapper of time.Duration for TOML and JSON.
type Duration string

// NewDuration creates a Duration from time.Duration.
func NewDuration(duration time.Duration) Duration {
return Duration(duration.String())
}

// MarshalJSON returns the duration as a JSON string.
func (d Duration) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf(`"%s"`, d)), nil
}

// UnmarshalJSON parses a JSON string into the duration.
func (d *Duration) UnmarshalJSON(text []byte) error {
s, err := strconv.Unquote(string(text))
if err != nil {
return errors.WithStack(err)
}
td := Duration(s)
_, err = td.ParseDuration()
if err != nil {
return errors.WithStack(err)
}
*d = Duration(s)
return nil
}

// UnmarshalText parses a TOML string into the duration.
func (d *Duration) UnmarshalText(text []byte) error {
var err error
td := Duration(text)
_, err = td.ParseDuration()
if err != nil {
return errors.WithStack(err)
}
*d = Duration(text)
return nil
}

// MarshalText returns the duration as a JSON string.
func (d Duration) MarshalText() ([]byte, error) {
return []byte(d), nil
}

// ParseDuration parses gc durations. The default unit is day.
func (d Duration) ParseDuration() (time.Duration, error) {
gc := string(d)
t, err := strconv.ParseUint(gc, 10, 64)
if err == nil {
return time.Duration(t) * 24 * time.Hour, nil
}
gcDuration, err := time.ParseDuration(gc)
if err != nil {
return 0, errors.Annotatef(err, "unsupported gc time %s, etc: use 7 for 7 day, 7h for 7 hour", gc)
}
return gcDuration, nil
}
42 changes: 42 additions & 0 deletions pkg/util/duration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"time"

. "github.com/pingcap/check"
)

type durationSuite struct{}

var _ = Suite(&durationSuite{})

func (s *durationSuite) TestParseDuration(c *C) {
gc := Duration("7")
expectDuration := 7 * 24 * time.Hour
duration, err := gc.ParseDuration()
c.Assert(err, IsNil)
c.Assert(duration, Equals, expectDuration)

gc = "30m"
expectDuration = 30 * time.Minute
duration, err = gc.ParseDuration()
c.Assert(err, IsNil)
c.Assert(duration, Equals, expectDuration)

gc = "7d"
_, err = gc.ParseDuration()
c.Assert(err, NotNil)
}
16 changes: 10 additions & 6 deletions pump/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const (
defaultListenAddr = "127.0.0.1:8250"
defautMaxKafkaSize = 1024 * 1024 * 1024
defaultHeartbeatInterval = 2
defaultGC = 7
defaultGC = "7"
defaultDataDir = "data.pump"

// default interval time to generate fake binlog, the unit is second
Expand All @@ -64,8 +64,8 @@ type Config struct {
EtcdDialTimeout time.Duration
DataDir string `toml:"data-dir" json:"data-dir"`
HeartbeatInterval int `toml:"heartbeat-interval" json:"heartbeat-interval"`
// pump only stores binlog events whose ts >= current time - GC(day)
GC int `toml:"gc" json:"gc"`
// pump only stores binlog events whose ts >= current time - GC Time. The default unit is day
GC util.Duration `toml:"gc" json:"gc"`
LogFile string `toml:"log-file" json:"log-file"`
Security security.Config `toml:"security" json:"security"`

Expand Down Expand Up @@ -99,7 +99,7 @@ func NewConfig() *Config {
fs.StringVar(&cfg.EtcdURLs, "pd-urls", defaultEtcdURLs, "a comma separated list of the PD endpoints")
fs.StringVar(&cfg.DataDir, "data-dir", "", "the path to store binlog data")
fs.IntVar(&cfg.HeartbeatInterval, "heartbeat-interval", defaultHeartbeatInterval, "number of seconds between heartbeat ticks")
fs.IntVar(&cfg.GC, "gc", defaultGC, "recycle binlog files older than gc days")
fs.StringVar((*string)(&cfg.GC), "gc", defaultGC, "recycle binlog files older than gc time. default unit is day. also accept 8h format time(max unit is hour)")
fs.StringVar(&cfg.LogLevel, "L", "info", "log level: debug, info, warn, error, fatal")
fs.StringVar(&cfg.MetricsAddr, "metrics-addr", "", "prometheus pushgateway address, leaves it empty will disable prometheus push")
fs.IntVar(&cfg.MetricsInterval, "metrics-interval", 15, "prometheus client push interval in second, set \"0\" to disable prometheus push")
Expand Down Expand Up @@ -185,8 +185,12 @@ func (cfg *Config) configFromFile(path string) error {
// validate checks whether the configuration is valid
func (cfg *Config) validate() error {
// check GC
if cfg.GC <= 0 {
return errors.Errorf("GC is %d, must bigger than 0", cfg.GC)
if duration, err := cfg.GC.ParseDuration(); err == nil {
if duration <= 0 {
return errors.Errorf("GC is %s, must bigger than 0", cfg.GC)
}
} else {
return errors.Errorf("parse GC time failed, err: %s", err)
}

// check ListenAddr
Expand Down
98 changes: 97 additions & 1 deletion pump/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"io/ioutil"
"os"
"path"
"time"

"github.com/BurntSushi/toml"
. "github.com/pingcap/check"
"github.com/pingcap/tidb-binlog/pkg/util"
)

var _ = Suite(&testConfigSuite{})
Expand All @@ -29,7 +31,7 @@ type testConfigSuite struct{}

func (s *testConfigSuite) TestValidate(c *C) {
cfg := Config{}
cfg.GC = 1
cfg.GC = util.NewDuration(24 * time.Hour)
cfg.ListenAddr = "http://:8250"
cfg.EtcdURLs = "http://192.168.10.23:7777"

Expand Down Expand Up @@ -155,6 +157,100 @@ func (s *testConfigSuite) TestConfigParsingFileWithInvalidArgs(c *C) {
c.Assert(err, ErrorMatches, ".*contained unknown configuration options: unrecognized-option-test.*")
}

func (s *testConfigSuite) TestConfigParsingIntegerDuration(c *C) {
yc := struct {
ListenAddr string `toml:"addr" json:"addr"`
AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"`
EtcdURLs string `toml:"pd-urls" json:"pd-urls"`
BinlogDir string `toml:"data-dir" json:"data-dir"`
GC int `toml:"gc" json:"gc"`
HeartbeatInterval uint `toml:"heartbeat-interval" json:"heartbeat-interval"`
}{
"192.168.199.100:8260",
"192.168.199.100:8260",
"http://192.168.199.110:2379,http://hostname:2379",
"/tmp/pump",
5,
1500,
}

var buf bytes.Buffer
e := toml.NewEncoder(&buf)
err := e.Encode(yc)
c.Assert(err, IsNil)

configFilename := path.Join(c.MkDir(), "pump_config_gc_int.toml")
err = ioutil.WriteFile(configFilename, buf.Bytes(), 0644)
c.Assert(err, IsNil)

args := []string{
"--config",
configFilename,
"-L", "debug",
}

cfg := NewConfig()
err = cfg.Parse(args)
c.Assert(err, IsNil)
duration, err := cfg.GC.ParseDuration()
c.Assert(err, IsNil)
c.Assert(duration, Equals, 5*24*time.Hour)

// test whether gc config can be covered by command lines
args = []string{
"--config",
configFilename,
"-L", "debug",
"--gc", "3",
}
cfg = NewConfig()
err = cfg.Parse(args)
c.Assert(err, IsNil)
duration, err = cfg.GC.ParseDuration()
c.Assert(err, IsNil)
c.Assert(duration, Equals, 3*24*time.Hour)
}

func (s *testConfigSuite) TestConfigParsingStringDuration(c *C) {
yc := struct {
ListenAddr string `toml:"addr" json:"addr"`
AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"`
EtcdURLs string `toml:"pd-urls" json:"pd-urls"`
BinlogDir string `toml:"data-dir" json:"data-dir"`
GC string `toml:"gc" json:"gc"`
HeartbeatInterval uint `toml:"heartbeat-interval" json:"heartbeat-interval"`
}{
"192.168.199.100:8260",
"192.168.199.100:8260",
"http://192.168.199.110:2379,http://hostname:2379",
"/tmp/pump",
"30m",
1500,
}

var buf bytes.Buffer
e := toml.NewEncoder(&buf)
err := e.Encode(yc)
c.Assert(err, IsNil)

configFilename := path.Join(c.MkDir(), "pump_config_gc_str.toml")
err = ioutil.WriteFile(configFilename, buf.Bytes(), 0644)
c.Assert(err, IsNil)

args := []string{
"--config",
configFilename,
"-L", "debug",
}

cfg := NewConfig()
err = cfg.Parse(args)
c.Assert(err, IsNil)
duration, err := cfg.GC.ParseDuration()
c.Assert(err, IsNil)
c.Assert(duration, Equals, 30*time.Minute)
}

func mustSuccess(c *C, err error) {
c.Assert(err, IsNil)
}
Expand Down
6 changes: 5 additions & 1 deletion pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ func init() {

// NewServer returns a instance of pump server
func NewServer(cfg *Config) (*Server, error) {
gcDuration, err := cfg.GC.ParseDuration()
if err != nil {
return nil, errors.Trace(err)
}
var metrics *util.MetricClient
if cfg.MetricsAddr != "" && cfg.MetricsInterval != 0 {
metrics = util.NewMetricClient(
Expand Down Expand Up @@ -175,7 +179,7 @@ func NewServer(cfg *Config) (*Server, error) {
cancel: cancel,
metrics: metrics,
tiStore: tiStore,
gcDuration: time.Duration(cfg.GC) * 24 * time.Hour,
gcDuration: gcDuration,
pdCli: pdCli,
cfg: cfg,
triggerGC: make(chan time.Time),
Expand Down
3 changes: 2 additions & 1 deletion pump/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ func (s *newServerSuite) SetUpTest(c *C) {
LogLevel: "debug",
MetricsAddr: "192.168.199.100:5000",
MetricsInterval: 15,
GC: "7",
Security: security.Config{
SSLCA: "/path/to/ca.pem",
SSLCert: "/path/to/drainer.pem",
Expand Down Expand Up @@ -713,7 +714,7 @@ func (s *startServerSuite) TestStartPumpServer(c *C) {
ctx: ctx,
cancel: cancel,
tiStore: nil,
gcDuration: time.Duration(cfg.GC) * 24 * time.Hour,
gcDuration: 24 * time.Hour,
pdCli: nil,
cfg: cfg,
triggerGC: make(chan time.Time),
Expand Down

0 comments on commit 647b500

Please sign in to comment.