From 76d40ec4085f2ebca1d1644f06cbda15430feea6 Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Mon, 27 Jul 2020 12:31:36 -0300 Subject: [PATCH 01/17] first pass, ready for review --- build/clock.go | 2 +- chain/blocksync/blocksync.go | 276 ------------- chain/blocksync/blocksync_client.go | 603 ---------------------------- chain/blocksync/client.go | 446 ++++++++++++++++++++ chain/blocksync/graphsync_client.go | 151 ------- chain/blocksync/peer_tracker.go | 169 ++++++++ chain/blocksync/protocol.go | 177 ++++++++ chain/blocksync/server.go | 263 ++++++++++++ chain/sub/incoming.go | 110 ++++- chain/sync.go | 12 +- chain/types/tipset.go | 4 + gen/main.go | 5 +- node/builder.go | 2 +- node/modules/services.go | 4 +- 14 files changed, 1179 insertions(+), 1045 deletions(-) delete mode 100644 chain/blocksync/blocksync.go delete mode 100644 chain/blocksync/blocksync_client.go create mode 100644 chain/blocksync/client.go delete mode 100644 chain/blocksync/graphsync_client.go create mode 100644 chain/blocksync/peer_tracker.go create mode 100644 chain/blocksync/protocol.go create mode 100644 chain/blocksync/server.go diff --git a/build/clock.go b/build/clock.go index 5b172672053..a3943897d6a 100644 --- a/build/clock.go +++ b/build/clock.go @@ -6,5 +6,5 @@ import "github.com/raulk/clock" // we use a real-time clock, which maps to the `time` package. // // Tests that need control of time can replace this variable with -// clock.NewMock(). +// clock.NewMock(). Always use real time for socket/stream deadlines. 
var Clock = clock.New() diff --git a/chain/blocksync/blocksync.go b/chain/blocksync/blocksync.go deleted file mode 100644 index f5483eeaeea..00000000000 --- a/chain/blocksync/blocksync.go +++ /dev/null @@ -1,276 +0,0 @@ -package blocksync - -import ( - "bufio" - "context" - "time" - - "github.com/libp2p/go-libp2p-core/protocol" - "go.opencensus.io/trace" - "golang.org/x/xerrors" - - cborutil "github.com/filecoin-project/go-cbor-util" - - "github.com/filecoin-project/lotus/chain/store" - "github.com/filecoin-project/lotus/chain/types" - - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - inet "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-core/peer" -) - -var log = logging.Logger("blocksync") - -type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream, error) - -const BlockSyncProtocolID = "/fil/sync/blk/0.0.1" - -const BlockSyncMaxRequestLength = 800 - -// BlockSyncService is the component that services BlockSync requests from -// peers. -// -// BlockSync is the basic chain synchronization protocol of Filecoin. BlockSync -// is an RPC-oriented protocol, with a single operation to request blocks. -// -// A request contains a start anchor block (referred to with a CID), and a -// amount of blocks requested beyond the anchor (including the anchor itself). -// -// A client can also pass options, encoded as a 64-bit bitfield. Lotus supports -// two options at the moment: -// -// - include block contents -// - include block messages -// -// The response will include a status code, an optional message, and the -// response payload in case of success. The payload is a slice of serialized -// tipsets. 
-type BlockSyncService struct { - cs *store.ChainStore -} - -type BlockSyncRequest struct { - Start []cid.Cid - RequestLength uint64 - - Options uint64 -} - -type BSOptions struct { - IncludeBlocks bool - IncludeMessages bool -} - -func ParseBSOptions(optfield uint64) *BSOptions { - return &BSOptions{ - IncludeBlocks: optfield&(BSOptBlocks) != 0, - IncludeMessages: optfield&(BSOptMessages) != 0, - } -} - -const ( - BSOptBlocks = 1 << iota - BSOptMessages -) - -const ( - StatusOK = uint64(0) - StatusPartial = uint64(101) - StatusNotFound = uint64(201) - StatusGoAway = uint64(202) - StatusInternalError = uint64(203) - StatusBadRequest = uint64(204) -) - -type BlockSyncResponse struct { - Chain []*BSTipSet - - Status uint64 - Message string -} - -type BSTipSet struct { - Blocks []*types.BlockHeader - - BlsMessages []*types.Message - BlsMsgIncludes [][]uint64 - - SecpkMessages []*types.SignedMessage - SecpkMsgIncludes [][]uint64 -} - -func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService { - return &BlockSyncService{ - cs: cs, - } -} - -func (bss *BlockSyncService) HandleStream(s inet.Stream) { - ctx, span := trace.StartSpan(context.Background(), "blocksync.HandleStream") - defer span.End() - - defer s.Close() //nolint:errcheck - - var req BlockSyncRequest - if err := cborutil.ReadCborRPC(bufio.NewReader(s), &req); err != nil { - log.Warnf("failed to read block sync request: %s", err) - return - } - log.Infow("block sync request", "start", req.Start, "len", req.RequestLength) - - resp, err := bss.processRequest(ctx, s.Conn().RemotePeer(), &req) - if err != nil { - log.Warn("failed to process block sync request: ", err) - return - } - - writeDeadline := 60 * time.Second - _ = s.SetDeadline(time.Now().Add(writeDeadline)) // always use real time for socket/stream deadlines. 
- if err := cborutil.WriteCborRPC(s, resp); err != nil { - log.Warnw("failed to write back response for handle stream", "err", err, "peer", s.Conn().RemotePeer()) - return - } -} - -func (bss *BlockSyncService) processRequest(ctx context.Context, p peer.ID, req *BlockSyncRequest) (*BlockSyncResponse, error) { - _, span := trace.StartSpan(ctx, "blocksync.ProcessRequest") - defer span.End() - - opts := ParseBSOptions(req.Options) - if len(req.Start) == 0 { - return &BlockSyncResponse{ - Status: StatusBadRequest, - Message: "no cids given in blocksync request", - }, nil - } - - span.AddAttributes( - trace.BoolAttribute("blocks", opts.IncludeBlocks), - trace.BoolAttribute("messages", opts.IncludeMessages), - trace.Int64Attribute("reqlen", int64(req.RequestLength)), - ) - - reqlen := req.RequestLength - if reqlen > BlockSyncMaxRequestLength { - log.Warnw("limiting blocksync request length", "orig", req.RequestLength, "peer", p) - reqlen = BlockSyncMaxRequestLength - } - - chain, err := collectChainSegment(bss.cs, types.NewTipSetKey(req.Start...), reqlen, opts) - if err != nil { - log.Warn("encountered error while responding to block sync request: ", err) - return &BlockSyncResponse{ - Status: StatusInternalError, - Message: err.Error(), - }, nil - } - - status := StatusOK - if reqlen < req.RequestLength { - status = StatusPartial - } - - return &BlockSyncResponse{ - Chain: chain, - Status: status, - }, nil -} - -func collectChainSegment(cs *store.ChainStore, start types.TipSetKey, length uint64, opts *BSOptions) ([]*BSTipSet, error) { - var bstips []*BSTipSet - cur := start - for { - var bst BSTipSet - ts, err := cs.LoadTipSet(cur) - if err != nil { - return nil, xerrors.Errorf("failed loading tipset %s: %w", cur, err) - } - - if opts.IncludeMessages { - bmsgs, bmincl, smsgs, smincl, err := gatherMessages(cs, ts) - if err != nil { - return nil, xerrors.Errorf("gather messages failed: %w", err) - } - - bst.BlsMessages = bmsgs - bst.BlsMsgIncludes = bmincl - 
bst.SecpkMessages = smsgs - bst.SecpkMsgIncludes = smincl - } - - if opts.IncludeBlocks { - bst.Blocks = ts.Blocks() - } - - bstips = append(bstips, &bst) - - if uint64(len(bstips)) >= length || ts.Height() == 0 { - return bstips, nil - } - - cur = ts.Parents() - } -} - -func gatherMessages(cs *store.ChainStore, ts *types.TipSet) ([]*types.Message, [][]uint64, []*types.SignedMessage, [][]uint64, error) { - blsmsgmap := make(map[cid.Cid]uint64) - secpkmsgmap := make(map[cid.Cid]uint64) - var secpkmsgs []*types.SignedMessage - var blsmsgs []*types.Message - var secpkincl, blsincl [][]uint64 - - for _, b := range ts.Blocks() { - bmsgs, smsgs, err := cs.MessagesForBlock(b) - if err != nil { - return nil, nil, nil, nil, err - } - - bmi := make([]uint64, 0, len(bmsgs)) - for _, m := range bmsgs { - i, ok := blsmsgmap[m.Cid()] - if !ok { - i = uint64(len(blsmsgs)) - blsmsgs = append(blsmsgs, m) - blsmsgmap[m.Cid()] = i - } - - bmi = append(bmi, i) - } - blsincl = append(blsincl, bmi) - - smi := make([]uint64, 0, len(smsgs)) - for _, m := range smsgs { - i, ok := secpkmsgmap[m.Cid()] - if !ok { - i = uint64(len(secpkmsgs)) - secpkmsgs = append(secpkmsgs, m) - secpkmsgmap[m.Cid()] = i - } - - smi = append(smi, i) - } - secpkincl = append(secpkincl, smi) - } - - return blsmsgs, blsincl, secpkmsgs, secpkincl, nil -} - -func bstsToFullTipSet(bts *BSTipSet) (*store.FullTipSet, error) { - fts := &store.FullTipSet{} - for i, b := range bts.Blocks { - fb := &types.FullBlock{ - Header: b, - } - for _, mi := range bts.BlsMsgIncludes[i] { - fb.BlsMessages = append(fb.BlsMessages, bts.BlsMessages[mi]) - } - for _, mi := range bts.SecpkMsgIncludes[i] { - fb.SecpkMessages = append(fb.SecpkMessages, bts.SecpkMessages[mi]) - } - - fts.Blocks = append(fts.Blocks, fb) - } - - return fts, nil -} diff --git a/chain/blocksync/blocksync_client.go b/chain/blocksync/blocksync_client.go deleted file mode 100644 index 67e4fde8f76..00000000000 --- a/chain/blocksync/blocksync_client.go +++ /dev/null 
@@ -1,603 +0,0 @@ -package blocksync - -import ( - "bufio" - "context" - "fmt" - "math/rand" - "sort" - "sync" - "time" - - blocks "github.com/ipfs/go-block-format" - bserv "github.com/ipfs/go-blockservice" - "github.com/ipfs/go-cid" - graphsync "github.com/ipfs/go-graphsync" - gsnet "github.com/ipfs/go-graphsync/network" - host "github.com/libp2p/go-libp2p-core/host" - inet "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-core/peer" - "go.opencensus.io/trace" - "golang.org/x/xerrors" - - cborutil "github.com/filecoin-project/go-cbor-util" - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/store" - "github.com/filecoin-project/lotus/chain/types" - incrt "github.com/filecoin-project/lotus/lib/increadtimeout" - "github.com/filecoin-project/lotus/lib/peermgr" - "github.com/filecoin-project/lotus/node/modules/dtypes" -) - -type BlockSync struct { - bserv bserv.BlockService - gsync graphsync.GraphExchange - host host.Host - - syncPeers *bsPeerTracker - peerMgr *peermgr.PeerMgr -} - -func NewBlockSyncClient(bserv dtypes.ChainBlockService, h host.Host, pmgr peermgr.MaybePeerMgr, gs dtypes.Graphsync) *BlockSync { - return &BlockSync{ - bserv: bserv, - host: h, - syncPeers: newPeerTracker(pmgr.Mgr), - peerMgr: pmgr.Mgr, - gsync: gs, - } -} - -func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse) error { - switch res.Status { - case StatusPartial: // Partial Response - return xerrors.Errorf("not handling partial blocksync responses yet") - case StatusNotFound: // req.Start not found - return xerrors.Errorf("not found") - case StatusGoAway: // Go Away - return xerrors.Errorf("not handling 'go away' blocksync responses yet") - case StatusInternalError: // Internal Error - return xerrors.Errorf("block sync peer errored: %s", res.Message) - case StatusBadRequest: - return xerrors.Errorf("block sync request invalid: %s", res.Message) - default: - return xerrors.Errorf("unrecognized response 
code: %d", res.Status) - } -} - -// GetBlocks fetches count blocks from the network, from the provided tipset -// *backwards*, returning as many tipsets as count. -// -// {hint/usage}: This is used by the Syncer during normal chain syncing and when -// resolving forks. -func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count int) ([]*types.TipSet, error) { - ctx, span := trace.StartSpan(ctx, "bsync.GetBlocks") - defer span.End() - if span.IsRecordingEvents() { - span.AddAttributes( - trace.StringAttribute("tipset", fmt.Sprint(tsk.Cids())), - trace.Int64Attribute("count", int64(count)), - ) - } - - req := &BlockSyncRequest{ - Start: tsk.Cids(), - RequestLength: uint64(count), - Options: BSOptBlocks, - } - - // this peerset is sorted by latency and failure counting. - peers := bs.getPeers() - - // randomize the first few peers so we don't always pick the same peer - shufflePrefix(peers) - - start := build.Clock.Now() - var oerr error - - for _, p := range peers { - // TODO: doing this synchronously isnt great, but fetching in parallel - // may not be a good idea either. 
think about this more - select { - case <-ctx.Done(): - return nil, xerrors.Errorf("blocksync getblocks failed: %w", ctx.Err()) - default: - } - - res, err := bs.sendRequestToPeer(ctx, p, req) - if err != nil { - oerr = err - if !xerrors.Is(err, inet.ErrNoConn) { - log.Warnf("BlockSync request failed for peer %s: %s", p.String(), err) - } - continue - } - - if res.Status == StatusOK || res.Status == StatusPartial { - resp, err := bs.processBlocksResponse(req, res) - if err != nil { - return nil, xerrors.Errorf("success response from peer failed to process: %w", err) - } - bs.syncPeers.logGlobalSuccess(build.Clock.Since(start)) - bs.host.ConnManager().TagPeer(p, "bsync", 25) - return resp, nil - } - - oerr = bs.processStatus(req, res) - if oerr != nil { - log.Warnf("BlockSync peer %s response was an error: %s", p.String(), oerr) - } - } - return nil, xerrors.Errorf("GetBlocks failed with all peers: %w", oerr) -} - -func (bs *BlockSync) GetFullTipSet(ctx context.Context, p peer.ID, tsk types.TipSetKey) (*store.FullTipSet, error) { - // TODO: round robin through these peers on error - - req := &BlockSyncRequest{ - Start: tsk.Cids(), - RequestLength: 1, - Options: BSOptBlocks | BSOptMessages, - } - - res, err := bs.sendRequestToPeer(ctx, p, req) - if err != nil { - return nil, err - } - - switch res.Status { - case 0: // Success - if len(res.Chain) == 0 { - return nil, fmt.Errorf("got zero length chain response") - } - bts := res.Chain[0] - - return bstsToFullTipSet(bts) - case 101: // Partial Response - return nil, xerrors.Errorf("partial responses are not handled for single tipset fetching") - case 201: // req.Start not found - return nil, fmt.Errorf("not found") - case 202: // Go Away - return nil, xerrors.Errorf("received 'go away' response peer") - case 203: // Internal Error - return nil, fmt.Errorf("block sync peer errored: %q", res.Message) - case 204: // Invalid Request - return nil, fmt.Errorf("block sync request invalid: %q", res.Message) - default: - return 
nil, fmt.Errorf("unrecognized response code") - } -} - -func shufflePrefix(peers []peer.ID) { - pref := 5 - if len(peers) < pref { - pref = len(peers) - } - - buf := make([]peer.ID, pref) - perm := rand.Perm(pref) - for i, v := range perm { - buf[i] = peers[v] - } - - copy(peers, buf) -} - -func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, count uint64) ([]*BSTipSet, error) { - ctx, span := trace.StartSpan(ctx, "GetChainMessages") - defer span.End() - - peers := bs.getPeers() - // randomize the first few peers so we don't always pick the same peer - shufflePrefix(peers) - - req := &BlockSyncRequest{ - Start: h.Cids(), - RequestLength: count, - Options: BSOptMessages, - } - - var err error - start := build.Clock.Now() - - for _, p := range peers { - res, rerr := bs.sendRequestToPeer(ctx, p, req) - if rerr != nil { - err = rerr - log.Warnf("BlockSync request failed for peer %s: %s", p.String(), err) - continue - } - - if res.Status == StatusOK { - bs.syncPeers.logGlobalSuccess(build.Clock.Since(start)) - return res.Chain, nil - } - - if res.Status == StatusPartial { - // TODO: track partial response sizes to ensure we don't overrequest too often - return res.Chain, nil - } - - err = bs.processStatus(req, res) - if err != nil { - log.Warnf("BlockSync peer %s response was an error: %s", p.String(), err) - } - } - - if err == nil { - return nil, xerrors.Errorf("GetChainMessages failed, no peers connected") - } - - // TODO: What if we have no peers (and err is nil)? 
- return nil, xerrors.Errorf("GetChainMessages failed with all peers(%d): %w", len(peers), err) -} - -func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *BlockSyncRequest) (_ *BlockSyncResponse, err error) { - ctx, span := trace.StartSpan(ctx, "sendRequestToPeer") - defer span.End() - - defer func() { - if err != nil { - if span.IsRecordingEvents() { - span.SetStatus(trace.Status{ - Code: 5, - Message: err.Error(), - }) - } - } - }() - - if span.IsRecordingEvents() { - span.AddAttributes( - trace.StringAttribute("peer", p.Pretty()), - ) - } - - gsproto := string(gsnet.ProtocolGraphsync) - supp, err := bs.host.Peerstore().SupportsProtocols(p, BlockSyncProtocolID, gsproto) - if err != nil { - return nil, xerrors.Errorf("failed to get protocols for peer: %w", err) - } - - if len(supp) == 0 { - return nil, xerrors.Errorf("peer %s supports no known sync protocols", p) - } - - switch supp[0] { - case BlockSyncProtocolID: - res, err := bs.fetchBlocksBlockSync(ctx, p, req) - if err != nil { - return nil, xerrors.Errorf("blocksync req failed: %w", err) - } - return res, nil - case gsproto: - res, err := bs.fetchBlocksGraphSync(ctx, p, req) - if err != nil { - return nil, xerrors.Errorf("graphsync req failed: %w", err) - } - return res, nil - default: - return nil, xerrors.Errorf("peerstore somehow returned unexpected protocols: %v", supp) - } - -} -func (bs *BlockSync) fetchBlocksBlockSync(ctx context.Context, p peer.ID, req *BlockSyncRequest) (*BlockSyncResponse, error) { - ctx, span := trace.StartSpan(ctx, "blockSyncFetch") - defer span.End() - - start := build.Clock.Now() - s, err := bs.host.NewStream(inet.WithNoDial(ctx, "should already have connection"), p, BlockSyncProtocolID) - if err != nil { - bs.RemovePeer(p) - return nil, xerrors.Errorf("failed to open stream to peer: %w", err) - } - _ = s.SetWriteDeadline(time.Now().Add(5 * time.Second)) // always use real time for socket/stream deadlines. 
- - if err := cborutil.WriteCborRPC(s, req); err != nil { - _ = s.SetWriteDeadline(time.Time{}) - bs.syncPeers.logFailure(p, build.Clock.Since(start)) - return nil, err - } - _ = s.SetWriteDeadline(time.Time{}) - - var res BlockSyncResponse - r := incrt.New(s, 50<<10, 5*time.Second) - if err := cborutil.ReadCborRPC(bufio.NewReader(r), &res); err != nil { - bs.syncPeers.logFailure(p, build.Clock.Since(start)) - return nil, err - } - - if span.IsRecordingEvents() { - span.AddAttributes( - trace.Int64Attribute("resp_status", int64(res.Status)), - trace.StringAttribute("msg", res.Message), - trace.Int64Attribute("chain_len", int64(len(res.Chain))), - ) - } - - bs.syncPeers.logSuccess(p, build.Clock.Since(start)) - return &res, nil -} - -func (bs *BlockSync) processBlocksResponse(req *BlockSyncRequest, res *BlockSyncResponse) ([]*types.TipSet, error) { - if len(res.Chain) == 0 { - return nil, xerrors.Errorf("got no blocks in successful blocksync response") - } - - cur, err := types.NewTipSet(res.Chain[0].Blocks) - if err != nil { - return nil, err - } - - out := []*types.TipSet{cur} - for bi := 1; bi < len(res.Chain); bi++ { - next := res.Chain[bi].Blocks - nts, err := types.NewTipSet(next) - if err != nil { - return nil, err - } - - if !types.CidArrsEqual(cur.Parents().Cids(), nts.Cids()) { - return nil, fmt.Errorf("parents of tipset[%d] were not tipset[%d]", bi-1, bi) - } - - out = append(out, nts) - cur = nts - } - return out, nil -} - -func (bs *BlockSync) GetBlock(ctx context.Context, c cid.Cid) (*types.BlockHeader, error) { - sb, err := bs.bserv.GetBlock(ctx, c) - if err != nil { - return nil, err - } - - return types.DecodeBlock(sb.RawData()) -} - -func (bs *BlockSync) AddPeer(p peer.ID) { - bs.syncPeers.addPeer(p) -} - -func (bs *BlockSync) RemovePeer(p peer.ID) { - bs.syncPeers.removePeer(p) -} - -// getPeers returns a preference-sorted set of peers to query. 
-func (bs *BlockSync) getPeers() []peer.ID { - return bs.syncPeers.prefSortedPeers() -} - -func (bs *BlockSync) FetchMessagesByCids(ctx context.Context, cids []cid.Cid) ([]*types.Message, error) { - out := make([]*types.Message, len(cids)) - - err := bs.fetchCids(ctx, cids, func(i int, b blocks.Block) error { - msg, err := types.DecodeMessage(b.RawData()) - if err != nil { - return err - } - - if out[i] != nil { - return fmt.Errorf("received duplicate message") - } - - out[i] = msg - return nil - }) - if err != nil { - return nil, err - } - return out, nil -} - -func (bs *BlockSync) FetchSignedMessagesByCids(ctx context.Context, cids []cid.Cid) ([]*types.SignedMessage, error) { - out := make([]*types.SignedMessage, len(cids)) - - err := bs.fetchCids(ctx, cids, func(i int, b blocks.Block) error { - smsg, err := types.DecodeSignedMessage(b.RawData()) - if err != nil { - return err - } - - if out[i] != nil { - return fmt.Errorf("received duplicate message") - } - - out[i] = smsg - return nil - }) - if err != nil { - return nil, err - } - return out, nil -} - -func (bs *BlockSync) fetchCids(ctx context.Context, cids []cid.Cid, cb func(int, blocks.Block) error) error { - resp := bs.bserv.GetBlocks(context.TODO(), cids) - - m := make(map[cid.Cid]int) - for i, c := range cids { - m[c] = i - } - - for i := 0; i < len(cids); i++ { - select { - case v, ok := <-resp: - if !ok { - if i == len(cids)-1 { - break - } - - return fmt.Errorf("failed to fetch all messages") - } - - ix, ok := m[v.Cid()] - if !ok { - return fmt.Errorf("received message we didnt ask for") - } - - if err := cb(ix, v); err != nil { - return err - } - } - } - - return nil -} - -type peerStats struct { - successes int - failures int - firstSeen time.Time - averageTime time.Duration -} - -type bsPeerTracker struct { - lk sync.Mutex - - peers map[peer.ID]*peerStats - avgGlobalTime time.Duration - - pmgr *peermgr.PeerMgr -} - -func newPeerTracker(pmgr *peermgr.PeerMgr) *bsPeerTracker { - return &bsPeerTracker{ 
- peers: make(map[peer.ID]*peerStats), - pmgr: pmgr, - } -} - -func (bpt *bsPeerTracker) addPeer(p peer.ID) { - bpt.lk.Lock() - defer bpt.lk.Unlock() - if _, ok := bpt.peers[p]; ok { - return - } - bpt.peers[p] = &peerStats{ - firstSeen: build.Clock.Now(), - } - -} - -const ( - // newPeerMul is how much better than average is the new peer assumed to be - // less than one to encourouge trying new peers - newPeerMul = 0.9 -) - -func (bpt *bsPeerTracker) prefSortedPeers() []peer.ID { - // TODO: this could probably be cached, but as long as its not too many peers, fine for now - bpt.lk.Lock() - defer bpt.lk.Unlock() - out := make([]peer.ID, 0, len(bpt.peers)) - for p := range bpt.peers { - out = append(out, p) - } - - // sort by 'expected cost' of requesting data from that peer - // additionally handle edge cases where not enough data is available - sort.Slice(out, func(i, j int) bool { - pi := bpt.peers[out[i]] - pj := bpt.peers[out[j]] - - var costI, costJ float64 - - getPeerInitLat := func(p peer.ID) float64 { - var res float64 - if bpt.pmgr != nil { - if lat, ok := bpt.pmgr.GetPeerLatency(p); ok { - res = float64(lat) - } - } - if res == 0 { - res = float64(bpt.avgGlobalTime) - } - return res * newPeerMul - } - - if pi.successes+pi.failures > 0 { - failRateI := float64(pi.failures) / float64(pi.failures+pi.successes) - costI = float64(pi.averageTime) + failRateI*float64(bpt.avgGlobalTime) - } else { - costI = getPeerInitLat(out[i]) - } - - if pj.successes+pj.failures > 0 { - failRateJ := float64(pj.failures) / float64(pj.failures+pj.successes) - costJ = float64(pj.averageTime) + failRateJ*float64(bpt.avgGlobalTime) - } else { - costJ = getPeerInitLat(out[j]) - } - - return costI < costJ - }) - - return out -} - -const ( - // xInvAlpha = (N+1)/2 - - localInvAlpha = 5 // 86% of the value is the last 9 - globalInvAlpha = 20 // 86% of the value is the last 39 -) - -func (bpt *bsPeerTracker) logGlobalSuccess(dur time.Duration) { - bpt.lk.Lock() - defer bpt.lk.Unlock() - 
- if bpt.avgGlobalTime == 0 { - bpt.avgGlobalTime = dur - return - } - delta := (dur - bpt.avgGlobalTime) / globalInvAlpha - bpt.avgGlobalTime += delta -} - -func logTime(pi *peerStats, dur time.Duration) { - if pi.averageTime == 0 { - pi.averageTime = dur - return - } - delta := (dur - pi.averageTime) / localInvAlpha - pi.averageTime += delta - -} - -func (bpt *bsPeerTracker) logSuccess(p peer.ID, dur time.Duration) { - bpt.lk.Lock() - defer bpt.lk.Unlock() - - var pi *peerStats - var ok bool - if pi, ok = bpt.peers[p]; !ok { - log.Warnw("log success called on peer not in tracker", "peerid", p.String()) - return - } - - pi.successes++ - logTime(pi, dur) -} - -func (bpt *bsPeerTracker) logFailure(p peer.ID, dur time.Duration) { - bpt.lk.Lock() - defer bpt.lk.Unlock() - - var pi *peerStats - var ok bool - if pi, ok = bpt.peers[p]; !ok { - log.Warn("log failure called on peer not in tracker", "peerid", p.String()) - return - } - - pi.failures++ - logTime(pi, dur) -} - -func (bpt *bsPeerTracker) removePeer(p peer.ID) { - bpt.lk.Lock() - defer bpt.lk.Unlock() - delete(bpt.peers, p) -} diff --git a/chain/blocksync/client.go b/chain/blocksync/client.go new file mode 100644 index 00000000000..104157a82ff --- /dev/null +++ b/chain/blocksync/client.go @@ -0,0 +1,446 @@ +package blocksync + +import ( + "bufio" + "context" + "fmt" + "math/rand" + "time" + + host "github.com/libp2p/go-libp2p-core/host" + inet "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "go.opencensus.io/trace" + "golang.org/x/xerrors" + + cborutil "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" + incrt "github.com/filecoin-project/lotus/lib/increadtimeout" + "github.com/filecoin-project/lotus/lib/peermgr" +) + +// Protocol client. +// FIXME: Rename to just `Client`. 
Not done at the moment to avoid +// disrupt too much of the consumer code, should be done along +// https://github.com/filecoin-project/lotus/issues/2612. +type BlockSync struct { + // Connection manager used to contact the server. + // FIXME: We should have a reduced interface here, initialized + // just with our protocol ID, we shouldn't be able to open *any* + // connection. + host host.Host + + peerTracker *bsPeerTracker +} + +func NewClient( + host host.Host, + pmgr peermgr.MaybePeerMgr, +) *BlockSync { + return &BlockSync{ + host: host, + peerTracker: newPeerTracker(pmgr.Mgr), + } +} + +// Main logic of the client request service. The provided `Request` +// is sent to the `singlePeer` if one is indicated or to all available +// ones otherwise. The response is processed and validated according +// to the `Request` options. Either a `ValidatedResponse` is returned +// (which can be safely accessed), or an `error` that may represent +// either a response error status, a failed validation or an internal +// error. +// +// This is the internal single-point-of-entry for all external-facing +// APIs, currently we have 3 very heterogeneous services exposed: +// * GetBlocks: Headers +// * GetFullTipSet: Headers | Messages +// * GetChainMessages: Messages +// This function handles all the different combinations of the available +// request options without disrupting external calls. In the future the +// consumers should be forced to use a more standardized service and +// adhere to a single API derived from this function. +func (client *BlockSync) doRequest( + ctx context.Context, + req *Request, + singlePeer *peer.ID, +) (*ValidatedResponse, error) { + // Validate request. 
+ if req.Length == 0 { + return nil, xerrors.Errorf("invalid request of length 0") + } + if req.Length > MaxRequestLength { + return nil, xerrors.Errorf("request length (%d) above maximum (%d)", + req.Length, MaxRequestLength) + } + if req.Options == 0 { + return nil, xerrors.Errorf("request with no options set") + } + + // Generate the list of peers to be queried, either the + // `singlePeer` indicated or all peers available (sorted + // by an internal peer tracker with some randomness injected). + var peers []peer.ID + if singlePeer != nil { + peers = []peer.ID{*singlePeer} + } else { + peers = client.getShuffledPeers() + if len(peers) == 0 { + return nil, xerrors.Errorf("no peers available") + } + } + + // Try the request for each peer in the list, + // return on the first successful response. + // FIXME: Doing this serially isn't great, but fetching in parallel + // may not be a good idea either. Think about this more. + startTime := build.Clock.Now() + // FIXME: Should we track time per peer instead of a global one? + for _, peer := range peers { + select { + case <-ctx.Done(): + return nil, xerrors.Errorf("context cancelled: %w", ctx.Err()) + default: + } + + // Send request, read response. + res, err := client.sendRequestToPeer(ctx, peer, req) + if err != nil { + if !xerrors.Is(err, inet.ErrNoConn) { + log.Warnf("could not connect to peer %s: %s", + peer.String(), err) + } + continue + } + + // Process and validate response. + validRes, err := client.processResponse(req, res) + if err != nil { + log.Warnf("processing peer %s response failed: %s", + peer.String(), err) + continue + } + + client.peerTracker.logGlobalSuccess(build.Clock.Since(startTime)) + client.host.ConnManager().TagPeer(peer, "bsync", SUCCESS_PEER_TAG_VALUE) + return validRes, nil + } + + errString := "doRequest failed for all peers" + if singlePeer != nil { + errString = "doRequest failed for single peer" + // (The peer has already been logged before, don't print it again.) 
+	}
+	return nil, xerrors.Errorf(errString)
+}
+
+// processResponse validates `res` against the `req` that originated it:
+// it checks the status, that the chain segment returned matches what was
+// requested, and its integrity. The result is extracted into a
+// `ValidatedResponse` for the external-facing APIs to select what they want.
+//
+// We are conflating in the single error returned both status and validation
+// errors. Peer penalization should happen here then, before returning, so
+// we can apply the correct penalties depending on the cause of the error.
+func (client *BlockSync) processResponse(
+	req *Request,
+	res *Response,
+	// FIXME: Add the `peer` as argument once we implement penalties.
+) (*ValidatedResponse, error) {
+	err := res.statusToError()
+	if err != nil {
+		return nil, xerrors.Errorf("status error: %w", err)
+	}
+
+	options := parseOptions(req.Options)
+	if options.noOptionsSet() {
+		// Safety check, this shouldn't happen, and even if it did
+		// it should be caught by the peer in its error status.
+		return nil, xerrors.Errorf("nothing was requested")
+	}
+
+	// Verify that the chain segment returned is in the valid range.
+	// Note that the returned length might be less than requested.
+	resLength := len(res.Chain)
+	if resLength == 0 {
+		return nil, xerrors.Errorf("got no chain in successful response")
+	}
+	if resLength > int(req.Length) {
+		return nil, xerrors.Errorf("got longer response (%d) than requested (%d)",
+			resLength, req.Length)
+	}
+	if resLength < int(req.Length) && res.Status != Partial {
+		return nil, xerrors.Errorf("got less than requested without a proper status: %d", res.Status)
+	}
+
+	validRes := &ValidatedResponse{}
+	if options.IncludeHeaders {
+		// Check for valid block sets and extract them into `TipSet`s.
+		validRes.Tipsets = make([]*types.TipSet, resLength)
+		for i := 0; i < resLength; i++ {
+			validRes.Tipsets[i], err = types.NewTipSet(res.Chain[i].Blocks)
+			if err != nil {
+				return nil, xerrors.Errorf("invalid tipset blocks at height (head - %d): %w", i, err)
+			}
+		}
+
+		// Check that the returned head matches the one requested.
+		if !types.CidArrsEqual(validRes.Tipsets[0].Cids(), req.Head) {
+			return nil, xerrors.Errorf("returned chain head does not match request")
+		}
+
+		// Check `TipSet` are connected (valid chain).
+		for i := 0; i < len(validRes.Tipsets)-1; i++ {
+			if !validRes.Tipsets[i].IsChildOf(validRes.Tipsets[i+1]) {
+				return nil, xerrors.Errorf("tipsets are not connected at height (head - %d)/(head - %d)",
+					i, i+1)
+				// FIXME: Maybe give more information here, like CIDs.
+			}
+		}
+	}
+
+	if options.IncludeMessages {
+		validRes.Messages = make([]*CompactedMessages, resLength)
+		for i := 0; i < resLength; i++ {
+			if res.Chain[i].Messages == nil {
+				return nil, xerrors.Errorf("no messages included for tipset at height (head - %d)", i)
+			}
+			validRes.Messages[i] = res.Chain[i].Messages
+		}
+
+		if options.IncludeHeaders {
+			// If the headers were also returned check that the compression
+			// indexes are valid before `toFullTipSets()` dereferences them.
+			// Compare as `uint64`: `int(mi)` would wrap negative for huge values.
+			for tipsetIdx := 0; tipsetIdx < resLength; tipsetIdx++ {
+				msgs := res.Chain[tipsetIdx].Messages
+				blocksNum := len(res.Chain[tipsetIdx].Blocks)
+				if len(msgs.BlsIncludes) != blocksNum {
+					return nil, xerrors.Errorf("BlsIncludes (%d) does not match number of blocks (%d)",
+						len(msgs.BlsIncludes), blocksNum)
+				}
+				if len(msgs.SecpkIncludes) != blocksNum {
+					return nil, xerrors.Errorf("SecpkIncludes (%d) does not match number of blocks (%d)",
+						len(msgs.SecpkIncludes), blocksNum)
+				}
+				for blockIdx := 0; blockIdx < blocksNum; blockIdx++ {
+					for _, mi := range msgs.BlsIncludes[blockIdx] {
+						if mi >= uint64(len(msgs.Bls)) {
+							return nil, xerrors.Errorf("index in BlsIncludes (%d) exceeds number of messages (%d)",
+								mi, len(msgs.Bls))
+						}
+					}
+					for _, mi := range msgs.SecpkIncludes[blockIdx] {
+						if mi >= uint64(len(msgs.Secpk)) {
+							return nil, xerrors.Errorf("index in SecpkIncludes (%d) exceeds number of messages (%d)",
+								mi, len(msgs.Secpk))
+						}
+					}
+				}
+			}
+		}
+	}
+
+	return validRes, nil
+}
+
+// GetBlocks fetches count blocks from the network, from the provided tipset
+// *backwards*, returning as many tipsets as count.
+//
+// {hint/usage}: This is used by the Syncer during normal chain syncing and when
+// resolving forks.
+func (client *BlockSync) GetBlocks(
+	ctx context.Context,
+	tsk types.TipSetKey,
+	count int,
+) ([]*types.TipSet, error) {
+	ctx, span := trace.StartSpan(ctx, "bsync.GetBlocks")
+	defer span.End()
+	if span.IsRecordingEvents() {
+		span.AddAttributes(
+			trace.StringAttribute("tipset", fmt.Sprint(tsk.Cids())),
+			trace.Int64Attribute("count", int64(count)),
+		)
+	}
+
+	req := &Request{
+		Head:    tsk.Cids(),
+		Length:  uint64(count),
+		Options: Headers,
+	}
+
+	validRes, err := client.doRequest(ctx, req, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	return validRes.Tipsets, nil
+}
+
+func (client *BlockSync) GetFullTipSet(
+	ctx context.Context,
+	peer peer.ID,
+	tsk types.TipSetKey,
+) (*store.FullTipSet, error) {
+	// TODO: round robin through these peers on error
+
+	req := &Request{
+		Head:    tsk.Cids(),
+		Length:  1,
+		Options: Headers | Messages,
+	}
+
+	validRes, err := client.doRequest(ctx, req, &peer)
+	if err != nil {
+		return nil, err
+	}
+
+	return validRes.toFullTipSets()[0], nil
+	// If `doRequest` didn't fail we are guaranteed to have at least
+	// *one* tipset here, so it's safe to index directly.
+}
+
+func (client *BlockSync) GetChainMessages(
+	ctx context.Context,
+	head *types.TipSet,
+	length uint64,
+) ([]*CompactedMessages, error) {
+	ctx, span := trace.StartSpan(ctx, "GetChainMessages")
+	defer span.End()
+
+	req := &Request{
+		Head:    head.Cids(),
+		Length:  length,
+		Options: Messages,
+	}
+
+	validRes, err := client.doRequest(ctx, req, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	return validRes.Messages, nil
+}
+
+// sendRequestToPeer opens a stream to `peer`, writes `req` and reads the
+// `Response` back, logging success/failure in the peer tracker. The
+// request/response are not processed or validated here.
+func (client *BlockSync) sendRequestToPeer(
+	ctx context.Context,
+	peer peer.ID,
+	req *Request,
+) (_ *Response, err error) {
+	// Trace code.
+	ctx, span := trace.StartSpan(ctx, "sendRequestToPeer")
+	defer span.End()
+	if span.IsRecordingEvents() {
+		span.AddAttributes(
+			trace.StringAttribute("peer", peer.Pretty()),
+		)
+	}
+	defer func() {
+		if err != nil {
+			if span.IsRecordingEvents() {
+				span.SetStatus(trace.Status{
+					Code:    5,
+					Message: err.Error(),
+				})
+			}
+		}
+	}()
+	// -- TRACE --
+
+	supported, err := client.host.Peerstore().SupportsProtocols(peer, BlockSyncProtocolID)
+	if err != nil {
+		return nil, xerrors.Errorf("failed to get protocols for peer: %w", err)
+	}
+	if len(supported) == 0 || supported[0] != BlockSyncProtocolID {
+		return nil, xerrors.Errorf("peer %s does not support protocol %s",
+			peer, BlockSyncProtocolID)
+		// FIXME: `ProtoBook` should support a *single* protocol check that returns
+		//  a bool instead of a list.
+	}
+
+	connectionStart := build.Clock.Now()
+
+	// Open stream to peer.
+	stream, err := client.host.NewStream(
+		inet.WithNoDial(ctx, "should already have connection"),
+		peer,
+		BlockSyncProtocolID)
+	if err != nil {
+		client.RemovePeer(peer)
+		return nil, xerrors.Errorf("failed to open stream to peer: %w", err)
+	}
+	// We opened the stream, so release it on every return path.
+	defer stream.Close() //nolint:errcheck
+
+	// Write request.
+	_ = stream.SetWriteDeadline(time.Now().Add(WRITE_REQ_DEADLINE))
+	if err := cborutil.WriteCborRPC(stream, req); err != nil {
+		_ = stream.SetWriteDeadline(time.Time{})
+		client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart))
+		return nil, err
+	}
+	// The zero `time.Time` clears the write deadline set above.
+	_ = stream.SetWriteDeadline(time.Time{})
+
+	// Read response.
+	var res Response
+	err = cborutil.ReadCborRPC(
+		// FIXME: Extract constants.
+		bufio.NewReader(incrt.New(stream, READ_RES_MIN_SPEED, READ_RES_DEADLINE)),
+		&res)
+	if err != nil {
+		client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart))
+		return nil, err
+	}
+
+	// FIXME: Move all this together at the top using a defer as done elsewhere.
+	//  Maybe we need to declare `res` in the signature.
+	if span.IsRecordingEvents() {
+		span.AddAttributes(
+			trace.Int64Attribute("resp_status", int64(res.Status)),
+			trace.StringAttribute("msg", res.ErrorMessage),
+			trace.Int64Attribute("chain_len", int64(len(res.Chain))),
+		)
+	}
+
+	client.peerTracker.logSuccess(peer, build.Clock.Since(connectionStart))
+	return &res, nil
+}
+
+func (client *BlockSync) AddPeer(p peer.ID) {
+	client.peerTracker.addPeer(p)
+}
+
+func (client *BlockSync) RemovePeer(p peer.ID) {
+	client.peerTracker.removePeer(p)
+}
+
+// getShuffledPeers returns a preference-sorted set of peers (by latency
+// and failure counting), shuffling the first few peers so we don't always
+// pick the same peer.
+// FIXME: Merge with the shuffle if we *always* do it.
+func (client *BlockSync) getShuffledPeers() []peer.ID { + peers := client.peerTracker.prefSortedPeers() + shufflePrefix(peers) + return peers +} + +func shufflePrefix(peers []peer.ID) { + prefix := SHUFFLE_PEERS_PREFIX + if len(peers) < prefix { + prefix = len(peers) + } + + buf := make([]peer.ID, prefix) + perm := rand.Perm(prefix) + for i, v := range perm { + buf[i] = peers[v] + } + + copy(peers, buf) +} diff --git a/chain/blocksync/graphsync_client.go b/chain/blocksync/graphsync_client.go deleted file mode 100644 index 03e4a30e577..00000000000 --- a/chain/blocksync/graphsync_client.go +++ /dev/null @@ -1,151 +0,0 @@ -package blocksync - -import ( - "context" - - "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-graphsync" - "github.com/ipld/go-ipld-prime" - "github.com/libp2p/go-libp2p-core/peer" - "golang.org/x/xerrors" - - store "github.com/filecoin-project/lotus/chain/store" - "github.com/filecoin-project/lotus/chain/types" - - cidlink "github.com/ipld/go-ipld-prime/linking/cid" - basicnode "github.com/ipld/go-ipld-prime/node/basic" - ipldselector "github.com/ipld/go-ipld-prime/traversal/selector" - selectorbuilder "github.com/ipld/go-ipld-prime/traversal/selector/builder" -) - -const ( - - // AMT selector recursion. An AMT has arity of 8 so this gives allows - // us to retrieve trees with 8^10 (1,073,741,824) elements. 
- amtRecursionDepth = uint32(10) - - // some constants for looking up tuple encoded struct fields - // field index of Parents field in a block header - blockIndexParentsField = 5 - - // field index of Messages field in a block header - blockIndexMessagesField = 10 - - // field index of AMT node in AMT head - amtHeadNodeFieldIndex = 2 - - // field index of links array AMT node - amtNodeLinksFieldIndex = 1 - - // field index of values array AMT node - amtNodeValuesFieldIndex = 2 - - // maximum depth per traversal - maxRequestLength = 50 -) - -var amtSelector selectorbuilder.SelectorSpec - -func init() { - // builer for selectors - ssb := selectorbuilder.NewSelectorSpecBuilder(basicnode.Style.Any) - // amt selector -- needed to selector through a messages AMT - amtSelector = ssb.ExploreIndex(amtHeadNodeFieldIndex, - ssb.ExploreRecursive(ipldselector.RecursionLimitDepth(int(amtRecursionDepth)), - ssb.ExploreUnion( - ssb.ExploreIndex(amtNodeLinksFieldIndex, - ssb.ExploreAll(ssb.ExploreRecursiveEdge())), - ssb.ExploreIndex(amtNodeValuesFieldIndex, - ssb.ExploreAll(ssb.Matcher()))))) -} - -func selectorForRequest(req *BlockSyncRequest) ipld.Node { - // builer for selectors - ssb := selectorbuilder.NewSelectorSpecBuilder(basicnode.Style.Any) - - bso := ParseBSOptions(req.Options) - if bso.IncludeMessages { - return ssb.ExploreRecursive(ipldselector.RecursionLimitDepth(int(req.RequestLength)), - ssb.ExploreIndex(blockIndexParentsField, - ssb.ExploreUnion( - ssb.ExploreAll( - ssb.ExploreIndex(blockIndexMessagesField, - ssb.ExploreRange(0, 2, amtSelector), - )), - ssb.ExploreIndex(0, ssb.ExploreRecursiveEdge()), - ))).Node() - } - return ssb.ExploreRecursive(ipldselector.RecursionLimitDepth(int(req.RequestLength)), ssb.ExploreIndex(blockIndexParentsField, - ssb.ExploreUnion( - ssb.ExploreAll( - ssb.Matcher(), - ), - ssb.ExploreIndex(0, ssb.ExploreRecursiveEdge()), - ))).Node() -} - -func firstTipsetSelector(req *BlockSyncRequest) ipld.Node { - // builer for selectors - ssb := 
selectorbuilder.NewSelectorSpecBuilder(basicnode.Style.Any) - - bso := ParseBSOptions(req.Options) - if bso.IncludeMessages { - return ssb.ExploreIndex(blockIndexMessagesField, - ssb.ExploreRange(0, 2, amtSelector), - ).Node() - } - return ssb.Matcher().Node() - -} - -func (bs *BlockSync) executeGsyncSelector(ctx context.Context, p peer.ID, root cid.Cid, sel ipld.Node) error { - extension := graphsync.ExtensionData{ - Name: "chainsync", - Data: nil, - } - _, errs := bs.gsync.Request(ctx, p, cidlink.Link{Cid: root}, sel, extension) - - for err := range errs { - return xerrors.Errorf("failed to complete graphsync request: %w", err) - } - return nil -} - -// Fallback for interacting with other non-lotus nodes -func (bs *BlockSync) fetchBlocksGraphSync(ctx context.Context, p peer.ID, req *BlockSyncRequest) (*BlockSyncResponse, error) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - immediateTsSelector := firstTipsetSelector(req) - - // Do this because we can only request one root at a time - for _, r := range req.Start { - if err := bs.executeGsyncSelector(ctx, p, r, immediateTsSelector); err != nil { - return nil, err - } - } - - if req.RequestLength > maxRequestLength { - req.RequestLength = maxRequestLength - } - - sel := selectorForRequest(req) - - // execute the selector forreal - if err := bs.executeGsyncSelector(ctx, p, req.Start[0], sel); err != nil { - return nil, err - } - - // Now pull the data we fetched out of the chainstore (where it should now be persisted) - tempcs := store.NewChainStore(bs.bserv.Blockstore(), datastore.NewMapDatastore(), nil) - - opts := ParseBSOptions(req.Options) - tsk := types.NewTipSetKey(req.Start...) 
- chain, err := collectChainSegment(tempcs, tsk, req.RequestLength, opts) - if err != nil { - return nil, xerrors.Errorf("failed to load chain data from chainstore after successful graphsync response (start = %v): %w", req.Start, err) - } - - return &BlockSyncResponse{Chain: chain}, nil -} diff --git a/chain/blocksync/peer_tracker.go b/chain/blocksync/peer_tracker.go new file mode 100644 index 00000000000..f241265fab3 --- /dev/null +++ b/chain/blocksync/peer_tracker.go @@ -0,0 +1,169 @@ +package blocksync +// FIXME: This needs to be reviewed. + +import ( + "sort" + "sync" + "time" + + "github.com/libp2p/go-libp2p-core/peer" + + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/lib/peermgr" +) + +type peerStats struct { + successes int + failures int + firstSeen time.Time + averageTime time.Duration +} + +type bsPeerTracker struct { + lk sync.Mutex + + peers map[peer.ID]*peerStats + avgGlobalTime time.Duration + + pmgr *peermgr.PeerMgr +} + +func newPeerTracker(pmgr *peermgr.PeerMgr) *bsPeerTracker { + return &bsPeerTracker{ + peers: make(map[peer.ID]*peerStats), + pmgr: pmgr, + } +} + +func (bpt *bsPeerTracker) addPeer(p peer.ID) { + bpt.lk.Lock() + defer bpt.lk.Unlock() + if _, ok := bpt.peers[p]; ok { + return + } + bpt.peers[p] = &peerStats{ + firstSeen: build.Clock.Now(), + } + +} + +const ( + // newPeerMul is how much better than average is the new peer assumed to be + // less than one to encourouge trying new peers + newPeerMul = 0.9 +) + +func (bpt *bsPeerTracker) prefSortedPeers() []peer.ID { + // TODO: this could probably be cached, but as long as its not too many peers, fine for now + bpt.lk.Lock() + defer bpt.lk.Unlock() + out := make([]peer.ID, 0, len(bpt.peers)) + for p := range bpt.peers { + out = append(out, p) + } + + // sort by 'expected cost' of requesting data from that peer + // additionally handle edge cases where not enough data is available + sort.Slice(out, func(i, j int) bool { + pi := bpt.peers[out[i]] + pj := 
bpt.peers[out[j]] + + var costI, costJ float64 + + getPeerInitLat := func(p peer.ID) float64 { + var res float64 + if bpt.pmgr != nil { + if lat, ok := bpt.pmgr.GetPeerLatency(p); ok { + res = float64(lat) + } + } + if res == 0 { + res = float64(bpt.avgGlobalTime) + } + return res * newPeerMul + } + + if pi.successes+pi.failures > 0 { + failRateI := float64(pi.failures) / float64(pi.failures+pi.successes) + costI = float64(pi.averageTime) + failRateI*float64(bpt.avgGlobalTime) + } else { + costI = getPeerInitLat(out[i]) + } + + if pj.successes+pj.failures > 0 { + failRateJ := float64(pj.failures) / float64(pj.failures+pj.successes) + costJ = float64(pj.averageTime) + failRateJ*float64(bpt.avgGlobalTime) + } else { + costJ = getPeerInitLat(out[j]) + } + + return costI < costJ + }) + + return out +} + +const ( + // xInvAlpha = (N+1)/2 + + localInvAlpha = 5 // 86% of the value is the last 9 + globalInvAlpha = 20 // 86% of the value is the last 39 +) + +func (bpt *bsPeerTracker) logGlobalSuccess(dur time.Duration) { + bpt.lk.Lock() + defer bpt.lk.Unlock() + + if bpt.avgGlobalTime == 0 { + bpt.avgGlobalTime = dur + return + } + delta := (dur - bpt.avgGlobalTime) / globalInvAlpha + bpt.avgGlobalTime += delta +} + +func logTime(pi *peerStats, dur time.Duration) { + if pi.averageTime == 0 { + pi.averageTime = dur + return + } + delta := (dur - pi.averageTime) / localInvAlpha + pi.averageTime += delta + +} + +func (bpt *bsPeerTracker) logSuccess(p peer.ID, dur time.Duration) { + bpt.lk.Lock() + defer bpt.lk.Unlock() + + var pi *peerStats + var ok bool + if pi, ok = bpt.peers[p]; !ok { + log.Warnw("log success called on peer not in tracker", "peerid", p.String()) + return + } + + pi.successes++ + logTime(pi, dur) +} + +func (bpt *bsPeerTracker) logFailure(p peer.ID, dur time.Duration) { + bpt.lk.Lock() + defer bpt.lk.Unlock() + + var pi *peerStats + var ok bool + if pi, ok = bpt.peers[p]; !ok { + log.Warn("log failure called on peer not in tracker", "peerid", p.String()) + 
return + } + + pi.failures++ + logTime(pi, dur) +} + +func (bpt *bsPeerTracker) removePeer(p peer.ID) { + bpt.lk.Lock() + defer bpt.lk.Unlock() + delete(bpt.peers, p) +} diff --git a/chain/blocksync/protocol.go b/chain/blocksync/protocol.go new file mode 100644 index 00000000000..6d50b822c62 --- /dev/null +++ b/chain/blocksync/protocol.go @@ -0,0 +1,177 @@ +package blocksync + +import ( + "github.com/filecoin-project/lotus/chain/store" + "time" + + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/chain/types" +) + +var log = logging.Logger("blocksync") + +const BlockSyncProtocolID = "/fil/sync/blk/0.0.1" + +const MaxRequestLength = 800 + +// Extracted constants from the code. +// FIXME: Should be reviewed and confirmed. +const SUCCESS_PEER_TAG_VALUE = 25 +const WRITE_REQ_DEADLINE = 5 * time.Second +const READ_RES_DEADLINE = WRITE_REQ_DEADLINE +const READ_RES_MIN_SPEED = 50<<10 +const SHUFFLE_PEERS_PREFIX = 5 +const WRITE_RES_DEADLINE = 60 * time.Second + +// FIXME: Rename. Make private. +type Request struct { + // List of ordered CIDs comprising a `TipSetKey` from where to start + // fetching backwards. + // FIXME: Why don't we send a `TipSetKey` instead of converting back + // and forth? + Head []cid.Cid + // Number of block sets to fetch from `Head` (inclusive, should always + // be in the range `[1, MaxRequestLength]`). + Length uint64 + // Request options, see `Options` type for more details. Compressed + // in a single `uint64` to save space. + Options uint64 +} + +// `Request` processed and validated to query the tipsets needed. +type validatedRequest struct { + head types.TipSetKey + length uint64 + options *parsedOptions +} + +// Request options. When fetching the chain segment we can fetch +// either block headers, messages, or both. +const ( + Headers = 1 << iota + Messages +) + +// Decompressed options into separate struct members for easy access +// during internal processing.. 
+type parsedOptions struct {
+	IncludeHeaders  bool
+	IncludeMessages bool
+}
+
+func (options *parsedOptions) noOptionsSet() bool {
+	return !options.IncludeHeaders &&
+		!options.IncludeMessages
+}
+
+func parseOptions(optfield uint64) *parsedOptions {
+	return &parsedOptions{
+		IncludeHeaders:  optfield&(uint64(Headers)) != 0,
+		IncludeMessages: optfield&(uint64(Messages)) != 0,
+	}
+}
+
+// FIXME: Rename. Make private.
+type Response struct {
+	Status status
+	// String that complements the error status when converting to an
+	// internal error (see `statusToError()`).
+	ErrorMessage string
+
+	Chain []*BSTipSet
+}
+
+type status uint64
+const (
+	Ok status = 0
+	// We could not fetch all blocks requested (but at least we returned
+	// the `Head` requested). Not considered an error.
+	Partial status = 101
+
+	// Errors (every constant carries the `status` type, not just `Ok`).
+	NotFound      status = 201
+	GoAway        status = 202
+	InternalError status = 203
+	BadRequest    status = 204
+)
+
+// statusToError converts the response status into an internal error (nil for `Ok` and `Partial`).
+func (res *Response) statusToError() error {
+	switch res.Status {
+	case Ok, Partial:
+		return nil
+		// FIXME: Consider if we want to not process `Partial` responses
+		//  and return an error instead.
+	case NotFound:
+		return xerrors.Errorf("not found")
+	case GoAway:
+		return xerrors.Errorf("not handling 'go away' blocksync responses yet")
+	case InternalError:
+		return xerrors.Errorf("block sync peer errored: %s", res.ErrorMessage)
+	case BadRequest:
+		return xerrors.Errorf("block sync request invalid: %s", res.ErrorMessage)
+	default:
+		return xerrors.Errorf("unrecognized response code: %d", res.Status)
+	}
+}
+
+// FIXME: Rename.
+type BSTipSet struct {
+	Blocks   []*types.BlockHeader
+	Messages *CompactedMessages
+}
+
+// FIXME: Describe format. The `Includes` seem to index
+//  from block to message.
+// FIXME: The logic of this function should belong to it, not
+//  to the consumer.
+type CompactedMessages struct {
+	Bls         []*types.Message
+	BlsIncludes [][]uint64
+
+	Secpk         []*types.SignedMessage
+	SecpkIncludes [][]uint64
+}
+
+// Response that has been validated according to the protocol
+// and can be safely accessed.
+// FIXME: Maybe rename to verified, keep consistent naming.
+type ValidatedResponse struct {
+	Tipsets  []*types.TipSet
+	Messages []*CompactedMessages
+}
+
+// toFullTipSets decompresses the messages and forms full tipsets with them.
+// Both headers and messages need to have been requested (nil is returned otherwise).
+func (res *ValidatedResponse) toFullTipSets() []*store.FullTipSet {
+	if len(res.Tipsets) == 0 || len(res.Messages) != len(res.Tipsets) {
+		// This decompression can only be done if both headers and
+		// messages are returned in the response: bail out instead of
+		// indexing `res.Messages` out of range when either is missing
+		// or their lengths disagree.
+		return nil
+	}
+	ftsList := make([]*store.FullTipSet, len(res.Tipsets))
+	for tipsetIdx := range res.Tipsets {
+		fts := &store.FullTipSet{} // FIXME: We should use the `NewFullTipSet` API.
+		msgs := res.Messages[tipsetIdx]
+		for blockIdx, b := range res.Tipsets[tipsetIdx].Blocks() {
+			fb := &types.FullBlock{
+				Header: b,
+			}
+			for _, mi := range msgs.BlsIncludes[blockIdx] {
+				fb.BlsMessages = append(fb.BlsMessages, msgs.Bls[mi])
+			}
+			for _, mi := range msgs.SecpkIncludes[blockIdx] {
+				fb.SecpkMessages = append(fb.SecpkMessages, msgs.Secpk[mi])
+			}
+
+			fts.Blocks = append(fts.Blocks, fb)
+		}
+		ftsList[tipsetIdx] = fts
+	}
+	return ftsList
+}
diff --git a/chain/blocksync/server.go b/chain/blocksync/server.go
new file mode 100644
index 00000000000..e406067cc65
--- /dev/null
+++ b/chain/blocksync/server.go
@@ -0,0 +1,263 @@
+package blocksync
+
+import (
+	"bufio"
+	"context"
+	"fmt"
+	"time"
+
+	"go.opencensus.io/trace"
+	"golang.org/x/xerrors"
+
+	cborutil "github.com/filecoin-project/go-cbor-util"
+
+	"github.com/filecoin-project/lotus/chain/store"
+	"github.com/filecoin-project/lotus/chain/types"
+
+	"github.com/ipfs/go-cid"
+	inet "github.com/libp2p/go-libp2p-core/network"
+)
+
+
+// BlockSyncService is the component that services BlockSync requests from
+// peers.
+//
+// BlockSync is the basic chain synchronization protocol of Filecoin. BlockSync
+// is an RPC-oriented protocol, with a single operation to request blocks.
+//
+// A request contains a start anchor block (referred to with a CID), and an
+// amount of blocks requested beyond the anchor (including the anchor itself).
+//
+// A client can also pass options, encoded as a 64-bit bitfield. Lotus supports
+// two options at the moment:
+//
+//  - include block contents
+//  - include block messages
+//
+// The response will include a status code, an optional message, and the
+// response payload in case of success. The payload is a slice of serialized
+// tipsets.
+// FIXME: Rename to just `Server` (will be done later, see note on `BlockSync`).
+type BlockSyncService struct {
+	cs *store.ChainStore
+}
+
+func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService {
+	return &BlockSyncService{
+		cs: cs,
+	}
+}
+
+// HandleStream is the entry point of the service: it reads one `Request` from the stream, processes it and writes the `Response` back.
+func (server *BlockSyncService) HandleStream(stream inet.Stream) {
+	ctx, span := trace.StartSpan(context.Background(), "blocksync.HandleStream")
+	defer span.End()
+
+	defer stream.Close() //nolint:errcheck
+
+	var req Request
+	if err := cborutil.ReadCborRPC(bufio.NewReader(stream), &req); err != nil {
+		log.Warnf("failed to read block sync request: %s", err)
+		return
+	}
+	log.Infow("block sync request",
+		"start", req.Head, "len", req.Length)
+
+	resp, err := server.processRequest(ctx, &req)
+	if err != nil {
+		log.Warn("failed to process request: ", err)
+		return
+	}
+
+	_ = stream.SetDeadline(time.Now().Add(WRITE_RES_DEADLINE))
+	if err := cborutil.WriteCborRPC(stream, resp); err != nil {
+		log.Warnw("failed to write back response for handle stream",
+			"err", err, "peer", stream.Conn().RemotePeer())
+		return
+	}
+}
+
+// processRequest validates and services the request. It returns either a
+// protocol response or an internal error. The protocol response may itself
+// signal a protocol error (e.g., invalid request).
+func (server *BlockSyncService) processRequest(
+	ctx context.Context,
+	req *Request,
+) (*Response, error) {
+	validReq, errResponse := validateRequest(ctx, req)
+	if errResponse != nil {
+		// The request did not pass validation, return the response
+		// indicating it.
+		return errResponse, nil
+	}
+
+	return server.serviceRequest(ctx, validReq)
+}
+
+// Validate request. We either return a `validatedRequest`, or an error
+// `Response` indicating why we can't process it. We do not return any
+// internal errors here, we just signal protocol ones.
+func validateRequest( + ctx context.Context, + req *Request, +) ( *validatedRequest, *Response) { + _, span := trace.StartSpan(ctx, "blocksync.ValidateRequest") + defer span.End() + + validReq := validatedRequest{} + + validReq.options = parseOptions(req.Options) + if validReq.options.noOptionsSet() { + return nil, &Response{ + Status: BadRequest, + ErrorMessage: "no options set", + } + } + + validReq.length = req.Length + if validReq.length > MaxRequestLength { + return nil, &Response{ + Status: BadRequest, + ErrorMessage: fmt.Sprintf("request length over maximum allowed (%d)", + MaxRequestLength), + } + } + if validReq.length == 0 { + return nil, &Response{ + Status: BadRequest, + ErrorMessage: "invalid request length of zero", + } + } + + if len(req.Head) == 0 { + return nil, &Response{ + Status: BadRequest, + ErrorMessage: "no cids in request", + } + } + validReq.head = types.NewTipSetKey(req.Head...) + + // FIXME: Add as a defer at the start. + span.AddAttributes( + trace.BoolAttribute("blocks", validReq.options.IncludeHeaders), + trace.BoolAttribute("messages", validReq.options.IncludeMessages), + trace.Int64Attribute("reqlen", int64(validReq.length)), + ) + + return &validReq, nil +} + +func (server *BlockSyncService) serviceRequest( + ctx context.Context, + req *validatedRequest, + ) (*Response, error) { + _, span := trace.StartSpan(ctx, "blocksync.ServiceRequest") + defer span.End() + + chain, err := collectChainSegment(server.cs, req) + if err != nil { + log.Warn("block sync request: collectChainSegment failed: ", err) + return &Response{ + Status: InternalError, + ErrorMessage: err.Error(), + }, nil + } + + status := Ok + if len(chain) < int(req.length) { + status = Partial + } + + return &Response{ + Chain: chain, + Status: status, + }, nil +} + +func collectChainSegment( + cs *store.ChainStore, + req *validatedRequest, + ) ([]*BSTipSet, error) { + var bstips []*BSTipSet + + cur := req.head + for { + var bst BSTipSet + ts, err := cs.LoadTipSet(cur) + 
if err != nil { + return nil, xerrors.Errorf("failed loading tipset %s: %w", cur, err) + } + + if req.options.IncludeHeaders { + bst.Blocks = ts.Blocks() + } + + if req.options.IncludeMessages { + bmsgs, bmincl, smsgs, smincl, err := gatherMessages(cs, ts) + if err != nil { + return nil, xerrors.Errorf("gather messages failed: %w", err) + } + + // FIXME: Pass the response to the `gatherMessages` and set all this there. + bst.Messages = &CompactedMessages{} + bst.Messages.Bls = bmsgs + bst.Messages.BlsIncludes = bmincl + bst.Messages.Secpk = smsgs + bst.Messages.SecpkIncludes = smincl + } + + bstips = append(bstips, &bst) + + // If we collected the length requested or if we reached the + // start (genesis), then stop. + if uint64(len(bstips)) >= req.length || ts.Height() == 0 { + return bstips, nil + } + + cur = ts.Parents() + } +} + +func gatherMessages(cs *store.ChainStore, ts *types.TipSet) ([]*types.Message, [][]uint64, []*types.SignedMessage, [][]uint64, error) { + blsmsgmap := make(map[cid.Cid]uint64) + secpkmsgmap := make(map[cid.Cid]uint64) + var secpkmsgs []*types.SignedMessage + var blsmsgs []*types.Message + var secpkincl, blsincl [][]uint64 + + for _, block := range ts.Blocks() { + bmsgs, smsgs, err := cs.MessagesForBlock(block) + if err != nil { + return nil, nil, nil, nil, err + } + + // FIXME: DRY. Use `chain.Message` interface. 
+ bmi := make([]uint64, 0, len(bmsgs)) + for _, m := range bmsgs { + i, ok := blsmsgmap[m.Cid()] + if !ok { + i = uint64(len(blsmsgs)) + blsmsgs = append(blsmsgs, m) + blsmsgmap[m.Cid()] = i + } + + bmi = append(bmi, i) + } + blsincl = append(blsincl, bmi) + + smi := make([]uint64, 0, len(smsgs)) + for _, m := range smsgs { + i, ok := secpkmsgmap[m.Cid()] + if !ok { + i = uint64(len(secpkmsgs)) + secpkmsgs = append(secpkmsgs, m) + secpkmsgmap[m.Cid()] = i + } + + smi = append(smi, i) + } + secpkincl = append(secpkincl, smi) + } + + return blsmsgs, blsincl, secpkmsgs, secpkincl, nil +} diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index dfc8ddd7187..d0330be6392 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -10,6 +10,8 @@ import ( "golang.org/x/xerrors" address "github.com/filecoin-project/go-address" + blocks "github.com/ipfs/go-block-format" + bserv "github.com/ipfs/go-blockservice" miner "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/util/adt" lru "github.com/hashicorp/golang-lru" @@ -38,7 +40,7 @@ import ( var log = logging.Logger("sub") -func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, cmgr connmgr.ConnManager) { +func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bserv bserv.BlockService, cmgr connmgr.ConnManager) { for { msg, err := bsub.Next(ctx) if err != nil { @@ -61,13 +63,13 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha go func() { start := build.Clock.Now() log.Debug("about to fetch messages for block from pubsub") - bmsgs, err := s.Bsync.FetchMessagesByCids(context.TODO(), blk.BlsMessages) + bmsgs, err := FetchMessagesByCids(context.TODO(), bserv, blk.BlsMessages) if err != nil { log.Errorf("failed to fetch all bls messages for block received over pubusb: %s; source: %s", err, src) return } - smsgs, err := 
s.Bsync.FetchSignedMessagesByCids(context.TODO(), blk.SecpkMessages) + smsgs, err := FetchSignedMessagesByCids(context.TODO(), bserv, blk.SecpkMessages) if err != nil { log.Errorf("failed to fetch all secpk messages for block received over pubusb: %s; source: %s", err, src) return @@ -90,6 +92,108 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha } } +func FetchMessagesByCids( + ctx context.Context, + bserv bserv.BlockService, + cids []cid.Cid, +) ([]*types.Message, error) { + out := make([]*types.Message, len(cids)) + + err := fetchCids(ctx, bserv, cids, func(i int, b blocks.Block) error { + msg, err := types.DecodeMessage(b.RawData()) + if err != nil { + return err + } + + // FIXME: We already sort in `fetchCids`, we are duplicating too much work, + // we don't need to pass the index. + if out[i] != nil { + return fmt.Errorf("received duplicate message") + } + + out[i] = msg + return nil + }) + if err != nil { + return nil, err + } + return out, nil +} + +// FIXME: Duplicate of above. +func FetchSignedMessagesByCids( + ctx context.Context, + bserv bserv.BlockService, + cids []cid.Cid, +) ([]*types.SignedMessage, error) { + out := make([]*types.SignedMessage, len(cids)) + + err := fetchCids(ctx, bserv, cids, func(i int, b blocks.Block) error { + smsg, err := types.DecodeSignedMessage(b.RawData()) + if err != nil { + return err + } + + if out[i] != nil { + return fmt.Errorf("received duplicate message") + } + + out[i] = smsg + return nil + }) + if err != nil { + return nil, err + } + return out, nil +} + +// Fetch `cids` from the block service, apply `cb` on each of them. Used +// by the fetch message functions above. +// We check that each block is received only once and we do not received +// blocks we did not request. +func fetchCids( + ctx context.Context, + bserv bserv.BlockService, + cids []cid.Cid, + cb func(int, blocks.Block) error, +) error { + // FIXME: Why don't we use the context here? 
+ fetchedBlocks := bserv.GetBlocks(context.TODO(), cids) + + cidIndex := make(map[cid.Cid]int) + for i, c := range cids { + cidIndex[c] = i + } + + for i := 0; i < len(cids); i++ { + select { + case block, ok := <-fetchedBlocks: + if !ok { + // Closed channel, no more blocks fetched, check if we have all + // of the CIDs requested. + // FIXME: Review this check. We don't call the callback on the + // last index? + if i == len(cids)-1 { + break + } + + return fmt.Errorf("failed to fetch all messages") + } + + ix, ok := cidIndex[block.Cid()] + if !ok { + return fmt.Errorf("received message we didnt ask for") + } + + if err := cb(ix, block); err != nil { + return err + } + } + } + + return nil +} + type BlockValidator struct { peers *lru.TwoQueueCache diff --git a/chain/sync.go b/chain/sync.go index 474701d64f0..2bf98046934 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1384,7 +1384,7 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS nextI := (i + 1) - batchSize // want to fetch batchSize values, 'i' points to last one we want to fetch, so its 'inclusive' of our request, thus we need to add one to our request start index - var bstout []*blocksync.BSTipSet + var bstout []*blocksync.CompactedMessages for len(bstout) < batchSize { next := headers[nextI] @@ -1405,10 +1405,10 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS this := headers[i-bsi] bstip := bstout[len(bstout)-(bsi+1)] - fts, err := zipTipSetAndMessages(blks, this, bstip.BlsMessages, bstip.SecpkMessages, bstip.BlsMsgIncludes, bstip.SecpkMsgIncludes) + fts, err := zipTipSetAndMessages(blks, this, bstip.Bls, bstip.Secpk, bstip.BlsIncludes, bstip.SecpkIncludes) if err != nil { log.Warnw("zipping failed", "error", err, "bsi", bsi, "i", i, - "height", this.Height(), "bstip-height", bstip.Blocks[0].Height, + "height", this.Height(), "next-height", i+batchSize) return xerrors.Errorf("message processing failed: %w", err) } @@ -1431,15 +1431,15 
@@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS return nil } -func persistMessages(bs bstore.Blockstore, bst *blocksync.BSTipSet) error { - for _, m := range bst.BlsMessages { +func persistMessages(bs bstore.Blockstore, bst *blocksync.CompactedMessages) error { + for _, m := range bst.Bls { //log.Infof("putting BLS message: %s", m.Cid()) if _, err := store.PutMessage(bs, m); err != nil { log.Errorf("failed to persist messages: %+v", err) return xerrors.Errorf("BLS message processing failed: %w", err) } } - for _, m := range bst.SecpkMessages { + for _, m := range bst.Secpk { if m.Signature.Type != crypto.SigTypeSecp256k1 { return xerrors.Errorf("unknown signature type on message %s: %q", m.Cid(), m.Signature.Type) } diff --git a/chain/types/tipset.go b/chain/types/tipset.go index 57ab9178734..bd9843f94be 100644 --- a/chain/types/tipset.go +++ b/chain/types/tipset.go @@ -224,3 +224,7 @@ func (ts *TipSet) Contains(oc cid.Cid) bool { } return false } + +func (ts *TipSet) IsChildOf(parent *TipSet) bool { + return CidArrsEqual(ts.Parents().Cids(), parent.Cids()) +} diff --git a/gen/main.go b/gen/main.go index 01cd756f788..76ecbb4b58e 100644 --- a/gen/main.go +++ b/gen/main.go @@ -63,8 +63,9 @@ func main() { } err = gen.WriteTupleEncodersToFile("./chain/blocksync/cbor_gen.go", "blocksync", - blocksync.BlockSyncRequest{}, - blocksync.BlockSyncResponse{}, + blocksync.Request{}, + blocksync.Response{}, + blocksync.CompactedMessages{}, blocksync.BSTipSet{}, ) if err != nil { diff --git a/node/builder.go b/node/builder.go index 171c4a96e33..ceec3e2123e 100644 --- a/node/builder.go +++ b/node/builder.go @@ -239,7 +239,7 @@ func Online() Option { // Filecoin services Override(new(*chain.Syncer), modules.NewSyncer), - Override(new(*blocksync.BlockSync), blocksync.NewBlockSyncClient), + Override(new(*blocksync.BlockSync), blocksync.NewClient), Override(new(*messagepool.MessagePool), modules.MessagePool), Override(new(modules.Genesis), 
modules.ErrorGenesis), diff --git a/node/modules/services.go b/node/modules/services.go index 278b64186f4..0d148ffb4cc 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -73,7 +73,7 @@ func RunBlockSync(h host.Host, svc *blocksync.BlockSyncService) { h.SetStreamHandler(blocksync.BlockSyncProtocolID, svc.HandleStream) } -func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) { +func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) { ctx := helpers.LifecycleCtx(mctx, lc) blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) @@ -92,7 +92,7 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P panic(err) } - go sub.HandleIncomingBlocks(ctx, blocksub, s, h.ConnManager()) + go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager()) } func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, nn dtypes.NetworkName) { From 0d77c96bda3c24f1dac751cb6d6c95e45be39f9d Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Thu, 30 Jul 2020 17:23:42 -0300 Subject: [PATCH 02/17] extend maximum to fork length --- chain/blocksync/protocol.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/chain/blocksync/protocol.go b/chain/blocksync/protocol.go index 6d50b822c62..b8f19c43bbc 100644 --- a/chain/blocksync/protocol.go +++ b/chain/blocksync/protocol.go @@ -1,6 +1,7 @@ package blocksync import ( + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/store" "time" @@ -15,7 +16,11 @@ var log = logging.Logger("blocksync") const BlockSyncProtocolID = "/fil/sync/blk/0.0.1" -const MaxRequestLength = 800 +// FIXME: Bumped 
from original 800 to this to accommodate `syncFork()` +// use of `GetBlocks()`. It seems the expectation of that API is to +// fetch any amount of blocks leaving it to the internal logic here +// to partition and reassemble the requests if they go above the maximum. +const MaxRequestLength = uint64(build.ForkLengthThreshold) // Extracted constants from the code. // FIXME: Should be reviewed and confirmed. From ccaca481406bd1de9750f6d2069ad6eec44cfbda Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Fri, 31 Jul 2020 09:18:13 -0300 Subject: [PATCH 03/17] temporarily make MaxRequestLength a var --- chain/blocksync/protocol.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/chain/blocksync/protocol.go b/chain/blocksync/protocol.go index b8f19c43bbc..04d5f71d984 100644 --- a/chain/blocksync/protocol.go +++ b/chain/blocksync/protocol.go @@ -20,7 +20,9 @@ const BlockSyncProtocolID = "/fil/sync/blk/0.0.1" // use of `GetBlocks()`. It seems the expectation of that API is to // fetch any amount of blocks leaving it to the internal logic here // to partition and reassemble the requests if they go above the maximum. -const MaxRequestLength = uint64(build.ForkLengthThreshold) +// (Also as a consequence of this temporarily removing the `const` +// qualifier to avoid "const initializer [...] is not a constant" error.) +var MaxRequestLength = uint64(build.ForkLengthThreshold) // Extracted constants from the code. // FIXME: Should be reviewed and confirmed. 
From 0e4d5cb67b8217038168fb5abf372a13ae3ad25a Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Fri, 31 Jul 2020 09:25:40 -0300 Subject: [PATCH 04/17] document CompactedMessages --- chain/blocksync/protocol.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/chain/blocksync/protocol.go b/chain/blocksync/protocol.go index 04d5f71d984..dc9f76d26b9 100644 --- a/chain/blocksync/protocol.go +++ b/chain/blocksync/protocol.go @@ -131,10 +131,18 @@ type BSTipSet struct { Messages *CompactedMessages } -// FIXME: Describe format. The `Includes` seem to index -// from block to message. -// FIXME: The logic of this function should belong to it, not -// to the consumer. +// All messages of a single tipset compacted together instead +// of grouped by block to save space, since there are normally +// many repeated messages per tipset in different blocks. +// +// `BlsIncludes`/`SecpkIncludes` matches `Bls`/`Secpk` messages +// to blocks in the tipsets with the format: +// `BlsIncludes[BI][MI]` +// * BI: block index in the tipset. +// * MI: message index in `Bls` list +// +// FIXME: The logic to decompress this structure should belong +// to itself, not to the consumer. type CompactedMessages struct { Bls []*types.Message BlsIncludes [][]uint64 From 3f6c418dc69c2729954efcf159d16228114db4ab Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Fri, 31 Jul 2020 09:30:45 -0300 Subject: [PATCH 05/17] rename validatedResponse to private --- chain/blocksync/client.go | 28 ++++++++++++++-------------- chain/blocksync/protocol.go | 21 +++++++++++---------- 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/chain/blocksync/client.go b/chain/blocksync/client.go index 104157a82ff..3164d45bf20 100644 --- a/chain/blocksync/client.go +++ b/chain/blocksync/client.go @@ -48,7 +48,7 @@ func NewClient( // Main logic of the client request service. The provided `Request` // is sent to the `singlePeer` if one is indicated or to all available // ones otherwise. 
The response is processed and validated according -// to the `Request` options. Either a `ValidatedResponse` is returned +// to the `Request` options. Either a `validatedResponse` is returned // (which can be safely accessed), or an `error` that may represent // either a response error status, a failed validation or an internal // error. @@ -66,7 +66,7 @@ func (client *BlockSync) doRequest( ctx context.Context, req *Request, singlePeer *peer.ID, -) (*ValidatedResponse, error) { +) (*validatedResponse, error) { // Validate request. if req.Length == 0 { return nil, xerrors.Errorf("invalid request of length 0") @@ -138,7 +138,7 @@ func (client *BlockSync) doRequest( // Process and validate response. Check the status and that the information // returned matches the request (and its integrity). Extract the information -// into a `ValidatedResponse` for the external-facing APIs to select what they +// into a `validatedResponse` for the external-facing APIs to select what they // want. // // We are conflating in the single error returned both status and validation @@ -148,7 +148,7 @@ func (client *BlockSync) processResponse( req *Request, res *Response, // FIXME: Add the `peer` as argument once we implement penalties. -) (*ValidatedResponse, error) { +) (*validatedResponse, error) { err := res.statusToError() if err != nil { return nil, xerrors.Errorf("status error: %s", err) @@ -175,25 +175,25 @@ func (client *BlockSync) processResponse( return nil, xerrors.Errorf("got less than requested without a proper status: %s", res.Status) } - validRes := &ValidatedResponse{} + validRes := &validatedResponse{} if options.IncludeHeaders { // Check for valid block sets and extract them into `TipSet`s. 
- validRes.Tipsets = make([]*types.TipSet, resLength) + validRes.tipsets = make([]*types.TipSet, resLength) for i := 0; i < resLength; i++ { - validRes.Tipsets[i], err = types.NewTipSet(res.Chain[i].Blocks) + validRes.tipsets[i], err = types.NewTipSet(res.Chain[i].Blocks) if err != nil { return nil, xerrors.Errorf("invalid tipset blocks at height (head - %d): %w", i, err) } } // Check that the returned head matches the one requested. - if !types.CidArrsEqual(validRes.Tipsets[0].Cids(), req.Head) { + if !types.CidArrsEqual(validRes.tipsets[0].Cids(), req.Head) { return nil, xerrors.Errorf("returned chain head does not match request") } // Check `TipSet` are connected (valid chain). - for i := 0; i < len(validRes.Tipsets) - 1; i++ { - if validRes.Tipsets[i].IsChildOf(validRes.Tipsets[i+1]) == false { + for i := 0; i < len(validRes.tipsets) - 1; i++ { + if validRes.tipsets[i].IsChildOf(validRes.tipsets[i+1]) == false { return nil, fmt.Errorf("tipsets are not connected at height (head - %d)/(head - %d)", i, i+1) // FIXME: Maybe give more information here, like CIDs. @@ -202,12 +202,12 @@ func (client *BlockSync) processResponse( } if options.IncludeMessages { - validRes.Messages = make([]*CompactedMessages, resLength) + validRes.messages = make([]*CompactedMessages, resLength) for i := 0; i < resLength; i++ { if res.Chain[i].Messages == nil { return nil, xerrors.Errorf("no messages included for tipset at height (head - %d): %w", i) } - validRes.Messages[i] = res.Chain[i].Messages + validRes.messages[i] = res.Chain[i].Messages } if options.IncludeHeaders { @@ -276,7 +276,7 @@ func (client *BlockSync) GetBlocks( return nil, err } - return validRes.Tipsets, nil + return validRes.tipsets, nil } func (client *BlockSync) GetFullTipSet( @@ -321,7 +321,7 @@ func (client *BlockSync) GetChainMessages( return nil, err } - return validRes.Messages, nil + return validRes.messages, nil } // Send a request to a peer. 
Write request in the stream and read the diff --git a/chain/blocksync/protocol.go b/chain/blocksync/protocol.go index dc9f76d26b9..14ae2332f40 100644 --- a/chain/blocksync/protocol.go +++ b/chain/blocksync/protocol.go @@ -153,27 +153,28 @@ type CompactedMessages struct { // Response that has been validated according to the protocol // and can be safely accessed. -// FIXME: Maybe rename to verified, keep consistent naming. -type ValidatedResponse struct { - Tipsets []*types.TipSet - Messages []*CompactedMessages +type validatedResponse struct { + tipsets []*types.TipSet + // List of all messages per tipset (grouped by tipset, + // not by block, hence a single index like `tipsets`). + messages []*CompactedMessages } // Decompress messages and form full tipsets with them. The headers // need to have been requested as well. -func (res *ValidatedResponse) toFullTipSets() ([]*store.FullTipSet) { - if len(res.Tipsets) == 0 { +func (res *validatedResponse) toFullTipSets() ([]*store.FullTipSet) { + if len(res.tipsets) == 0 { // This decompression can only be done if both headers and // messages are returned in the response. // FIXME: Do we need to check the messages are present also? The validation // would seem to imply this is unnecessary, can be added just in case. return nil } - ftsList := make([]*store.FullTipSet, len(res.Tipsets)) - for tipsetIdx := range res.Tipsets { + ftsList := make([]*store.FullTipSet, len(res.tipsets)) + for tipsetIdx := range res.tipsets { fts := &store.FullTipSet{} // FIXME: We should use the `NewFullTipSet` API. 
- msgs := res.Messages[tipsetIdx] - for blockIdx, b := range res.Tipsets[tipsetIdx].Blocks() { + msgs := res.messages[tipsetIdx] + for blockIdx, b := range res.tipsets[tipsetIdx].Blocks() { fb := &types.FullBlock{ Header: b, } From 97b37474f95b1481d61e3ca5eef8779db5a0aa68 Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Fri, 31 Jul 2020 09:33:40 -0300 Subject: [PATCH 06/17] extra check in toFullTipSets --- chain/blocksync/protocol.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/chain/blocksync/protocol.go b/chain/blocksync/protocol.go index 14ae2332f40..65933a2034c 100644 --- a/chain/blocksync/protocol.go +++ b/chain/blocksync/protocol.go @@ -163,11 +163,11 @@ type validatedResponse struct { // Decompress messages and form full tipsets with them. The headers // need to have been requested as well. func (res *validatedResponse) toFullTipSets() ([]*store.FullTipSet) { - if len(res.tipsets) == 0 { + if len(res.tipsets) == 0 || len(res.tipsets) != len(res.messages) { // This decompression can only be done if both headers and - // messages are returned in the response. - // FIXME: Do we need to check the messages are present also? The validation - // would seem to imply this is unnecessary, can be added just in case. + // messages are returned in the response. (The second check + // is already implied by the guarantees of `validatedResponse`, + // added here just for completeness.) 
return nil } ftsList := make([]*store.FullTipSet, len(res.tipsets)) From 755772e12df7c8a53ba9994be3c5d4ce99ff47ec Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Fri, 31 Jul 2020 09:56:09 -0300 Subject: [PATCH 07/17] review client comments --- chain/blocksync/client.go | 51 +++++++++++++++++++-------------------- 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/chain/blocksync/client.go b/chain/blocksync/client.go index 3164d45bf20..cf67b4802fc 100644 --- a/chain/blocksync/client.go +++ b/chain/blocksync/client.go @@ -23,7 +23,7 @@ import ( // Protocol client. // FIXME: Rename to just `Client`. Not done at the moment to avoid -// disrupt too much of the consumer code, should be done along +// disrupting too much of the consumer code, should be done along // https://github.com/filecoin-project/lotus/issues/2612. type BlockSync struct { // Connection manager used to contact the server. @@ -53,7 +53,7 @@ func NewClient( // either a response error status, a failed validation or an internal // error. // -// This is the internal single-point-of-entry for all external-facing +// This is the internal single point of entry for all external-facing // APIs, currently we have 3 very heterogeneous services exposed: // * GetBlocks: Headers // * GetFullTipSet: Headers | Messages @@ -67,21 +67,21 @@ func (client *BlockSync) doRequest( req *Request, singlePeer *peer.ID, ) (*validatedResponse, error) { - // Validate request. - if req.Length == 0 { - return nil, xerrors.Errorf("invalid request of length 0") - } - if req.Length > MaxRequestLength { - return nil, xerrors.Errorf("request length (%d) above maximum (%d)", - req.Length, MaxRequestLength) - } - if req.Options == 0 { - return nil, xerrors.Errorf("request with no options set") - } + // Validate request. 
+ if req.Length == 0 { + return nil, xerrors.Errorf("invalid request of length 0") + } + if req.Length > MaxRequestLength { + return nil, xerrors.Errorf("request length (%d) above maximum (%d)", + req.Length, MaxRequestLength) + } + if req.Options == 0 { + return nil, xerrors.Errorf("request with no options set") + } - // Generate the list of peers to be queried, either the - // `singlePeer` indicated or all peers available (sorted - // by an internal peer tracker with some randomness injected). + // Generate the list of peers to be queried, either the + // `singlePeer` indicated or all peers available (sorted + // by an internal peer tracker with some randomness injected). var peers []peer.ID if singlePeer != nil { peers = []peer.ID{*singlePeer} @@ -130,16 +130,15 @@ func (client *BlockSync) doRequest( errString := "doRequest failed for all peers" if singlePeer != nil { - errString = "doRequest failed for single peer" - // (The peer has already been logged before, don't print it again.) + errString = fmt.Sprintf("doRequest failed for single peer %s", *singlePeer) } return nil, xerrors.Errorf(errString) } -// Process and validate response. Check the status and that the information -// returned matches the request (and its integrity). Extract the information +// Process and validate response. Check the status, the integrity of the +// information returned, and that it matches the request. Extract the information // into a `validatedResponse` for the external-facing APIs to select what they -// want. +// need. // // We are conflating in the single error returned both status and validation // errors. Peer penalization should happen here then, before returning, so @@ -156,8 +155,8 @@ func (client *BlockSync) processResponse( options := parseOptions(req.Options) if options.noOptionsSet() { - // Safety check, this shouldn't happen, and even if it did - // it should be caught by the peer in its error status. 
+ // Safety check: this shouldn't have been sent, and even if it did + // it should have been caught by the peer in its error status. return nil, xerrors.Errorf("nothing was requested") } @@ -191,7 +190,7 @@ func (client *BlockSync) processResponse( return nil, xerrors.Errorf("returned chain head does not match request") } - // Check `TipSet` are connected (valid chain). + // Check `TipSet`s are connected (valid chain). for i := 0; i < len(validRes.tipsets) - 1; i++ { if validRes.tipsets[i].IsChildOf(validRes.tipsets[i+1]) == false { return nil, fmt.Errorf("tipsets are not connected at height (head - %d)/(head - %d)", @@ -382,6 +381,7 @@ func (client *BlockSync) sendRequestToPeer( // FIXME: What's the point of setting a blank deadline that won't time out? // Is this our way of clearing the old one? client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart)) + // FIXME: Should we also remove peer here? return nil, err } // FIXME: Same, why are we doing this again here? @@ -390,7 +390,6 @@ func (client *BlockSync) sendRequestToPeer( // Read response. var res Response err = cborutil.ReadCborRPC( - // FIXME: Extract constants. bufio.NewReader(incrt.New(stream, READ_RES_MIN_SPEED, READ_RES_DEADLINE)), &res) if err != nil { @@ -423,7 +422,7 @@ func (client *BlockSync) RemovePeer(p peer.ID) { // getShuffledPeers returns a preference-sorted set of peers (by latency // and failure counting), shuffling the first few peers so we don't always // pick the same peer. -// FIXME: Merge with the shuffle if we *always* do it. +// FIXME: Consider merging with `shufflePrefix()s`. 
func (client *BlockSync) getShuffledPeers() []peer.ID { peers := client.peerTracker.prefSortedPeers() shufflePrefix(peers) From fc32410e5b3df5b2e4eea61b3c111cdbd630afdc Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Fri, 31 Jul 2020 09:59:56 -0300 Subject: [PATCH 08/17] review server --- chain/blocksync/server.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/chain/blocksync/server.go b/chain/blocksync/server.go index e406067cc65..9aefbb64851 100644 --- a/chain/blocksync/server.go +++ b/chain/blocksync/server.go @@ -78,8 +78,7 @@ func (server *BlockSyncService) HandleStream(stream inet.Stream) { } // Validate and service the request. We return either a protocol -// response or an internal error. The protocol response may signal -// a protocol error itself (e.g., invalid request). +// response or an internal error. func (server *BlockSyncService) processRequest( ctx context.Context, req *Request, @@ -198,7 +197,7 @@ func collectChainSegment( return nil, xerrors.Errorf("gather messages failed: %w", err) } - // FIXME: Pass the response to the `gatherMessages` and set all this there. + // FIXME: Pass the response to `gatherMessages()` and set all this there. 
bst.Messages = &CompactedMessages{} bst.Messages.Bls = bmsgs bst.Messages.BlsIncludes = bmincl From c0c692fa22799d057a4c6226cf7d6213337c28e6 Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Mon, 3 Aug 2020 12:23:11 -0300 Subject: [PATCH 09/17] fix cbor-gen --- chain/blocksync/cbor_gen.go | 382 +++++++++++++++++++++--------------- 1 file changed, 222 insertions(+), 160 deletions(-) diff --git a/chain/blocksync/cbor_gen.go b/chain/blocksync/cbor_gen.go index 7e39924e38d..a19a05a8d7d 100644 --- a/chain/blocksync/cbor_gen.go +++ b/chain/blocksync/cbor_gen.go @@ -14,36 +14,36 @@ import ( var _ = xerrors.Errorf -var lengthBufBlockSyncRequest = []byte{131} +var lengthBufRequest = []byte{131} -func (t *BlockSyncRequest) MarshalCBOR(w io.Writer) error { +func (t *Request) MarshalCBOR(w io.Writer) error { if t == nil { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write(lengthBufBlockSyncRequest); err != nil { + if _, err := w.Write(lengthBufRequest); err != nil { return err } scratch := make([]byte, 9) - // t.Start ([]cid.Cid) (slice) - if len(t.Start) > cbg.MaxLength { - return xerrors.Errorf("Slice value in field t.Start was too long") + // t.Head ([]cid.Cid) (slice) + if len(t.Head) > cbg.MaxLength { + return xerrors.Errorf("Slice value in field t.Head was too long") } - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajArray, uint64(len(t.Start))); err != nil { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajArray, uint64(len(t.Head))); err != nil { return err } - for _, v := range t.Start { + for _, v := range t.Head { if err := cbg.WriteCidBuf(scratch, w, v); err != nil { - return xerrors.Errorf("failed writing cid field t.Start: %w", err) + return xerrors.Errorf("failed writing cid field t.Head: %w", err) } } - // t.RequestLength (uint64) (uint64) + // t.Length (uint64) (uint64) - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.RequestLength)); err != nil { + if err := 
cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.Length)); err != nil { return err } @@ -56,8 +56,8 @@ func (t *BlockSyncRequest) MarshalCBOR(w io.Writer) error { return nil } -func (t *BlockSyncRequest) UnmarshalCBOR(r io.Reader) error { - *t = BlockSyncRequest{} +func (t *Request) UnmarshalCBOR(r io.Reader) error { + *t = Request{} br := cbg.GetPeeker(r) scratch := make([]byte, 8) @@ -74,7 +74,7 @@ func (t *BlockSyncRequest) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input had wrong number of fields") } - // t.Start ([]cid.Cid) (slice) + // t.Head ([]cid.Cid) (slice) maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) if err != nil { @@ -82,7 +82,7 @@ func (t *BlockSyncRequest) UnmarshalCBOR(r io.Reader) error { } if extra > cbg.MaxLength { - return fmt.Errorf("t.Start: array too large (%d)", extra) + return fmt.Errorf("t.Head: array too large (%d)", extra) } if maj != cbg.MajArray { @@ -90,19 +90,19 @@ func (t *BlockSyncRequest) UnmarshalCBOR(r io.Reader) error { } if extra > 0 { - t.Start = make([]cid.Cid, extra) + t.Head = make([]cid.Cid, extra) } for i := 0; i < int(extra); i++ { c, err := cbg.ReadCid(br) if err != nil { - return xerrors.Errorf("reading cid field t.Start failed: %w", err) + return xerrors.Errorf("reading cid field t.Head failed: %w", err) } - t.Start[i] = c + t.Head[i] = c } - // t.RequestLength (uint64) (uint64) + // t.Length (uint64) (uint64) { @@ -113,7 +113,7 @@ func (t *BlockSyncRequest) UnmarshalCBOR(r io.Reader) error { if maj != cbg.MajUnsignedInt { return fmt.Errorf("wrong type for uint64 field") } - t.RequestLength = uint64(extra) + t.Length = uint64(extra) } // t.Options (uint64) (uint64) @@ -133,55 +133,55 @@ func (t *BlockSyncRequest) UnmarshalCBOR(r io.Reader) error { return nil } -var lengthBufBlockSyncResponse = []byte{131} +var lengthBufResponse = []byte{131} -func (t *BlockSyncResponse) MarshalCBOR(w io.Writer) error { +func (t *Response) MarshalCBOR(w io.Writer) error { if t == nil { _, 
err := w.Write(cbg.CborNull) return err } - if _, err := w.Write(lengthBufBlockSyncResponse); err != nil { + if _, err := w.Write(lengthBufResponse); err != nil { return err } scratch := make([]byte, 9) - // t.Chain ([]*blocksync.BSTipSet) (slice) - if len(t.Chain) > cbg.MaxLength { - return xerrors.Errorf("Slice value in field t.Chain was too long") - } + // t.Status (blocksync.status) (uint64) - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajArray, uint64(len(t.Chain))); err != nil { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.Status)); err != nil { return err } - for _, v := range t.Chain { - if err := v.MarshalCBOR(w); err != nil { - return err - } - } - // t.Status (uint64) (uint64) + // t.ErrorMessage (string) (string) + if len(t.ErrorMessage) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.ErrorMessage was too long") + } - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.Status)); err != nil { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len(t.ErrorMessage))); err != nil { + return err + } + if _, err := io.WriteString(w, string(t.ErrorMessage)); err != nil { return err } - // t.Message (string) (string) - if len(t.Message) > cbg.MaxLength { - return xerrors.Errorf("Value in field t.Message was too long") + // t.Chain ([]*blocksync.BSTipSet) (slice) + if len(t.Chain) > cbg.MaxLength { + return xerrors.Errorf("Slice value in field t.Chain was too long") } - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len(t.Message))); err != nil { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajArray, uint64(len(t.Chain))); err != nil { return err } - if _, err := io.WriteString(w, string(t.Message)); err != nil { - return err + for _, v := range t.Chain { + if err := v.MarshalCBOR(w); err != nil { + return err + } } return nil } -func (t *BlockSyncResponse) UnmarshalCBOR(r io.Reader) error { - *t = 
BlockSyncResponse{} +func (t *Response) UnmarshalCBOR(r io.Reader) error { + *t = Response{} br := cbg.GetPeeker(r) scratch := make([]byte, 8) @@ -198,6 +198,30 @@ func (t *BlockSyncResponse) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input had wrong number of fields") } + // t.Status (blocksync.status) (uint64) + + { + + maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.Status = status(extra) + + } + // t.ErrorMessage (string) (string) + + { + sval, err := cbg.ReadStringBuf(br, scratch) + if err != nil { + return err + } + + t.ErrorMessage = string(sval) + } // t.Chain ([]*blocksync.BSTipSet) (slice) maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) @@ -227,83 +251,45 @@ func (t *BlockSyncResponse) UnmarshalCBOR(r io.Reader) error { t.Chain[i] = &v } - // t.Status (uint64) (uint64) - - { - - maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.Status = uint64(extra) - - } - // t.Message (string) (string) - - { - sval, err := cbg.ReadStringBuf(br, scratch) - if err != nil { - return err - } - - t.Message = string(sval) - } return nil } -var lengthBufBSTipSet = []byte{133} +var lengthBufCompactedMessages = []byte{132} -func (t *BSTipSet) MarshalCBOR(w io.Writer) error { +func (t *CompactedMessages) MarshalCBOR(w io.Writer) error { if t == nil { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write(lengthBufBSTipSet); err != nil { + if _, err := w.Write(lengthBufCompactedMessages); err != nil { return err } scratch := make([]byte, 9) - // t.Blocks ([]*types.BlockHeader) (slice) - if len(t.Blocks) > cbg.MaxLength { - return xerrors.Errorf("Slice value in field t.Blocks was too long") + // t.Bls ([]*types.Message) (slice) + if len(t.Bls) > cbg.MaxLength { + return 
xerrors.Errorf("Slice value in field t.Bls was too long") } - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajArray, uint64(len(t.Blocks))); err != nil { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajArray, uint64(len(t.Bls))); err != nil { return err } - for _, v := range t.Blocks { + for _, v := range t.Bls { if err := v.MarshalCBOR(w); err != nil { return err } } - // t.BlsMessages ([]*types.Message) (slice) - if len(t.BlsMessages) > cbg.MaxLength { - return xerrors.Errorf("Slice value in field t.BlsMessages was too long") + // t.BlsIncludes ([][]uint64) (slice) + if len(t.BlsIncludes) > cbg.MaxLength { + return xerrors.Errorf("Slice value in field t.BlsIncludes was too long") } - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajArray, uint64(len(t.BlsMessages))); err != nil { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajArray, uint64(len(t.BlsIncludes))); err != nil { return err } - for _, v := range t.BlsMessages { - if err := v.MarshalCBOR(w); err != nil { - return err - } - } - - // t.BlsMsgIncludes ([][]uint64) (slice) - if len(t.BlsMsgIncludes) > cbg.MaxLength { - return xerrors.Errorf("Slice value in field t.BlsMsgIncludes was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajArray, uint64(len(t.BlsMsgIncludes))); err != nil { - return err - } - for _, v := range t.BlsMsgIncludes { + for _, v := range t.BlsIncludes { if len(v) > cbg.MaxLength { return xerrors.Errorf("Slice value in field v was too long") } @@ -318,29 +304,29 @@ func (t *BSTipSet) MarshalCBOR(w io.Writer) error { } } - // t.SecpkMessages ([]*types.SignedMessage) (slice) - if len(t.SecpkMessages) > cbg.MaxLength { - return xerrors.Errorf("Slice value in field t.SecpkMessages was too long") + // t.Secpk ([]*types.SignedMessage) (slice) + if len(t.Secpk) > cbg.MaxLength { + return xerrors.Errorf("Slice value in field t.Secpk was too long") } - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajArray, 
uint64(len(t.SecpkMessages))); err != nil { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajArray, uint64(len(t.Secpk))); err != nil { return err } - for _, v := range t.SecpkMessages { + for _, v := range t.Secpk { if err := v.MarshalCBOR(w); err != nil { return err } } - // t.SecpkMsgIncludes ([][]uint64) (slice) - if len(t.SecpkMsgIncludes) > cbg.MaxLength { - return xerrors.Errorf("Slice value in field t.SecpkMsgIncludes was too long") + // t.SecpkIncludes ([][]uint64) (slice) + if len(t.SecpkIncludes) > cbg.MaxLength { + return xerrors.Errorf("Slice value in field t.SecpkIncludes was too long") } - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajArray, uint64(len(t.SecpkMsgIncludes))); err != nil { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajArray, uint64(len(t.SecpkIncludes))); err != nil { return err } - for _, v := range t.SecpkMsgIncludes { + for _, v := range t.SecpkIncludes { if len(v) > cbg.MaxLength { return xerrors.Errorf("Slice value in field v was too long") } @@ -357,8 +343,8 @@ func (t *BSTipSet) MarshalCBOR(w io.Writer) error { return nil } -func (t *BSTipSet) UnmarshalCBOR(r io.Reader) error { - *t = BSTipSet{} +func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) error { + *t = CompactedMessages{} br := cbg.GetPeeker(r) scratch := make([]byte, 8) @@ -371,11 +357,11 @@ func (t *BSTipSet) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input should be of type array") } - if extra != 5 { + if extra != 4 { return fmt.Errorf("cbor input had wrong number of fields") } - // t.Blocks ([]*types.BlockHeader) (slice) + // t.Bls ([]*types.Message) (slice) maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) if err != nil { @@ -383,7 +369,7 @@ func (t *BSTipSet) UnmarshalCBOR(r io.Reader) error { } if extra > cbg.MaxLength { - return fmt.Errorf("t.Blocks: array too large (%d)", extra) + return fmt.Errorf("t.Bls: array too large (%d)", extra) } if maj != cbg.MajArray { @@ -391,36 +377,7 @@ func (t *BSTipSet) 
UnmarshalCBOR(r io.Reader) error { } if extra > 0 { - t.Blocks = make([]*types.BlockHeader, extra) - } - - for i := 0; i < int(extra); i++ { - - var v types.BlockHeader - if err := v.UnmarshalCBOR(br); err != nil { - return err - } - - t.Blocks[i] = &v - } - - // t.BlsMessages ([]*types.Message) (slice) - - maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) - if err != nil { - return err - } - - if extra > cbg.MaxLength { - return fmt.Errorf("t.BlsMessages: array too large (%d)", extra) - } - - if maj != cbg.MajArray { - return fmt.Errorf("expected cbor array") - } - - if extra > 0 { - t.BlsMessages = make([]*types.Message, extra) + t.Bls = make([]*types.Message, extra) } for i := 0; i < int(extra); i++ { @@ -430,10 +387,10 @@ func (t *BSTipSet) UnmarshalCBOR(r io.Reader) error { return err } - t.BlsMessages[i] = &v + t.Bls[i] = &v } - // t.BlsMsgIncludes ([][]uint64) (slice) + // t.BlsIncludes ([][]uint64) (slice) maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) if err != nil { @@ -441,7 +398,7 @@ func (t *BSTipSet) UnmarshalCBOR(r io.Reader) error { } if extra > cbg.MaxLength { - return fmt.Errorf("t.BlsMsgIncludes: array too large (%d)", extra) + return fmt.Errorf("t.BlsIncludes: array too large (%d)", extra) } if maj != cbg.MajArray { @@ -449,7 +406,7 @@ func (t *BSTipSet) UnmarshalCBOR(r io.Reader) error { } if extra > 0 { - t.BlsMsgIncludes = make([][]uint64, extra) + t.BlsIncludes = make([][]uint64, extra) } for i := 0; i < int(extra); i++ { @@ -464,7 +421,7 @@ func (t *BSTipSet) UnmarshalCBOR(r io.Reader) error { } if extra > cbg.MaxLength { - return fmt.Errorf("t.BlsMsgIncludes[i]: array too large (%d)", extra) + return fmt.Errorf("t.BlsIncludes[i]: array too large (%d)", extra) } if maj != cbg.MajArray { @@ -472,27 +429,27 @@ func (t *BSTipSet) UnmarshalCBOR(r io.Reader) error { } if extra > 0 { - t.BlsMsgIncludes[i] = make([]uint64, extra) + t.BlsIncludes[i] = make([]uint64, extra) } for j := 0; j < int(extra); j++ { maj, val, err := 
cbg.CborReadHeaderBuf(br, scratch) if err != nil { - return xerrors.Errorf("failed to read uint64 for t.BlsMsgIncludes[i] slice: %w", err) + return xerrors.Errorf("failed to read uint64 for t.BlsIncludes[i] slice: %w", err) } if maj != cbg.MajUnsignedInt { - return xerrors.Errorf("value read for array t.BlsMsgIncludes[i] was not a uint, instead got %d", maj) + return xerrors.Errorf("value read for array t.BlsIncludes[i] was not a uint, instead got %d", maj) } - t.BlsMsgIncludes[i][j] = uint64(val) + t.BlsIncludes[i][j] = uint64(val) } } } - // t.SecpkMessages ([]*types.SignedMessage) (slice) + // t.Secpk ([]*types.SignedMessage) (slice) maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) if err != nil { @@ -500,7 +457,7 @@ func (t *BSTipSet) UnmarshalCBOR(r io.Reader) error { } if extra > cbg.MaxLength { - return fmt.Errorf("t.SecpkMessages: array too large (%d)", extra) + return fmt.Errorf("t.Secpk: array too large (%d)", extra) } if maj != cbg.MajArray { @@ -508,7 +465,7 @@ func (t *BSTipSet) UnmarshalCBOR(r io.Reader) error { } if extra > 0 { - t.SecpkMessages = make([]*types.SignedMessage, extra) + t.Secpk = make([]*types.SignedMessage, extra) } for i := 0; i < int(extra); i++ { @@ -518,10 +475,10 @@ func (t *BSTipSet) UnmarshalCBOR(r io.Reader) error { return err } - t.SecpkMessages[i] = &v + t.Secpk[i] = &v } - // t.SecpkMsgIncludes ([][]uint64) (slice) + // t.SecpkIncludes ([][]uint64) (slice) maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) if err != nil { @@ -529,7 +486,7 @@ func (t *BSTipSet) UnmarshalCBOR(r io.Reader) error { } if extra > cbg.MaxLength { - return fmt.Errorf("t.SecpkMsgIncludes: array too large (%d)", extra) + return fmt.Errorf("t.SecpkIncludes: array too large (%d)", extra) } if maj != cbg.MajArray { @@ -537,7 +494,7 @@ func (t *BSTipSet) UnmarshalCBOR(r io.Reader) error { } if extra > 0 { - t.SecpkMsgIncludes = make([][]uint64, extra) + t.SecpkIncludes = make([][]uint64, extra) } for i := 0; i < int(extra); i++ { @@ -552,7 +509,7 
@@ func (t *BSTipSet) UnmarshalCBOR(r io.Reader) error { } if extra > cbg.MaxLength { - return fmt.Errorf("t.SecpkMsgIncludes[i]: array too large (%d)", extra) + return fmt.Errorf("t.SecpkIncludes[i]: array too large (%d)", extra) } if maj != cbg.MajArray { @@ -560,21 +517,21 @@ func (t *BSTipSet) UnmarshalCBOR(r io.Reader) error { } if extra > 0 { - t.SecpkMsgIncludes[i] = make([]uint64, extra) + t.SecpkIncludes[i] = make([]uint64, extra) } for j := 0; j < int(extra); j++ { maj, val, err := cbg.CborReadHeaderBuf(br, scratch) if err != nil { - return xerrors.Errorf("failed to read uint64 for t.SecpkMsgIncludes[i] slice: %w", err) + return xerrors.Errorf("failed to read uint64 for t.SecpkIncludes[i] slice: %w", err) } if maj != cbg.MajUnsignedInt { - return xerrors.Errorf("value read for array t.SecpkMsgIncludes[i] was not a uint, instead got %d", maj) + return xerrors.Errorf("value read for array t.SecpkIncludes[i] was not a uint, instead got %d", maj) } - t.SecpkMsgIncludes[i][j] = uint64(val) + t.SecpkIncludes[i][j] = uint64(val) } } @@ -582,3 +539,108 @@ func (t *BSTipSet) UnmarshalCBOR(r io.Reader) error { return nil } + +var lengthBufBSTipSet = []byte{130} + +func (t *BSTipSet) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write(lengthBufBSTipSet); err != nil { + return err + } + + scratch := make([]byte, 9) + + // t.Blocks ([]*types.BlockHeader) (slice) + if len(t.Blocks) > cbg.MaxLength { + return xerrors.Errorf("Slice value in field t.Blocks was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajArray, uint64(len(t.Blocks))); err != nil { + return err + } + for _, v := range t.Blocks { + if err := v.MarshalCBOR(w); err != nil { + return err + } + } + + // t.Messages (blocksync.CompactedMessages) (struct) + if err := t.Messages.MarshalCBOR(w); err != nil { + return err + } + return nil +} + +func (t *BSTipSet) UnmarshalCBOR(r io.Reader) error { + *t = BSTipSet{} + 
+ br := cbg.GetPeeker(r) + scratch := make([]byte, 8) + + maj, extra, err := cbg.CborReadHeaderBuf(br, scratch) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 2 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Blocks ([]*types.BlockHeader) (slice) + + maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) + if err != nil { + return err + } + + if extra > cbg.MaxLength { + return fmt.Errorf("t.Blocks: array too large (%d)", extra) + } + + if maj != cbg.MajArray { + return fmt.Errorf("expected cbor array") + } + + if extra > 0 { + t.Blocks = make([]*types.BlockHeader, extra) + } + + for i := 0; i < int(extra); i++ { + + var v types.BlockHeader + if err := v.UnmarshalCBOR(br); err != nil { + return err + } + + t.Blocks[i] = &v + } + + // t.Messages (blocksync.CompactedMessages) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + t.Messages = new(CompactedMessages) + if err := t.Messages.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.Messages pointer: %w", err) + } + } + + } + return nil +} From 9f82e98f281102bf04485825e2f0546aa7f27145 Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Tue, 4 Aug 2020 14:44:27 -0300 Subject: [PATCH 10/17] check parent height --- chain/types/tipset.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/chain/types/tipset.go b/chain/types/tipset.go index bd9843f94be..dff782fe6e5 100644 --- a/chain/types/tipset.go +++ b/chain/types/tipset.go @@ -226,5 +226,9 @@ func (ts *TipSet) Contains(oc cid.Cid) bool { } func (ts *TipSet) IsChildOf(parent *TipSet) bool { - return CidArrsEqual(ts.Parents().Cids(), parent.Cids()) + return CidArrsEqual(ts.Parents().Cids(), parent.Cids()) && + // FIXME: The height check might go beyond what is meant 
by + // "parent", but many parts of the code rely on the tipset's + // height for their processing logic at the moment to obviate it. + ts.height > parent.height } From 1ec0ded9531dfaf2c3cb02d06b609f8de48a5be1 Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Tue, 4 Aug 2020 14:44:48 -0300 Subject: [PATCH 11/17] document NewTipSet checks --- chain/types/tipset.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/chain/types/tipset.go b/chain/types/tipset.go index dff782fe6e5..271301ea6f0 100644 --- a/chain/types/tipset.go +++ b/chain/types/tipset.go @@ -97,6 +97,12 @@ func tipsetSortFunc(blks []*BlockHeader) func(i, j int) bool { } } +// Checks: +// * A tipset is composed of at least one block. (Because of our variable +// number of blocks per tipset, determined by randomness, we do not impose +// an upper limit.) +// * All blocks have the same height. +// * All blocks have the same parents (same number of them and matching CIDs). func NewTipSet(blks []*BlockHeader) (*TipSet, error) { if len(blks) == 0 { return nil, xerrors.Errorf("NewTipSet called with zero length array of blocks") From 7982ab52a7727be77ce45268c70455d75f2db1d5 Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Tue, 4 Aug 2020 15:06:30 -0300 Subject: [PATCH 12/17] check connection between fetched segments during sync --- chain/sync.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/chain/sync.go b/chain/sync.go index 2bf98046934..901c915c4d6 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1244,6 +1244,22 @@ loop: } log.Info("Got blocks: ", blks[0].Height(), len(blks)) + // Check that the fetched segment of the chain matches what we already + // have. Since we fetch from the head backwards our reassembled chain + // is sorted in reverse here: we have a child -> parent order, our last + // tipset then should be child of the first tipset retrieved. 
+ // FIXME: The reassembly logic should be part of the `BlockSync` + // service, the consumer should not be concerned with the + // `MaxRequestLength` limitation, it should just be able to request + // a segment of arbitrary length. The same burden is put on + // `syncFork()` which needs to be aware of this as well. + if blockSet[len(blockSet)-1].IsChildOf(blks[0]) == false { + return nil, xerrors.Errorf("retrieved segments of the chain are not connected at heights %d/%d", + blockSet[len(blockSet)-1].Height(), blks[0].Height()) + // A successful `GetBlocks()` call is guaranteed to fetch at least + // one tipset so the access `blks[0]` is safe. + } + for _, b := range blks { if b.Height() < untilHeight { break loop From 65512193ccf97e6fd9b22c9953086a1ea3ad967a Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Wed, 5 Aug 2020 19:40:00 -0300 Subject: [PATCH 13/17] clear deadline in server --- chain/blocksync/client.go | 6 ++---- chain/blocksync/server.go | 2 ++ 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/chain/blocksync/client.go b/chain/blocksync/client.go index cf67b4802fc..feee7015c75 100644 --- a/chain/blocksync/client.go +++ b/chain/blocksync/client.go @@ -378,14 +378,12 @@ func (client *BlockSync) sendRequestToPeer( _ = stream.SetWriteDeadline(time.Now().Add(WRITE_REQ_DEADLINE)) if err := cborutil.WriteCborRPC(stream, req); err != nil { _ = stream.SetWriteDeadline(time.Time{}) - // FIXME: What's the point of setting a blank deadline that won't time out? - // Is this our way of clearing the old one? client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart)) // FIXME: Should we also remove peer here? return nil, err } - // FIXME: Same, why are we doing this again here? - _ = stream.SetWriteDeadline(time.Time{}) + _ = stream.SetWriteDeadline(time.Time{}) // clear deadline // FIXME: Needs + // its own API (https://github.com/libp2p/go-libp2p-core/issues/162). // Read response. 
var res Response diff --git a/chain/blocksync/server.go b/chain/blocksync/server.go index 9aefbb64851..32ed4a16f23 100644 --- a/chain/blocksync/server.go +++ b/chain/blocksync/server.go @@ -71,10 +71,12 @@ func (server *BlockSyncService) HandleStream(stream inet.Stream) { _ = stream.SetDeadline(time.Now().Add(WRITE_RES_DEADLINE)) if err := cborutil.WriteCborRPC(stream, resp); err != nil { + _ = stream.SetDeadline(time.Time{}) log.Warnw("failed to write back response for handle stream", "err", err, "peer", stream.Conn().RemotePeer()) return } + _ = stream.SetDeadline(time.Time{}) } // Validate and service the request. We return either a protocol From 8e044be7c268582ab55c9a545ed62158b552b82e Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Wed, 5 Aug 2020 19:45:10 -0300 Subject: [PATCH 14/17] protocol: req: use TipSetKey comment --- chain/blocksync/protocol.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chain/blocksync/protocol.go b/chain/blocksync/protocol.go index 65933a2034c..8be4e44c5e5 100644 --- a/chain/blocksync/protocol.go +++ b/chain/blocksync/protocol.go @@ -37,8 +37,8 @@ const WRITE_RES_DEADLINE = 60 * time.Second type Request struct { // List of ordered CIDs comprising a `TipSetKey` from where to start // fetching backwards. - // FIXME: Why don't we send a `TipSetKey` instead of converting back - // and forth? + // FIXME: Consider using `TipSetKey` now (introduced after the creation + // of this protocol) instead of converting back and forth. Head []cid.Cid // Number of block sets to fetch from `Head` (inclusive, should always // be in the range `[1, MaxRequestLength]`). 
From ac255023084b3ba018d6954a6c570d58f20236f5 Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Wed, 5 Aug 2020 19:47:27 -0300 Subject: [PATCH 15/17] go fmt --- chain/blocksync/client.go | 6 +++--- chain/blocksync/peer_tracker.go | 1 + chain/blocksync/protocol.go | 17 +++++++++-------- chain/blocksync/server.go | 9 ++++----- 4 files changed, 17 insertions(+), 16 deletions(-) diff --git a/chain/blocksync/client.go b/chain/blocksync/client.go index feee7015c75..078ea5ad416 100644 --- a/chain/blocksync/client.go +++ b/chain/blocksync/client.go @@ -30,7 +30,7 @@ type BlockSync struct { // FIXME: We should have a reduced interface here, initialized // just with our protocol ID, we shouldn't be able to open *any* // connection. - host host.Host + host host.Host peerTracker *bsPeerTracker } @@ -191,7 +191,7 @@ func (client *BlockSync) processResponse( } // Check `TipSet`s are connected (valid chain). - for i := 0; i < len(validRes.tipsets) - 1; i++ { + for i := 0; i < len(validRes.tipsets)-1; i++ { if validRes.tipsets[i].IsChildOf(validRes.tipsets[i+1]) == false { return nil, fmt.Errorf("tipsets are not connected at height (head - %d)/(head - %d)", i, i+1) @@ -305,7 +305,7 @@ func (client *BlockSync) GetChainMessages( ctx context.Context, head *types.TipSet, length uint64, - ) ([]*CompactedMessages, error) { +) ([]*CompactedMessages, error) { ctx, span := trace.StartSpan(ctx, "GetChainMessages") defer span.End() diff --git a/chain/blocksync/peer_tracker.go b/chain/blocksync/peer_tracker.go index f241265fab3..f1f6ede07ac 100644 --- a/chain/blocksync/peer_tracker.go +++ b/chain/blocksync/peer_tracker.go @@ -1,4 +1,5 @@ package blocksync + // FIXME: This needs to be reviewed. 
import ( diff --git a/chain/blocksync/protocol.go b/chain/blocksync/protocol.go index 8be4e44c5e5..5a499f3672c 100644 --- a/chain/blocksync/protocol.go +++ b/chain/blocksync/protocol.go @@ -29,7 +29,7 @@ var MaxRequestLength = uint64(build.ForkLengthThreshold) const SUCCESS_PEER_TAG_VALUE = 25 const WRITE_REQ_DEADLINE = 5 * time.Second const READ_RES_DEADLINE = WRITE_REQ_DEADLINE -const READ_RES_MIN_SPEED = 50<<10 +const READ_RES_MIN_SPEED = 50 << 10 const SHUFFLE_PEERS_PREFIX = 5 const WRITE_RES_DEADLINE = 60 * time.Second @@ -83,7 +83,7 @@ func parseOptions(optfield uint64) *parsedOptions { // FIXME: Rename. Make private. type Response struct { - Status status + Status status // String that complements the error status when converting to an // internal error (see `statusToError()`). ErrorMessage string @@ -92,6 +92,7 @@ type Response struct { } type status uint64 + const ( Ok status = 0 // We could not fetch all blocks requested (but at least we returned @@ -127,8 +128,8 @@ func (res *Response) statusToError() error { // FIXME: Rename. type BSTipSet struct { - Blocks []*types.BlockHeader - Messages *CompactedMessages + Blocks []*types.BlockHeader + Messages *CompactedMessages } // All messages of a single tipset compacted together instead @@ -144,17 +145,17 @@ type BSTipSet struct { // FIXME: The logic to decompress this structure should belong // to itself, not to the consumer. type CompactedMessages struct { - Bls []*types.Message + Bls []*types.Message BlsIncludes [][]uint64 - Secpk []*types.SignedMessage + Secpk []*types.SignedMessage SecpkIncludes [][]uint64 } // Response that has been validated according to the protocol // and can be safely accessed. type validatedResponse struct { - tipsets []*types.TipSet + tipsets []*types.TipSet // List of all messages per tipset (grouped by tipset, // not by block, hence a single index like `tipsets`). 
messages []*CompactedMessages @@ -162,7 +163,7 @@ type validatedResponse struct { // Decompress messages and form full tipsets with them. The headers // need to have been requested as well. -func (res *validatedResponse) toFullTipSets() ([]*store.FullTipSet) { +func (res *validatedResponse) toFullTipSets() []*store.FullTipSet { if len(res.tipsets) == 0 || len(res.tipsets) != len(res.messages) { // This decompression can only be done if both headers and // messages are returned in the response. (The second check diff --git a/chain/blocksync/server.go b/chain/blocksync/server.go index 32ed4a16f23..001f2e6406b 100644 --- a/chain/blocksync/server.go +++ b/chain/blocksync/server.go @@ -18,7 +18,6 @@ import ( inet "github.com/libp2p/go-libp2p-core/network" ) - // BlockSyncService is the component that services BlockSync requests from // peers. // @@ -101,7 +100,7 @@ func (server *BlockSyncService) processRequest( func validateRequest( ctx context.Context, req *Request, -) ( *validatedRequest, *Response) { +) (*validatedRequest, *Response) { _, span := trace.StartSpan(ctx, "blocksync.ValidateRequest") defer span.End() @@ -118,7 +117,7 @@ func validateRequest( validReq.length = req.Length if validReq.length > MaxRequestLength { return nil, &Response{ - Status: BadRequest, + Status: BadRequest, ErrorMessage: fmt.Sprintf("request length over maximum allowed (%d)", MaxRequestLength), } @@ -151,7 +150,7 @@ func validateRequest( func (server *BlockSyncService) serviceRequest( ctx context.Context, req *validatedRequest, - ) (*Response, error) { +) (*Response, error) { _, span := trace.StartSpan(ctx, "blocksync.ServiceRequest") defer span.End() @@ -178,7 +177,7 @@ func (server *BlockSyncService) serviceRequest( func collectChainSegment( cs *store.ChainStore, req *validatedRequest, - ) ([]*BSTipSet, error) { +) ([]*BSTipSet, error) { var bstips []*BSTipSet cur := req.head From 2c43138043ebb1926f4430fce9cb7b12346c847a Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Thu, 6 Aug 2020 
12:38:31 -0300 Subject: [PATCH 16/17] record global time in client --- chain/blocksync/client.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/chain/blocksync/client.go b/chain/blocksync/client.go index 078ea5ad416..579d40f8e0f 100644 --- a/chain/blocksync/client.go +++ b/chain/blocksync/client.go @@ -96,8 +96,9 @@ func (client *BlockSync) doRequest( // return on the first successful response. // FIXME: Doing this serially isn't great, but fetching in parallel // may not be a good idea either. Think about this more. - startTime := build.Clock.Now() - // FIXME: Should we track time per peer instead of a global one? + globalTime := build.Clock.Now() + // Global time used to track what is the expected time we will need to get + // a response if a client fails us. for _, peer := range peers { select { case <-ctx.Done(): @@ -123,7 +124,7 @@ func (client *BlockSync) doRequest( continue } - client.peerTracker.logGlobalSuccess(build.Clock.Since(startTime)) + client.peerTracker.logGlobalSuccess(build.Clock.Since(globalTime)) client.host.ConnManager().TagPeer(peer, "bsync", SUCCESS_PEER_TAG_VALUE) return validRes, nil } From 7216363d2e8b2c29e4f5409362203a18233f0da6 Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Thu, 6 Aug 2020 12:40:01 -0300 Subject: [PATCH 17/17] log success fixme --- chain/blocksync/client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/chain/blocksync/client.go b/chain/blocksync/client.go index 579d40f8e0f..87cb41d44ea 100644 --- a/chain/blocksync/client.go +++ b/chain/blocksync/client.go @@ -407,6 +407,8 @@ func (client *BlockSync) sendRequestToPeer( } client.peerTracker.logSuccess(peer, build.Clock.Since(connectionStart)) + // FIXME: We should really log a success only after we validate the response. + // It might be a bit hard to do. return &res, nil }