Skip to content

Commit

Permalink
chore: remove coord.Node type
Browse files Browse the repository at this point in the history
  • Loading branch information
iand committed Oct 9, 2023
1 parent 997ed98 commit 9d75baa
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 193 deletions.
9 changes: 3 additions & 6 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/plprobelab/zikade/internal/coord/coordt"
"github.com/plprobelab/zikade/internal/kadtest"
"github.com/plprobelab/zikade/kadt"
)
Expand Down Expand Up @@ -79,8 +78,7 @@ func TestAddAddresses(t *testing.T) {
remote := top.AddServer(nil)

// local routing table should not contain the node
_, err := local.kad.GetNode(ctx, kadt.PeerID(remote.host.ID()))
require.ErrorIs(t, err, coordt.ErrNodeNotFound)
require.False(t, local.kad.IsRoutable(ctx, kadt.PeerID(remote.host.ID())))

remoteAddrInfo := peer.AddrInfo{
ID: remote.host.ID(),
Expand All @@ -90,16 +88,15 @@ func TestAddAddresses(t *testing.T) {
require.NotEmpty(t, remoteAddrInfo.Addrs)

// Add remote's addresss to the local dht
err = local.AddAddresses(ctx, []peer.AddrInfo{remoteAddrInfo}, time.Minute)
err := local.AddAddresses(ctx, []peer.AddrInfo{remoteAddrInfo}, time.Minute)
require.NoError(t, err)

// the include state machine runs in the background and eventually should add the node to routing table
_, err = top.ExpectRoutingUpdated(ctx, local, remote.host.ID())
require.NoError(t, err)

// the routing table should now contain the node
_, err = local.kad.GetNode(ctx, kadt.PeerID(remote.host.ID()))
require.NoError(t, err)
require.True(t, local.kad.IsRoutable(ctx, kadt.PeerID(remote.host.ID())))
}

func TestDHT_Close_idempotent(t *testing.T) {
Expand Down
63 changes: 10 additions & 53 deletions internal/coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,48 +278,20 @@ func (c *Coordinator) SetRoutingNotifier(rn RoutingNotifier) {
c.routingNotifierMu.Unlock()
}

// GetNode retrieves the node associated with the given node id from the DHT's local routing table.
// If the node isn't found in the table, it returns ErrNodeNotFound.
func (c *Coordinator) GetNode(ctx context.Context, id kadt.PeerID) (coordt.Node, error) {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.GetNode")
// IsRoutable reports whether the supplied node is present in the local routing table.
func (c *Coordinator) IsRoutable(ctx context.Context, id kadt.PeerID) bool {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.IsRoutable")

Check failure on line 283 in internal/coord/coordinator.go

View workflow job for this annotation

GitHub Actions / go-check / All

this value of ctx is never used (SA4006)
defer span.End()
if _, exists := c.rt.GetNode(id.Key()); !exists {
return nil, coordt.ErrNodeNotFound
}
_, exists := c.rt.GetNode(id.Key())

nh, err := c.networkBehaviour.getNodeHandler(ctx, id)
if err != nil {
return nil, err
}
return nh, nil
return exists
}

// GetClosestNodes requests the n closest nodes to the key from the node's local routing table.
func (c *Coordinator) GetClosestNodes(ctx context.Context, k kadt.Key, n int) ([]coordt.Node, error) {
func (c *Coordinator) GetClosestNodes(ctx context.Context, k kadt.Key, n int) ([]kadt.PeerID, error) {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.GetClosestNodes")

Check failure on line 292 in internal/coord/coordinator.go

View workflow job for this annotation

GitHub Actions / go-check / All

this value of ctx is never used (SA4006)
defer span.End()
closest := c.rt.NearestNodes(k, n)
nodes := make([]coordt.Node, 0, len(closest))
for _, id := range closest {
nh, err := c.networkBehaviour.getNodeHandler(ctx, id)
if err != nil {
return nil, err
}
nodes = append(nodes, nh)
}
return nodes, nil
}

// GetValue requests that the node return any value associated with the supplied key.
// If the node does not have a value for the key it returns ErrValueNotFound.
func (c *Coordinator) GetValue(ctx context.Context, k kadt.Key) (coordt.Value, error) {
panic("not implemented")
}

// PutValue requests that the node stores a value to be associated with the supplied key.
// If the node cannot or chooses not to store the value for the key it returns ErrValueNotAccepted.
func (c *Coordinator) PutValue(ctx context.Context, r coordt.Value, q int) error {
panic("not implemented")
return c.rt.NearestNodes(k, n), nil
}

// QueryClosest starts a query that attempts to find the closest nodes to the target key.
Expand All @@ -341,16 +313,11 @@ func (c *Coordinator) QueryClosest(ctx context.Context, target kadt.Key, fn coor
ctx, cancel := context.WithCancel(ctx)
defer cancel()

seeds, err := c.GetClosestNodes(ctx, target, 20)
seedIDs, err := c.GetClosestNodes(ctx, target, 20)
if err != nil {
return nil, coordt.QueryStats{}, err
}

seedIDs := make([]kadt.PeerID, 0, len(seeds))
for _, s := range seeds {
seedIDs = append(seedIDs, s.ID())
}

waiter := NewWaiter[BehaviourEvent]()
queryID := c.newOperationID()

Expand Down Expand Up @@ -394,16 +361,11 @@ func (c *Coordinator) QueryMessage(ctx context.Context, msg *pb.Message, fn coor
numResults = 20 // TODO: parameterize
}

seeds, err := c.GetClosestNodes(ctx, msg.Target(), numResults)
seedIDs, err := c.GetClosestNodes(ctx, msg.Target(), numResults)
if err != nil {
return nil, coordt.QueryStats{}, err
}

seedIDs := make([]kadt.PeerID, 0, len(seeds))
for _, s := range seeds {
seedIDs = append(seedIDs, s.ID())
}

waiter := NewWaiter[BehaviourEvent]()
queryID := c.newOperationID()

Expand Down Expand Up @@ -434,15 +396,10 @@ func (c *Coordinator) BroadcastRecord(ctx context.Context, msg *pb.Message) erro
ctx, cancel := context.WithCancel(ctx)
defer cancel()

seedNodes, err := c.GetClosestNodes(ctx, msg.Target(), 20) // TODO: parameterize
seeds, err := c.GetClosestNodes(ctx, msg.Target(), 20) // TODO: parameterize
if err != nil {
return err
}

seeds := make([]kadt.PeerID, 0, len(seedNodes))
for _, s := range seedNodes {
seeds = append(seeds, s.ID())
}
return c.broadcast(ctx, msg, seeds, brdcst.DefaultConfigFollowUp())
}

Expand Down
15 changes: 5 additions & 10 deletions internal/coord/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,16 +185,13 @@ func TestBootstrap(t *testing.T) {
require.NoError(t, err)

// coordinator will have node1 in its routing table
_, err = d.GetNode(ctx, nodes[1].NodeID)
require.NoError(t, err)
require.True(t, d.IsRoutable(ctx, nodes[1].NodeID))

// coordinator should now have node2 in its routing table
_, err = d.GetNode(ctx, nodes[2].NodeID)
require.NoError(t, err)
require.True(t, d.IsRoutable(ctx, nodes[2].NodeID))

// coordinator should now have node3 in its routing table
_, err = d.GetNode(ctx, nodes[3].NodeID)
require.NoError(t, err)
require.True(t, d.IsRoutable(ctx, nodes[3].NodeID))
}

func TestIncludeNode(t *testing.T) {
Expand All @@ -219,8 +216,7 @@ func TestIncludeNode(t *testing.T) {
d.SetRoutingNotifier(rn)

// the routing table should not contain the node yet
_, err = d.GetNode(ctx, candidate)
require.ErrorIs(t, err, coordt.ErrNodeNotFound)
require.False(t, d.IsRoutable(ctx, candidate))

// inject a new node
err = d.AddNodes(ctx, []kadt.PeerID{candidate})
Expand All @@ -234,6 +230,5 @@ func TestIncludeNode(t *testing.T) {
require.Equal(t, candidate, tev.NodeID)

// the routing table should now contain the node
_, err = d.GetNode(ctx, candidate)
require.NoError(t, err)
require.True(t, d.IsRoutable(ctx, candidate))
}
19 changes: 0 additions & 19 deletions internal/coord/coordt/coretypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,6 @@ type Value interface {
MarshalBinary() ([]byte, error)
}

// Node represents the local or a remote node participating in the DHT.
type Node interface {
// ID returns the peer ID identifying this node.
ID() kadt.PeerID

// GetClosestNodes requests the n closest nodes to the key from the node's
// local routing table. The node may return fewer nodes than requested.
GetClosestNodes(ctx context.Context, key kadt.Key, n int) ([]Node, error)

// GetValue requests that the node return any value associated with the
// supplied key. If the node does not have a value for the key it returns
// ErrValueNotFound.
GetValue(ctx context.Context, key kadt.Key) (Value, error)

// PutValue requests that the node stores a value to be associated with the supplied key.
// If the node cannot or chooses not to store the value for the key it returns ErrValueNotAccepted.
PutValue(ctx context.Context, r Value, q int) error
}

var (
ErrNodeNotFound = errors.New("node not found")
ErrValueNotFound = errors.New("value not found")
Expand Down
107 changes: 53 additions & 54 deletions internal/coord/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"sync"

"github.com/plprobelab/go-libdht/kad/key"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/slog"

Expand Down Expand Up @@ -201,56 +200,56 @@ func (h *NodeHandler) ID() kadt.PeerID {
return h.self
}

// GetClosestNodes requests the n closest nodes to the key from the node's local routing table.
// The node may return fewer nodes than requested.
func (h *NodeHandler) GetClosestNodes(ctx context.Context, k kadt.Key, n int) ([]coordt.Node, error) {
ctx, span := h.tracer.Start(ctx, "NodeHandler.GetClosestNodes")
defer span.End()
w := NewWaiter[BehaviourEvent]()

ev := &EventOutboundGetCloserNodes{
QueryID: coordt.QueryID(key.HexString(k)),
To: h.self,
Target: k,
Notify: w,
}

h.queue.Enqueue(ctx, ev)

select {
case <-ctx.Done():
return nil, ctx.Err()
case we := <-w.Chan():

switch res := we.Event.(type) {
case *EventGetCloserNodesSuccess:
nodes := make([]coordt.Node, 0, len(res.CloserNodes))
for _, info := range res.CloserNodes {
// TODO use a global registry of node handlers
nodes = append(nodes, NewNodeHandler(info, h.rtr, h.logger, h.tracer))
n--
if n == 0 {
break
}
}
return nodes, nil

case *EventGetCloserNodesFailure:
return nil, res.Err
default:
panic(fmt.Sprintf("unexpected node handler event: %T", ev))
}
}
}

// GetValue requests that the node return any value associated with the supplied key.
// If the node does not have a value for the key it returns ErrValueNotFound.
func (h *NodeHandler) GetValue(ctx context.Context, key kadt.Key) (coordt.Value, error) {
panic("not implemented")
}

// PutValue requests that the node stores a value to be associated with the supplied key.
// If the node cannot or chooses not to store the value for the key it returns ErrValueNotAccepted.
func (h *NodeHandler) PutValue(ctx context.Context, r coordt.Value, q int) error {
panic("not implemented")
}
// // GetClosestNodes requests the n closest nodes to the key from the node's local routing table.
// // The node may return fewer nodes than requested.
// func (h *NodeHandler) GetClosestNodes(ctx context.Context, k kadt.Key, n int) ([]coordt.Node, error) {
// ctx, span := h.tracer.Start(ctx, "NodeHandler.GetClosestNodes")
// defer span.End()
// w := NewWaiter[BehaviourEvent]()

// ev := &EventOutboundGetCloserNodes{
// QueryID: coordt.QueryID(key.HexString(k)),
// To: h.self,
// Target: k,
// Notify: w,
// }

// h.queue.Enqueue(ctx, ev)

// select {
// case <-ctx.Done():
// return nil, ctx.Err()
// case we := <-w.Chan():

// switch res := we.Event.(type) {
// case *EventGetCloserNodesSuccess:
// nodes := make([]coordt.Node, 0, len(res.CloserNodes))
// for _, info := range res.CloserNodes {
// // TODO use a global registry of node handlers
// nodes = append(nodes, NewNodeHandler(info, h.rtr, h.logger, h.tracer))
// n--
// if n == 0 {
// break
// }
// }
// return nodes, nil

// case *EventGetCloserNodesFailure:
// return nil, res.Err
// default:
// panic(fmt.Sprintf("unexpected node handler event: %T", ev))
// }
// }
// }

// // GetValue requests that the node return any value associated with the supplied key.
// // If the node does not have a value for the key it returns ErrValueNotFound.
// func (h *NodeHandler) GetValue(ctx context.Context, key kadt.Key) (coordt.Value, error) {
// panic("not implemented")
// }

// // PutValue requests that the node stores a value to be associated with the supplied key.
// // If the node cannot or chooses not to store the value for the key it returns ErrValueNotAccepted.
// func (h *NodeHandler) PutValue(ctx context.Context, r coordt.Value, q int) error {
// panic("not implemented")
// }
32 changes: 0 additions & 32 deletions internal/coord/network_test.go

This file was deleted.

Loading

0 comments on commit 9d75baa

Please sign in to comment.