Skip to content

Commit

Permalink
Merge branch 'feat/network-error-propagation-1'
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Tiger Chow committed Sep 25, 2014
2 parents bc883bd + e2a9c5d commit 671b095
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 71 deletions.
17 changes: 11 additions & 6 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package bitswap

import (
"errors"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"

Expand Down Expand Up @@ -120,14 +118,16 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {

// TODO(brian): handle errors
func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
*peer.Peer, bsmsg.BitSwapMessage) {
u.DOut("ReceiveMessage from %v\n", p.Key().Pretty())

if p == nil {
return nil, nil, errors.New("Received nil Peer")
// TODO propagate the error upward
return nil, nil
}
if incoming == nil {
return nil, nil, errors.New("Received nil Message")
// TODO propagate the error upward
return nil, nil
}

bs.strategy.MessageReceived(p, incoming) // FIRST
Expand Down Expand Up @@ -157,7 +157,12 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bs
}
}
defer bs.strategy.MessageSent(p, message)
return p, message, nil
return p, message
}

func (bs *bitswap) ReceiveError(err error) {
// TODO log the network error
// TODO bubble the network error up to the parent context/error logger
}

// send strives to ensure that accounting is always performed when a message is
Expand Down
4 changes: 3 additions & 1 deletion exchange/bitswap/network/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ type Adapter interface {
type Receiver interface {
ReceiveMessage(
ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) (
destination *peer.Peer, outgoing bsmsg.BitSwapMessage, err error)
destination *peer.Peer, outgoing bsmsg.BitSwapMessage)

ReceiveError(error)
}

// TODO(brian): move this to go-ipfs/net package
Expand Down
21 changes: 9 additions & 12 deletions exchange/bitswap/network/net_message_adapter.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package network

import (
"errors"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
Expand Down Expand Up @@ -31,33 +29,32 @@ type impl struct {
// HandleMessage marshals and unmarshals net messages, forwarding them to the
// BitSwapMessage receiver
func (adapter *impl) HandleMessage(
ctx context.Context, incoming netmsg.NetMessage) (netmsg.NetMessage, error) {
ctx context.Context, incoming netmsg.NetMessage) netmsg.NetMessage {

if adapter.receiver == nil {
return nil, errors.New("No receiver. NetMessage dropped")
return nil
}

received, err := bsmsg.FromNet(incoming)
if err != nil {
return nil, err
go adapter.receiver.ReceiveError(err)
return nil
}

p, bsmsg, err := adapter.receiver.ReceiveMessage(ctx, incoming.Peer(), received)
if err != nil {
return nil, err
}
p, bsmsg := adapter.receiver.ReceiveMessage(ctx, incoming.Peer(), received)

// TODO(brian): put this in a helper function
if bsmsg == nil || p == nil {
return nil, nil
return nil
}

outgoing, err := bsmsg.ToNet(p)
if err != nil {
return nil, err
go adapter.receiver.ReceiveError(err)
return nil
}

return outgoing, nil
return outgoing
}

func (adapter *impl) SendMessage(
Expand Down
24 changes: 5 additions & 19 deletions exchange/bitswap/testnet/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,7 @@ func (n *network) deliver(
return errors.New("Invalid input")
}

nextPeer, nextMsg, err := r.ReceiveMessage(context.TODO(), from, message)
if err != nil {

// TODO should this error be returned across network boundary?

// TODO this raises an interesting question about network contract. How
// can the network be expected to behave under different failure
// conditions? What if peer is unreachable? Will we know if messages
// aren't delivered?

return err
}
nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message)

if (nextPeer == nil && nextMsg != nil) || (nextMsg == nil && nextPeer != nil) {
return errors.New("Malformed client request")
Expand Down Expand Up @@ -119,15 +108,12 @@ func (n *network) SendRequest(
if !ok {
return nil, errors.New("Cannot locate peer on network")
}
nextPeer, nextMsg, err := r.ReceiveMessage(context.TODO(), from, message)
if err != nil {
return nil, err
// TODO return nil, NoResponse
}
nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message)

// TODO dedupe code
if (nextPeer == nil && nextMsg != nil) || (nextMsg == nil && nextPeer != nil) {
return nil, errors.New("Malformed client request")
r.ReceiveError(errors.New("Malformed client request"))
return nil, nil
}

// TODO dedupe code
Expand All @@ -144,7 +130,7 @@ func (n *network) SendRequest(
}
n.deliver(nextReceiver, nextPeer, nextMsg)
}()
return nil, NoResponse
return nil, nil
}
return nextMsg, nil
}
Expand Down
25 changes: 14 additions & 11 deletions exchange/bitswap/testnet/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
ctx context.Context,
from *peer.Peer,
incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
*peer.Peer, bsmsg.BitSwapMessage) {

t.Log("Recipient received a message from the network")

Expand All @@ -35,7 +35,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
m := bsmsg.New()
m.AppendBlock(testutil.NewBlockOrFail(t, expectedStr))

return from, m, nil
return from, m
}))

t.Log("Build a message and send a synchronous request to recipient")
Expand Down Expand Up @@ -74,19 +74,19 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
ctx context.Context,
fromWaiter *peer.Peer,
msgFromWaiter bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
*peer.Peer, bsmsg.BitSwapMessage) {

msgToWaiter := bsmsg.New()
msgToWaiter.AppendBlock(testutil.NewBlockOrFail(t, expectedStr))

return fromWaiter, msgToWaiter, nil
return fromWaiter, msgToWaiter
}))

