-
Notifications
You must be signed in to change notification settings - Fork 235
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: findProvidersAsync #938
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ import ( | |
"github.com/libp2p/go-libp2p/core/routing" | ||
"go.opentelemetry.io/otel/attribute" | ||
otel "go.opentelemetry.io/otel/trace" | ||
"golang.org/x/exp/slog" | ||
|
||
"github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt" | ||
"github.com/libp2p/go-libp2p-kad-dht/v2/kadt" | ||
|
@@ -110,20 +111,100 @@ func (d *DHT) Provide(ctx context.Context, c cid.Cid, brdcst bool) error { | |
} | ||
|
||
func (d *DHT) FindProvidersAsync(ctx context.Context, c cid.Cid, count int) <-chan peer.AddrInfo { | ||
_, span := d.tele.Tracer.Start(ctx, "DHT.FindProvidersAsync", otel.WithAttributes(attribute.String("cid", c.String()), attribute.Int("count", count))) | ||
peerOut := make(chan peer.AddrInfo) | ||
go d.findProvidersAsyncRoutine(ctx, c, count, peerOut) | ||
return peerOut | ||
} | ||
|
||
func (d *DHT) findProvidersAsyncRoutine(ctx context.Context, c cid.Cid, count int, out chan peer.AddrInfo) { | ||
_, span := d.tele.Tracer.Start(ctx, "DHT.findProvidersAsyncRoutine", otel.WithAttributes(attribute.String("cid", c.String()), attribute.Int("count", count))) | ||
defer span.End() | ||
|
||
// verify if this DHT supports provider records by checking if a "providers" | ||
// backend is registered. | ||
_, found := d.backends[namespaceProviders] | ||
defer close(out) | ||
|
||
// verify if this DHT supports provider records by checking | ||
// if a "providers" backend is registered. | ||
b, found := d.backends[namespaceProviders] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we search for providers without having a backend to store them? How would that work? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, when we search for providers, we won't store them anywhere. This is just checking if we support providers. Other users could configure custom backends for other types of records but the DHT will still support the routing.Routing interface. In that case we won't have the right backend configured here. |
||
if !found || !c.Defined() { | ||
peerOut := make(chan peer.AddrInfo) | ||
close(peerOut) | ||
return peerOut | ||
span.RecordError(fmt.Errorf("no providers backend registered or CID undefined")) | ||
return | ||
} | ||
|
||
// TODO reach out to Zikade | ||
panic("implement me") | ||
// first fetch the record locally | ||
stored, err := b.Fetch(ctx, string(c.Hash())) | ||
if err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this check for not found errors? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This would be necessary if this was the datastore Get method. This is the backend Fetch method. The backend returns a |
||
span.RecordError(err) | ||
d.log.Warn("Fetching value from provider store", slog.String("cid", c.String()), slog.String("err", err.Error())) | ||
return | ||
} | ||
|
||
ps, ok := stored.(*providerSet) | ||
if !ok { | ||
span.RecordError(err) | ||
d.log.Warn("Stored value is not a provider set", slog.String("cid", c.String()), slog.String("type", fmt.Sprintf("%T", stored))) | ||
return | ||
} | ||
|
||
// send all providers onto the out channel until the desired count | ||
// was reached. If no count was specified, continue with network lookup. | ||
providers := map[peer.ID]struct{}{} | ||
for _, provider := range ps.providers { | ||
providers[provider.ID] = struct{}{} | ||
|
||
select { | ||
case <-ctx.Done(): | ||
return | ||
case out <- provider: | ||
} | ||
|
||
if count != 0 && len(providers) == count { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
return | ||
} | ||
} | ||
|
||
// Craft message to send to other peers | ||
msg := &pb.Message{ | ||
Type: pb.Message_GET_PROVIDERS, | ||
Key: c.Hash(), | ||
} | ||
|
||
// handle node response | ||
fn := func(ctx context.Context, id kadt.PeerID, resp *pb.Message, stats coordt.QueryStats) error { | ||
// loop through all providers that the remote peer returned | ||
for _, provider := range resp.ProviderAddrInfos() { | ||
|
||
// if we had already sent that peer on the channel -> do nothing | ||
if _, found := providers[provider.ID]; found { | ||
continue | ||
} | ||
|
||
// keep track that we will have sent this peer on the channel | ||
providers[provider.ID] = struct{}{} | ||
|
||
// actually send the provider information to the user | ||
select { | ||
case <-ctx.Done(): | ||
return coordt.ErrSkipRemaining | ||
case out <- provider: | ||
} | ||
|
||
// if count is 0, we will wait until the query has exhausted the keyspace | ||
// if count isn't 0, we will stop if the number of providers we have sent | ||
// equals the number that the user has requested. | ||
if count != 0 && len(providers) == count { | ||
return coordt.ErrSkipRemaining | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
_, err = d.kad.QueryMessage(ctx, msg, fn, 20) // TODO: parameterize | ||
if err != nil { | ||
span.RecordError(err) | ||
d.log.Warn("Failed querying", slog.String("cid", c.String()), slog.String("err", err.Error())) | ||
return | ||
} | ||
} | ||
|
||
// PutValue satisfies the [routing.Routing] interface and will add the given | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is possible to support only one or two backends? Suppose we don't want to support IPNS records - do we still need to declare it in the config?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's possible but only if the configured protocol is not
/ipfs/kad/1.0.0
. This was recommended by Adin (can't find the link to the conversation).I'm trying to enforce that if someone configures the DHT with the
/ipfs/kad/1.0.0
protocol, all the required backends are there. If someone wants to, they can still misconfigure it - but this makes it harder.