Skip to content

Commit

Permalink
Avoid blocking in notification handlers.
Browse files Browse the repository at this point in the history
This isn't the correct fix but the correct fix is probably not worth it.

introduces ipfs#70
  • Loading branch information
Stebalien committed Jul 26, 2017
1 parent 4e8a520 commit 796e96c
Showing 1 changed file with 41 additions and 20 deletions.
61 changes: 41 additions & 20 deletions notif.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
package dht

import (
"context"
"io"
"time"

inet "github.com/libp2p/go-libp2p-net"
ma "github.com/multiformats/go-multiaddr"
mstream "github.com/multiformats/go-multistream"
)

// TODO: There is a race condition here where we could process notifications
// out-of-order and incorrectly mark some peers as DHT nodes (or not DHT nodes).
// The correct fix for this is nasty so I'm not really sure it's worth it.
// Incorrectly recording or failing to record a DHT node in the routing table
// isn't a big issue.

const dhtCheckTimeout = 10 * time.Second

// netNotifiee defines methods to be used with the IpfsDHT
type netNotifiee IpfsDHT

Expand All @@ -23,25 +33,36 @@ func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
default:
}

// Note: We *could* just check the peerstore to see if the remote side supports the dht
// protocol, but its not clear that that information will make it into the peerstore
// by the time this notification is sent. So just to be very careful, we brute force this
// and open a new stream
s, err := dht.host.NewStream(dht.Context(), v.RemotePeer(), ProtocolDHT, ProtocolDHTOld)
switch err {
case nil:
s.Close()
// connected fine? full dht node
dht.Update(dht.Context(), v.RemotePeer())
case mstream.ErrNotSupported:
// Client mode only, don't bother adding them to our routing table
case io.EOF:
// This is kindof an error, but it happens someone often so make it a warning
log.Warningf("checking dht client type: %s", err)
default:
// real error? thats odd
log.Errorf("checking dht client type: %s", err)
}
go func() {

// Note: We *could* just check the peerstore to see if the remote side supports the dht
// protocol, but its not clear that that information will make it into the peerstore
// by the time this notification is sent. So just to be very careful, we brute force this
// and open a new stream

// TODO: There's a race condition here where the connection may
// not be open (and we may sit here trying to connect). I've
// added a timeout but that's not really the correct fix.

ctx, cancel := context.WithTimeout(dht.Context(), dhtCheckTimeout)
defer cancel()
s, err := dht.host.NewStream(ctx, v.RemotePeer(), ProtocolDHT, ProtocolDHTOld)

switch err {
case nil:
s.Close()
// connected fine? full dht node
dht.Update(dht.Context(), v.RemotePeer())
case mstream.ErrNotSupported:
// Client mode only, don't bother adding them to our routing table
case io.EOF:
// This is kindof an error, but it happens someone often so make it a warning
log.Warningf("checking dht client type: %s", err)
default:
// real error? thats odd
log.Errorf("checking dht client type: %s", err)
}
}()
}

func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
Expand All @@ -51,7 +72,7 @@ func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
return
default:
}
dht.routingTable.Remove(v.RemotePeer())
go dht.routingTable.Remove(v.RemotePeer())
}

func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {}
Expand Down

0 comments on commit 796e96c

Please sign in to comment.