Skip to content

Commit

Permalink
server: remove etcd (#140)
Browse files Browse the repository at this point in the history
  • Loading branch information
xhebox authored Dec 2, 2022
1 parent cc58ecd commit ed9650d
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 531 deletions.
5 changes: 0 additions & 5 deletions cmd/tiproxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/pingcap/TiProxy/lib/config"
"github.com/pingcap/TiProxy/lib/util/cmd"
"github.com/pingcap/TiProxy/lib/util/errors"
"github.com/pingcap/TiProxy/lib/util/waitgroup"
"github.com/pingcap/TiProxy/pkg/sctx"
"github.com/pingcap/TiProxy/pkg/server"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -78,15 +77,11 @@ func main() {
return errors.Wrapf(err, "fail to create server")
}

var wg waitgroup.WaitGroup
wg.Run(func() { srv.Run(cmd.Context()) })

<-cmd.Context().Done()
if e := srv.Close(); e != nil {
err = errors.Wrapf(err, "shutdown with errors")
}

wg.Wait()
return err
}

Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726
github.com/spf13/cobra v1.5.0
github.com/stretchr/testify v1.8.0
go.etcd.io/etcd/api/v3 v3.5.4
github.com/tidwall/btree v1.5.2
go.etcd.io/etcd/client/v3 v3.5.4
go.etcd.io/etcd/server/v3 v3.5.4
go.uber.org/atomic v1.10.0
Expand Down Expand Up @@ -96,6 +96,7 @@ require (
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.etcd.io/bbolt v1.3.6 // indirect
go.etcd.io/etcd/api/v3 v3.5.4 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.4 // indirect
go.etcd.io/etcd/client/v2 v2.305.4 // indirect
go.etcd.io/etcd/pkg/v3 v3.5.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,8 @@ github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PK
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tidwall/btree v1.5.2 h1:5eA83Gfki799V3d3bJo9sWk+yL2LRoTEah3O/SA6/8w=
github.com/tidwall/btree v1.5.2/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/tikv/client-go/v2 v2.0.1-0.20220906094532-f867f498456f h1:wjRWmUl4QmJF7V0aUskjT8EjjpfWxi5o9SQR5S1nNWA=
github.com/tikv/client-go/v2 v2.0.1-0.20220906094532-f867f498456f/go.mod h1:tkKDJ88lryb16v7FfCh8pvvfwwCkh4aGeSOqHviPaaE=
github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db h1:r1eMh9Rny3hfWuBuxOnbsCRrR4FhthiNxLQ5rAUtaww=
Expand Down
188 changes: 47 additions & 141 deletions pkg/manager/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,170 +17,76 @@ package config
import (
"context"
"encoding/json"
"path"
"reflect"

"github.com/pingcap/TiProxy/lib/config"
"github.com/pingcap/TiProxy/lib/util/errors"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.uber.org/zap"
)

type mKeyType reflect.Type

type OnlineCfgTypes interface {
config.ProxyServerOnline | config.LogOnline
}

type imeta interface {
getPrefix() string
unmarshal(bytes []byte) (any, error)
addToCh(any)
getInitial(cfg *config.Config) any
var cfg2Path = map[reflect.Type]string{
reflect.TypeOf(config.ProxyServerOnline{}): "proxy",
reflect.TypeOf(config.LogOnline{}): "log",
}

type meta[T OnlineCfgTypes] struct {
prefix string
initFunc func(cfg *config.Config) T
ch chan *T
}

func newMeta[T OnlineCfgTypes](prefix string, initFunc func(cfg *config.Config) T) *meta[T] {
return &meta[T]{
prefix: prefix,
initFunc: initFunc,
ch: make(chan *T, 1),
func (e *ConfigManager) SetConfig(ctx context.Context, val any) error {
rf := reflect.TypeOf(val)
if rf.Kind() == reflect.Pointer {
rf = rf.Elem()
}
}

func (m meta[T]) unmarshal(bytes []byte) (any, error) {
var t T
err := json.Unmarshal(bytes, &t)
return &t, err
}

func (m meta[T]) getPrefix() string {
return m.prefix
}

func (m meta[T]) addToCh(obj any) {
m.ch <- obj.(*T)
}

func (m meta[T]) getInitial(cfg *config.Config) any {
return m.initFunc(cfg)
}

func getMetaKey[T OnlineCfgTypes]() mKeyType {
return reflect.TypeOf(new(T))
}

func getMetaKeyByObj[T OnlineCfgTypes](t *T) mKeyType {
return reflect.TypeOf(t)
}

func (e *ConfigManager) initMetas() {
e.metas = map[mKeyType]imeta{
getMetaKey[config.ProxyServerOnline](): newMeta(pathPrefixProxyServer, func(cfg *config.Config) config.ProxyServerOnline {
return cfg.Proxy.ProxyServerOnline
}),
getMetaKey[config.LogOnline](): newMeta(pathPrefixLog, func(cfg *config.Config) config.LogOnline {
return cfg.Log.LogOnline
}),
p, ok := cfg2Path[rf]
if !ok {
return errors.WithStack(errors.New("invalid type"))
}
}

func (e *ConfigManager) watchConfig(ctx context.Context, cfg *config.Config) error {
for _, m := range e.metas {
if err := func(m imeta) error {
_, err := e.get(ctx, m.getPrefix(), "")
if err != nil && errors.Is(err, ErrNoOrMultiResults) {
value, err := json.Marshal(m.getInitial(cfg))
if err != nil {
return err
}
if err = e.set(ctx, m.getPrefix(), "", string(value)); err != nil {
return err
}
}
e.watch(ctx, m.getPrefix(), "", func(logger *zap.Logger, evt mvccpb.Event) {
if obj, err := m.unmarshal(evt.Kv.Value); err != nil {
logger.Warn("failed unmarshal proxy config", zap.Error(err))
return
} else {
m.addToCh(obj)
}
})
return nil
}(m); err != nil {
return err
}
}
return nil
}

func (e *ConfigManager) getCfg(ctx context.Context, metaKey reflect.Type) (any, error) {
m := e.metas[metaKey]
val, err := e.get(ctx, m.getPrefix(), "")
c, err := json.Marshal(val)
if err != nil {
return nil, err
return errors.WithStack(err)
}
return m.unmarshal(val.Value)
return e.set(ctx, pathPrefixConfig, p, c)
}

func (e *ConfigManager) setCfg(ctx context.Context, metaKey mKeyType, obj any) error {
m := e.metas[metaKey]
value, err := json.Marshal(obj)
if err != nil {
return err
func (e *ConfigManager) GetConfig(ctx context.Context, val any) error {
rf := reflect.TypeOf(val)
if rf.Kind() == reflect.Pointer {
rf = rf.Elem()
}
p, ok := cfg2Path[rf]
if !ok {
return errors.WithStack(errors.New("invalid type"))
}
return e.set(ctx, m.getPrefix(), "", string(value))
}

// GetConfig queries the configuration from the config center.
func GetConfig[T OnlineCfgTypes](ctx context.Context, e *ConfigManager, t *T) error {
obj, err := e.getCfg(ctx, getMetaKeyByObj(t))
c, err := e.get(ctx, pathPrefixConfig, p)
if err != nil {
return err
}
*t = *obj.(*T)
return nil
}

// SetConfig sets a configuration to the config center.
func SetConfig[T OnlineCfgTypes](ctx context.Context, e *ConfigManager, t *T) error {
return e.setCfg(ctx, getMetaKeyByObj(t), t)
}

// GetCfgWatch returns the channel that contains updated configuration.
func GetCfgWatch[T OnlineCfgTypes](e *ConfigManager) chan *T {
mt := e.metas[getMetaKey[T]()].(*meta[T])
return mt.ch
}

func (e *ConfigManager) GetProxyConfigWatch() <-chan *config.ProxyServerOnline {
return GetCfgWatch[config.ProxyServerOnline](e)
}

func (e *ConfigManager) GetProxyConfig(ctx context.Context) (*config.ProxyServerOnline, error) {
var pso config.ProxyServerOnline
err := GetConfig(ctx, e, &pso)
return &pso, err
return json.Unmarshal(c.Value, val)
}

func (e *ConfigManager) SetProxyConfig(ctx context.Context, proxy *config.ProxyServerOnline) error {
return SetConfig(ctx, e, proxy)
}

func (e *ConfigManager) GetLogConfigWatch() <-chan *config.LogOnline {
return GetCfgWatch[config.LogOnline](e)
}
func MakeConfigChan[T any](e *ConfigManager, initval *T) <-chan *T {
cfgchan := make(chan *T, 64)
rf := reflect.TypeOf(initval)
if rf.Kind() == reflect.Pointer {
rf = rf.Elem()
}
p, ok := cfg2Path[rf]
if !ok {
panic(errors.WithStack(errors.New("invalid type")))
}

func (e *ConfigManager) GetLogConfig(ctx context.Context) (*config.LogOnline, error) {
var co config.LogOnline
err := GetConfig(ctx, e, &co)
return &co, err
}
_ = e.SetConfig(context.Background(), initval)

func (e *ConfigManager) SetLogConfig(ctx context.Context, log *config.LogOnline) error {
return SetConfig(ctx, e, log)
e.Watch(path.Join(pathPrefixConfig, p), func(_ *zap.Logger, k KVEvent) {
var v T
if k.Type == KVEventDel {
cfgchan <- &v
} else {
e := json.Unmarshal(k.Value, &v)
if e == nil {
cfgchan <- &v
}
}
})
return cfgchan
}
101 changes: 0 additions & 101 deletions pkg/manager/config/config_test.go

This file was deleted.

Loading

0 comments on commit ed9650d

Please sign in to comment.