Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Limit the number of documents deleted per batch #1555

Merged
merged 1 commit into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 38 additions & 8 deletions pkg/store/expiry/expiry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import (
storeutil "github.com/trustbloc/orb/pkg/store"
)

const taskName = "data-expiry"
const (
taskName = "data-expiry"
defaultMaxBatchSize = 5000
)

var logger = log.New("expiry-service")

Expand All @@ -32,6 +35,7 @@ type registeredStore struct {

expiryTagName string
expiryHandler expiryHandler
maxBatchSize int
}

// Option is an option for registered store.
Expand All @@ -44,6 +48,13 @@ func WithExpiryHandler(handler expiryHandler) Option {
}
}

// WithMaxBatchSize sets maximum number of documents to delete in a single batch.
func WithMaxBatchSize(value int) Option {
return func(opts *registeredStore) {
opts.maxBatchSize = value
}
}

type expiryHandler interface {
HandleExpiredKeys(keys ...string) error
}
Expand Down Expand Up @@ -87,6 +98,7 @@ func (s *Service) Register(store storage.Store, expiryTagName, storeName string,
store: store,
name: storeName,
expiryTagName: expiryTagName,
maxBatchSize: defaultMaxBatchSize,
expiryHandler: &noopExpiryHandler{},
}

Expand Down Expand Up @@ -114,11 +126,25 @@ func (s *Service) deleteExpiredData() {
}

func (r *registeredStore) deleteExpiredData() error {
more := true

for more {
var err error

if more, err = r.doDeleteExpiredData(); err != nil {
return err
}
}

return nil
}

func (r *registeredStore) doDeleteExpiredData() (bool, error) {
logger.Debug("Checking for expired data in store", logfields.WithStoreName(r.name))

iterator, err := r.store.Query(fmt.Sprintf("%s<=%d", r.expiryTagName, time.Now().Unix()))
if err != nil {
return fmt.Errorf("query store for expired data: %w", err)
return false, fmt.Errorf("query store for expired data: %w", err)
}

defer storeutil.CloseIterator(iterator)
Expand All @@ -127,13 +153,13 @@ func (r *registeredStore) deleteExpiredData() error {

more, err := iterator.Next()
if err != nil {
return fmt.Errorf("get next value from iterator: %w", err)
return false, fmt.Errorf("get next value from iterator: %w", err)
}

for more {
key, errKey := iterator.Key()
if errKey != nil {
return fmt.Errorf("get key from iterator: %w", errKey)
return false, fmt.Errorf("get key from iterator: %w", errKey)
}

keysToDelete = append(keysToDelete, key)
Expand All @@ -142,15 +168,19 @@ func (r *registeredStore) deleteExpiredData() error {

more, errNext = iterator.Next()
if errNext != nil {
return fmt.Errorf("get next value from iterator: %w", errNext)
return false, fmt.Errorf("get next value from iterator: %w", errNext)
}

if len(keysToDelete) >= r.maxBatchSize {
break
}
}

logger.Debug("Found expired data to delete.", logfields.WithTotal(len(keysToDelete)), logfields.WithStoreName(r.name))

err = r.expiryHandler.HandleExpiredKeys(keysToDelete...)
if err != nil {
return fmt.Errorf("invoke expiry handler: %w", err)
return false, fmt.Errorf("invoke expiry handler: %w", err)
}

if len(keysToDelete) > 0 {
Expand All @@ -164,14 +194,14 @@ func (r *registeredStore) deleteExpiredData() error {

err = r.store.Batch(operations)
if err != nil {
return fmt.Errorf("delete expired data: %w", err)
return false, fmt.Errorf("delete expired data - NumDocuments: %d: %w", len(operations), err)
}

logger.Debug("Successfully deleted expired data.", logfields.WithStoreName(r.name),
logfields.WithTotal(len(operations)))
}

return nil
return more, nil
}

type noopExpiryHandler struct{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/expiry/expiry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestService(t *testing.T) {
require.NoError(t, err)

serviceInfo1, serviceInfo2 := getTestExpiryServices(coordinationStore, storeToRunExpiryChecksOn,
expiryTagName, storeToRunExpiryChecksOnName)
expiryTagName, storeToRunExpiryChecksOnName, WithMaxBatchSize(2))

serviceInfo1.taskMgr.Start()
defer serviceInfo1.taskMgr.Stop()
Expand Down