Skip to content
This repository has been archived by the owner on Feb 20, 2024. It is now read-only.

Commit

Permalink
Merge pull request #21 from Wondertan/feat/blockstore_streamer
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed Nov 13, 2020
2 parents c6d9634 + e94c5ab commit fe6a76f
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 197 deletions.
54 changes: 54 additions & 0 deletions blockstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,65 @@ package blockstream

import (
"context"
blockstore "github.com/ipfs/go-ipfs-blockstore"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
)

// TODO Implementation distributing requests between multiple streamers by some abstract characteristic.
type BlockStreamer interface {

// Stream initiates ordered stream of Blocks from implementation defined source.
//
// Err chan returns BlockError for requested blocks skipped on the response stream with reasonable error.
// It has two purposes: obvious error handling and overcoming congestions.
// If err chan returns any other error, the stream is terminated for some internal issue described by the error.
Stream(context.Context, <-chan []cid.Cid) (<-chan blocks.Block, <-chan error)
}

type BlockstoreStreamer struct {
bs blockstore.Blockstore
}

func NewBlockstoreStreamer(bs blockstore.Blockstore) BlockStreamer {
return &BlockstoreStreamer{
bs: bs,
}
}

func (b *BlockstoreStreamer) Stream(ctx context.Context, in <-chan []cid.Cid) (<-chan blocks.Block, <-chan error) {
outB, outErr := make(chan blocks.Block, len(in)), make(chan error, 1)

// TODO Spawn multiple routines to accelerate Stream for case when more Ps are available.
go func() {
select {
case ids, ok := <-in:
if !ok {
return
}

for _, id := range ids {
b, err := b.bs.Get(id)
if err != nil {
select {
case outErr <- &BlockError{Cid: id, Err: err}:
case <-ctx.Done():
return
}
}

select {
case outB <- b:
case <-ctx.Done():
return
}
}

case <-ctx.Done():
return
}
}()

return outB, outErr
}
19 changes: 19 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package blockstream

import (
"fmt"
"github.com/ipfs/go-cid"
)

type BlockError struct {
Cid cid.Cid
Err error
}

func (br *BlockError) Error() string {
return fmt.Sprintf("blockstream: failed to process block %s: %s", br.Cid, br.Err)
}

func (br *BlockError) Unwrap() error {
return br.Err
}
15 changes: 8 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,20 @@ module github.com/Wondertan/go-blockstream
go 1.14

require (
github.com/Wondertan/go-libp2p-access v0.0.2
github.com/Wondertan/go-libp2p-access v0.1.0
github.com/Wondertan/go-serde v1.0.1
github.com/davidlazar/go-crypto v0.0.0-20190912175916-7055855a373f // indirect
github.com/gogo/protobuf v1.3.1
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-cid v0.0.6
github.com/ipfs/go-datastore v0.4.4
github.com/ipfs/go-ipfs-blockstore v1.0.0
github.com/ipfs/go-ipfs-ds-help v1.0.0
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ipfs-blockstore v1.0.2
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-log/v2 v2.1.1
github.com/ipfs/go-merkledag v0.3.2
github.com/libp2p/go-libp2p v0.10.0
github.com/libp2p/go-libp2p-core v0.6.0
github.com/libp2p/go-libp2p v0.11.0
github.com/libp2p/go-libp2p-core v0.6.1
github.com/libp2p/go-sockaddr v0.1.0 // indirect
github.com/stretchr/testify v1.6.1
)
Loading

0 comments on commit fe6a76f

Please sign in to comment.