Skip to content

Commit

Permalink
fix: Exit anchor status monitoring loop on shutdown
Browse files Browse the repository at this point in the history
Also, updated witness proof monitor to accelerate the next task time if it is known that more records need to be processed.

closes #1572

Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
  • Loading branch information
bstasyszyn committed May 31, 2023
1 parent 7017bf0 commit 3a11195
Show file tree
Hide file tree
Showing 11 changed files with 359 additions and 202 deletions.
213 changes: 122 additions & 91 deletions cmd/orb-server/startcmd/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ const (
defaultVCTLogMonitoringMaxTreeSize = 50000
defaultVCTLogMonitoringGetEntriesRange = 1000
defaultVCTLogEntriesStoreEnabled = false
defaultAnchorStatusMonitoringInterval = 5 * time.Second
defaultAnchorStatusInProcessGracePeriod = 30 * time.Second
defaultAnchorStatusMonitoringInterval = 10 * time.Second
defaultAnchorStatusMaxRecords = 500
defaultAnchorStatusInProcessGracePeriod = time.Minute
mqDefaultMaxConnectionSubscriptions = 1000
mqDefaultPublisherChannelPoolSize = 25
mqDefaultPublisherConfirmDelivery = true
Expand Down Expand Up @@ -226,6 +227,12 @@ const (
"witnessed(completed) as per policy. Defaults to 5s if not set. " +
commonEnvVarUsageText + anchorStatusMonitoringIntervalEnvKey

// Flag name, environment variable key and usage text for the setting that caps
// how many anchor status records are processed per monitoring interval.
anchorStatusMaxRecordsFlagName = "anchor-status-max-records"
anchorStatusMaxRecordsEnvKey = "ANCHOR_STATUS_MAX_RECORDS"
// The first sentence must end with ". " so that it does not run into the
// "Defaults to ..." sentence when the pieces are concatenated.
anchorStatusMaxRecordsFlagUsage = "The maximum number of anchor status records to process per monitoring interval. " +
	"Defaults to 500 if not set. " +
	commonEnvVarUsageText + anchorStatusMaxRecordsEnvKey

anchorStatusInProcessGracePeriodFlagName = "anchor-status-in-process-grace-period"
anchorStatusInProcessGracePeriodEnvKey = "ANCHOR_STATUS_IN_PROCESS_GRACE_PERIOD"
anchorStatusInProcessGracePeriodFlagUsage = "The period in which witnesses will not be re-selected for 'in-process' anchors." +
Expand Down Expand Up @@ -764,47 +771,46 @@ type tlsParameters struct {
}

type orbParameters struct {
http *httpParams
sidetree *sidetreeParams
apServiceParams *apServiceParams
discoveryDomain string
dataURIMediaType datauri.MediaType
batchWriterTimeout time.Duration
cas *casParams
mqParams *mqParams
opQueueParams *opqueue.Config
dbParameters *dbParameters
logLevel string
methodContext []string
baseEnabled bool
allowedOrigins []string
allowedOriginsCacheExpiration time.Duration
anchorCredentialParams *anchorCredentialParams
discovery *discoveryParams
witnessProof *witnessProofParams
syncTimeout uint64
didDiscoveryEnabled bool
unpublishedOperations *unpublishedOperationsStoreParams
resolveFromAnchorOrigin bool
verifyLatestFromAnchorOrigin bool
activityPub *activityPubParams
auth *authParams
enableDevMode bool
enableMaintenanceMode bool
enableVCT bool
nodeInfoRefreshInterval time.Duration
contextProviderURLs []string
dataExpiryCheckInterval time.Duration
taskMgrCheckInterval time.Duration
vct *vctParams
anchorStatusMonitoringInterval time.Duration
anchorStatusInProcessGracePeriod time.Duration
witnessPolicyCacheExpiration time.Duration
kmsParams *kmsParameters
requestTokens map[string]string
allowedDIDWebDomains []*url.URL
observability *observabilityParams
anchorRefPendingRecordLifespan time.Duration
http *httpParams
sidetree *sidetreeParams
apServiceParams *apServiceParams
discoveryDomain string
dataURIMediaType datauri.MediaType
batchWriterTimeout time.Duration
cas *casParams
mqParams *mqParams
opQueueParams *opqueue.Config
dbParameters *dbParameters
logLevel string
methodContext []string
baseEnabled bool
allowedOrigins []string
allowedOriginsCacheExpiration time.Duration
anchorCredentialParams *anchorCredentialParams
discovery *discoveryParams
witnessProof *witnessProofParams
syncTimeout uint64
didDiscoveryEnabled bool
unpublishedOperations *unpublishedOperationsStoreParams
resolveFromAnchorOrigin bool
verifyLatestFromAnchorOrigin bool
activityPub *activityPubParams
auth *authParams
enableDevMode bool
enableMaintenanceMode bool
enableVCT bool
nodeInfoRefreshInterval time.Duration
contextProviderURLs []string
dataExpiryCheckInterval time.Duration
taskMgrCheckInterval time.Duration
vct *vctParams
anchorStatus *anchorStatusParams
witnessPolicyCacheExpiration time.Duration
kmsParams *kmsParameters
requestTokens map[string]string
allowedDIDWebDomains []*url.URL
observability *observabilityParams
anchorRefPendingRecordLifespan time.Duration
}

type observabilityParams struct {
Expand Down Expand Up @@ -1127,16 +1133,9 @@ func getOrbParameters(cmd *cobra.Command) (*orbParameters, error) {
return nil, err
}

anchorStatusMonitoringInterval, err := cmdutil.GetDuration(cmd, anchorStatusMonitoringIntervalFlagName,
anchorStatusMonitoringIntervalEnvKey, defaultAnchorStatusMonitoringInterval)
if err != nil {
return nil, fmt.Errorf("%s: %w", anchorStatusMonitoringIntervalFlagName, err)
}

anchorStatusInProcessGracePeriod, err := cmdutil.GetDuration(cmd, anchorStatusInProcessGracePeriodFlagName,
anchorStatusInProcessGracePeriodEnvKey, defaultAnchorStatusInProcessGracePeriod)
anchorStatusParams, err := getAnchorStatusParams(cmd)
if err != nil {
return nil, fmt.Errorf("%s: %w", anchorStatusInProcessGracePeriodFlagName, err)
return nil, err
}

witnessPolicyCacheExpiration, err := cmdutil.GetDuration(cmd, witnessPolicyCacheExpirationFlagName,
Expand All @@ -1161,45 +1160,44 @@ func getOrbParameters(cmd *cobra.Command) (*orbParameters, error) {
}

return &orbParameters{
http: httpParams,
sidetree: sidetreeParams,
discoveryDomain: discoveryDomain,
apServiceParams: apServiceParams,
allowedOrigins: allowedOrigins,
allowedOriginsCacheExpiration: allowedOriginsCacheExpiration,
allowedDIDWebDomains: allowedDIDWebDomains,
cas: casParams,
mqParams: mqParams,
opQueueParams: opQueueParams,
batchWriterTimeout: batchWriterTimeout,
anchorCredentialParams: anchorCredentialParams,
logLevel: loggingLevel,
dbParameters: dbParams,
discovery: discoveryParams,
witnessProof: witnessProofParams,
syncTimeout: syncTimeout,
didDiscoveryEnabled: didDiscoveryEnabled,
unpublishedOperations: unpublishedOperationsParams,
resolveFromAnchorOrigin: resolveFromAnchorOrigin,
verifyLatestFromAnchorOrigin: verifyLatestFromAnchorOrigin,
auth: authParams,
activityPub: activityPubParams,
enableDevMode: enableDevMode,
enableMaintenanceMode: enableMaintenanceMode,
enableVCT: enableVCT,
nodeInfoRefreshInterval: nodeInfoRefreshInterval,
contextProviderURLs: contextProviderURLs,
dataExpiryCheckInterval: dataExpiryCheckInterval,
taskMgrCheckInterval: taskMgrCheckInterval,
vct: vctParams,
anchorStatusMonitoringInterval: anchorStatusMonitoringInterval,
anchorStatusInProcessGracePeriod: anchorStatusInProcessGracePeriod,
witnessPolicyCacheExpiration: witnessPolicyCacheExpiration,
dataURIMediaType: dataURIMediaType,
kmsParams: kmsParams,
requestTokens: requestTokens,
observability: observabilityParams,
anchorRefPendingRecordLifespan: anchorRefPendingRecordLifespan,
http: httpParams,
sidetree: sidetreeParams,
discoveryDomain: discoveryDomain,
apServiceParams: apServiceParams,
allowedOrigins: allowedOrigins,
allowedOriginsCacheExpiration: allowedOriginsCacheExpiration,
allowedDIDWebDomains: allowedDIDWebDomains,
cas: casParams,
mqParams: mqParams,
opQueueParams: opQueueParams,
batchWriterTimeout: batchWriterTimeout,
anchorCredentialParams: anchorCredentialParams,
logLevel: loggingLevel,
dbParameters: dbParams,
discovery: discoveryParams,
witnessProof: witnessProofParams,
syncTimeout: syncTimeout,
didDiscoveryEnabled: didDiscoveryEnabled,
unpublishedOperations: unpublishedOperationsParams,
resolveFromAnchorOrigin: resolveFromAnchorOrigin,
verifyLatestFromAnchorOrigin: verifyLatestFromAnchorOrigin,
auth: authParams,
activityPub: activityPubParams,
enableDevMode: enableDevMode,
enableMaintenanceMode: enableMaintenanceMode,
enableVCT: enableVCT,
nodeInfoRefreshInterval: nodeInfoRefreshInterval,
contextProviderURLs: contextProviderURLs,
dataExpiryCheckInterval: dataExpiryCheckInterval,
taskMgrCheckInterval: taskMgrCheckInterval,
vct: vctParams,
anchorStatus: anchorStatusParams,
witnessPolicyCacheExpiration: witnessPolicyCacheExpiration,
dataURIMediaType: dataURIMediaType,
kmsParams: kmsParams,
requestTokens: requestTokens,
observability: observabilityParams,
anchorRefPendingRecordLifespan: anchorRefPendingRecordLifespan,
}, nil
}

Expand Down Expand Up @@ -1694,6 +1692,38 @@ func getActivityPubIRICacheParameters(cmd *cobra.Command) (int, time.Duration, e
})
}

// anchorStatusParams groups the anchor status settings that are read from the
// start command's flags / environment variables by getAnchorStatusParams.
type anchorStatusParams struct {
	// monitoringInterval is the value of the anchor status monitoring interval setting.
	monitoringInterval time.Duration

	// maxRecordsPerInterval is the maximum number of anchor status records to
	// process per monitoring interval.
	maxRecordsPerInterval int

	// inProcessGracePeriod is the period in which witnesses will not be
	// re-selected for 'in-process' anchors.
	inProcessGracePeriod time.Duration
}

// getAnchorStatusParams collects the anchor status settings for the given command.
//
// The monitoring interval, the maximum number of records and the in-process
// grace period are looked up (in that order) via cmdutil, passing each setting's
// flag name, environment variable key and default value. The first lookup that
// fails stops processing: nil is returned along with the underlying error
// wrapped with the name of the flag that could not be read.
func getAnchorStatusParams(cmd *cobra.Command) (*anchorStatusParams, error) {
	var (
		params anchorStatusParams
		err    error
	)

	params.monitoringInterval, err = cmdutil.GetDuration(
		cmd,
		anchorStatusMonitoringIntervalFlagName,
		anchorStatusMonitoringIntervalEnvKey,
		defaultAnchorStatusMonitoringInterval,
	)
	if err != nil {
		return nil, fmt.Errorf("%s: %w", anchorStatusMonitoringIntervalFlagName, err)
	}

	params.maxRecordsPerInterval, err = cmdutil.GetInt(
		cmd,
		anchorStatusMaxRecordsFlagName,
		anchorStatusMaxRecordsEnvKey,
		defaultAnchorStatusMaxRecords,
	)
	if err != nil {
		return nil, fmt.Errorf("%s: %w", anchorStatusMaxRecordsFlagName, err)
	}

	params.inProcessGracePeriod, err = cmdutil.GetDuration(
		cmd,
		anchorStatusInProcessGracePeriodFlagName,
		anchorStatusInProcessGracePeriodEnvKey,
		defaultAnchorStatusInProcessGracePeriod,
	)
	if err != nil {
		return nil, fmt.Errorf("%s: %w", anchorStatusInProcessGracePeriodFlagName, err)
	}

	return &params, nil
}

func getAllowedDIDWebDomains(cmd *cobra.Command) ([]*url.URL, error) {
allowedDIDWebDomainsArray, err := cmdutil.GetUserSetVarFromArrayString(cmd, allowedDIDWebDomainsFlagName,
allowedDIDWebDomainsEnvKey, true)
Expand Down Expand Up @@ -2449,6 +2479,7 @@ func createFlags(startCmd *cobra.Command) {
startCmd.Flags().StringP(vctLogMonitoringGetEntriesRangeFlagName, "", "", vctLogMonitoringGetEntriesRangeFlagUsage)
startCmd.Flags().StringP(vctLogEntriesStoreEnabledFlagName, "", "", vctLogEntriesStoreEnabledFlagUsage)
startCmd.Flags().StringP(anchorStatusMonitoringIntervalFlagName, "", "", anchorStatusMonitoringIntervalFlagUsage)
startCmd.Flags().StringP(anchorStatusMaxRecordsFlagName, "", "", anchorStatusMaxRecordsFlagUsage)
startCmd.Flags().StringP(anchorStatusInProcessGracePeriodFlagName, "", "", anchorStatusInProcessGracePeriodFlagUsage)
startCmd.Flags().StringP(witnessPolicyCacheExpirationFlagName, "", "", witnessPolicyCacheExpirationFlagUsage)
startCmd.Flags().StringP(activityPubClientCacheSizeFlagName, "", "", activityPubClientCacheSizeFlagUsage)
Expand Down
13 changes: 13 additions & 0 deletions cmd/orb-server/startcmd/params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,19 @@ func TestStartCmdWithMissingArg(t *testing.T) {
require.Contains(t, err.Error(), "invalid value for anchor-status-monitoring-interval [xxx]")
})

t.Run("anchor status max records", func(t *testing.T) {
restoreEnv := setEnv(t, anchorStatusMaxRecordsEnvKey, "xxx")
defer restoreEnv()

startCmd := GetStartCmd()

startCmd.SetArgs(getTestArgs("localhost:8081", "local", "false", databaseTypeMemOption))

err := startCmd.Execute()
require.Error(t, err)
require.Contains(t, err.Error(), "invalid value for anchor-status-max-records [xxx]")
})

t.Run("anchor status in-process grace period", func(t *testing.T) {
restoreEnv := setEnv(t, anchorStatusInProcessGracePeriodEnvKey, "xxx")
defer restoreEnv()
Expand Down
15 changes: 9 additions & 6 deletions cmd/orb-server/startcmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,15 +807,17 @@ func startOrbServices(parameters *orbParameters) error {
return fmt.Errorf("failed to create witness policy inspector: %s", err.Error())
}

anchorEventStatusStore, err := anchorstatus.New(storeProviders.provider, expiryService,
parameters.witnessProof.maxWitnessDelay, anchorstatus.WithPolicyHandler(policyInspector),
anchorstatus.WithCheckStatusAfterTime(parameters.anchorStatusInProcessGracePeriod))
anchorEventStatusStore, err := anchorstatus.New(storeProviders.provider, taskMgr, expiryService,
parameters.witnessProof.maxWitnessDelay,
anchorstatus.WithPolicyHandler(policyInspector),
anchorstatus.WithMonitoringInterval(parameters.anchorStatus.monitoringInterval),
anchorstatus.WithMaxRecordsPerInterval(parameters.anchorStatus.maxRecordsPerInterval),
anchorstatus.WithCheckStatusAfterTime(parameters.anchorStatus.inProcessGracePeriod),
)
if err != nil {
return fmt.Errorf("failed to create vc status store: %s", err.Error())
}

taskMgr.RegisterTask("anchor-status-monitor", parameters.anchorStatusMonitoringInterval, anchorEventStatusStore.CheckInProcessAnchors)

pubSub := newPubSub(parameters)

proofHandler := proof.New(
Expand Down Expand Up @@ -1196,7 +1198,8 @@ func startOrbServices(parameters *orbParameters) error {
)

err = run(httpServer, activityPubService, opQueue, obsrv, batchWriter, taskMgr, apClient,
nodeInfoService, newMPLifecycleWrapper(mp), tracerProvider, proofMonitoringSvc)
nodeInfoService, newMPLifecycleWrapper(mp), tracerProvider, proofMonitoringSvc,
anchorEventStatusStore)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions internal/pkg/log/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ const (
FieldMaxActivitiesToSync = "maxActivitiesToSync"
FieldNumActivitiesSynced = "numActivitiesSynced"
FieldNextActivitySyncInterval = "nextActivitySyncInterval"
FieldMaxProofMonitorRecords = "maxProofMonitorRecords"
FieldRecordsProcessed = "recordsProcessed"
)

// WithMessageID sets the message-id field.
Expand Down Expand Up @@ -905,9 +905,9 @@ func WithNextActivitySyncInterval(value time.Duration) zap.Field {
return zap.Duration(FieldNextActivitySyncInterval, value)
}

// WithMaxProofMonitorRecords sets the maxProofMonitorRecords field.
func WithMaxProofMonitorRecords(value int) zap.Field {
return zap.Int(FieldMaxProofMonitorRecords, value)
// WithRecordsProcessed sets the recordsProcessed field.
func WithRecordsProcessed(value int) zap.Field {
return zap.Int(FieldRecordsProcessed, value)
}

type jsonMarshaller struct {
Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/log/fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestStandardFields(t *testing.T) {
WithCreatedTime(now), WithWitnessURI(u1), WithWitnessURIs(u1, u2), WithWitnessPolicy("some policy"),
WithAnchorOrigin(u1.String()), WithOperationType("Create"), WithCoreIndex("1234"),
WithMaxOperationsToRepost(300), WithMaxActivitiesToSync(11), WithNextActivitySyncInterval(3*time.Second),
WithNumActivitiesSynced(123), WithMaxProofMonitorRecords(23),
WithNumActivitiesSynced(123), WithRecordsProcessed(23),
)

t.Logf(stdOut.String())
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestStandardFields(t *testing.T) {
require.Equal(t, 11, l.MaxActivitiesToSync)
require.Equal(t, "3s", l.NextActivitySyncInterval)
require.Equal(t, 123, l.NumActivitiesSynced)
require.Equal(t, 23, l.MaxProofMonitorRecords)
require.Equal(t, 23, l.RecordsProcessed)
})

t.Run("json fields 2", func(t *testing.T) {
Expand Down Expand Up @@ -453,7 +453,7 @@ type logData struct {
MaxActivitiesToSync int `json:"maxActivitiesToSync"`
NextActivitySyncInterval string `json:"nextActivitySyncInterval"`
NumActivitiesSynced int `json:"numActivitiesSynced"`
MaxProofMonitorRecords int `json:"maxProofMonitorRecords"`
RecordsProcessed int `json:"recordsProcessed"`
}

func unmarshalLogData(t *testing.T, b []byte) *logData {
Expand Down
Loading

0 comments on commit 3a11195

Please sign in to comment.