From fe0202a0415142f4ed718646003dea360e81d8b6 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 19 May 2021 11:45:51 +0200 Subject: [PATCH 1/3] rlp: add concurrent encode benchmark --- rlp/encode_test.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/rlp/encode_test.go b/rlp/encode_test.go index 74e8ededcb7e..0177bb0350d9 100644 --- a/rlp/encode_test.go +++ b/rlp/encode_test.go @@ -23,6 +23,7 @@ import ( "io" "io/ioutil" "math/big" + "runtime" "sync" "testing" @@ -480,3 +481,35 @@ func BenchmarkEncodeBigInts(b *testing.B) { } } } + +func BenchmarkEncodeConcurrentInterface(b *testing.B) { + type struct1 struct { + A string + B *big.Int + C [20]byte + } + value := []interface{}{ + uint(999), + &struct1{A: "hello", B: big.NewInt(0xFFFFFFFF)}, + [10]byte{1, 2, 3, 4, 5, 6}, + []string{"yeah", "yeah", "yeah"}, + } + + var wg sync.WaitGroup + for cpu := 0; cpu < runtime.NumCPU(); cpu++ { + wg.Add(1) + go func() { + defer wg.Done() + + var buffer bytes.Buffer + for i := 0; i < b.N; i++ { + buffer.Reset() + err := Encode(&buffer, value) + if err != nil { + panic(err) + } + } + }() + } + wg.Wait() +} From b63a3651c089e1a6a3ed68a5705b0f802f0280bb Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 18 May 2021 21:39:17 +0200 Subject: [PATCH 2/3] rlp: use atomic.Value for type cache All encoding/decoding operations read the type cache to find the writer/decoder function responsible for a type. When analyzing CPU profiles of geth during sync, I found that the use of sync.RWMutex in cache lookups appears in the profiles. It seems we are running into cache contention problems because package rlp is heavily used on all CPU cores during sync. This change makes it use atomic.Value + a writer lock instead of sync.RWMutex. In the common case where the typeinfo entry is present in the cache, we simply fetch the map and lookup the type. Unfortunately, it is very hard to prove whether this is an improvement because single-threaded benchmarks won't hit the slowdown. --- rlp/decode.go | 4 +-- rlp/encode.go | 4 +-- rlp/typecache.go | 78 ++++++++++++++++++++++++++++++++---------------- 3 files changed, 57 insertions(+), 29 deletions(-) diff --git a/rlp/decode.go b/rlp/decode.go index 976780971773..e4262b64df9c 100644 --- a/rlp/decode.go +++ b/rlp/decode.go @@ -245,7 +245,7 @@ func makeListDecoder(typ reflect.Type, tag tags) (decoder, error) { } return decodeByteSlice, nil } - etypeinfo := cachedTypeInfo1(etype, tags{}) + etypeinfo := theTC.infoWhileGenerating(etype, tags{}) if etypeinfo.decoderErr != nil { return nil, etypeinfo.decoderErr } @@ -424,7 +424,7 @@ func zeroFields(structval reflect.Value, fields []field) { // makePtrDecoder creates a decoder that decodes into the pointer's element type. func makePtrDecoder(typ reflect.Type, tag tags) (decoder, error) { etype := typ.Elem() - etypeinfo := cachedTypeInfo1(etype, tags{}) + etypeinfo := theTC.infoWhileGenerating(etype, tags{}) switch { case etypeinfo.decoderErr != nil: return nil, etypeinfo.decoderErr diff --git a/rlp/encode.go b/rlp/encode.go index b7e74a133f2e..2e1b0102caae 100644 --- a/rlp/encode.go +++ b/rlp/encode.go @@ -517,7 +517,7 @@ func writeInterface(val reflect.Value, w *encbuf) error { } func makeSliceWriter(typ reflect.Type, ts tags) (writer, error) { - etypeinfo := cachedTypeInfo1(typ.Elem(), tags{}) + etypeinfo := theTC.infoWhileGenerating(typ.Elem(), tags{}) if etypeinfo.writerErr != nil { return nil, etypeinfo.writerErr } @@ -585,7 +585,7 @@ func makeStructWriter(typ reflect.Type) (writer, error) { } func makePtrWriter(typ reflect.Type, ts tags) (writer, error) { - etypeinfo := cachedTypeInfo1(typ.Elem(), tags{}) + etypeinfo := theTC.infoWhileGenerating(typ.Elem(), tags{}) if etypeinfo.writerErr != nil { return nil, etypeinfo.writerErr } diff --git a/rlp/typecache.go b/rlp/typecache.go index 3910dcf0806e..9f68edbdd0d3 100644 --- a/rlp/typecache.go +++ b/rlp/typecache.go @@ -21,13 +21,10 @@ import ( "reflect" "strings" "sync" + "sync/atomic" ) -var ( - typeCacheMutex sync.RWMutex - typeCache = make(map[typekey]*typeinfo) -) - +// typeinfo is an entry in the type cache. type typeinfo struct { decoder decoder decoderErr error // error from makeDecoder @@ -65,41 +62,72 @@ type decoder func(*Stream, reflect.Value) error type writer func(reflect.Value, *encbuf) error +var theTC = newTypeCache() + +type typeCache struct { + cur atomic.Value + + // This lock synchronizes writers. + mu sync.Mutex + next map[typekey]*typeinfo +} + +func newTypeCache() *typeCache { + c := new(typeCache) + c.cur.Store(make(map[typekey]*typeinfo)) + return c +} + func cachedDecoder(typ reflect.Type) (decoder, error) { - info := cachedTypeInfo(typ, tags{}) + info := theTC.info(typ) return info.decoder, info.decoderErr } func cachedWriter(typ reflect.Type) (writer, error) { - info := cachedTypeInfo(typ, tags{}) + info := theTC.info(typ) return info.writer, info.writerErr } -func cachedTypeInfo(typ reflect.Type, tags tags) *typeinfo { - typeCacheMutex.RLock() - info := typeCache[typekey{typ, tags}] - typeCacheMutex.RUnlock() - if info != nil { +func (c *typeCache) info(typ reflect.Type) *typeinfo { + key := typekey{Type: typ} + if info := c.cur.Load().(map[typekey]*typeinfo)[key]; info != nil { return info } - // not in the cache, need to generate info for this type. - typeCacheMutex.Lock() - defer typeCacheMutex.Unlock() - return cachedTypeInfo1(typ, tags) + + // Not in the cache, need to generate info for this type. + return c.generate(typ, tags{}) +} + +func (c *typeCache) generate(typ reflect.Type, tags tags) *typeinfo { + c.mu.Lock() + defer c.mu.Unlock() + + // Copy cur to next. + cur := c.cur.Load().(map[typekey]*typeinfo) + c.next = make(map[typekey]*typeinfo, len(cur)+1) + for k, v := range cur { + c.next[k] = v + } + + // Generate. + info := c.infoWhileGenerating(typ, tags) + + // next -> cur + c.cur.Store(c.next) + c.next = nil + return info } -func cachedTypeInfo1(typ reflect.Type, tags tags) *typeinfo { +func (c *typeCache) infoWhileGenerating(typ reflect.Type, tags tags) *typeinfo { key := typekey{typ, tags} - info := typeCache[key] - if info != nil { - // another goroutine got the write lock first + if info := c.next[key]; info != nil { return info } - // put a dummy value into the cache before generating. - // if the generator tries to lookup itself, it will get + // Put a dummy value into the cache before generating. + // If the generator tries to lookup itself, it will get // the dummy value and won't call itself recursively. - info = new(typeinfo) - typeCache[key] = info + info := new(typeinfo) + c.next[key] = info info.generate(typ, tags) return info } @@ -133,7 +161,7 @@ func structFields(typ reflect.Type) (fields []field, err error) { } else if anyOptional { return nil, fmt.Errorf(`rlp: struct field %v.%s needs "optional" tag`, typ, f.Name) } - info := cachedTypeInfo1(f.Type, tags) + info := theTC.infoWhileGenerating(f.Type, tags) fields = append(fields, field{i, info, tags.optional}) } } From 4ee540da6d4624213068c29040c4505a428beee8 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 21 May 2021 13:45:23 +0200 Subject: [PATCH 3/3] rlp: check current typecache after taking lock --- rlp/typecache.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/rlp/typecache.go b/rlp/typecache.go index 9f68edbdd0d3..62553d3b55c1 100644 --- a/rlp/typecache.go +++ b/rlp/typecache.go @@ -102,8 +102,12 @@ func (c *typeCache) generate(typ reflect.Type, tags tags) *typeinfo { c.mu.Lock() defer c.mu.Unlock() - // Copy cur to next. cur := c.cur.Load().(map[typekey]*typeinfo) + if info := cur[typekey{typ, tags}]; info != nil { + return info + } + + // Copy cur to next. c.next = make(map[typekey]*typeinfo, len(cur)+1) for k, v := range cur { c.next[k] = v