Skip to content
This repository has been archived by the owner on Feb 20, 2024. It is now read-only.

Add Blockstore Blockstreamer and fix Walk func #21

Merged
merged 3 commits into from
Nov 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions blockstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,65 @@ package blockstream

import (
"context"
blockstore "github.com/ipfs/go-ipfs-blockstore"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
)

// TODO Implementation distributing requests between multiple streamers by some abstract characteristic.
type BlockStreamer interface {

// Stream initiates ordered stream of Blocks from implementation defined source.
//
// Err chan returns BlockError for requested blocks skipped on the response stream with reasonable error.
// It has two purposes: obvious error handling and overcoming congestions.
// If err chan returns any other error, the stream is terminated for some internal issue described by the error.
Stream(context.Context, <-chan []cid.Cid) (<-chan blocks.Block, <-chan error)
}

type BlockstoreStreamer struct {
bs blockstore.Blockstore
}

func NewBlockstoreStreamer(bs blockstore.Blockstore) BlockStreamer {
return &BlockstoreStreamer{
bs: bs,
}
}

func (b *BlockstoreStreamer) Stream(ctx context.Context, in <-chan []cid.Cid) (<-chan blocks.Block, <-chan error) {
outB, outErr := make(chan blocks.Block, len(in)), make(chan error, 1)

// TODO Spawn multiple routines to accelerate Stream for case when more Ps are available.
go func() {
select {
case ids, ok := <-in:
if !ok {
return
}

for _, id := range ids {
b, err := b.bs.Get(id)
if err != nil {
select {
case outErr <- &BlockError{Cid: id, Err: err}:
case <-ctx.Done():
return
}
}

select {
case outB <- b:
case <-ctx.Done():
return
}
}

case <-ctx.Done():
return
}
}()

return outB, outErr
}
19 changes: 19 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package blockstream

import (
"fmt"
"github.com/ipfs/go-cid"
)

type BlockError struct {
Cid cid.Cid
Err error
}

func (br *BlockError) Error() string {
return fmt.Sprintf("blockstream: failed to process block %s: %s", br.Cid, br.Err)
}

func (br *BlockError) Unwrap() error {
return br.Err
}
15 changes: 8 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,20 @@ module github.com/Wondertan/go-blockstream
go 1.14

require (
github.com/Wondertan/go-libp2p-access v0.0.2
github.com/Wondertan/go-libp2p-access v0.1.0
github.com/Wondertan/go-serde v1.0.1
github.com/davidlazar/go-crypto v0.0.0-20190912175916-7055855a373f // indirect
github.com/gogo/protobuf v1.3.1
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-cid v0.0.6
github.com/ipfs/go-datastore v0.4.4
github.com/ipfs/go-ipfs-blockstore v1.0.0
github.com/ipfs/go-ipfs-ds-help v1.0.0
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ipfs-blockstore v1.0.2
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-log/v2 v2.1.1
github.com/ipfs/go-merkledag v0.3.2
github.com/libp2p/go-libp2p v0.10.0
github.com/libp2p/go-libp2p-core v0.6.0
github.com/libp2p/go-libp2p v0.11.0
github.com/libp2p/go-libp2p-core v0.6.1
github.com/libp2p/go-sockaddr v0.1.0 // indirect
github.com/stretchr/testify v1.6.1
)
Loading