Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Exit anchor status monitoring loop on shutdown #1575

Merged
merged 1 commit into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 122 additions & 91 deletions cmd/orb-server/startcmd/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ const (
defaultVCTLogMonitoringMaxTreeSize = 50000
defaultVCTLogMonitoringGetEntriesRange = 1000
defaultVCTLogEntriesStoreEnabled = false
defaultAnchorStatusMonitoringInterval = 5 * time.Second
defaultAnchorStatusInProcessGracePeriod = 30 * time.Second
defaultAnchorStatusMonitoringInterval = 10 * time.Second
defaultAnchorStatusMaxRecords = 500
defaultAnchorStatusInProcessGracePeriod = time.Minute
mqDefaultMaxConnectionSubscriptions = 1000
mqDefaultPublisherChannelPoolSize = 25
mqDefaultPublisherConfirmDelivery = true
Expand Down Expand Up @@ -226,6 +227,12 @@ const (
"witnessed(completed) as per policy. Defaults to 5s if not set. " +
commonEnvVarUsageText + anchorStatusMonitoringIntervalEnvKey

anchorStatusMaxRecordsFlagName = "anchor-status-max-records"
anchorStatusMaxRecordsEnvKey = "ANCHOR_STATUS_MAX_RECORDS"
anchorStatusMaxRecordsFlagUsage = "The maximum number of anchor status records to process per monitoring interval " +
"Defaults to 500 if not set. " +
commonEnvVarUsageText + anchorStatusMaxRecordsEnvKey

anchorStatusInProcessGracePeriodFlagName = "anchor-status-in-process-grace-period"
anchorStatusInProcessGracePeriodEnvKey = "ANCHOR_STATUS_IN_PROCESS_GRACE_PERIOD"
anchorStatusInProcessGracePeriodFlagUsage = "The period in which witnesses will not be re-selected for 'in-process' anchors." +
Expand Down Expand Up @@ -764,47 +771,46 @@ type tlsParameters struct {
}

type orbParameters struct {
http *httpParams
sidetree *sidetreeParams
apServiceParams *apServiceParams
discoveryDomain string
dataURIMediaType datauri.MediaType
batchWriterTimeout time.Duration
cas *casParams
mqParams *mqParams
opQueueParams *opqueue.Config
dbParameters *dbParameters
logLevel string
methodContext []string
baseEnabled bool
allowedOrigins []string
allowedOriginsCacheExpiration time.Duration
anchorCredentialParams *anchorCredentialParams
discovery *discoveryParams
witnessProof *witnessProofParams
syncTimeout uint64
didDiscoveryEnabled bool
unpublishedOperations *unpublishedOperationsStoreParams
resolveFromAnchorOrigin bool
verifyLatestFromAnchorOrigin bool
activityPub *activityPubParams
auth *authParams
enableDevMode bool
enableMaintenanceMode bool
enableVCT bool
nodeInfoRefreshInterval time.Duration
contextProviderURLs []string
dataExpiryCheckInterval time.Duration
taskMgrCheckInterval time.Duration
vct *vctParams
anchorStatusMonitoringInterval time.Duration
anchorStatusInProcessGracePeriod time.Duration
witnessPolicyCacheExpiration time.Duration
kmsParams *kmsParameters
requestTokens map[string]string
allowedDIDWebDomains []*url.URL
observability *observabilityParams
anchorRefPendingRecordLifespan time.Duration
http *httpParams
sidetree *sidetreeParams
apServiceParams *apServiceParams
discoveryDomain string
dataURIMediaType datauri.MediaType
batchWriterTimeout time.Duration
cas *casParams
mqParams *mqParams
opQueueParams *opqueue.Config
dbParameters *dbParameters
logLevel string
methodContext []string
baseEnabled bool
allowedOrigins []string
allowedOriginsCacheExpiration time.Duration
anchorCredentialParams *anchorCredentialParams
discovery *discoveryParams
witnessProof *witnessProofParams
syncTimeout uint64
didDiscoveryEnabled bool
unpublishedOperations *unpublishedOperationsStoreParams
resolveFromAnchorOrigin bool
verifyLatestFromAnchorOrigin bool
activityPub *activityPubParams
auth *authParams
enableDevMode bool
enableMaintenanceMode bool
enableVCT bool
nodeInfoRefreshInterval time.Duration
contextProviderURLs []string
dataExpiryCheckInterval time.Duration
taskMgrCheckInterval time.Duration
vct *vctParams
anchorStatus *anchorStatusParams
witnessPolicyCacheExpiration time.Duration
kmsParams *kmsParameters
requestTokens map[string]string
allowedDIDWebDomains []*url.URL
observability *observabilityParams
anchorRefPendingRecordLifespan time.Duration
}

type observabilityParams struct {
Expand Down Expand Up @@ -1127,16 +1133,9 @@ func getOrbParameters(cmd *cobra.Command) (*orbParameters, error) {
return nil, err
}

anchorStatusMonitoringInterval, err := cmdutil.GetDuration(cmd, anchorStatusMonitoringIntervalFlagName,
anchorStatusMonitoringIntervalEnvKey, defaultAnchorStatusMonitoringInterval)
if err != nil {
return nil, fmt.Errorf("%s: %w", anchorStatusMonitoringIntervalFlagName, err)
}

anchorStatusInProcessGracePeriod, err := cmdutil.GetDuration(cmd, anchorStatusInProcessGracePeriodFlagName,
anchorStatusInProcessGracePeriodEnvKey, defaultAnchorStatusInProcessGracePeriod)
anchorStatusParams, err := getAnchorStatusParams(cmd)
if err != nil {
return nil, fmt.Errorf("%s: %w", anchorStatusInProcessGracePeriodFlagName, err)
return nil, err
}

witnessPolicyCacheExpiration, err := cmdutil.GetDuration(cmd, witnessPolicyCacheExpirationFlagName,
Expand All @@ -1161,45 +1160,44 @@ func getOrbParameters(cmd *cobra.Command) (*orbParameters, error) {
}

return &orbParameters{
http: httpParams,
sidetree: sidetreeParams,
discoveryDomain: discoveryDomain,
apServiceParams: apServiceParams,
allowedOrigins: allowedOrigins,
allowedOriginsCacheExpiration: allowedOriginsCacheExpiration,
allowedDIDWebDomains: allowedDIDWebDomains,
cas: casParams,
mqParams: mqParams,
opQueueParams: opQueueParams,
batchWriterTimeout: batchWriterTimeout,
anchorCredentialParams: anchorCredentialParams,
logLevel: loggingLevel,
dbParameters: dbParams,
discovery: discoveryParams,
witnessProof: witnessProofParams,
syncTimeout: syncTimeout,
didDiscoveryEnabled: didDiscoveryEnabled,
unpublishedOperations: unpublishedOperationsParams,
resolveFromAnchorOrigin: resolveFromAnchorOrigin,
verifyLatestFromAnchorOrigin: verifyLatestFromAnchorOrigin,
auth: authParams,
activityPub: activityPubParams,
enableDevMode: enableDevMode,
enableMaintenanceMode: enableMaintenanceMode,
enableVCT: enableVCT,
nodeInfoRefreshInterval: nodeInfoRefreshInterval,
contextProviderURLs: contextProviderURLs,
dataExpiryCheckInterval: dataExpiryCheckInterval,
taskMgrCheckInterval: taskMgrCheckInterval,
vct: vctParams,
anchorStatusMonitoringInterval: anchorStatusMonitoringInterval,
anchorStatusInProcessGracePeriod: anchorStatusInProcessGracePeriod,
witnessPolicyCacheExpiration: witnessPolicyCacheExpiration,
dataURIMediaType: dataURIMediaType,
kmsParams: kmsParams,
requestTokens: requestTokens,
observability: observabilityParams,
anchorRefPendingRecordLifespan: anchorRefPendingRecordLifespan,
http: httpParams,
sidetree: sidetreeParams,
discoveryDomain: discoveryDomain,
apServiceParams: apServiceParams,
allowedOrigins: allowedOrigins,
allowedOriginsCacheExpiration: allowedOriginsCacheExpiration,
allowedDIDWebDomains: allowedDIDWebDomains,
cas: casParams,
mqParams: mqParams,
opQueueParams: opQueueParams,
batchWriterTimeout: batchWriterTimeout,
anchorCredentialParams: anchorCredentialParams,
logLevel: loggingLevel,
dbParameters: dbParams,
discovery: discoveryParams,
witnessProof: witnessProofParams,
syncTimeout: syncTimeout,
didDiscoveryEnabled: didDiscoveryEnabled,
unpublishedOperations: unpublishedOperationsParams,
resolveFromAnchorOrigin: resolveFromAnchorOrigin,
verifyLatestFromAnchorOrigin: verifyLatestFromAnchorOrigin,
auth: authParams,
activityPub: activityPubParams,
enableDevMode: enableDevMode,
enableMaintenanceMode: enableMaintenanceMode,
enableVCT: enableVCT,
nodeInfoRefreshInterval: nodeInfoRefreshInterval,
contextProviderURLs: contextProviderURLs,
dataExpiryCheckInterval: dataExpiryCheckInterval,
taskMgrCheckInterval: taskMgrCheckInterval,
vct: vctParams,
anchorStatus: anchorStatusParams,
witnessPolicyCacheExpiration: witnessPolicyCacheExpiration,
dataURIMediaType: dataURIMediaType,
kmsParams: kmsParams,
requestTokens: requestTokens,
observability: observabilityParams,
anchorRefPendingRecordLifespan: anchorRefPendingRecordLifespan,
}, nil
}

Expand Down Expand Up @@ -1694,6 +1692,38 @@ func getActivityPubIRICacheParameters(cmd *cobra.Command) (int, time.Duration, e
})
}

type anchorStatusParams struct {
monitoringInterval time.Duration
maxRecordsPerInterval int
inProcessGracePeriod time.Duration
}

func getAnchorStatusParams(cmd *cobra.Command) (*anchorStatusParams, error) {
monitoringInterval, err := cmdutil.GetDuration(cmd, anchorStatusMonitoringIntervalFlagName,
anchorStatusMonitoringIntervalEnvKey, defaultAnchorStatusMonitoringInterval)
if err != nil {
return nil, fmt.Errorf("%s: %w", anchorStatusMonitoringIntervalFlagName, err)
}

maxRecords, err := cmdutil.GetInt(cmd, anchorStatusMaxRecordsFlagName,
anchorStatusMaxRecordsEnvKey, defaultAnchorStatusMaxRecords)
if err != nil {
return nil, fmt.Errorf("%s: %w", anchorStatusMaxRecordsFlagName, err)
}

inProcessGracePeriod, err := cmdutil.GetDuration(cmd, anchorStatusInProcessGracePeriodFlagName,
anchorStatusInProcessGracePeriodEnvKey, defaultAnchorStatusInProcessGracePeriod)
if err != nil {
return nil, fmt.Errorf("%s: %w", anchorStatusInProcessGracePeriodFlagName, err)
}

return &anchorStatusParams{
monitoringInterval: monitoringInterval,
maxRecordsPerInterval: maxRecords,
inProcessGracePeriod: inProcessGracePeriod,
}, nil
}

func getAllowedDIDWebDomains(cmd *cobra.Command) ([]*url.URL, error) {
allowedDIDWebDomainsArray, err := cmdutil.GetUserSetVarFromArrayString(cmd, allowedDIDWebDomainsFlagName,
allowedDIDWebDomainsEnvKey, true)
Expand Down Expand Up @@ -2449,6 +2479,7 @@ func createFlags(startCmd *cobra.Command) {
startCmd.Flags().StringP(vctLogMonitoringGetEntriesRangeFlagName, "", "", vctLogMonitoringGetEntriesRangeFlagUsage)
startCmd.Flags().StringP(vctLogEntriesStoreEnabledFlagName, "", "", vctLogEntriesStoreEnabledFlagUsage)
startCmd.Flags().StringP(anchorStatusMonitoringIntervalFlagName, "", "", anchorStatusMonitoringIntervalFlagUsage)
startCmd.Flags().StringP(anchorStatusMaxRecordsFlagName, "", "", anchorStatusMaxRecordsFlagUsage)
startCmd.Flags().StringP(anchorStatusInProcessGracePeriodFlagName, "", "", anchorStatusInProcessGracePeriodFlagUsage)
startCmd.Flags().StringP(witnessPolicyCacheExpirationFlagName, "", "", witnessPolicyCacheExpirationFlagUsage)
startCmd.Flags().StringP(activityPubClientCacheSizeFlagName, "", "", activityPubClientCacheSizeFlagUsage)
Expand Down
13 changes: 13 additions & 0 deletions cmd/orb-server/startcmd/params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,19 @@ func TestStartCmdWithMissingArg(t *testing.T) {
require.Contains(t, err.Error(), "invalid value for anchor-status-monitoring-interval [xxx]")
})

t.Run("anchor status max records", func(t *testing.T) {
restoreEnv := setEnv(t, anchorStatusMaxRecordsEnvKey, "xxx")
defer restoreEnv()

startCmd := GetStartCmd()

startCmd.SetArgs(getTestArgs("localhost:8081", "local", "false", databaseTypeMemOption))

err := startCmd.Execute()
require.Error(t, err)
require.Contains(t, err.Error(), "invalid value for anchor-status-max-records [xxx]")
})

t.Run("anchor status in-process grace period", func(t *testing.T) {
restoreEnv := setEnv(t, anchorStatusInProcessGracePeriodEnvKey, "xxx")
defer restoreEnv()
Expand Down
15 changes: 9 additions & 6 deletions cmd/orb-server/startcmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,15 +807,17 @@ func startOrbServices(parameters *orbParameters) error {
return fmt.Errorf("failed to create witness policy inspector: %s", err.Error())
}

anchorEventStatusStore, err := anchorstatus.New(storeProviders.provider, expiryService,
parameters.witnessProof.maxWitnessDelay, anchorstatus.WithPolicyHandler(policyInspector),
anchorstatus.WithCheckStatusAfterTime(parameters.anchorStatusInProcessGracePeriod))
anchorEventStatusStore, err := anchorstatus.New(storeProviders.provider, taskMgr, expiryService,
parameters.witnessProof.maxWitnessDelay,
anchorstatus.WithPolicyHandler(policyInspector),
anchorstatus.WithMonitoringInterval(parameters.anchorStatus.monitoringInterval),
anchorstatus.WithMaxRecordsPerInterval(parameters.anchorStatus.maxRecordsPerInterval),
anchorstatus.WithCheckStatusAfterTime(parameters.anchorStatus.inProcessGracePeriod),
)
if err != nil {
return fmt.Errorf("failed to create vc status store: %s", err.Error())
}

taskMgr.RegisterTask("anchor-status-monitor", parameters.anchorStatusMonitoringInterval, anchorEventStatusStore.CheckInProcessAnchors)

pubSub := newPubSub(parameters)

proofHandler := proof.New(
Expand Down Expand Up @@ -1196,7 +1198,8 @@ func startOrbServices(parameters *orbParameters) error {
)

err = run(httpServer, activityPubService, opQueue, obsrv, batchWriter, taskMgr, apClient,
nodeInfoService, newMPLifecycleWrapper(mp), tracerProvider, proofMonitoringSvc)
nodeInfoService, newMPLifecycleWrapper(mp), tracerProvider, proofMonitoringSvc,
anchorEventStatusStore)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions internal/pkg/log/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ const (
FieldMaxActivitiesToSync = "maxActivitiesToSync"
FieldNumActivitiesSynced = "numActivitiesSynced"
FieldNextActivitySyncInterval = "nextActivitySyncInterval"
FieldMaxProofMonitorRecords = "maxProofMonitorRecords"
FieldRecordsProcessed = "recordsProcessed"
)

// WithMessageID sets the message-id field.
Expand Down Expand Up @@ -905,9 +905,9 @@ func WithNextActivitySyncInterval(value time.Duration) zap.Field {
return zap.Duration(FieldNextActivitySyncInterval, value)
}

// WithMaxProofMonitorRecords sets the maxProofMonitorRecords field.
func WithMaxProofMonitorRecords(value int) zap.Field {
return zap.Int(FieldMaxProofMonitorRecords, value)
// WithRecordsProcessed sets the recordsProcessed field.
func WithRecordsProcessed(value int) zap.Field {
return zap.Int(FieldRecordsProcessed, value)
}

type jsonMarshaller struct {
Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/log/fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestStandardFields(t *testing.T) {
WithCreatedTime(now), WithWitnessURI(u1), WithWitnessURIs(u1, u2), WithWitnessPolicy("some policy"),
WithAnchorOrigin(u1.String()), WithOperationType("Create"), WithCoreIndex("1234"),
WithMaxOperationsToRepost(300), WithMaxActivitiesToSync(11), WithNextActivitySyncInterval(3*time.Second),
WithNumActivitiesSynced(123), WithMaxProofMonitorRecords(23),
WithNumActivitiesSynced(123), WithRecordsProcessed(23),
)

t.Logf(stdOut.String())
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestStandardFields(t *testing.T) {
require.Equal(t, 11, l.MaxActivitiesToSync)
require.Equal(t, "3s", l.NextActivitySyncInterval)
require.Equal(t, 123, l.NumActivitiesSynced)
require.Equal(t, 23, l.MaxProofMonitorRecords)
require.Equal(t, 23, l.RecordsProcessed)
})

t.Run("json fields 2", func(t *testing.T) {
Expand Down Expand Up @@ -453,7 +453,7 @@ type logData struct {
MaxActivitiesToSync int `json:"maxActivitiesToSync"`
NextActivitySyncInterval string `json:"nextActivitySyncInterval"`
NumActivitiesSynced int `json:"numActivitiesSynced"`
MaxProofMonitorRecords int `json:"maxProofMonitorRecords"`
RecordsProcessed int `json:"recordsProcessed"`
}

func unmarshalLogData(t *testing.T, b []byte) *logData {
Expand Down
Loading