Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

discovery: swap mdns lib for grandcat/zeroconf #285

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 75 additions & 56 deletions p2p/discovery/mdns.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@ import (
"io/ioutil"
golog "log"
"net"
"strings"
"sync"
"time"

mdns "github.com/grandcat/zeroconf"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-host"
"github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
"github.com/whyrusleeping/mdns"
)

var log = logging.Logger("mdns")
Expand All @@ -35,7 +36,7 @@ type Notifee interface {

type mdnsService struct {
server *mdns.Server
service *mdns.MDNSService
service *mdns.Resolver
host host.Host
tag string

Expand All @@ -44,62 +45,76 @@ type mdnsService struct {
interval time.Duration
}

func getDialableListenAddrs(ph host.Host) ([]*net.TCPAddr, error) {
var out []*net.TCPAddr
for _, addr := range ph.Addrs() {
// Tries to pick the best port.
func getBestPort(addrs []ma.Multiaddr) (int, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be fixed by #376

var best *net.TCPAddr
for _, addr := range addrs {
na, err := manet.ToNetAddr(addr)
if err != nil {
continue
}
tcp, ok := na.(*net.TCPAddr)
if ok {
out = append(out, tcp)
if !ok {
continue
}
// Don't bother with multicast and
if tcp.IP.IsMulticast() {
continue
}
// We don't yet support link-local
if tcp.IP.IsLinkLocalUnicast() {
continue
}
// Unspecified listeners are *always* the best choice.
if tcp.IP.IsUnspecified() {
return tcp.Port, nil
}
// If we don't have a best choice, use this addr.
if best == nil {
best = tcp
continue
}
// If the best choice is a loopback address, replace it.
if best.IP.IsLoopback() {
best = tcp
}
}
if len(out) == 0 {
return nil, errors.New("failed to find good external addr from peerhost")
if best == nil {
return 0, errors.New("failed to find good external addr from peerhost")
}
return out, nil
return best.Port, nil
}

func NewMdnsService(ctx context.Context, peerhost host.Host, interval time.Duration, serviceTag string) (Service, error) {

// TODO: dont let mdns use logging...
golog.SetOutput(ioutil.Discard)

var ipaddrs []net.IP
port := 4001

addrs, err := getDialableListenAddrs(peerhost)
port, err := getBestPort(peerhost.Network().ListenAddresses())
if err != nil {
log.Warning(err)
} else {
port = addrs[0].Port
for _, a := range addrs {
ipaddrs = append(ipaddrs, a.IP)
}
return nil, err
}

myid := peerhost.ID().Pretty()

info := []string{myid}
if serviceTag == "" {
serviceTag = ServiceTag
}
service, err := mdns.NewMDNSService(myid, serviceTag, "", "", port, ipaddrs, info)

resolver, err := mdns.NewResolver(nil)
if err != nil {
return nil, err
log.Error("Failed to initialize resolver:", err)
}

// Create the mDNS server, defer shutdown
server, err := mdns.NewServer(&mdns.Config{Zone: service})
server, err := mdns.Register(myid, serviceTag, "", port, info, nil)
if err != nil {
return nil, err
}

s := &mdnsService{
server: server,
service: service,
service: resolver,
host: peerhost,
interval: interval,
tag: serviceTag,
Expand All @@ -111,34 +126,32 @@ func NewMdnsService(ctx context.Context, peerhost host.Host, interval time.Durat
}

func (m *mdnsService) Close() error {
return m.server.Shutdown()
m.server.Shutdown()
// grandcat/zerconf swallows error, satisfy interface
return nil
}

func (m *mdnsService) pollForEntries(ctx context.Context) {

ticker := time.NewTicker(m.interval)
for {
//execute mdns query right away at method call and then with every tick
entriesCh := make(chan *mdns.ServiceEntry, 16)
go func() {
for entry := range entriesCh {
go func(results <-chan *mdns.ServiceEntry) {
for entry := range results {
m.handleEntry(entry)
}
}()
}(entriesCh)

log.Debug("starting mdns query")
qp := &mdns.QueryParam{
Domain: "local",
Entries: entriesCh,
Service: m.tag,
Timeout: time.Second * 5,
}

err := mdns.Query(qp)
if err != nil {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

if err := m.service.Browse(ctx, m.tag, "local", entriesCh); err != nil {
log.Error("mdns lookup error: ", err)
}
close(entriesCh)

log.Debug("mdns query complete")

select {
Expand All @@ -152,8 +165,10 @@ func (m *mdnsService) pollForEntries(ctx context.Context) {
}

func (m *mdnsService) handleEntry(e *mdns.ServiceEntry) {
log.Debugf("Handling MDNS entry: %s:%d %s", e.AddrV4, e.Port, e.Info)
mpeer, err := peer.IDB58Decode(e.Info)
// pull out the txt
info := strings.Join(e.Text, "|")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this designed to fail if there are less than or more than one entry?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be clear, because of the decoding afterwards

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. Yeah, that looks wrong. We should just take the first entry and ignore the rest.


mpeer, err := peer.IDB58Decode(info)
if err != nil {
log.Warning("Error parsing peer ID from mdns entry: ", err)
return
Expand All @@ -164,25 +179,29 @@ func (m *mdnsService) handleEntry(e *mdns.ServiceEntry) {
return
}

maddr, err := manet.FromNetAddr(&net.TCPAddr{
IP: e.AddrV4,
Port: e.Port,
})
if err != nil {
log.Warning("Error parsing multiaddr from mdns entry: ", err)
return
}
for _, ipv4 := range e.AddrIPv4 {
log.Debugf("Handling MDNS entry: %s:%d %s", ipv4, e.Port, info)

pi := pstore.PeerInfo{
ID: mpeer,
Addrs: []ma.Multiaddr{maddr},
}
maddr, err := manet.FromNetAddr(&net.TCPAddr{
IP: ipv4,
Port: e.Port,
})
if err != nil {
log.Warning("Error parsing multiaddr from mdns entry: ", err)
return
}

m.lk.Lock()
for _, n := range m.notifees {
go n.HandlePeerFound(pi)
pi := pstore.PeerInfo{
ID: mpeer,
Addrs: []ma.Multiaddr{maddr},
}

m.lk.Lock()
for _, n := range m.notifees {
go n.HandlePeerFound(pi)
}
m.lk.Unlock()
}
m.lk.Unlock()
}

func (m *mdnsService) RegisterNotifee(n Notifee) {
Expand Down
49 changes: 47 additions & 2 deletions p2p/discovery/mdns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"

host "github.com/libp2p/go-libp2p-host"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"

pstore "github.com/libp2p/go-libp2p-peerstore"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
ma "github.com/multiformats/go-multiaddr"
)

type DiscoveryNotifee struct {
Expand All @@ -21,6 +21,51 @@ func (n *DiscoveryNotifee) HandlePeerFound(pi pstore.PeerInfo) {
n.h.Connect(context.Background(), pi)
}

func TestGetBestPort(t *testing.T) {
port, err := getBestPort([]ma.Multiaddr{ma.StringCast("/ip4/1.2.3.4/tcp/2222"), ma.StringCast("/ip4/0.0.0.0/tcp/1234")})
if err != nil {
t.Fatal(err)
}
if port != 1234 {
t.Errorf("expected port 1234, got port %d", port)
}

port, err = getBestPort([]ma.Multiaddr{ma.StringCast("/ip4/127.0.0.1/tcp/2222"), ma.StringCast("/ip4/0.0.0.0/tcp/1234")})
if err != nil {
t.Fatal(err)
}
if port != 1234 {
t.Errorf("expected port 1234, got port %d", port)
}

port, err = getBestPort([]ma.Multiaddr{ma.StringCast("/ip4/1.2.3.4/tcp/2222"), ma.StringCast("/ip4/127.0.0.1/tcp/1234")})
if err != nil {
t.Fatal(err)
}
if port != 2222 {
t.Errorf("expected port 2222, got port %d", port)
}
port, err = getBestPort([]ma.Multiaddr{ma.StringCast("/ip4/127.0.0.1/tcp/1234"), ma.StringCast("/ip4/1.2.3.4/tcp/2222")})
if err != nil {
t.Fatal(err)
}
if port != 2222 {
t.Errorf("expected port 2222, got port %d", port)
}
port, err = getBestPort([]ma.Multiaddr{ma.StringCast("/ip4/127.0.0.1/tcp/1234")})
if err != nil {
t.Fatal(err)
}
if port != 1234 {
t.Errorf("expected port 1234, got port %d", port)
}

_, err = getBestPort([]ma.Multiaddr{})
if err == nil {
t.Fatal("expected error")
}
}

func TestMdnsDiscovery(t *testing.T) {
//TODO: re-enable when the new lib will get integrated
t.Skip("TestMdnsDiscovery fails randomly with current lib")
Expand Down
11 changes: 6 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@
"name": "go-semver",
"version": "0.0.0"
},
{
"hash": "QmNeRzgrSpwbMU7VKLRyfvbqf1nRrAiQ7fEXaBxWGT5Ygr",
"name": "mdns",
"version": "0.1.3"
},
{
"hash": "QmPdKqUcHGFdeSpvjVoaTRPPstGif9GBZb5Q56RVw9o69A",
"name": "go-ipfs-util",
Expand Down Expand Up @@ -249,6 +244,12 @@
"hash": "QmSSeQqc5QeuefkaM6JFV5tSF9knLUkXKVhW1eYRiqe72W",
"name": "uuid",
"version": "0.1.0"
},
{
"author": "grandcat",
"hash": "QmWrJTtJCQ6kCCTtLBM82Cx9h52QMQ5hcbrsePFuRSHuWC",
"name": "zeroconf",
"version": "0.2.1"
}
],
"gxVersion": "0.4.0",
Expand Down