Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blocksync: introduce interfaces; rename to ChainExchange (abbrev. chainxchg) #3624

Merged
merged 5 commits into from
Sep 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion chain/blocksync/cbor_gen.go → chain/exchange/cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

139 changes: 57 additions & 82 deletions chain/blocksync/client.go → chain/exchange/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package blocksync
package exchange

import (
"bufio"
Expand All @@ -7,26 +7,26 @@ import (
"math/rand"
"time"

host "github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

"go.opencensus.io/trace"
"go.uber.org/fx"
"golang.org/x/xerrors"

cborutil "github.com/filecoin-project/go-cbor-util"

"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
incrt "github.com/filecoin-project/lotus/lib/increadtimeout"
"github.com/filecoin-project/lotus/lib/peermgr"
)

// Protocol client.
// FIXME: Rename to just `Client`. Not done at the moment to avoid
// disrupting too much of the consumer code, should be done along
// https://github.com/filecoin-project/lotus/issues/2612.
type BlockSync struct {
// client implements exchange.Client, using the libp2p ChainExchange protocol
// as the fetching mechanism.
type client struct {
// Connection manager used to contact the server.
// FIXME: We should have a reduced interface here, initialized
// just with our protocol ID, we shouldn't be able to open *any*
Expand All @@ -36,12 +36,12 @@ type BlockSync struct {
peerTracker *bsPeerTracker
}

func NewClient(
lc fx.Lifecycle,
host host.Host,
pmgr peermgr.MaybePeerMgr,
) *BlockSync {
return &BlockSync{
var _ Client = (*client)(nil)

// NewClient creates a new libp2p-based exchange.Client that uses the libp2p
// ChainExhange protocol as the fetching mechanism.
func NewClient(lc fx.Lifecycle, host host.Host, pmgr peermgr.MaybePeerMgr) Client {
return &client{
host: host,
peerTracker: newPeerTracker(lc, host, pmgr.Mgr),
}
Expand All @@ -64,11 +64,7 @@ func NewClient(
// request options without disrupting external calls. In the future the
// consumers should be forced to use a more standardized service and
// adhere to a single API derived from this function.
func (client *BlockSync) doRequest(
ctx context.Context,
req *Request,
singlePeer *peer.ID,
) (*validatedResponse, error) {
func (c *client) doRequest(ctx context.Context, req *Request, singlePeer *peer.ID) (*validatedResponse, error) {
// Validate request.
if req.Length == 0 {
return nil, xerrors.Errorf("invalid request of length 0")
Expand All @@ -88,7 +84,7 @@ func (client *BlockSync) doRequest(
if singlePeer != nil {
peers = []peer.ID{*singlePeer}
} else {
peers = client.getShuffledPeers()
peers = c.getShuffledPeers()
if len(peers) == 0 {
return nil, xerrors.Errorf("no peers available")
}
Expand All @@ -109,25 +105,25 @@ func (client *BlockSync) doRequest(
}

// Send request, read response.
res, err := client.sendRequestToPeer(ctx, peer, req)
res, err := c.sendRequestToPeer(ctx, peer, req)
if err != nil {
if !xerrors.Is(err, inet.ErrNoConn) {
if !xerrors.Is(err, network.ErrNoConn) {
log.Warnf("could not connect to peer %s: %s",
peer.String(), err)
}
continue
}

// Process and validate response.
validRes, err := client.processResponse(req, res)
validRes, err := c.processResponse(req, res)
if err != nil {
log.Warnf("processing peer %s response failed: %s",
peer.String(), err)
continue
}

client.peerTracker.logGlobalSuccess(build.Clock.Since(globalTime))
client.host.ConnManager().TagPeer(peer, "bsync", SUCCESS_PEER_TAG_VALUE)
c.peerTracker.logGlobalSuccess(build.Clock.Since(globalTime))
c.host.ConnManager().TagPeer(peer, "bsync", SuccessPeerTagValue)
return validRes, nil
}

Expand All @@ -146,11 +142,8 @@ func (client *BlockSync) doRequest(
// We are conflating in the single error returned both status and validation
// errors. Peer penalization should happen here then, before returning, so
// we can apply the correct penalties depending on the cause of the error.
func (client *BlockSync) processResponse(
req *Request,
res *Response,
// FIXME: Add the `peer` as argument once we implement penalties.
) (*validatedResponse, error) {
// FIXME: Add the `peer` as argument once we implement penalties.
func (c *client) processResponse(req *Request, res *Response) (*validatedResponse, error) {
err := res.statusToError()
if err != nil {
return nil, xerrors.Errorf("status error: %s", err)
Expand Down Expand Up @@ -248,16 +241,8 @@ func (client *BlockSync) processResponse(
return validRes, nil
}

// GetBlocks fetches count blocks from the network, from the provided tipset
// *backwards*, returning as many tipsets as count.
//
// {hint/usage}: This is used by the Syncer during normal chain syncing and when
// resolving forks.
func (client *BlockSync) GetBlocks(
ctx context.Context,
tsk types.TipSetKey,
count int,
) ([]*types.TipSet, error) {
// GetBlocks implements Client.GetBlocks(). Refer to the godocs there.
func (c *client) GetBlocks(ctx context.Context, tsk types.TipSetKey, count int) ([]*types.TipSet, error) {
ctx, span := trace.StartSpan(ctx, "bsync.GetBlocks")
defer span.End()
if span.IsRecordingEvents() {
Expand All @@ -273,19 +258,16 @@ func (client *BlockSync) GetBlocks(
Options: Headers,
}

validRes, err := client.doRequest(ctx, req, nil)
validRes, err := c.doRequest(ctx, req, nil)
if err != nil {
return nil, err
}

return validRes.tipsets, nil
}

func (client *BlockSync) GetFullTipSet(
ctx context.Context,
peer peer.ID,
tsk types.TipSetKey,
) (*store.FullTipSet, error) {
// GetFullTipSet implements Client.GetFullTipSet(). Refer to the godocs there.
func (c *client) GetFullTipSet(ctx context.Context, peer peer.ID, tsk types.TipSetKey) (*store.FullTipSet, error) {
// TODO: round robin through these peers on error

req := &Request{
Expand All @@ -294,7 +276,7 @@ func (client *BlockSync) GetFullTipSet(
Options: Headers | Messages,
}

validRes, err := client.doRequest(ctx, req, &peer)
validRes, err := c.doRequest(ctx, req, &peer)
if err != nil {
return nil, err
}
Expand All @@ -304,11 +286,8 @@ func (client *BlockSync) GetFullTipSet(
// *one* tipset here, so it's safe to index directly.
}

func (client *BlockSync) GetChainMessages(
ctx context.Context,
head *types.TipSet,
length uint64,
) ([]*CompactedMessages, error) {
// GetChainMessages implements Client.GetChainMessages(). Refer to the godocs there.
func (c *client) GetChainMessages(ctx context.Context, head *types.TipSet, length uint64) ([]*CompactedMessages, error) {
ctx, span := trace.StartSpan(ctx, "GetChainMessages")
if span.IsRecordingEvents() {
span.AddAttributes(
Expand All @@ -324,7 +303,7 @@ func (client *BlockSync) GetChainMessages(
Options: Messages,
}

validRes, err := client.doRequest(ctx, req, nil)
validRes, err := c.doRequest(ctx, req, nil)
if err != nil {
return nil, err
}
Expand All @@ -335,11 +314,7 @@ func (client *BlockSync) GetChainMessages(
// Send a request to a peer. Write request in the stream and read the
// response back. We do not do any processing of the request/response
// here.
func (client *BlockSync) sendRequestToPeer(
ctx context.Context,
peer peer.ID,
req *Request,
) (_ *Response, err error) {
func (c *client) sendRequestToPeer(ctx context.Context, peer peer.ID, req *Request) (_ *Response, err error) {
// Trace code.
ctx, span := trace.StartSpan(ctx, "sendRequestToPeer")
defer span.End()
Expand All @@ -360,35 +335,33 @@ func (client *BlockSync) sendRequestToPeer(
}()
// -- TRACE --

supported, err := client.host.Peerstore().SupportsProtocols(peer, BlockSyncProtocolID)
supported, err := c.host.Peerstore().SupportsProtocols(peer, BlockSyncProtocolID, ChainExchangeProtocolID)
if err != nil {
client.RemovePeer(peer)
c.RemovePeer(peer)
return nil, xerrors.Errorf("failed to get protocols for peer: %w", err)
}
if len(supported) == 0 || supported[0] != BlockSyncProtocolID {
return nil, xerrors.Errorf("peer %s does not support protocol %s",
peer, BlockSyncProtocolID)
// FIXME: `ProtoBook` should support a *single* protocol check that returns
// a bool instead of a list.
if len(supported) == 0 || (supported[0] != BlockSyncProtocolID && supported[0] != ChainExchangeProtocolID) {
return nil, xerrors.Errorf("peer %s does not support protocols %s",
peer, []string{BlockSyncProtocolID, ChainExchangeProtocolID})
}

connectionStart := build.Clock.Now()

// Open stream to peer.
stream, err := client.host.NewStream(
inet.WithNoDial(ctx, "should already have connection"),
stream, err := c.host.NewStream(
network.WithNoDial(ctx, "should already have connection"),
peer,
BlockSyncProtocolID)
ChainExchangeProtocolID, BlockSyncProtocolID)
if err != nil {
client.RemovePeer(peer)
c.RemovePeer(peer)
return nil, xerrors.Errorf("failed to open stream to peer: %w", err)
}

// Write request.
_ = stream.SetWriteDeadline(time.Now().Add(WRITE_REQ_DEADLINE))
_ = stream.SetWriteDeadline(time.Now().Add(WriteReqDeadline))
if err := cborutil.WriteCborRPC(stream, req); err != nil {
_ = stream.SetWriteDeadline(time.Time{})
client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart), req.Length)
c.peerTracker.logFailure(peer, build.Clock.Since(connectionStart), req.Length)
// FIXME: Should we also remove peer here?
return nil, err
}
Expand All @@ -398,11 +371,11 @@ func (client *BlockSync) sendRequestToPeer(
// Read response.
var res Response
err = cborutil.ReadCborRPC(
bufio.NewReader(incrt.New(stream, READ_RES_MIN_SPEED, READ_RES_DEADLINE)),
bufio.NewReader(incrt.New(stream, ReadResMinSpeed, ReadResDeadline)),
&res)
if err != nil {
client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart), req.Length)
return nil, xerrors.Errorf("failed to read blocksync response: %w", err)
c.peerTracker.logFailure(peer, build.Clock.Since(connectionStart), req.Length)
return nil, xerrors.Errorf("failed to read chainxchg response: %w", err)
}

// FIXME: Move all this together at the top using a defer as done elsewhere.
Expand All @@ -415,32 +388,34 @@ func (client *BlockSync) sendRequestToPeer(
)
}

client.peerTracker.logSuccess(peer, build.Clock.Since(connectionStart), uint64(len(res.Chain)))
c.peerTracker.logSuccess(peer, build.Clock.Since(connectionStart), uint64(len(res.Chain)))
// FIXME: We should really log a success only after we validate the response.
// It might be a bit hard to do.
return &res, nil
}

func (client *BlockSync) AddPeer(p peer.ID) {
client.peerTracker.addPeer(p)
// AddPeer implements Client.AddPeer(). Refer to the godocs there.
func (c *client) AddPeer(p peer.ID) {
c.peerTracker.addPeer(p)
}

func (client *BlockSync) RemovePeer(p peer.ID) {
client.peerTracker.removePeer(p)
// RemovePeer implements Client.RemovePeer(). Refer to the godocs there.
func (c *client) RemovePeer(p peer.ID) {
c.peerTracker.removePeer(p)
}

// getShuffledPeers returns a preference-sorted set of peers (by latency
// and failure counting), shuffling the first few peers so we don't always
// pick the same peer.
// FIXME: Consider merging with `shufflePrefix()s`.
func (client *BlockSync) getShuffledPeers() []peer.ID {
peers := client.peerTracker.prefSortedPeers()
func (c *client) getShuffledPeers() []peer.ID {
peers := c.peerTracker.prefSortedPeers()
shufflePrefix(peers)
return peers
}

func shufflePrefix(peers []peer.ID) {
prefix := SHUFFLE_PEERS_PREFIX
prefix := ShufflePeersPrefix
if len(peers) < prefix {
prefix = len(peers)
}
Expand Down
19 changes: 19 additions & 0 deletions chain/exchange/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Package exchange contains the ChainExchange server and client components.
//
// ChainExchange is the basic chain synchronization protocol of Filecoin.
// ChainExchange is an RPC-oriented protocol, with a single operation to
// request blocks for now.
//
// A request contains a start anchor block (referred to with a CID), and a
// amount of blocks requested beyond the anchor (including the anchor itself).
//
// A client can also pass options, encoded as a 64-bit bitfield. Lotus supports
// two options at the moment:
//
// - include block contents
// - include block messages
//
// The response will include a status code, an optional message, and the
// response payload in case of success. The payload is a slice of serialized
// tipsets.
package exchange
51 changes: 51 additions & 0 deletions chain/exchange/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package exchange

import (
"context"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)

// Server is the responder side of the ChainExchange protocol. It accepts
// requests from clients and services them by returning the requested
// chain data.
type Server interface {
// HandleStream is the protocol handler to be registered on a libp2p
// protocol router.
//
// In the current version of the protocol, streams are single-use. The
// server will read a single Request, and will respond with a single
// Response. It will dispose of the stream straight after.
HandleStream(stream network.Stream)
}

// Client is the requesting side of the ChainExchange protocol. It acts as
// a proxy for other components to request chain data from peers. It is chiefly
// used by the Syncer.
type Client interface {
// GetBlocks fetches block headers from the network, from the provided
// tipset *backwards*, returning as many tipsets as the count parameter,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does "backwards" mean here? Decending in height?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep!

// or less.
GetBlocks(ctx context.Context, tsk types.TipSetKey, count int) ([]*types.TipSet, error)

// GetChainMessages fetches messages from the network, from the provided
// tipset *backwards*, returning the messages from as many tipsets as the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here - descending?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep!

// count parameter, or less.
GetChainMessages(ctx context.Context, head *types.TipSet, length uint64) ([]*CompactedMessages, error)

// GetFullTipSet fetches a full tipset from a given peer. If successful,
// the fetched object contains block headers and all messages in full form.
GetFullTipSet(ctx context.Context, peer peer.ID, tsk types.TipSetKey) (*store.FullTipSet, error)

// AddPeer adds a peer to the pool of peers that the Client requests
// data from.
AddPeer(peer peer.ID)

// RemovePeer removes a peer from the pool of peers that the Client
// requests data from.
RemovePeer(peer peer.ID)
}
Loading