Skip to content

Commit

Permalink
api: refactor config API to adapt tidb-operator/TOML changes (pingcap…
Browse files Browse the repository at this point in the history
  • Loading branch information
xhebox committed Mar 13, 2023
1 parent 05193d5 commit bc18e85
Show file tree
Hide file tree
Showing 10 changed files with 119 additions and 154 deletions.
27 changes: 8 additions & 19 deletions lib/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,23 @@
package cli

import (
"fmt"
"net/http"
"os"

"github.com/spf13/cobra"
)

const (
configPrefix = "/api/admin/config"
configPrefix = "/api/admin/config/"
)

func getConfigCmd(ctx *Context, pathSuffix string) *cobra.Command {
func GetConfigCmd(ctx *Context) *cobra.Command {
rootCmd := &cobra.Command{
Use: pathSuffix,
Use: "config",
Short: "",
}
path := fmt.Sprintf("%s/%s", configPrefix, pathSuffix)

// set config proxy
// set config
{
setProxy := &cobra.Command{
Use: "set",
Expand All @@ -49,7 +48,7 @@ func getConfigCmd(ctx *Context, pathSuffix string) *cobra.Command {
b = f
}

resp, err := doRequest(cmd.Context(), ctx, http.MethodPut, path, b)
resp, err := doRequest(cmd.Context(), ctx, http.MethodPut, configPrefix, b)
if err != nil {
return err
}
Expand All @@ -60,13 +59,13 @@ func getConfigCmd(ctx *Context, pathSuffix string) *cobra.Command {
rootCmd.AddCommand(setProxy)
}

// get config proxy
// get config
{
getProxy := &cobra.Command{
Use: "get",
}
getProxy.RunE = func(cmd *cobra.Command, args []string) error {
resp, err := doRequest(cmd.Context(), ctx, http.MethodGet, path, nil)
resp, err := doRequest(cmd.Context(), ctx, http.MethodGet, configPrefix, nil)
if err != nil {
return err
}
Expand All @@ -79,13 +78,3 @@ func getConfigCmd(ctx *Context, pathSuffix string) *cobra.Command {

return rootCmd
}

func GetConfigCmd(ctx *Context) *cobra.Command {
rootCmd := &cobra.Command{
Use: "config",
Short: "",
}
rootCmd.AddCommand(getConfigCmd(ctx, "proxy"))
rootCmd.AddCommand(getConfigCmd(ctx, "log"))
return rootCmd
}
5 changes: 5 additions & 0 deletions lib/config/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ func NewConfig(data []byte) (*Config, error) {
return &cfg, nil
}

func (cfg *Config) Clone() *Config {
newCfg := *cfg
return &newCfg
}

func (cfg *Config) Check() error {
if cfg.Workdir == "" {
d, err := os.Getwd()
Expand Down
9 changes: 1 addition & 8 deletions lib/util/cmd/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,7 @@ func (ws *AtomicWriteSyncer) setOutput(output closableSyncer) error {

// Close closes logger.
func (ws *AtomicWriteSyncer) Close() error {
var err error
ws.Lock()
if ws.output != nil {
err = ws.output.Close()
ws.output = nil
}
ws.Unlock()
return err
return ws.setOutput(nil)
}

// initFileLog initializes file based logging options.
Expand Down
58 changes: 12 additions & 46 deletions pkg/manager/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,75 +18,41 @@ import (
"context"
"encoding/json"
"path"
"reflect"

"github.com/pingcap/TiProxy/lib/config"
"github.com/pingcap/TiProxy/lib/util/errors"
"go.uber.org/zap"
)

var cfg2Path = map[reflect.Type]string{
reflect.TypeOf(config.ProxyServerOnline{}): "proxy",
reflect.TypeOf(config.LogOnline{}): "log",
}

func (e *ConfigManager) SetConfig(ctx context.Context, val any) error {
rf := reflect.TypeOf(val)
if rf.Kind() == reflect.Pointer {
rf = rf.Elem()
}
p, ok := cfg2Path[rf]
if !ok {
return errors.WithStack(errors.New("invalid type"))
}
func (e *ConfigManager) SetConfig(ctx context.Context, val *config.Config) error {
c, err := json.Marshal(val)
if err != nil {
return errors.WithStack(err)
}
return e.set(ctx, pathPrefixConfig, p, c)
return e.set(ctx, pathPrefixConfig, "all", c)
}

func (e *ConfigManager) GetConfig(ctx context.Context, val any) error {
rf := reflect.TypeOf(val)
if rf.Kind() == reflect.Pointer {
rf = rf.Elem()
}
p, ok := cfg2Path[rf]
if !ok {
return errors.WithStack(errors.New("invalid type"))
}

c, err := e.get(ctx, pathPrefixConfig, p)
func (e *ConfigManager) GetConfig(ctx context.Context, val *config.Config) error {
c, err := e.get(ctx, pathPrefixConfig, "all")
if err != nil {
return err
}

return json.Unmarshal(c.Value, val)
}

func MakeConfigChan[T any](e *ConfigManager, initval *T) <-chan *T {
cfgchan := make(chan *T, 64)
rf := reflect.TypeOf(initval)
if rf.Kind() == reflect.Pointer {
rf = rf.Elem()
}
p, ok := cfg2Path[rf]
if !ok {
panic(errors.WithStack(errors.New("invalid type")))
}

_ = e.SetConfig(context.Background(), initval)

e.Watch(path.Join(pathPrefixConfig, p), func(_ *zap.Logger, k KVEvent) {
var v T
func (e *ConfigManager) WatchConfig(ctx context.Context) <-chan *config.Config {
ch := make(chan *config.Config)
e.Watch(path.Join(pathPrefixConfig, "all"), func(_ *zap.Logger, k KVEvent) {
var c config.Config
if k.Type == KVEventDel {
cfgchan <- &v
ch <- &c
} else {
e := json.Unmarshal(k.Value, &v)
e := json.Unmarshal(k.Value, &c)
if e == nil {
cfgchan <- &v
ch <- &c
}
}
})
return cfgchan
return ch
}
5 changes: 5 additions & 0 deletions pkg/manager/config/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ func (srv *ConfigManager) Init(ctx context.Context, cfg *config.Config, logger *
return a.Key < b.Key
})

// init config for other components
if err := srv.SetConfig(ctx, cfg); err != nil {
return err
}

var nctx context.Context
nctx, srv.cancel = context.WithCancel(ctx)
srv.wg.Run(func() {
Expand Down
25 changes: 15 additions & 10 deletions pkg/manager/logger/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package logger
import (
"context"
"encoding/json"
"fmt"

"github.com/pingcap/TiProxy/lib/config"
"github.com/pingcap/TiProxy/lib/util/cmd"
Expand Down Expand Up @@ -51,28 +52,32 @@ func NewLoggerManager(cfg *config.Log) (*LoggerManager, *zap.Logger, error) {
}

// Init starts a goroutine to watch configuration.
func (lm *LoggerManager) Init(cfgCh <-chan *config.LogOnline) {
func (lm *LoggerManager) Init(cfgch <-chan *config.Config) {
ctx, cancel := context.WithCancel(context.Background())
lm.cancel = cancel

lm.wg.Run(func() {
lm.watchCfg(ctx, cfgCh)
lm.watchCfg(ctx, cfgch)
})
lm.cancel = cancel
}

func (lm *LoggerManager) watchCfg(ctx context.Context, cfgCh <-chan *config.LogOnline) {
func (lm *LoggerManager) watchCfg(ctx context.Context, cfgch <-chan *config.Config) {
for {
select {
case <-ctx.Done():
return
case cfg := <-cfgCh:
case acfg := <-cfgch:
cfg := &acfg.Log.LogOnline

err := lm.updateLoggerCfg(cfg)
if err != nil {
bytes, merr := json.Marshal(cfg)
if merr != nil {
lm.logger.Error("update logger configuration failed", zap.NamedError("marshal_err", merr), zap.Error(err))
continue
}
lm.logger.Error("update logger configuration failed", zap.String("cfg", string(bytes)), zap.Error(err))
fmt.Printf("ggg %+v %+v\n", cfg, err)
lm.logger.Error("update logger configuration failed",
zap.NamedError("update error", err),
zap.String("cfg", string(bytes)),
zap.NamedError("cfg marshal error", merr),
)
}
}
}
Expand Down
Loading

0 comments on commit bc18e85

Please sign in to comment.