Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
dennis-tra committed Oct 13, 2023
1 parent 490e947 commit 69ce4f4
Show file tree
Hide file tree
Showing 22 changed files with 722 additions and 345 deletions.
70 changes: 70 additions & 0 deletions fullrt.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/multiformats/go-multiaddr"
mh "github.com/multiformats/go-multihash"
"go.opentelemetry.io/otel/attribute"
otel "go.opentelemetry.io/otel/trace"
"golang.org/x/exp/slog"
Expand Down Expand Up @@ -534,3 +535,72 @@ func (f *FullRT) searchValueRoutine(ctx context.Context, backend Backend, ns str
}
}()
}

func (f *FullRT) ProvideMany(ctx context.Context, mhashes []mh.Multihash) error {
_, span := f.tele.Tracer.Start(ctx, "FullRT.ProvideMany")
defer span.End()

_, found := f.backends[namespaceProviders]
if !found {
return routing.ErrNotSupported
}

// Compute addresses once for all provides
self := peer.AddrInfo{
ID: f.host.ID(),
Addrs: f.host.Addrs(),
}
if len(self.Addrs) < 1 {
return fmt.Errorf("no known addresses for self, cannot put provider")
}

msgFn := func(k kadt.Key) *pb.Message {
return &pb.Message{
Type: pb.Message_ADD_PROVIDER,
Key: k.MsgKey(),
ProviderPeers: []*pb.Message_Peer{
pb.FromAddrInfo(self),
},
}
}

keys := make([]kadt.Key, 0, len(mhashes))
for _, mhash := range mhashes {
keys = append(keys, kadt.NewKey(mhash))
}

// TODO: get seed set of peers
return f.kad.BroadcastMany(ctx, keys, nil, msgFn)
}

