Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: findProvidersAsync #938

Merged
merged 3 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion v2/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ type Backend interface {
Store(ctx context.Context, key string, value any) (any, error)

// Fetch returns the record for the given path or a [ds.ErrNotFound] if it
// wasn't found or another error if any occurred.
// wasn't found or another error if any occurred. key won't contain the
// namespace prefix.
Fetch(ctx context.Context, key string) (any, error)
}

Expand Down
27 changes: 27 additions & 0 deletions v2/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,33 @@ func TestConfig_Validate(t *testing.T) {
assert.Error(t, cfg.Validate())
})

t.Run("backends for ipfs protocol (public key missing)", func(t *testing.T) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is possible to support only one or two backends? Suppose we don't want to support IPNS records - do we still need to declare it in the config?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's possible but only if the configured protocol is not /ipfs/kad/1.0.0. This was recommended by Adin (can't find the link to the conversation).

I'm trying to enforce that if someone configures the DHT with the /ipfs/kad/1.0.0 protocol, all the required backends are there. If someone wants to, they can still misconfigure it - but this makes it harder.

cfg := DefaultConfig()
cfg.ProtocolID = ProtocolIPFS
cfg.Backends[namespaceProviders] = &RecordBackend{}
cfg.Backends[namespaceIPNS] = &RecordBackend{}
cfg.Backends["another"] = &RecordBackend{}
assert.Error(t, cfg.Validate())
})

t.Run("backends for ipfs protocol (ipns missing)", func(t *testing.T) {
cfg := DefaultConfig()
cfg.ProtocolID = ProtocolIPFS
cfg.Backends[namespaceProviders] = &RecordBackend{}
cfg.Backends["another"] = &RecordBackend{}
cfg.Backends[namespacePublicKey] = &RecordBackend{}
assert.Error(t, cfg.Validate())
})

t.Run("backends for ipfs protocol (providers missing)", func(t *testing.T) {
cfg := DefaultConfig()
cfg.ProtocolID = ProtocolIPFS
cfg.Backends["another"] = &RecordBackend{}
cfg.Backends[namespaceIPNS] = &RecordBackend{}
cfg.Backends[namespacePublicKey] = &RecordBackend{}
assert.Error(t, cfg.Validate())
})

