From df4f28c31b26057660370f1975f3f1f1fcc8d591 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Wed, 3 Jun 2020 23:02:34 +0530 Subject: [PATCH] Peer Diversity in the Routing Table (#658) * Peer Diversity in the DHT --- dht.go | 27 +++++++- dht_options.go | 20 +++++- dht_test.go | 1 - dual/dual.go | 10 +++ go.mod | 5 +- go.sum | 17 +++++ rt_diversity_filter.go | 100 +++++++++++++++++++++++++++ rt_diversity_filter_test.go | 87 +++++++++++++++++++++++ rtrefresh/rt_refresh_manager_test.go | 4 +- 9 files changed, 261 insertions(+), 10 deletions(-) create mode 100644 rt_diversity_filter.go create mode 100644 rt_diversity_filter_test.go diff --git a/dht.go b/dht.go index 548d683facd..6c88329948f 100644 --- a/dht.go +++ b/dht.go @@ -22,6 +22,7 @@ import ( "github.com/libp2p/go-libp2p-kad-dht/providers" "github.com/libp2p/go-libp2p-kad-dht/rtrefresh" kb "github.com/libp2p/go-libp2p-kbucket" + "github.com/libp2p/go-libp2p-kbucket/peerdiversity" record "github.com/libp2p/go-libp2p-record" recpb "github.com/libp2p/go-libp2p-record/pb" @@ -126,6 +127,7 @@ type IpfsDHT struct { queryPeerFilter QueryFilterFunc routingTablePeerFilter RouteTableFilterFunc + rtPeerDiversityFilter peerdiversity.PeerIPGroupFilter autoRefresh bool @@ -283,7 +285,9 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) { beta: cfg.resiliency, queryPeerFilter: cfg.queryPeerFilter, routingTablePeerFilter: cfg.routingTable.peerFilter, - fixLowPeersChan: make(chan struct{}, 1), + rtPeerDiversityFilter: cfg.routingTable.diversityFilter, + + fixLowPeersChan: make(chan struct{}, 1), } var maxLastSuccessfulOutboundThreshold time.Duration @@ -358,7 +362,21 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThr } func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) { - rt, err := kb.NewRoutingTable(cfg.bucketSize, dht.selfKey, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold) + // make a Routing Table Diversity Filter + var filter *peerdiversity.Filter + if dht.rtPeerDiversityFilter != nil { + df, err := peerdiversity.NewFilter(dht.rtPeerDiversityFilter, "rt/diversity", func(p peer.ID) int { + return kb.CommonPrefixLen(dht.selfKey, kb.ConvertPeerID(p)) + }) + + if err != nil { + return nil, fmt.Errorf("failed to construct peer diversity filter: %w", err) + } + + filter = df + } + + rt, err := kb.NewRoutingTable(cfg.bucketSize, dht.selfKey, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold, filter) cmgr := dht.host.ConnManager() rt.PeerAdded = func(p peer.ID) { @@ -382,6 +400,11 @@ func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThresho return rt, err } +// GetRoutingTableDiversityStats returns the diversity stats for the Routing Table. +func (d *IpfsDHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats { + return d.routingTable.GetDiversityStats() +} + // Mode allows introspection of the operation mode of the DHT func (dht *IpfsDHT) Mode() ModeOpt { return dht.auto diff --git a/dht_options.go b/dht_options.go index d08d119db1c..191081fcfd6 100644 --- a/dht_options.go +++ b/dht_options.go @@ -4,15 +4,18 @@ import ( "fmt" "time" - ds "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" - "github.com/ipfs/go-ipns" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-kad-dht/providers" + + "github.com/libp2p/go-libp2p-kbucket/peerdiversity" record "github.com/libp2p/go-libp2p-record" + + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + "github.com/ipfs/go-ipns" ) // ModeOpt describes what mode the dht should operate in @@ -56,6 +59,7 @@ type config struct { latencyTolerance time.Duration checkInterval time.Duration peerFilter RouteTableFilterFunc + diversityFilter peerdiversity.PeerIPGroupFilter } // set to true if we're operating in v1 dht compatible mode @@ -403,3 +407,13 @@ func BootstrapPeers(bootstrappers ...peer.AddrInfo) Option { return nil } } + +// RoutingTablePeerDiversityFilter configures the implementation of the `PeerIPGroupFilter` that will be used +// to construct the diversity filter for the Routing Table. +// Please see the docs for `peerdiversity.PeerIPGroupFilter` AND `peerdiversity.Filter` for more details. +func RoutingTablePeerDiversityFilter(pg peerdiversity.PeerIPGroupFilter) Option { + return func(c *config) error { + c.routingTable.diversityFilter = pg + return nil + } +} diff --git a/dht_test.go b/dht_test.go index 3d7f5838bf5..cb487087d3f 100644 --- a/dht_test.go +++ b/dht_test.go @@ -2056,7 +2056,6 @@ func TestBootStrapWhenRTIsEmpty(t *testing.T) { require.NoError(t, dht1.host.Network().ClosePeer(dht2.self)) dht1.routingTable.RemovePeer(dht2.self) require.NotContains(t, dht2.self, dht1.routingTable.ListPeers()) - require.Eventually(t, func() bool { return dht1.routingTable.Size() == 2 && dht1.routingTable.Find(bootstrappers[0].self) != "" && dht1.routingTable.Find(bootstrapcons[0].self) != "" diff --git a/dual/dual.go b/dual/dual.go index c8e487caec2..0b43179999c 100644 --- a/dual/dual.go +++ b/dual/dual.go @@ -4,6 +4,7 @@ package dual import ( "context" + "github.com/libp2p/go-libp2p-kbucket/peerdiversity" "sync" dht "github.com/libp2p/go-libp2p-kad-dht" @@ -50,6 +51,7 @@ func New(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error) wanOpts := append(options, dht.QueryFilter(dht.PublicQueryFilter), dht.RoutingTableFilter(dht.PublicRoutingTableFilter), + dht.RoutingTablePeerDiversityFilter(dht.NewRTPeerDiversityFilter(h, 2, 3)), ) wan, err := dht.New(ctx, h, wanOpts...) if err != nil { @@ -93,6 +95,14 @@ func (dht *DHT) Provide(ctx context.Context, key cid.Cid, announce bool) error { return dht.LAN.Provide(ctx, key, announce) } +// GetRoutingTableDiversityStats fetches the Routing Table Diversity Stats. +func (dht *DHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats { + if dht.WANActive() { + return dht.WAN.GetRoutingTableDiversityStats() + } + return nil +} + // FindProvidersAsync searches for peers who are able to provide a given key func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo { reqCtx, cancel := context.WithCancel(ctx) diff --git a/go.mod b/go.mod index f7e2fb9f1a2..493e65bdd7d 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/libp2p/go-eventbus v0.1.0 github.com/libp2p/go-libp2p v0.8.2 github.com/libp2p/go-libp2p-core v0.5.4 - github.com/libp2p/go-libp2p-kbucket v0.4.2 + github.com/libp2p/go-libp2p-kbucket v0.4.3 github.com/libp2p/go-libp2p-peerstore v0.2.4 github.com/libp2p/go-libp2p-record v0.1.2 github.com/libp2p/go-libp2p-routing-helpers v0.2.3 @@ -27,12 +27,13 @@ require ( github.com/libp2p/go-msgio v0.0.4 github.com/libp2p/go-netroute v0.1.2 github.com/multiformats/go-base32 v0.0.3 - github.com/multiformats/go-multiaddr v0.2.1 + github.com/multiformats/go-multiaddr v0.2.2 github.com/multiformats/go-multiaddr-net v0.1.5 github.com/multiformats/go-multihash v0.0.13 github.com/multiformats/go-multistream v0.1.1 github.com/stretchr/testify v1.5.1 github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 + github.com/yl2chen/cidranger v1.0.0 go.opencensus.io v0.22.3 go.uber.org/zap v1.14.1 ) diff --git a/go.sum b/go.sum index e5756a88762..60dde96b6d3 100644 --- a/go.sum +++ b/go.sum @@ -163,6 +163,8 @@ github.com/libp2p/go-libp2p v0.7.0/go.mod h1:hZJf8txWeCduQRDC/WSqBGMxaTHCOYHt2xS github.com/libp2p/go-libp2p v0.7.4/go.mod h1:oXsBlTLF1q7pxr+9w6lqzS1ILpyHsaBPniVO7zIHGMw= github.com/libp2p/go-libp2p v0.8.2 h1:gVuk8nZGjnRagJ/mLpBCSJw7bW1yWJrq3EwOk/AC6FM= github.com/libp2p/go-libp2p v0.8.2/go.mod h1:NQDA/F/qArMHGe0J7sDScaKjW8Jh4y/ozQqBbYJ+BnA= +github.com/libp2p/go-libp2p-asn-util v0.0.0-20200528110405-70ea36266519 h1:74RZSlAbbSyXs8IhMRCCe5b7lp87Ge7gRbxNzFD53nU= +github.com/libp2p/go-libp2p-asn-util v0.0.0-20200528110405-70ea36266519/go.mod h1:RJjfWPr40cDrLpTXUpXk+R5IOnCKkQ6EHEK1RgXKGf8= github.com/libp2p/go-libp2p-autonat v0.1.1/go.mod h1:OXqkeGOY2xJVWKAGV2inNF5aKN/djNA3fdpCWloIudE= github.com/libp2p/go-libp2p-autonat v0.2.0/go.mod h1:DX+9teU4pEEoZUqR1PiMlqliONQdNbfzE1C718tcViI= github.com/libp2p/go-libp2p-autonat v0.2.1/go.mod h1:MWtAhV5Ko1l6QBsHQNSuM6b1sRkXrpk0/LqCr+vCVxI= @@ -195,6 +197,14 @@ github.com/libp2p/go-libp2p-discovery v0.3.0 h1:+JnYBRLzZQtRq0mK3xhyjBwHytLmJXMT github.com/libp2p/go-libp2p-discovery v0.3.0/go.mod h1:o03drFnz9BVAZdzC/QUQ+NeQOu38Fu7LJGEOK2gQltw= github.com/libp2p/go-libp2p-kbucket v0.4.2 h1:wg+VPpCtY61bCasGRexCuXOmEmdKjN+k1w+JtTwu9gA= github.com/libp2p/go-libp2p-kbucket v0.4.2/go.mod h1:7sCeZx2GkNK1S6lQnGUW5JYZCFPnXzAZCCBBS70lytY= +github.com/libp2p/go-libp2p-kbucket v0.4.3-0.20200601163705-6904e7977e6a h1:ruG7JMpcZ7rsyGQvuauwO2L8ydJQpv6Lwh4l59h0ITQ= +github.com/libp2p/go-libp2p-kbucket v0.4.3-0.20200601163705-6904e7977e6a/go.mod h1:/PMj5dxV7yebkcXg7SD3OtXFdMNeqtS6UnByTRu1+Is= +github.com/libp2p/go-libp2p-kbucket v0.4.3-0.20200603120038-9fba62b55043 h1:7cMJQ6HqWIxuqaRH446oTCgItrwO0rYXg3MfvH6OswM= +github.com/libp2p/go-libp2p-kbucket v0.4.3-0.20200603120038-9fba62b55043/go.mod h1:/PMj5dxV7yebkcXg7SD3OtXFdMNeqtS6UnByTRu1+Is= +github.com/libp2p/go-libp2p-kbucket v0.4.3-0.20200603162158-145d6af2e842 h1:Co5A9U4tw5geJGj+j4rn47k8t9ay717lX0AgBNQD09E= +github.com/libp2p/go-libp2p-kbucket v0.4.3-0.20200603162158-145d6af2e842/go.mod h1:/PMj5dxV7yebkcXg7SD3OtXFdMNeqtS6UnByTRu1+Is= +github.com/libp2p/go-libp2p-kbucket v0.4.3 h1:6SWZ52TWpAUwg8vk8r9ApwYsnPhN67kBwvQhdnXF8KQ= +github.com/libp2p/go-libp2p-kbucket v0.4.3/go.mod h1:/PMj5dxV7yebkcXg7SD3OtXFdMNeqtS6UnByTRu1+Is= github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8= github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90= github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo= @@ -266,6 +276,7 @@ github.com/libp2p/go-netroute v0.1.2 h1:UHhB35chwgvcRI392znJA3RCBtZ3MpE3ahNCN5MR github.com/libp2p/go-netroute v0.1.2/go.mod h1:jZLDV+1PE8y5XxBySEBgbuVAXbhtuHSdmLPL2n9MKbk= github.com/libp2p/go-openssl v0.0.2/go.mod h1:v8Zw2ijCSWBQi8Pq5GAixw6DbFfa9u6VIYDXnvOXkc0= github.com/libp2p/go-openssl v0.0.3/go.mod h1:unDrJpgy3oFr+rqXsarWifmJuNnJR4chtO1HmaZjggc= +github.com/libp2p/go-openssl v0.0.4 h1:d27YZvLoTyMhIN4njrkr8zMDOM4lfpHIp6A+TK9fovg= github.com/libp2p/go-openssl v0.0.4/go.mod h1:unDrJpgy3oFr+rqXsarWifmJuNnJR4chtO1HmaZjggc= github.com/libp2p/go-reuseport v0.0.1 h1:7PhkfH73VXfPJYKQ6JwS5I/eVcoyYi9IMNGc6FWpFLw= github.com/libp2p/go-reuseport v0.0.1/go.mod h1:jn6RmB1ufnQwl0Q1f+YxAj8isJgDCQzaaxIFYDhcYEA= @@ -322,6 +333,8 @@ github.com/multiformats/go-multiaddr v0.1.1/go.mod h1:aMKBKNEYmzmDmxfX88/vz+J5IU github.com/multiformats/go-multiaddr v0.2.0/go.mod h1:0nO36NvPpyV4QzvTLi/lafl2y95ncPj0vFwVF6k6wJ4= github.com/multiformats/go-multiaddr v0.2.1 h1:SgG/cw5vqyB5QQe5FPe2TqggU9WtrA9X4nZw7LlVqOI= github.com/multiformats/go-multiaddr v0.2.1/go.mod h1:s/Apk6IyxfvMjDafnhJgJ3/46z7tZ04iMk5wP4QMGGE= +github.com/multiformats/go-multiaddr v0.2.2 h1:XZLDTszBIJe6m0zF6ITBrEcZR73OPUhCBBS9rYAuUzI= +github.com/multiformats/go-multiaddr v0.2.2/go.mod h1:NtfXiOtHvghW9KojvtySjH5y0u0xW5UouOmQQrn6a3Y= github.com/multiformats/go-multiaddr-dns v0.0.1/go.mod h1:9kWcqw/Pj6FwxAwW38n/9403szc57zJPs45fmnznu3Q= github.com/multiformats/go-multiaddr-dns v0.0.2/go.mod h1:9kWcqw/Pj6FwxAwW38n/9403szc57zJPs45fmnznu3Q= github.com/multiformats/go-multiaddr-dns v0.2.0 h1:YWJoIDwLePniH7OU5hBnDZV6SWuvJqJ0YtN6pLeH9zA= @@ -375,6 +388,7 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/smola/gocompat v0.2.0/go.mod h1:1B0MlxbmoZNo3h8guHp8HztB3BSYR5itql9qtVc0ypY= github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a/go.mod h1:7AyxJNCJ7SBZ1MfVQCWD6Uqo2oubI2Eq2y2eqf+A5r0= +github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+1+HkWaX/Yh71Ee5ZHaHYt7ZP4sQgUrm6cDU= github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= @@ -388,6 +402,7 @@ github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DM github.com/src-d/envconfig v1.0.0/go.mod h1:Q9YQZ7BKITldTBnoxsE5gOeB5y66RyPXeue/R4aaNBc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -407,6 +422,8 @@ github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1: github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +github.com/yl2chen/cidranger v1.0.0 h1:9tdo0orHQJvXsX6mf+1Goou/R4kq21AfpbYeTcpXs2Q= +github.com/yl2chen/cidranger v1.0.0/go.mod h1:L7Msw4X7EQK7zMVjOtv7o8xMyjv1rJcNlYlMgGwP7ko= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= diff --git a/rt_diversity_filter.go b/rt_diversity_filter.go new file mode 100644 index 00000000000..35d9021ef3d --- /dev/null +++ b/rt_diversity_filter.go @@ -0,0 +1,100 @@ +package dht + +import ( + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-kbucket/peerdiversity" + ma "github.com/multiformats/go-multiaddr" + "sync" +) + +var _ peerdiversity.PeerIPGroupFilter = (*rtPeerIPGroupFilter)(nil) + +type rtPeerIPGroupFilter struct { + mu sync.RWMutex + h host.Host + + maxPerCpl int + maxForTable int + + cplIpGroupCount map[int]map[peerdiversity.PeerIPGroupKey]int + tableIpGroupCount map[peerdiversity.PeerIPGroupKey]int +} + +// NewRTPeerDiversityFilter constructs the `PeerIPGroupFilter` that will be used to configure +// the diversity filter for the Routing Table. +// Please see the docs for `peerdiversity.PeerIPGroupFilter` AND `peerdiversity.Filter` for more details. +func NewRTPeerDiversityFilter(h host.Host, maxPerCpl, maxForTable int) *rtPeerIPGroupFilter { + return &rtPeerIPGroupFilter{ + h: h, + + maxPerCpl: maxPerCpl, + maxForTable: maxForTable, + + cplIpGroupCount: make(map[int]map[peerdiversity.PeerIPGroupKey]int), + tableIpGroupCount: make(map[peerdiversity.PeerIPGroupKey]int), + } + +} + +func (r *rtPeerIPGroupFilter) Allow(g peerdiversity.PeerGroupInfo) bool { + r.mu.RLock() + defer r.mu.RUnlock() + + key := g.IPGroupKey + cpl := g.Cpl + + if r.tableIpGroupCount[key] >= r.maxForTable { + + return false + } + + c, ok := r.cplIpGroupCount[cpl] + allow := !ok || c[key] < r.maxPerCpl + return allow +} + +func (r *rtPeerIPGroupFilter) Increment(g peerdiversity.PeerGroupInfo) { + r.mu.Lock() + defer r.mu.Unlock() + + key := g.IPGroupKey + cpl := g.Cpl + + r.tableIpGroupCount[key] = r.tableIpGroupCount[key] + 1 + if _, ok := r.cplIpGroupCount[cpl]; !ok { + r.cplIpGroupCount[cpl] = make(map[peerdiversity.PeerIPGroupKey]int) + } + + r.cplIpGroupCount[cpl][key] = r.cplIpGroupCount[cpl][key] + 1 +} + +func (r *rtPeerIPGroupFilter) Decrement(g peerdiversity.PeerGroupInfo) { + r.mu.Lock() + defer r.mu.Unlock() + + key := g.IPGroupKey + cpl := g.Cpl + + r.tableIpGroupCount[key] = r.tableIpGroupCount[key] - 1 + if r.tableIpGroupCount[key] == 0 { + delete(r.tableIpGroupCount, key) + } + + r.cplIpGroupCount[cpl][key] = r.cplIpGroupCount[cpl][key] - 1 + if r.cplIpGroupCount[cpl][key] == 0 { + delete(r.cplIpGroupCount[cpl], key) + } + if len(r.cplIpGroupCount[cpl]) == 0 { + delete(r.cplIpGroupCount, cpl) + } +} + +func (r *rtPeerIPGroupFilter) PeerAddresses(p peer.ID) []ma.Multiaddr { + cs := r.h.Network().ConnsToPeer(p) + addr := make([]ma.Multiaddr, 0, len(cs)) + for _, c := range cs { + addr = append(addr, c.RemoteMultiaddr()) + } + return addr +} diff --git a/rt_diversity_filter_test.go b/rt_diversity_filter_test.go new file mode 100644 index 00000000000..1843e3604d3 --- /dev/null +++ b/rt_diversity_filter_test.go @@ -0,0 +1,87 @@ +package dht + +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p-kbucket/peerdiversity" + swarmt "github.com/libp2p/go-libp2p-swarm/testing" + bhost "github.com/libp2p/go-libp2p/p2p/host/basic" + + "github.com/stretchr/testify/require" +) + +func TestRTPeerDiversityFilter(t *testing.T) { + ctx := context.Background() + h := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)) + r := NewRTPeerDiversityFilter(h, 2, 3) + + // table should only have 2 for each prefix per cpl + key := "key" + g := peerdiversity.PeerGroupInfo{Cpl: 1, IPGroupKey: peerdiversity.PeerIPGroupKey(key)} + require.True(t, r.Allow(g)) + r.Increment(g) + require.True(t, r.Allow(g)) + r.Increment(g) + require.False(t, r.Allow(g)) + + // table should ONLY have 3 for a Prefix + key = "random" + g2 := peerdiversity.PeerGroupInfo{Cpl: 2, IPGroupKey: peerdiversity.PeerIPGroupKey(key)} + require.True(t, r.Allow(g2)) + r.Increment(g2) + + g2.Cpl = 3 + require.True(t, r.Allow(g2)) + r.Increment(g2) + + g2.Cpl = 4 + require.True(t, r.Allow(g2)) + r.Increment(g2) + + require.False(t, r.Allow(g2)) + + // remove a peer with a prefix and it works + r.Decrement(g2) + require.True(t, r.Allow(g2)) + r.Increment(g2) + + // and then it dosen't work again + require.False(t, r.Allow(g2)) +} + +func TestRoutingTableEndToEnd(t *testing.T) { + ctx := context.Background() + h := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)) + r := NewRTPeerDiversityFilter(h, 2, 3) + + d, err := New( + ctx, + h, + testPrefix, + NamespacedValidator("v", blankValidator{}), + Mode(ModeServer), + DisableAutoRefresh(), + RoutingTablePeerDiversityFilter(r), + ) + require.NoError(t, err) + + // only 3 peers per prefix for a cpl + d2 := setupDHT(ctx, t, false, DisableAutoRefresh()) + connect(t, ctx, d, d2) + waitForWellFormedTables(t, []*IpfsDHT{d}, 1, 1, 1*time.Second) + + d3 := setupDHT(ctx, t, false, DisableAutoRefresh()) + connect(t, ctx, d, d3) + waitForWellFormedTables(t, []*IpfsDHT{d}, 2, 2, 1*time.Second) + + d4 := setupDHT(ctx, t, false, DisableAutoRefresh()) + connect(t, ctx, d, d4) + waitForWellFormedTables(t, []*IpfsDHT{d}, 3, 3, 1*time.Second) + + d5 := setupDHT(ctx, t, false, DisableAutoRefresh()) + connectNoSync(t, ctx, d, d5) + time.Sleep(1 * time.Second) + require.Len(t, d.routingTable.ListPeers(), 3) +} diff --git a/rtrefresh/rt_refresh_manager_test.go b/rtrefresh/rt_refresh_manager_test.go index d384aa4327d..355e4cf4be7 100644 --- a/rtrefresh/rt_refresh_manager_test.go +++ b/rtrefresh/rt_refresh_manager_test.go @@ -53,7 +53,7 @@ func TestSkipRefreshOnGapCpls(t *testing.T) { // when 2*gapcpl < maxCpl // gap is 2 and max is 10 - rt, err := kb.NewRoutingTable(2, kb.ConvertPeerID(local), time.Hour, pstore.NewMetrics(), 100*time.Hour) + rt, err := kb.NewRoutingTable(2, kb.ConvertPeerID(local), time.Hour, pstore.NewMetrics(), 100*time.Hour, nil) require.NoError(t, err) r := &RtRefreshManager{ctx: ctx, rt: rt, refreshKeyGenFnc: kfnc, dhtPeerId: local} icpl := uint(2) @@ -77,7 +77,7 @@ func TestSkipRefreshOnGapCpls(t *testing.T) { } // when 2 * (gapcpl + 1) > maxCpl - rt, err = kb.NewRoutingTable(2, kb.ConvertPeerID(local), time.Hour, pstore.NewMetrics(), 100*time.Hour) + rt, err = kb.NewRoutingTable(2, kb.ConvertPeerID(local), time.Hour, pstore.NewMetrics(), 100*time.Hour, nil) require.NoError(t, err) r = &RtRefreshManager{ctx: ctx, rt: rt, refreshKeyGenFnc: kfnc, dhtPeerId: local} icpl = uint(6)