Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Crawler based DHT client #709

Merged
merged 29 commits into from
May 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
48c1814
crawler client v1
aschmahmann Mar 12, 2021
365c18c
bug fixes and add some options
aschmahmann Apr 8, 2021
753a1ea
go fmt + prov fix
aschmahmann Apr 9, 2021
9bc8a2f
optimize provide many to only calculate peer info once
aschmahmann Apr 13, 2021
41d6f44
a bit of cleanup and added a Ready function
aschmahmann Apr 22, 2021
e7a2e57
moved TestInvalidMessageSenderTracking to the internal package contai…
aschmahmann Apr 22, 2021
7702392
move IpfsDHT options to the internal package. Make a breaking change …
aschmahmann Apr 22, 2021
b26c889
switch experimental client to share options with the standard client
aschmahmann Apr 22, 2021
c8f5857
add support for bulk puts
aschmahmann Apr 22, 2021
4880f62
cleanup some linting errors and make findpeer only dial the peer once…
aschmahmann Apr 22, 2021
b62d4af
cleanup + add more logging to bulk sending
aschmahmann Apr 23, 2021
4532fb0
more findpeer changes
aschmahmann Apr 23, 2021
7e79983
changed signature of DHT filter options to be more general
aschmahmann Apr 30, 2021
d543baa
experimental dht: add stored addresses to peerstore temporarily durin…
aschmahmann Apr 30, 2021
ee4a44e
dht: switch GetClosestPeers to return a slice of peers instead of a c…
aschmahmann Apr 30, 2021
43f5e30
linter cleanup
aschmahmann Apr 30, 2021
de6e380
fullrtdht: better progress logging on bulk operations
aschmahmann May 11, 2021
359d3de
fullrtdht: do not error on bulk operations if we are not asked to do …
aschmahmann May 11, 2021
aefc008
fix logging output
aschmahmann May 11, 2021
77e7408
fullrt: have support for both standard and fullrt options
aschmahmann May 12, 2021
d0236f0
oops
aschmahmann May 12, 2021
1b0922d
reorder imports
aschmahmann May 12, 2021
c22b4bf
renamed StreamDisconnect to OnDisconnect
aschmahmann May 12, 2021
57eeffe
skip disconnect events if the message sender does not need them
aschmahmann May 12, 2021
ff66a31
fullrtdht: add Close function and remove passed in context
aschmahmann May 13, 2021
03d275f
fix percentage logging during bulk sends
aschmahmann May 13, 2021
8b9a22e
crawler: starting peers and peers found during crawl have their addre…
aschmahmann May 14, 2021
74d4e7e
import ordering
aschmahmann May 14, 2021
6e5e5d6
import ordering
aschmahmann May 14, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ type HandleQueryResult func(p peer.ID, rtPeers []*peer.AddrInfo)
// HandleQueryFail is a callback on failed peer query
type HandleQueryFail func(p peer.ID, err error)

// dialAddressExtendDur is the duration handed to Peerstore().AddAddrs when the
// crawler records addresses for the starting peers and for peers discovered
// during the crawl (presumably the address TTL — confirm against the peerstore API).
const dialAddressExtendDur time.Duration = time.Minute * 30

