Skip to content
This repository has been archived by the owner on Feb 20, 2024. It is now read-only.

Commit

Permalink
Merge pull request #24 from Wondertan/fix/stream_buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed Dec 4, 2020
2 parents 451bf04 + 1796756 commit c3523ac
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 93 deletions.
15 changes: 10 additions & 5 deletions block/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@ import (
)

type Result struct {
cid.Cid
Block blocks.Block
Error error
blocks.Block

Id cid.Cid
Err error
}

func (res Result) Get() (blocks.Block, error) {
return res.Block, res.Error
func (r Result) Cid() cid.Cid {
if r.Block != nil {
return r.Block.Cid()
}

return r.Id
}
4 changes: 2 additions & 2 deletions block/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (s *Stream) stream() {
if !errors.Is(err, io.EOF) {
for _, id := range req.Remains() {
select {
case s.out <- Result{Cid: id, Error: err}:
case s.out <- Result{Id: id, Err: err}:
case <-s.ctx.Done():
return
}
Expand All @@ -76,7 +76,7 @@ func (s *Stream) stream() {

for _, b := range bs {
select {
case s.out <- Result{Cid: b.Cid(), Block: b}:
case s.out <- Result{Block: b}:
case <-s.ctx.Done():
return
}
Expand Down
12 changes: 7 additions & 5 deletions blockstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"errors"
"sync"

"github.com/Wondertan/go-libp2p-access"
"github.com/ipfs/go-ipfs-blockstore"
access "github.com/Wondertan/go-libp2p-access"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
Expand Down Expand Up @@ -113,7 +113,7 @@ func (bs *BlockStream) Session(ctx context.Context, peers []peer.ID, opts ...Ses
log.Errorf("Failed provider %s for session %s: %s", p.Pretty(), tkn, err)

if ses.removeProvider() == 0 {
log.Errorf("Terminating session %s: %s", err)
log.Errorf("Terminating session %s: %s", tkn, err)

ses.err = ErrNoProviders
ses.cancel()
Expand Down Expand Up @@ -162,8 +162,10 @@ func (bs *BlockStream) handler(s network.Stream) error {
return nil
}

type onToken func(access.Token) error
type сlose func(func() error)
type (
onToken func(access.Token) error
сlose func(func() error)
)

var logClose = func(f func() error) {
err := f()
Expand Down
11 changes: 6 additions & 5 deletions blockstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ package blockstream
import (
"context"
"crypto/rand"
"github.com/Wondertan/go-blockstream/block"
"github.com/stretchr/testify/assert"
"sync"
"testing"

"github.com/Wondertan/go-blockstream/block"
"github.com/stretchr/testify/assert"

"github.com/libp2p/go-libp2p-core/peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"

"github.com/Wondertan/go-libp2p-access"
access "github.com/Wondertan/go-libp2p-access"

"github.com/Wondertan/go-blockstream/test"
)
Expand Down Expand Up @@ -83,9 +84,9 @@ func TestBlockStream(t *testing.T) {
for _, id := range cids {
res, ok := <-ch
require.True(t, ok)
assert.Equal(t, id, res.Cid)
assert.Equal(t, id, res.Cid())
assert.NotNil(t, res.Block)
assert.NoError(t, res.Error)
assert.NoError(t, res.Err)
}

_, ok := <-errs[i]
Expand Down
7 changes: 4 additions & 3 deletions exchange/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package exchange

import (
"context"

blocks "github.com/ipfs/go-block-format"
log2 "github.com/ipfs/go-log"

"github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-blockstore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
iexchange "github.com/ipfs/go-ipfs-exchange-interface"

"github.com/Wondertan/go-blockstream"
Expand Down Expand Up @@ -71,7 +72,7 @@ func (f *fetcher) GetBlocks(ctx context.Context, ids []cid.Cid) (<-chan blocks.B
resCh, errCh := f.ses.Blocks(ctx, ids)
for res := range resCh {
if res.Block == nil {
log.Warnf("Failed to retrieve %s: %s", res.Cid, res.Error)
log.Warnf("Failed to retrieve %s: %s", res.Cid, res.Err)
continue
}

Expand Down
16 changes: 10 additions & 6 deletions explore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,40 @@ package blockstream

import (
"context"
"github.com/Wondertan/go-blockstream/block"

blocks "github.com/ipfs/go-block-format"

"github.com/ipfs/go-cid"
)

// Explorer gets keys from block in a user defined way.
type Explorer func(block.Result) ([]cid.Cid, error)
type Explorer func(blocks.Block) ([]cid.Cid, error)

// Explore gets first blocks from stream, passes it to handler that may explore new key in block and handles them over
// until no more left.
// until no more left. Once any Result error appears, it returns immediately.
func Explore(ctx context.Context, id cid.Cid, bs Streamer, h Explorer) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

remains := 1
in := make(chan []cid.Cid, 8)
in := make(chan []cid.Cid, 32)
in <- []cid.Cid{id}
defer close(in)

out, errCh := bs.Stream(ctx, in)
for {
select {
case b, ok := <-out:
case res, ok := <-out:
if !ok {
out = nil
continue
}
if res.Err != nil {
return res.Err
}

remains--
ids, err := h(b)
ids, err := h(res)
if err != nil {
return err
}
Expand Down
10 changes: 3 additions & 7 deletions ipld/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package ipld

import (
"context"
"github.com/Wondertan/go-blockstream/block"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
format "github.com/ipfs/go-ipld-format"

Expand All @@ -11,12 +12,7 @@ import (

// Traverse traverses and fetches whole IPLD graph from the stream.
func Traverse(ctx context.Context, id cid.Cid, ses blockstream.Streamer) error {
return blockstream.Explore(ctx, id, ses, func(res block.Result) ([]cid.Cid, error) {
b, err := res.Get()
if err != nil {
return nil, err
}

return blockstream.Explore(ctx, id, ses, func(b blocks.Block) ([]cid.Cid, error) {
nd, err := format.Decode(b)
if err != nil {
return nil, err
Expand Down
99 changes: 49 additions & 50 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ import (

const (
maxAvailableWorkers = 128
requestBufferSize = 8

// TODO Short-term solution to use a big buffer. It requires dynamic solution
streamBufferSize = 512
)

// TODO Refactor this, my ayes hurt watching this
Expand All @@ -38,7 +34,7 @@ type Session struct {
func newSession(ctx context.Context, opts ...SessionOption) *Session {
ctx, cancel := context.WithCancel(ctx)
ses := &Session{
reqs: make(chan *block.Request, requestBufferSize),
reqs: make(chan *block.Request, 32),
ctx: ctx,
cancel: cancel,
jobch: make(chan *blockJob),
Expand Down Expand Up @@ -175,27 +171,37 @@ func (ses *Session) requestId() uint32 {
}

func (ses *Session) streamWithStore(ctx context.Context, in <-chan []cid.Cid) (<-chan block.Result, <-chan error) {
outR, outErr := make(chan block.Result, streamBufferSize), make(chan error, 1)
go func() {
defer close(outR)
defer close(outErr)

ctx, cancel := context.WithCancel(ctx)
defer cancel()
ctx, cancel := context.WithCancel(ctx)
outR, outErr := make(chan block.Result, cap(in)), make(chan error, 1)
first := make(chan *blockJob, 1)

first := make(chan *blockJob, 1)
go func() { // handles input
last := first

for {
select {
case ids, ok := <-in:
if !ok {
close(last)
in = nil
continue
return
}

last = ses.process(ctx, ids, last)
case <-ses.ctx.Done():
return
case <-ctx.Done():
return
}
}
}()

go func() { // handles output
defer func() {
cancel()
close(outR)
close(outErr)
}()

for {
select {
case j, ok := <-first:
if !ok {
return
Expand All @@ -208,10 +214,8 @@ func (ses *Session) streamWithStore(ctx context.Context, in <-chan []cid.Cid) (<
select {
case outErr <- ses.err:
case <-ctx.Done():
return
}
}

return
case <-ctx.Done():
return
Expand All @@ -223,18 +227,18 @@ func (ses *Session) streamWithStore(ctx context.Context, in <-chan []cid.Cid) (<
}

func (ses *Session) blocksWithStore(ctx context.Context, ids []cid.Cid) (<-chan block.Result, <-chan error) {
outR, outErr := make(chan block.Result, streamBufferSize), make(chan error, 1)
ctx, cancel := context.WithCancel(ctx)
outR, outErr := make(chan block.Result, cap(ids)), make(chan error, 1)
done := make(chan *blockJob, 1)

go func() {
defer close(outR)
defer close(outErr)

ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer func() {
cancel()
close(outR)
close(outErr)
}()

done := make(chan *blockJob, 1)
ses.process(ctx, ids, done)

select {
case j := <-done:
j.write(outR)
Expand All @@ -245,7 +249,6 @@ func (ses *Session) blocksWithStore(ctx context.Context, ids []cid.Cid) (<-chan
case <-ctx.Done():
}
}

case <-ctx.Done():
}
}()
Expand Down Expand Up @@ -288,14 +291,14 @@ func (ses *Session) worker(id uint32) {

var fetch bool
var fetched []blocks.Block
var toFetch = make([]cid.Cid, len(j.results))
toFetch := make([]cid.Cid, len(j.res))

for i, res := range j.results {
res.Block, res.Error = ses.bs.Get(res.Cid)
if res.Error != nil {
toFetch[i] = res.Cid
var err error
for i, id := range j.ids {
j.res[i].Block, err = ses.bs.Get(id)
if err != nil {
toFetch[i] = id
fetch = true
} else {
continue
}
}
Expand All @@ -316,12 +319,12 @@ func (ses *Session) worker(id uint32) {

select {
case res := <-s.Output():
*j.results[i] = res
j.res[i] = res
if res.Block != nil {
fetched = append(fetched, res.Block)
}
case <-j.ctx.Done():
j.results[i].Error = j.ctx.Err()
j.res[i].Err = j.ctx.Err()
}
}

Expand All @@ -347,32 +350,28 @@ func (ses *Session) worker(id uint32) {
type blockJob struct {
id uint32
ctx context.Context
results []*block.Result
ids []cid.Cid
res []block.Result
next, done chan *blockJob
}

func (ses *Session) newJob(ctx context.Context, ids []cid.Cid, done, next chan *blockJob) *blockJob {
results := make([]*block.Result, len(ids))
for i, id := range ids {
results[i] = &block.Result{Cid: id}
}

j := &blockJob{
id: atomic.AddUint32(&ses.jobs, 1),
ctx: ctx,
results: results,
done: done,
next: next,
id: atomic.AddUint32(&ses.jobs, 1),
ctx: ctx,
ids: ids,
res: make([]block.Result, len(ids)),
done: done,
next: next,
}

log.Debugf("Got new Job %d.", j.id)
return j
}

func (j *blockJob) write(outR chan block.Result) {
for _, res := range j.results {
for _, res := range j.res {
select {
case outR <- *res:
case outR <- res:
case <-j.ctx.Done():
}
}
Expand Down
Loading

0 comments on commit c3523ac

Please sign in to comment.