Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mutex to protect against race conditions #11

Merged
merged 1 commit into from
Oct 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 33 additions & 22 deletions core.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type core struct {
// permLabels is a collection of labels that have been added to the logger
// through the use of `With()`. These labels should never be cleared after
// logging a single entry, unlike `tempLabel`.
permLabels labels
permLabels *labels

// tempLabels keeps a record of all the labels that need to be applied to the
// current log entry. Zap serializes log fields at different parts of the
Expand All @@ -26,27 +26,29 @@ type core struct {
// Instead, we have to filter out these labels at both locations, and then add
// them back in the proper format right before we call `Write` on the original
// Zap core.
tempLabels labels
tempLabels *labels
}

// WrapCore returns a `zap.Option` that wraps the default core with the
// zapdriver one.
func WrapCore() zap.Option {
return zap.WrapCore(func(c zapcore.Core) zapcore.Core {
return &core{c, labels{}, labels{}}
return &core{c, newLabels(), newLabels()}
})
}

// With adds structured context to the Core.
func (c *core) With(fields []zap.Field) zapcore.Core {
var lbls labels
var lbls *labels
lbls, fields = c.extractLabels(fields)

for k, v := range lbls {
c.permLabels[k] = v
lbls.mutex.Lock()
for k, v := range lbls.store {
c.permLabels.store[k] = v
}
lbls.mutex.Unlock()

return &core{c.Core.With(fields), c.permLabels, labels{}}
return &core{c.Core.With(fields), c.permLabels, newLabels()}
}

// Check determines whether the supplied Entry should be logged (using the
Expand All @@ -64,17 +66,19 @@ func (c *core) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.Check
}

func (c *core) Write(ent zapcore.Entry, fields []zapcore.Field) error {
var lbls labels
var lbls *labels
lbls, fields = c.extractLabels(fields)

for k, v := range lbls {
c.tempLabels[k] = v
lbls.mutex.Lock()
for k, v := range lbls.store {
c.tempLabels.store[k] = v
}
lbls.mutex.Unlock()

fields = append(fields, labelsField(c.allLabels()))
fields = c.withSourceLocation(ent, fields)

c.tempLabels = labels{}
c.tempLabels = newLabels()

return c.Core.Write(ent, fields)
}
Expand All @@ -84,47 +88,54 @@ func (c *core) Sync() error {
return c.Core.Sync()
}

func (c *core) allLabels() labels {
lbls := labels{}
for k, v := range c.permLabels {
lbls[k] = v
func (c *core) allLabels() *labels {
lbls := newLabels()

lbls.mutex.Lock()
for k, v := range c.permLabels.store {
lbls.store[k] = v
}

for k, v := range c.tempLabels {
lbls[k] = v
for k, v := range c.tempLabels.store {
lbls.store[k] = v
}
lbls.mutex.Unlock()

return lbls
}

func (c *core) extractLabels(fields []zapcore.Field) (labels, []zapcore.Field) {
lbls := labels{}
func (c *core) extractLabels(fields []zapcore.Field) (*labels, []zapcore.Field) {
lbls := newLabels()
out := []zapcore.Field{}

lbls.mutex.Lock()
for i := range fields {
if !isLabelField(fields[i]) {
out = append(out, fields[i])
continue
}

lbls[strings.Replace(fields[i].Key, "labels.", "", 1)] = fields[i].String
lbls.store[strings.Replace(fields[i].Key, "labels.", "", 1)] = fields[i].String
}
lbls.mutex.Unlock()

return lbls, out
}

func (c *core) withLabels(fields []zapcore.Field) []zapcore.Field {
lbls := labels{}
lbls := newLabels()
out := []zapcore.Field{}

lbls.mutex.Lock()
for i := range fields {
if isLabelField(fields[i]) {
lbls[strings.Replace(fields[i].Key, "labels.", "", 1)] = fields[i].String
lbls.store[strings.Replace(fields[i].Key, "labels.", "", 1)] = fields[i].String
continue
}

out = append(out, fields[i])
}
lbls.mutex.Unlock()

return append(out, labelsField(lbls))
}
Expand Down
46 changes: 37 additions & 9 deletions core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,20 @@ func TestWithLabels(t *testing.T) {
Label("two", "value"),
}

labels := newLabels()
labels.store = map[string]string{"one": "value", "two": "value"}

want := []zap.Field{
zap.String("hello", "world"),
zap.Object("labels", labels(map[string]string{"one": "value", "two": "value"})),
zap.Object("labels", labels),
}

assert.Equal(t, want, (&core{}).withLabels(fields))
}

func TestExtractLabels(t *testing.T) {
var lbls labels
c := &core{zapcore.NewNopCore(), labels{}, labels{}}
var lbls *labels
c := &core{zapcore.NewNopCore(), newLabels(), newLabels()}

fields := []zap.Field{
zap.String("hello", "world"),
Expand All @@ -39,9 +42,12 @@ func TestExtractLabels(t *testing.T) {

lbls, fields = c.extractLabels(fields)

require.Len(t, lbls, 2)
assert.Equal(t, "world", lbls["one"])
assert.Equal(t, "worlds", lbls["two"])
require.Len(t, lbls.store, 2)

lbls.mutex.RLock()
assert.Equal(t, "world", lbls.store["one"])
assert.Equal(t, "worlds", lbls.store["two"])
lbls.mutex.RUnlock()

require.Len(t, fields, 1)
assert.Equal(t, zap.String("hello", "world"), fields[0])
Expand Down Expand Up @@ -86,8 +92,11 @@ func TestWithSourceLocation_OnlyWhenDefined(t *testing.T) {
}

func TestWrite(t *testing.T) {
temp := newLabels()
temp.store = map[string]string{"one": "1", "two": "2"}

debugcore, logs := observer.New(zapcore.DebugLevel)
core := &core{debugcore, labels{}, labels{}}
core := &core{debugcore, newLabels(), temp}

fields := []zap.Field{
zap.String("hello", "world"),
Expand All @@ -103,7 +112,7 @@ func TestWrite(t *testing.T) {

func TestWithAndWrite(t *testing.T) {
debugcore, logs := observer.New(zapcore.DebugLevel)
core := zapcore.Core(&core{debugcore, labels{}, labels{}})
core := zapcore.Core(&core{debugcore, newLabels(), newLabels()})

core = core.With([]zapcore.Field{Label("one", "world")})
err := core.Write(zapcore.Entry{}, []zapcore.Field{Label("two", "worlds")})
Expand All @@ -117,7 +126,7 @@ func TestWithAndWrite(t *testing.T) {

func TestWithAndWrite_MultipleEntries(t *testing.T) {
debugcore, logs := observer.New(zapcore.DebugLevel)
core := zapcore.Core(&core{debugcore, labels{}, labels{}})
core := zapcore.Core(&core{debugcore, newLabels(), newLabels()})

core = core.With([]zapcore.Field{Label("one", "world")})
err := core.Write(zapcore.Entry{}, []zapcore.Field{Label("two", "worlds")})
Expand All @@ -138,3 +147,22 @@ func TestWithAndWrite_MultipleEntries(t *testing.T) {
assert.Equal(t, "world", labels["one"])
assert.Equal(t, "worlds", labels["three"])
}

func TestAllLabels(t *testing.T) {
perm := newLabels()
perm.store = map[string]string{"one": "1", "two": "2", "three": "3"}

temp := newLabels()
temp.store = map[string]string{"one": "ONE", "three": "THREE"}

core := &core{zapcore.NewNopCore(), perm, temp}

out := core.allLabels()
require.Len(t, out.store, 3)

out.mutex.RLock()
assert.Equal(t, out.store["one"], "ONE")
assert.Equal(t, out.store["two"], "2")
assert.Equal(t, out.store["three"], "THREE")
out.mutex.RUnlock()
}
30 changes: 24 additions & 6 deletions label.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package zapdriver

import (
"strings"
"sync"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -21,13 +22,15 @@ func Label(key, value string) zap.Field {
// string `labels.` and their value type set to StringType. It then wraps those
// key/value pairs in a top-level `labels` namespace.
func Labels(fields ...zap.Field) zap.Field {
lbls := labels{}
lbls := newLabels()

lbls.mutex.Lock()
for i := range fields {
if isLabelField(fields[i]) {
lbls[strings.Replace(fields[i].Key, "labels.", "", 1)] = fields[i].String
lbls.store[strings.Replace(fields[i].Key, "labels.", "", 1)] = fields[i].String
}
}
lbls.mutex.Unlock()

return labelsField(lbls)
}
Expand All @@ -36,16 +39,31 @@ func isLabelField(field zap.Field) bool {
return strings.HasPrefix(field.Key, "labels.") && field.Type == zapcore.StringType
}

func labelsField(l map[string]string) zap.Field {
return zap.Object("labels", labels(l))
func labelsField(l *labels) zap.Field {
return zap.Object("labels", l)
}

type labels map[string]string
type labels struct {
store map[string]string
mutex *sync.RWMutex
}

func newLabels() *labels {
return &labels{store: map[string]string{}, mutex: &sync.RWMutex{}}
}

func (l *labels) Add(key, value string) {
l.mutex.Lock()
l.store[key] = value
l.mutex.Unlock()
}

func (l labels) MarshalLogObject(enc zapcore.ObjectEncoder) error {
for k, v := range l {
l.mutex.RLock()
for k, v := range l.store {
enc.AddString(k, v)
}
l.mutex.RUnlock()

return nil
}
5 changes: 4 additions & 1 deletion label_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,8 @@ func TestLabels(t *testing.T) {
Label("hi", "universe"),
)

assert.Equal(t, zap.Object("labels", labels{"hello": "world", "hi": "universe"}), field)
labels := newLabels()
labels.store = map[string]string{"hello": "world", "hi": "universe"}

assert.Equal(t, zap.Object("labels", labels), field)
}