Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config: skip irrelevant config file events #322

Merged
merged 8 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions lib/util/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package logger

import (
"bytes"
"fmt"
"testing"

"go.uber.org/zap"
Expand All @@ -12,18 +14,24 @@ import (

type testingLog struct {
*testing.T
buf bytes.Buffer
}

func (t *testingLog) Write(b []byte) (int, error) {
t.Logf("%s", b)
return len(b), nil
return t.buf.Write(b)
}

// CreateLoggerForTest creates a logger for unit tests.
func CreateLoggerForTest(t *testing.T) *zap.Logger {
func (t *testingLog) String() string {
return t.buf.String()
}

// CreateLoggerForTest returns both the logger and its content.
func CreateLoggerForTest(t *testing.T) (*zap.Logger, fmt.Stringer) {
log := &testingLog{T: t}
return zap.New(zapcore.NewCore(
zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()),
zapcore.AddSync(&testingLog{t}),
zapcore.AddSync(log),
zap.InfoLevel,
)).Named(t.Name())
)).Named(t.Name()), log
}
8 changes: 4 additions & 4 deletions lib/util/security/cert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

func TestCertServer(t *testing.T) {
logger := logger.CreateLoggerForTest(t)
logger, _ := logger.CreateLoggerForTest(t)
tmpdir := t.TempDir()
certPath := filepath.Join(tmpdir, "cert")
keyPath := filepath.Join(tmpdir, "key")
Expand Down Expand Up @@ -267,7 +267,7 @@ func TestCertServer(t *testing.T) {
}

func TestReload(t *testing.T) {
lg := logger.CreateLoggerForTest(t)
lg, _ := logger.CreateLoggerForTest(t)
tmpdir := t.TempDir()
certPath := filepath.Join(tmpdir, "cert")
keyPath := filepath.Join(tmpdir, "key")
Expand Down Expand Up @@ -297,7 +297,7 @@ func TestReload(t *testing.T) {
}

func TestAutoCerts(t *testing.T) {
lg := logger.CreateLoggerForTest(t)
lg, _ := logger.CreateLoggerForTest(t)
cfg := config.TLSConfig{
AutoCerts: true,
}
Expand Down Expand Up @@ -338,7 +338,7 @@ func getExpireTime(t *testing.T, ci *CertInfo) time.Time {
}

func TestSetConfig(t *testing.T) {
lg := logger.CreateLoggerForTest(t)
lg, _ := logger.CreateLoggerForTest(t)
ci := NewCert(false)
cfg := config.TLSConfig{
SkipCA: true,
Expand Down
2 changes: 1 addition & 1 deletion lib/util/systimemon/systime_mon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
func TestSystimeMonitor(t *testing.T) {
errTriggered := atomic.NewBool(false)
nowTriggered := atomic.NewBool(false)
log := logger.CreateLoggerForTest(t)
log, _ := logger.CreateLoggerForTest(t)
ctx, cancel := context.WithCancel(context.Background())
var wg waitgroup.WaitGroup
wg.Run(func() {
Expand Down
8 changes: 4 additions & 4 deletions pkg/manager/cert/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func connectWithTLS(ctls, stls *tls.Config) (clientErr, serverErr error) {

// Test various configurations.
func TestInit(t *testing.T) {
lg := logger.CreateLoggerForTest(t)
lg, _ := logger.CreateLoggerForTest(t)
tmpdir := t.TempDir()

type testcase struct {
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestInit(t *testing.T) {
// Test rotation works.
func TestRotate(t *testing.T) {
tmpdir := t.TempDir()
lg := logger.CreateLoggerForTest(t)
lg, _ := logger.CreateLoggerForTest(t)
caPath := filepath.Join(tmpdir, "ca")
keyPath := filepath.Join(tmpdir, "key")
certPath := filepath.Join(tmpdir, "cert")
Expand Down Expand Up @@ -321,7 +321,7 @@ func TestRotate(t *testing.T) {

func TestBidirectional(t *testing.T) {
tmpdir := t.TempDir()
lg := logger.CreateLoggerForTest(t)
lg, _ := logger.CreateLoggerForTest(t)
caPath1 := filepath.Join(tmpdir, "c1", "ca")
keyPath1 := filepath.Join(tmpdir, "c1", "key")
certPath1 := filepath.Join(tmpdir, "c1", "cert")
Expand Down Expand Up @@ -359,7 +359,7 @@ func TestBidirectional(t *testing.T) {

func TestWatchConfig(t *testing.T) {
tmpdir := t.TempDir()
lg := logger.CreateLoggerForTest(t)
lg, _ := logger.CreateLoggerForTest(t)
caPath1 := filepath.Join(tmpdir, "c1", "ca")
keyPath1 := filepath.Join(tmpdir, "c1", "key")
certPath1 := filepath.Join(tmpdir, "c1", "cert")
Expand Down
16 changes: 15 additions & 1 deletion pkg/manager/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,27 @@ func (e *ConfigManager) reloadConfigFile(file string) error {
func (e *ConfigManager) handleFSEvent(ev fsnotify.Event, f string) {
switch {
case ev.Has(fsnotify.Create), ev.Has(fsnotify.Write), ev.Has(fsnotify.Remove), ev.Has(fsnotify.Rename):
// The file may be the log file, triggering reload will cause more logs and thus cause reload again,
// so we need to filter the wrong files.
// The filesystem differs from OS to OS, so don't use string comparison.
f1, err := os.Stat(ev.Name)
if err != nil {
break
}
f2, err := os.Stat(f)
if err != nil {
break
}
if !os.SameFile(f1, f2) {
break
}
if ev.Has(fsnotify.Remove) || ev.Has(fsnotify.Rename) {
// in case of remove/rename the file, files are not present at filesystem for a while
// it may be too fast to read the config file now, sleep for a while
time.Sleep(50 * time.Millisecond)
}
// try to reload it
e.logger.Info("config file reloaded", zap.Error(e.reloadConfigFile(f)))
e.logger.Info("config file reloaded", zap.Stringer("event", ev), zap.Error(e.reloadConfigFile(f)))
}
}

Expand Down
148 changes: 144 additions & 4 deletions pkg/manager/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"testing"
"time"

Expand All @@ -22,9 +23,9 @@ func TestConfigReload(t *testing.T) {
require.NoError(t, err)
require.NoError(t, f.Close())

cfgmgr1, _ := testConfigManager(t, tmpcfg)
cfgmgr1, _, _ := testConfigManager(t, tmpcfg)

cfgmgr2, _ := testConfigManager(t, "")
cfgmgr2, _, _ := testConfigManager(t, "")

cases := []struct {
name string
Expand Down Expand Up @@ -135,7 +136,7 @@ func TestConfigRemove(t *testing.T) {
require.NoError(t, err)
require.NoError(t, f.Close())

cfgmgr, _ := testConfigManager(t, tmpcfg)
cfgmgr, _, _ := testConfigManager(t, tmpcfg)

// remove and recreate the file in a very short time
require.NoError(t, os.Remove(tmpcfg))
Expand All @@ -153,8 +154,147 @@ func TestConfigRemove(t *testing.T) {
require.Eventually(t, func() bool { return cfgmgr.GetConfig().Proxy.Addr == "vv" }, time.Second, 100*time.Millisecond)
}

func TestFilePath(t *testing.T) {
var (
cfgmgr *ConfigManager
text fmt.Stringer
count int
)

tmpdir := t.TempDir()
checkLog := func(increased bool) {
// On linux, writing once will trigger 2 WRITE events. But on macOS, it only triggers once.
// So we always sleep 100ms to avoid missing any logs on the way.
time.Sleep(100 * time.Millisecond)
newCount := strings.Count(text.String(), "config file reloaded")
require.Equal(t, increased, newCount > count, fmt.Sprintf("now: %d, was: %d", newCount, count))
count = newCount
}

tests := []struct {
filename string
createFile func()
cleanFile func()
checker func(filename string)
}{
{
// Test updating another file in the same directory won't make it reload.
filename: filepath.Join(tmpdir, "cfg"),
checker: func(filename string) {
tmplog := filepath.Join(tmpdir, "log")
f, err := os.Create(tmplog)
require.NoError(t, err)
require.NoError(t, f.Close())
require.NoError(t, os.WriteFile(tmplog, []byte("hello"), 0644))
newlog := filepath.Join(tmpdir, "log1")
require.NoError(t, os.Rename(tmplog, newlog))
require.NoError(t, os.Remove(newlog))
checkLog(false)
},
},
{
// Test case-insensitive.
filename: filepath.Join(tmpdir, "cfg"),
createFile: func() {
f, err := os.Create(filepath.Join(tmpdir, "CFG"))
require.NoError(t, err)
require.NoError(t, f.Close())
// Linux is case-sensitive but macOS is case-insensitive.
// For linux, it creates another file. For macOS, it doesn't touch the file.
f, err = os.Create(filepath.Join(tmpdir, "cfg"))
require.NoError(t, err)
require.NoError(t, f.Close())
},
},
{
// Test relative path.
// `event.Name` is `cfg` on MacOS, but it's `./cfg` on Linux.
filename: "cfg",
},
{
// Test relative path.
filename: "./cfg",
},
{
// Test uncleaned path.
filename: fmt.Sprintf("%s%c%ccfg", tmpdir, filepath.Separator, filepath.Separator),
},
{
// Test removing and recreating the directory.
filename: "_tmp/cfg",
createFile: func() {
if err := os.Mkdir("_tmp", 0755); err != nil {
require.ErrorIs(t, err, os.ErrExist)
}
f, err := os.Create("_tmp/cfg")
require.NoError(t, err)
require.NoError(t, f.Close())
},
cleanFile: func() {
require.NoError(t, os.RemoveAll("_tmp"))
},
checker: func(filename string) {
require.NoError(t, os.RemoveAll("_tmp"))
// To update `count`.
checkLog(false)

require.NoError(t, os.Mkdir("_tmp", 0755))
f, err := os.Create("_tmp/cfg")
require.NoError(t, err)
require.NoError(t, f.Close())
checkLog(true)
},
},
{
// Test removing and recreating the file.
filename: "cfg",
checker: func(filename string) {
require.NoError(t, os.Remove(filename))
checkLog(false)

f, err := os.Create(filename)
require.NoError(t, err)
require.NoError(t, f.Close())
checkLog(true)
},
},
}

for _, test := range tests {
if test.createFile != nil {
test.createFile()
} else {
f, err := os.Create(test.filename)
require.NoError(t, err)
require.NoError(t, f.Close())
}

count = 0
cfgmgr, text, _ = testConfigManager(t, test.filename)
checkLog(false)

// Test write.
require.NoError(t, os.WriteFile(test.filename, []byte("proxy.pd-addrs = \"127.0.0.1:2379\""), 0644))
checkLog(true)

// Test other.
if test.checker != nil {
test.checker(test.filename)
}

// Test remove.
if test.cleanFile != nil {
test.cleanFile()
} else {
// It doesn't matter whether it triggers reload or not.
require.NoError(t, os.Remove(test.filename))
}
require.NoError(t, cfgmgr.Close())
}
}

func TestChecksum(t *testing.T) {
cfgmgr, _ := testConfigManager(t, "")
cfgmgr, _, _ := testConfigManager(t, "")
c1 := cfgmgr.GetConfigChecksum()
require.NoError(t, cfgmgr.SetTOMLConfig([]byte(`proxy.addr = "gg"`)))
c2 := cfgmgr.GetConfigChecksum()
Expand Down
21 changes: 18 additions & 3 deletions pkg/manager/config/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,22 @@ func (e *ConfigManager) Init(ctx context.Context, logger *zap.Logger, configFile
case <-nctx.Done():
return
case err := <-e.wch.Errors:
e.logger.Info("failed to watch config file", zap.Error(err))
e.logger.Warn("failed to watch config file", zap.Error(err))
case ev := <-e.wch.Events:
e.handleFSEvent(ev, configFile)
case <-ticker.C:
// There may be concurrent issues:
// 1. Remove the directory and the watcher removes the directory automatically
// 2. Create the directory and the file again within a tick
// 3. Add it to the watcher again, but the CREATE event is not sent
//
// Checking the watch list still have a concurrent issue because the watcher may remove the
// directory between WatchList and Add. We'll fix it later because it's complex to fix it entirely.
exists := len(e.wch.WatchList()) > 0
if err := e.wch.Add(parentDir); err != nil {
e.logger.Info("failed to rewatch config file", zap.Error(err))
e.logger.Warn("failed to rewatch config file", zap.Error(err))
} else if !exists {
e.logger.Info("config file reloaded", zap.Error(e.reloadConfigFile(configFile)))
}
}
}
Expand All @@ -122,14 +132,19 @@ func (e *ConfigManager) Init(ctx context.Context, logger *zap.Logger, configFile

func (e *ConfigManager) Close() error {
var wcherr error
e.cancel()
if e.cancel != nil {
e.cancel()
e.cancel = nil
}
if e.wch != nil {
wcherr = e.wch.Close()
e.wch = nil
}
e.sts.Lock()
for _, ch := range e.sts.listeners {
close(ch)
}
e.sts.listeners = nil
e.sts.Unlock()
e.wg.Wait()
return wcherr
Expand Down
7 changes: 4 additions & 3 deletions pkg/manager/config/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ package config

import (
"context"
"fmt"
"testing"

"github.com/pingcap/TiProxy/lib/config"
"github.com/pingcap/TiProxy/lib/util/logger"
"github.com/stretchr/testify/require"
)

func testConfigManager(t *testing.T, configFile string, overlays ...*config.Config) (*ConfigManager, context.Context) {
logger := logger.CreateLoggerForTest(t)
func testConfigManager(t *testing.T, configFile string, overlays ...*config.Config) (*ConfigManager, fmt.Stringer, context.Context) {
logger, text := logger.CreateLoggerForTest(t)

ctx, cancel := context.WithCancel(context.Background())
if ddl, ok := t.Deadline(); ok {
Expand All @@ -29,5 +30,5 @@ func testConfigManager(t *testing.T, configFile string, overlays ...*config.Conf

t.Cleanup(cancel)

return cfgmgr, ctx
return cfgmgr, text, ctx
}
Loading