Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Local addr updated event #847

Merged
merged 2 commits into from
Mar 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 49 additions & 9 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type BasicHost struct {
lastAddrs []ma.Multiaddr
emitters struct {
evtLocalProtocolsUpdated event.Emitter
evtLocalAddrsUpdated event.Emitter
}
}

Expand Down Expand Up @@ -141,6 +142,9 @@ func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHo
if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil {
return nil, err
}
if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}); err != nil {
return nil, err
}

h.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
if h.natmgr != nil {
Expand All @@ -150,6 +154,7 @@ func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHo
h.cmgr.Close()
}
_ = h.emitters.evtLocalProtocolsUpdated.Close()
_ = h.emitters.evtLocalAddrsUpdated.Close()
return h.Network().Close()
})

Expand Down Expand Up @@ -295,28 +300,63 @@ func (h *BasicHost) newStreamHandler(s network.Stream) {
go handle(protoID, s)
}

// PushIdentify pushes an identify update through the identify push protocol
// CheckForAddressChanges determines whether our listen addresses have recently
// changed and emits an EvtLocalAddressesUpdatedEvent & a Push Identify if so.
// Warning: this interface is unstable and may disappear in the future.
func (h *BasicHost) PushIdentify() {
push := false

func (h *BasicHost) CheckForAddressChanges() {
h.mx.Lock()
addrs := h.Addrs()
if !sameAddrs(addrs, h.lastAddrs) {
push = true
changeEvt := makeUpdatedAddrEvent(h.lastAddrs, addrs)
if changeEvt != nil {
h.lastAddrs = addrs
}
h.mx.Unlock()

if push {
if changeEvt != nil {
err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt)
if err != nil {
log.Warnf("error emitting event for updated addrs: %s", err)
}
h.ids.Push()
}
}

func makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddressesUpdated {
prevmap := make(map[string]ma.Multiaddr, len(prev))
evt := event.EvtLocalAddressesUpdated{Diffs: true}
addrsAdded := false

for _, addr := range prev {
prevmap[string(addr.Bytes())] = addr
}
for _, addr := range current {
_, ok := prevmap[string(addr.Bytes())]
updated := event.UpdatedAddress{Address: addr}
if ok {
updated.Action = event.Maintained
} else {
updated.Action = event.Added
addrsAdded = true
}
evt.Current = append(evt.Current, updated)
delete(prevmap, string(addr.Bytes()))
}
for _, addr := range prevmap {
updated := event.UpdatedAddress{Action: event.Removed, Address: addr}
evt.Removed = append(evt.Removed, updated)
}

if !addrsAdded && len(evt.Removed) == 0 {
return nil
}

return &evt
}

func (h *BasicHost) background(p goprocess.Process) {
// periodically schedules an IdentifyPush to update our peers for changes
// in our address set (if needed)
ticker := time.NewTicker(1 * time.Minute)
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

// initialize lastAddrs
Expand All @@ -329,7 +369,7 @@ func (h *BasicHost) background(p goprocess.Process) {
for {
select {
case <-ticker.C:
h.PushIdentify()
h.CheckForAddressChanges()

case <-p.Closing():
return
Expand Down
139 changes: 139 additions & 0 deletions p2p/host/basic/basic_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,145 @@ func TestAddrResolutionRecursive(t *testing.T) {
}
}

func TestHostAddrChangeDetection(t *testing.T) {
// This test uses the address factory to provide several
// sets of listen addresses for the host. It advances through
// the sets by changing the currentAddrSet index var below.
addrSets := [][]ma.Multiaddr{
{},
{ma.StringCast("/ip4/1.2.3.4/tcp/1234")},
{ma.StringCast("/ip4/1.2.3.4/tcp/1234"), ma.StringCast("/ip4/2.3.4.5/tcp/1234")},
{ma.StringCast("/ip4/2.3.4.5/tcp/1234"), ma.StringCast("/ip4/3.4.5.6/tcp/4321")},
}

// The events we expect the host to emit when CheckForAddressChanges is called
// and the changes between addr sets are detected
expectedEvents := []event.EvtLocalAddressesUpdated{
{
Diffs: true,
Current: []event.UpdatedAddress{
{Action: event.Added, Address: ma.StringCast("/ip4/1.2.3.4/tcp/1234")},
},
Removed: []event.UpdatedAddress{},
},
{
Diffs: true,
Current: []event.UpdatedAddress{
{Action: event.Maintained, Address: ma.StringCast("/ip4/1.2.3.4/tcp/1234")},
{Action: event.Added, Address: ma.StringCast("/ip4/2.3.4.5/tcp/1234")},
},
Removed: []event.UpdatedAddress{},
},
{
Diffs: true,
Current: []event.UpdatedAddress{
{Action: event.Added, Address: ma.StringCast("/ip4/3.4.5.6/tcp/4321")},
{Action: event.Maintained, Address: ma.StringCast("/ip4/2.3.4.5/tcp/1234")},
},
Removed: []event.UpdatedAddress{
{Action: event.Removed, Address: ma.StringCast("/ip4/1.2.3.4/tcp/1234")},
},
},
}

currentAddrSet := 0
addrsFactory := func(addrs []ma.Multiaddr) []ma.Multiaddr {
return addrSets[currentAddrSet]
}

ctx := context.Background()
h := New(swarmt.GenSwarm(t, ctx), AddrsFactory(addrsFactory))
defer h.Close()

sub, err := h.EventBus().Subscribe(&event.EvtLocalAddressesUpdated{}, eventbus.BufSize(10))
if err != nil {
t.Error(err)
}
defer sub.Close()

// host should start with no addrs (addrSet 0)
addrs := h.Addrs()
if len(addrs) != 0 {
t.Fatalf("expected 0 addrs, got %d", len(addrs))
}

// Advance between addrSets
for i := 1; i < len(addrSets); i++ {
currentAddrSet = i
h.CheckForAddressChanges() // forces the host to check for changes now, instead of waiting for background update
}

// drain events from the subscription
var receivedEvents []event.EvtLocalAddressesUpdated
readEvents:
for {
select {
case evt, more := <-sub.Out():
if !more {
break readEvents
}
receivedEvents = append(receivedEvents, evt.(event.EvtLocalAddressesUpdated))
if len(receivedEvents) == len(expectedEvents) {
break readEvents
}
case <-ctx.Done():
break readEvents
case <-time.After(1 * time.Second):
break readEvents
}
}

// assert that we received the events we expected
if len(receivedEvents) != len(expectedEvents) {
t.Errorf("expected to receive %d addr change events, got %d", len(expectedEvents), len(receivedEvents))
}
for i, expected := range expectedEvents {
actual := receivedEvents[i]
if !updatedAddrEventsEqual(expected, actual) {
t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expected, actual)
}
}
}

// updatedAddrsEqual is a helper to check whether two lists of
// event.UpdatedAddress have the same contents, ignoring ordering.
func updatedAddrsEqual(a, b []event.UpdatedAddress) bool {
if len(a) != len(b) {
return false
}

// We can't use an UpdatedAddress directly as a map key, since
// Multiaddr is an interface, and go won't know how to compare
// for equality. So we convert to this little struct, which
// stores the multiaddr as a string.
type ua struct {
action event.AddrAction
addrStr string
}
aSet := make(map[ua]struct{})
for _, addr := range a {
k := ua{action: addr.Action, addrStr: string(addr.Address.Bytes())}
aSet[k] = struct{}{}
}
for _, addr := range b {
k := ua{action: addr.Action, addrStr: string(addr.Address.Bytes())}
_, ok := aSet[k]
if !ok {
return false
}
}
return true
}

// updatedAddrEventsEqual is a helper to check whether two
// event.EvtLocalAddressesUpdated are equal, ignoring the ordering of
// addresses in the inner lists.
func updatedAddrEventsEqual(a, b event.EvtLocalAddressesUpdated) bool {
return a.Diffs == b.Diffs &&
updatedAddrsEqual(a.Current, b.Current) &&
updatedAddrsEqual(a.Removed, b.Removed)
}

type sortedMultiaddrs []ma.Multiaddr

func (sma sortedMultiaddrs) Len() int { return len(sma) }
Expand Down
2 changes: 1 addition & 1 deletion p2p/host/relay/autorelay.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (ar *AutoRelay) background(ctx context.Context) {
ar.cachedAddrs = nil
ar.mx.Unlock()
push = false
ar.host.PushIdentify()
ar.host.CheckForAddressChanges()
}

select {
Expand Down