From 1ef0b491cbe039a943a463262d7011c1f7b206c7 Mon Sep 17 00:00:00 2001 From: Chris Hines Date: Mon, 9 May 2016 00:36:29 -0400 Subject: [PATCH 1/9] Add SyncWriter and SyncLogger. --- log/example_test.go | 20 ++++++++++- log/log.go | 35 +------------------- log/log_test.go | 16 +++++++++ log/sync.go | 81 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 117 insertions(+), 35 deletions(-) create mode 100644 log/sync.go diff --git a/log/example_test.go b/log/example_test.go index 174161548..9ecf2769a 100644 --- a/log/example_test.go +++ b/log/example_test.go @@ -1,13 +1,31 @@ package log_test import ( + "net/url" "os" "github.com/go-kit/kit/log" ) +func Example_stdout() { + w := log.NewSyncWriter(os.Stdout) + logger := log.NewLogfmtLogger(w) + + reqUrl := &url.URL{ + Scheme: "https", + Host: "github.com", + Path: "/go-kit/kit", + } + + logger.Log("method", "GET", "url", reqUrl) + + // Output: + // method=GET url=https://github.com/go-kit/kit +} + func ExampleContext() { - logger := log.NewLogfmtLogger(os.Stdout) + w := log.NewSyncWriter(os.Stdout) + logger := log.NewLogfmtLogger(w) logger.Log("foo", 123) ctx := log.NewContext(logger).With("level", "info") ctx.Log() diff --git a/log/log.go b/log/log.go index 25e76cb74..4b5618c65 100644 --- a/log/log.go +++ b/log/log.go @@ -4,10 +4,7 @@ // key/value data. package log -import ( - "errors" - "sync/atomic" -) +import "errors" // Logger is the fundamental interface for all log operations. Log creates a // log event from keyvals, a variadic sequence of alternating keys and values. @@ -149,33 +146,3 @@ type LoggerFunc func(...interface{}) error func (f LoggerFunc) Log(keyvals ...interface{}) error { return f(keyvals...) } - -// SwapLogger wraps another logger that may be safely replaced while other -// goroutines use the SwapLogger concurrently. The zero value for a SwapLogger -// will discard all log events without error. -// -// SwapLogger serves well as a package global logger that can be changed by -// importers. -type SwapLogger struct { - logger atomic.Value -} - -type loggerStruct struct { - Logger -} - -// Log implements the Logger interface by forwarding keyvals to the currently -// wrapped logger. It does not log anything if the wrapped logger is nil. -func (l *SwapLogger) Log(keyvals ...interface{}) error { - s, ok := l.logger.Load().(loggerStruct) - if !ok || s.Logger == nil { - return nil - } - return s.Log(keyvals...) -} - -// Swap replaces the currently wrapped logger with logger. Swap may be called -// concurrently with calls to Log from other goroutines. -func (l *SwapLogger) Swap(logger Logger) { - l.logger.Store(loggerStruct{logger}) -} diff --git a/log/log_test.go b/log/log_test.go index 7cd084411..48578fd51 100644 --- a/log/log_test.go +++ b/log/log_test.go @@ -3,6 +3,7 @@ package log_test import ( "bytes" "fmt" + "io" "sync" "testing" @@ -253,3 +254,18 @@ func TestSwapLogger(t *testing.T) { func TestSwapLoggerConcurrency(t *testing.T) { testConcurrency(t, &log.SwapLogger{}) } + +func TestSyncLogger(t *testing.T) { + var w io.Writer + w = &bytes.Buffer{} + logger := log.NewLogfmtLogger(w) + logger = log.NewSyncLogger(logger) + testConcurrency(t, logger) +} + +func TestSyncWriter(t *testing.T) { + var w io.Writer + w = &bytes.Buffer{} + w = log.NewSyncWriter(w) + testConcurrency(t, log.NewLogfmtLogger(w)) +} diff --git a/log/sync.go b/log/sync.go new file mode 100644 index 000000000..985b4b2dd --- /dev/null +++ b/log/sync.go @@ -0,0 +1,81 @@ +package log + +import ( + "io" + "sync" + "sync/atomic" +) + +// SwapLogger wraps another logger that may be safely replaced while other +// goroutines use the SwapLogger concurrently. The zero value for a SwapLogger +// will discard all log events without error. +// +// SwapLogger serves well as a package global logger that can be changed by +// importers. +type SwapLogger struct { + logger atomic.Value +} + +type loggerStruct struct { + Logger +} + +// Log implements the Logger interface by forwarding keyvals to the currently +// wrapped logger. It does not log anything if the wrapped logger is nil. +func (l *SwapLogger) Log(keyvals ...interface{}) error { + s, ok := l.logger.Load().(loggerStruct) + if !ok || s.Logger == nil { + return nil + } + return s.Log(keyvals...) +} + +// Swap replaces the currently wrapped logger with logger. Swap may be called +// concurrently with calls to Log from other goroutines. +func (l *SwapLogger) Swap(logger Logger) { + l.logger.Store(loggerStruct{logger}) +} + +// SyncWriter synchronizes concurrent writes to an io.Writer. +type SyncWriter struct { + mu sync.Mutex + w io.Writer +} + +// NewSyncWriter returns a new SyncWriter. The returned writer is safe for +// concurrent use. +func NewSyncWriter(w io.Writer) *SyncWriter { + return &SyncWriter{ + w: w, + } +} + +// Write writes p to the underlying io.Writer. If another write is already in +// progress, the calling goroutine blocks until the SyncWriter is available. +func (w *SyncWriter) Write(p []byte) (n int, err error) { + w.mu.Lock() + n, err = w.w.Write(p) + w.mu.Unlock() + return n, err +} + +// syncLogger provides concurrent safe logging for another Logger. +type syncLogger struct { + mu sync.Mutex + logger Logger +} + +// NewSyncLogger returns a logger that synchronizes concurrent use of the +// wrapped logger. When multiple goroutines use the SyncLogger concurrently +// only one goroutine will be allowed to log to the wrapped logger at a time. +// The other goroutines will block until the logger is available. +func NewSyncLogger(logger Logger) Logger { + return &syncLogger{logger: logger} +} + +func (l *syncLogger) Log(keyvals ...interface{}) error { + l.mu.Lock() + err := l.logger.Log(keyvals...) + l.mu.Unlock() + return err +} From f2ec6f85eeb1b0549a591dd690fde88cc9896bc6 Mon Sep 17 00:00:00 2001 From: Chris Hines Date: Fri, 20 May 2016 21:37:46 -0400 Subject: [PATCH 2/9] Add AsyncLogger. --- log/log.go | 6 ++ log/log_test.go | 62 -------------- log/sync.go | 122 ++++++++++++++++++++++++++- log/sync_test.go | 210 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 334 insertions(+), 66 deletions(-) create mode 100644 log/sync_test.go diff --git a/log/log.go b/log/log.go index 4b5618c65..fed019407 100644 --- a/log/log.go +++ b/log/log.go @@ -2,6 +2,12 @@ // // The fundamental interface is Logger. Loggers create log events from // key/value data. +// +// Concurrent Safety +// +// Applications with multiple goroutines want each log event written to the +// same logger to remain separate from other log events. Package log provides +// multiple solutions for concurrent safe logging. package log import "errors" diff --git a/log/log_test.go b/log/log_test.go index 48578fd51..03009dd0f 100644 --- a/log/log_test.go +++ b/log/log_test.go @@ -3,7 +3,6 @@ package log_test import ( "bytes" "fmt" - "io" "sync" "testing" @@ -208,64 +207,3 @@ func BenchmarkTenWith(b *testing.B) { lc.Log("k", "v") } } - -func TestSwapLogger(t *testing.T) { - var logger log.SwapLogger - - // Zero value does not panic or error. - err := logger.Log("k", "v") - if got, want := err, error(nil); got != want { - t.Errorf("got %v, want %v", got, want) - } - - buf := &bytes.Buffer{} - json := log.NewJSONLogger(buf) - logger.Swap(json) - - if err := logger.Log("k", "v"); err != nil { - t.Error(err) - } - if got, want := buf.String(), `{"k":"v"}`+"\n"; got != want { - t.Errorf("got %v, want %v", got, want) - } - - buf.Reset() - prefix := log.NewLogfmtLogger(buf) - logger.Swap(prefix) - - if err := logger.Log("k", "v"); err != nil { - t.Error(err) - } - if got, want := buf.String(), "k=v\n"; got != want { - t.Errorf("got %v, want %v", got, want) - } - - buf.Reset() - logger.Swap(nil) - - if err := logger.Log("k", "v"); err != nil { - t.Error(err) - } - if got, want := buf.String(), ""; got != want { - t.Errorf("got %v, want %v", got, want) - } -} - -func TestSwapLoggerConcurrency(t *testing.T) { - testConcurrency(t, &log.SwapLogger{}) -} - -func TestSyncLogger(t *testing.T) { - var w io.Writer - w = &bytes.Buffer{} - logger := log.NewLogfmtLogger(w) - logger = log.NewSyncLogger(logger) - testConcurrency(t, logger) -} - -func TestSyncWriter(t *testing.T) { - var w io.Writer - w = &bytes.Buffer{} - w = log.NewSyncWriter(w) - testConcurrency(t, log.NewLogfmtLogger(w)) -} diff --git a/log/sync.go b/log/sync.go index 985b4b2dd..bd4278a9e 100644 --- a/log/sync.go +++ b/log/sync.go @@ -1,6 +1,7 @@ package log import ( + "errors" "io" "sync" "sync/atomic" @@ -43,11 +44,9 @@ type SyncWriter struct { } // NewSyncWriter returns a new SyncWriter. The returned writer is safe for -// concurrent use. +// concurrent use by multiple goroutines. func NewSyncWriter(w io.Writer) *SyncWriter { - return &SyncWriter{ - w: w, - } + return &SyncWriter{w: w} } // Write writes p to the underlying io.Writer. If another write is already in @@ -73,9 +72,124 @@ func NewSyncLogger(logger Logger) Logger { return &syncLogger{logger: logger} } +// Log logs keyvals to the underlying Logger. If another log is already in +// progress, the calling goroutine blocks until the syncLogger is available. func (l *syncLogger) Log(keyvals ...interface{}) error { l.mu.Lock() err := l.logger.Log(keyvals...) l.mu.Unlock() return err } + +// AsyncLogger provides buffered asynchronous and concurrent safe logging for +// another logger. +// +// If the wrapped logger's Log method ever returns an error, the AsyncLogger +// will stop processing log events and make the error available via the Err +// method. Any unprocessed log events in the buffer will be lost. +type AsyncLogger struct { + logger Logger + keyvalsC chan []interface{} + + stopping chan struct{} + stopped chan struct{} + + mu sync.Mutex + err error +} + +// NewAsyncLogger returns a new AsyncLogger that logs to logger and can buffer +// up to size log events before overflowing. +func NewAsyncLogger(logger Logger, size int) *AsyncLogger { + l := &AsyncLogger{ + logger: logger, + keyvalsC: make(chan []interface{}, size), + stopping: make(chan struct{}), + stopped: make(chan struct{}), + } + go l.run() + return l +} + +// run forwards log events from l.keyvalsC to l.logger. +func (l *AsyncLogger) run() { + defer close(l.stopped) + for keyvals := range l.keyvalsC { + err := l.logger.Log(keyvals...) + if err != nil { + l.mu.Lock() + l.stop(err) + l.mu.Unlock() + return + } + } +} + +// caller must hold l.mu +func (l *AsyncLogger) stop(err error) { + if err != nil && l.err == nil { + l.err = err + } + select { + case <-l.stopping: + // already stopping, do nothing + default: + close(l.stopping) + close(l.keyvalsC) + } +} + +// Log queues keyvals for logging by the wrapped Logger. Log may be called +// concurrently by multiple goroutines. If the the buffer is full, Log will +// return ErrAsyncLoggerOverflow and the keyvals are not queued. If the +// AsyncLogger is stopping, Log will return ErrAsyncLoggerStopping. +func (l *AsyncLogger) Log(keyvals ...interface{}) error { + l.mu.Lock() + defer l.mu.Unlock() + + select { + case <-l.stopping: + return ErrAsyncLoggerStopping + default: + } + + select { + case l.keyvalsC <- keyvals: + return nil + default: + return ErrAsyncLoggerOverflow + } +} + +// Errors returned by AsyncLogger. +var ( + ErrAsyncLoggerStopping = errors.New("aysnc logger: logger stopped") + ErrAsyncLoggerOverflow = errors.New("aysnc logger: log buffer overflow") +) + +// Stop stops the AsyncLogger. After stop returns the logger will not accept +// new log events. Log events queued prior to calling Stop will be logged. +func (l *AsyncLogger) Stop() { + l.mu.Lock() + l.stop(nil) + l.mu.Unlock() +} + +// Stopping returns a channel that is closed after Stop is called. +func (l *AsyncLogger) Stopping() <-chan struct{} { + return l.stopping +} + +// Stopped returns a channel that is closed after Stop is called and all log +// events have been sent to the wrapped logger. +func (l *AsyncLogger) Stopped() <-chan struct{} { + return l.stopped +} + +// Err returns the first error returned by the wrapped logger. +func (l *AsyncLogger) Err() error { + l.mu.Lock() + err := l.err + l.mu.Unlock() + return err +} diff --git a/log/sync_test.go b/log/sync_test.go new file mode 100644 index 000000000..08cc4a9d8 --- /dev/null +++ b/log/sync_test.go @@ -0,0 +1,210 @@ +package log_test + +import ( + "bytes" + "errors" + "io" + "testing" + + "github.com/go-kit/kit/log" +) + +func TestSwapLogger(t *testing.T) { + var logger log.SwapLogger + + // Zero value does not panic or error. + err := logger.Log("k", "v") + if got, want := err, error(nil); got != want { + t.Errorf("got %v, want %v", got, want) + } + + buf := &bytes.Buffer{} + json := log.NewJSONLogger(buf) + logger.Swap(json) + + if err := logger.Log("k", "v"); err != nil { + t.Error(err) + } + if got, want := buf.String(), `{"k":"v"}`+"\n"; got != want { + t.Errorf("got %v, want %v", got, want) + } + + buf.Reset() + prefix := log.NewLogfmtLogger(buf) + logger.Swap(prefix) + + if err := logger.Log("k", "v"); err != nil { + t.Error(err) + } + if got, want := buf.String(), "k=v\n"; got != want { + t.Errorf("got %v, want %v", got, want) + } + + buf.Reset() + logger.Swap(nil) + + if err := logger.Log("k", "v"); err != nil { + t.Error(err) + } + if got, want := buf.String(), ""; got != want { + t.Errorf("got %v, want %v", got, want) + } +} + +func TestSwapLoggerConcurrency(t *testing.T) { + testConcurrency(t, &log.SwapLogger{}) +} + +func TestSyncLoggerConcurrency(t *testing.T) { + var w io.Writer + w = &bytes.Buffer{} + logger := log.NewLogfmtLogger(w) + logger = log.NewSyncLogger(logger) + testConcurrency(t, logger) +} + +func TestSyncWriterConcurrency(t *testing.T) { + var w io.Writer + w = &bytes.Buffer{} + w = log.NewSyncWriter(w) + testConcurrency(t, log.NewLogfmtLogger(w)) +} + +func TestAsyncLoggerConcurrency(t *testing.T) { + var w io.Writer + w = &bytes.Buffer{} + logger := log.NewLogfmtLogger(w) + al := log.NewAsyncLogger(logger, 10) + testConcurrency(t, al) + al.Stop() + <-al.Stopped() +} + +func TestAsyncLoggerLogs(t *testing.T) { + output := [][]interface{}{} + logger := log.LoggerFunc(func(keyvals ...interface{}) error { + output = append(output, keyvals) + return nil + }) + + const logcnt = 10 + al := log.NewAsyncLogger(logger, logcnt) + + for i := 0; i < logcnt; i++ { + al.Log("key", i) + } + + al.Stop() + <-al.Stopping() + + if got, want := al.Log("key", "late"), log.ErrAsyncLoggerStopping; got != want { + t.Errorf(`logger err: got "%v", want "%v"`, got, want) + } + + <-al.Stopped() + + if got, want := al.Err(), error(nil); got != want { + t.Errorf(`logger err: got "%v", want "%v"`, got, want) + } + + if got, want := len(output), logcnt; got != want { + t.Errorf("logged events: got %v, want %v", got, want) + } + + for i, e := range output { + if got, want := e[1], i; got != want { + t.Errorf("log event mismatch, got %v, want %v", got, want) + } + } +} + +func TestAsyncLoggerLogError(t *testing.T) { + const logcnt = 10 + const logBeforeError = logcnt / 2 + logErr := errors.New("log error") + + output := [][]interface{}{} + logger := log.LoggerFunc(func(keyvals ...interface{}) error { + output = append(output, keyvals) + if len(output) == logBeforeError { + return logErr + } + return nil + }) + + al := log.NewAsyncLogger(logger, logcnt) + + for i := 0; i < logcnt; i++ { + al.Log("key", i) + } + + <-al.Stopping() + + if got, want := al.Log("key", "late"), log.ErrAsyncLoggerStopping; got != want { + t.Errorf(`log while stopping err: got "%v", want "%v"`, got, want) + } + + <-al.Stopped() + + if got, want := al.Err(), logErr; got != want { + t.Errorf(`logger err: got "%v", want "%v"`, got, want) + } + + if got, want := len(output), logBeforeError; got != want { + t.Errorf("logged events: got %v, want %v", got, want) + } + + for i, e := range output { + if got, want := e[1], i; got != want { + t.Errorf("log event mismatch, got %v, want %v", got, want) + } + } +} + +func TestAsyncLoggerOverflow(t *testing.T) { + var ( + output = make(chan []interface{}, 10) + loggerdone = make(chan struct{}) + ) + + logger := log.LoggerFunc(func(keyvals ...interface{}) error { + output <- keyvals + <-loggerdone // block here to stall the AsyncLogger.run loop + return nil + }) + + al := log.NewAsyncLogger(logger, 1) + + if got, want := al.Log("k", 1), error(nil); got != want { + t.Errorf(`first log err: got "%v", want "%v"`, got, want) + } + + <-output + // Now we know the AsyncLogger.run loop has consumed the first log event + // and will be stalled until loggerdone is closed. + + // This log event fills the buffer without error. + if got, want := al.Log("k", 2), error(nil); got != want { + t.Errorf(`second log err: got "%v", want "%v"`, got, want) + } + + // Now we test for buffer overflow. + if got, want := al.Log("k", 3), log.ErrAsyncLoggerOverflow; got != want { + t.Errorf(`third log err: got "%v", want "%v"`, got, want) + } + + al.Stop() + <-al.Stopping() + + if got, want := al.Log("key", "late"), log.ErrAsyncLoggerStopping; got != want { + t.Errorf(`log while stopping err: got "%v", want "%v"`, got, want) + } + + // Release the AsyncLogger.run loop and wait for it to stop. + close(loggerdone) + <-al.Stopped() + + if got, want := al.Err(), error(nil); got != want { + t.Errorf(`logger err: got "%v", want "%v"`, got, want) + } +} From 7bf8f6e6e871d462e08d934c24e3c3324bd31716 Mon Sep 17 00:00:00 2001 From: Chris Hines Date: Sun, 22 May 2016 21:58:17 -0400 Subject: [PATCH 3/9] Run more tests in parallel. --- log/json_logger_test.go | 1 + log/log_test.go | 1 + log/logfmt_logger_test.go | 1 + log/nop_logger_test.go | 1 + log/sync_test.go | 4 ++++ log/value_test.go | 6 ++++-- 6 files changed, 12 insertions(+), 2 deletions(-) diff --git a/log/json_logger_test.go b/log/json_logger_test.go index 291157721..78697b6cf 100644 --- a/log/json_logger_test.go +++ b/log/json_logger_test.go @@ -100,6 +100,7 @@ func (textstringer) String() string { } func TestJSONLoggerStringValue(t *testing.T) { + t.Parallel() tests := []struct { v interface{} expected string diff --git a/log/log_test.go b/log/log_test.go index 03009dd0f..7c44095b3 100644 --- a/log/log_test.go +++ b/log/log_test.go @@ -71,6 +71,7 @@ func TestContextMissingValue(t *testing.T) { // whether Context.Log is called via an interface typed variable or a concrete // typed variable. func TestContextStackDepth(t *testing.T) { + t.Parallel() fn := fmt.Sprintf("%n", stack.Caller(0)) var output []interface{} diff --git a/log/logfmt_logger_test.go b/log/logfmt_logger_test.go index 185e94851..9e7361ff3 100644 --- a/log/logfmt_logger_test.go +++ b/log/logfmt_logger_test.go @@ -11,6 +11,7 @@ import ( ) func TestLogfmtLogger(t *testing.T) { + t.Parallel() buf := &bytes.Buffer{} logger := log.NewLogfmtLogger(buf) diff --git a/log/nop_logger_test.go b/log/nop_logger_test.go index 043553e62..25af1836d 100644 --- a/log/nop_logger_test.go +++ b/log/nop_logger_test.go @@ -7,6 +7,7 @@ import ( ) func TestNopLogger(t *testing.T) { + t.Parallel() logger := log.NewNopLogger() if err := logger.Log("abc", 123); err != nil { t.Error(err) diff --git a/log/sync_test.go b/log/sync_test.go index 08cc4a9d8..18bc13afd 100644 --- a/log/sync_test.go +++ b/log/sync_test.go @@ -10,6 +10,7 @@ import ( ) func TestSwapLogger(t *testing.T) { + t.Parallel() var logger log.SwapLogger // Zero value does not panic or error. @@ -81,6 +82,7 @@ func TestAsyncLoggerConcurrency(t *testing.T) { } func TestAsyncLoggerLogs(t *testing.T) { + t.Parallel() output := [][]interface{}{} logger := log.LoggerFunc(func(keyvals ...interface{}) error { output = append(output, keyvals) @@ -119,6 +121,7 @@ func TestAsyncLoggerLogs(t *testing.T) { } func TestAsyncLoggerLogError(t *testing.T) { + t.Parallel() const logcnt = 10 const logBeforeError = logcnt / 2 logErr := errors.New("log error") @@ -162,6 +165,7 @@ func TestAsyncLoggerLogError(t *testing.T) { } func TestAsyncLoggerOverflow(t *testing.T) { + t.Parallel() var ( output = make(chan []interface{}, 10) loggerdone = make(chan struct{}) diff --git a/log/value_test.go b/log/value_test.go index 52773611c..44e6478af 100644 --- a/log/value_test.go +++ b/log/value_test.go @@ -9,6 +9,7 @@ import ( ) func TestValueBinding(t *testing.T) { + t.Parallel() var output []interface{} logger := log.Logger(log.LoggerFunc(func(keyvals ...interface{}) error { @@ -33,7 +34,7 @@ func TestValueBinding(t *testing.T) { if want, have := start.Add(time.Second), timestamp; want != have { t.Errorf("output[1]: want %v, have %v", want, have) } - if want, have := "value_test.go:28", fmt.Sprint(output[3]); want != have { + if want, have := "value_test.go:29", fmt.Sprint(output[3]); want != have { t.Errorf("output[3]: want %s, have %s", want, have) } @@ -46,12 +47,13 @@ func TestValueBinding(t *testing.T) { if want, have := start.Add(2*time.Second), timestamp; want != have { t.Errorf("output[1]: want %v, have %v", want, have) } - if want, have := "value_test.go:41", fmt.Sprint(output[3]); want != have { + if want, have := "value_test.go:42", fmt.Sprint(output[3]); want != have { t.Errorf("output[3]: want %s, have %s", want, have) } } func TestValueBinding_loggingZeroKeyvals(t *testing.T) { + t.Parallel() var output []interface{} logger := log.Logger(log.LoggerFunc(func(keyvals ...interface{}) error { From 4e51f0b286afb2239b5001ad2df99c4c540b4f29 Mon Sep 17 00:00:00 2001 From: Chris Hines Date: Sun, 22 May 2016 22:02:25 -0400 Subject: [PATCH 4/9] Improve log.testConcurrency to check errors and control total event count. --- log/concurrency_test.go | 36 ++++++++++++++++++++++++------------ log/json_logger_test.go | 3 ++- log/logfmt_logger_test.go | 3 ++- log/sync_test.go | 11 ++++++----- 4 files changed, 34 insertions(+), 19 deletions(-) diff --git a/log/concurrency_test.go b/log/concurrency_test.go index e68d16a3e..95a749e77 100644 --- a/log/concurrency_test.go +++ b/log/concurrency_test.go @@ -1,8 +1,7 @@ package log_test import ( - "strconv" - "sync" + "math" "testing" "github.com/go-kit/kit/log" @@ -10,19 +9,32 @@ import ( // These test are designed to be run with the race detector. -func testConcurrency(t *testing.T, logger log.Logger) { - for _, n := range []int{10, 100, 500} { - wg := sync.WaitGroup{} - wg.Add(n) - for i := 0; i < n; i++ { - go func() { spam(logger); wg.Done() }() +func testConcurrency(t *testing.T, logger log.Logger, total int) { + n := int(math.Sqrt(float64(total))) + share := total / n + + errC := make(chan error, n) + + for i := 0; i < n; i++ { + go func() { + errC <- spam(logger, share) + }() + } + + for i := 0; i < n; i++ { + err := <-errC + if err != nil { + t.Fatalf("concurrent logging error: %v", err) } - wg.Wait() } } -func spam(logger log.Logger) { - for i := 0; i < 100; i++ { - logger.Log("key", strconv.FormatInt(int64(i), 10)) +func spam(logger log.Logger, count int) error { + for i := 0; i < count; i++ { + err := logger.Log("key", i) + if err != nil { + return err + } } + return nil } diff --git a/log/json_logger_test.go b/log/json_logger_test.go index 78697b6cf..42df70c1c 100644 --- a/log/json_logger_test.go +++ b/log/json_logger_test.go @@ -153,5 +153,6 @@ func BenchmarkJSONLoggerContextual(b *testing.B) { } func TestJSONLoggerConcurrency(t *testing.T) { - testConcurrency(t, log.NewJSONLogger(ioutil.Discard)) + t.Parallel() + testConcurrency(t, log.NewJSONLogger(ioutil.Discard), 10000) } diff --git a/log/logfmt_logger_test.go b/log/logfmt_logger_test.go index 9e7361ff3..91bbca15c 100644 --- a/log/logfmt_logger_test.go +++ b/log/logfmt_logger_test.go @@ -48,7 +48,8 @@ func BenchmarkLogfmtLoggerContextual(b *testing.B) { } func TestLogfmtLoggerConcurrency(t *testing.T) { - testConcurrency(t, log.NewLogfmtLogger(ioutil.Discard)) + t.Parallel() + testConcurrency(t, log.NewLogfmtLogger(ioutil.Discard), 10000) } type mymap map[int]int diff --git a/log/sync_test.go b/log/sync_test.go index 18bc13afd..3448d6af6 100644 --- a/log/sync_test.go +++ b/log/sync_test.go @@ -53,7 +53,8 @@ func TestSwapLogger(t *testing.T) { } func TestSwapLoggerConcurrency(t *testing.T) { - testConcurrency(t, &log.SwapLogger{}) + t.Parallel() + testConcurrency(t, &log.SwapLogger{}, 10000) } func TestSyncLoggerConcurrency(t *testing.T) { @@ -61,22 +62,22 @@ func TestSyncLoggerConcurrency(t *testing.T) { w = &bytes.Buffer{} logger := log.NewLogfmtLogger(w) logger = log.NewSyncLogger(logger) - testConcurrency(t, logger) + testConcurrency(t, logger, 10000) } func TestSyncWriterConcurrency(t *testing.T) { var w io.Writer w = &bytes.Buffer{} w = log.NewSyncWriter(w) - testConcurrency(t, log.NewLogfmtLogger(w)) + testConcurrency(t, log.NewLogfmtLogger(w), 10000) } func TestAsyncLoggerConcurrency(t *testing.T) { var w io.Writer w = &bytes.Buffer{} logger := log.NewLogfmtLogger(w) - al := log.NewAsyncLogger(logger, 10) - testConcurrency(t, al) + al := log.NewAsyncLogger(logger, 10000) + testConcurrency(t, al, 10000) al.Stop() <-al.Stopped() } From 5847566bfb4978c7f6091e19312111dce08bc517 Mon Sep 17 00:00:00 2001 From: Chris Hines Date: Sun, 22 May 2016 22:03:47 -0400 Subject: [PATCH 5/9] Add AsyncLogger Len and Cap methods for monitoring. --- log/sync.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/log/sync.go b/log/sync.go index bd4278a9e..03e5da6cf 100644 --- a/log/sync.go +++ b/log/sync.go @@ -193,3 +193,15 @@ func (l *AsyncLogger) Err() error { l.mu.Unlock() return err } + +// Len returns a snapshot of the number of buffered log events. The returned +// count should only be used for monitoring purposes as it becomes stale +// quickly. +func (l *AsyncLogger) Len() int { + return len(l.keyvalsC) +} + +// Cap returns the maximum capacity of the buffer. +func (l *AsyncLogger) Cap() int { + return cap(l.keyvalsC) +} From dabeb686221bf7be9dba655df6ef9bf48cc3a4f1 Mon Sep 17 00:00:00 2001 From: Chris Hines Date: Sun, 22 May 2016 22:06:00 -0400 Subject: [PATCH 6/9] Centralize AsyncLogger locking code. --- log/sync.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/log/sync.go b/log/sync.go index 03e5da6cf..052456608 100644 --- a/log/sync.go +++ b/log/sync.go @@ -91,11 +91,11 @@ type AsyncLogger struct { logger Logger keyvalsC chan []interface{} - stopping chan struct{} - stopped chan struct{} + mu sync.Mutex + err error + stopping chan struct{} // must be closed before keyvalsC - mu sync.Mutex - err error + stopped chan struct{} // closed when run loop exits } // NewAsyncLogger returns a new AsyncLogger that logs to logger and can buffer @@ -117,16 +117,14 @@ func (l *AsyncLogger) run() { for keyvals := range l.keyvalsC { err := l.logger.Log(keyvals...) if err != nil { - l.mu.Lock() l.stop(err) - l.mu.Unlock() return } } } -// caller must hold l.mu func (l *AsyncLogger) stop(err error) { + l.mu.Lock() if err != nil && l.err == nil { l.err = err } @@ -137,6 +135,7 @@ func (l *AsyncLogger) stop(err error) { close(l.stopping) close(l.keyvalsC) } + l.mu.Unlock() } // Log queues keyvals for logging by the wrapped Logger. Log may be called @@ -170,9 +169,7 @@ var ( // Stop stops the AsyncLogger. After stop returns the logger will not accept // new log events. Log events queued prior to calling Stop will be logged. func (l *AsyncLogger) Stop() { - l.mu.Lock() l.stop(nil) - l.mu.Unlock() } // Stopping returns a channel that is closed after Stop is called. From a8be77686a39d78d9d0d8fefd3fe5d1597d24315 Mon Sep 17 00:00:00 2001 From: Chris Hines Date: Sun, 22 May 2016 22:44:45 -0400 Subject: [PATCH 7/9] Improve AsyncLogger test coverage. --- log/sync_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/log/sync_test.go b/log/sync_test.go index 3448d6af6..55b45c8c3 100644 --- a/log/sync_test.go +++ b/log/sync_test.go @@ -98,6 +98,7 @@ func TestAsyncLoggerLogs(t *testing.T) { } al.Stop() + al.Stop() // stop is idempotent <-al.Stopping() if got, want := al.Log("key", "late"), log.ErrAsyncLoggerStopping; got != want { @@ -105,6 +106,7 @@ func TestAsyncLoggerLogs(t *testing.T) { } <-al.Stopped() + al.Stop() // stop is idempotent if got, want := al.Err(), error(nil); got != want { t.Errorf(`logger err: got "%v", want "%v"`, got, want) @@ -149,6 +151,7 @@ func TestAsyncLoggerLogError(t *testing.T) { } <-al.Stopped() + al.Stop() // stop is idempotent and must not destroy result of Err if got, want := al.Err(), logErr; got != want { t.Errorf(`logger err: got "%v", want "%v"`, got, want) From 7f5e3dcc0f429be5e33e75473b142bb8df9cfbbc Mon Sep 17 00:00:00 2001 From: Chris Hines Date: Mon, 30 May 2016 21:16:30 -0400 Subject: [PATCH 8/9] Rename AsyncLogger to NonblockingLogger and add a new simpler AsyncLogger implementation. --- log/sync.go | 101 +++++++++++++++++++++++++++++++++++------------ log/sync_test.go | 76 +++++++++++++++++++++++++---------- 2 files changed, 132 insertions(+), 45 deletions(-) diff --git a/log/sync.go b/log/sync.go index 052456608..7e5a0880c 100644 --- a/log/sync.go +++ b/log/sync.go @@ -84,12 +84,61 @@ func (l *syncLogger) Log(keyvals ...interface{}) error { // AsyncLogger provides buffered asynchronous and concurrent safe logging for // another logger. // -// If the wrapped logger's Log method ever returns an error, the AsyncLogger -// will stop processing log events and make the error available via the Err -// method. Any unprocessed log events in the buffer will be lost. +// Errors returned by the wrapped logger are ignored, therefore the wrapped +// logger should must handle all errors appropriately. type AsyncLogger struct { logger Logger keyvalsC chan []interface{} +} + +// NewAsyncLogger returns a new AsyncLogger that logs to logger and can buffer +// up to size log events before its Log method blocks. +func NewAsyncLogger(logger Logger, size int) *AsyncLogger { + l := &AsyncLogger{ + logger: logger, + keyvalsC: make(chan []interface{}, size), + } + go l.run() + return l +} + +// run forwards log events from l.keyvalsC to l.logger. +func (l *AsyncLogger) run() { + for keyvals := range l.keyvalsC { + l.logger.Log(keyvals...) + } +} + +// Log queues keyvals for logging by the wrapped Logger. Log may be called +// concurrently by multiple goroutines. If the the buffer is full, Log will +// block until space is available. Log always returns a nil error. +func (l *AsyncLogger) Log(keyvals ...interface{}) error { + l.keyvalsC <- keyvals + return nil +} + +// Len returns a snapshot of the number of buffered log events. The returned +// count should only be used for monitoring purposes as it becomes stale +// quickly. +func (l *AsyncLogger) Len() int { + return len(l.keyvalsC) +} + +// Cap returns the maximum capacity of the buffer. +func (l *AsyncLogger) Cap() int { + return cap(l.keyvalsC) +} + +// NonblockingLogger provides buffered asynchronous and concurrent safe +// logging for another logger. +// +// If the wrapped logger's Log method ever returns an error, the +// NonblockingLogger will stop processing log events and make the error +// available via the Err method. Any unprocessed log events in the buffer will +// be lost. +type NonblockingLogger struct { + logger Logger + keyvalsC chan []interface{} mu sync.Mutex err error @@ -98,10 +147,10 @@ type AsyncLogger struct { stopped chan struct{} // closed when run loop exits } -// NewAsyncLogger returns a new AsyncLogger that logs to logger and can buffer -// up to size log events before overflowing. -func NewAsyncLogger(logger Logger, size int) *AsyncLogger { - l := &AsyncLogger{ +// NewNonblockingLogger returns a new NonblockingLogger that logs to logger +// and can buffer up to size log events before overflowing. +func NewNonblockingLogger(logger Logger, size int) *NonblockingLogger { + l := &NonblockingLogger{ logger: logger, keyvalsC: make(chan []interface{}, size), stopping: make(chan struct{}), @@ -112,7 +161,7 @@ func NewAsyncLogger(logger Logger, size int) *AsyncLogger { } // run forwards log events from l.keyvalsC to l.logger. -func (l *AsyncLogger) run() { +func (l *NonblockingLogger) run() { defer close(l.stopped) for keyvals := range l.keyvalsC { err := l.logger.Log(keyvals...) @@ -123,7 +172,7 @@ func (l *AsyncLogger) run() { } } -func (l *AsyncLogger) stop(err error) { +func (l *NonblockingLogger) stop(err error) { l.mu.Lock() if err != nil && l.err == nil { l.err = err @@ -140,15 +189,16 @@ func (l *AsyncLogger) stop(err error) { // Log queues keyvals for logging by the wrapped Logger. Log may be called // concurrently by multiple goroutines. If the the buffer is full, Log will -// return ErrAsyncLoggerOverflow and the keyvals are not queued. If the -// AsyncLogger is stopping, Log will return ErrAsyncLoggerStopping. -func (l *AsyncLogger) Log(keyvals ...interface{}) error { +// return ErrNonblockingLoggerOverflow and the keyvals are not queued. If the +// NonblockingLogger is stopping, Log will return +// ErrNonblockingLoggerStopping. +func (l *NonblockingLogger) Log(keyvals ...interface{}) error { l.mu.Lock() defer l.mu.Unlock() select { case <-l.stopping: - return ErrAsyncLoggerStopping + return ErrNonblockingLoggerStopping default: } @@ -156,35 +206,36 @@ func (l *AsyncLogger) Log(keyvals ...interface{}) error { case l.keyvalsC <- keyvals: return nil default: - return ErrAsyncLoggerOverflow + return ErrNonblockingLoggerOverflow } } -// Errors returned by AsyncLogger. +// Errors returned by NonblockingLogger. var ( - ErrAsyncLoggerStopping = errors.New("aysnc logger: logger stopped") - ErrAsyncLoggerOverflow = errors.New("aysnc logger: log buffer overflow") + ErrNonblockingLoggerStopping = errors.New("aysnc logger: logger stopped") + ErrNonblockingLoggerOverflow = errors.New("aysnc logger: log buffer overflow") ) -// Stop stops the AsyncLogger. After stop returns the logger will not accept -// new log events. Log events queued prior to calling Stop will be logged. -func (l *AsyncLogger) Stop() { +// Stop stops the NonblockingLogger. After stop returns the logger will not +// accept new log events. Log events queued prior to calling Stop will be +// logged. +func (l *NonblockingLogger) Stop() { l.stop(nil) } // Stopping returns a channel that is closed after Stop is called. -func (l *AsyncLogger) Stopping() <-chan struct{} { +func (l *NonblockingLogger) Stopping() <-chan struct{} { return l.stopping } // Stopped returns a channel that is closed after Stop is called and all log // events have been sent to the wrapped logger. -func (l *AsyncLogger) Stopped() <-chan struct{} { +func (l *NonblockingLogger) Stopped() <-chan struct{} { return l.stopped } // Err returns the first error returned by the wrapped logger. -func (l *AsyncLogger) Err() error { +func (l *NonblockingLogger) Err() error { l.mu.Lock() err := l.err l.mu.Unlock() @@ -194,11 +245,11 @@ func (l *AsyncLogger) Err() error { // Len returns a snapshot of the number of buffered log events. The returned // count should only be used for monitoring purposes as it becomes stale // quickly. -func (l *AsyncLogger) Len() int { +func (l *NonblockingLogger) Len() int { return len(l.keyvalsC) } // Cap returns the maximum capacity of the buffer. -func (l *AsyncLogger) Cap() int { +func (l *NonblockingLogger) Cap() int { return cap(l.keyvalsC) } diff --git a/log/sync_test.go b/log/sync_test.go index 55b45c8c3..e106976ec 100644 --- a/log/sync_test.go +++ b/log/sync_test.go @@ -57,6 +57,13 @@ func TestSwapLoggerConcurrency(t *testing.T) { testConcurrency(t, &log.SwapLogger{}, 10000) } +func TestSyncWriterConcurrency(t *testing.T) { + var w io.Writer + w = &bytes.Buffer{} + w = log.NewSyncWriter(w) + testConcurrency(t, log.NewLogfmtLogger(w), 10000) +} + func TestSyncLoggerConcurrency(t *testing.T) { var w io.Writer w = &bytes.Buffer{} @@ -65,24 +72,53 @@ func TestSyncLoggerConcurrency(t *testing.T) { testConcurrency(t, logger, 10000) } -func TestSyncWriterConcurrency(t *testing.T) { - var w io.Writer - w = &bytes.Buffer{} - w = log.NewSyncWriter(w) - testConcurrency(t, log.NewLogfmtLogger(w), 10000) +func TestAsyncLoggerConcurrency(t *testing.T) { + for _, size := range []int{1, 100, 1000, 10000} { + var w io.Writer + w = &bytes.Buffer{} + logger := log.NewLogfmtLogger(w) + logger = log.NewAsyncLogger(logger, size) + testConcurrency(t, logger, 10000) + } } -func TestAsyncLoggerConcurrency(t *testing.T) { +func TestAsyncLoggerLogs(t *testing.T) { + t.Parallel() + output := make(chan []interface{}) + logger := log.LoggerFunc(func(keyvals ...interface{}) error { + output <- keyvals + return nil + }) + + const size = 4 + const logcnt = size * 20 + al := log.NewAsyncLogger(logger, size) + + go func() { + for i := 0; i < logcnt; i++ { + al.Log("key", i) + } + }() + + for i := 0; i < logcnt; i++ { + e := <-output + if got, want := e[1], i; got != want { + t.Errorf("log event mismatch, got %v, want %v", got, want) + } + } +} + +func TestNonblockingLoggerConcurrency(t *testing.T) { var w io.Writer w = &bytes.Buffer{} logger := log.NewLogfmtLogger(w) - al := log.NewAsyncLogger(logger, 10000) + al := log.NewNonblockingLogger(logger, 10000) testConcurrency(t, al, 10000) al.Stop() <-al.Stopped() } -func TestAsyncLoggerLogs(t *testing.T) { +func TestNonblockingLoggerLogs(t *testing.T) { t.Parallel() output := [][]interface{}{} logger := log.LoggerFunc(func(keyvals ...interface{}) error { @@ -91,7 +127,7 @@ func TestAsyncLoggerLogs(t *testing.T) { }) const logcnt = 10 - al := log.NewAsyncLogger(logger, logcnt) + al := log.NewNonblockingLogger(logger, logcnt) for i := 0; i < logcnt; i++ { al.Log("key", i) @@ -101,7 +137,7 @@ func TestAsyncLoggerLogs(t *testing.T) { al.Stop() // stop is idempotent <-al.Stopping() - if got, want := al.Log("key", "late"), log.ErrAsyncLoggerStopping; got != want { + if got, want := al.Log("key", "late"), log.ErrNonblockingLoggerStopping; got != want { t.Errorf(`logger err: got "%v", want "%v"`, got, want) } @@ -123,7 +159,7 @@ func TestAsyncLoggerLogs(t *testing.T) { } } -func TestAsyncLoggerLogError(t *testing.T) { +func TestNonblockingLoggerLogError(t *testing.T) { t.Parallel() const logcnt = 10 const logBeforeError = logcnt / 2 @@ -138,7 +174,7 @@ func TestAsyncLoggerLogError(t *testing.T) { return nil }) - al := log.NewAsyncLogger(logger, logcnt) + al := log.NewNonblockingLogger(logger, logcnt) for i := 0; i < logcnt; i++ { al.Log("key", i) @@ -146,7 +182,7 @@ func TestAsyncLoggerLogError(t *testing.T) { <-al.Stopping() - if got, want := al.Log("key", "late"), log.ErrAsyncLoggerStopping; got != want { + if got, want := al.Log("key", "late"), log.ErrNonblockingLoggerStopping; got != want { t.Errorf(`log while stopping err: got "%v", want "%v"`, got, want) } @@ -168,7 +204,7 @@ func TestAsyncLoggerLogError(t *testing.T) { } } -func TestAsyncLoggerOverflow(t *testing.T) { +func TestNonblockingLoggerOverflow(t *testing.T) { t.Parallel() var ( output = make(chan []interface{}, 10) @@ -177,18 +213,18 @@ func TestAsyncLoggerOverflow(t *testing.T) { logger := log.LoggerFunc(func(keyvals ...interface{}) error { output <- keyvals - <-loggerdone // block here to stall the AsyncLogger.run loop + <-loggerdone // block here to stall the NonblockingLogger.run loop return nil }) - al := log.NewAsyncLogger(logger, 1) + al := log.NewNonblockingLogger(logger, 1) if got, want := al.Log("k", 1), error(nil); got != want { t.Errorf(`first log err: got "%v", want "%v"`, got, want) } <-output - // Now we know the AsyncLogger.run loop has consumed the first log event + // Now we know the NonblockingLogger.run loop has consumed the first log event // and will be stalled until loggerdone is closed. // This log event fills the buffer without error. @@ -197,18 +233,18 @@ func TestAsyncLoggerOverflow(t *testing.T) { } // Now we test for buffer overflow. - if got, want := al.Log("k", 3), log.ErrAsyncLoggerOverflow; got != want { + if got, want := al.Log("k", 3), log.ErrNonblockingLoggerOverflow; got != want { t.Errorf(`third log err: got "%v", want "%v"`, got, want) } al.Stop() <-al.Stopping() - if got, want := al.Log("key", "late"), log.ErrAsyncLoggerStopping; got != want { + if got, want := al.Log("key", "late"), log.ErrNonblockingLoggerStopping; got != want { t.Errorf(`log while stopping err: got "%v", want "%v"`, got, want) } - // Release the AsyncLogger.run loop and wait for it to stop. + // Release the NonblockingLogger.run loop and wait for it to stop. close(loggerdone) <-al.Stopped() From 2a348cc41662e402da909dd91146435171d44bbc Mon Sep 17 00:00:00 2001 From: Chris Hines Date: Mon, 30 May 2016 23:48:10 -0400 Subject: [PATCH 9/9] Require logger wrapped by NonblockingLogger to handle errors. --- log/sync.go | 38 +++++++--------------------------- log/sync_test.go | 54 ------------------------------------------------ 2 files changed, 8 insertions(+), 84 deletions(-) diff --git a/log/sync.go b/log/sync.go index 7e5a0880c..3ac21f8a2 100644 --- a/log/sync.go +++ b/log/sync.go @@ -85,7 +85,7 @@ func (l *syncLogger) Log(keyvals ...interface{}) error { // another logger. // // Errors returned by the wrapped logger are ignored, therefore the wrapped -// logger should must handle all errors appropriately. +// logger must handle all errors appropriately. type AsyncLogger struct { logger Logger keyvalsC chan []interface{} @@ -132,16 +132,13 @@ func (l *AsyncLogger) Cap() int { // NonblockingLogger provides buffered asynchronous and concurrent safe // logging for another logger. // -// If the wrapped logger's Log method ever returns an error, the -// NonblockingLogger will stop processing log events and make the error -// available via the Err method. Any unprocessed log events in the buffer will -// be lost. +// Errors returned by the wrapped logger are ignored, therefore the wrapped +// logger must handle all errors appropriately. type NonblockingLogger struct { logger Logger keyvalsC chan []interface{} mu sync.Mutex - err error stopping chan struct{} // must be closed before keyvalsC stopped chan struct{} // closed when run loop exits @@ -164,19 +161,15 @@ func NewNonblockingLogger(logger Logger, size int) *NonblockingLogger { func (l *NonblockingLogger) run() { defer close(l.stopped) for keyvals := range l.keyvalsC { - err := l.logger.Log(keyvals...) - if err != nil { - l.stop(err) - return - } + l.logger.Log(keyvals...) } } -func (l *NonblockingLogger) stop(err error) { +// Stop stops the NonblockingLogger. After stop returns the logger will not +// accept new log events. Log events queued prior to calling Stop will be +// logged. +func (l *NonblockingLogger) Stop() { l.mu.Lock() - if err != nil && l.err == nil { - l.err = err - } select { case <-l.stopping: // already stopping, do nothing @@ -216,13 +209,6 @@ var ( ErrNonblockingLoggerOverflow = errors.New("aysnc logger: log buffer overflow") ) -// Stop stops the NonblockingLogger. After stop returns the logger will not -// accept new log events. Log events queued prior to calling Stop will be -// logged. -func (l *NonblockingLogger) Stop() { - l.stop(nil) -} - // Stopping returns a channel that is closed after Stop is called. func (l *NonblockingLogger) Stopping() <-chan struct{} { return l.stopping @@ -234,14 +220,6 @@ func (l *NonblockingLogger) Stopped() <-chan struct{} { return l.stopped } -// Err returns the first error returned by the wrapped logger. -func (l *NonblockingLogger) Err() error { - l.mu.Lock() - err := l.err - l.mu.Unlock() - return err -} - // Len returns a snapshot of the number of buffered log events. The returned // count should only be used for monitoring purposes as it becomes stale // quickly. diff --git a/log/sync_test.go b/log/sync_test.go index e106976ec..dadacf8c6 100644 --- a/log/sync_test.go +++ b/log/sync_test.go @@ -2,7 +2,6 @@ package log_test import ( "bytes" - "errors" "io" "testing" @@ -144,10 +143,6 @@ func TestNonblockingLoggerLogs(t *testing.T) { <-al.Stopped() al.Stop() // stop is idempotent - if got, want := al.Err(), error(nil); got != want { - t.Errorf(`logger err: got "%v", want "%v"`, got, want) - } - if got, want := len(output), logcnt; got != want { t.Errorf("logged events: got %v, want %v", got, want) } @@ -159,51 +154,6 @@ func TestNonblockingLoggerLogs(t *testing.T) { } } -func TestNonblockingLoggerLogError(t *testing.T) { - t.Parallel() - const logcnt = 10 - const logBeforeError = logcnt / 2 - logErr := errors.New("log error") - - output := [][]interface{}{} - logger := log.LoggerFunc(func(keyvals ...interface{}) error { - output = append(output, keyvals) - if len(output) == logBeforeError { - return logErr - } - return nil - }) - - al := log.NewNonblockingLogger(logger, logcnt) - - for i := 0; i < logcnt; i++ { - al.Log("key", i) - } - - <-al.Stopping() - - if got, want := al.Log("key", "late"), log.ErrNonblockingLoggerStopping; got != want { - t.Errorf(`log while stopping err: got "%v", want "%v"`, got, want) - } - - <-al.Stopped() - al.Stop() // stop is idempotent and must not destroy result of Err - - if got, want := al.Err(), logErr; got != want { - t.Errorf(`logger err: got "%v", want "%v"`, got, want) - } - - if got, want := len(output), logBeforeError; got != want { - t.Errorf("logged events: got %v, want %v", got, want) - } - - for i, e := range output { - if got, want := e[1], i; got != want { - t.Errorf("log event mismatch, got %v, want %v", got, want) - } - } -} - func TestNonblockingLoggerOverflow(t *testing.T) { t.Parallel() var ( @@ -247,8 +197,4 @@ func TestNonblockingLoggerOverflow(t *testing.T) { // Release the NonblockingLogger.run loop and wait for it to stop. close(loggerdone) <-al.Stopped() - - if got, want := al.Err(), error(nil); got != want { - t.Errorf(`logger err: got "%v", want "%v"`, got, want) - } }