data race: avoid unprotected access to sb.file #408

Merged (1 commit) on Jun 20, 2024
klog.go: 77 changes (25 additions, 52 deletions)
@@ -404,13 +404,6 @@ func (t *traceLocation) Set(value string) error {
return nil
}

-// flushSyncWriter is the interface satisfied by logging destinations.
-type flushSyncWriter interface {
-Flush() error
-Sync() error
-io.Writer
-}

var logging loggingT
var commandLine flag.FlagSet

@@ -486,7 +479,7 @@ type settings struct {
// Access to all of the following fields must be protected via a mutex.

// file holds writer for each of the log types.
-file [severity.NumSeverity]flushSyncWriter
+file [severity.NumSeverity]io.Writer
// flushInterval is the interval for periodic flushing. If zero,
// the global default will be used.
flushInterval time.Duration
@@ -831,32 +824,12 @@ func (l *loggingT) printS(err error, s severity.Severity, depth int, msg string,
buffer.PutBuffer(b)
}

-// redirectBuffer is used to set an alternate destination for the logs
-type redirectBuffer struct {
-w io.Writer
-}
-
-func (rb *redirectBuffer) Sync() error {
-return nil
-}
-
-func (rb *redirectBuffer) Flush() error {
-return nil
-}
-
-func (rb *redirectBuffer) Write(bytes []byte) (n int, err error) {
-return rb.w.Write(bytes)
-}

// SetOutput sets the output destination for all severities
func SetOutput(w io.Writer) {
logging.mu.Lock()
defer logging.mu.Unlock()
for s := severity.FatalLog; s >= severity.InfoLog; s-- {
-rb := &redirectBuffer{
-w: w,
-}
-logging.file[s] = rb
+logging.file[s] = w
}
}

@@ -868,10 +841,7 @@ func SetOutputBySeverity(name string, w io.Writer) {
if !ok {
panic(fmt.Sprintf("SetOutputBySeverity(%q): unrecognized severity name", name))
}
-rb := &redirectBuffer{
-w: w,
-}
-logging.file[sev] = rb
+logging.file[sev] = w
}

// LogToStderr sets whether to log exclusively to stderr, bypassing outputs
@@ -1011,8 +981,8 @@ func (l *loggingT) exit(err error) {
logExitFunc(err)
return
}
-files := l.flushAll()
-l.syncAll(files)
+needToSync := l.flushAll()
+l.syncAll(needToSync)
OsExit(2)
}

@@ -1029,10 +999,6 @@ type syncBuffer struct {
maxbytes uint64 // The max number of bytes this syncBuffer.file can hold before cleaning up.
}

-func (sb *syncBuffer) Sync() error {
-return sb.file.Sync()
-}

// CalculateMaxSize returns the real max size in bytes after considering the default max size and the flag options.
func CalculateMaxSize() uint64 {
if logging.logFile != "" {
@@ -1224,37 +1190,44 @@ func StartFlushDaemon(interval time.Duration) {
// lockAndFlushAll is like flushAll but locks l.mu first.
func (l *loggingT) lockAndFlushAll() {
l.mu.Lock()
-files := l.flushAll()
+needToSync := l.flushAll()
l.mu.Unlock()
// Some environments are slow when syncing and holding the lock might cause contention.
-l.syncAll(files)
+l.syncAll(needToSync)
}

// flushAll flushes all the logs
// l.mu is held.
-func (l *loggingT) flushAll() []flushSyncWriter {
-files := make([]flushSyncWriter, 0, severity.NumSeverity)

Reviewer: The 'files' slice has been initialized with an initial capacity, preventing it from triggering reallocations during appends and thereby eliminating additional memory allocations.

Author: It doesn't reallocate, but it still allocates.

Reviewer: A fixed-capacity slice has the same underlying mechanism as an array. For slices with dynamic capacity, on the other hand, invoking append may trigger slice growth, which requires reallocating memory.

Reviewer: I misconstrued this.
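The point made in the exchange above — a pre-sized slice never reallocates on append, yet the initial make is itself one heap allocation — can be demonstrated with a few lines of standalone Go. This is a sketch unrelated to klog's actual code; `sink` is a hypothetical escape-forcing variable introduced only for the measurement:

```go
package main

import (
	"fmt"
	"testing"
)

// sink forces the slice to escape to the heap, so the compiler
// cannot optimize away the allocation we want to measure.
var sink []int

func main() {
	allocs := testing.AllocsPerRun(200, func() {
		// Pre-sizing means the appends below never reallocate...
		s := make([]int, 0, 4)
		for i := 0; i < 4; i++ {
			s = append(s, i)
		}
		// ...but the make itself is still one heap allocation per call.
		sink = s
	})
	fmt.Printf("%.0f allocation(s) per run\n", allocs)
}
```

This is exactly the distinction drawn in the reply: avoiding reallocation is not the same as avoiding allocation.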

+//
+// The result is the number of files which need to be synced and the pointers to them.
+func (l *loggingT) flushAll() fileArray {
+var needToSync fileArray

Reviewer: The 'needToSync' can be refactored as a 'sync.Map' and embedded within the 'loggingT' structure. When creating 'syncBuffer', it would be appended to 'needToSync'; during 'syncAll', data can be retrieved from it.

Author: Putting what should be a local variable into the global logging feels like a bad workaround. I'd like to understand better why fileArray is not being copied by value.

Reviewer: 'l.file' is not thread-safe, necessitating a temporary deep copy of it to avoid data races. Hence, allocating memory for an array is indispensable.

Author: An array has a fixed size. My expectation is that Go therefore can treat fileArray like any other struct and copy it by value instead of doing allocations on the heap. Is that expectation incorrect?

Reviewer: Your expectation is correct.

Author: Got it. BenchmarkLogs calls flushAll/syncAll once for a very large b.N. Whatever difference the changes around memory allocation make, it is insignificant when divided by that b.N. That is why I was seeing 9 allocations for BenchmarkLogs before/after this PR, not because nothing changed.

I double-checked that the current code does indeed not allocate and added a benchmark for it, so I think we are good to merge it.
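The expectation discussed above can be verified empirically: a fixed-size array inside a struct is copied by value, so returning it allocates nothing, while even a perfectly pre-sized slice costs one heap allocation for its backing array. A minimal sketch follows; the names byArray, bySlice, and the sink variables are hypothetical, and *int stands in for *os.File to keep it self-contained:

```go
package main

import (
	"fmt"
	"testing"
)

const numSeverity = 4

// fileArray stands in for the PR's fixed-size result type.
type fileArray struct {
	num   int
	files [numSeverity]*int
}

// sentinel plays the role of an open log file.
var sentinel = new(int)

// Package-level sinks force the results to escape, so the compiler
// cannot elide the allocations we are trying to measure.
var (
	sinkArray fileArray
	sinkSlice []*int
)

// byArray returns the result by value: the whole array travels inside
// the struct copy, with no heap allocation.
func byArray() fileArray {
	var a fileArray
	a.files[a.num] = sentinel
	a.num++
	return a
}

// bySlice pre-sizes the slice so append never reallocates, but the
// backing array is still one heap allocation per call.
func bySlice() []*int {
	s := make([]*int, 0, numSeverity)
	s = append(s, sentinel)
	return s
}

func main() {
	arrayAllocs := testing.AllocsPerRun(200, func() { sinkArray = byArray() })
	sliceAllocs := testing.AllocsPerRun(200, func() { sinkSlice = bySlice() })
	// On current Go toolchains this typically reports 0 for the array
	// version and 1 for the slice version.
	fmt.Printf("array: %.0f allocs/op, slice: %.0f allocs/op\n", arrayAllocs, sliceAllocs)
}
```

This matches the benchmark observation in the thread: the array-returning flushAll is allocation-free, which is why the PR's BenchmarkFlush can assert on it.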


// Flush from fatal down, in case there's trouble flushing.
for s := severity.FatalLog; s >= severity.InfoLog; s-- {
file := l.file[s]
-if file != nil {
-_ = file.Flush() // ignore error
-files = append(files, file)
+if sb, ok := file.(*syncBuffer); ok && sb.file != nil {
+_ = sb.Flush() // ignore error
+needToSync.files[needToSync.num] = sb.file
+needToSync.num++
}
}
if logging.loggerOptions.flush != nil {
logging.loggerOptions.flush()
}
-return files
+return needToSync
}

type fileArray struct {
num int
files [severity.NumSeverity]*os.File
}

// syncAll attempts to "sync" their data to disk.
-func (l *loggingT) syncAll(files []flushSyncWriter) {
+func (l *loggingT) syncAll(needToSync fileArray) {
// Flush from fatal down, in case there's trouble flushing.
-for _, file := range files {
-if file != nil {
-_ = file.Sync() // ignore error
-}
-}
+for i := 0; i < needToSync.num; i++ {
+_ = needToSync.files[i].Sync() // ignore error
+}
}
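The shape of the fix in klog.go above — flush and snapshot the raw file pointers while holding the mutex, then Sync outside it so slow disks cannot block concurrent loggers — can be sketched in isolation. All names here are hypothetical stand-ins for klog's loggingT and fileArray, not its actual code:

```go
package main

import (
	"os"
	"sync"
)

const numSeverity = 4

// fileArray mirrors the PR: a fixed-size value type, so returning it
// copies it by value and needs no heap allocation.
type fileArray struct {
	num   int
	files [numSeverity]*os.File
}

// logger is a hypothetical stand-in for klog's loggingT.
type logger struct {
	mu   sync.Mutex
	file [numSeverity]*os.File // simplified; the real field holds io.Writer values
}

// lockAndFlushAll snapshots the file pointers under the mutex, then
// syncs outside the lock so a slow Sync cannot cause lock contention.
func (l *logger) lockAndFlushAll() {
	l.mu.Lock()
	var needToSync fileArray
	for i := numSeverity - 1; i >= 0; i-- {
		if f := l.file[i]; f != nil {
			needToSync.files[needToSync.num] = f
			needToSync.num++
		}
	}
	l.mu.Unlock()
	// The snapshot is a private copy: no other goroutine can swap the
	// files underneath us while we sync without holding the lock.
	for i := 0; i < needToSync.num; i++ {
		_ = needToSync.files[i].Sync() // ignore error, as klog does
	}
}

func main() {
	f, err := os.CreateTemp("", "sketch-*.log")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())

	l := &logger{}
	l.file[0] = f
	l.lockAndFlushAll() // syncs f without holding l.mu during the Sync call
}
```

This is what removes the data race the PR title names: sb.file is only read while the mutex is held, and the unlocked phase works exclusively on the copied fileArray.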

klog_test.go: 65 changes (56 additions, 9 deletions)
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ import (
"errors"
"flag"
"fmt"
+"io"
"io/ioutil"
stdLog "log"
"os"
@@ -73,7 +74,7 @@ func (f *flushBuffer) Sync() error {
}

// swap sets the log writers and returns the old array.
-func (l *loggingT) swap(writers [severity.NumSeverity]flushSyncWriter) (old [severity.NumSeverity]flushSyncWriter) {
+func (l *loggingT) swap(writers [severity.NumSeverity]io.Writer) (old [severity.NumSeverity]io.Writer) {
l.mu.Lock()
defer l.mu.Unlock()
old = l.file
@@ -82,8 +83,8 @@ func (l *loggingT) swap(writers [severity.NumSeverity]flushSyncWriter) (old [sev
}

// newBuffers sets the log writers to all new byte buffers and returns the old array.
-func (l *loggingT) newBuffers() [severity.NumSeverity]flushSyncWriter {
-return l.swap([severity.NumSeverity]flushSyncWriter{new(flushBuffer), new(flushBuffer), new(flushBuffer), new(flushBuffer)})
+func (l *loggingT) newBuffers() [severity.NumSeverity]io.Writer {
+return l.swap([severity.NumSeverity]io.Writer{new(flushBuffer), new(flushBuffer), new(flushBuffer), new(flushBuffer)})
}

// contents returns the specified log value as a string.
@@ -540,14 +541,17 @@ func TestOpenAppendOnStart(t *testing.T) {

// Logging creates the file
Info(x)
-_, ok := logging.file[severity.InfoLog].(*syncBuffer)
+sb, ok := logging.file[severity.InfoLog].(*syncBuffer)
if !ok {
t.Fatal("info wasn't created")
}

// ensure we wrote what we expected
-files := logging.flushAll()
-logging.syncAll(files)
+needToSync := logging.flushAll()
+if needToSync.num != 1 || needToSync.files[0] != sb.file {
+t.Errorf("Should have received exactly the file from severity.InfoLog for syncing, got instead: %+v", needToSync)
+}
+logging.syncAll(needToSync)
b, err := ioutil.ReadFile(logging.logFile)
if err != nil {
t.Fatalf("unexpected error: %v", err)
@@ -811,15 +815,58 @@ func BenchmarkLogs(b *testing.B) {
Severity: severity.FatalLog,
}
logging.logFile = testFile.Name()
-logging.swap([severity.NumSeverity]flushSyncWriter{nil, nil, nil, nil})
+logging.swap([severity.NumSeverity]io.Writer{nil, nil, nil, nil})

for i := 0; i < b.N; i++ {
Error("error")
Warning("warning")
Info("info")
}
-files := logging.flushAll()
-logging.syncAll(files)
+needToSync := logging.flushAll()
+sb, ok := logging.file[severity.InfoLog].(*syncBuffer)
+if !ok {
+b.Fatal("info wasn't created")
+}
+if needToSync.num != 1 || needToSync.files[0] != sb.file {
+b.Fatalf("Should have received exactly the file from severity.InfoLog for syncing, got instead: %+v", needToSync)
+}
+logging.syncAll(needToSync)
Comment on lines 820 to 833:

Reviewer: After placing the logging.flushAll() and syncAll(needToSync) snippet inside the for loop, the profiler indicates 10 allocations per operation for slices, one more than for arrays. Arrays use stack memory, while slices use heap memory, which incurs significantly higher overhead. Thus, transitioning to arrays is more favorable.

Author: I've come to the same conclusion 😄
}

func BenchmarkFlush(b *testing.B) {
defer CaptureState().Restore()
setFlags()
defer logging.swap(logging.newBuffers())

testFile, err := ioutil.TempFile("", "test.log")
if err != nil {
b.Fatal("unable to create temporary file")
}
defer os.Remove(testFile.Name())

require.NoError(b, logging.verbosity.Set("0"))
logging.toStderr = false
logging.alsoToStderr = false
logging.stderrThreshold = severityValue{
Severity: severity.FatalLog,
}
logging.logFile = testFile.Name()
logging.swap([severity.NumSeverity]io.Writer{nil, nil, nil, nil})

// Create output file.
Info("info")
needToSync := logging.flushAll()

if needToSync.num != 1 {
b.Fatalf("expected exactly one file to sync, got: %+v", needToSync)
}

b.ResetTimer()

for i := 0; i < b.N; i++ {
needToSync := logging.flushAll()
logging.syncAll(needToSync)
}
}

// Test the logic on checking log size limitation.