diff --git a/cmd/tiproxy/main.go b/cmd/tiproxy/main.go index d12ac0d0..1670a45d 100644 --- a/cmd/tiproxy/main.go +++ b/cmd/tiproxy/main.go @@ -19,7 +19,6 @@ import ( "fmt" "os" - "github.com/pingcap/TiProxy/lib/config" "github.com/pingcap/TiProxy/lib/util/cmd" "github.com/pingcap/TiProxy/lib/util/errors" "github.com/pingcap/TiProxy/pkg/sctx" @@ -38,44 +37,16 @@ func main() { Short: "start the proxy server", Version: fmt.Sprintf("%s, commit %s", Version, Commit), } + rootCmd.SetOutput(os.Stdout) + rootCmd.SetErr(os.Stderr) - configFile := rootCmd.PersistentFlags().String("config", "", "proxy config file path") - logEncoder := rootCmd.PersistentFlags().String("log_encoder", "tidb", "log in format of tidb, console, or json") - logLevel := rootCmd.PersistentFlags().String("log_level", "", "log level") - _ = rootCmd.PersistentFlags().String("cluster_name", "tiproxy", "default cluster name, used to generate node name and differential clusters in dns discovery") - _ = rootCmd.PersistentFlags().String("node_name", "", "by default, it is generate prefixed by cluster-name") - _ = rootCmd.PersistentFlags().String("pub_addr", "127.0.0.1", "IP or domain, will be used as the accessible addr for others") - _ = rootCmd.PersistentFlags().StringSlice("bootstrap_clusters", []string{}, "lists of other nodes in the cluster, e.g. 'n1=xxx,n2=xxx', where xx are IPs or domains") - _ = rootCmd.PersistentFlags().String("bootstrap_discovery_etcd", "", "etcd discovery service url") - _ = rootCmd.PersistentFlags().String("bootstrap_discovery_dns", "", "dns srv discovery") + sctx := &sctx.Context{} - rootCmd.RunE = func(cmd *cobra.Command, _ []string) error { - var proxyConfigData []byte - - if *configFile != "" { - var err error - proxyConfigData, err = os.ReadFile(*configFile) - if err != nil { - return err - } - } - - cfg, err := config.NewConfig(proxyConfigData) - if err != nil { - return err - } - - if *logEncoder != "" { - cfg.Log.Encoder = *logEncoder - } - if *logLevel != "" { - cfg.Log.Level = *logLevel - } - - sctx := &sctx.Context{ - Config: cfg, - } + rootCmd.PersistentFlags().StringVar(&sctx.ConfigFile, "config", "", "proxy config file path") + rootCmd.PersistentFlags().StringVar(&sctx.Overlay.Log.Encoder, "log_encoder", "", "log in format of tidb, console, or json") + rootCmd.PersistentFlags().StringVar(&sctx.Overlay.Log.Level, "log_level", "", "log level") + rootCmd.RunE = func(cmd *cobra.Command, _ []string) error { srv, err := server.NewServer(cmd.Context(), sctx) if err != nil { return errors.Wrapf(err, "fail to create server") diff --git a/go.mod b/go.mod index ef7fb731..f1fc9571 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,9 @@ module github.com/pingcap/TiProxy go 1.19 require ( + github.com/BurntSushi/toml v1.2.1 github.com/cenkalti/backoff/v4 v4.2.0 + github.com/fsnotify/fsnotify v1.6.0 github.com/gin-contrib/pprof v1.4.0 github.com/gin-contrib/zap v0.0.2 github.com/gin-gonic/gin v1.8.1 @@ -27,8 +29,7 @@ require ( ) require ( - cloud.google.com/go/compute/metadata v0.2.0 // indirect - github.com/BurntSushi/toml v1.2.1 // indirect + cloud.google.com/go/compute/metadata v0.2.3 // indirect github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect @@ -80,7 +81,7 @@ require ( github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef // indirect github.com/opentracing/basictracer-go v1.0.0 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect - github.com/pelletier/go-toml/v2 v2.0.2 // indirect + github.com/pelletier/go-toml/v2 v2.0.5 // indirect github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0 // indirect github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 // indirect github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3 // indirect diff --git a/go.sum b/go.sum index d4b598b3..2c194393 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,9 @@ cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvf cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg= cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc= cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ= -cloud.google.com/go/compute/metadata v0.2.0 h1:nBbNSZyDpkNlo3DepaaLKVuO7ClyifSAmNloSCZrHnQ= -cloud.google.com/go/compute/metadata v0.2.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= +cloud.google.com/go/compute v1.14.0 h1:hfm2+FfxVmnRlh6LpB7cg1ZNU+5edAHmW679JePztk0= +cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= +cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk= @@ -186,6 +187,8 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/flosch/pongo2 v0.0.0-20190707114632-bbf5a6c351f4/go.mod h1:T9YF2M40nIgbVgp3rreNmTged+9HrbNTIQf1PsaIiTA= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/gavv/httpexpect v2.0.0+incompatible/go.mod h1:x+9tiU1YnrOvnB725RkpoLv1M62hOWzwo5OXotisrKc= github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -508,8 +511,8 @@ github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYr github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml/v2 v2.0.1/go.mod h1:r9LEWfGN8R5k0VXJ+0BkIe7MYkRdwZOjgMj2KwnJFUo= -github.com/pelletier/go-toml/v2 v2.0.2 h1:+jQXlF3scKIcSEKkdHzXhCTDLPFi5r1wnK6yPS+49Gw= -github.com/pelletier/go-toml/v2 v2.0.2/go.mod h1:MovirKjgVRESsAvNZlAjtFwV867yGuwRkXbG66OzopI= +github.com/pelletier/go-toml/v2 v2.0.5 h1:ipoSadvV8oGUjnUbMub59IDPPwfxF694nG/jwbMiyQg= +github.com/pelletier/go-toml/v2 v2.0.5/go.mod h1:OMHamSCAODeSsVrwwvcJOaoN0LIUIaFVNZzmWyNfXas= github.com/pingcap/badger v1.5.1-0.20221229114011-ddffaa0fff7a h1:QB16qn8wx5X4SRn3/5axrjPMNS3WRt87+5Bfrnmt6IA= github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ= github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0 h1:HVl5539r48eA+uDuX/ziBmQCxzT1pGrzWbKuXT46Bq0= @@ -651,7 +654,6 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= @@ -717,7 +719,7 @@ go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= go.etcd.io/etcd/api/v3 v3.5.6 h1:Cy2qx3npLcYqTKqGJzMypnMv2tiRyifZJ17BlWIWA7A= go.etcd.io/etcd/api/v3 v3.5.6/go.mod h1:KFtNaxGDw4Yx/BA4iPPwevUTAuqcsPxzyX8PHydchN8= -go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= +go.etcd.io/etcd/client/pkg/v3 v3.5.5/go.mod h1:ggrwbk069qxpKPq8/FKkQ3Xq9y39kbFR4LnKszpRXeQ= go.etcd.io/etcd/client/pkg/v3 v3.5.6 h1:TXQWYceBKqLp4sa87rcPs11SXxUA/mHwH975v+BDvLU= go.etcd.io/etcd/client/pkg/v3 v3.5.6/go.mod h1:ggrwbk069qxpKPq8/FKkQ3Xq9y39kbFR4LnKszpRXeQ= go.etcd.io/etcd/client/v2 v2.305.6 h1:fIDR0p4KMjw01MJMfUIDWdQbjo06PD6CeYM5z4EHLi0= @@ -781,6 +783,7 @@ go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+ go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= +go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= go.uber.org/ratelimit v0.2.0 h1:UQE2Bgi7p2B85uP5dC2bbRtig0C+OeNRnNEafLjsLPA= @@ -974,6 +977,7 @@ golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/lib/cli/config.go b/lib/cli/config.go index a766fead..17e2bec1 100644 --- a/lib/cli/config.go +++ b/lib/cli/config.go @@ -36,7 +36,7 @@ func GetConfigCmd(ctx *Context) *cobra.Command { setProxy := &cobra.Command{ Use: "set", } - input := setProxy.Flags().String("input", "", "specify the input json file for proxy config") + input := setProxy.Flags().String("input", "", "specify the input toml file for proxy config") setProxy.RunE = func(cmd *cobra.Command, args []string) error { b := cmd.InOrStdin() if *input != "" { diff --git a/lib/cli/main.go b/lib/cli/main.go index 59499043..c40e6183 100644 --- a/lib/cli/main.go +++ b/lib/cli/main.go @@ -18,6 +18,7 @@ import ( "crypto/tls" "net" "net/http" + "os" "time" "github.com/pingcap/TiProxy/lib/config" @@ -32,6 +33,8 @@ func GetRootCmd(tlsConfig *tls.Config) *cobra.Command { Short: "cli", SilenceUsage: true, } + rootCmd.SetOutput(os.Stdout) + rootCmd.SetErr(os.Stderr) ctx := &Context{} diff --git a/lib/config/proxy.go b/lib/config/proxy.go index e7c357bb..bbc6adc4 100644 --- a/lib/config/proxy.go +++ b/lib/config/proxy.go @@ -110,7 +110,7 @@ type Security struct { SQLTLS TLSConfig `yaml:"sql-tls,omitempty" toml:"sql-tls,omitempty" json:"sql-tls,omitempty"` } -func NewConfig(data []byte) (*Config, error) { +func NewConfig() *Config { var cfg Config cfg.Proxy.Addr = "0.0.0.0:6000" @@ -128,16 +128,7 @@ func NewConfig(data []byte) (*Config, error) { cfg.Advance.IgnoreWrongNamespace = true - if len(data) > 0 { - if err := toml.Unmarshal(data, &cfg); err != nil { - return nil, err - } - } - - if err := cfg.Check(); err != nil { - return nil, err - } - return &cfg, nil + return &cfg } func (cfg *Config) Clone() *Config { @@ -146,12 +137,13 @@ func (cfg *Config) Clone() *Config { } func (cfg *Config) Check() error { + if cfg.Workdir == "" { d, err := os.Getwd() if err != nil { return err } - cfg.Workdir = filepath.Clean(d) + cfg.Workdir = filepath.Clean(filepath.Join(d, "work")) } switch cfg.Proxy.ProxyProtocol { @@ -160,6 +152,7 @@ func (cfg *Config) Check() error { default: return errors.Wrapf(ErrUnsupportedProxyProtocolVersion, "%s", cfg.Proxy.ProxyProtocol) } + return nil } diff --git a/lib/config/proxy_test.go b/lib/config/proxy_test.go index 8425f819..283d1a41 100644 --- a/lib/config/proxy_test.go +++ b/lib/config/proxy_test.go @@ -19,6 +19,7 @@ import ( "path/filepath" "testing" + "github.com/BurntSushi/toml" "github.com/stretchr/testify/require" ) @@ -93,7 +94,8 @@ var testProxyConfig = Config{ func TestProxyConfig(t *testing.T) { data1, err := testProxyConfig.ToBytes() require.NoError(t, err) - cfg, err := NewConfig(data1) + var cfg Config + err = toml.Unmarshal(data1, &cfg) require.NoError(t, err) data2, err := cfg.ToBytes() require.NoError(t, err) @@ -113,7 +115,7 @@ func TestProxyCheck(t *testing.T) { post: func(t *testing.T, c *Config) { cwd, err := os.Getwd() require.NoError(t, err) - require.Equal(t, filepath.Clean(cwd), c.Workdir) + require.Equal(t, filepath.Clean(filepath.Join(cwd, "work")), c.Workdir) }, }, { diff --git a/lib/go.mod b/lib/go.mod index ab3f9e72..cc0c1f21 100644 --- a/lib/go.mod +++ b/lib/go.mod @@ -4,18 +4,18 @@ go 1.16 require ( github.com/spf13/cobra v1.5.0 - github.com/stretchr/testify v1.8.0 - go.etcd.io/etcd/client/pkg/v3 v3.5.4 + github.com/stretchr/testify v1.8.1 + go.etcd.io/etcd/client/pkg/v3 v3.5.5 go.uber.org/atomic v1.9.0 go.uber.org/zap v1.23.0 ) require ( github.com/BurntSushi/toml v0.3.1 - github.com/kr/text v0.2.0 // indirect + github.com/kr/pretty v0.3.0 // indirect github.com/pkg/errors v0.9.1 // indirect - go.uber.org/multierr v1.7.0 // indirect - golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 // indirect + go.uber.org/multierr v1.8.0 // indirect + golang.org/x/sys v0.0.0-20220908164124-27713097b956 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 ) diff --git a/lib/go.sum b/lib/go.sum index bb87d1ca..c1c50c1f 100644 --- a/lib/go.sum +++ b/lib/go.sum @@ -12,8 +12,9 @@ github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -23,6 +24,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/spf13/cobra v1.5.0 h1:X+jTBEBqF0bHN+9cSMgmfuvv2VHJ9ezmFNf9Y/XstYU= github.com/spf13/cobra v1.5.0/go.mod h1:dWXEIy2H428czQCjInthrTRUg7yKbok+2Qi/yBIJoUM= @@ -30,22 +33,24 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.etcd.io/etcd/client/pkg/v3 v3.5.4 h1:lrneYvz923dvC14R54XcA7FXoZ3mlGZAgmwhfm7HqOg= -go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= +go.etcd.io/etcd/client/pkg/v3 v3.5.5 h1:9S0JUVvmrVl7wCF39iTQthdaaNIiAaQbmK75ogO6GU8= +go.etcd.io/etcd/client/pkg/v3 v3.5.5/go.mod h1:ggrwbk069qxpKPq8/FKkQ3Xq9y39kbFR4LnKszpRXeQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= -go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec= -go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= +go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= +go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY= go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= @@ -65,8 +70,8 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220908164124-27713097b956 h1:XeJjHH1KiLpKGb6lvMiksZ9l0fVUh+AmGcm0nOMEBOY= +golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -81,6 +86,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/pkg/manager/config/config.go b/pkg/manager/config/config.go index e3970df1..066e8caf 100644 --- a/pkg/manager/config/config.go +++ b/pkg/manager/config/config.go @@ -15,44 +15,85 @@ package config import ( - "context" - "encoding/json" - "path" + "os" + "time" + "github.com/BurntSushi/toml" + "github.com/fsnotify/fsnotify" "github.com/pingcap/TiProxy/lib/config" - "github.com/pingcap/TiProxy/lib/util/errors" "go.uber.org/zap" ) -func (e *ConfigManager) SetConfig(ctx context.Context, val *config.Config) error { - c, err := json.Marshal(val) +func (e *ConfigManager) reloadConfigFile(file string) error { + proxyConfigData, err := os.ReadFile(file) if err != nil { - return errors.WithStack(err) + return err } - return e.set(ctx, pathPrefixConfig, "all", c) + + return e.SetTOMLConfig(proxyConfigData) } -func (e *ConfigManager) GetConfig(ctx context.Context, val *config.Config) error { - c, err := e.get(ctx, pathPrefixConfig, "all") - if err != nil { +func (e *ConfigManager) handleFSEvent(ev fsnotify.Event, f string) { + switch { + case ev.Has(fsnotify.Create), ev.Has(fsnotify.Write), ev.Has(fsnotify.Remove), ev.Has(fsnotify.Rename): + if ev.Has(fsnotify.Remove) || ev.Has(fsnotify.Rename) { + // in case of remove/rename the file, files are not present at filesystem for a while + // it may be too fast to read the config file now, sleep for a while + time.Sleep(50 * time.Millisecond) + } + // try to reload it + e.logger.Info("config file reloaded", zap.Error(e.reloadConfigFile(f)), zap.Any("cfg", e.GetConfig())) + } +} + +// SetTOMLConfig will do partial config update. Usually, user will expect config changes +// only when they specified a config item. It is, however, impossible to tell a struct +// `c.max-conns == 0` means no user-input, or it specified `0`. +// So we always update the current config with a TOML string, which only overwrite fields +// that are specified by users. +func (e *ConfigManager) SetTOMLConfig(data []byte) error { + e.sts.Lock() + defer e.sts.Unlock() + + base := e.sts.current + if base == nil { + base = config.NewConfig() + } else { + base = base.Clone() + } + + if err := toml.Unmarshal(data, base); err != nil { + return err + } + + if err := toml.Unmarshal(e.overlay, base); err != nil { + return err + } + + if err := base.Check(); err != nil { return err } - return json.Unmarshal(c.Value, val) + e.sts.current = base + + for _, list := range e.sts.listeners { + list <- base.Clone() + } + + return nil } -func (e *ConfigManager) WatchConfig(ctx context.Context) <-chan *config.Config { +func (e *ConfigManager) GetConfig() *config.Config { + e.sts.Lock() + v := e.sts.current + e.sts.Unlock() + return v +} + +func (e *ConfigManager) WatchConfig() <-chan *config.Config { ch := make(chan *config.Config) - e.Watch(path.Join(pathPrefixConfig, "all"), func(_ *zap.Logger, k KVEvent) { - var c config.Config - if k.Type == KVEventDel { - ch <- &c - } else { - e := json.Unmarshal(k.Value, &c) - if e == nil { - ch <- &c - } - } - }) + e.sts.Lock() + e.sts.listeners = append(e.sts.listeners, ch) + e.sts.Unlock() return ch } diff --git a/pkg/manager/config/config_test.go b/pkg/manager/config/config_test.go new file mode 100644 index 00000000..421110ab --- /dev/null +++ b/pkg/manager/config/config_test.go @@ -0,0 +1,165 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/pingcap/TiProxy/lib/config" + "github.com/stretchr/testify/require" +) + +func TestConfigReload(t *testing.T) { + tmpdir := t.TempDir() + tmpcfg := filepath.Join(tmpdir, "cfg") + + f, err := os.Create(tmpcfg) + require.NoError(t, err) + require.NoError(t, f.Close()) + + cfgmgr1, _ := testConfigManager(t, tmpcfg) + + cfgmgr2, _ := testConfigManager(t, "") + + cases := []struct { + name string + precfg string + precheck func(*config.Config) bool + postcfg string + postcheck func(*config.Config) bool + }{ + { + name: "pd override", + precfg: `proxy.pd-addrs = "127.0.0.1:2379"`, + precheck: func(c *config.Config) bool { + return c.Proxy.PDAddrs == "127.0.0.1:2379" + }, + postcfg: `proxy.pd-addrs = ""`, + postcheck: func(c *config.Config) bool { + return c.Proxy.PDAddrs == "" + }, + }, + { + name: "proxy-protocol override", + precfg: `proxy.proxy-protocol = "v2"`, + precheck: func(c *config.Config) bool { + return c.Proxy.ProxyProtocol == "v2" + }, + postcfg: `proxy.proxy-protocol = ""`, + postcheck: func(c *config.Config) bool { + return c.Proxy.ProxyProtocol == "" + }, + }, + { + name: "logfile name override", + precfg: `log.log-file.filename = "gdfg"`, + precheck: func(c *config.Config) bool { + return c.Log.LogFile.Filename == "gdfg" + }, + postcfg: `log.log-file.filename = ""`, + postcheck: func(c *config.Config) bool { + return c.Log.LogFile.Filename == "" + }, + }, + { + name: "override empty fields", + precfg: `metrics.metrics-addr = ""`, + precheck: func(c *config.Config) bool { + return c.Metrics.MetricsAddr == "" + }, + postcfg: `metrics.metrics-addr = "gg"`, + postcheck: func(c *config.Config) bool { + return c.Metrics.MetricsAddr == "gg" + }, + }, + { + name: "override non-empty fields", + precfg: `metrics.metrics-addr = "ee"`, + precheck: func(c *config.Config) bool { + return c.Metrics.MetricsAddr == "ee" + }, + postcfg: `metrics.metrics-addr = "gg"`, + postcheck: func(c *config.Config) bool { + return c.Metrics.MetricsAddr == "gg" + }, + }, + { + name: "non empty fields should not be override by empty fields", + precfg: `proxy.addr = "gg"`, + precheck: func(c *config.Config) bool { + return c.Proxy.Addr == "gg" + }, + postcfg: ``, + postcheck: func(c *config.Config) bool { + return c.Proxy.Addr == "gg" + }, + }, + } + + for i, tc := range cases { + msg := fmt.Sprintf("%s[%d]", tc.name, i) + + // normal path and HTTP API + require.NoError(t, cfgmgr2.SetTOMLConfig([]byte(tc.precfg)), msg) + if tc.precheck != nil { + require.True(t, tc.precheck(cfgmgr2.GetConfig()), msg) + } + require.NoError(t, cfgmgr2.SetTOMLConfig([]byte(tc.postcfg)), msg) + if tc.postcheck != nil { + require.True(t, tc.postcheck(cfgmgr2.GetConfig()), msg) + } + + // config file auto reload + require.NoError(t, cfgmgr1.SetTOMLConfig([]byte(tc.precfg)), msg) + if tc.precheck != nil { + require.True(t, tc.precheck(cfgmgr1.GetConfig()), msg) + } + + require.NoError(t, os.WriteFile(tmpcfg, []byte(tc.postcfg), 0644), msg) + if tc.postcheck != nil { + require.Eventually(t, func() bool { return tc.postcheck(cfgmgr1.GetConfig()) }, time.Second, 100*time.Millisecond, msg) + } + } +} + +func TestConfigRemove(t *testing.T) { + tmpdir := t.TempDir() + tmpcfg := filepath.Join(tmpdir, "cfg") + + f, err := os.Create(tmpcfg) + require.NoError(t, err) + require.NoError(t, f.Close()) + + cfgmgr, _ := testConfigManager(t, tmpcfg) + + // remove and recreate the file in a very short time + require.NoError(t, os.Remove(tmpcfg)) + require.NoError(t, os.WriteFile(tmpcfg, []byte(`proxy.addr = "gg"`), 0644)) + + // check that reload still works + require.Eventually(t, func() bool { return cfgmgr.GetConfig().Proxy.Addr == "gg" }, time.Second, 100*time.Millisecond) + + // remove again but with a long sleep + require.NoError(t, os.Remove(tmpcfg)) + time.Sleep(200 * time.Millisecond) + + // but eventually re-watched the file again + require.NoError(t, os.WriteFile(tmpcfg, []byte(`proxy.addr = "vv"`), 0644)) + require.Eventually(t, func() bool { return cfgmgr.GetConfig().Proxy.Addr == "vv" }, time.Second, 100*time.Millisecond) +} diff --git a/pkg/manager/config/manager.go b/pkg/manager/config/manager.go index 89fbf8fc..d11f7f35 100644 --- a/pkg/manager/config/manager.go +++ b/pkg/manager/config/manager.go @@ -16,14 +16,15 @@ package config import ( "context" - "path" - "strings" + "path/filepath" + "sync" + "time" + "github.com/fsnotify/fsnotify" "github.com/pingcap/TiProxy/lib/config" "github.com/pingcap/TiProxy/lib/util/errors" "github.com/pingcap/TiProxy/lib/util/waitgroup" "github.com/tidwall/btree" - clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -42,108 +43,99 @@ type KVValue struct { Value []byte } -type KVEventType byte - -const ( - KVEventPut KVEventType = iota - KVEventDel -) - -type KVEvent struct { - KVValue - Type KVEventType -} - -type kvListener struct { - key string - wfn func(*zap.Logger, KVEvent) -} - type ConfigManager struct { - wg waitgroup.WaitGroup - cancel context.CancelFunc - kv *btree.BTreeG[KVValue] - events chan KVEvent - listeners []kvListener - logger *zap.Logger + wg waitgroup.WaitGroup + cancel context.CancelFunc + logger *zap.Logger + + kv *btree.BTreeG[KVValue] + + wch *fsnotify.Watcher + overlay []byte + sts struct { + sync.Mutex + listeners []chan<- *config.Config + current *config.Config + } } func NewConfigManager() *ConfigManager { return &ConfigManager{} } -func (srv *ConfigManager) Init(ctx context.Context, cfg *config.Config, logger *zap.Logger) error { - srv.logger = logger - srv.events = make(chan KVEvent, 256) - srv.kv = btree.NewBTreeG(func(a, b KVValue) bool { +func (e *ConfigManager) Init(ctx context.Context, logger *zap.Logger, configFile string, overlay *config.Config) error { + var err error + var nctx context.Context + nctx, e.cancel = context.WithCancel(ctx) + + e.logger = logger + + // for namespace persistence + e.kv = btree.NewBTreeG(func(a, b KVValue) bool { return a.Key < b.Key }) - // init config for other components - if err := srv.SetConfig(ctx, cfg); err != nil { - return err + // for config watch + if overlay != nil { + e.overlay, err = overlay.ToBytes() + if err != nil { + return errors.WithStack(err) + } } - var nctx context.Context - nctx, srv.cancel = context.WithCancel(ctx) - srv.wg.Run(func() { - for { - select { - case <-nctx.Done(): - return - case ev := <-srv.events: - for _, lsn := range srv.listeners { - lsn.wfn(srv.logger, ev) - } - } + if configFile != "" { + e.wch, err = fsnotify.NewWatcher() + if err != nil { + return errors.WithStack(err) } - }) - return nil -} - -func (e *ConfigManager) Watch(key string, handler func(*zap.Logger, KVEvent)) { - e.listeners = append(e.listeners, kvListener{key, handler}) -} -func (e *ConfigManager) get(ctx context.Context, ns, key string) (KVValue, error) { - nkey := path.Clean(path.Join(ns, key)) - v, ok := e.kv.Get(KVValue{Key: nkey}) - if !ok { - return v, errors.WithStack(errors.Wrapf(ErrNoResults, "key=%s", nkey)) - } - return v, nil -} + // Watch the parent dir, because vim/k8s or other apps may not edit files in-place: + // e.g. k8s configmap is a symlink of a symlink to a file, which will only trigger + // a remove event for the file. + parentDir := filepath.Dir(configFile) -func (e *ConfigManager) list(ctx context.Context, ns string, ops ...clientv3.OpOption) ([]KVValue, error) { - k := path.Clean(ns) - var resp []KVValue - e.kv.Ascend(KVValue{Key: k}, func(item KVValue) bool { - if !strings.HasPrefix(item.Key, k) { - return false + if err := e.reloadConfigFile(configFile); err != nil { + return errors.WithStack(err) + } + if err := e.wch.Add(parentDir); err != nil { + return errors.WithStack(err) } - resp = append(resp, item) - return true - }) - return resp, nil -} - -func (e *ConfigManager) set(ctx context.Context, ns, key string, val []byte) error { - v := KVValue{Key: path.Clean(path.Join(ns, key)), Value: val} - _, _ = e.kv.Set(v) - e.events <- KVEvent{Type: KVEventPut, KVValue: v} - return nil -} -func (e *ConfigManager) del(ctx context.Context, ns, key string) error { - v, ok := e.kv.Delete(KVValue{Key: path.Clean(path.Join(ns, key))}) - if ok { - e.events <- KVEvent{Type: KVEventPut, KVValue: v} + e.wg.Run(func() { + // Some apps will trigger rename/remove events, which means they will re-create/rename + // the new file to the directory. Watch possibly stopped after rename/remove events. + // So, we use a tick to repeatedly add the parent dir to re-watch files. + ticker := time.NewTicker(200 * time.Millisecond) + for { + select { + case <-nctx.Done(): + return + case err := <-e.wch.Errors: + e.logger.Info("failed to watch config file", zap.Error(err)) + case ev := <-e.wch.Events: + e.handleFSEvent(ev, configFile) + case <-ticker.C: + if err := e.wch.Add(parentDir); err != nil { + e.logger.Info("failed to rewatch config file", zap.Error(err)) + } + } + } + }) + } else { + if err := e.SetTOMLConfig(nil); err != nil { + return errors.WithStack(err) + } } + return nil } func (e *ConfigManager) Close() error { + var wcherr error e.cancel() + if e.wch != nil { + wcherr = e.wch.Close() + } e.wg.Wait() - return nil + return wcherr } diff --git a/pkg/manager/config/manager_test.go b/pkg/manager/config/manager_test.go index 2f17e4b7..ac3c021e 100644 --- a/pkg/manager/config/manager_test.go +++ b/pkg/manager/config/manager_test.go @@ -16,173 +16,29 @@ package config import ( "context" - "fmt" - "net/url" - "path" - "path/filepath" "testing" - "time" "github.com/pingcap/TiProxy/lib/config" "github.com/pingcap/TiProxy/lib/util/logger" - "github.com/pingcap/TiProxy/lib/util/waitgroup" "github.com/stretchr/testify/require" - clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/server/v3/embed" - "go.uber.org/zap" ) -func testConfigManager(t *testing.T, cfg *config.Config) (*ConfigManager, context.Context) { - addr, err := url.Parse("http://127.0.0.1:0") - require.NoError(t, err) - - testDir := t.TempDir() - +func testConfigManager(t *testing.T, configFile string, overlays ...*config.Config) (*ConfigManager, context.Context) { logger := logger.CreateLoggerForTest(t) - etcd_cfg := embed.NewConfig() - etcd_cfg.LCUrls = []url.URL{*addr} - etcd_cfg.LPUrls = []url.URL{*addr} - etcd_cfg.Dir = filepath.Join(testDir, "etcd") - etcd_cfg.ZapLoggerBuilder = embed.NewZapLoggerBuilder(logger.Named("etcd")) - etcd, err := embed.StartEtcd(etcd_cfg) - require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) if ddl, ok := t.Deadline(); ok { ctx, cancel = context.WithDeadline(ctx, ddl) } cfgmgr := NewConfigManager() - require.NoError(t, cfgmgr.Init(ctx, cfg, logger)) + require.NoError(t, cfgmgr.Init(ctx, logger, configFile, nil)) t.Cleanup(func() { require.NoError(t, cfgmgr.Close()) - etcd.Close() }) t.Cleanup(cancel) return cfgmgr, ctx } - -func TestBase(t *testing.T) { - cfgmgr, ctx := testConfigManager(t, &config.Config{}) - - nsNum := 10 - valNum := 30 - getNs := func(i int) string { - return fmt.Sprintf("ns-%d", i) - } - getKey := func(i int) string { - return fmt.Sprintf("%02d", i) - } - - // test .set - for i := 0; i < nsNum; i++ { - ns := getNs(i) - for j := 0; j < valNum; j++ { - k := getKey(j) - require.NoError(t, cfgmgr.set(ctx, ns, k, []byte(k))) - } - } - - // test .get - for i := 0; i < nsNum; i++ { - ns := getNs(i) - for j := 0; j < valNum; j++ { - k := getKey(j) - v, err := cfgmgr.get(ctx, ns, k) - require.NoError(t, err) - require.Equal(t, string(v.Key), path.Join(ns, k)) - require.Equal(t, string(v.Value), k) - } - } - - // test .list - for i := 0; i < nsNum; i++ { - ns := getNs(i) - vals, err := cfgmgr.list(ctx, ns, clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) - require.NoError(t, err) - require.Len(t, vals, valNum) - for j := 0; j < valNum; j++ { - k := getKey(j) - require.Equal(t, string(vals[j].Value), k) - } - } - - // test .del - for i := 0; i < nsNum; i++ { - ns := getNs(i) - for j := 0; j < valNum; j++ { - k := getKey(j) - require.NoError(t, cfgmgr.set(ctx, ns, k, nil)) - - require.NoError(t, cfgmgr.del(ctx, ns, k)) - } - vals, err := cfgmgr.list(ctx, ns) - require.NoError(t, err) - require.Len(t, vals, 0) - } -} - -func TestBaseConcurrency(t *testing.T) { - cfgmgr, ctx := testConfigManager(t, &config.Config{}) - - var wg waitgroup.WaitGroup - batchNum := 16 - for i := 0; i < batchNum; i++ { - k := fmt.Sprint(i) - wg.Run(func() { - require.NoError(t, cfgmgr.set(ctx, k, "1", []byte("1"))) - }) - - wg.Run(func() { - err := cfgmgr.del(ctx, k, "1") - require.NoError(t, err) - }) - } - wg.Wait() - - for i := 0; i < batchNum; i++ { - k := fmt.Sprint(i) - - require.NoError(t, cfgmgr.set(ctx, k, "1", []byte("1"))) - } - - for i := 0; i < batchNum; i++ { - k := fmt.Sprint(i) - - wg.Run(func() { - require.NoError(t, cfgmgr.set(ctx, k, "1", []byte("1"))) - }) - - wg.Run(func() { - _, err := cfgmgr.get(ctx, k, "1") - require.NoError(t, err) - }) - } - wg.Wait() -} - -func TestBaseWatch(t *testing.T) { - cfgmgr, ctx := testConfigManager(t, &config.Config{}) - - ch := make(chan string, 1) - cfgmgr.Watch("test", func(_ *zap.Logger, e KVEvent) { - ch <- string(e.Value) - }) - - // set it - require.NoError(t, cfgmgr.set(ctx, "test", "t", []byte("1"))) - // now the only way to check watch is to wait - select { - case <-time.After(5 * time.Second): - t.Fatal("timeout waiting chan") - case tg := <-ch: - for len(ch) > 0 { - tg = <-ch - } - require.Equal(t, "1", tg) - } -} diff --git a/pkg/manager/config/namespace.go b/pkg/manager/config/namespace.go index 63c975a0..185a0fd1 100644 --- a/pkg/manager/config/namespace.go +++ b/pkg/manager/config/namespace.go @@ -17,11 +17,47 @@ package config import ( "context" "encoding/json" - "errors" + "path" + "strings" "github.com/pingcap/TiProxy/lib/config" + "github.com/pingcap/TiProxy/lib/util/errors" + clientv3 "go.etcd.io/etcd/client/v3" ) +func (e *ConfigManager) get(ctx context.Context, ns, key string) (KVValue, error) { + nkey := path.Clean(path.Join(ns, key)) + v, ok := e.kv.Get(KVValue{Key: nkey}) + if !ok { + return v, errors.WithStack(errors.Wrapf(ErrNoResults, "key=%s", nkey)) + } + return v, nil +} + +func (e *ConfigManager) list(ctx context.Context, ns string, ops ...clientv3.OpOption) ([]KVValue, error) { + k := path.Clean(ns) + var resp []KVValue + e.kv.Ascend(KVValue{Key: k}, func(item KVValue) bool { + if !strings.HasPrefix(item.Key, k) { + return false + } + resp = append(resp, item) + return true + }) + return resp, nil +} + +func (e *ConfigManager) set(ctx context.Context, ns, key string, val []byte) error { + v := KVValue{Key: path.Clean(path.Join(ns, key)), Value: val} + _, _ = e.kv.Set(v) + return nil +} + +func (e *ConfigManager) del(ctx context.Context, ns, key string) error { + _, _ = e.kv.Delete(KVValue{Key: path.Clean(path.Join(ns, key))}) + return nil +} + func (e *ConfigManager) GetNamespace(ctx context.Context, ns string) (*config.Namespace, error) { kv, err := e.get(ctx, pathPrefixNamespace, ns) if err != nil { diff --git a/pkg/manager/config/namespace_test.go b/pkg/manager/config/namespace_test.go new file mode 100644 index 00000000..8de70852 --- /dev/null +++ b/pkg/manager/config/namespace_test.go @@ -0,0 +1,124 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "fmt" + "path" + "testing" + + "github.com/pingcap/TiProxy/lib/util/waitgroup" + "github.com/stretchr/testify/require" + clientv3 "go.etcd.io/etcd/client/v3" +) + +func TestBase(t *testing.T) { + cfgmgr, ctx := testConfigManager(t, "") + + nsNum := 10 + valNum := 30 + getNs := func(i int) string { + return fmt.Sprintf("ns-%d", i) + } + getKey := func(i int) string { + return fmt.Sprintf("%02d", i) + } + + // test .set + for i := 0; i < nsNum; i++ { + ns := getNs(i) + for j := 0; j < valNum; j++ { + k := getKey(j) + require.NoError(t, cfgmgr.set(ctx, ns, k, []byte(k))) + } + } + + // test .get + for i := 0; i < nsNum; i++ { + ns := getNs(i) + for j := 0; j < valNum; j++ { + k := getKey(j) + v, err := cfgmgr.get(ctx, ns, k) + require.NoError(t, err) + require.Equal(t, string(v.Key), path.Join(ns, k)) + require.Equal(t, string(v.Value), k) + } + } + + // test .list + for i := 0; i < nsNum; i++ { + ns := getNs(i) + vals, err := cfgmgr.list(ctx, ns, clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) + require.NoError(t, err) + require.Len(t, vals, valNum) + for j := 0; j < valNum; j++ { + k := getKey(j) + require.Equal(t, string(vals[j].Value), k) + } + } + + // test .del + for i := 0; i < nsNum; i++ { + ns := getNs(i) + for j := 0; j < valNum; j++ { + k := getKey(j) + require.NoError(t, cfgmgr.set(ctx, ns, k, nil)) + + require.NoError(t, cfgmgr.del(ctx, ns, k)) + } + vals, err := cfgmgr.list(ctx, ns) + require.NoError(t, err) + require.Len(t, vals, 0) + } +} + +func TestBaseConcurrency(t *testing.T) { + cfgmgr, ctx := testConfigManager(t, "") + + var wg waitgroup.WaitGroup + batchNum := 16 + for i := 0; i < batchNum; i++ { + k := fmt.Sprint(i) + wg.Run(func() { + require.NoError(t, cfgmgr.set(ctx, k, "1", []byte("1"))) + }) + + wg.Run(func() { + err := cfgmgr.del(ctx, k, "1") + require.NoError(t, err) + }) + } + wg.Wait() + + for i := 0; i < batchNum; i++ { + k := fmt.Sprint(i) + + require.NoError(t, cfgmgr.set(ctx, k, "1", []byte("1"))) + } + + for i := 0; i < batchNum; i++ { + k := fmt.Sprint(i) + + wg.Run(func() { + require.NoError(t, cfgmgr.set(ctx, k, "1", []byte("1"))) + }) + + wg.Run(func() { + _, err := cfgmgr.get(ctx, k, "1") + require.NoError(t, err) + }) + } + wg.Wait() +} diff --git a/pkg/manager/logger/manager.go b/pkg/manager/logger/manager.go index b27e29df..54f1c3b8 100644 --- a/pkg/manager/logger/manager.go +++ b/pkg/manager/logger/manager.go @@ -17,7 +17,6 @@ package logger import ( "context" "encoding/json" - "fmt" "github.com/pingcap/TiProxy/lib/config" "github.com/pingcap/TiProxy/lib/util/cmd" @@ -40,6 +39,9 @@ type LoggerManager struct { func NewLoggerManager(cfg *config.Log) (*LoggerManager, *zap.Logger, error) { lm := &LoggerManager{} var err error + if cfg == nil { + cfg = &config.NewConfig().Log + } mainLogger, syncer, level, err := cmd.BuildLogger(cfg) if err != nil { return nil, nil, err @@ -72,7 +74,6 @@ func (lm *LoggerManager) watchCfg(ctx context.Context, cfgch <-chan *config.Conf err := lm.updateLoggerCfg(cfg) if err != nil { bytes, merr := json.Marshal(cfg) - fmt.Printf("ggg %+v %+v\n", cfg, err) lm.logger.Error("update logger configuration failed", zap.NamedError("update error", err), zap.String("cfg", string(bytes)), diff --git a/pkg/manager/namespace/manager.go b/pkg/manager/namespace/manager.go index 0f79181e..03012233 100644 --- a/pkg/manager/namespace/manager.go +++ b/pkg/manager/namespace/manager.go @@ -86,14 +86,14 @@ func (mgr *NamespaceManager) CommitNamespaces(nss []*config.Namespace, nss_delet return nil } -func (mgr *NamespaceManager) Init(logger *zap.Logger, nss []*config.Namespace, client *clientv3.Client, httpCli *http.Client) error { +func (mgr *NamespaceManager) Init(logger *zap.Logger, nscs []*config.Namespace, client *clientv3.Client, httpCli *http.Client) error { mgr.Lock() mgr.client = client mgr.httpCli = httpCli mgr.logger = logger mgr.Unlock() - return mgr.CommitNamespaces(nss, nil) + return mgr.CommitNamespaces(nscs, nil) } func (n *NamespaceManager) GetNamespace(nm string) (*Namespace, bool) { diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go index b21befba..7c86b240 100644 --- a/pkg/proxy/proxy.go +++ b/pkg/proxy/proxy.go @@ -91,28 +91,40 @@ func (s *SQLServer) reset(cfg *config.ProxyServerOnline) { func (s *SQLServer) Run(ctx context.Context, cfgch <-chan *config.Config) { // Create another context because it still needs to run after graceful shutdown. ctx, s.cancelFunc = context.WithCancel(context.Background()) - for { - select { - case <-ctx.Done(): - return - case ach := <-cfgch: - s.reset(&ach.Proxy.ProxyServerOnline) - default: - conn, err := s.listener.Accept() - if err != nil { - if errors.Is(err, net.ErrClosed) { - return - } - s.logger.Error("accept failed", zap.Error(err)) - continue + s.wg.Run(func() { + for { + select { + case <-ctx.Done(): + return + case ach := <-cfgch: + s.reset(&ach.Proxy.ProxyServerOnline) } + } + }) - s.wg.Run(func() { - s.onConn(ctx, conn) - }) + s.wg.Run(func() { + for { + select { + case <-ctx.Done(): + return + default: + conn, err := s.listener.Accept() + if err != nil { + if errors.Is(err, net.ErrClosed) { + return + } + + s.logger.Error("accept failed", zap.Error(err)) + continue + } + + s.wg.Run(func() { + s.onConn(ctx, conn) + }) + } } - } + }) } func (s *SQLServer) onConn(ctx context.Context, conn net.Conn) { diff --git a/pkg/sctx/context.go b/pkg/sctx/context.go index f9cd5e08..48102b0c 100644 --- a/pkg/sctx/context.go +++ b/pkg/sctx/context.go @@ -21,8 +21,9 @@ import ( ) type Context struct { - Config *config.Config - Handler ServerHandler + Overlay config.Config + ConfigFile string + Handler ServerHandler } type ServerHandler interface { diff --git a/pkg/server/api/config.go b/pkg/server/api/config.go index 58d1db54..446649f6 100644 --- a/pkg/server/api/config.go +++ b/pkg/server/api/config.go @@ -15,11 +15,10 @@ package api import ( + "io" "net/http" "github.com/gin-gonic/gin" - "github.com/pingcap/TiProxy/lib/config" - "github.com/pingcap/TiProxy/lib/util/errors" mgrcfg "github.com/pingcap/TiProxy/pkg/manager/config" "go.uber.org/zap" ) @@ -30,13 +29,14 @@ type configHttpHandler struct { } func (h *configHttpHandler) HandleSetConfig(c *gin.Context) { - cfg := new(config.Config) - if c.ShouldBindJSON(cfg) != nil { - c.JSON(http.StatusBadRequest, "bad config json") + data, err := io.ReadAll(c.Request.Body) + if err != nil { + h.logger.Error("fail to read config", zap.Error(err)) + c.JSON(http.StatusInternalServerError, "fail to read config") return } - if err := h.cfgmgr.SetConfig(c, cfg); err != nil { + if err := h.cfgmgr.SetTOMLConfig(data); err != nil { h.logger.Error("can not update config", zap.Error(err)) c.JSON(http.StatusInternalServerError, "can not update config") return @@ -46,15 +46,7 @@ func (h *configHttpHandler) HandleSetConfig(c *gin.Context) { } func (h *configHttpHandler) HandleGetConfig(c *gin.Context) { - var cfg config.Config - err := h.cfgmgr.GetConfig(c, &cfg) - if err != nil && !errors.Is(err, mgrcfg.ErrNoResults) { - h.logger.Error("can not get config", zap.Error(err)) - c.JSON(http.StatusInternalServerError, "can not get config") - return - } - - c.JSON(http.StatusOK, &cfg) + c.TOML(http.StatusOK, h.cfgmgr.GetConfig()) } func registerConfig(group *gin.RouterGroup, logger *zap.Logger, cfgmgr *mgrcfg.ConfigManager) { diff --git a/pkg/server/server.go b/pkg/server/server.go index f11ed1a3..4a093f7b 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -75,29 +75,34 @@ func NewServer(ctx context.Context, sctx *sctx.Context) (srv *Server, err error) wg: waitgroup.WaitGroup{}, } - cfg := sctx.Config handler := sctx.Handler + ready := atomic.NewBool(false) // set up logger var lg *zap.Logger - if srv.LoggerManager, lg, err = logger.NewLoggerManager(&cfg.Log); err != nil { + if srv.LoggerManager, lg, err = logger.NewLoggerManager(&sctx.Overlay.Log); err != nil { return } - // setup certs - { - clogger := lg.Named("cert") - if err = srv.CertManager.Init(cfg, clogger); err != nil { - return - } + // setup config manager + if err = srv.ConfigManager.Init(ctx, lg.Named("config"), sctx.ConfigFile, &sctx.Overlay); err != nil { + err = errors.WithStack(err) + return } + cfg := srv.ConfigManager.GetConfig() - ready := atomic.NewBool(false) + // also hook logger + srv.LoggerManager.Init(srv.ConfigManager.WatchConfig()) // setup metrics srv.MetricsManager.Init(ctx, lg.Named("metrics"), cfg.Metrics.MetricsAddr, cfg.Metrics.MetricsInterval, cfg.Proxy.Addr) - // setup gin and etcd + // setup certs + if err = srv.CertManager.Init(cfg, lg.Named("cert")); err != nil { + return + } + + // setup gin { slogger := lg.Named("gin") gin.SetMode(gin.ReleaseMode) @@ -150,21 +155,20 @@ func NewServer(ctx context.Context, sctx *sctx.Context) (srv *Server, err error) } } - // setup config manager + // setup namespace manager { - err = srv.ConfigManager.Init(ctx, cfg, lg.Named("config")) + srv.ObserverClient, err = router.InitEtcdClient(lg.Named("pd"), cfg, srv.CertManager) if err != nil { err = errors.WithStack(err) return } - srv.LoggerManager.Init(srv.ConfigManager.WatchConfig(ctx)) - nscs, nerr := srv.ConfigManager.ListAllNamespace(ctx) if nerr != nil { err = errors.WithStack(nerr) return } + if len(nscs) == 0 { // no existed namespace nsc := &config.Namespace{ @@ -177,25 +181,10 @@ func NewServer(ctx context.Context, sctx *sctx.Context) (srv *Server, err error) if err = srv.ConfigManager.SetNamespace(ctx, nsc.Namespace, nsc); err != nil { return } - } - } - - // setup namespace manager - { - srv.ObserverClient, err = router.InitEtcdClient(lg.Named("pd"), cfg, srv.CertManager) - if err != nil { - err = errors.WithStack(err) - return - } - - var nss []*config.Namespace - nss, err = srv.ConfigManager.ListAllNamespace(ctx) - if err != nil { - err = errors.WithStack(err) - return + nscs = append(nscs, nsc) } - err = srv.NamespaceManager.Init(lg.Named("nsmgr"), nss, srv.ObserverClient, srv.Http) + err = srv.NamespaceManager.Init(lg.Named("nsmgr"), nscs, srv.ObserverClient, srv.Http) if err != nil { err = errors.WithStack(err) return @@ -216,9 +205,7 @@ func NewServer(ctx context.Context, sctx *sctx.Context) (srv *Server, err error) return } - srv.wg.Run(func() { - srv.Proxy.Run(ctx, srv.ConfigManager.WatchConfig(ctx)) - }) + srv.Proxy.Run(ctx, srv.ConfigManager.WatchConfig()) } ready.Toggle()