Skip to content

Commit

Permalink
Add include state machine
Browse files Browse the repository at this point in the history
  • Loading branch information
iand committed Aug 15, 2023
1 parent 52edf4e commit 08c3fe5
Show file tree
Hide file tree
Showing 5 changed files with 592 additions and 2 deletions.
2 changes: 1 addition & 1 deletion coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (q *eventQueue[E]) Enqueue(ctx context.Context, e E) {
}

// Dequeue reads an event from the queue. It returns the event and a true value
// if an event was read or the zero value if the event type and false if no event
// if an event was read or the zero value of the event type and false if no event
// was read. This method is non-blocking.
func (q *eventQueue[E]) Dequeue(ctx context.Context) (E, bool) {
select {
Expand Down
2 changes: 2 additions & 0 deletions internal/kadtest/ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ type ID[K kad.Key[K]] struct {
key K
}

type ID8 = ID[key.Key8]

// interface assertion. Using the concrete key type of key.Key8 does not
// limit the validity of the assertion for other key types.
var _ kad.NodeID[key.Key8] = (*ID[key.Key8])(nil)
Expand Down
2 changes: 1 addition & 1 deletion query/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestPoolStopWhenNoQueries(t *testing.T) {
p, err := NewPool[key.Key8, kadtest.StrAddr](self, cfg)
require.NoError(t, err)

state := p.Advance(ctx, &EventPoolPoll{})
state := p.Advance(ctx, &EventPoolStopQuery{})
require.IsType(t, &StatePoolIdle{}, state)
}

Expand Down
301 changes: 301 additions & 0 deletions routing/include.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,301 @@
package routing

import (
"context"
"fmt"
"time"

"github.com/benbjohnson/clock"

"github.com/plprobelab/go-kademlia/kad"
"github.com/plprobelab/go-kademlia/kaderr"
"github.com/plprobelab/go-kademlia/key"
"github.com/plprobelab/go-kademlia/network/address"
"github.com/plprobelab/go-kademlia/util"
)

// FindNodeRequestFunc is a function that creates a request to find the supplied node id
type FindNodeRequestFunc[K kad.Key[K], A kad.Address[A]] func(kad.NodeID[K]) (address.ProtocolID, kad.Request[K, A])

type check[K kad.Key[K], N kad.NodeID[K]] struct {
NodeID N
Started time.Time
}

type Include[K kad.Key[K], N kad.NodeID[K], A kad.Address[A]] struct {
rt kad.RoutingTable[K, N]

// checks is an index of checks in progress
checks map[string]check[K, N]

candidates *nodeQueue[K, N]

// findNodeFn is the function used to generate a find node request
findNodeFn FindNodeRequestFunc[K, A]

// cfg is a copy of the optional configuration supplied to the Include
cfg IncludeConfig[K, A]
}

// IncludeConfig specifies optional configuration for an Include
type IncludeConfig[K kad.Key[K], A kad.Address[A]] struct {
QueueCapacity int // the maximum number of nodes that can be in the candidate queue
Concurrency int // the maximum number of include checks that may be in progress at any one time
Timeout time.Duration // the time to wait before terminating a check that is not making progress
Clock clock.Clock // a clock that may replaced by a mock when testing
}

// Validate checks the configuration options and returns an error if any have invalid values.
func (cfg *IncludeConfig[K, A]) Validate() error {
if cfg.Clock == nil {
return &kaderr.ConfigurationError{
Component: "IncludeConfig",
Err: fmt.Errorf("clock must not be nil"),
}
}

if cfg.Concurrency < 1 {
return &kaderr.ConfigurationError{
Component: "IncludeConfig",
Err: fmt.Errorf("concurrency must be greater than zero"),
}
}

if cfg.Timeout < 1 {
return &kaderr.ConfigurationError{
Component: "IncludeConfig",
Err: fmt.Errorf("timeout must be greater than zero"),
}
}

if cfg.QueueCapacity < 1 {
return &kaderr.ConfigurationError{
Component: "IncludeConfig",
Err: fmt.Errorf("queue size must be greater than zero"),
}
}

return nil
}

// DefaultIncludeConfig returns the default configuration options for an Include.
// Options may be overridden before passing to NewInclude
func DefaultIncludeConfig[K kad.Key[K], A kad.Address[A]]() *IncludeConfig[K, A] {
return &IncludeConfig[K, A]{
Clock: clock.New(), // use standard time
Concurrency: 3,
Timeout: time.Minute,
QueueCapacity: 128,
}
}

func NewInclude[K kad.Key[K], N kad.NodeID[K], A kad.Address[A]](rt kad.RoutingTable[K, N], findNodeFn FindNodeRequestFunc[K, A], cfg *IncludeConfig[K, A]) (*Include[K, N, A], error) {
if findNodeFn == nil {
return nil, fmt.Errorf("find node function must not be nil")
}

if cfg == nil {
cfg = DefaultIncludeConfig[K, A]()
} else if err := cfg.Validate(); err != nil {
return nil, err
}

return &Include[K, N, A]{
candidates: newNodeQueue[K, N](cfg.QueueCapacity),
cfg: *cfg,
rt: rt,
findNodeFn: findNodeFn,
checks: make(map[string]check[K, N], cfg.Concurrency),
}, nil
}

// Advance advances the state of the include state machine by attempting to advance its query if running.
func (b *Include[K, N, A]) Advance(ctx context.Context, ev IncludeEvent) IncludeState {
ctx, span := util.StartSpan(ctx, "Include.Advance")
defer span.End()

switch tev := ev.(type) {

case *EventAddCandidate[K, N]:
// TODO: potentially time out a check and make room in the queue
if !b.candidates.HasCapacity() {
return &StateIncludeWaitingFull{}
}
b.candidates.Enqueue(ctx, tev.NodeID)

case *EventIncludeMessageResponse[K, N, A]:
delete(b.checks, key.HexString(tev.NodeID.Key()))
if b.rt.AddNode(tev.NodeID) {
return &StateIncludeRoutingUpdated[K, N]{
NodeID: tev.NodeID,
}
}

case *EventIncludeMessageFailure[K, N]:
delete(b.checks, key.HexString(tev.NodeID.Key()))

case *EventIncludePoll:
// ignore, nothing to do
default:
panic(fmt.Sprintf("unexpected event: %T", tev))
}

if len(b.checks) == b.cfg.Concurrency {
if !b.candidates.HasCapacity() {
return &StateIncludeWaitingFull{}
}
return &StateIncludeWaitingAtCapacity{}
}

candidate, ok := b.candidates.Dequeue(ctx)
if !ok {
// No candidate in queue
if len(b.checks) > 0 {
return &StateIncludeWaitingWithCapacity{}
}
return &StateIncludeIdle{}
}

b.checks[key.HexString(candidate.Key())] = check[K, N]{
NodeID: candidate,
Started: b.cfg.Clock.Now(),
}

// Ask the node to find itself
protoID, msg := b.findNodeFn(candidate)

return &StateIncludeMessage[K, A]{
NodeID: candidate,
ProtocolID: protoID,
Message: msg,
}
}

// nodeQueue is a bounded queue of unique NodeIDs
type nodeQueue[K kad.Key[K], N kad.NodeID[K]] struct {
capacity int
nodes []N
keys map[string]struct{}
}

func newNodeQueue[K kad.Key[K], N kad.NodeID[K]](capacity int) *nodeQueue[K, N] {
return &nodeQueue[K, N]{
capacity: capacity,
nodes: make([]N, 0, capacity),
keys: make(map[string]struct{}, capacity),
}
}

// Enqueue adds a node to the queue. It returns true if the node was
// added and false otherwise.
func (q *nodeQueue[K, N]) Enqueue(ctx context.Context, n N) bool {
if len(q.nodes) == q.capacity {
return false
}

if _, exists := q.keys[key.HexString(n.Key())]; exists {
return false
}

q.nodes = append(q.nodes, n)
q.keys[key.HexString(n.Key())] = struct{}{}
return true
}

// Dequeue reads an node from the queue. It returns the node and a true value
// if a node was read or nil and false if no node was read.
func (q *nodeQueue[K, N]) Dequeue(ctx context.Context) (N, bool) {
if len(q.nodes) == 0 {
var v N
return v, false
}

var n N
n, q.nodes = q.nodes[0], q.nodes[1:]
delete(q.keys, key.HexString(n.Key()))

return n, true
}

func (q *nodeQueue[K, N]) HasCapacity() bool {
return len(q.nodes) < q.capacity
}

// IncludeState is the state of a include.
type IncludeState interface {
includeState()
}

// StateIncludeMessage indicates that the include subsystem is waiting to message a node.
type StateIncludeMessage[K kad.Key[K], A kad.Address[A]] struct {
NodeID kad.NodeID[K]
ProtocolID address.ProtocolID
Message kad.Request[K, A]
}

// StateIncludeIdle indicates that the include is not running its query.
type StateIncludeIdle struct{}

// StateIncludeWaitingAtCapacity indicates that the include subsystem is waiting for responses for checks and
// that the maximum number of concurrent checks has been reached.
type StateIncludeWaitingAtCapacity struct{}

// StateIncludeWaitingWithCapacity indicates that the include subsystem is waiting for responses for checks
// but has capacity to perform more.
type StateIncludeWaitingWithCapacity struct{}

// StateIncludeWaitingFull indicates that the include subsystem is waiting for responses for checks and
// that the maximum number of queued candidates has been reached.
type StateIncludeWaitingFull struct{}

// StateIncludeRoutingUpdated indicates the routing table has been updated with a new node.
type StateIncludeRoutingUpdated[K kad.Key[K], N kad.NodeID[K]] struct {
NodeID N
}

// includeState() ensures that only Include states can be assigned to an IncludeState.
func (*StateIncludeMessage[K, A]) includeState() {}
func (*StateIncludeIdle) includeState() {}
func (*StateIncludeWaitingAtCapacity) includeState() {}
func (*StateIncludeWaitingWithCapacity) includeState() {}
func (*StateIncludeWaitingFull) includeState() {}
func (*StateIncludeRoutingUpdated[K, N]) includeState() {}

// IncludeEvent is an event intended to advance the state of a include.
type IncludeEvent interface {
includeEvent()
}

// EventIncludePoll is an event that signals the include that it can perform housekeeping work such as time out queries.
type EventIncludePoll struct{}

// EventAddCandidate notifies that a node should be added to the candidate list
type EventAddCandidate[K kad.Key[K], N kad.NodeID[K]] struct {
NodeID N // the candidate node
}

// EventIncludeStart is an event that attempts to start a new include
type EventIncludeStart[K kad.Key[K], N kad.NodeID[K], A kad.Address[A]] struct {
ProtocolID address.ProtocolID
Message kad.Request[K, A]
KnownClosestNodes []N
}

// EventIncludeMessageResponse notifies a include that a sent message has received a successful response.
type EventIncludeMessageResponse[K kad.Key[K], N kad.NodeID[K], A kad.Address[A]] struct {
NodeID N // the node the message was sent to
Response kad.Response[K, A] // the message response sent by the node
}

// EventIncludeMessageFailure notifiesa include that an attempt to send a message has failed.
type EventIncludeMessageFailure[K kad.Key[K], N kad.NodeID[K]] struct {
NodeID N // the node the message was sent to
Error error // the error that caused the failure, if any
}

// includeEvent() ensures that only Include events can be assigned to the IncludeEvent interface.
func (*EventIncludePoll) includeEvent() {}
func (*EventAddCandidate[K, N]) includeEvent() {}
func (*EventIncludeStart[K, N, A]) includeEvent() {}
func (*EventIncludeMessageResponse[K, N, A]) includeEvent() {}
func (*EventIncludeMessageFailure[K, N]) includeEvent() {}
Loading

0 comments on commit 08c3fe5

Please sign in to comment.