From 6d8aa228a2f1ba20e488cce74f11f627540abad6 Mon Sep 17 00:00:00 2001 From: xhe Date: Tue, 29 Nov 2022 19:14:12 +0800 Subject: [PATCH 1/3] server: remove etcd Signed-off-by: xhe --- cmd/tiproxy/main.go | 5 - go.mod | 1 + go.sum | 2 + pkg/manager/config/config.go | 189 +++++++---------------------- pkg/manager/config/config_test.go | 101 --------------- pkg/manager/config/manager.go | 161 +++++++++++------------- pkg/manager/config/manager_test.go | 21 ++-- pkg/manager/config/namespace.go | 14 +-- pkg/server/api/config.go | 12 +- pkg/server/server.go | 180 +++++---------------------- 10 files changed, 177 insertions(+), 509 deletions(-) delete mode 100644 pkg/manager/config/config_test.go diff --git a/cmd/tiproxy/main.go b/cmd/tiproxy/main.go index c6758670..a58f24ec 100644 --- a/cmd/tiproxy/main.go +++ b/cmd/tiproxy/main.go @@ -21,7 +21,6 @@ import ( "github.com/pingcap/TiProxy/lib/config" "github.com/pingcap/TiProxy/lib/util/cmd" "github.com/pingcap/TiProxy/lib/util/errors" - "github.com/pingcap/TiProxy/lib/util/waitgroup" "github.com/pingcap/TiProxy/pkg/sctx" "github.com/pingcap/TiProxy/pkg/server" "github.com/spf13/cobra" @@ -78,15 +77,11 @@ func main() { return errors.Wrapf(err, "fail to create server") } - var wg waitgroup.WaitGroup - wg.Run(func() { srv.Run(cmd.Context()) }) - <-cmd.Context().Done() if e := srv.Close(); e != nil { err = errors.Wrapf(err, "shutdown with errors") } - wg.Wait() return err } diff --git a/go.mod b/go.mod index 83cfc7f5..a82c20d2 100644 --- a/go.mod +++ b/go.mod @@ -86,6 +86,7 @@ require ( github.com/soheilhy/cmux v0.1.5 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stathat/consistent v1.0.0 // indirect + github.com/tidwall/btree v1.5.2 // indirect github.com/tikv/client-go/v2 v2.0.1-0.20220906094532-f867f498456f // indirect github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect diff --git a/go.sum b/go.sum index 5869968d..cae1fa33 100644 --- a/go.sum +++ b/go.sum @@ -611,6 +611,8 @@ github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PK github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ= +github.com/tidwall/btree v1.5.2 h1:5eA83Gfki799V3d3bJo9sWk+yL2LRoTEah3O/SA6/8w= +github.com/tidwall/btree v1.5.2/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY= github.com/tikv/client-go/v2 v2.0.1-0.20220906094532-f867f498456f h1:wjRWmUl4QmJF7V0aUskjT8EjjpfWxi5o9SQR5S1nNWA= github.com/tikv/client-go/v2 v2.0.1-0.20220906094532-f867f498456f/go.mod h1:tkKDJ88lryb16v7FfCh8pvvfwwCkh4aGeSOqHviPaaE= github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db h1:r1eMh9Rny3hfWuBuxOnbsCRrR4FhthiNxLQ5rAUtaww= diff --git a/pkg/manager/config/config.go b/pkg/manager/config/config.go index ba7e19b7..eacf57f3 100644 --- a/pkg/manager/config/config.go +++ b/pkg/manager/config/config.go @@ -17,170 +17,75 @@ package config import ( "context" "encoding/json" + "path" "reflect" "github.com/pingcap/TiProxy/lib/config" "github.com/pingcap/TiProxy/lib/util/errors" - "go.etcd.io/etcd/api/v3/mvccpb" "go.uber.org/zap" ) -type mKeyType reflect.Type - -type OnlineCfgTypes interface { - config.ProxyServerOnline | config.LogOnline -} - -type imeta interface { - getPrefix() string - unmarshal(bytes []byte) (any, error) - addToCh(any) - getInitial(cfg *config.Config) any +var cfg2Path = map[reflect.Type]string{ + reflect.TypeOf(config.ProxyServerOnline{}): "proxy", + reflect.TypeOf(config.LogOnline{}): "log", } -type meta[T OnlineCfgTypes] struct { - prefix string - initFunc func(cfg *config.Config) T - ch chan *T -} - -func newMeta[T OnlineCfgTypes](prefix string, initFunc func(cfg *config.Config) T) *meta[T] { - return &meta[T]{ - prefix: prefix, - initFunc: initFunc, - ch: make(chan *T, 1), +func (e *ConfigManager) SetConfig(ctx context.Context, val any) error { + rf := reflect.TypeOf(val) + if rf.Kind() == reflect.Pointer { + rf = rf.Elem() } -} - -func (m meta[T]) unmarshal(bytes []byte) (any, error) { - var t T - err := json.Unmarshal(bytes, &t) - return &t, err -} - -func (m meta[T]) getPrefix() string { - return m.prefix -} - -func (m meta[T]) addToCh(obj any) { - m.ch <- obj.(*T) -} - -func (m meta[T]) getInitial(cfg *config.Config) any { - return m.initFunc(cfg) -} - -func getMetaKey[T OnlineCfgTypes]() mKeyType { - return reflect.TypeOf(new(T)) -} - -func getMetaKeyByObj[T OnlineCfgTypes](t *T) mKeyType { - return reflect.TypeOf(t) -} - -func (e *ConfigManager) initMetas() { - e.metas = map[mKeyType]imeta{ - getMetaKey[config.ProxyServerOnline](): newMeta(pathPrefixProxyServer, func(cfg *config.Config) config.ProxyServerOnline { - return cfg.Proxy.ProxyServerOnline - }), - getMetaKey[config.LogOnline](): newMeta(pathPrefixLog, func(cfg *config.Config) config.LogOnline { - return cfg.Log.LogOnline - }), - } -} - -func (e *ConfigManager) watchConfig(ctx context.Context, cfg *config.Config) error { - for _, m := range e.metas { - if err := func(m imeta) error { - _, err := e.get(ctx, m.getPrefix(), "") - if err != nil && errors.Is(err, ErrNoOrMultiResults) { - value, err := json.Marshal(m.getInitial(cfg)) - if err != nil { - return err - } - if err = e.set(ctx, m.getPrefix(), "", string(value)); err != nil { - return err - } - } - e.watch(ctx, m.getPrefix(), "", func(logger *zap.Logger, evt mvccpb.Event) { - if obj, err := m.unmarshal(evt.Kv.Value); err != nil { - logger.Warn("failed unmarshal proxy config", zap.Error(err)) - return - } else { - m.addToCh(obj) - } - }) - return nil - }(m); err != nil { - return err - } + p, ok := cfg2Path[rf] + if !ok { + return errors.WithStack(errors.New("invalid type")) } - return nil -} - -func (e *ConfigManager) getCfg(ctx context.Context, metaKey reflect.Type) (any, error) { - m := e.metas[metaKey] - val, err := e.get(ctx, m.getPrefix(), "") + c, err := json.Marshal(val) if err != nil { - return nil, err + return errors.WithStack(err) } - return m.unmarshal(val.Value) + return e.set(ctx, pathPrefixConfig, p, c) } -func (e *ConfigManager) setCfg(ctx context.Context, metaKey mKeyType, obj any) error { - m := e.metas[metaKey] - value, err := json.Marshal(obj) - if err != nil { - return err +func (e *ConfigManager) GetConfig(ctx context.Context, val any) error { + rf := reflect.TypeOf(val) + if rf.Kind() == reflect.Pointer { + rf = rf.Elem() + } + p, ok := cfg2Path[rf] + if !ok { + return errors.WithStack(errors.New("invalid type")) } - return e.set(ctx, m.getPrefix(), "", string(value)) -} -// GetConfig queries the configuration from the config center. -func GetConfig[T OnlineCfgTypes](ctx context.Context, e *ConfigManager, t *T) error { - obj, err := e.getCfg(ctx, getMetaKeyByObj(t)) + c, err := e.get(ctx, pathPrefixConfig, p) if err != nil { return err } - *t = *obj.(*T) - return nil -} - -// SetConfig sets a configuration to the config center. -func SetConfig[T OnlineCfgTypes](ctx context.Context, e *ConfigManager, t *T) error { - return e.setCfg(ctx, getMetaKeyByObj(t), t) -} - -// GetCfgWatch returns the channel that contains updated configuration. -func GetCfgWatch[T OnlineCfgTypes](e *ConfigManager) chan *T { - mt := e.metas[getMetaKey[T]()].(*meta[T]) - return mt.ch -} -func (e *ConfigManager) GetProxyConfigWatch() <-chan *config.ProxyServerOnline { - return GetCfgWatch[config.ProxyServerOnline](e) + return json.Unmarshal(c.Value, val) } -func (e *ConfigManager) GetProxyConfig(ctx context.Context) (*config.ProxyServerOnline, error) { - var pso config.ProxyServerOnline - err := GetConfig(ctx, e, &pso) - return &pso, err -} - -func (e *ConfigManager) SetProxyConfig(ctx context.Context, proxy *config.ProxyServerOnline) error { - return SetConfig(ctx, e, proxy) -} - -func (e *ConfigManager) GetLogConfigWatch() <-chan *config.LogOnline { - return GetCfgWatch[config.LogOnline](e) -} - -func (e *ConfigManager) GetLogConfig(ctx context.Context) (*config.LogOnline, error) { - var co config.LogOnline - err := GetConfig(ctx, e, &co) - return &co, err -} +func MakeConfigChan[T any](e *ConfigManager, initval *T) <-chan *T { + cfgchan := make(chan *T, 64) + rf := reflect.TypeOf(initval) + if rf.Kind() == reflect.Pointer { + rf = rf.Elem() + } + p, ok := cfg2Path[rf] + if !ok { + panic(errors.WithStack(errors.New("invalid type"))) + } -func (e *ConfigManager) SetLogConfig(ctx context.Context, log *config.LogOnline) error { - return SetConfig(ctx, e, log) + e.SetConfig(context.Background(), initval) + e.Watch(path.Join(pathPrefixConfig, p), func(_ *zap.Logger, k KVEvent) { + var v T + if k.Type == KVEventDel { + cfgchan <- &v + } else { + e := json.Unmarshal(k.Value, &v) + if e == nil { + cfgchan <- &v + } + } + }) + return cfgchan } diff --git a/pkg/manager/config/config_test.go b/pkg/manager/config/config_test.go deleted file mode 100644 index ac81f3e1..00000000 --- a/pkg/manager/config/config_test.go +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package config - -import ( - "context" - "testing" - "time" - - "github.com/pingcap/TiProxy/lib/config" - "github.com/stretchr/testify/require" -) - -func TestProxyConfig(t *testing.T) { - cases := []*config.ProxyServerOnline{ - { - MaxConnections: 1, - TCPKeepAlive: false, - }, - { - MaxConnections: 1, - TCPKeepAlive: true, - }, - { - MaxConnections: 0, - TCPKeepAlive: false, - }, - { - MaxConnections: 0, - TCPKeepAlive: true, - }, - } - - testWatch(t, cases, func(cfgmgr *ConfigManager) <-chan *config.ProxyServerOnline { - return cfgmgr.GetProxyConfigWatch() - }, func(ctx context.Context, cfgmgr *ConfigManager, tc *config.ProxyServerOnline) error { - return cfgmgr.SetProxyConfig(ctx, tc) - }) -} - -func TestLogConfig(t *testing.T) { - cases := []*config.LogOnline{ - { - Level: "info", - LogFile: config.LogFile{ - Filename: "proxy.log", - MaxSize: 100, - MaxDays: 10, - MaxBackups: 10, - }, - }, - { - Level: "debug", - LogFile: config.LogFile{ - Filename: "l.log", - MaxSize: 1, - MaxDays: 1, - MaxBackups: 1, - }, - }, - { - Level: "", - LogFile: config.LogFile{}, - }, - } - - testWatch(t, cases, func(cfgmgr *ConfigManager) <-chan *config.LogOnline { - return cfgmgr.GetLogConfigWatch() - }, func(ctx context.Context, cfgmgr *ConfigManager, tc *config.LogOnline) error { - return cfgmgr.SetLogConfig(ctx, tc) - }) -} - -func testWatch[T OnlineCfgTypes](t *testing.T, cases []*T, getWatch func(*ConfigManager) <-chan *T, - setConfig func(context.Context, *ConfigManager, *T) error) { - cfgmgr, ctx := testConfigManager(t, &config.Config{}) - ch := getWatch(cfgmgr) - require.Equal(t, <-ch, new(T)) - - for _, tc := range cases { - require.NoError(t, setConfig(ctx, cfgmgr, tc)) - select { - case <-time.After(5 * time.Second): - t.Fatalf("\n\ntimeout waiting chan\n\n") - case tg := <-ch: - require.Equal(t, tc, tg) - } - } -} diff --git a/pkg/manager/config/manager.go b/pkg/manager/config/manager.go index 7d639dc3..3f844f6e 100644 --- a/pkg/manager/config/manager.go +++ b/pkg/manager/config/manager.go @@ -17,136 +17,123 @@ package config import ( "context" "path" - "time" + "strings" "github.com/pingcap/TiProxy/lib/config" "github.com/pingcap/TiProxy/lib/util/errors" "github.com/pingcap/TiProxy/lib/util/waitgroup" - "go.etcd.io/etcd/api/v3/mvccpb" + "github.com/tidwall/btree" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/server/v3/lease" - "go.etcd.io/etcd/server/v3/mvcc" "go.uber.org/zap" ) const ( - DefaultEtcdDialTimeout = 3 * time.Second - DefaultEtcdPath = "/config" - - pathPrefixNamespace = "ns" - pathPrefixProxyServer = "proxy" - pathPrefixLog = "log" + pathPrefixNamespace = "ns" + pathPrefixConfig = "config" ) var ( - ErrNoOrMultiResults = errors.Errorf("has no results or multiple results") + ErrNoResults = errors.Errorf("has no results") + ErrFail2Update = errors.Errorf("failed to update") +) + +type KVValue struct { + Key string + Value []byte +} + +type KVEventType byte + +const ( + KVEventPut KVEventType = iota + KVEventDel ) +type KVEvent struct { + KVValue + Type KVEventType +} + +type kvListener struct { + key string + wfn func(*zap.Logger, KVEvent) +} + type ConfigManager struct { - wg waitgroup.WaitGroup - kv mvcc.WatchableKV - cancel context.CancelFunc - logger *zap.Logger - basePath string - watchInterval time.Duration - ignoreWrongNamespace bool - metas map[mKeyType]imeta + wg waitgroup.WaitGroup + cancel context.CancelFunc + kv *btree.BTreeG[KVValue] + events chan KVEvent + listeners []kvListener + logger *zap.Logger } func NewConfigManager() *ConfigManager { return &ConfigManager{} } -func (srv *ConfigManager) Init(ctx context.Context, kv mvcc.WatchableKV, cfg *config.Config, logger *zap.Logger) error { +func (srv *ConfigManager) Init(ctx context.Context, cfg *config.Config, logger *zap.Logger) error { srv.logger = logger - srv.ignoreWrongNamespace = cfg.Advance.IgnoreWrongNamespace - // slash appended to distinguish '/dir'(file) and '/dir/'(directory) - srv.basePath = appendSlashToDirPath(DefaultEtcdPath) - - srv.kv = kv - srv.initMetas() - - ctx, cancel := context.WithCancel(ctx) - srv.cancel = cancel - - return srv.watchConfig(ctx, cfg) -} + srv.events = make(chan KVEvent, 256) + srv.kv = btree.NewBTreeG(func(a, b KVValue) bool { + return a.Key < b.Key + }) -func (e *ConfigManager) watch(ctx context.Context, ns, key string, f func(*zap.Logger, mvccpb.Event)) { - wkey := []byte(path.Join(e.basePath, ns, key)) - logger := e.logger.With(zap.String("component", string(wkey))) - retryInterval := 5 * time.Second - rev := e.kv.Rev() - e.wg.Run(func() { - wch := e.kv.NewWatchStream() - defer wch.Close() - for { - if _, err := wch.Watch(mvcc.AutoWatchID, wkey, getPrefix(wkey), rev); err == nil { - break - } - if k := retryInterval * 2; k < e.watchInterval { - retryInterval = k - } - logger.Warn("failed to watch, will try again later", zap.Duration("sleep", retryInterval)) - select { - case <-ctx.Done(): - return - case <-time.After(retryInterval): - } - } + var nctx context.Context + nctx, srv.cancel = context.WithCancel(ctx) + srv.wg.Run(func() { for { select { - case <-ctx.Done(): + case <-nctx.Done(): return - case res := <-wch.Chan(): - for _, evt := range res.Events { - f(logger, evt) + case ev := <-srv.events: + for _, lsn := range srv.listeners { + lsn.wfn(srv.logger, ev) } } } }) + return nil } -func (e *ConfigManager) get(ctx context.Context, ns, key string) (*mvccpb.KeyValue, error) { - resp, err := e.kv.Range(ctx, []byte(path.Join(e.basePath, ns, key)), nil, mvcc.RangeOptions{Rev: 0}) - if err != nil { - return nil, err - } - if len(resp.KVs) != 1 { - return nil, ErrNoOrMultiResults - } - return &resp.KVs[0], nil +func (e *ConfigManager) Watch(key string, handler func(*zap.Logger, KVEvent)) { + e.listeners = append(e.listeners, kvListener{key, handler}) } -func getPrefix(key []byte) []byte { - end := make([]byte, len(key)) - copy(end, key) - for i := len(end) - 1; i >= 0; i-- { - if end[i] < 0xff { - end[i] = end[i] + 1 - end = end[:i+1] - return end - } +func (e *ConfigManager) get(ctx context.Context, ns, key string) (KVValue, error) { + nkey := path.Clean(path.Join(ns, key)) + v, ok := e.kv.Get(KVValue{Key: nkey}) + if !ok { + return v, errors.WithStack(errors.Wrapf(ErrNoResults, "key=%s", nkey)) } - return []byte{0} + return v, nil } -func (e *ConfigManager) list(ctx context.Context, ns string, ops ...clientv3.OpOption) ([]mvccpb.KeyValue, error) { - k := []byte(path.Join(e.basePath, ns)) - resp, err := e.kv.Range(ctx, k, getPrefix(k), mvcc.RangeOptions{Rev: 0}) - if err != nil { - return nil, err - } - return resp.KVs, nil +func (e *ConfigManager) list(ctx context.Context, ns string, ops ...clientv3.OpOption) ([]KVValue, error) { + k := path.Clean(ns) + var resp []KVValue + e.kv.Ascend(KVValue{Key: k}, func(item KVValue) bool { + if !strings.HasPrefix(item.Key, k) { + return false + } + resp = append(resp, item) + return true + }) + return resp, nil } -func (e *ConfigManager) set(ctx context.Context, ns, key, val string) error { - _ = e.kv.Put([]byte(path.Join(e.basePath, ns, key)), []byte(val), lease.NoLease) +func (e *ConfigManager) set(ctx context.Context, ns, key string, val []byte) error { + v := KVValue{Key: path.Clean(path.Join(ns, key)), Value: val} + _, _ = e.kv.Set(v) + e.events <- KVEvent{Type: KVEventPut, KVValue: v} return nil } func (e *ConfigManager) del(ctx context.Context, ns, key string) error { - _, _ = e.kv.DeleteRange([]byte(path.Join(e.basePath, ns, key)), nil) + v, ok := e.kv.Delete(KVValue{Key: path.Clean(path.Join(ns, key))}) + if ok { + e.events <- KVEvent{Type: KVEventPut, KVValue: v} + } return nil } diff --git a/pkg/manager/config/manager_test.go b/pkg/manager/config/manager_test.go index 7d0fe651..2f17e4b7 100644 --- a/pkg/manager/config/manager_test.go +++ b/pkg/manager/config/manager_test.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/TiProxy/lib/util/logger" "github.com/pingcap/TiProxy/lib/util/waitgroup" "github.com/stretchr/testify/require" - "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/embed" "go.uber.org/zap" @@ -55,7 +54,7 @@ func testConfigManager(t *testing.T, cfg *config.Config) (*ConfigManager, contex } cfgmgr := NewConfigManager() - require.NoError(t, cfgmgr.Init(ctx, etcd.Server.KV(), cfg, logger)) + require.NoError(t, cfgmgr.Init(ctx, cfg, logger)) t.Cleanup(func() { require.NoError(t, cfgmgr.Close()) @@ -84,7 +83,7 @@ func TestBase(t *testing.T) { ns := getNs(i) for j := 0; j < valNum; j++ { k := getKey(j) - require.NoError(t, cfgmgr.set(ctx, ns, k, k)) + require.NoError(t, cfgmgr.set(ctx, ns, k, []byte(k))) } } @@ -95,7 +94,7 @@ func TestBase(t *testing.T) { k := getKey(j) v, err := cfgmgr.get(ctx, ns, k) require.NoError(t, err) - require.Equal(t, string(v.Key), path.Join(DefaultEtcdPath, ns, k)) + require.Equal(t, string(v.Key), path.Join(ns, k)) require.Equal(t, string(v.Value), k) } } @@ -117,7 +116,7 @@ func TestBase(t *testing.T) { ns := getNs(i) for j := 0; j < valNum; j++ { k := getKey(j) - require.NoError(t, cfgmgr.set(ctx, ns, k, k)) + require.NoError(t, cfgmgr.set(ctx, ns, k, nil)) require.NoError(t, cfgmgr.del(ctx, ns, k)) } @@ -135,7 +134,7 @@ func TestBaseConcurrency(t *testing.T) { for i := 0; i < batchNum; i++ { k := fmt.Sprint(i) wg.Run(func() { - require.NoError(t, cfgmgr.set(ctx, k, "1", "1")) + require.NoError(t, cfgmgr.set(ctx, k, "1", []byte("1"))) }) wg.Run(func() { @@ -148,14 +147,14 @@ func TestBaseConcurrency(t *testing.T) { for i := 0; i < batchNum; i++ { k := fmt.Sprint(i) - require.NoError(t, cfgmgr.set(ctx, k, "1", "1")) + require.NoError(t, cfgmgr.set(ctx, k, "1", []byte("1"))) } for i := 0; i < batchNum; i++ { k := fmt.Sprint(i) wg.Run(func() { - require.NoError(t, cfgmgr.set(ctx, k, "1", "1")) + require.NoError(t, cfgmgr.set(ctx, k, "1", []byte("1"))) }) wg.Run(func() { @@ -170,12 +169,12 @@ func TestBaseWatch(t *testing.T) { cfgmgr, ctx := testConfigManager(t, &config.Config{}) ch := make(chan string, 1) - cfgmgr.watch(ctx, "test", "t", func(_ *zap.Logger, e mvccpb.Event) { - ch <- string(e.Kv.Value) + cfgmgr.Watch("test", func(_ *zap.Logger, e KVEvent) { + ch <- string(e.Value) }) // set it - require.NoError(t, cfgmgr.set(ctx, "test", "t", "1")) + require.NoError(t, cfgmgr.set(ctx, "test", "t", []byte("1"))) // now the only way to check watch is to wait select { case <-time.After(5 * time.Second): diff --git a/pkg/manager/config/namespace.go b/pkg/manager/config/namespace.go index 9e007b37..63c975a0 100644 --- a/pkg/manager/config/namespace.go +++ b/pkg/manager/config/namespace.go @@ -20,16 +20,15 @@ import ( "errors" "github.com/pingcap/TiProxy/lib/config" - "go.uber.org/zap" ) func (e *ConfigManager) GetNamespace(ctx context.Context, ns string) (*config.Namespace, error) { - etcdKeyValue, err := e.get(ctx, pathPrefixNamespace, ns) + kv, err := e.get(ctx, pathPrefixNamespace, ns) if err != nil { return nil, err } var cfg config.Namespace - err = json.Unmarshal(etcdKeyValue.Value, &cfg) + err = json.Unmarshal(kv.Value, &cfg) return &cfg, err } @@ -43,12 +42,7 @@ func (e *ConfigManager) ListAllNamespace(ctx context.Context) ([]*config.Namespa for _, kv := range etcdKeyValues { var nsCfg config.Namespace if err := json.Unmarshal(kv.Value, &nsCfg); err != nil { - if e.ignoreWrongNamespace { - e.logger.Warn("parse namespace config error", zap.Error(err), zap.ByteString("namespace", kv.Key)) - continue - } else { - return nil, err - } + return nil, err } ret = append(ret, &nsCfg) } @@ -64,7 +58,7 @@ func (e *ConfigManager) SetNamespace(ctx context.Context, ns string, nsc *config if err != nil { return err } - return e.set(ctx, pathPrefixNamespace, ns, string(r)) + return e.set(ctx, pathPrefixNamespace, ns, r) } func (e *ConfigManager) DelNamespace(ctx context.Context, ns string) error { diff --git a/pkg/server/api/config.go b/pkg/server/api/config.go index 518ac7fe..632fa0d8 100644 --- a/pkg/server/api/config.go +++ b/pkg/server/api/config.go @@ -28,14 +28,18 @@ type configHttpHandler struct { cfgmgr *mgrcfg.ConfigManager } -func setConfig[T mgrcfg.OnlineCfgTypes](h *configHttpHandler, c *gin.Context) { +type OnlineCfgTypes interface { + config.ProxyServerOnline | config.LogOnline +} + +func setConfig[T OnlineCfgTypes](h *configHttpHandler, c *gin.Context) { cfg := new(T) if c.ShouldBindJSON(cfg) != nil { c.JSON(http.StatusBadRequest, "bad config json") return } - if err := mgrcfg.SetConfig(c, h.cfgmgr, cfg); err != nil { + if err := h.cfgmgr.SetConfig(c, cfg); err != nil { h.logger.Error("can not update config", zap.Error(err)) c.JSON(http.StatusInternalServerError, "can not update config") return @@ -44,9 +48,9 @@ func setConfig[T mgrcfg.OnlineCfgTypes](h *configHttpHandler, c *gin.Context) { c.JSON(http.StatusOK, "") } -func getConfig[T mgrcfg.OnlineCfgTypes](h *configHttpHandler, c *gin.Context) { +func getConfig[T OnlineCfgTypes](h *configHttpHandler, c *gin.Context) { var cfg T - err := mgrcfg.GetConfig(c, h.cfgmgr, &cfg) + err := h.cfgmgr.GetConfig(c, &cfg) if err != nil { h.logger.Error("can not get config", zap.Error(err)) c.JSON(http.StatusInternalServerError, "can not get config") diff --git a/pkg/server/server.go b/pkg/server/server.go index f9d006ee..3439fbd8 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -16,19 +16,14 @@ package server import ( "context" - "fmt" + "crypto/tls" + "net" "net/http" - "net/url" - "os" - "path/filepath" - "strconv" - "strings" ginzap "github.com/gin-contrib/zap" "github.com/gin-gonic/gin" "github.com/pingcap/TiProxy/lib/config" "github.com/pingcap/TiProxy/lib/util/errors" - "github.com/pingcap/TiProxy/lib/util/security" "github.com/pingcap/TiProxy/lib/util/waitgroup" "github.com/pingcap/TiProxy/pkg/manager/cert" mgrcfg "github.com/pingcap/TiProxy/pkg/manager/config" @@ -40,12 +35,12 @@ import ( "github.com/pingcap/TiProxy/pkg/sctx" "github.com/pingcap/TiProxy/pkg/server/api" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/server/v3/embed" "go.uber.org/atomic" "go.uber.org/zap" ) type Server struct { + wg waitgroup.WaitGroup // managers ConfigManager *mgrcfg.ConfigManager NamespaceManager *mgrns.NamespaceManager @@ -55,10 +50,11 @@ type Server struct { ObserverClient *clientv3.Client // HTTP client Http *http.Client - // HTTP/GRPC services - Etcd *embed.Etcd + // HTTP server + HTTPListener net.Listener // L7 proxy - Proxy *proxy.SQLServer + Proxy *proxy.SQLServer + proxyCh <-chan *config.ProxyServerOnline } func NewServer(ctx context.Context, sctx *sctx.Context) (srv *Server, err error) { @@ -67,6 +63,7 @@ func NewServer(ctx context.Context, sctx *sctx.Context) (srv *Server, err error) MetricsManager: metrics.NewMetricsManager(), NamespaceManager: mgrns.NewNamespaceManager(), CertManager: cert.NewCertManager(), + wg: waitgroup.WaitGroup{}, } cfg := sctx.Config @@ -92,11 +89,12 @@ func NewServer(ctx context.Context, sctx *sctx.Context) (srv *Server, err error) // setup gin and etcd { + slogger := lg.Named("gin") gin.SetMode(gin.ReleaseMode) engine := gin.New() engine.Use( gin.Recovery(), - ginzap.Ginzap(lg.Named("gin"), "", true), + ginzap.Ginzap(slogger, "", true), func(c *gin.Context) { if !ready.Load() { c.Abort() @@ -105,31 +103,19 @@ func NewServer(ctx context.Context, sctx *sctx.Context) (srv *Server, err error) }, ) - // This is the tricky part. While HTTP services rely on managers, managers also rely on the etcd server. - // Etcd server is used to bring up the config manager and HTTP services itself. - // That means we have cyclic dependencies. Here's the solution: - // 1. create managers first, and pass them down - // 2. start etcd and HTTP, but HTTP will wait for managers to init - // 3. init managers using bootstrapped etcd - // - // We have some alternative solution, for example: - // 1. globally lazily creation of managers. It introduced racing/chaos-management/hard-code-reading as in TiDB. - // 2. pass down '*Server' struct such that the underlying relies on the pointer only. But it does not work well for golang. To avoid cyclic imports between 'api' and `server` packages, two packages needs to be merged. That is basically what happened to TiDB '*Session'. api.Register(engine.Group("/api"), cfg.API, lg.Named("api"), srv.NamespaceManager, srv.ConfigManager) - srv.Etcd, err = buildEtcd(sctx, lg.Named("etcd"), engine) + srv.HTTPListener, err = net.Listen("tcp", cfg.API.Addr) if err != nil { - err = errors.WithStack(err) - return + return nil, err } - - // wait for etcd server - select { - case <-ctx.Done(): - err = fmt.Errorf("timeout on creating etcd") - return - case <-srv.Etcd.Server.ReadyNotify(): + if tlscfg := srv.CertManager.ServerTLS(); tlscfg != nil { + srv.HTTPListener = tls.NewListener(srv.HTTPListener, tlscfg) } + + srv.wg.Run(func() { + slogger.Info("HTTP closed", zap.Error(engine.RunListener(srv.HTTPListener))) + }) } // general cluster HTTP client @@ -143,12 +129,13 @@ func NewServer(ctx context.Context, sctx *sctx.Context) (srv *Server, err error) // setup config manager { - err = srv.ConfigManager.Init(ctx, srv.Etcd.Server.KV(), cfg, lg.Named("config")) + err = srv.ConfigManager.Init(ctx, cfg, lg.Named("config")) if err != nil { err = errors.WithStack(err) return } - srv.LoggerManager.Init(srv.ConfigManager.GetLogConfigWatch()) + + srv.LoggerManager.Init(mgrcfg.MakeConfigChan(srv.ConfigManager, &cfg.Log.LogOnline)) nscs, nerr := srv.ConfigManager.ListAllNamespace(ctx) if nerr != nil { @@ -199,21 +186,26 @@ func NewServer(ctx context.Context, sctx *sctx.Context) (srv *Server, err error) err = errors.WithStack(err) return } + + srv.proxyCh = mgrcfg.MakeConfigChan(srv.ConfigManager, &cfg.Proxy.ProxyServerOnline) + + srv.wg.Run(func() { + srv.Proxy.Run(ctx, srv.proxyCh) + }) } ready.Toggle() return } -func (s *Server) Run(ctx context.Context) { - s.Proxy.Run(ctx, s.ConfigManager.GetProxyConfigWatch()) -} - func (s *Server) Close() error { errs := make([]error, 0, 4) if s.Proxy != nil { errs = append(errs, s.Proxy.Close()) } + if s.HTTPListener != nil { + s.HTTPListener.Close() + } if s.NamespaceManager != nil { errs = append(errs, s.NamespaceManager.Close()) } @@ -223,20 +215,6 @@ func (s *Server) Close() error { if s.ObserverClient != nil { errs = append(errs, s.ObserverClient.Close()) } - if s.Etcd != nil { - var wg waitgroup.WaitGroup - wg.Run(func() { - for { - err, ok := <-s.Etcd.Err() - if !ok { - break - } - errs = append(errs, err) - } - }) - s.Etcd.Close() - wg.Wait() - } if s.MetricsManager != nil { s.MetricsManager.Close() } @@ -245,102 +223,6 @@ func (s *Server) Close() error { errs = append(errs, err) } } + s.wg.Wait() return errors.Collect(ErrCloseServer, errs...) } - -func buildEtcd(ctx *sctx.Context, logger *zap.Logger, engine *gin.Engine) (srv *embed.Etcd, err error) { - cfg := ctx.Config - if ctx.Cluster.ClusterName == "" { - return nil, errors.New("cluster_name can not be empty") - } - if ctx.Cluster.NodeName == "" { - hname, err := os.Hostname() - if err != nil { - return nil, errors.WithStack(err) - } - ctx.Cluster.NodeName = fmt.Sprintf("%s-%s", ctx.Cluster.ClusterName, hname) - } - - cnt := 0 - if len(ctx.Cluster.BootstrapClusters) != 0 { - cnt++ - } - if ctx.Cluster.BootstrapDurl != "" { - cnt++ - } - if ctx.Cluster.BootstrapDdns != "" { - cnt++ - } - if cnt > 1 { - return nil, errors.New("you can only pass one 'bootstrap_xxx' to bootstrap the node, or leave them empty to start a single-node cluster") - } - - etcd_cfg := embed.NewConfig() - - if etcd_cfg.ClientTLSInfo, etcd_cfg.PeerTLSInfo, err = security.BuildEtcdTLSConfig(logger, cfg.Security.ServerTLS, cfg.Security.PeerTLS); err != nil { - return - } - - apiAddr, err := url.Parse(fmt.Sprintf("http://%s", cfg.API.Addr)) - if err != nil { - return nil, err - } - if etcd_cfg.ClientTLSInfo.Empty() { - apiAddr.Scheme = "http" - } else { - apiAddr.Scheme = "https" - } - etcd_cfg.LCUrls = []url.URL{*apiAddr} - apiAddrAdvertise := *apiAddr - apiAddrAdvertise.Host = fmt.Sprintf("%s:%s", ctx.Cluster.PubAddr, apiAddrAdvertise.Port()) - etcd_cfg.ACUrls = []url.URL{apiAddrAdvertise} - - peerPort := cfg.Advance.PeerPort - if peerPort == "" { - peerPortNum, err := strconv.Atoi(apiAddr.Port()) - if err != nil { - return nil, err - } - peerPort = strconv.Itoa(peerPortNum + 1) - } - peerAddr := *apiAddr - if etcd_cfg.PeerTLSInfo.Empty() { - peerAddr.Scheme = "http" - } else { - peerAddr.Scheme = "https" - } - peerAddr.Host = fmt.Sprintf("%s:%s", peerAddr.Hostname(), peerPort) - etcd_cfg.LPUrls = []url.URL{peerAddr} - peerAddrAdvertise := peerAddr - peerAddrAdvertise.Host = fmt.Sprintf("%s:%s", ctx.Cluster.PubAddr, peerPort) - etcd_cfg.APUrls = []url.URL{peerAddrAdvertise} - - etcd_cfg.Name = ctx.Cluster.NodeName - if cnt == 0 { - etcd_cfg.InitialCluster = fmt.Sprintf("%s=%s", ctx.Cluster.NodeName, peerAddrAdvertise.String()) - etcd_cfg.InitialClusterToken = ctx.Cluster.ClusterName - } else if len(ctx.Cluster.BootstrapClusters) > 0 { - for i := range ctx.Cluster.BootstrapClusters { - if etcd_cfg.PeerTLSInfo.Empty() { - ctx.Cluster.BootstrapClusters[i] = strings.Replace(ctx.Cluster.BootstrapClusters[i], "=", "=http://", 1) - } else { - ctx.Cluster.BootstrapClusters[i] = strings.Replace(ctx.Cluster.BootstrapClusters[i], "=", "=https://", 1) - } - } - etcd_cfg.InitialCluster = strings.Join(append(ctx.Cluster.BootstrapClusters, fmt.Sprintf("%s=%s", ctx.Cluster.NodeName, peerAddrAdvertise.String())), ",") - etcd_cfg.InitialClusterToken = ctx.Cluster.ClusterName - } else if ctx.Cluster.BootstrapDurl != "" { - etcd_cfg.Durl = ctx.Cluster.BootstrapDurl - } else if ctx.Cluster.BootstrapDdns != "" { - etcd_cfg.DNSCluster = ctx.Cluster.BootstrapDdns - etcd_cfg.DNSClusterServiceName = ctx.Cluster.ClusterName - } - - etcd_cfg.Dir = filepath.Join(cfg.Workdir, "etcd") - etcd_cfg.ZapLoggerBuilder = embed.NewZapLoggerBuilder(logger) - - etcd_cfg.UserHandlers = map[string]http.Handler{ - "/api/": engine, - } - return embed.StartEtcd(etcd_cfg) -} From 426e48eb1c71eae0885da0ec90f50614c734c16a Mon Sep 17 00:00:00 2001 From: xhe Date: Fri, 2 Dec 2022 10:47:59 +0800 Subject: [PATCH 2/3] *: fix CI Signed-off-by: xhe --- pkg/manager/config/config.go | 3 ++- pkg/manager/config/utils.go | 22 ---------------------- 2 files changed, 2 insertions(+), 23 deletions(-) delete mode 100644 pkg/manager/config/utils.go diff --git a/pkg/manager/config/config.go b/pkg/manager/config/config.go index eacf57f3..fb220172 100644 --- a/pkg/manager/config/config.go +++ b/pkg/manager/config/config.go @@ -75,7 +75,8 @@ func MakeConfigChan[T any](e *ConfigManager, initval *T) <-chan *T { panic(errors.WithStack(errors.New("invalid type"))) } - e.SetConfig(context.Background(), initval) + _ = e.SetConfig(context.Background(), initval) + e.Watch(path.Join(pathPrefixConfig, p), func(_ *zap.Logger, k KVEvent) { var v T if k.Type == KVEventDel { diff --git a/pkg/manager/config/utils.go b/pkg/manager/config/utils.go deleted file mode 100644 index 3a525949..00000000 --- a/pkg/manager/config/utils.go +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package config - -func appendSlashToDirPath(dir string) string { - if len(dir) == 0 || dir[len(dir)-1] == '/' { - return dir - } - return dir + "/" -} From 8c031697fadcccecff7dd39eadacd1eae1a7464f Mon Sep 17 00:00:00 2001 From: xhe Date: Fri, 2 Dec 2022 10:53:28 +0800 Subject: [PATCH 3/3] *: fix CI Signed-off-by: xhe --- go.mod | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index a82c20d2..33f4dbdd 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 github.com/spf13/cobra v1.5.0 github.com/stretchr/testify v1.8.0 - go.etcd.io/etcd/api/v3 v3.5.4 + github.com/tidwall/btree v1.5.2 go.etcd.io/etcd/client/v3 v3.5.4 go.etcd.io/etcd/server/v3 v3.5.4 go.uber.org/atomic v1.10.0 @@ -86,7 +86,6 @@ require ( github.com/soheilhy/cmux v0.1.5 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stathat/consistent v1.0.0 // indirect - github.com/tidwall/btree v1.5.2 // indirect github.com/tikv/client-go/v2 v2.0.1-0.20220906094532-f867f498456f // indirect github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect @@ -97,6 +96,7 @@ require ( github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect go.etcd.io/bbolt v1.3.6 // indirect + go.etcd.io/etcd/api/v3 v3.5.4 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.4 // indirect go.etcd.io/etcd/client/v2 v2.305.4 // indirect go.etcd.io/etcd/pkg/v3 v3.5.4 // indirect