//func (f *FullRT) PutMany(ctx context.Context, keys []string, values [][]byte) error {
// _, span := f.tele.Tracer.Start(ctx, "FullRT.PutMany")
// defer span.End()
//
//
// if !dht.enableValues {
// return routing.ErrNotSupported
// }
//
// if len(keys) != len(values) {
// return fmt.Errorf("number of keys does not match the number of values")
// }
//
// keysAsPeerIDs := make([]peer.ID, 0, len(keys))
// keyRecMap := make(map[string][]byte)
// for i, k := range keys {
// keysAsPeerIDs = append(keysAsPeerIDs, peer.ID(k))
// keyRecMap[k] = values[i]
// }
//
// if len(keys) != len(keyRecMap) {
// return fmt.Errorf("does not support duplicate keys")
// }
//
// fn := func(ctx context.Context, p, k peer.ID) error {
// keyStr := string(k)
// return dht.protoMessenger.PutValue(ctx, p, record.MakePutRecord(keyStr, keyRecMap[keyStr]))
// }
//
// return dht.bulkMessageSend(ctx, keysAsPeerIDs, fn, false)
//}
6 changes: 4 additions & 2 deletions internal/coord/brdcst.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,7 @@ func (b *PooledBroadcastBehaviour) perfomNextInbound(ctx context.Context) (Behav
case *EventStartBroadcast:
cmd = &brdcst.EventPoolStartBroadcast[kadt.Key, kadt.PeerID, *pb.Message]{
QueryID: ev.QueryID,
Target: ev.Target,
Message: ev.Message,
MsgFunc: ev.MsgFunc,
Seed: ev.Seed,
Config: ev.Config,
}
Expand Down Expand Up @@ -227,6 +226,7 @@ func (b *PooledBroadcastBehaviour) perfomNextInbound(ctx context.Context) (Behav
cmd = &brdcst.EventPoolStoreRecordSuccess[kadt.Key, kadt.PeerID, *pb.Message]{
QueryID: ev.QueryID,
NodeID: ev.To,
Target: ev.Target,
Request: ev.Request,
Response: ev.Response,
}
Expand All @@ -242,6 +242,7 @@ func (b *PooledBroadcastBehaviour) perfomNextInbound(ctx context.Context) (Behav
NodeID: ev.To,
QueryID: ev.QueryID,
Request: ev.Request,
Target: ev.Target,
Error: ev.Err,
}

Expand Down Expand Up @@ -277,6 +278,7 @@ func (b *PooledBroadcastBehaviour) advancePool(ctx context.Context, ev brdcst.Po
return &EventOutboundSendMessage{
QueryID: st.QueryID,
To: st.NodeID,
Target: st.Target,
Message: st.Message,
Notify: b,
}, true
Expand Down
15 changes: 5 additions & 10 deletions internal/coord/brdcst/brdcst.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ type StateBroadcastFindCloser[K kad.Key[K], N kad.NodeID[K]] struct {
type StateBroadcastStoreRecord[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct {
QueryID coordt.QueryID // the id of the broadcast operation that wants to send the message
NodeID N // the node to send the message to
Message M // the message the broadcast behaviour wants to send
Target K
Message M // the message the broadcast behaviour wants to send
}

// StateBroadcastWaiting indicates that a [Broadcast] state machine is waiting
Expand Down Expand Up @@ -84,13 +85,6 @@ type BroadcastEvent interface {
// it can perform housekeeping work such as time out queries.
type EventBroadcastPoll struct{}

// EventBroadcastStart is an event that instructs a broadcast state machine to
// start the operation.
type EventBroadcastStart[K kad.Key[K], N kad.NodeID[K]] struct {
Target K // the key we want to store the record for
Seed []N // the closest nodes we know so far and from where we start the operation
}

// EventBroadcastStop notifies a [Broadcast] state machine to stop the
// operation. This comprises all in-flight queries.
type EventBroadcastStop struct{}
Expand Down Expand Up @@ -119,6 +113,7 @@ type EventBroadcastNodeFailure[K kad.Key[K], N kad.NodeID[K]] struct {
// receive a response.
type EventBroadcastStoreRecordSuccess[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct {
NodeID N // the node the message was sent to
Target K
Request M // the message that was sent to the remote node
Response M // the reply we got from the remote node (nil in many cases of the Amino DHT)
}
Expand All @@ -127,7 +122,8 @@ type EventBroadcastStoreRecordSuccess[K kad.Key[K], N kad.NodeID[K], M coordt.Me
// machine that storing a record with a remote node (NodeID) has failed. The
// message that was sent is held in Request, and the error will be in Error.
type EventBroadcastStoreRecordFailure[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct {
NodeID N // the node the message was sent to
NodeID N // the node the message was sent to
Target K
Request M // the message that was sent to the remote node
Error error // the error that caused the failure, if any
}
Expand All @@ -136,7 +132,6 @@ type EventBroadcastStoreRecordFailure[K kad.Key[K], N kad.NodeID[K], M coordt.Me
// machine can be assigned to the [BroadcastEvent] interface.
func (*EventBroadcastStop) broadcastEvent() {}
func (*EventBroadcastPoll) broadcastEvent() {}
func (*EventBroadcastStart[K, N]) broadcastEvent() {}
func (*EventBroadcastNodeResponse[K, N]) broadcastEvent() {}
func (*EventBroadcastNodeFailure[K, N]) broadcastEvent() {}
func (*EventBroadcastStoreRecordSuccess[K, N, M]) broadcastEvent() {}
Expand Down
1 change: 0 additions & 1 deletion internal/coord/brdcst/brdcst_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ func TestBroadcastEvent_interface_conformance(t *testing.T) {
events := []BroadcastEvent{
&EventBroadcastStop{},
&EventBroadcastPoll{},
&EventBroadcastStart[tiny.Key, tiny.Node]{},
&EventBroadcastNodeResponse[tiny.Key, tiny.Node]{},
&EventBroadcastNodeFailure[tiny.Key, tiny.Node]{},
&EventBroadcastStoreRecordSuccess[tiny.Key, tiny.Node, tiny.Message]{},
Expand Down
65 changes: 43 additions & 22 deletions internal/coord/brdcst/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package brdcst
import (
"fmt"

"github.com/plprobelab/go-libdht/kad"

"github.com/plprobelab/zikade/internal/coord/query"
)

Expand Down Expand Up @@ -31,61 +33,80 @@ func DefaultConfigPool() *ConfigPool {

// Config is an interface that all broadcast configurations must implement.
// Because we have multiple ways of broadcasting records to the network, like
// [FollowUp] or [Static], the [EventPoolStartBroadcast] has a configuration
// [FollowUp] or [OneToMany], the [EventPoolStartBroadcast] has a configuration
// field that depending on the concrete type of [Config] initializes the
// respective state machine. Then the broadcast operation will be performed
// based on the encoded rules in that state machine.
type Config interface {
broadcastConfig()
}

func (c *ConfigFollowUp) broadcastConfig() {}
func (c *ConfigOptimistic) broadcastConfig() {}
func (c *ConfigStatic) broadcastConfig() {}
func (c *ConfigFollowUp[K]) broadcastConfig() {}
func (c *ConfigOneToMany[K]) broadcastConfig() {}
func (c *ConfigManyToMany[K]) broadcastConfig() {}

// ConfigFollowUp specifies the configuration for the [FollowUp] state machine.
type ConfigFollowUp struct{}
type ConfigFollowUp[K kad.Key[K]] struct {
Target K
}

// Validate checks the configuration options and returns an error if any have
// invalid values.
func (c *ConfigFollowUp) Validate() error {
func (c *ConfigFollowUp[K]) Validate() error {
return nil
}

// DefaultConfigFollowUp returns the default configuration options for the
// [FollowUp] state machine.
func DefaultConfigFollowUp() *ConfigFollowUp {
return &ConfigFollowUp{}
func DefaultConfigFollowUp[K kad.Key[K]](target K) *ConfigFollowUp[K] {
return &ConfigFollowUp[K]{
Target: target,
}
}

// ConfigOptimistic specifies the configuration for the [Optimistic] state
// ConfigOneToMany specifies the configuration for the [OneToMany] state
// machine.
type ConfigOptimistic struct{}
type ConfigOneToMany[K kad.Key[K]] struct {
Target K
}

// Validate checks the configuration options and returns an error if any have
// invalid values.
func (c *ConfigOptimistic) Validate() error {
func (c *ConfigOneToMany[K]) Validate() error {
return nil
}

// DefaultConfigOptimistic returns the default configuration options for the
// [Optimistic] state machine.
func DefaultConfigOptimistic() *ConfigOptimistic {
return &ConfigOptimistic{}
// DefaultConfigOneToMany returns the default configuration options for the
// [OneToMany] state machine.
func DefaultConfigOneToMany[K kad.Key[K]](target K) *ConfigOneToMany[K] {
return &ConfigOneToMany[K]{
Target: target,
}
}

// ConfigStatic specifies the configuration for the [Static] state
// ConfigManyToMany specifies the configuration for the [ManyToMany] state
// machine.
type ConfigStatic struct{}
type ConfigManyToMany[K kad.Key[K]] struct {
NodeConcurrency int
StreamConcurrency int
Targets []K
}

// Validate checks the configuration options and returns an error if any have
// invalid values.
func (c *ConfigStatic) Validate() error {
func (c *ConfigManyToMany[K]) Validate() error {
if len(c.Targets) == 0 {
return fmt.Errorf("targets must not be empty")
}
return nil
}

// DefaultConfigStatic returns the default configuration options for the
// [Static] state machine.
func DefaultConfigStatic() *ConfigStatic {
return &ConfigStatic{}
// DefaultConfigManyToMany returns the default configuration options for the
// [ManyToMany] state machine.
func DefaultConfigManyToMany[K kad.Key[K]](targets []K) *ConfigManyToMany[K] {
return &ConfigManyToMany[K]{
NodeConcurrency: 100, // MAGIC
StreamConcurrency: 10, // MAGIC
Targets: targets,
}
}
17 changes: 6 additions & 11 deletions internal/coord/brdcst/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package brdcst
import (
"testing"

"github.com/plprobelab/zikade/internal/tiny"

"github.com/stretchr/testify/assert"
)

Expand All @@ -21,23 +23,16 @@ func TestConfigPool_Validate(t *testing.T) {

func TestConfigFollowUp_Validate(t *testing.T) {
t.Run("default is valid", func(t *testing.T) {
cfg := DefaultConfigFollowUp()
assert.NoError(t, cfg.Validate())
})
}

func TestConfigOptimistic_Validate(t *testing.T) {
t.Run("default is valid", func(t *testing.T) {
cfg := DefaultConfigOptimistic()
cfg := DefaultConfigFollowUp[tiny.Key](tiny.Key(0))
assert.NoError(t, cfg.Validate())
})
}

func TestConfig_interface_conformance(t *testing.T) {
configs := []Config{
&ConfigFollowUp{},
&ConfigOptimistic{},
&ConfigStatic{},
&ConfigFollowUp[tiny.Key]{},
&ConfigOneToMany[tiny.Key]{},
&ConfigManyToMany[tiny.Key]{},
}
for _, c := range configs {
c.broadcastConfig() // drives test coverage
Expand Down
Loading

0 comments on commit 69ce4f4

Please sign in to comment.