Skip to content

Commit

Permalink
feat: Only run one DB expiry check per cluster at a time
Browse files Browse the repository at this point in the history
Added a mechanism to the expiry service that allows it to coordinate with the other Orb servers within a cluster, ensuring that only one DB expiry check runs at a time.

Signed-off-by: Derek Trider <Derek.Trider@securekey.com>
  • Loading branch information
Derek Trider committed Oct 20, 2021
1 parent 2addba7 commit f921c8c
Show file tree
Hide file tree
Showing 7 changed files with 381 additions and 138 deletions.
23 changes: 23 additions & 0 deletions cmd/orb-server/startcmd/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"strings"
"time"

"github.com/google/uuid"
"github.com/spf13/cobra"
cmdutils "github.com/trustbloc/edge-core/pkg/utils/cmd"
"github.com/trustbloc/sidetree-core-go/pkg/api/operation"
Expand Down Expand Up @@ -368,6 +369,11 @@ const (
"For example, a setting of '1m' will cause the expiry service to run a check every 1 minute. " +
"Defaults to 1 minute if not set. " + commonEnvVarUsageText + dataExpiryCheckIntervalEnvKey

instanceIDFlagName = "orb-id"
instanceIDFlagUsage = "An ID that uniquely identifies an Orb instance within a cluster. " +
"If not set, then it will default to a random UUID. " + commonEnvVarUsageText + instanceIDEnvKey
instanceIDEnvKey = "ORB_ID"

// TODO: Add verification method

)
Expand Down Expand Up @@ -433,6 +439,7 @@ type orbParameters struct {
contextProviderURLs []string
unpublishedOperationLifespan time.Duration
dataExpiryCheckInterval time.Duration
instanceID string
}

