From 48e9a6891ceca1809663da924e0c0befd6aa5e62 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Wed, 2 Dec 2020 01:58:41 +0200 Subject: [PATCH 1/4] workaround for fixed size buffer --- explore.go | 3 ++- session.go | 58 +++++++++++++++++++++++++++++------------------------- 2 files changed, 33 insertions(+), 28 deletions(-) diff --git a/explore.go b/explore.go index 05626d8..7bacfcc 100644 --- a/explore.go +++ b/explore.go @@ -2,6 +2,7 @@ package blockstream import ( "context" + "github.com/Wondertan/go-blockstream/block" "github.com/ipfs/go-cid" @@ -17,7 +18,7 @@ func Explore(ctx context.Context, id cid.Cid, bs Streamer, h Explorer) error { defer cancel() remains := 1 - in := make(chan []cid.Cid, 8) + in := make(chan []cid.Cid, 32) in <- []cid.Cid{id} defer close(in) diff --git a/session.go b/session.go index b23c2d5..2c715b0 100644 --- a/session.go +++ b/session.go @@ -14,9 +14,6 @@ import ( const ( maxAvailableWorkers = 128 requestBufferSize = 8 - - // TODO Short-term solution to use a big buffer. 
It requires dynamic solution - streamBufferSize = 512 ) // TODO Refactor this, my ayes hurt watching this @@ -175,27 +172,37 @@ func (ses *Session) requestId() uint32 { } func (ses *Session) streamWithStore(ctx context.Context, in <-chan []cid.Cid) (<-chan block.Result, <-chan error) { - outR, outErr := make(chan block.Result, streamBufferSize), make(chan error, 1) - go func() { - defer close(outR) - defer close(outErr) - - ctx, cancel := context.WithCancel(ctx) - defer cancel() + ctx, cancel := context.WithCancel(ctx) + outR, outErr := make(chan block.Result, len(in)), make(chan error, 1) + first := make(chan *blockJob, 1) - first := make(chan *blockJob, 1) + go func() { // handles input last := first - for { select { case ids, ok := <-in: if !ok { close(last) - in = nil - continue + return } - last = ses.process(ctx, ids, last) + case <-ses.ctx.Done(): + return + case <-ctx.Done(): + return + } + } + }() + + go func() { // handles output + defer func() { + cancel() + close(outR) + close(outErr) + }() + + for { + select { case j, ok := <-first: if !ok { return @@ -208,10 +215,8 @@ func (ses *Session) streamWithStore(ctx context.Context, in <-chan []cid.Cid) (< select { case outErr <- ses.err: case <-ctx.Done(): - return } } - return case <-ctx.Done(): return @@ -223,18 +228,18 @@ func (ses *Session) streamWithStore(ctx context.Context, in <-chan []cid.Cid) (< } func (ses *Session) blocksWithStore(ctx context.Context, ids []cid.Cid) (<-chan block.Result, <-chan error) { - outR, outErr := make(chan block.Result, streamBufferSize), make(chan error, 1) + ctx, cancel := context.WithCancel(ctx) + outR, outErr := make(chan block.Result, len(ids)), make(chan error, 1) + done := make(chan *blockJob, 1) go func() { - defer close(outR) - defer close(outErr) + defer func() { + cancel() + close(outR) + close(outErr) + }() - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - done := make(chan *blockJob, 1) ses.process(ctx, ids, done) - select { case j := <-done: 
j.write(outR) @@ -245,7 +250,6 @@ func (ses *Session) blocksWithStore(ctx context.Context, ids []cid.Cid) (<-chan case <-ctx.Done(): } } - case <-ctx.Done(): } }() @@ -288,7 +292,7 @@ func (ses *Session) worker(id uint32) { var fetch bool var fetched []blocks.Block - var toFetch = make([]cid.Cid, len(j.results)) + toFetch := make([]cid.Cid, len(j.results)) for i, res := range j.results { res.Block, res.Error = ses.bs.Get(res.Cid) From 25efb7a27186c988efa8f7d32c24dcf9d551b6a1 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Wed, 2 Dec 2020 02:26:38 +0200 Subject: [PATCH 2/4] rework Result and minor code changes --- block/result.go | 15 ++++++++++----- block/stream.go | 4 ++-- blockstream.go | 12 +++++++----- blockstream_test.go | 11 ++++++----- exchange/exchange.go | 7 ++++--- ipld/fetch.go | 8 ++------ session.go | 10 +++++----- session_test.go | 21 +++++++++++---------- 8 files changed, 47 insertions(+), 41 deletions(-) diff --git a/block/result.go b/block/result.go index 801b659..332c243 100644 --- a/block/result.go +++ b/block/result.go @@ -6,11 +6,16 @@ import ( ) type Result struct { - cid.Cid - Block blocks.Block - Error error + blocks.Block + + Id cid.Cid + Err error } -func (res Result) Get() (blocks.Block, error) { - return res.Block, res.Error +func (r Result) Cid() cid.Cid { + if r.Block != nil { + return r.Block.Cid() + } + + return r.Id } diff --git a/block/stream.go b/block/stream.go index ba732bd..3dbcb9a 100644 --- a/block/stream.go +++ b/block/stream.go @@ -64,7 +64,7 @@ func (s *Stream) stream() { if !errors.Is(err, io.EOF) { for _, id := range req.Remains() { select { - case s.out <- Result{Cid: id, Error: err}: + case s.out <- Result{Id: id, Err: err}: case <-s.ctx.Done(): return } @@ -76,7 +76,7 @@ func (s *Stream) stream() { for _, b := range bs { select { - case s.out <- Result{Cid: b.Cid(), Block: b}: + case s.out <- Result{Block: b}: case <-s.ctx.Done(): return } diff --git a/blockstream.go b/blockstream.go index 3fc1209..1f3ca53 100644 --- 
a/blockstream.go +++ b/blockstream.go @@ -5,8 +5,8 @@ import ( "errors" "sync" - "github.com/Wondertan/go-libp2p-access" - "github.com/ipfs/go-ipfs-blockstore" + access "github.com/Wondertan/go-libp2p-access" + blockstore "github.com/ipfs/go-ipfs-blockstore" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" @@ -113,7 +113,7 @@ func (bs *BlockStream) Session(ctx context.Context, peers []peer.ID, opts ...Ses log.Errorf("Failed provider %s for session %s: %s", p.Pretty(), tkn, err) if ses.removeProvider() == 0 { - log.Errorf("Terminating session %s: %s", err) + log.Errorf("Terminating session %s: %s", tkn, err) ses.err = ErrNoProviders ses.cancel() @@ -162,8 +162,10 @@ func (bs *BlockStream) handler(s network.Stream) error { return nil } -type onToken func(access.Token) error -type сlose func(func() error) +type ( + onToken func(access.Token) error + сlose func(func() error) +) var logClose = func(f func() error) { err := f() diff --git a/blockstream_test.go b/blockstream_test.go index 7c94fe0..6ec526a 100644 --- a/blockstream_test.go +++ b/blockstream_test.go @@ -3,16 +3,17 @@ package blockstream import ( "context" "crypto/rand" - "github.com/Wondertan/go-blockstream/block" - "github.com/stretchr/testify/assert" "sync" "testing" + "github.com/Wondertan/go-blockstream/block" + "github.com/stretchr/testify/assert" + "github.com/libp2p/go-libp2p-core/peer" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/require" - "github.com/Wondertan/go-libp2p-access" + access "github.com/Wondertan/go-libp2p-access" "github.com/Wondertan/go-blockstream/test" ) @@ -83,9 +84,9 @@ func TestBlockStream(t *testing.T) { for _, id := range cids { res, ok := <-ch require.True(t, ok) - assert.Equal(t, id, res.Cid) + assert.Equal(t, id, res.Cid()) assert.NotNil(t, res.Block) - assert.NoError(t, res.Error) + assert.NoError(t, res.Err) } _, ok := <-errs[i] diff --git a/exchange/exchange.go 
b/exchange/exchange.go index 1c14bdc..4bc73ff 100644 --- a/exchange/exchange.go +++ b/exchange/exchange.go @@ -2,11 +2,12 @@ package exchange import ( "context" + + blocks "github.com/ipfs/go-block-format" log2 "github.com/ipfs/go-log" - "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" - "github.com/ipfs/go-ipfs-blockstore" + blockstore "github.com/ipfs/go-ipfs-blockstore" iexchange "github.com/ipfs/go-ipfs-exchange-interface" "github.com/Wondertan/go-blockstream" @@ -71,7 +72,7 @@ func (f *fetcher) GetBlocks(ctx context.Context, ids []cid.Cid) (<-chan blocks.B resCh, errCh := f.ses.Blocks(ctx, ids) for res := range resCh { if res.Block == nil { - log.Warnf("Failed to retrieve %s: %s", res.Cid, res.Error) + log.Warnf("Failed to retrieve %s: %s", res.Cid, res.Err) continue } diff --git a/ipld/fetch.go b/ipld/fetch.go index 41272ff..bdfa6f9 100644 --- a/ipld/fetch.go +++ b/ipld/fetch.go @@ -2,6 +2,7 @@ package ipld import ( "context" + "github.com/Wondertan/go-blockstream/block" "github.com/ipfs/go-cid" format "github.com/ipfs/go-ipld-format" @@ -12,12 +13,7 @@ import ( // Traverse traverses and fetches whole IPLD graph from the stream. 
func Traverse(ctx context.Context, id cid.Cid, ses blockstream.Streamer) error { return blockstream.Explore(ctx, id, ses, func(res block.Result) ([]cid.Cid, error) { - b, err := res.Get() - if err != nil { - return nil, err - } - - nd, err := format.Decode(b) + nd, err := format.Decode(res) if err != nil { return nil, err } diff --git a/session.go b/session.go index 2c715b0..892569b 100644 --- a/session.go +++ b/session.go @@ -295,9 +295,9 @@ func (ses *Session) worker(id uint32) { toFetch := make([]cid.Cid, len(j.results)) for i, res := range j.results { - res.Block, res.Error = ses.bs.Get(res.Cid) - if res.Error != nil { - toFetch[i] = res.Cid + res.Block, res.Err = ses.bs.Get(res.Id) + if res.Err != nil { + toFetch[i] = res.Id fetch = true } else { continue @@ -325,7 +325,7 @@ func (ses *Session) worker(id uint32) { fetched = append(fetched, res.Block) } case <-j.ctx.Done(): - j.results[i].Error = j.ctx.Err() + j.results[i].Err = j.ctx.Err() } } @@ -358,7 +358,7 @@ type blockJob struct { func (ses *Session) newJob(ctx context.Context, ids []cid.Cid, done, next chan *blockJob) *blockJob { results := make([]*block.Result, len(ids)) for i, id := range ids { - results[i] = &block.Result{Cid: id} + results[i] = &block.Result{Id: id} } j := &blockJob{ diff --git a/session_test.go b/session_test.go index feb55d1..ec1256d 100644 --- a/session_test.go +++ b/session_test.go @@ -3,6 +3,10 @@ package blockstream import ( "context" "crypto/rand" + "io" + "testing" + "time" + "github.com/Wondertan/go-blockstream/block" "github.com/Wondertan/go-blockstream/test" blocks "github.com/ipfs/go-block-format" @@ -10,9 +14,6 @@ import ( blockstore "github.com/ipfs/go-ipfs-blockstore" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "io" - "testing" - "time" ) func TestRequestResponder(t *testing.T) { @@ -79,9 +80,9 @@ func TestSessionStream(t *testing.T) { for _, id := range ids[i*count/times : (i+1)*count/times] { res, ok := <-res require.True(t, ok) - 
assert.Equal(t, id, res.Cid) + assert.Equal(t, id, res.Cid()) assert.NotNil(t, res.Block) - assert.NoError(t, res.Error) + assert.NoError(t, res.Err) } } @@ -112,9 +113,9 @@ func TestSessionBlocks(t *testing.T) { for _, id := range ids[:count/2] { res, ok := <-res1 require.True(t, ok) - assert.Equal(t, id, res.Cid) + assert.Equal(t, id, res.Cid()) assert.NotNil(t, res.Block) - assert.NoError(t, res.Error) + assert.NoError(t, res.Err) } _, ok := <-err1 @@ -123,9 +124,9 @@ func TestSessionBlocks(t *testing.T) { for _, id := range ids[count/2:] { res, ok := <-res2 require.True(t, ok) - assert.Equal(t, id, res.Cid) + assert.Equal(t, id, res.Cid()) assert.NotNil(t, res.Block) - assert.NoError(t, res.Error) + assert.NoError(t, res.Err) } _, ok = <-err2 @@ -155,7 +156,7 @@ func TestSessionNotFound(t *testing.T) { res, err := ses.Blocks(ctx, ids) for res := range res { if i == missing { - assert.Error(t, res.Error) + assert.Error(t, res.Err) } i++ } From 31680b9444b04d5822a476eedd1f1a4c5256b03f Mon Sep 17 00:00:00 2001 From: Wondertan Date: Wed, 2 Dec 2020 03:02:19 +0200 Subject: [PATCH 3/4] rework job logic --- session.go | 47 +++++++++++++++++++++-------------------------- 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/session.go b/session.go index 892569b..df2f974 100644 --- a/session.go +++ b/session.go @@ -13,7 +13,6 @@ import ( const ( maxAvailableWorkers = 128 - requestBufferSize = 8 ) // TODO Refactor this, my ayes hurt watching this @@ -35,7 +34,7 @@ type Session struct { func newSession(ctx context.Context, opts ...SessionOption) *Session { ctx, cancel := context.WithCancel(ctx) ses := &Session{ - reqs: make(chan *block.Request, requestBufferSize), + reqs: make(chan *block.Request, 32), ctx: ctx, cancel: cancel, jobch: make(chan *blockJob), @@ -173,7 +172,7 @@ func (ses *Session) requestId() uint32 { func (ses *Session) streamWithStore(ctx context.Context, in <-chan []cid.Cid) (<-chan block.Result, <-chan error) { ctx, cancel := 
context.WithCancel(ctx) - outR, outErr := make(chan block.Result, len(in)), make(chan error, 1) + outR, outErr := make(chan block.Result, cap(in)), make(chan error, 1) first := make(chan *blockJob, 1) go func() { // handles input @@ -229,7 +228,7 @@ func (ses *Session) streamWithStore(ctx context.Context, in <-chan []cid.Cid) (< func (ses *Session) blocksWithStore(ctx context.Context, ids []cid.Cid) (<-chan block.Result, <-chan error) { ctx, cancel := context.WithCancel(ctx) - outR, outErr := make(chan block.Result, len(ids)), make(chan error, 1) + outR, outErr := make(chan block.Result, cap(ids)), make(chan error, 1) done := make(chan *blockJob, 1) go func() { @@ -292,14 +291,14 @@ func (ses *Session) worker(id uint32) { var fetch bool var fetched []blocks.Block - toFetch := make([]cid.Cid, len(j.results)) + toFetch := make([]cid.Cid, len(j.res)) - for i, res := range j.results { - res.Block, res.Err = ses.bs.Get(res.Id) - if res.Err != nil { - toFetch[i] = res.Id + var err error + for i, id := range j.ids { + j.res[i].Block, err = ses.bs.Get(id) + if err != nil { + toFetch[i] = id fetch = true - } else { continue } } @@ -320,12 +319,12 @@ func (ses *Session) worker(id uint32) { select { case res := <-s.Output(): - *j.results[i] = res + j.res[i] = res if res.Block != nil { fetched = append(fetched, res.Block) } case <-j.ctx.Done(): - j.results[i].Err = j.ctx.Err() + j.res[i].Err = j.ctx.Err() } } @@ -351,32 +350,28 @@ func (ses *Session) worker(id uint32) { type blockJob struct { id uint32 ctx context.Context - results []*block.Result + ids []cid.Cid + res []block.Result next, done chan *blockJob } func (ses *Session) newJob(ctx context.Context, ids []cid.Cid, done, next chan *blockJob) *blockJob { - results := make([]*block.Result, len(ids)) - for i, id := range ids { - results[i] = &block.Result{Id: id} - } - j := &blockJob{ - id: atomic.AddUint32(&ses.jobs, 1), - ctx: ctx, - results: results, - done: done, - next: next, + id: atomic.AddUint32(&ses.jobs, 1), + 
ctx: ctx, + ids: ids, + res: make([]block.Result, len(ids)), + done: done, + next: next, } - log.Debugf("Got new Job %d.", j.id) return j } func (j *blockJob) write(outR chan block.Result) { - for _, res := range j.results { + for _, res := range j.res { select { - case outR <- *res: + case outR <- res: case <-j.ctx.Done(): } } From 1796756db6c275a4740d87b2ab6b741136b2b95c Mon Sep 17 00:00:00 2001 From: Wondertan Date: Wed, 2 Dec 2020 03:17:17 +0200 Subject: [PATCH 4/4] check for a result error in explore --- explore.go | 13 ++++++++----- ipld/fetch.go | 6 +++--- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/explore.go b/explore.go index 7bacfcc..6ca5ec9 100644 --- a/explore.go +++ b/explore.go @@ -3,16 +3,16 @@ package blockstream import ( "context" - "github.com/Wondertan/go-blockstream/block" + blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" ) // Explorer gets keys from block in a user defined way. -type Explorer func(block.Result) ([]cid.Cid, error) +type Explorer func(blocks.Block) ([]cid.Cid, error) // Explore gets first blocks from stream, passes it to handler that may explore new key in block and handles them over -// until no more left. +// until no more left. Once any Result error appears, it returns immediately. 
func Explore(ctx context.Context, id cid.Cid, bs Streamer, h Explorer) error { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -25,14 +25,17 @@ func Explore(ctx context.Context, id cid.Cid, bs Streamer, h Explorer) error { out, errCh := bs.Stream(ctx, in) for { select { - case b, ok := <-out: + case res, ok := <-out: if !ok { out = nil continue } + if res.Err != nil { + return res.Err + } remains-- - ids, err := h(b) + ids, err := h(res) if err != nil { return err } diff --git a/ipld/fetch.go b/ipld/fetch.go index bdfa6f9..fd608b7 100644 --- a/ipld/fetch.go +++ b/ipld/fetch.go @@ -3,7 +3,7 @@ package ipld import ( "context" - "github.com/Wondertan/go-blockstream/block" + blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" format "github.com/ipfs/go-ipld-format" @@ -12,8 +12,8 @@ import ( // Traverse traverses and fetches whole IPLD graph from the stream. func Traverse(ctx context.Context, id cid.Cid, ses blockstream.Streamer) error { - return blockstream.Explore(ctx, id, ses, func(res block.Result) ([]cid.Cid, error) { - nd, err := format.Decode(res) + return blockstream.Explore(ctx, id, ses, func(b blocks.Block) ([]cid.Cid, error) { + nd, err := format.Decode(b) if err != nil { return nil, err }