Skip to content

Commit

Permalink
Only poll assigned publishers when using assigner (#1094)
Browse files Browse the repository at this point in the history
* Only poll assigned publishers when using assigner

When an existing indexer is configured to use an assigner service, that indexer may have registered providers/publishers where the publishers are not assigned. These should not be polled since the indexer does not have responsibility for them.

* Fix stats test by refreshing stats
* update core
  • Loading branch information
gammazero committed Dec 3, 2022
1 parent d270f74 commit 9bdf702
Show file tree
Hide file tree
Showing 16 changed files with 203 additions and 148 deletions.
5 changes: 3 additions & 2 deletions assigner/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ func TestAnnounce(t *testing.T) {
t.Fatal("timed out waiting for indexer to start")
}

// Allow a peer to test that assigner reads this at startup.
assign(ctx, "localhost:3602", pubIdent2.PeerID)
// Assign a peer, to test that assigner reads this at startup.
err = assign(ctx, "localhost:3602", pubIdent2.PeerID)
require.NoError(t, err)

// Initialize everything
peerID, _, err := pubIdent.Decode()
Expand Down
4 changes: 3 additions & 1 deletion dagsync/dtsync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error
log.Debugw("Starting data channel for message source", "cid", nextCid, "source_peer", s.peerID)

v := Voucher{&nextCid}
_, err := s.sync.dtManager.OpenPullDataChannel(ctx, s.peerID, &v, nextCid, sel)
// Do not pass cancelable context into OpenPullDataChannel because a
// canceled context causes it to hang.
_, err := s.sync.dtManager.OpenPullDataChannel(context.Background(), s.peerID, &v, nextCid, sel)
if err != nil {
s.sync.signalSyncDone(inProgressSyncK, nil)
return fmt.Errorf("cannot open data channel: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/filecoin-project/go-address v0.0.5
github.com/filecoin-project/go-dagaggregator-unixfs v0.2.0
github.com/filecoin-project/go-data-transfer v1.15.2
github.com/filecoin-project/go-indexer-core v0.6.20-0.20221124171445-aaa74eb24a1a
github.com/filecoin-project/go-indexer-core v0.6.20-0.20221203094717-421546fb9fb6
github.com/frankban/quicktest v1.14.3
github.com/gammazero/deque v0.2.0
github.com/gogo/protobuf v1.3.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ github.com/filecoin-project/go-data-transfer v1.15.2 h1:PzqsFr2Q/onMGKrGh7TtRT0d
github.com/filecoin-project/go-data-transfer v1.15.2/go.mod h1:qXOJ3IF5dEJQHykXXTwcaRxu17bXAxr+LglXzkL6bZQ=
github.com/filecoin-project/go-ds-versioning v0.1.2 h1:to4pTadv3IeV1wvgbCbN6Vqd+fu+7tveXgv/rCEZy6w=
github.com/filecoin-project/go-ds-versioning v0.1.2/go.mod h1:C9/l9PnB1+mwPa26BBVpCjG/XQCB0yj/q5CK2J8X1I4=
github.com/filecoin-project/go-indexer-core v0.6.20-0.20221124171445-aaa74eb24a1a h1:d7nXUve0qaRg15N3AWTO6Ul/tFW+AtXHr2KGDULukFU=
github.com/filecoin-project/go-indexer-core v0.6.20-0.20221124171445-aaa74eb24a1a/go.mod h1:Q3SSHCIdEN8bdfgpJuqMTC3BvO+8huqx5OEMRalNCXw=
github.com/filecoin-project/go-indexer-core v0.6.20-0.20221203094717-421546fb9fb6 h1:r+dhFYFR2DAf3Gxr2bxwHcbYFPm3zORsmhefBYnAQNU=
github.com/filecoin-project/go-indexer-core v0.6.20-0.20221203094717-421546fb9fb6/go.mod h1:Q3SSHCIdEN8bdfgpJuqMTC3BvO+8huqx5OEMRalNCXw=
github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statemachine v1.0.2 h1:421SSWBk8GIoCoWYYTE/d+qCWccgmRH0uXotXRDjUbc=
github.com/filecoin-project/go-statemachine v1.0.2/go.mod h1:jZdXXiHa61n4NmgWFG4w8tnqgvZVHYbJ3yW7+y8bF54=
Expand Down
10 changes: 6 additions & 4 deletions internal/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,11 +640,13 @@ func (ing *Ingester) metricsUpdater() {
log.Errorw("Error getting indexer value store size", "err", err)
return
}
indexCount, err := ing.indexCounts.Total()
if err != nil {
log.Errorw("Error getting index counts", "err", err)
if ing.indexCounts != nil {
indexCount, err := ing.indexCounts.Total()
if err != nil {
log.Errorw("Error getting index counts", "err", err)
}
stats.Record(context.Background(), coremetrics.StoreSize.M(size), metrics.IndexCount.M(int64(indexCount)))
}
stats.Record(context.Background(), coremetrics.StoreSize.M(size), metrics.IndexCount.M(int64(indexCount)))
hasUpdate = false
}
t.Reset(time.Minute)
Expand Down
17 changes: 2 additions & 15 deletions internal/ingest/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ import (
"time"

"github.com/filecoin-project/go-indexer-core"
"github.com/filecoin-project/go-indexer-core/cache"
"github.com/filecoin-project/go-indexer-core/cache/radixcache"
"github.com/filecoin-project/go-indexer-core/engine"
"github.com/filecoin-project/go-indexer-core/store/storethehash"
"github.com/filecoin-project/go-indexer-core/store/memory"
schema "github.com/filecoin-project/storetheindex/api/v0/ingest/schema"
"github.com/filecoin-project/storetheindex/config"
"github.com/filecoin-project/storetheindex/dagsync"
Expand All @@ -34,7 +32,6 @@ import (
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/traversal/selector"
sth "github.com/ipld/go-storethehash/store"
"github.com/libp2p/go-libp2p"
crypto "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
Expand All @@ -46,8 +43,6 @@ import (
)

const (
testCorePutConcurrency = 4

testRetryInterval = 2 * time.Second
testRetryTimeout = 15 * time.Second

Expand Down Expand Up @@ -1487,15 +1482,7 @@ func TestAnnounceArrivedJustBeforeEntriesProcessingStartsDoesNotDeadlock(t *test

// Make new indexer engine
// mkIndexer creates a new indexer engine for tests. The withCache flag is
// accepted for call-site compatibility but is ignored: tests now always use
// an in-memory value store with no result cache.
//
// NOTE(review): this span previously interleaved the pre-change body
// (storethehash store + optional radix cache) with the post-change one;
// only the post-change implementation is kept so the function is valid.
func mkIndexer(t *testing.T, withCache bool) *engine.Engine {
	return engine.New(nil, memory.New())
}

func mkRegistry(t *testing.T) *registry.Registry {
Expand Down
187 changes: 100 additions & 87 deletions internal/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ type ProviderInfo struct {
// ExtendedProviders registered for that provider
ExtendedProviders *ExtendedProviders `json:",omitempty"`

// lastContactTime is the last time the publisher contacted the
// indexer. This is not persisted, so that the time since last contact is
// reset when the indexer is started. If not reset, then it would appear
// the publisher was unreachable for the indexer downtime.
// lastContactTime is the last time the publisher contacted the indexer.
// This is not persisted, so that the time since last contact is reset when
// the indexer is started. If not reset, then it would appear the publisher
// was unreachable for the indexer downtime.
lastContactTime time.Time

// deleted is used as a signal to the ingester to delete the provider's data.
Expand All @@ -98,37 +98,40 @@ type ProviderInfo struct {
inactive bool
}

// ExtendedProviderInfo is an immutable data structure that holds infromation about
// an extended provider.
// ExtendedProviderInfo is an immutable data structure that holds information
// about an extended provider.
type ExtendedProviderInfo struct {
// PeerID contains a peer.ID of the extended provider
// PeerID contains a peer.ID of the extended provider.
PeerID peer.ID
// Metadata contains a metadata override for this provider within the extended provider context.
// If extended provider's metadata hasn't been specified - the main provider's
// metadata is going to be used instead.
// Metadata contains a metadata override for this provider within the
// extended provider context. If extended provider's metadata hasn't been
// specified - the main provider's metadata is going to be used instead.
Metadata []byte `json:",omitempty"`
// Addrs contains advertised multiaddresses for this extended provider
// Addrs contains advertised multiaddresses for this extended provider.
Addrs []multiaddr.Multiaddr
}

// ContextualExtendedProviders holds infromation about a context-level extended providers.
// These can either replace or compliment (union) the chain-level extended providers, which is driven
// by the Override flag.
// ContextualExtendedProviders holds information about context-level extended
// providers. These can either replace or complement (union) the chain-level
// extended providers, which is driven by the Override flag.
type ContextualExtendedProviders struct {
// Providers contains a list of context-level extended providers
Providers []ExtendedProviderInfo
// Override defines whether chain-level extended providers should be used for
// this ContextID. If true, then the chain-level extended providers are going to be ignored.
// Override defines whether chain-level extended providers should be used
// for this ContextID. If true, then the chain-level extended providers are
// going to be ignored.
Override bool
// ContextID deifnes the context ID that the extended providers have been published for
// ContextID defines the context ID that the extended providers have been
// published for
ContextID []byte `json:",omitempty"`
}

// ExtendedProviders contains chain-level and context-level extended provider sets
// ExtendedProviders contains chain-level and context-level extended provider
// sets.
type ExtendedProviders struct {
// Providers contains a chain-level set of extended providers
// Providers contains a chain-level set of extended providers.
Providers []ExtendedProviderInfo `json:",omitempty"`
// ContextualProviders contains context-level sets of extended providers.
// ContextualProviders contains a context-level sets of extended providers.
ContextualProviders map[string]ContextualExtendedProviders `json:",omitempty"`
}

Expand Down Expand Up @@ -823,14 +826,14 @@ func (r *Registry) syncStartDiscover(peerID peer.ID, spID string, errCh chan<- e
return
}

// Mark discovery as in progress
// Mark discovery as in progress.
if r.discoverTimes == nil {
r.discoverTimes = make(map[string]time.Time)
}
r.discoverTimes[spID] = time.Time{}
r.discoverWait.Add(1)

// Do discovery asynchronously; do not block other discovery requests
// Do discovery asynchronously; do not block other discovery requests.
go func() {
ctx := context.Background()
if r.discoveryTimeout != 0 {
Expand Down Expand Up @@ -876,12 +879,12 @@ func (r *Registry) syncRegister(ctx context.Context, info *ProviderInfo) error {
func (r *Registry) syncNeedDiscover(spID string) error {
completed, ok := r.discoverTimes[spID]
if ok {
// Check if discovery already in progress
// Check if discovery already in progress.
if completed.IsZero() {
return ErrInProgress
}

// Check if last discovery completed too recently
// Check if last discovery completed too recently.
if r.rediscoverWait != 0 && time.Since(completed) < r.rediscoverWait {
return ErrTooSoon
}
Expand All @@ -905,67 +908,6 @@ func (r *Registry) syncPersistProvider(ctx context.Context, info *ProviderInfo)
return r.dstore.Sync(ctx, dsKey)
}

func (r *Registry) loadPersistedProviders(ctx context.Context) error {
if r.dstore == nil {
return nil
}

// Load all providers from the datastore.
q := query.Query{
Prefix: providerKeyPath,
}
results, err := r.dstore.Query(ctx, q)
if err != nil {
return err
}
defer results.Close()

var count int
for result := range results.Next() {
if result.Error != nil {
return fmt.Errorf("cannot read provider data: %v", result.Error)
}
ent := result.Entry

peerID, err := peer.Decode(path.Base(ent.Key))
if err != nil {
return fmt.Errorf("cannot decode provider ID: %s", err)
}

pinfo := new(ProviderInfo)
err = json.Unmarshal(ent.Value, pinfo)
if err != nil {
log.Errorw("Cannot load provider info", "err", err, "provider", peerID)
pinfo.AddrInfo.ID = peerID
// Add the provider to the set of registered providers so that it
// does not get delisted. The next update should fix the addresses.
}

if r.filterIPs {
pinfo.AddrInfo.Addrs = mautil.FilterPrivateIPs(pinfo.AddrInfo.Addrs)
if pinfo.Publisher.Validate() == nil && pinfo.PublisherAddr != nil {
pubAddrs := mautil.FilterPrivateIPs([]multiaddr.Multiaddr{pinfo.PublisherAddr})
if len(pubAddrs) == 0 {
pinfo.PublisherAddr = nil
} else {
pinfo.PublisherAddr = pubAddrs[0]
}
}
}

if pinfo.Publisher.Validate() == nil && pinfo.PublisherAddr == nil && pinfo.Publisher == pinfo.AddrInfo.ID &&
len(pinfo.AddrInfo.Addrs) != 0 {
pinfo.PublisherAddr = pinfo.AddrInfo.Addrs[0]
}

r.providers[peerID] = pinfo
count++
}

log.Infow("loaded providers into registry", "count", count)
return nil
}

func (r *Registry) discover(ctx context.Context, peerID peer.ID, spID string) (*discovery.Discovered, error) {
if r.discoverer == nil {
return nil, ErrNoDiscovery
Expand Down Expand Up @@ -1007,14 +949,21 @@ func (r *Registry) pollProviders(poll polling, pollOverrides map[peer.ID]polling
r.actions <- func() {
now := time.Now()
for peerID, info := range r.providers {
// If the provider is not allowed, then do not poll or delist.
// If the provider is not allowed, then do not poll or de-list.
if !r.policy.Allowed(peerID) {
continue
}
if info.Publisher.Validate() != nil || !r.policy.Allowed(info.Publisher) {
// No publisher.
continue
}
// If using assigner service, and the provider's publisher is not
// assigned, then do not poll.
if r.assigned != nil {
if _, ok := r.assigned[info.Publisher]; !ok {
continue
}
}
override, ok := pollOverrides[peerID]
if ok {
poll = override
Expand All @@ -1036,7 +985,10 @@ func (r *Registry) pollProviders(poll polling, pollOverrides map[peer.ID]polling
// remove it.
if sincePollingStarted >= poll.stopAfter {
// Too much time since last contact.
log.Warnw("Lost contact with provider, too long with no updates", "publisher", info.Publisher, "provider", info.AddrInfo.ID, "since", info.lastContactTime)
log.Warnw("Lost contact with provider, too long with no updates",
"publisher", info.Publisher,
"provider", info.AddrInfo.ID,
"since", info.lastContactTime)
// Remove the dead provider from the registry.
if err := r.syncRemoveProvider(context.Background(), peerID); err != nil {
log.Errorw("Failed to update deleted provider info", "err", err)
Expand Down Expand Up @@ -1122,6 +1074,67 @@ func (r *Registry) deleteOldAssignments(ctx context.Context, prefix string) erro
return nil
}

// loadPersistedProviders reads every provider record stored under
// providerKeyPath in the datastore and populates r.providers with the
// decoded entries. It is a no-op when the registry has no datastore.
//
// A record that fails to unmarshal is still registered, with only its peer
// ID, so that the provider is not delisted; a later update is expected to
// repair the addresses. Returns an error on datastore query failure, an
// unreadable result, or an undecodable provider key.
func (r *Registry) loadPersistedProviders(ctx context.Context) error {
	if r.dstore == nil {
		// Nothing persisted to load.
		return nil
	}

	// Load all providers from the datastore.
	q := query.Query{
		Prefix: providerKeyPath,
	}
	results, err := r.dstore.Query(ctx, q)
	if err != nil {
		return err
	}
	defer results.Close()

	var count int
	for result := range results.Next() {
		if result.Error != nil {
			return fmt.Errorf("cannot read provider data: %v", result.Error)
		}
		ent := result.Entry

		// The provider's peer ID is the last path component of the key.
		peerID, err := peer.Decode(path.Base(ent.Key))
		if err != nil {
			return fmt.Errorf("cannot decode provider ID: %s", err)
		}

		pinfo := new(ProviderInfo)
		err = json.Unmarshal(ent.Value, pinfo)
		if err != nil {
			log.Errorw("Cannot load provider info", "err", err, "provider", peerID)
			pinfo.AddrInfo.ID = peerID
			// Add the provider to the set of registered providers so that it
			// does not get delisted. The next update should fix the addresses.
		}

		if r.filterIPs {
			// Strip private-network addresses from the provider and, when a
			// valid publisher with an address is present, from the publisher.
			pinfo.AddrInfo.Addrs = mautil.FilterPrivateIPs(pinfo.AddrInfo.Addrs)
			if pinfo.Publisher.Validate() == nil && pinfo.PublisherAddr != nil {
				pubAddrs := mautil.FilterPrivateIPs([]multiaddr.Multiaddr{pinfo.PublisherAddr})
				if len(pubAddrs) == 0 {
					pinfo.PublisherAddr = nil
				} else {
					pinfo.PublisherAddr = pubAddrs[0]
				}
			}
		}

		// When the publisher is the provider itself and no publisher address
		// was recorded, fall back to the provider's first address.
		if pinfo.Publisher.Validate() == nil && pinfo.PublisherAddr == nil && pinfo.Publisher == pinfo.AddrInfo.ID &&
			len(pinfo.AddrInfo.Addrs) != 0 {
			pinfo.PublisherAddr = pinfo.AddrInfo.Addrs[0]
		}

		r.providers[peerID] = pinfo
		count++
	}

	log.Infow("loaded providers into registry", "count", count)
	return nil
}

func (r *Registry) loadPersistedAssignments(ctx context.Context) error {
if r.dstore == nil {
return nil
Expand Down
Loading

0 comments on commit 9bdf702

Please sign in to comment.