waiter.SetDelegate(lambda(func(
ctx context.Context,
fromResponder *peer.Peer,
msgFromResponder bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
*peer.Peer, bsmsg.BitSwapMessage) {

// TODO assert that this came from the correct peer and that the message contents are as expected
ok := false
Expand All @@ -101,7 +101,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
t.Fatal("Message not received from the responder")

}
return nil, nil, nil
return nil, nil
}))

messageSentAsync := bsmsg.New()
Expand All @@ -116,7 +116,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
}

type receiverFunc func(ctx context.Context, p *peer.Peer,
incoming bsmsg.BitSwapMessage) (*peer.Peer, bsmsg.BitSwapMessage, error)
incoming bsmsg.BitSwapMessage) (*peer.Peer, bsmsg.BitSwapMessage)

// lambda returns a Receiver instance given a receiver function
func lambda(f receiverFunc) bsnet.Receiver {
Expand All @@ -126,13 +126,16 @@ func lambda(f receiverFunc) bsnet.Receiver {
}

type lambdaImpl struct {
f func(ctx context.Context, p *peer.Peer,
incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error)
f func(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage)
}

func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
p *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
*peer.Peer, bsmsg.BitSwapMessage) {
return lam.f(ctx, p, incoming)
}

func (lam *lambdaImpl) ReceiveError(err error) {
// TODO log error
}
8 changes: 2 additions & 6 deletions net/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type Handler interface {

// HandleMessage receives an incoming message, and potentially returns
// a response message to send back.
HandleMessage(context.Context, msg.NetMessage) (msg.NetMessage, error)
HandleMessage(context.Context, msg.NetMessage) msg.NetMessage
}

// Service is a networking component that protocols can use to multiplex
Expand Down Expand Up @@ -181,11 +181,7 @@ func (s *Service) handleIncomingMessage(ctx context.Context, m msg.NetMessage) {
}

// should this be "go HandleMessage ... ?"
r1, err := s.Handler.HandleMessage(ctx, m2)
if err != nil {
u.PErr("handled message yielded error %v\n", err)
return
}
r1 := s.Handler.HandleMessage(ctx, m2)

// if handler gave us a response, send it back out!
if r1 != nil {
Expand Down
5 changes: 2 additions & 3 deletions net/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@ import (
// ReverseHandler reverses all Data it receives and sends it back.
type ReverseHandler struct{}

func (t *ReverseHandler) HandleMessage(ctx context.Context, m msg.NetMessage) (
msg.NetMessage, error) {
func (t *ReverseHandler) HandleMessage(ctx context.Context, m msg.NetMessage) msg.NetMessage {

d := m.Data()
for i, j := 0, len(d)-1; i < j; i, j = i+1, j-1 {
d[i], d[j] = d[j], d[i]
}

return msg.New(m.Peer(), d), nil
return msg.New(m.Peer(), d)
}

func newPeer(t *testing.T, id string) *peer.Peer {
Expand Down
24 changes: 15 additions & 9 deletions routing/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,23 +103,26 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer *peer.Peer) (*peer.Peer,
}

// HandleMessage implements the inet.Handler interface.
func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) (msg.NetMessage, error) {
func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.NetMessage {

mData := mes.Data()
if mData == nil {
return nil, errors.New("message did not include Data")
// TODO handle/log err
return nil
}

mPeer := mes.Peer()
if mPeer == nil {
return nil, errors.New("message did not include a Peer")
// TODO handle/log err
return nil
}

// deserialize msg
pmes := new(Message)
err := proto.Unmarshal(mData, pmes)
if err != nil {
return nil, fmt.Errorf("Failed to decode protobuf message: %v\n", err)
// TODO handle/log err
return nil
}

// update the peer (on valid msgs only)
Expand All @@ -133,27 +136,30 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) (msg.
// get handler for this msg type.
handler := dht.handlerForMsgType(pmes.GetType())
if handler == nil {
return nil, errors.New("Recieved invalid message type")
// TODO handle/log err
return nil
}

// dispatch handler.
rpmes, err := handler(mPeer, pmes)
if err != nil {
return nil, err
// TODO handle/log err
return nil
}

// if nil response, return it before serializing
if rpmes == nil {
return nil, nil
return nil
}

// serialize response msg
rmes, err := msg.FromObject(mPeer, rpmes)
if err != nil {
return nil, fmt.Errorf("Failed to encode protobuf message: %v\n", err)
// TODO handle/log err
return nil
}

return rmes, nil
return rmes
}

// sendRequest sends out a request using dht.sender, but also makes sure to
Expand Down
5 changes: 1 addition & 4 deletions routing/dht/ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,7 @@ func TestGetFailures(t *testing.T) {
t.Error(err)
}

mes, err = d.HandleMessage(ctx, mes)
if err != nil {
t.Error(err)
}
mes = d.HandleMessage(ctx, mes)

pmes := new(Message)
err = proto.Unmarshal(mes.Data(), pmes)
Expand Down

0 comments on commit 671b095

Please sign in to comment.