diff --git a/network/ipfs_impl.go b/network/ipfs_impl.go index 52ee64c6..005cfd58 100644 --- a/network/ipfs_impl.go +++ b/network/ipfs_impl.go @@ -8,15 +8,16 @@ import ( "time" bsmsg "github.com/ipfs/go-bitswap/message" - "github.com/libp2p/go-libp2p-core/helpers" cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/connmgr" + "github.com/libp2p/go-libp2p-core/helpers" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" peerstore "github.com/libp2p/go-libp2p-core/peerstore" + "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/routing" msgio "github.com/libp2p/go-msgio" ma "github.com/multiformats/go-multiaddr" @@ -27,10 +28,19 @@ var log = logging.Logger("bitswap_network") var sendMessageTimeout = time.Minute * 10 // NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host. -func NewFromIpfsHost(host host.Host, r routing.ContentRouting) BitSwapNetwork { +func NewFromIpfsHost(host host.Host, r routing.ContentRouting, opts ...NetOpt) BitSwapNetwork { + s := Settings{} + for _, opt := range opts { + opt(&s) + } + bitswapNetwork := impl{ host: host, routing: r, + + protocolBitswap: s.ProtocolPrefix + ProtocolBitswap, + protocolBitswapOne: s.ProtocolPrefix + ProtocolBitswapOne, + protocolBitswapNoVers: s.ProtocolPrefix + ProtocolBitswapNoVers, } return &bitswapNetwork } @@ -41,6 +51,10 @@ type impl struct { host host.Host routing routing.ContentRouting + protocolBitswap protocol.ID + protocolBitswapOne protocol.ID + protocolBitswapNoVers protocol.ID + // inbound messages from the network are forwarded to the receiver receiver Receiver @@ -48,7 +62,8 @@ type impl struct { } type streamMessageSender struct { - s network.Stream + s network.Stream + bsnet *impl } func (s *streamMessageSender) Close() error { @@ -60,10 +75,10 @@ func (s *streamMessageSender) Reset() error { } func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error { - return msgToStream(ctx, s.s, msg) + return s.bsnet.msgToStream(ctx, s.s, msg) } -func msgToStream(ctx context.Context, s network.Stream, msg bsmsg.BitSwapMessage) error { +func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg.BitSwapMessage) error { deadline := time.Now().Add(sendMessageTimeout) if dl, ok := ctx.Deadline(); ok { deadline = dl @@ -74,12 +89,12 @@ func msgToStream(ctx context.Context, s network.Stream, msg bsmsg.BitSwapMessage } switch s.Protocol() { - case ProtocolBitswap: + case bsnet.protocolBitswap: if err := msg.ToNetV1(s); err != nil { log.Debugf("error: %s", err) return err } - case ProtocolBitswapOne, ProtocolBitswapNoVers: + case bsnet.protocolBitswapOne, bsnet.protocolBitswapNoVers: if err := msg.ToNetV0(s); err != nil { log.Debugf("error: %s", err) return err @@ -100,11 +115,11 @@ func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID) (MessageSend return nil, err } - return &streamMessageSender{s: s}, nil + return &streamMessageSender{s: s, bsnet: bsnet}, nil } func (bsnet *impl) newStreamToPeer(ctx context.Context, p peer.ID) (network.Stream, error) { - return bsnet.host.NewStream(ctx, p, ProtocolBitswap, ProtocolBitswapOne, ProtocolBitswapNoVers) + return bsnet.host.NewStream(ctx, p, bsnet.protocolBitswap, bsnet.protocolBitswapOne, bsnet.protocolBitswapNoVers) } func (bsnet *impl) SendMessage( @@ -117,7 +132,7 @@ func (bsnet *impl) SendMessage( return err } - if err = msgToStream(ctx, s, outgoing); err != nil { + if err = bsnet.msgToStream(ctx, s, outgoing); err != nil { s.Reset() return err } @@ -131,9 +146,9 @@ func (bsnet *impl) SendMessage( func (bsnet *impl) SetDelegate(r Receiver) { bsnet.receiver = r - bsnet.host.SetStreamHandler(ProtocolBitswap, bsnet.handleNewStream) - bsnet.host.SetStreamHandler(ProtocolBitswapOne, bsnet.handleNewStream) - bsnet.host.SetStreamHandler(ProtocolBitswapNoVers, bsnet.handleNewStream) + bsnet.host.SetStreamHandler(bsnet.protocolBitswap, bsnet.handleNewStream) + bsnet.host.SetStreamHandler(bsnet.protocolBitswapOne, bsnet.handleNewStream) + bsnet.host.SetStreamHandler(bsnet.protocolBitswapNoVers, bsnet.handleNewStream) bsnet.host.Network().Notify((*netNotifiee)(bsnet)) // TODO: StopNotify. diff --git a/network/options.go b/network/options.go new file mode 100644 index 00000000..38bb63d1 --- /dev/null +++ b/network/options.go @@ -0,0 +1,15 @@ +package network + +import "github.com/libp2p/go-libp2p-core/protocol" + +type NetOpt func(*Settings) + +type Settings struct { + ProtocolPrefix protocol.ID +} + +func Prefix(prefix protocol.ID) NetOpt { + return func(settings *Settings) { + settings.ProtocolPrefix = prefix + } +}