Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configure message parameters #247

Merged
merged 3 commits into from
Oct 13, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmarks/testnet/virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (mp *messagePasser) Reset() error {
return nil
}

func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (gsnet.MessageSender, error) {
func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID, _ gsnet.MessageSenderOpts) (gsnet.MessageSender, error) {
return &messagePasser{
net: nc,
target: p,
Expand Down
30 changes: 29 additions & 1 deletion impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package graphsync

import (
"context"
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-peertaskqueue"
Expand Down Expand Up @@ -33,6 +34,8 @@ const maxRecursionDepth = 100
const defaultTotalMaxMemory = uint64(256 << 20)
const defaultMaxMemoryPerPeer = uint64(16 << 20)
const defaultMaxInProgressRequests = uint64(6)
const defaultMessageSendRetries = 10
const defaultSendMessageTimeout = 10 * time.Minute

// GraphSync is an instance of a GraphSync exchange that implements
// the graphsync protocol.
Expand Down Expand Up @@ -77,6 +80,8 @@ type graphsyncConfigOptions struct {
registerDefaultValidator bool
maxLinksPerOutgoingRequest uint64
maxLinksPerIncomingRequest uint64
messageSendRetries int
sendMessageTimeout time.Duration
}

// Option defines the functional option type that can be used to configure
Expand Down Expand Up @@ -171,6 +176,27 @@ func MaxLinksPerIncomingRequests(maxLinksPerIncomingRequest uint64) Option {
}
}

// MessageSendRetries sets the number of times graphsync will send
// attempt to send a message before giving up.
// Lower to increase the speed at which an unresponsive peer is
// detected.
hannahhoward marked this conversation as resolved.
Show resolved Hide resolved
func MessageSendRetries(messageSendRetries int) Option {
return func(gs *graphsyncConfigOptions) {
gs.messageSendRetries = messageSendRetries
}
}

// SendMessageTimeout sets the amount of time graphsync will wait
// for a message to go across the wire before giving up and
// trying again (up to max retries).
// Lower to increase the speed at which an unresponsive peer is
// detected.
hannahhoward marked this conversation as resolved.
Show resolved Hide resolved
func SendMessageTimeout(sendMessageTimeout time.Duration) Option {
return func(gs *graphsyncConfigOptions) {
gs.sendMessageTimeout = sendMessageTimeout
}
}

// New creates a new GraphSync Exchange on the given network,
// and the given link loader+storer.
func New(parent context.Context, network gsnet.GraphSyncNetwork,
Expand All @@ -185,6 +211,8 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
maxInProgressIncomingRequests: defaultMaxInProgressRequests,
maxInProgressOutgoingRequests: defaultMaxInProgressRequests,
registerDefaultValidator: true,
messageSendRetries: defaultMessageSendRetries,
sendMessageTimeout: defaultSendMessageTimeout,
}
for _, option := range options {
option(gsConfig)
Expand All @@ -207,7 +235,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
}
responseAllocator := allocator.NewAllocator(gsConfig.totalMaxMemoryResponder, gsConfig.maxMemoryPerPeerResponder)
createMessageQueue := func(ctx context.Context, p peer.ID) peermanager.PeerQueue {
return messagequeue.New(ctx, p, network, responseAllocator)
return messagequeue.New(ctx, p, network, responseAllocator, gsConfig.messageSendRetries, gsConfig.sendMessageTimeout)
}
peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
requestAllocator := allocator.NewAllocator(gsConfig.totalMaxMemoryRequestor, gsConfig.maxMemoryPerPeerRequestor)
Expand Down
44 changes: 23 additions & 21 deletions messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (

var log = logging.Logger("graphsync")

const maxRetries = 10

// max block size is the maximum size for batching blocks in a single payload
const maxBlockSize uint64 = 512 * 1024

Expand All @@ -38,7 +36,7 @@ type Event struct {
// MessageNetwork is any network that can connect peers and generate a message
// sender.
type MessageNetwork interface {
NewMessageSender(context.Context, peer.ID) (gsnet.MessageSender, error)
NewMessageSender(context.Context, peer.ID, gsnet.MessageSenderOpts) (gsnet.MessageSender, error)
ConnectTo(context.Context, peer.ID) error
}

Expand All @@ -58,24 +56,28 @@ type MessageQueue struct {
done chan struct{}

// internal do not touch outside go routines
sender gsnet.MessageSender
eventPublisher notifications.Publisher
buildersLk sync.RWMutex
builders []*gsmsg.Builder
nextBuilderTopic gsmsg.Topic
allocator Allocator
sender gsnet.MessageSender
eventPublisher notifications.Publisher
buildersLk sync.RWMutex
builders []*gsmsg.Builder
nextBuilderTopic gsmsg.Topic
allocator Allocator
maxRetries int
sendMessageTimeout time.Duration
}

// New creats a new MessageQueue.
func New(ctx context.Context, p peer.ID, network MessageNetwork, allocator Allocator) *MessageQueue {
func New(ctx context.Context, p peer.ID, network MessageNetwork, allocator Allocator, maxRetries int, sendMessageTimeout time.Duration) *MessageQueue {
return &MessageQueue{
ctx: ctx,
network: network,
p: p,
outgoingWork: make(chan struct{}, 1),
done: make(chan struct{}),
eventPublisher: notifications.NewPublisher(),
allocator: allocator,
ctx: ctx,
network: network,
p: p,
outgoingWork: make(chan struct{}, 1),
done: make(chan struct{}),
eventPublisher: notifications.NewPublisher(),
allocator: allocator,
maxRetries: maxRetries,
sendMessageTimeout: sendMessageTimeout,
}
}

Expand Down Expand Up @@ -220,7 +222,7 @@ func (mq *MessageQueue) sendMessage() {
return
}

for i := 0; i < maxRetries; i++ { // try to send this message until we fail.
for i := 0; i < mq.maxRetries; i++ { // try to send this message until we fail.
if mq.attemptSendAndRecovery(message, publisher) {
return
}
Expand All @@ -232,7 +234,7 @@ func (mq *MessageQueue) initializeSender() error {
if mq.sender != nil {
return nil
}
nsender, err := openSender(mq.ctx, mq.network, mq.p)
nsender, err := openSender(mq.ctx, mq.network, mq.p, mq.sendMessageTimeout)
if err != nil {
return err
}
Expand Down Expand Up @@ -277,7 +279,7 @@ func (mq *MessageQueue) attemptSendAndRecovery(message gsmsg.GraphSyncMessage, p
return false
}

func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (gsnet.MessageSender, error) {
func openSender(ctx context.Context, network MessageNetwork, p peer.ID, sendTimeout time.Duration) (gsnet.MessageSender, error) {
// allow ten minutes for connections this includes looking them up in the
// dht dialing them, and handshaking
conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
Expand All @@ -288,7 +290,7 @@ func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (gsnet.M
return nil, err
}

nsender, err := network.NewMessageSender(ctx, p)
nsender, err := network.NewMessageSender(ctx, p, gsnet.MessageSenderOpts{SendTimeout: sendTimeout})
if err != nil {
return nil, err
}
Expand Down
17 changes: 10 additions & 7 deletions messagequeue/messagequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"github.com/ipfs/go-graphsync/testutil"
)

const sendMessageTimeout = 10 * time.Minute
const messageSendRetries = 10

type fakeMessageNetwork struct {
connectError error
messageSenderError error
Expand All @@ -32,7 +35,7 @@ func (fmn *fakeMessageNetwork) ConnectTo(context.Context, peer.ID) error {
return fmn.connectError
}

func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID) (gsnet.MessageSender, error) {
func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID, gsnet.MessageSenderOpts) (gsnet.MessageSender, error) {
fmn.wait.Done()
if fmn.messageSenderError == nil {
return fmn.messageSender, nil
Expand Down Expand Up @@ -68,7 +71,7 @@ func TestStartupAndShutdown(t *testing.T) {
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}
allocator := allocator2.NewAllocator(1<<30, 1<<30)

messageQueue := New(ctx, peer, messageNetwork, allocator)
messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
messageQueue.Startup()
id := graphsync.RequestID(rand.Int31())
priority := graphsync.Priority(rand.Int31())
Expand Down Expand Up @@ -106,7 +109,7 @@ func TestShutdownDuringMessageSend(t *testing.T) {
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}
allocator := allocator2.NewAllocator(1<<30, 1<<30)

messageQueue := New(ctx, peer, messageNetwork, allocator)
messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
messageQueue.Startup()
id := graphsync.RequestID(rand.Int31())
priority := graphsync.Priority(rand.Int31())
Expand Down Expand Up @@ -154,7 +157,7 @@ func TestProcessingNotification(t *testing.T) {
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}
allocator := allocator2.NewAllocator(1<<30, 1<<30)

messageQueue := New(ctx, peer, messageNetwork, allocator)
messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
messageQueue.Startup()
waitGroup.Add(1)
blks := testutil.GenerateBlocksOfSize(3, 128)
Expand Down Expand Up @@ -210,7 +213,7 @@ func TestDedupingMessages(t *testing.T) {
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}
allocator := allocator2.NewAllocator(1<<30, 1<<30)

messageQueue := New(ctx, peer, messageNetwork, allocator)
messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
messageQueue.Startup()
waitGroup.Add(1)
id := graphsync.RequestID(rand.Int31())
Expand Down Expand Up @@ -282,7 +285,7 @@ func TestSendsVeryLargeBlocksResponses(t *testing.T) {
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}
allocator := allocator2.NewAllocator(1<<30, 1<<30)

messageQueue := New(ctx, peer, messageNetwork, allocator)
messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
messageQueue.Startup()
waitGroup.Add(1)

Expand Down Expand Up @@ -342,7 +345,7 @@ func TestSendsResponsesMemoryPressure(t *testing.T) {
// use allocator with very small limit
allocator := allocator2.NewAllocator(1000, 1000)

messageQueue := New(ctx, p, messageNetwork, allocator)
messageQueue := New(ctx, p, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
messageQueue.Startup()
waitGroup.Add(1)

Expand Down
8 changes: 7 additions & 1 deletion network/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package network

import (
"context"
"time"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
Expand Down Expand Up @@ -30,11 +31,16 @@ type GraphSyncNetwork interface {
// ConnectTo establishes a connection to the given peer
ConnectTo(context.Context, peer.ID) error

NewMessageSender(context.Context, peer.ID) (MessageSender, error)
NewMessageSender(context.Context, peer.ID, MessageSenderOpts) (MessageSender, error)

ConnectionManager() ConnManager
}

// MessageSenderOpts sets parameters for a message sender
type MessageSenderOpts struct {
SendTimeout time.Duration
}

// ConnManager provides the methods needed to protect and unprotect connections
type ConnManager interface {
Protect(peer.ID, string)
Expand Down
23 changes: 16 additions & 7 deletions network/libp2p_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ type libp2pGraphSyncNetwork struct {
}

type streamMessageSender struct {
s network.Stream
s network.Stream
opts MessageSenderOpts
}

func (s *streamMessageSender) Close() error {
Expand All @@ -50,14 +51,14 @@ func (s *streamMessageSender) Reset() error {
}

func (s *streamMessageSender) SendMsg(ctx context.Context, msg gsmsg.GraphSyncMessage) error {
return msgToStream(ctx, s.s, msg)
return msgToStream(ctx, s.s, msg, s.opts.SendTimeout)
}

func msgToStream(ctx context.Context, s network.Stream, msg gsmsg.GraphSyncMessage) error {
func msgToStream(ctx context.Context, s network.Stream, msg gsmsg.GraphSyncMessage, timeout time.Duration) error {
log.Debugf("Outgoing message with %d requests, %d responses, and %d blocks",
len(msg.Requests()), len(msg.Responses()), len(msg.Blocks()))

deadline := time.Now().Add(sendMessageTimeout)
deadline := time.Now().Add(timeout)
if dl, ok := ctx.Deadline(); ok {
deadline = dl
}
Expand All @@ -81,13 +82,13 @@ func msgToStream(ctx context.Context, s network.Stream, msg gsmsg.GraphSyncMessa
return nil
}

func (gsnet *libp2pGraphSyncNetwork) NewMessageSender(ctx context.Context, p peer.ID) (MessageSender, error) {
func (gsnet *libp2pGraphSyncNetwork) NewMessageSender(ctx context.Context, p peer.ID, opts MessageSenderOpts) (MessageSender, error) {
s, err := gsnet.newStreamToPeer(ctx, p)
if err != nil {
return nil, err
}

return &streamMessageSender{s: s}, nil
return &streamMessageSender{s: s, opts: setDefaults(opts)}, nil
}

func (gsnet *libp2pGraphSyncNetwork) newStreamToPeer(ctx context.Context, p peer.ID) (network.Stream, error) {
Expand All @@ -104,7 +105,7 @@ func (gsnet *libp2pGraphSyncNetwork) SendMessage(
return err
}

if err = msgToStream(ctx, s, outgoing); err != nil {
if err = msgToStream(ctx, s, outgoing, sendMessageTimeout); err != nil {
_ = s.Reset()
return err
}
Expand Down Expand Up @@ -173,3 +174,11 @@ func (nn *libp2pGraphSyncNotifee) OpenedStream(n network.Network, v network.Stre
func (nn *libp2pGraphSyncNotifee) ClosedStream(n network.Network, v network.Stream) {}
func (nn *libp2pGraphSyncNotifee) Listen(n network.Network, a ma.Multiaddr) {}
func (nn *libp2pGraphSyncNotifee) ListenClose(n network.Network, a ma.Multiaddr) {}

func setDefaults(opts MessageSenderOpts) MessageSenderOpts {
copy := opts
if opts.SendTimeout == 0 {
copy.SendTimeout = sendMessageTimeout
}
return copy
}