Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add explore state machine to expand population of routing table #934

Merged
merged 17 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/libp2p/go-msgio v0.3.0
github.com/multiformats/go-base32 v0.1.0
github.com/multiformats/go-multiaddr v0.11.0
github.com/multiformats/go-multihash v0.2.3 // indirect
github.com/multiformats/go-multihash v0.2.3
github.com/pkg/errors v0.9.1 // indirect
github.com/plprobelab/go-kademlia v0.0.0-20230913171354-443ec1f56080
github.com/prometheus/client_golang v1.16.0 // indirect
Expand Down
7 changes: 0 additions & 7 deletions v2/internal/coord/behaviour_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,6 @@ import (
"context"
)

type NullSM[E any, S any] struct{}

func (NullSM[E, S]) Advance(context.Context, E) S {
var v S
return v
}

type RecordingSM[E any, S any] struct {
State S
Received E
Expand Down
18 changes: 17 additions & 1 deletion v2/internal/coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

"github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/brdcst"
"github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt"
"github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/cplutil"
"github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/query"
"github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/routing"
"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
Expand Down Expand Up @@ -169,16 +170,16 @@

func NewCoordinator(self kadt.PeerID, rtr coordt.Router[kadt.Key, kadt.PeerID, *pb.Message], rt routing.RoutingTableCpl[kadt.Key, kadt.PeerID], cfg *CoordinatorConfig) (*Coordinator, error) {
if cfg == nil {
cfg = DefaultCoordinatorConfig()

Check warning on line 173 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L173

Added line #L173 was not covered by tests
} else if err := cfg.Validate(); err != nil {
return nil, err
}

Check warning on line 176 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L175-L176

Added lines #L175 - L176 were not covered by tests

// initialize a new telemetry struct
tele, err := NewTelemetry(cfg.MeterProvider, cfg.TracerProvider)
if err != nil {
return nil, fmt.Errorf("init telemetry: %w", err)
}

Check warning on line 182 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L181-L182

Added lines #L181 - L182 were not covered by tests

qpCfg := query.DefaultPoolConfig()
qpCfg.Clock = cfg.Clock
Expand All @@ -189,8 +190,8 @@

qp, err := query.NewPool[kadt.Key, kadt.PeerID, *pb.Message](self, qpCfg)
if err != nil {
return nil, fmt.Errorf("query pool: %w", err)
}

Check warning on line 194 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L193-L194

Added lines #L193 - L194 were not covered by tests
queryBehaviour := NewPooledQueryBehaviour(qp, cfg.Logger, tele.Tracer)

bootstrapCfg := routing.DefaultBootstrapConfig[kadt.Key]()
Expand All @@ -201,8 +202,8 @@

bootstrap, err := routing.NewBootstrap(self, bootstrapCfg)
if err != nil {
return nil, fmt.Errorf("bootstrap: %w", err)
}

Check warning on line 206 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L205-L206

Added lines #L205 - L206 were not covered by tests

includeCfg := routing.DefaultIncludeConfig()
includeCfg.Clock = cfg.Clock
Expand All @@ -215,8 +216,8 @@

include, err := routing.NewInclude[kadt.Key, kadt.PeerID](rt, includeCfg)
if err != nil {
return nil, fmt.Errorf("include: %w", err)
}

Check warning on line 220 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L219-L220

Added lines #L219 - L220 were not covered by tests

probeCfg := routing.DefaultProbeConfig()
probeCfg.Clock = cfg.Clock
Expand All @@ -226,17 +227,32 @@
// probeCfg.Concurrency = cfg.ProbeConcurrency
probe, err := routing.NewProbe[kadt.Key](rt, probeCfg)
if err != nil {
return nil, fmt.Errorf("probe: %w", err)
}

Check warning on line 231 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L230-L231

Added lines #L230 - L231 were not covered by tests

routingBehaviour := NewRoutingBehaviour(self, bootstrap, include, probe, cfg.Logger, tele.Tracer)
exploreCfg := routing.DefaultExploreConfig()
exploreCfg.Clock = cfg.Clock
exploreCfg.Timeout = cfg.QueryTimeout

schedule, err := routing.NewDynamicExploreSchedule(14, cfg.Clock.Now(), time.Hour, 1, 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also: // TODO: expose more config

if err != nil {
return nil, fmt.Errorf("explore schedule: %w", err)
}

Check warning on line 240 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L239-L240

Added lines #L239 - L240 were not covered by tests

// TODO: expose more config
explore, err := routing.NewExplore[kadt.Key](self, rt, cplutil.GenRandPeerID, schedule, exploreCfg)
if err != nil {
return nil, fmt.Errorf("explore: %w", err)
}

Check warning on line 246 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L245-L246

Added lines #L245 - L246 were not covered by tests

routingBehaviour := NewRoutingBehaviour(self, bootstrap, include, probe, explore, cfg.Logger, tele.Tracer)

networkBehaviour := NewNetworkBehaviour(rtr, cfg.Logger, tele.Tracer)

b, err := brdcst.NewPool[kadt.Key, kadt.PeerID, *pb.Message](self, nil)
if err != nil {
return nil, fmt.Errorf("broadcast: %w", err)
}

Check warning on line 255 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L254-L255

Added lines #L254 - L255 were not covered by tests

brdcstBehaviour := NewPooledBroadcastBehaviour(b, cfg.Logger, tele.Tracer)

Expand Down Expand Up @@ -271,8 +287,8 @@
return nil
}

func (c *Coordinator) ID() kadt.PeerID {
return c.self

Check warning on line 291 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L290-L291

Added lines #L290 - L291 were not covered by tests
}

func (c *Coordinator) eventLoop(ctx context.Context) {
Expand All @@ -289,14 +305,14 @@
case <-ctx.Done():
// coordinator is closing
return
case <-c.networkBehaviour.Ready():
ev, ok = c.networkBehaviour.Perform(ctx)

Check warning on line 309 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L308-L309

Added lines #L308 - L309 were not covered by tests
case <-c.routingBehaviour.Ready():
ev, ok = c.routingBehaviour.Perform(ctx)
case <-c.queryBehaviour.Ready():
ev, ok = c.queryBehaviour.Perform(ctx)
case <-c.brdcstBehaviour.Ready():
ev, ok = c.brdcstBehaviour.Perform(ctx)

Check warning on line 315 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L314-L315

Added lines #L314 - L315 were not covered by tests
}

if ok {
Expand All @@ -312,10 +328,10 @@
switch ev := ev.(type) {
case NetworkCommand:
c.networkBehaviour.Notify(ctx, ev)
case QueryCommand:
c.queryBehaviour.Notify(ctx, ev)
case BrdcstCommand:
c.brdcstBehaviour.Notify(ctx, ev)

Check warning on line 334 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L331-L334

Added lines #L331 - L334 were not covered by tests
case RoutingCommand:
c.routingBehaviour.Notify(ctx, ev)
case RoutingNotification:
Expand All @@ -323,8 +339,8 @@
rn := c.routingNotifier
c.routingNotifierMu.RUnlock()
rn.Notify(ctx, ev)
default:
panic(fmt.Sprintf("unexpected event: %T", ev))

Check warning on line 343 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L342-L343

Added lines #L342 - L343 were not covered by tests
}
}

Expand All @@ -345,8 +361,8 @@

nh, err := c.networkBehaviour.getNodeHandler(ctx, id)
if err != nil {
return nil, err
}

Check warning on line 365 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L364-L365

Added lines #L364 - L365 were not covered by tests
return nh, nil
}

Expand All @@ -359,8 +375,8 @@
for _, id := range closest {
nh, err := c.networkBehaviour.getNodeHandler(ctx, id)
if err != nil {
return nil, err
}

Check warning on line 379 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L378-L379

Added lines #L378 - L379 were not covered by tests
nodes = append(nodes, nh)
}
return nodes, nil
Expand All @@ -368,14 +384,14 @@

// GetValue requests that the node return any value associated with the supplied key.
// If the node does not have a value for the key it returns ErrValueNotFound.
func (c *Coordinator) GetValue(ctx context.Context, k kadt.Key) (coordt.Value, error) {
panic("not implemented")

Check warning on line 388 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L387-L388

Added lines #L387 - L388 were not covered by tests
}

// PutValue requests that the node stores a value to be associated with the supplied key.
// If the node cannot or chooses not to store the value for the key it returns ErrValueNotAccepted.
func (c *Coordinator) PutValue(ctx context.Context, r coordt.Value, q int) error {
panic("not implemented")

Check warning on line 394 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L393-L394

Added lines #L393 - L394 were not covered by tests
}

// QueryClosest starts a query that attempts to find the closest nodes to the target key.
Expand All @@ -398,8 +414,8 @@

seeds, err := c.GetClosestNodes(ctx, target, 20)
if err != nil {
return nil, coordt.QueryStats{}, err
}

Check warning on line 418 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L417-L418

Added lines #L417 - L418 were not covered by tests

seedIDs := make([]kadt.PeerID, 0, len(seeds))
for _, s := range seeds {
Expand Down Expand Up @@ -442,13 +458,13 @@
defer cancel()

if numResults < 1 {
numResults = 20
}

Check warning on line 462 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L461-L462

Added lines #L461 - L462 were not covered by tests

seeds, err := c.GetClosestNodes(ctx, msg.Target(), numResults)
if err != nil {
return coordt.QueryStats{}, err
}

Check warning on line 467 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L466-L467

Added lines #L466 - L467 were not covered by tests

seedIDs := make([]kadt.PeerID, 0, len(seeds))
for _, s := range seeds {
Expand All @@ -474,51 +490,51 @@
return stats, err
}

func (c *Coordinator) BroadcastRecord(ctx context.Context, msg *pb.Message) error {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.BroadcastRecord")
defer span.End()

ctx, cancel := context.WithCancel(ctx)
defer cancel()

seeds, err := c.GetClosestNodes(ctx, msg.Target(), 20)
if err != nil {
return err
}

Check warning on line 503 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L493-L503

Added lines #L493 - L503 were not covered by tests

seedIDs := make([]kadt.PeerID, 0, len(seeds))
for _, s := range seeds {
seedIDs = append(seedIDs, s.ID())
}

Check warning on line 508 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L505-L508

Added lines #L505 - L508 were not covered by tests

waiter := NewWaiter[BehaviourEvent]()
queryID := c.newOperationID()

cmd := &EventStartBroadcast{
QueryID: queryID,
Target: msg.Target(),
Message: msg,
Seed: seedIDs,
Notify: waiter,
Config: brdcst.DefaultConfigFollowUp(),
}

// queue the start of the query
c.brdcstBehaviour.Notify(ctx, cmd)

contacted, errs, err := c.waitForBroadcast(ctx, waiter)
fmt.Println(contacted)
fmt.Println(errs)

return err

Check warning on line 529 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L510-L529

Added lines #L510 - L529 were not covered by tests
}

func (c *Coordinator) waitForQuery(ctx context.Context, queryID coordt.QueryID, waiter *Waiter[BehaviourEvent], fn coordt.QueryFunc) ([]kadt.PeerID, coordt.QueryStats, error) {
var lastStats coordt.QueryStats
for {
select {
case <-ctx.Done():
return nil, lastStats, ctx.Err()

Check warning on line 537 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L536-L537

Added lines #L536 - L537 were not covered by tests
case wev := <-waiter.Chan():
ctx, ev := wev.Ctx, wev.Event
switch ev := ev.(type) {
Expand All @@ -531,8 +547,8 @@
}
nh, err := c.networkBehaviour.getNodeHandler(ctx, ev.NodeID)
if err != nil {
// ignore unknown node
break

Check warning on line 551 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L550-L551

Added lines #L550 - L551 were not covered by tests
}

err = fn(ctx, nh.ID(), ev.Response, lastStats)
Expand All @@ -542,18 +558,18 @@
return nil, lastStats, nil
}
if err != nil {
// user defined error that terminates the query
c.queryBehaviour.Notify(ctx, &EventStopQuery{QueryID: queryID})
return nil, lastStats, err
}

Check warning on line 564 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L561-L564

Added lines #L561 - L564 were not covered by tests

case *EventQueryFinished:
// query is done
lastStats.Exhausted = true
return ev.ClosestNodes, lastStats, nil

default:
panic(fmt.Sprintf("unexpected event: %T", ev))

Check warning on line 572 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L571-L572

Added lines #L571 - L572 were not covered by tests
}
}
}
Expand All @@ -563,19 +579,19 @@
Node kadt.PeerID
Err error
}, error,
) {
for {
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
case wev := <-waiter.Chan():
switch ev := wev.Event.(type) {
case *EventQueryProgressed:
case *EventBroadcastFinished:
return ev.Contacted, ev.Errors, nil

Check warning on line 591 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L582-L591

Added lines #L582 - L591 were not covered by tests

default:
panic(fmt.Sprintf("unexpected event: %T", ev))

Check warning on line 594 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L593-L594

Added lines #L593 - L594 were not covered by tests
}
}
}
Expand All @@ -589,8 +605,8 @@
defer span.End()
for _, id := range ids {
if id.Equal(c.self) {
// skip self
continue

Check warning on line 609 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L608-L609

Added lines #L608 - L609 were not covered by tests
}

c.routingBehaviour.Notify(ctx, &EventAddNode{
Expand Down Expand Up @@ -629,15 +645,15 @@

// NotifyNonConnectivity notifies the coordinator that a peer has failed a connectivity check
// which means it is not connected and/or it doesn't support finding closer nodes
func (c *Coordinator) NotifyNonConnectivity(ctx context.Context, id kadt.PeerID) error {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.NotifyNonConnectivity")
defer span.End()

c.routingBehaviour.Notify(ctx, &EventNotifyNonConnectivity{
NodeID: id,
})

return nil

Check warning on line 656 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L648-L656

Added lines #L648 - L656 were not covered by tests
}

func (c *Coordinator) newOperationID() coordt.QueryID {
Expand Down Expand Up @@ -685,8 +701,8 @@

// wait to be signaled that there is a new event
select {
case <-ctx.Done():
return nil, fmt.Errorf("test deadline exceeded while waiting for event %T", expected)

Check warning on line 705 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L704-L705

Added lines #L704 - L705 were not covered by tests
case <-w.signal:
}
}
Expand All @@ -711,8 +727,8 @@

// wait to be signaled that there is a new event
select {
case <-ctx.Done():
return nil, fmt.Errorf("test deadline exceeded while waiting for routing updated event")

Check warning on line 731 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L730-L731

Added lines #L730 - L731 were not covered by tests
case <-w.signal:
}
}
Expand All @@ -737,8 +753,8 @@

// wait to be signaled that there is a new event
select {
case <-ctx.Done():
return nil, fmt.Errorf("test deadline exceeded while waiting for routing removed event")

Check warning on line 757 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L756-L757

Added lines #L756 - L757 were not covered by tests
case <-w.signal:
}
}
Expand Down
8 changes: 4 additions & 4 deletions v2/internal/coord/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ func TestExhaustiveQuery(t *testing.T) {
// A (ids[0]) is looking for D (ids[3])
// A will first ask B, B will reply with C's address (and A's address)
// A will then ask C, C will reply with D's address (and B's address)
self := kadt.PeerID(nodes[0].NodeID)
self := nodes[0].NodeID
c, err := NewCoordinator(self, nodes[0].Router, nodes[0].RoutingTable, ccfg)
require.NoError(t, err)

target := kadt.PeerID(nodes[3].NodeID).Key()
target := nodes[3].NodeID.Key()

visited := make(map[string]int)

Expand Down Expand Up @@ -137,7 +137,7 @@ func TestRoutingUpdatedEventEmittedForCloserNodes(t *testing.T) {
// A (ids[0]) is looking for D (ids[3])
// A will first ask B, B will reply with C's address (and A's address)
// A will then ask C, C will reply with D's address (and B's address)
self := kadt.PeerID(nodes[0].NodeID)
self := nodes[0].NodeID
c, err := NewCoordinator(self, nodes[0].Router, nodes[0].RoutingTable, ccfg)
if err != nil {
log.Fatalf("unexpected error creating coordinator: %v", err)
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestBootstrap(t *testing.T) {

ccfg.Clock = clk

self := kadt.PeerID(nodes[0].NodeID)
self := nodes[0].NodeID
d, err := NewCoordinator(self, nodes[0].Router, nodes[0].RoutingTable, ccfg)
require.NoError(t, err)

Expand Down
58 changes: 58 additions & 0 deletions v2/internal/coord/cplutil/cpl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package cplutil

import (
"crypto/rand"
"encoding/binary"
"fmt"

mh "github.com/multiformats/go-multihash"

"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
)

//go:generate go run ./gen.go

// GenRandPeerID generates a random [kadt.PeerID] whose key has a common prefix length of exactly cpl with the supplied key.
// Ported from go-libp2p-kbucket
func GenRandPeerID(k kadt.Key, cpl int) (kadt.PeerID, error) {
if cpl > 15 {
return "", fmt.Errorf("cannot generate peer ID for Cpl greater than 15")
}

Check warning on line 20 in v2/internal/coord/cplutil/cpl.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/cplutil/cpl.go#L19-L20

Added lines #L19 - L20 were not covered by tests

targetPrefix := prefix(k, cpl)

// Convert to a known peer ID.
key := keyPrefixMap[targetPrefix]
id := [32 + 2]byte{mh.SHA2_256, 32}
binary.BigEndian.PutUint32(id[2:], key)
return kadt.PeerID(string(id[:])), nil
}

type keybit interface {
Bit(i int) uint
}

// prefix generates random bits that have a common prefix length of exactly cpl with the supplied key.
func prefix(k keybit, cpl int) uint16 {
var p uint16
// copy the first cpl+1 bits so we can flip the last one
for i := 0; i < cpl+1; i++ {
bit := uint16(k.Bit(i)) << (15 - i)
p |= bit
}

// flip the bit at cpl (cpl 5 means bits 0-4 must be the same)
mask := uint16(1) << (15 - cpl)
p ^= mask

if cpl < 15 {
// pad with random data
var buf [2]byte
_, _ = rand.Read(buf[:])
r := binary.BigEndian.Uint16(buf[:])

mask = (^uint16(0)) << (15 - cpl)
p = (p & mask) | (r & ^mask)
}
return p
}
Loading
Loading