From 6a863244573b896135daace4bad3c8e4d4fc9d78 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 17 Jul 2018 09:42:36 -0700 Subject: [PATCH 1/6] share a pool of (un)marshallers Also, use a cloner instead of unmarshalling when wrapping an object. --- encoding/cloner.go | 29 +++++++++++++++ encoding/marshaller.go | 79 ++++++++++++++++++++++++++++++++++++++++ encoding/unmarshaller.go | 76 ++++++++++++++++++++++++++++++++++++++ node.go | 69 ++++++++--------------------------- refmt.go | 71 ++++++++++++++++++++++++++++++++++++ wrap_test.go | 60 ++++++++++++++++++++++++++++++ 6 files changed, 330 insertions(+), 54 deletions(-) create mode 100644 encoding/cloner.go create mode 100644 encoding/marshaller.go create mode 100644 encoding/unmarshaller.go create mode 100644 refmt.go create mode 100644 wrap_test.go diff --git a/encoding/cloner.go b/encoding/cloner.go new file mode 100644 index 0000000..33ecb04 --- /dev/null +++ b/encoding/cloner.go @@ -0,0 +1,29 @@ +package encoding + +import ( + refmt "github.com/polydawn/refmt" + "github.com/polydawn/refmt/obj/atlas" +) + +// PooledCloner is a thread-safe pooled object cloner. +type PooledCloner struct { + Count int + cloners chan refmt.Cloner +} + +// SetAtlas set sets the pool's atlas. It is *not* safe to call this +// concurrently. +func (p *PooledCloner) SetAtlas(atlas atlas.Atlas) { + p.cloners = make(chan refmt.Cloner, p.Count) + for len(p.cloners) < cap(p.cloners) { + p.cloners <- refmt.NewCloner(atlas) + } +} + +// Clone clones a into b using a cloner from the pool. +func (p *PooledCloner) Clone(a, b interface{}) error { + c := <-p.cloners + err := c.Clone(a, b) + p.cloners <- c + return err +} diff --git a/encoding/marshaller.go b/encoding/marshaller.go new file mode 100644 index 0000000..358605e --- /dev/null +++ b/encoding/marshaller.go @@ -0,0 +1,79 @@ +package encoding + +import ( + "bytes" + "io" + + cbor "github.com/polydawn/refmt/cbor" + "github.com/polydawn/refmt/obj/atlas" +) + +type proxyWriter struct { + w io.Writer +} + +func (w *proxyWriter) Write(b []byte) (int, error) { + return w.w.Write(b) +} + +// Marshaller is a reusbale CBOR marshaller. +type Marshaller struct { + marshal *cbor.Marshaller + writer proxyWriter +} + +// NewMarshallerAtlased constructs a new cbor Marshaller using the given atlas. +func NewMarshallerAtlased(atl atlas.Atlas) *Marshaller { + m := new(Marshaller) + m.marshal = cbor.NewMarshallerAtlased(&m.writer, atl) + return m +} + +// Encode encodes the given object to the given writer. +func (m *Marshaller) Encode(obj interface{}, w io.Writer) error { + m.writer.w = w + err := m.marshal.Marshal(obj) + m.writer.w = nil + return err +} + +// Marshal marshels the given object to a byte slice. +func (m *Marshaller) Marshal(obj interface{}) ([]byte, error) { + var buf bytes.Buffer + if err := m.Encode(obj, &buf); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +// PooledMarshaller is a thread-safe pooled CBOR marshaller. +type PooledMarshaller struct { + Count int + marshallers chan *Marshaller +} + +// SetAtlas set sets the pool's atlas. It is *not* safe to call this +// concurrently. +func (p *PooledMarshaller) SetAtlas(atlas atlas.Atlas) { + p.marshallers = make(chan *Marshaller, p.Count) + for len(p.marshallers) < cap(p.marshallers) { + p.marshallers <- NewMarshallerAtlased(atlas) + } +} + +// Marshal marshals the passed object using the pool of marshallers. +func (p *PooledMarshaller) Marshal(obj interface{}) ([]byte, error) { + m := <-p.marshallers + bts, err := m.Marshal(obj) + p.marshallers <- m + return bts, err +} + +// Encode encodes the passed object to the given writer using the pool of +// marshallers. +func (p *PooledMarshaller) Encode(obj interface{}, w io.Writer) error { + m := <-p.marshallers + err := m.Encode(obj, w) + p.marshallers <- m + return err +} diff --git a/encoding/unmarshaller.go b/encoding/unmarshaller.go new file mode 100644 index 0000000..9a2d42a --- /dev/null +++ b/encoding/unmarshaller.go @@ -0,0 +1,76 @@ +package encoding + +import ( + "bytes" + "io" + + cbor "github.com/polydawn/refmt/cbor" + "github.com/polydawn/refmt/obj/atlas" +) + +type proxyReader struct { + r io.Reader +} + +func (r *proxyReader) Read(b []byte) (int, error) { + return r.r.Read(b) +} + +// Unmarshaller is a reusable CBOR unmarshaller. +type Unmarshaller struct { + unmarshal *cbor.Unmarshaller + reader proxyReader +} + +// NewUnmarshallerAtlased creates a new reusable unmarshaller. +func NewUnmarshallerAtlased(atl atlas.Atlas) *Unmarshaller { + m := new(Unmarshaller) + m.unmarshal = cbor.NewUnmarshallerAtlased(&m.reader, atl) + return m +} + +// Decode reads a CBOR object from the given reader and decodes it into the +// given object. +func (m *Unmarshaller) Decode(r io.Reader, obj interface{}) error { + m.reader.r = r + err := m.unmarshal.Unmarshal(obj) + m.reader.r = nil + return err +} + +// Unmarshal unmarshals the given CBOR byte slice into the given object. +func (m *Unmarshaller) Unmarshal(b []byte, obj interface{}) error { + return m.Decode(bytes.NewReader(b), obj) +} + +// PooledUnmarshaller is a thread-safe pooled CBOR unmarshaller. +type PooledUnmarshaller struct { + Count int + unmarshallers chan *Unmarshaller +} + +// SetAtlas set sets the pool's atlas. It is *not* safe to call this +// concurrently. +func (p *PooledUnmarshaller) SetAtlas(atlas atlas.Atlas) { + p.unmarshallers = make(chan *Unmarshaller, p.Count) + for len(p.unmarshallers) < cap(p.unmarshallers) { + p.unmarshallers <- NewUnmarshallerAtlased(atlas) + } +} + +// Decode decodes an object from the passed reader into the given object using +// the pool of unmarshallers. +func (p *PooledUnmarshaller) Decode(r io.Reader, obj interface{}) error { + u := <-p.unmarshallers + err := u.Decode(r, obj) + p.unmarshallers <- u + return err +} + +// Unmarshal unmarshals the passed object using the pool of unmarshallers. +func (p *PooledUnmarshaller) Unmarshal(b []byte, obj interface{}) error { + u := <-p.unmarshallers + err := u.Unmarshal(b, obj) + p.unmarshallers <- u + return err +} diff --git a/node.go b/node.go index 64430d6..76678f0 100644 --- a/node.go +++ b/node.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "math" - "math/big" "strconv" "strings" @@ -14,9 +13,6 @@ import ( cid "github.com/ipfs/go-cid" node "github.com/ipfs/go-ipld-format" mh "github.com/multiformats/go-multihash" - - cbor "github.com/polydawn/refmt/cbor" - "github.com/polydawn/refmt/obj/atlas" ) // CBORTagLink is the integer used to represent tags in CBOR. @@ -47,49 +43,6 @@ var ( ErrNonStringLink = errors.New("link should have been a string") ) -// This atlas describes the CBOR Tag (42) for IPLD links, such that refmt can marshal and unmarshal them -var cidAtlasEntry = atlas.BuildEntry(cid.Cid{}). - UseTag(CBORTagLink). - Transform(). - TransformMarshal(atlas.MakeMarshalTransformFunc( - castCidToBytes, - )). - TransformUnmarshal(atlas.MakeUnmarshalTransformFunc( - castBytesToCid, - )). - Complete() - -var bigIntAtlasEntry = atlas.BuildEntry(big.Int{}).Transform(). - TransformMarshal(atlas.MakeMarshalTransformFunc( - func(i big.Int) ([]byte, error) { - return i.Bytes(), nil - })). - TransformUnmarshal(atlas.MakeUnmarshalTransformFunc( - func(x []byte) (big.Int, error) { - return *big.NewInt(0).SetBytes(x), nil - })). - Complete() - -var cborAtlas atlas.Atlas -var cborSortingMode = atlas.KeySortMode_RFC7049 -var atlasEntries = []*atlas.AtlasEntry{cidAtlasEntry, bigIntAtlasEntry} - -func init() { - cborAtlas = atlas.MustBuild(cidAtlasEntry, bigIntAtlasEntry).WithMapMorphism(atlas.MapMorphism{atlas.KeySortMode_RFC7049}) -} - -// RegisterCborType allows to register a custom cbor type -func RegisterCborType(i interface{}) { - var entry *atlas.AtlasEntry - if ae, ok := i.(*atlas.AtlasEntry); ok { - entry = ae - } else { - entry = atlas.BuildEntry(i).StructMap().AutogenerateWithSortingScheme(atlas.KeySortMode_RFC7049).Complete() - } - atlasEntries = append(atlasEntries, entry) - cborAtlas = atlas.MustBuild(atlasEntries...).WithMapMorphism(atlas.MapMorphism{atlas.KeySortMode_RFC7049}) -} - // DecodeBlock decodes a CBOR encoded Block into an IPLD Node. // // This method *does not* canonicalize and *will* preserve the CID. As a matter @@ -111,7 +64,10 @@ func decodeBlock(block blocks.Block) (*Node, error) { if err := DecodeInto(block.RawData(), &m); err != nil { return nil, err } + return newObject(block, m) +} +func newObject(block blocks.Block, m interface{}) (*Node, error) { tree, err := compTree(m) if err != nil { return nil, err @@ -154,16 +110,22 @@ func Decode(b []byte, mhType uint64, mhLen int) (*Node, error) { // DecodeInto decodes a serialized IPLD cbor object into the given object. func DecodeInto(b []byte, v interface{}) error { - // The cbor library really doesnt make this sort of operation easy on us - return cbor.UnmarshalAtlased(b, v, cborAtlas) + return unmarshaller.Unmarshal(b, v) } // WrapObject converts an arbitrary object into a Node. func WrapObject(m interface{}, mhType uint64, mhLen int) (*Node, error) { - data, err := cbor.MarshalAtlased(m, cborAtlas) + data, err := marshaller.Marshal(m) if err != nil { return nil, err } + + var obj interface{} + err = cloner.Clone(m, &obj) + if err != nil { + return nil, err + } + if mhType == math.MaxUint64 { mhType = mh.SHA2_256 } @@ -179,9 +141,8 @@ func WrapObject(m interface{}, mhType uint64, mhLen int) (*Node, error) { // TODO: Shouldn't this just panic? return nil, err } - // Do not reuse `m`. We need to re-decode it to put it in the right - // form. - return decodeBlock(block) + // No need to deserialize. We can just deep copy. + return newObject(block, obj) } // Resolve resolves a given path, and returns the object found at the end, as well @@ -457,7 +418,7 @@ func (n *Node) MarshalJSON() ([]byte, error) { // DumpObject marshals any object into its CBOR serialized byte representation // TODO: rename func DumpObject(obj interface{}) (out []byte, err error) { - return cbor.MarshalAtlased(obj, cborAtlas) + return marshaller.Marshal(obj) } func toSaneMap(n map[interface{}]interface{}) (interface{}, error) { diff --git a/refmt.go b/refmt.go new file mode 100644 index 0000000..8b5ea8a --- /dev/null +++ b/refmt.go @@ -0,0 +1,71 @@ +package cbornode + +import ( + "math/big" + "runtime" + + cid "github.com/ipfs/go-cid" + + encoding "github.com/ipfs/go-ipld-cbor/encoding" + + "github.com/polydawn/refmt/obj/atlas" +) + +// This atlas describes the CBOR Tag (42) for IPLD links, such that refmt can marshal and unmarshal them +var cidAtlasEntry = atlas.BuildEntry(cid.Cid{}). + UseTag(CBORTagLink). + Transform(). + TransformMarshal(atlas.MakeMarshalTransformFunc( + castCidToBytes, + )). + TransformUnmarshal(atlas.MakeUnmarshalTransformFunc( + castBytesToCid, + )). + Complete() + +var bigIntAtlasEntry = atlas.BuildEntry(big.Int{}).Transform(). + TransformMarshal(atlas.MakeMarshalTransformFunc( + func(i big.Int) ([]byte, error) { + return i.Bytes(), nil + })). + TransformUnmarshal(atlas.MakeUnmarshalTransformFunc( + func(x []byte) (big.Int, error) { + return *big.NewInt(0).SetBytes(x), nil + })). + Complete() + +var cborAtlas atlas.Atlas +var cborSortingMode = atlas.KeySortMode_RFC7049 +var atlasEntries = []*atlas.AtlasEntry{cidAtlasEntry, bigIntAtlasEntry} + +var ( + numWorkers = runtime.NumCPU() * 2 + cloner = encoding.PooledCloner{Count: numWorkers} + unmarshaller = encoding.PooledUnmarshaller{Count: numWorkers} + marshaller = encoding.PooledMarshaller{Count: numWorkers} +) + +func init() { + rebuildAtlas() +} + +func rebuildAtlas() { + cborAtlas = atlas.MustBuild(atlasEntries...). + WithMapMorphism(atlas.MapMorphism{atlas.KeySortMode_RFC7049}) + + marshaller.SetAtlas(cborAtlas) + unmarshaller.SetAtlas(cborAtlas) + cloner.SetAtlas(cborAtlas) +} + +// RegisterCborType allows to register a custom cbor type +func RegisterCborType(i interface{}) { + var entry *atlas.AtlasEntry + if ae, ok := i.(*atlas.AtlasEntry); ok { + entry = ae + } else { + entry = atlas.BuildEntry(i).StructMap().AutogenerateWithSortingScheme(atlas.KeySortMode_RFC7049).Complete() + } + atlasEntries = append(atlasEntries, entry) + rebuildAtlas() +} diff --git a/wrap_test.go b/wrap_test.go new file mode 100644 index 0000000..7b33587 --- /dev/null +++ b/wrap_test.go @@ -0,0 +1,60 @@ +package cbornode + +import ( + "testing" + + mh "github.com/multiformats/go-multihash" +) + +type myStruct struct { + items map[string]myStruct + foo string + bar []byte + baz []int +} + +func init() { + RegisterCborType(myStruct{}) +} + +func testStruct() myStruct { + return myStruct{ + items: map[string]myStruct{ + "foo": { + foo: "foo", + bar: []byte("bar"), + baz: []int{1, 2, 3, 4}, + }, + "bar": { + bar: []byte("bar"), + baz: []int{1, 2, 3, 4}, + }, + }, + baz: []int{5, 1, 2}, + } +} + +func BenchmarkWrapObject(b *testing.B) { + obj := testStruct() + b.ResetTimer() + for i := 0; i < b.N; i++ { + nd, err := WrapObject(obj, mh.SHA2_256, -1) + if err != nil { + b.Fatal(err, nd) + } + } +} +func BenchmarkDecodeBlock(b *testing.B) { + obj := testStruct() + nd, err := WrapObject(obj, mh.SHA2_256, -1) + if err != nil { + b.Fatal(err, nd) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + nd2, err := DecodeBlock(nd) + if err != nil { + b.Fatal(err, nd2) + } + } +} From a7e0713ac9797ab6e13c87ca9a112a826f47cb72 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 17 Jul 2018 10:37:54 -0700 Subject: [PATCH 2/6] add a parallel benchmark Increasing `numWorkers` doesn't seem to help at all (not supprising). Really, we could probably drop to NumCPUs (instad of 2x that) and we'd still be fine. --- wrap_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/wrap_test.go b/wrap_test.go index 7b33587..bd443be 100644 --- a/wrap_test.go +++ b/wrap_test.go @@ -1,6 +1,7 @@ package cbornode import ( + "sync" "testing" mh "github.com/multiformats/go-multihash" @@ -44,6 +45,7 @@ func BenchmarkWrapObject(b *testing.B) { } } } + func BenchmarkDecodeBlock(b *testing.B) { obj := testStruct() nd, err := WrapObject(obj, mh.SHA2_256, -1) @@ -58,3 +60,45 @@ func BenchmarkDecodeBlock(b *testing.B) { } } } + +func BenchmarkWrapObjectParallel(b *testing.B) { + obj := testStruct() + b.ResetTimer() + var wg sync.WaitGroup + wg.Add(100) + for j := 0; j < 100; j++ { + go func() { + defer wg.Done() + for i := 0; i < b.N; i++ { + nd, err := WrapObject(obj, mh.SHA2_256, -1) + if err != nil { + b.Fatal(err, nd) + } + } + }() + } + wg.Wait() +} + +func BenchmarkDecodeBlockParallel(b *testing.B) { + obj := testStruct() + nd, err := WrapObject(obj, mh.SHA2_256, -1) + if err != nil { + b.Fatal(err, nd) + } + b.ResetTimer() + var wg sync.WaitGroup + wg.Add(100) + for j := 0; j < 100; j++ { + go func() { + defer wg.Done() + for i := 0; i < b.N; i++ { + nd2, err := DecodeBlock(nd) + if err != nil { + b.Fatal(err, nd2) + } + } + }() + } + wg.Wait() +} From 1c730dfa9852850e3784e76f88c0934ae4714ca0 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 17 Jul 2018 10:39:04 -0700 Subject: [PATCH 3/6] drop to NumCPU + 1 workers More than that doesn't help. +1 helps for the case where a goroutine holding a worker gets unscheduled. --- refmt.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/refmt.go b/refmt.go index 8b5ea8a..74fac0e 100644 --- a/refmt.go +++ b/refmt.go @@ -39,7 +39,7 @@ var cborSortingMode = atlas.KeySortMode_RFC7049 var atlasEntries = []*atlas.AtlasEntry{cidAtlasEntry, bigIntAtlasEntry} var ( - numWorkers = runtime.NumCPU() * 2 + numWorkers = runtime.NumCPU() + 1 cloner = encoding.PooledCloner{Count: numWorkers} unmarshaller = encoding.PooledUnmarshaller{Count: numWorkers} marshaller = encoding.PooledMarshaller{Count: numWorkers} From a5d0fdcf745431e3b08796fb9846493d025aa69c Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 17 Jul 2018 13:58:44 -0700 Subject: [PATCH 4/6] use a sync.Pool for the refmt (un)marshallers IIRC, pools are cleaned every GC cycle. However, by benchmarks aren't actually *faster* with this change so I figured we might as well go for it. Our issue here is really throughput, not latency, so even *if* these pools are cleaned on GC, it's still useful. --- encoding/cloner.go | 18 ++++++++++-------- encoding/marshaller.go | 21 +++++++++++---------- encoding/unmarshaller.go | 21 +++++++++++---------- refmt.go | 8 +++----- 4 files changed, 35 insertions(+), 33 deletions(-) diff --git a/encoding/cloner.go b/encoding/cloner.go index 33ecb04..2f52d80 100644 --- a/encoding/cloner.go +++ b/encoding/cloner.go @@ -1,29 +1,31 @@ package encoding import ( + "sync" + refmt "github.com/polydawn/refmt" "github.com/polydawn/refmt/obj/atlas" ) // PooledCloner is a thread-safe pooled object cloner. type PooledCloner struct { - Count int - cloners chan refmt.Cloner + pool sync.Pool } // SetAtlas set sets the pool's atlas. It is *not* safe to call this // concurrently. -func (p *PooledCloner) SetAtlas(atlas atlas.Atlas) { - p.cloners = make(chan refmt.Cloner, p.Count) - for len(p.cloners) < cap(p.cloners) { - p.cloners <- refmt.NewCloner(atlas) +func (p *PooledCloner) SetAtlas(atl atlas.Atlas) { + p.pool = sync.Pool{ + New: func() interface{} { + return refmt.NewCloner(atl) + }, } } // Clone clones a into b using a cloner from the pool. func (p *PooledCloner) Clone(a, b interface{}) error { - c := <-p.cloners + c := p.pool.Get().(refmt.Cloner) err := c.Clone(a, b) - p.cloners <- c + p.pool.Put(c) return err } diff --git a/encoding/marshaller.go b/encoding/marshaller.go index 358605e..564226d 100644 --- a/encoding/marshaller.go +++ b/encoding/marshaller.go @@ -3,6 +3,7 @@ package encoding import ( "bytes" "io" + "sync" cbor "github.com/polydawn/refmt/cbor" "github.com/polydawn/refmt/obj/atlas" @@ -48,32 +49,32 @@ func (m *Marshaller) Marshal(obj interface{}) ([]byte, error) { // PooledMarshaller is a thread-safe pooled CBOR marshaller. type PooledMarshaller struct { - Count int - marshallers chan *Marshaller + pool sync.Pool } // SetAtlas set sets the pool's atlas. It is *not* safe to call this // concurrently. -func (p *PooledMarshaller) SetAtlas(atlas atlas.Atlas) { - p.marshallers = make(chan *Marshaller, p.Count) - for len(p.marshallers) < cap(p.marshallers) { - p.marshallers <- NewMarshallerAtlased(atlas) +func (p *PooledMarshaller) SetAtlas(atl atlas.Atlas) { + p.pool = sync.Pool{ + New: func() interface{} { + return NewMarshallerAtlased(atl) + }, } } // Marshal marshals the passed object using the pool of marshallers. func (p *PooledMarshaller) Marshal(obj interface{}) ([]byte, error) { - m := <-p.marshallers + m := p.pool.Get().(*Marshaller) bts, err := m.Marshal(obj) - p.marshallers <- m + p.pool.Put(m) return bts, err } // Encode encodes the passed object to the given writer using the pool of // marshallers. func (p *PooledMarshaller) Encode(obj interface{}, w io.Writer) error { - m := <-p.marshallers + m := p.pool.Get().(*Marshaller) err := m.Encode(obj, w) - p.marshallers <- m + p.pool.Put(m) return err } diff --git a/encoding/unmarshaller.go b/encoding/unmarshaller.go index 9a2d42a..3bb1f0b 100644 --- a/encoding/unmarshaller.go +++ b/encoding/unmarshaller.go @@ -3,6 +3,7 @@ package encoding import ( "bytes" "io" + "sync" cbor "github.com/polydawn/refmt/cbor" "github.com/polydawn/refmt/obj/atlas" @@ -45,32 +46,32 @@ func (m *Unmarshaller) Unmarshal(b []byte, obj interface{}) error { // PooledUnmarshaller is a thread-safe pooled CBOR unmarshaller. type PooledUnmarshaller struct { - Count int - unmarshallers chan *Unmarshaller + pool sync.Pool } // SetAtlas set sets the pool's atlas. It is *not* safe to call this // concurrently. -func (p *PooledUnmarshaller) SetAtlas(atlas atlas.Atlas) { - p.unmarshallers = make(chan *Unmarshaller, p.Count) - for len(p.unmarshallers) < cap(p.unmarshallers) { - p.unmarshallers <- NewUnmarshallerAtlased(atlas) +func (p *PooledUnmarshaller) SetAtlas(atl atlas.Atlas) { + p.pool = sync.Pool{ + New: func() interface{} { + return NewUnmarshallerAtlased(atl) + }, } } // Decode decodes an object from the passed reader into the given object using // the pool of unmarshallers. func (p *PooledUnmarshaller) Decode(r io.Reader, obj interface{}) error { - u := <-p.unmarshallers + u := p.pool.Get().(*Unmarshaller) err := u.Decode(r, obj) - p.unmarshallers <- u + p.pool.Put(u) return err } // Unmarshal unmarshals the passed object using the pool of unmarshallers. func (p *PooledUnmarshaller) Unmarshal(b []byte, obj interface{}) error { - u := <-p.unmarshallers + u := p.pool.Get().(*Unmarshaller) err := u.Unmarshal(b, obj) - p.unmarshallers <- u + p.pool.Put(u) return err } diff --git a/refmt.go b/refmt.go index 74fac0e..03c0bb5 100644 --- a/refmt.go +++ b/refmt.go @@ -2,7 +2,6 @@ package cbornode import ( "math/big" - "runtime" cid "github.com/ipfs/go-cid" @@ -39,10 +38,9 @@ var cborSortingMode = atlas.KeySortMode_RFC7049 var atlasEntries = []*atlas.AtlasEntry{cidAtlasEntry, bigIntAtlasEntry} var ( - numWorkers = runtime.NumCPU() + 1 - cloner = encoding.PooledCloner{Count: numWorkers} - unmarshaller = encoding.PooledUnmarshaller{Count: numWorkers} - marshaller = encoding.PooledMarshaller{Count: numWorkers} + cloner encoding.PooledCloner + unmarshaller encoding.PooledUnmarshaller + marshaller encoding.PooledMarshaller ) func init() { From b76615f67a29c25fcde043ce8597242754906f34 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 17 Jul 2018 15:32:58 -0700 Subject: [PATCH 5/6] replace SetAtlas method with a constructor We don't need to mutate, just replace. --- encoding/cloner.go | 14 ++++++++------ encoding/marshaller.go | 14 ++++++++------ encoding/unmarshaller.go | 14 ++++++++------ refmt.go | 6 +++--- 4 files changed, 27 insertions(+), 21 deletions(-) diff --git a/encoding/cloner.go b/encoding/cloner.go index 2f52d80..0a04a3c 100644 --- a/encoding/cloner.go +++ b/encoding/cloner.go @@ -12,12 +12,14 @@ type PooledCloner struct { pool sync.Pool } -// SetAtlas set sets the pool's atlas. It is *not* safe to call this -// concurrently. -func (p *PooledCloner) SetAtlas(atl atlas.Atlas) { - p.pool = sync.Pool{ - New: func() interface{} { - return refmt.NewCloner(atl) +// NewPooledCloner returns a PooledCloner with the given atlas. Do not copy +// after use. +func NewPooledCloner(atl atlas.Atlas) PooledCloner { + return PooledCloner{ + pool: sync.Pool{ + New: func() interface{} { + return refmt.NewCloner(atl) + }, }, } } diff --git a/encoding/marshaller.go b/encoding/marshaller.go index 564226d..f4fe84b 100644 --- a/encoding/marshaller.go +++ b/encoding/marshaller.go @@ -52,12 +52,14 @@ type PooledMarshaller struct { pool sync.Pool } -// SetAtlas set sets the pool's atlas. It is *not* safe to call this -// concurrently. -func (p *PooledMarshaller) SetAtlas(atl atlas.Atlas) { - p.pool = sync.Pool{ - New: func() interface{} { - return NewMarshallerAtlased(atl) +// NewPooledMarshaller returns a PooledMarshaller with the given atlas. Do not +// copy after use. +func NewPooledMarshaller(atl atlas.Atlas) PooledMarshaller { + return PooledMarshaller{ + pool: sync.Pool{ + New: func() interface{} { + return NewMarshallerAtlased(atl) + }, }, } } diff --git a/encoding/unmarshaller.go b/encoding/unmarshaller.go index 3bb1f0b..0ad2e39 100644 --- a/encoding/unmarshaller.go +++ b/encoding/unmarshaller.go @@ -49,12 +49,14 @@ type PooledUnmarshaller struct { pool sync.Pool } -// SetAtlas set sets the pool's atlas. It is *not* safe to call this -// concurrently. -func (p *PooledUnmarshaller) SetAtlas(atl atlas.Atlas) { - p.pool = sync.Pool{ - New: func() interface{} { - return NewUnmarshallerAtlased(atl) +// NewPooledUnmarshaller returns a PooledUnmarshaller with the given atlas. Do +// not copy after use. +func NewPooledUnmarshaller(atl atlas.Atlas) PooledUnmarshaller { + return PooledUnmarshaller{ + pool: sync.Pool{ + New: func() interface{} { + return NewUnmarshallerAtlased(atl) + }, }, } } diff --git a/refmt.go b/refmt.go index 03c0bb5..5e1af46 100644 --- a/refmt.go +++ b/refmt.go @@ -51,9 +51,9 @@ func rebuildAtlas() { cborAtlas = atlas.MustBuild(atlasEntries...). WithMapMorphism(atlas.MapMorphism{atlas.KeySortMode_RFC7049}) - marshaller.SetAtlas(cborAtlas) - unmarshaller.SetAtlas(cborAtlas) - cloner.SetAtlas(cborAtlas) + marshaller = encoding.NewPooledMarshaller(cborAtlas) + unmarshaller = encoding.NewPooledUnmarshaller(cborAtlas) + cloner = encoding.NewPooledCloner(cborAtlas) } // RegisterCborType allows to register a custom cbor type From 59ca950fc7656955efe6922ab02520c21bcf46e8 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 7 Sep 2018 10:06:58 -0700 Subject: [PATCH 6/6] fix benchmarks Make sure we *actually* serialize/deserialize. Addresses @warpfork's CR. --- wrap_test.go | 44 +++++++++++++++++++++++++++----------------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/wrap_test.go b/wrap_test.go index bd443be..0a513d1 100644 --- a/wrap_test.go +++ b/wrap_test.go @@ -7,31 +7,31 @@ import ( mh "github.com/multiformats/go-multihash" ) -type myStruct struct { - items map[string]myStruct - foo string - bar []byte - baz []int +type MyStruct struct { + Items map[string]MyStruct + Foo string + Bar []byte + Baz []int } func init() { - RegisterCborType(myStruct{}) + RegisterCborType(MyStruct{}) } -func testStruct() myStruct { - return myStruct{ - items: map[string]myStruct{ - "foo": { - foo: "foo", - bar: []byte("bar"), - baz: []int{1, 2, 3, 4}, +func testStruct() MyStruct { + return MyStruct{ + Items: map[string]MyStruct{ + "Foo": { + Foo: "Foo", + Bar: []byte("Bar"), + Baz: []int{1, 2, 3, 4}, }, - "bar": { - bar: []byte("bar"), - baz: []int{1, 2, 3, 4}, + "Bar": { + Bar: []byte("Bar"), + Baz: []int{1, 2, 3, 4}, }, }, - baz: []int{5, 1, 2}, + Baz: []int{5, 1, 2}, } } @@ -102,3 +102,13 @@ func BenchmarkDecodeBlockParallel(b *testing.B) { } wg.Wait() } + +func BenchmarkDumpObject(b *testing.B) { + obj := testStruct() + for i := 0; i < b.N; i++ { + bytes, err := DumpObject(obj) + if err != nil { + b.Fatal(err, bytes) + } + } +}