type anchorCredentialParams struct {
Expand Down Expand Up @@ -829,6 +836,20 @@ func getOrbParameters(cmd *cobra.Command) (*orbParameters, error) {
return nil, fmt.Errorf("%s: %w", dataExpiryCheckIntervalFlagName, err)
}

instanceID, err := cmdutils.GetUserSetVarFromString(cmd, instanceIDFlagName, instanceIDEnvKey, true)
if err != nil {
return nil, err
}

if instanceID == "" {
instanceID := uuid.New().String()

logger.Warnf("Orb ID not set. A randomly generated UUID (%s) was generated and will be used. "+
"It's recommended to set this value yourself. If this Orb instance restarts, a new UUID will be "+
"generated which won't break anything but is less efficient for the automatic expired data cleanup "+
"service. A custom ID will also make for better logging.", instanceID)
}

return &orbParameters{
hostURL: hostURL,
hostMetricsURL: hostMetricsURL,
Expand Down Expand Up @@ -880,6 +901,7 @@ func getOrbParameters(cmd *cobra.Command) (*orbParameters, error) {
contextProviderURLs: contextProviderURLs,
unpublishedOperationLifespan: unpublishedOperationLifespan,
dataExpiryCheckInterval: dataExpiryCheckInterval,
instanceID: instanceID,
}, nil
}

Expand Down Expand Up @@ -1221,4 +1243,5 @@ func createFlags(startCmd *cobra.Command) {
startCmd.Flags().StringP(databaseTimeoutFlagName, "", "", databaseTimeoutFlagUsage)
startCmd.Flags().StringP(unpublishedOperationLifespanFlagName, "", "", unpublishedOperationLifespanFlagUsage)
startCmd.Flags().StringP(dataExpiryCheckIntervalFlagName, "", "", dataExpiryCheckIntervalFlagUsage)
startCmd.Flags().StringP(instanceIDFlagName, "", "", instanceIDFlagUsage)
}
2 changes: 1 addition & 1 deletion cmd/orb-server/startcmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ func startOrbServices(parameters *orbParameters) error {
if parameters.updateDocumentStoreEnabled {
// TODO (#810): Make it possible to run the expiry service from only one instance within a cluster (or as
// a separate server)
expiryService = expiry.NewService(parameters.dataExpiryCheckInterval)
expiryService = expiry.NewService(parameters.dataExpiryCheckInterval, configStore, parameters.instanceID)

updateDocumentStore, err = unpublishedopstore.New(storeProviders.provider,
parameters.unpublishedOperationLifespan, expiryService)
Expand Down
5 changes: 4 additions & 1 deletion pkg/protocolversion/versions/v1_0/factory/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@ func TestFactory_Create(t *testing.T) {
})

t.Run("success - with update store config", func(t *testing.T) {
coordinationStore, err := mem.NewProvider().OpenStore("coordination")
require.NoError(t, err)

updateDocumentStore, err := unpublishedopstore.New(storeProvider, time.Minute,
expiry.NewService(time.Millisecond))
expiry.NewService(time.Millisecond, coordinationStore, "InstanceID"))
require.NoError(t, err)

cfg := &config.Sidetree{
Expand Down
169 changes: 149 additions & 20 deletions pkg/store/expiry/expiry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ SPDX-License-Identifier: Apache-2.0
package expiry

import (
"encoding/json"
"errors"
"fmt"
"time"

Expand All @@ -16,11 +18,19 @@ import (
"github.com/trustbloc/orb/pkg/lifecycle"
)

const (
	loggerModule = "expiry-service"

	// coordinationPermitKey is the coordination-store key under which the cluster-wide
	// expired data cleanup permit is stored.
	coordinationPermitKey = "expired_data_cleanup_permit"

	// When the Orb server with the expired data cleanup duty (permit holder) has not done it for an unusually
	// long time (indicating it's down), another Orb server will take over the duty. This value multiplied by the
	// configured interval time defines what an "unusually long time" is.
	permitTimeLimitIntervalMultiplier = 3
)

// logger is the minimal leveled-logging contract the expiry service needs.
// It is satisfied by the project's log package (see log.New in NewService).
type logger interface {
	Debugf(msg string, args ...interface{})
	Infof(msg string, args ...interface{})
	Warnf(msg string, args ...interface{})
	Errorf(msg string, args ...interface{})
}

Expand All @@ -30,28 +40,51 @@ type registeredStore struct {
name string
}

// expiredDataCleanupPermit is used as an entry within the coordination store to ensure that only one Orb instance
// within a cluster has the duty of performing periodic expired data clean up.
type expiredDataCleanupPermit struct {
	// CurrentHolder indicates which Orb server currently has the responsibility.
	CurrentHolder string `json:"currentHolder,omitempty"`
	// TimeLastCleanupPerformed indicates when the last cleanup was successfully performed.
	// NOTE(review): the JSON tag word order ("timeCleanupLastPerformed") differs from the field
	// name; keep it as-is, since changing the tag would break previously stored permits.
	TimeLastCleanupPerformed int64 `json:"timeCleanupLastPerformed,omitempty"` // This is a Unix timestamp.
}

// Service is an expiry service that periodically polls registered stores and removes data past a specified
// expiration time.
type Service struct {
	*lifecycle.Lifecycle

	done              chan struct{}     // closed to tell the background loop to stop
	logger            logger            // leveled logger for this service
	registeredStores  []registeredStore // stores to periodically clean expired data from
	interval          time.Duration     // how often to run the expiry check
	coordinationStore storage.Store     // shared store used to coordinate cleanup duty across a cluster
	instanceID        string            // unique ID of this Orb instance within the cluster
}

// NewService returns a new expiry Service.
// interval is how frequently this service will check for (and delete as needed) expired data. Shorter intervals will
// remove expired data sooner at the expense of increased resource usage. Each Orb instance within a cluster should
// have the same interval configured in order for this service to work efficiently.
// coordinationStore is used for ensuring that only one Orb instance within a cluster has the duty of performing
// expired data cleanup (in order to avoid every instance doing the same work, which is wasteful). Every Orb instance
// within the cluster needs to be connected to the same database for it to work correctly. Note that when initializing
// Orb servers (or if the Orb server with the duty goes down) it is possible for multiple Orb instances to briefly
// assign themselves the duty, but only for one round. This will automatically be resolved on
// the next check and only one will end up with the duty from that point on. This situation is not of concern since
// it's safe for two instances to perform the check at the same time.
// instanceID is used in the coordinationStore for determining who currently has the duty of doing the expired data
// cleanup. It must be unique for every Orb instance within the cluster in order for this service to work efficiently.
// You must register each store you want this service to run on using the Register method. Once all your stores are
// registered, call the Start method to start the service.
func NewService(interval time.Duration) *Service {
func NewService(interval time.Duration, coordinationStore storage.Store, instanceID string) *Service {
s := &Service{
done: make(chan struct{}),
logger: log.New(loggerModule),
registeredStores: make([]registeredStore, 0),
interval: interval,
done: make(chan struct{}),
logger: log.New(loggerModule),
registeredStores: make([]registeredStore, 0),
interval: interval,
coordinationStore: coordinationStore,
instanceID: instanceID,
}

s.Lifecycle = lifecycle.New("expiry",
Expand All @@ -62,7 +95,7 @@ func NewService(interval time.Duration) *Service {
}

// Register adds a store to this expiry service.
// store is the store on which to periodically cleanup expired data.
// name is used to identify the purpose of this expiry service for logging purposes.
// expiryTagName is the tag name used to store expiry values under. The expiry values must be standard Unix timestamps.
func (s *Service) Register(store storage.Store, expiryTagName, storeName string) {
Expand Down Expand Up @@ -91,8 +124,7 @@ func (s *Service) refresh() {
for {
select {
case <-time.After(s.interval):
s.logger.Debugf("Checking for expired data...")
s.deleteExpiredData()
s.runExpiryCheck()
case <-s.done:
s.logger.Debugf("Stopping expiry service.")

Expand All @@ -101,20 +133,117 @@ func (s *Service) refresh() {
}
}

// runExpiryCheck performs one round of the expiry check, but only when this instance
// currently has (or takes over) the cluster-wide expired data cleanup duty. Permit
// update failures are logged, not propagated, since this runs from a background loop.
func (s *Service) runExpiryCheck() {
	s.logger.Debugf("Checking to see if it's my duty to clean up expired data.")

	if !s.isMyDutyToCheckForExpiredData() {
		return
	}

	s.deleteExpiredData()

	if err := s.updatePermit(); err != nil {
		s.logger.Errorf("Failed to update permit: %s", err.Error())
	}
}

// isMyDutyToCheckForExpiredData reports whether this Orb instance should perform the expired data
// cleanup this round. It returns true when: no permit exists yet (first ever check in the cluster),
// this instance is already the current permit holder, or the current holder has not performed a
// cleanup for longer than interval * permitTimeLimitIntervalMultiplier (taken to mean the holder is
// down). Any unexpected storage or unmarshalling error results in false, skipping this round.
func (s *Service) isMyDutyToCheckForExpiredData() bool {
	currentExpiryCheckPermitBytes, err := s.coordinationStore.Get(coordinationPermitKey)
	if err != nil {
		if errors.Is(err, storage.ErrDataNotFound) {
			s.logger.Infof("No existing permit found. " +
				"I will take on the duty of periodically deleting expired data.")

			return true
		}

		s.logger.Errorf("Unexpected failure while checking for the expired data check permit: %s", err.Error())

		return false
	}

	var currentPermit expiredDataCleanupPermit

	err = json.Unmarshal(currentExpiryCheckPermitBytes, &currentPermit)
	if err != nil {
		s.logger.Errorf("Failed to unmarshal the current permit: %s", err.Error())

		return false
	}

	timeOfLastCleanup := time.Unix(currentPermit.TimeLastCleanupPerformed, 0)

	// Time.Since uses Time.Now() to determine the current time to a fine degree of precision. Here we are checking the
	// time since a specific Unix timestamp, which is a value that is effectively truncated to the nearest second.
	// Thus, the result of this calculation should also be truncated down to the nearest second since that's all the
	// precision we have. This also makes the log statements look cleaner since it won't display an excessive amount
	// of (meaningless) precision.
	timeSinceLastCleanup := time.Since(timeOfLastCleanup).Truncate(time.Second)

	if currentPermit.CurrentHolder == s.instanceID {
		s.logger.Debugf("It's currently my duty to clean up expired data. I last did this %s ago. I will "+
			"perform another cleanup and then update the permit timestamp.", timeSinceLastCleanup.String())

		return true
	}

	// The idea here is to only take away the data cleanup responsibilities from the current permit holder if it's
	// been an unusually long time since the current permit holder has performed a successful cleanup. If that happens
	// then it indicates that the other Orb server with the permit is down, so someone else needs to grab the permit
	// and take over the duty of doing expired data checks. Note that the assumption here is that all Orb servers
	// within the cluster have the same interval setting (which they should).
	timeLimit := s.interval * permitTimeLimitIntervalMultiplier

	if timeSinceLastCleanup > timeLimit {
		s.logger.Warnf("The current permit holder (%s) has not performed an expired data cleanup in an "+
			"unusually long time (%s ago, over %d times longer than the configured interval of %s). This indicates a "+
			"problem with %s - it may be down or not responding. I will take over the expired data "+
			"cleanup duty and grab the permit.", currentPermit.CurrentHolder, timeSinceLastCleanup.String(),
			permitTimeLimitIntervalMultiplier, s.interval.String(), currentPermit.CurrentHolder)

		return true
	}

	s.logger.Debugf("I will not do an expired data cleanup since %s currently has the duty and did it recently "+
		"(%s ago).", currentPermit.CurrentHolder, timeSinceLastCleanup.String())

	return false
}

// deleteExpiredData runs the expired data cleanup on every registered store.
func (s *Service) deleteExpiredData() {
	for i := range s.registeredStores {
		s.registeredStores[i].deleteExpiredData(s.logger)
	}
}

func (r *registeredStore) deleteExpiredData(logger logger) {
queryExpression := fmt.Sprintf("%s<=%d", r.expiryTagName, time.Now().Unix())
func (s *Service) updatePermit() error {
s.logger.Debugf("Updating the permit with the current time.")

permit := expiredDataCleanupPermit{
CurrentHolder: s.instanceID,
TimeLastCleanupPerformed: time.Now().Unix(),
}

permitBytes, err := json.Marshal(permit)
if err != nil {
return fmt.Errorf("failed to marshal permit: %w", err)
}

logger.Debugf("About to run the following query in %s: %s", r.name, queryExpression)
err = s.coordinationStore.Put(coordinationPermitKey, permitBytes)
if err != nil {
return fmt.Errorf("failed to store permit: %w", err)
}

s.logger.Debugf("Permit successfully updated with the current time.")

return nil
}

// deleteExpiredData queries this registered store for entries whose expiry tag value is at or
// before the current time so they can be removed. Errors are logged rather than returned since
// this runs from a background service.
func (r *registeredStore) deleteExpiredData(logger logger) {
	logger.Debugf("Checking for expired data in %s.", r.name)

	iterator, err := r.store.Query(fmt.Sprintf("%s<=%d", r.expiryTagName, time.Now().Unix()))
	if err != nil {
		logger.Errorf("failed to query store for expired data: %s", err.Error())

		return
	}
Expand Down
Loading

0 comments on commit f921c8c

Please sign in to comment.