Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] log: Add asynchronous wrappers. #273

Closed
wants to merge 9 commits into from
20 changes: 19 additions & 1 deletion log/example_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,31 @@
package log_test

import (
"net/url"
"os"

"github.com/go-kit/kit/log"
)

func Example_stdout() {
w := log.NewSyncWriter(os.Stdout)
logger := log.NewLogfmtLogger(w)

reqUrl := &url.URL{
Scheme: "https",
Host: "github.com",
Path: "/go-kit/kit",
}

logger.Log("method", "GET", "url", reqUrl)

// Output:
// method=GET url=https://github.com/go-kit/kit
}

func ExampleContext() {
logger := log.NewLogfmtLogger(os.Stdout)
w := log.NewSyncWriter(os.Stdout)
logger := log.NewLogfmtLogger(w)
logger.Log("foo", 123)
ctx := log.NewContext(logger).With("level", "info")
ctx.Log()
Expand Down
41 changes: 7 additions & 34 deletions log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
//
// The fundamental interface is Logger. Loggers create log events from
// key/value data.
//
// Concurrent Safety
//
// Applications with multiple goroutines want each log event written to the
// same logger to remain separate from other log events. Package log provides
// multiple solutions for concurrent safe logging.
package log

import (
"errors"
"sync/atomic"
)
import "errors"

// Logger is the fundamental interface for all log operations. Log creates a
// log event from keyvals, a variadic sequence of alternating keys and values.
Expand Down Expand Up @@ -149,33 +152,3 @@ type LoggerFunc func(...interface{}) error
func (f LoggerFunc) Log(keyvals ...interface{}) error {
return f(keyvals...)
}

// SwapLogger wraps another logger that may be safely replaced while other
// goroutines use the SwapLogger concurrently. The zero value for a SwapLogger
// will discard all log events without error.
//
// SwapLogger serves well as a package global logger that can be changed by
// importers.
type SwapLogger struct {
logger atomic.Value
}

type loggerStruct struct {
Logger
}

// Log implements the Logger interface by forwarding keyvals to the currently
// wrapped logger. It does not log anything if the wrapped logger is nil.
func (l *SwapLogger) Log(keyvals ...interface{}) error {
s, ok := l.logger.Load().(loggerStruct)
if !ok || s.Logger == nil {
return nil
}
return s.Log(keyvals...)
}

// Swap replaces the currently wrapped logger with logger. Swap may be called
// concurrently with calls to Log from other goroutines.
func (l *SwapLogger) Swap(logger Logger) {
l.logger.Store(loggerStruct{logger})
}
46 changes: 0 additions & 46 deletions log/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,49 +207,3 @@ func BenchmarkTenWith(b *testing.B) {
lc.Log("k", "v")
}
}

func TestSwapLogger(t *testing.T) {
var logger log.SwapLogger

// Zero value does not panic or error.
err := logger.Log("k", "v")
if got, want := err, error(nil); got != want {
t.Errorf("got %v, want %v", got, want)
}

buf := &bytes.Buffer{}
json := log.NewJSONLogger(buf)
logger.Swap(json)

if err := logger.Log("k", "v"); err != nil {
t.Error(err)
}
if got, want := buf.String(), `{"k":"v"}`+"\n"; got != want {
t.Errorf("got %v, want %v", got, want)
}

buf.Reset()
prefix := log.NewLogfmtLogger(buf)
logger.Swap(prefix)

if err := logger.Log("k", "v"); err != nil {
t.Error(err)
}
if got, want := buf.String(), "k=v\n"; got != want {
t.Errorf("got %v, want %v", got, want)
}

buf.Reset()
logger.Swap(nil)

if err := logger.Log("k", "v"); err != nil {
t.Error(err)
}
if got, want := buf.String(), ""; got != want {
t.Errorf("got %v, want %v", got, want)
}
}

func TestSwapLoggerConcurrency(t *testing.T) {
testConcurrency(t, &log.SwapLogger{})
}
195 changes: 195 additions & 0 deletions log/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package log

import (
"errors"
"io"
"sync"
"sync/atomic"
)

// SwapLogger wraps another logger that may be safely replaced while other
// goroutines use the SwapLogger concurrently. The zero value for a SwapLogger
// will discard all log events without error.
//
// SwapLogger serves well as a package global logger that can be changed by
// importers.
type SwapLogger struct {
logger atomic.Value
}

type loggerStruct struct {
Logger
}

// Log implements the Logger interface by forwarding keyvals to the currently
// wrapped logger. It does not log anything if the wrapped logger is nil.
func (l *SwapLogger) Log(keyvals ...interface{}) error {
s, ok := l.logger.Load().(loggerStruct)
if !ok || s.Logger == nil {
return nil
}
return s.Log(keyvals...)
}

// Swap replaces the currently wrapped logger with logger. Swap may be called
// concurrently with calls to Log from other goroutines.
func (l *SwapLogger) Swap(logger Logger) {
l.logger.Store(loggerStruct{logger})
}

// SyncWriter synchronizes concurrent writes to an io.Writer.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In what circumstances would you want to

logger := log.NewLogfmtLogger(log.NewSyncWriter(w))

instead of

logger := log.NewSyncLogger(log.NewLogfmtLogger(w))

