Skip to content

Commit

Permalink
Merge pull request #3624 from filecoin-project/blocksync-refactor
Browse files Browse the repository at this point in the history
blocksync: introduce interfaces; rename to ChainExchange (abbrev. chainxchg)
  • Loading branch information
magik6k authored Sep 9, 2020
2 parents 7dbbe84 + cb3b0ab commit 42b9d42
Show file tree
Hide file tree
Showing 15 changed files with 217 additions and 185 deletions.
2 changes: 1 addition & 1 deletion chain/blocksync/cbor_gen.go → chain/exchange/cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

139 changes: 57 additions & 82 deletions chain/blocksync/client.go → chain/exchange/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package blocksync
package exchange

import (
"bufio"
Expand All @@ -7,26 +7,26 @@ import (
"math/rand"
"time"

host "github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

"go.opencensus.io/trace"
"go.uber.org/fx"
"golang.org/x/xerrors"

cborutil "github.com/filecoin-project/go-cbor-util"

"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
incrt "github.com/filecoin-project/lotus/lib/increadtimeout"
"github.com/filecoin-project/lotus/lib/peermgr"
)

// Protocol client.
// FIXME: Rename to just `Client`. Not done at the moment to avoid
// disrupting too much of the consumer code, should be done along
// https://github.com/filecoin-project/lotus/issues/2612.
type BlockSync struct {
// client implements exchange.Client, using the libp2p ChainExchange protocol
// as the fetching mechanism.
type client struct {
// Connection manager used to contact the server.
// FIXME: We should have a reduced interface here, initialized
// just with our protocol ID, we shouldn't be able to open *any*
Expand All @@ -36,12 +36,12 @@ type BlockSync struct {
peerTracker *bsPeerTracker
}

func NewClient(
lc fx.Lifecycle,
host host.Host,
pmgr peermgr.MaybePeerMgr,
) *BlockSync {
return &BlockSync{
var _ Client = (*client)(nil)

// NewClient creates a new libp2p-based exchange.Client that uses the libp2p
// ChainExhange protocol as the fetching mechanism.
func NewClient(lc fx.Lifecycle, host host.Host, pmgr peermgr.MaybePeerMgr) Client {
return &client{
host: host,
peerTracker: newPeerTracker(lc, host, pmgr.Mgr),
}
Expand All @@ -64,11 +64,7 @@ func NewClient(
// request options without disrupting external calls. In the future the
// consumers should be forced to use a more standardized service and
// adhere to a single API derived from this function.
func (client *BlockSync) doRequest(
ctx context.Context,
req *Request,
singlePeer *peer.ID,
) (*validatedResponse, error) {
func (c *client) doRequest(ctx context.Context, req *Request, singlePeer *peer.ID) (*validatedResponse, error) {
// Validate request.
if req.Length == 0 {
return nil, xerrors.Errorf("invalid request of length 0")
Expand All @@ -88,7 +84,7 @@ func (client *BlockSync) doRequest(
if singlePeer != nil {
peers = []peer.ID{*singlePeer}
} else {
peers = client.getShuffledPeers()
peers = c.getShuffledPeers()
if len(peers) == 0 {
return nil, xerrors.Errorf("no peers available")
}
Expand All @@ -109,25 +105,25 @@ func (client *BlockSync) doRequest(
}

// Send request, read response.
res, err := client.sendRequestToPeer(ctx, peer, req)
res, err := c.sendRequestToPeer(ctx, peer, req)
if err != nil {
if !xerrors.Is(err, inet.ErrNoConn) {
if !xerrors.Is(err, network.ErrNoConn) {
log.Warnf("could not connect to peer %s: %s",
peer.String(), err)
}
continue
}

// Process and validate response.
validRes, err := client.processResponse(req, res)
validRes, err := c.processResponse(req, res)
if err != nil {
log.Warnf("processing peer %s response failed: %s",
peer.String(), err)
continue
}

client.peerTracker.logGlobalSuccess(build.Clock.Since(globalTime))
client.host.ConnManager().TagPeer(peer, "bsync", SUCCESS_PEER_TAG_VALUE)
c.peerTracker.logGlobalSuccess(build.Clock.Since(globalTime))
c.host.ConnManager().TagPeer(peer, "bsync", SuccessPeerTagValue)
return validRes, nil
}

Expand All @@ -146,11 +142,8 @@ func (client *BlockSync) doRequest(
// We are conflating in the single error returned both status and validation
// errors. Peer penalization should happen here then, before returning, so
// we can apply the correct penalties depending on the cause of the error.
func (client *BlockSync) processResponse(
req *Request,
res *Response,
// FIXME: Add the `peer` as argument once we implement penalties.
) (*validatedResponse, error) {
// FIXME: Add the `peer` as argument once we implement penalties.
func (c *client) processResponse(req *Request, res *Response) (*validatedResponse, error) {
err := res.statusToError()
if err != nil {
return nil, xerrors.Errorf("status error: %s", err)
Expand Down Expand Up @@ -248,16 +241,8 @@ func (client *BlockSync) processResponse(
return validRes, nil
}

// GetBlocks fetches count blocks from the network, from the provided tipset
// *backwards*, returning as many tipsets as count.
//
// {hint/usage}: This is used by the Syncer during normal chain syncing and when
// resolving forks.
func (client *BlockSync) GetBlocks(
ctx context.Context,
tsk types.TipSetKey,
count int,
) ([]*types.TipSet, error) {
// GetBlocks implements Client.GetBlocks(). Refer to the godocs there.
func (c *client) GetBlocks(ctx context.Context, tsk types.TipSetKey, count int) ([]*types.TipSet, error) {
ctx, span := trace.StartSpan(ctx, "bsync.GetBlocks")
defer span.End()
if span.IsRecordingEvents() {
Expand All @@ -273,19 +258,16 @@ func (client *BlockSync) GetBlocks(
Options: Headers,
}

validRes, err := client.doRequest(ctx, req, nil)
validRes, err := c.doRequest(ctx, req, nil)
if err != nil {
return nil, err
}

return validRes.tipsets, nil
}

func (client *BlockSync) GetFullTipSet(
ctx context.Context,
peer peer.ID,
tsk types.TipSetKey,
) (*store.FullTipSet, error) {
// GetFullTipSet implements Client.GetFullTipSet(). Refer to the godocs there.
func (c *client) GetFullTipSet(ctx context.Context, peer peer.ID, tsk types.TipSetKey) (*store.FullTipSet, error) {
// TODO: round robin through these peers on error

req := &Request{
Expand All @@ -294,7 +276,7 @@ func (client *BlockSync) GetFullTipSet(
Options: Headers | Messages,
}

validRes, err := client.doRequest(ctx, req, &peer)
validRes, err := c.doRequest(ctx, req, &peer)
if err != nil {
return nil, err
}
Expand All @@ -304,11 +286,8 @@ func (client *BlockSync) GetFullTipSet(
// *one* tipset here, so it's safe to index directly.
}

func (client *BlockSync) GetChainMessages(
ctx context.Context,
head *types.TipSet,
length uint64,
) ([]*CompactedMessages, error) {
// GetChainMessages implements Client.GetChainMessages(). Refer to the godocs there.
func (c *client) GetChainMessages(ctx context.Context, head *types.TipSet, length uint64) ([]*CompactedMessages, error) {
ctx, span := trace.StartSpan(ctx, "GetChainMessages")
if span.IsRecordingEvents() {
span.AddAttributes(
Expand All @@ -324,7 +303,7 @@ func (client *BlockSync) GetChainMessages(
Options: Messages,
}

validRes, err := client.doRequest(ctx, req, nil)
validRes, err := c.doRequest(ctx, req, nil)
if err != nil {
return nil, err
}
Expand All @@ -335,11 +314,7 @@ func (client *BlockSync) GetChainMessages(
// Send a request to a peer. Write request in the stream and read the
// response back. We do not do any processing of the request/response
// here.
func (client *BlockSync) sendRequestToPeer(
ctx context.Context,
peer peer.ID,
req *Request,
) (_ *Response, err error) {
func (c *client) sendRequestToPeer(ctx context.Context, peer peer.ID, req *Request) (_ *Response, err error) {
// Trace code.
ctx, span := trace.StartSpan(ctx, "sendRequestToPeer")
defer span.End()
Expand All @@ -360,35 +335,33 @@ func (client *BlockSync) sendRequestToPeer(
}()
// -- TRACE --

supported, err := client.host.Peerstore().SupportsProtocols(peer, BlockSyncProtocolID)
supported, err := c.host.Peerstore().SupportsProtocols(peer, BlockSyncProtocolID, ChainExchangeProtocolID)
if err != nil {
client.RemovePeer(peer)
c.RemovePeer(peer)
return nil, xerrors.Errorf("failed to get protocols for peer: %w", err)
}
if len(supported) == 0 || supported[0] != BlockSyncProtocolID {
return nil, xerrors.Errorf("peer %s does not support protocol %s",
peer, BlockSyncProtocolID)
// FIXME: `ProtoBook` should support a *single* protocol check that returns
// a bool instead of a list.
if len(supported) == 0 || (supported[0] != BlockSyncProtocolID && supported[0] != ChainExchangeProtocolID) {
return nil, xerrors.Errorf("peer %s does not support protocols %s",
peer, []string{BlockSyncProtocolID, ChainExchangeProtocolID})
}

connectionStart := build.Clock.Now()

// Open stream to peer.
stream, err := client.host.NewStream(
inet.WithNoDial(ctx, "should already have connection"),
stream, err := c.host.NewStream(
network.WithNoDial(ctx, "should already have connection"),
peer,
BlockSyncProtocolID)
ChainExchangeProtocolID, BlockSyncProtocolID)
if err != nil {
client.RemovePeer(peer)
c.RemovePeer(peer)
return nil, xerrors.Errorf("failed to open stream to peer: %w", err)
}

// Write request.
_ = stream.SetWriteDeadline(time.Now().Add(WRITE_REQ_DEADLINE))
_ = stream.SetWriteDeadline(time.Now().Add(WriteReqDeadline))
if err := cborutil.WriteCborRPC(stream, req); err != nil {
_ = stream.SetWriteDeadline(time.Time{})
client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart), req.Length)
c.peerTracker.logFailure(peer, build.Clock.Since(connectionStart), req.Length)
// FIXME: Should we also remove peer here?
return nil, err
}
Expand All @@ -398,11 +371,11 @@ func (client *BlockSync) sendRequestToPeer(
// Read response.
var res Response
err = cborutil.ReadCborRPC(
bufio.NewReader(incrt.New(stream, READ_RES_MIN_SPEED, READ_RES_DEADLINE)),
bufio.NewReader(incrt.New(stream, ReadResMinSpeed, ReadResDeadline)),
&res)
if err != nil {
client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart), req.Length)
return nil, xerrors.Errorf("failed to read blocksync response: %w", err)
c.peerTracker.logFailure(peer, build.Clock.Since(connectionStart), req.Length)
return nil, xerrors.Errorf("failed to read chainxchg response: %w", err)
}

// FIXME: Move all this together at the top using a defer as done elsewhere.
Expand All @@ -415,32 +388,34 @@ func (client *BlockSync) sendRequestToPeer(
)
}

client.peerTracker.logSuccess(peer, build.Clock.Since(connectionStart), uint64(len(res.Chain)))
c.peerTracker.logSuccess(peer, build.Clock.Since(connectionStart), uint64(len(res.Chain)))
// FIXME: We should really log a success only after we validate the response.
// It might be a bit hard to do.
return &res, nil
}

func (client *BlockSync) AddPeer(p peer.ID) {
client.peerTracker.addPeer(p)
// AddPeer implements Client.AddPeer(). Refer to the godocs there.
func (c *client) AddPeer(p peer.ID) {
c.peerTracker.addPeer(p)
}

func (client *BlockSync) RemovePeer(p peer.ID) {
client.peerTracker.removePeer(p)
// RemovePeer implements Client.RemovePeer(). Refer to the godocs there.
func (c *client) RemovePeer(p peer.ID) {
c.peerTracker.removePeer(p)
}

// getShuffledPeers returns a preference-sorted set of peers (by latency
// and failure counting), shuffling the first few peers so we don't always
// pick the same peer.
// FIXME: Consider merging with `shufflePrefix()s`.
func (client *BlockSync) getShuffledPeers() []peer.ID {
peers := client.peerTracker.prefSortedPeers()
func (c *client) getShuffledPeers() []peer.ID {
peers := c.peerTracker.prefSortedPeers()
shufflePrefix(peers)
return peers
}

func shufflePrefix(peers []peer.ID) {
prefix := SHUFFLE_PEERS_PREFIX
prefix := ShufflePeersPrefix
if len(peers) < prefix {
prefix = len(peers)
}
Expand Down
19 changes: 19 additions & 0 deletions chain/exchange/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Package exchange contains the ChainExchange server and client components.
//
// ChainExchange is the basic chain synchronization protocol of Filecoin.
// ChainExchange is an RPC-oriented protocol, with a single operation to
// request blocks for now.
//
// A request contains a start anchor block (referred to with a CID), and a
// amount of blocks requested beyond the anchor (including the anchor itself).
//
// A client can also pass options, encoded as a 64-bit bitfield. Lotus supports
// two options at the moment:
//
// - include block contents
// - include block messages
//
// The response will include a status code, an optional message, and the
// response payload in case of success. The payload is a slice of serialized
// tipsets.
package exchange
51 changes: 51 additions & 0 deletions chain/exchange/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package exchange

import (
"context"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)

// Server is the responder side of the ChainExchange protocol. It accepts
// requests from clients and services them by returning the requested
// chain data.
type Server interface {
// HandleStream is the protocol handler to be registered on a libp2p
// protocol router.
//
// In the current version of the protocol, streams are single-use. The
// server will read a single Request, and will respond with a single
// Response. It will dispose of the stream straight after.
HandleStream(stream network.Stream)
}

// Client is the requesting side of the ChainExchange protocol. It acts as
// a proxy for other components to request chain data from peers. It is chiefly
// used by the Syncer.
type Client interface {
// GetBlocks fetches block headers from the network, from the provided
// tipset *backwards*, returning as many tipsets as the count parameter,
// or less.
GetBlocks(ctx context.Context, tsk types.TipSetKey, count int) ([]*types.TipSet, error)

// GetChainMessages fetches messages from the network, from the provided
// tipset *backwards*, returning the messages from as many tipsets as the
// count parameter, or less.
GetChainMessages(ctx context.Context, head *types.TipSet, length uint64) ([]*CompactedMessages, error)

// GetFullTipSet fetches a full tipset from a given peer. If successful,
// the fetched object contains block headers and all messages in full form.
GetFullTipSet(ctx context.Context, peer peer.ID, tsk types.TipSetKey) (*store.FullTipSet, error)

// AddPeer adds a peer to the pool of peers that the Client requests
// data from.
AddPeer(peer peer.ID)

// RemovePeer removes a peer from the pool of peers that the Client
// requests data from.
RemovePeer(peer peer.ID)
}
Loading

0 comments on commit 42b9d42

Please sign in to comment.