// Run crawls dht peers from an initial seed of `startingPeers`
func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail) {
jobs := make(chan peer.ID, 1)
Expand All @@ -140,15 +142,27 @@ func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handl
defer wg.Wait()
defer close(jobs)

toDial := make([]*peer.AddrInfo, 0, len(startingPeers))
var toDial []*peer.AddrInfo
peersSeen := make(map[peer.ID]struct{})

numSkipped := 0
for _, ai := range startingPeers {
extendAddrs := c.host.Peerstore().Addrs(ai.ID)
if len(ai.Addrs) > 0 {
extendAddrs = append(extendAddrs, ai.Addrs...)
c.host.Peerstore().AddAddrs(ai.ID, extendAddrs, dialAddressExtendDur)
}
if len(extendAddrs) == 0 {
numSkipped++
continue
}

toDial = append(toDial, ai)
peersSeen[ai.ID] = struct{}{}
extendAddrs := c.host.Peerstore().Addrs(ai.ID)
extendAddrs = append(extendAddrs, ai.Addrs...)
c.host.Peerstore().AddAddrs(ai.ID, extendAddrs, time.Hour)
}

if numSkipped > 0 {
logger.Infof("%d starting peers were skipped due to lack of addresses. Starting crawl with %d peers", numSkipped, len(toDial))
}

numQueried := 0
Expand All @@ -168,7 +182,7 @@ func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handl
logger.Debugf("peer %v had %d peers", res.peer, len(res.data))
rtPeers := make([]*peer.AddrInfo, 0, len(res.data))
for p, ai := range res.data {
c.host.Peerstore().AddAddrs(p, ai.Addrs, time.Hour)
c.host.Peerstore().AddAddrs(p, ai.Addrs, dialAddressExtendDur)
if _, ok := peersSeen[p]; !ok {
peersSeen[p] = struct{}{}
toDial = append(toDial, ai)
Expand Down Expand Up @@ -208,7 +222,7 @@ func (c *Crawler) queryPeer(ctx context.Context, nextPeer peer.ID) *queryResult
defer cancel()
err = c.host.Connect(connCtx, peer.AddrInfo{ID: nextPeer})
if err != nil {
logger.Infof("could not connect to peer %v: %v", nextPeer, err)
logger.Debugf("could not connect to peer %v: %v", nextPeer, err)
return &queryResult{nextPeer, nil, err}
}

Expand Down
28 changes: 0 additions & 28 deletions ctx_mutex.go

This file was deleted.

86 changes: 42 additions & 44 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/libp2p/go-libp2p-core/routing"

"github.com/libp2p/go-libp2p-kad-dht/internal"
dhtcfg "github.com/libp2p/go-libp2p-kad-dht/internal/config"
"github.com/libp2p/go-libp2p-kad-dht/internal/net"
"github.com/libp2p/go-libp2p-kad-dht/metrics"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/libp2p/go-libp2p-kad-dht/providers"
Expand Down Expand Up @@ -96,7 +98,7 @@ type IpfsDHT struct {
proc goprocess.Process

protoMessenger *pb.ProtocolMessenger
msgSender *messageSenderImpl
msgSender pb.MessageSender

plk sync.Mutex

Expand Down Expand Up @@ -163,15 +165,15 @@ var (
// If the Routing Table has more than "minRTRefreshThreshold" peers, we consider a peer as a Routing Table candidate ONLY when
// we successfully get a query response from it OR if it sends us a query.
func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) {
var cfg config
if err := cfg.apply(append([]Option{defaults}, options...)...); err != nil {
var cfg dhtcfg.Config
if err := cfg.Apply(append([]Option{dhtcfg.Defaults}, options...)...); err != nil {
return nil, err
}
if err := cfg.applyFallbacks(h); err != nil {
if err := cfg.ApplyFallbacks(h); err != nil {
return nil, err
}

if err := cfg.validate(); err != nil {
if err := cfg.Validate(); err != nil {
return nil, err
}

Expand All @@ -180,34 +182,30 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
return nil, fmt.Errorf("failed to create DHT, err=%s", err)
}

dht.autoRefresh = cfg.routingTable.autoRefresh
dht.autoRefresh = cfg.RoutingTable.AutoRefresh

dht.maxRecordAge = cfg.maxRecordAge
dht.enableProviders = cfg.enableProviders
dht.enableValues = cfg.enableValues
dht.disableFixLowPeers = cfg.disableFixLowPeers
dht.maxRecordAge = cfg.MaxRecordAge
dht.enableProviders = cfg.EnableProviders
dht.enableValues = cfg.EnableValues
dht.disableFixLowPeers = cfg.DisableFixLowPeers

dht.Validator = cfg.validator
dht.msgSender = &messageSenderImpl{
host: h,
strmap: make(map[peer.ID]*peerMessageSender),
protocols: dht.protocols,
}
dht.Validator = cfg.Validator
dht.msgSender = net.NewMessageSenderImpl(h, dht.protocols)
dht.protoMessenger, err = pb.NewProtocolMessenger(dht.msgSender, pb.WithValidator(dht.Validator))
if err != nil {
return nil, err
}

dht.testAddressUpdateProcessing = cfg.testAddressUpdateProcessing
dht.testAddressUpdateProcessing = cfg.TestAddressUpdateProcessing

dht.auto = cfg.mode
switch cfg.mode {
dht.auto = cfg.Mode
switch cfg.Mode {
case ModeAuto, ModeClient:
dht.mode = modeClient
case ModeAutoServer, ModeServer:
dht.mode = modeServer
default:
return nil, fmt.Errorf("invalid dht mode %d", cfg.mode)
return nil, fmt.Errorf("invalid dht mode %d", cfg.Mode)
}

if dht.mode == modeServer {
Expand Down Expand Up @@ -265,20 +263,20 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT
return dht
}

func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, error) {
var protocols, serverProtocols []protocol.ID

v1proto := cfg.protocolPrefix + kad1
v1proto := cfg.ProtocolPrefix + kad1

if cfg.v1ProtocolOverride != "" {
v1proto = cfg.v1ProtocolOverride
if cfg.V1ProtocolOverride != "" {
v1proto = cfg.V1ProtocolOverride
}

protocols = []protocol.ID{v1proto}
serverProtocols = []protocol.ID{v1proto}

dht := &IpfsDHT{
datastore: cfg.datastore,
datastore: cfg.Datastore,
self: h.ID(),
selfKey: kb.ConvertPeerID(h.ID()),
peerstore: h.Peerstore(),
Expand All @@ -287,12 +285,12 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
protocols: protocols,
protocolsStrs: protocol.ConvertToStrings(protocols),
serverProtocols: serverProtocols,
bucketSize: cfg.bucketSize,
alpha: cfg.concurrency,
beta: cfg.resiliency,
queryPeerFilter: cfg.queryPeerFilter,
routingTablePeerFilter: cfg.routingTable.peerFilter,
rtPeerDiversityFilter: cfg.routingTable.diversityFilter,
bucketSize: cfg.BucketSize,
alpha: cfg.Concurrency,
beta: cfg.Resiliency,
queryPeerFilter: cfg.QueryPeerFilter,
routingTablePeerFilter: cfg.RoutingTable.PeerFilter,
rtPeerDiversityFilter: cfg.RoutingTable.DiversityFilter,

fixLowPeersChan: make(chan struct{}, 1),

Expand All @@ -306,12 +304,12 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
// query a peer as part of our refresh cycle.
// To grok the Math Wizardry that produced these exact equations, please be patient as a document explaining it will
// be published soon.
if cfg.concurrency < cfg.bucketSize { // (alpha < K)
l1 := math.Log(float64(1) / float64(cfg.bucketSize)) //(Log(1/K))
l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(cfg.bucketSize))) // Log(1 - (alpha / K))
maxLastSuccessfulOutboundThreshold = time.Duration(l1 / l2 * float64(cfg.routingTable.refreshInterval))
if cfg.Concurrency < cfg.BucketSize { // (alpha < K)
l1 := math.Log(float64(1) / float64(cfg.BucketSize)) //(Log(1/K))
l2 := math.Log(float64(1) - (float64(cfg.Concurrency) / float64(cfg.BucketSize))) // Log(1 - (alpha / K))
maxLastSuccessfulOutboundThreshold = time.Duration(l1 / l2 * float64(cfg.RoutingTable.RefreshInterval))
} else {
maxLastSuccessfulOutboundThreshold = cfg.routingTable.refreshInterval
maxLastSuccessfulOutboundThreshold = cfg.RoutingTable.RefreshInterval
}

// construct routing table
Expand All @@ -321,7 +319,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
return nil, fmt.Errorf("failed to construct routing table,err=%s", err)
}
dht.routingTable = rt
dht.bootstrapPeers = cfg.bootstrapPeers
dht.bootstrapPeers = cfg.BootstrapPeers

// rt refresh manager
rtRefresh, err := makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold)
Expand All @@ -340,7 +338,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
// the DHT context should be done when the process is closed
dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)

pm, err := providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore, cfg.providersOptions...)
pm, err := providers.NewProviderManager(dht.ctx, h.ID(), cfg.Datastore, cfg.ProvidersOptions...)
if err != nil {
return nil, err
}
Expand All @@ -351,7 +349,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
return dht, nil
}

func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) {
func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) {
keyGenFnc := func(cpl uint) (string, error) {
p, err := dht.routingTable.GenRandPeerID(cpl)
return string(p), err
Expand All @@ -363,18 +361,18 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThr
}

r, err := rtrefresh.NewRtRefreshManager(
dht.host, dht.routingTable, cfg.routingTable.autoRefresh,
dht.host, dht.routingTable, cfg.RoutingTable.AutoRefresh,
keyGenFnc,
queryFnc,
cfg.routingTable.refreshQueryTimeout,
cfg.routingTable.refreshInterval,
cfg.RoutingTable.RefreshQueryTimeout,
cfg.RoutingTable.RefreshInterval,
maxLastSuccessfulOutboundThreshold,
dht.refreshFinishedCh)

return r, err
}

func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) {
func makeRoutingTable(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) {
// make a Routing Table Diversity Filter
var filter *peerdiversity.Filter
if dht.rtPeerDiversityFilter != nil {
Expand All @@ -389,7 +387,7 @@ func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThresho
filter = df
}

rt, err := kb.NewRoutingTable(cfg.bucketSize, dht.selfKey, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold, filter)
rt, err := kb.NewRoutingTable(cfg.BucketSize, dht.selfKey, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold, filter)
if err != nil {
return nil, err
}
Expand Down
35 changes: 27 additions & 8 deletions dht_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

Expand All @@ -14,14 +15,16 @@ import (

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"

dhtcfg "github.com/libp2p/go-libp2p-kad-dht/internal/config"
)

// QueryFilterFunc is a filter applied when considering peers to dial when querying
type QueryFilterFunc func(dht *IpfsDHT, ai peer.AddrInfo) bool
type QueryFilterFunc = dhtcfg.QueryFilterFunc

// RouteTableFilterFunc is a filter applied when considering connections to keep in
// the local route table.
type RouteTableFilterFunc func(dht *IpfsDHT, conns []network.Conn) bool
type RouteTableFilterFunc = dhtcfg.RouteTableFilterFunc

var publicCIDR6 = "2000::/3"
var public6 *net.IPNet
Expand Down Expand Up @@ -59,7 +62,7 @@ func isPrivateAddr(a ma.Multiaddr) bool {
}

// PublicQueryFilter returns true if the peer is suspected of being publicly accessible
func PublicQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool {
func PublicQueryFilter(_ interface{}, ai peer.AddrInfo) bool {
if len(ai.Addrs) == 0 {
return false
}
Expand All @@ -73,18 +76,25 @@ func PublicQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool {
return hasPublicAddr
}

// hasHost is the minimal capability the routing table filters need from their
// untyped dht argument: access to the underlying host. The filters type-assert
// to it without the comma-ok form, so passing a value that does not implement
// hasHost panics.
type hasHost interface {
// Host returns the host used to look up connections, peerstore entries and
// local addresses.
Host() host.Host
}

var _ QueryFilterFunc = PublicQueryFilter

// PublicRoutingTableFilter allows a peer to be added to the routing table if the connections to that peer indicate
// that it is on a public network
func PublicRoutingTableFilter(dht *IpfsDHT, conns []network.Conn) bool {
func PublicRoutingTableFilter(dht interface{}, p peer.ID) bool {
d := dht.(hasHost)
Stebalien marked this conversation as resolved.
Show resolved Hide resolved

conns := d.Host().Network().ConnsToPeer(p)
if len(conns) == 0 {
return false
}

// Do we have a public address for this peer?
id := conns[0].RemotePeer()
known := dht.peerstore.PeerInfo(id)
known := d.Host().Peerstore().PeerInfo(id)
for _, a := range known.Addrs {
if !isRelayAddr(a) && isPublicAddr(a) {
return true
Expand All @@ -97,7 +107,7 @@ func PublicRoutingTableFilter(dht *IpfsDHT, conns []network.Conn) bool {
var _ RouteTableFilterFunc = PublicRoutingTableFilter

// PrivateQueryFilter doesn't currently restrict which peers we are willing to query from the local DHT.
func PrivateQueryFilter(dht *IpfsDHT, ai peer.AddrInfo) bool {
func PrivateQueryFilter(_ interface{}, ai peer.AddrInfo) bool {
return len(ai.Addrs) > 0
}

Expand Down Expand Up @@ -137,10 +147,19 @@ func getCachedRouter() routing.Router {

// PrivateRoutingTableFilter allows a peer to be added to the routing table if the connections to that peer indicate
// that it is on a private network
func PrivateRoutingTableFilter(dht *IpfsDHT, conns []network.Conn) bool {
func PrivateRoutingTableFilter(dht interface{}, p peer.ID) bool {
d := dht.(hasHost)
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
conns := d.Host().Network().ConnsToPeer(p)
return privRTFilter(d, conns)
}

func privRTFilter(dht interface{}, conns []network.Conn) bool {
d := dht.(hasHost)
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
h := d.Host()

router := getCachedRouter()
myAdvertisedIPs := make([]net.IP, 0)
for _, a := range dht.Host().Addrs() {
for _, a := range h.Addrs() {
if isPublicAddr(a) && !isRelayAddr(a) {
ip, err := manet.ToIP(a)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion dht_filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestFilterCaching(t *testing.T) {
d := setupDHT(ctx, t, true)

remote, _ := manet.FromIP(net.IPv4(8, 8, 8, 8))
if PrivateRoutingTableFilter(d, []network.Conn{&mockConn{
if privRTFilter(d, []network.Conn{&mockConn{
local: d.Host().Peerstore().PeerInfo(d.Host().ID()),
remote: peer.AddrInfo{ID: "", Addrs: []ma.Multiaddr{remote}},
}}) {
Expand Down
Loading