? Could a little bit of guidance make its way into the comments on the types?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I plan to add docs like that before removing the WIP tag. Generally speaking you want to do the least amount of work you can within a mutex while making sure that each log event is handled completely while the lock is held. So the first form is preferred in this case because NewLogfmtLogger makes a single call to Write for each log event.

The second form is needed for loggers that perform multiple writes per log event. NewJSONLogger currently does that, and external implementations of Logger could as well.

So, crafting the above into some nice docs is still on the todo list for this PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha.

type SyncWriter struct {
mu sync.Mutex
w io.Writer
}

// NewSyncWriter returns a new SyncWriter. The returned writer is safe for
// concurrent use by multiple goroutines.
func NewSyncWriter(w io.Writer) *SyncWriter {
return &SyncWriter{w: w}
}

// Write writes p to the underlying io.Writer. If another write is already in
// progress, the calling goroutine blocks until the SyncWriter is available.
func (w *SyncWriter) Write(p []byte) (n int, err error) {
w.mu.Lock()
n, err = w.w.Write(p)
w.mu.Unlock()
return n, err
}

// syncLogger provides concurrent safe logging for another Logger.
type syncLogger struct {
mu sync.Mutex
logger Logger
}

// NewSyncLogger returns a logger that synchronizes concurrent use of the
// wrapped logger. When multiple goroutines use the SyncLogger concurrently
// only one goroutine will be allowed to log to the wrapped logger at a time.
// The other goroutines will block until the logger is available.
func NewSyncLogger(logger Logger) Logger {
return &syncLogger{logger: logger}
}

// Log logs keyvals to the underlying Logger. If another log is already in
// progress, the calling goroutine blocks until the syncLogger is available.
func (l *syncLogger) Log(keyvals ...interface{}) error {
l.mu.Lock()
err := l.logger.Log(keyvals...)
l.mu.Unlock()
return err
}

// AsyncLogger provides buffered asynchronous and concurrent safe logging for
// another logger.
//
// If the wrapped logger's Log method ever returns an error, the AsyncLogger
// will stop processing log events and make the error available via the Err
// method. Any unprocessed log events in the buffer will be lost.
type AsyncLogger struct {
logger Logger
keyvalsC chan []interface{}

stopping chan struct{}
stopped chan struct{}

mu sync.Mutex
err error
}

// NewAsyncLogger returns a new AsyncLogger that logs to logger and can buffer
// up to size log events before overflowing.
func NewAsyncLogger(logger Logger, size int) *AsyncLogger {
l := &AsyncLogger{
logger: logger,
keyvalsC: make(chan []interface{}, size),
stopping: make(chan struct{}),
stopped: make(chan struct{}),
}
go l.run()
return l
}

// run forwards log events from l.keyvalsC to l.logger.
func (l *AsyncLogger) run() {
defer close(l.stopped)
for keyvals := range l.keyvalsC {
err := l.logger.Log(keyvals...)
if err != nil {
l.mu.Lock()
l.stop(err)
l.mu.Unlock()
return
}
}
}

// caller must hold l.mu
func (l *AsyncLogger) stop(err error) {
if err != nil && l.err == nil {
l.err = err
}
select {
case <-l.stopping:
// already stopping, do nothing
default:
close(l.stopping)
close(l.keyvalsC)
}
}

// Log queues keyvals for logging by the wrapped Logger. Log may be called
// concurrently by multiple goroutines. If the the buffer is full, Log will
// return ErrAsyncLoggerOverflow and the keyvals are not queued. If the
// AsyncLogger is stopping, Log will return ErrAsyncLoggerStopping.
func (l *AsyncLogger) Log(keyvals ...interface{}) error {
l.mu.Lock()
defer l.mu.Unlock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is interesting. At first I thought it was an oversight, but now I see it's to prevent a race. Still, it seems a shame to require a mutex op with every Log; my intuition is that there's a clever way to avoid it...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, no. If AsyncLogger's contract is that all successful Log invocations make it to the writer, then you do indeed need some explicit synchronization.


select {
case <-l.stopping:
return ErrAsyncLoggerStopping
default:
}

select {
case l.keyvalsC <- keyvals:
return nil
default:
return ErrAsyncLoggerOverflow
}
}

// Errors returned by AsyncLogger.
var (
ErrAsyncLoggerStopping = errors.New("aysnc logger: logger stopped")
ErrAsyncLoggerOverflow = errors.New("aysnc logger: log buffer overflow")
)

// Stop stops the AsyncLogger. After stop returns the logger will not accept
// new log events. Log events queued prior to calling Stop will be logged.
func (l *AsyncLogger) Stop() {
l.mu.Lock()
l.stop(nil)
l.mu.Unlock()
}

// Stopping returns a channel that is closed after Stop is called.
func (l *AsyncLogger) Stopping() <-chan struct{} {
return l.stopping
}

// Stopped returns a channel that is closed after Stop is called and all log
// events have been sent to the wrapped logger.
func (l *AsyncLogger) Stopped() <-chan struct{} {
return l.stopped
}

// Err returns the first error returned by the wrapped logger.
func (l *AsyncLogger) Err() error {
l.mu.Lock()
err := l.err
l.mu.Unlock()
return err
}
Loading