t.Run("nil address filter", func(t *testing.T) {
cfg := DefaultConfig()
cfg.AddressFilter = nil
Expand Down
1 change: 1 addition & 0 deletions v2/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var rng = rand.New(rand.NewSource(1337))

func newTestDHT(t testing.TB) *DHT {
cfg := DefaultConfig()
cfg.Logger = devnull

return newTestDHTWithConfig(t, cfg)
}
Expand Down
8 changes: 3 additions & 5 deletions v2/internal/coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func (c *Coordinator) QueryMessage(ctx context.Context, msg *pb.Message, fn coor
defer cancel()

if numResults < 1 {
numResults = 20
numResults = 20 // TODO: parameterize
}

seeds, err := c.GetClosestNodes(ctx, msg.Target(), numResults)
Expand Down Expand Up @@ -424,7 +424,7 @@ func (c *Coordinator) BroadcastRecord(ctx context.Context, msg *pb.Message) erro
ctx, cancel := context.WithCancel(ctx)
defer cancel()

seeds, err := c.GetClosestNodes(ctx, msg.Target(), 20)
seeds, err := c.GetClosestNodes(ctx, msg.Target(), 20) // TODO: parameterize
if err != nil {
return err
}
Expand All @@ -449,9 +449,7 @@ func (c *Coordinator) BroadcastRecord(ctx context.Context, msg *pb.Message) erro
// queue the start of the query
c.brdcstBehaviour.Notify(ctx, cmd)

contacted, errs, err := c.waitForBroadcast(ctx, waiter)
fmt.Println(contacted)
fmt.Println(errs)
_, _, err = c.waitForBroadcast(ctx, waiter)

return err
}
Expand Down
2 changes: 1 addition & 1 deletion v2/internal/coord/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type Query[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct {
findCloser bool
stats QueryStats

// finished indicates that that the query has completed its work or has been stopped.
// finished indicates that the query has completed its work or has been stopped.
finished bool

// targetNodes is the set of responsive nodes thought to be closest to the target.
Expand Down
99 changes: 90 additions & 9 deletions v2/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/libp2p/go-libp2p/core/routing"
"go.opentelemetry.io/otel/attribute"
otel "go.opentelemetry.io/otel/trace"
"golang.org/x/exp/slog"

"github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt"
"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
Expand Down Expand Up @@ -110,20 +111,100 @@ func (d *DHT) Provide(ctx context.Context, c cid.Cid, brdcst bool) error {
}

func (d *DHT) FindProvidersAsync(ctx context.Context, c cid.Cid, count int) <-chan peer.AddrInfo {
_, span := d.tele.Tracer.Start(ctx, "DHT.FindProvidersAsync", otel.WithAttributes(attribute.String("cid", c.String()), attribute.Int("count", count)))
peerOut := make(chan peer.AddrInfo)
go d.findProvidersAsyncRoutine(ctx, c, count, peerOut)
return peerOut
}

func (d *DHT) findProvidersAsyncRoutine(ctx context.Context, c cid.Cid, count int, out chan peer.AddrInfo) {
_, span := d.tele.Tracer.Start(ctx, "DHT.findProvidersAsyncRoutine", otel.WithAttributes(attribute.String("cid", c.String()), attribute.Int("count", count)))
defer span.End()

// verify if this DHT supports provider records by checking if a "providers"
// backend is registered.
_, found := d.backends[namespaceProviders]
defer close(out)

// verify if this DHT supports provider records by checking
// if a "providers" backend is registered.
b, found := d.backends[namespaceProviders]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we search for providers without having a backend to store them? How would that work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, when we search for providers, we won't store them anywhere. This is just checking if we support providers. Other users could configure custom backends for other types of records but the DHT will still support the routing.Routing interface. In that case we won't have the right backend configured here.

if !found || !c.Defined() {
peerOut := make(chan peer.AddrInfo)
close(peerOut)
return peerOut
span.RecordError(fmt.Errorf("no providers backend registered or CID undefined"))
return
}

// TODO reach out to Zikade
panic("implement me")
// first fetch the record locally
stored, err := b.Fetch(ctx, string(c.Hash()))
if err != nil {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this check for not found errors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be necessary if this was the datastore Get method. This is the backend Fetch method. The backend returns a *providerSet. If there are no providers, the set will be empty. IIRC internally the backend will query the datastore (not Get). This means even the datastore won't return a NotFound error.

span.RecordError(err)
d.log.Warn("Fetching value from provider store", slog.String("cid", c.String()), slog.String("err", err.Error()))
return
}

ps, ok := stored.(*providerSet)
if !ok {
span.RecordError(err)
d.log.Warn("Stored value is not a provider set", slog.String("cid", c.String()), slog.String("type", fmt.Sprintf("%T", stored)))
return
}

// send all providers onto the out channel until the desired count
// was reached. If no count was specified, continue with network lookup.
providers := map[peer.ID]struct{}{}
for _, provider := range ps.providers {
providers[provider.ID] = struct{}{}

select {
case <-ctx.Done():
return
case out <- provider:
}

if count != 0 && len(providers) == count {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

count != 0 isn't strictly necessary since len(providers) can never be zero here

return
}
}

// Craft message to send to other peers
msg := &pb.Message{
Type: pb.Message_GET_PROVIDERS,
Key: c.Hash(),
}

// handle node response
fn := func(ctx context.Context, id kadt.PeerID, resp *pb.Message, stats coordt.QueryStats) error {
// loop through all providers that the remote peer returned
for _, provider := range resp.ProviderAddrInfos() {

// if we had already sent that peer on the channel -> do nothing
if _, found := providers[provider.ID]; found {
continue
}

// keep track that we will have sent this peer on the channel
providers[provider.ID] = struct{}{}

// actually send the provider information to the user
select {
case <-ctx.Done():
return coordt.ErrSkipRemaining
case out <- provider:
}

// if count is 0, we will wait until the query has exhausted the keyspace
// if count isn't 0, we will stop if the number of providers we have sent
// equals the number that the user has requested.
if count != 0 && len(providers) == count {
return coordt.ErrSkipRemaining
}
}

return nil
}

_, err = d.kad.QueryMessage(ctx, msg, fn, 20) // TODO: parameterize
if err != nil {
span.RecordError(err)
d.log.Warn("Failed querying", slog.String("cid", c.String()), slog.String("err", err.Error()))
return
}
}

// PutValue satisfies the [routing.Routing] interface and will add the given
Expand Down
Loading