Skip to content

Commit

Permalink
Extending metrics for subaccount-sync app (#721)
Browse files Browse the repository at this point in the history
* counting failures for CIS requests

metrics update back to main loop

extra logging

in-memory stats metrics

metrics for inmemory states corrected

one more call corrected

time conversion done

time conversion corrected with verbose logging

facepalm, we get interval from the queue not point in time

corrected sign

* comment removed
  • Loading branch information
jaroslaw-pieszka authored Apr 30, 2024
1 parent 61b436f commit 045cc86
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 16 deletions.
4 changes: 2 additions & 2 deletions internal/subaccountsync/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func NewMetrics(reg prometheus.Registerer, namespace string) *Metrics {
Namespace: namespace,
Name: "in_memory_states",
Help: "Information about in-memory states.",
}, []string{"type"}),
}, []string{"type", "value"}),
queueOps: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "priority_queue_ops",
Expand All @@ -27,7 +27,7 @@ func NewMetrics(reg prometheus.Registerer, namespace string) *Metrics {
Namespace: namespace,
Name: "cis_requests",
Help: "CIS requests.",
}, []string{"endpoint"}),
}, []string{"endpoint", "status"}),
informer: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "informer",
Expand Down
50 changes: 39 additions & 11 deletions internal/subaccountsync/state_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ import (
"github.com/kyma-project/kyma-environment-broker/internal/syncqueues"
)

const (
usedForProductionLabel = "USED_FOR_PRODUCTION"
notUsedForProductionLabel = "NOT_USED_FOR_PRODUCTION"
)

func (reconciler *stateReconcilerType) recreateStateFromDB() {
logs := reconciler.logger
dbStates, err := reconciler.db.SubaccountStates().ListStates()
Expand Down Expand Up @@ -64,28 +69,39 @@ func (reconciler *stateReconcilerType) setMetrics() {
if reconciler.metrics == nil {
return
}
reconciler.metrics.states.With(prometheus.Labels{"type": "total"}).Set(float64(len(reconciler.inMemoryState)))
total := len(reconciler.inMemoryState)
reconciler.metrics.states.With(prometheus.Labels{"type": "total", "value": "total"}).Set(float64(total))
//count subaccounts with beta enabled
betaEnabled := 0
betaDisabled := 0
//create map for UsedForProduction
usedForProduction := make(map[string]int)
for _, state := range reconciler.inMemoryState {
if state.cisState != (CisStateType{}) {
if state.cisState.BetaEnabled {
betaEnabled++
} else {
betaDisabled++
}
//increment counter for UsedForProduction
usedForProduction[state.cisState.UsedForProduction]++
}
}
reconciler.metrics.states.With(prometheus.Labels{"type": "betaEnabled"}).Set(float64(betaEnabled))
// for each UsedForProduction value set the counter
reconciler.metrics.states.With(prometheus.Labels{"type": "betaEnabled", "value": "true"}).Set(float64(betaEnabled))
reconciler.metrics.states.With(prometheus.Labels{"type": "betaEnabled", "value": "false"}).Set(float64(betaDisabled))

others := 0
for key, value := range usedForProduction {
reconciler.metrics.states.With(prometheus.Labels{"type": key}).Set(float64(value))
if key != usedForProductionLabel && key != notUsedForProductionLabel {
others += value
}
}
reconciler.metrics.states.With(prometheus.Labels{"type": usedForProductionLabel, "value": usedForProductionLabel}).Set(float64(usedForProduction[usedForProductionLabel]))
reconciler.metrics.states.With(prometheus.Labels{"type": usedForProductionLabel, "value": notUsedForProductionLabel}).Set(float64(usedForProduction[notUsedForProductionLabel]))
reconciler.metrics.states.With(prometheus.Labels{"type": usedForProductionLabel, "value": "others"}).Set(float64(others))
}

func (reconciler *stateReconcilerType) periodicAccountsSync() {
func (reconciler *stateReconcilerType) periodicAccountsSync() (successes int, failures int) {
logs := reconciler.logger

// get distinct subaccounts from inMemoryState
Expand All @@ -99,23 +115,28 @@ func (reconciler *stateReconcilerType) periodicAccountsSync() {
continue
}
if err != nil {
failures++
logs.Error(fmt.Sprintf("while getting data for subaccount:%s", err))
} else {
successes++
reconciler.reconcileCisAccount(subaccountID, subaccountDataFromCis)
}
}
return successes, failures
}

func (reconciler *stateReconcilerType) periodicEventsSync(fromActionTime int64) {
func (reconciler *stateReconcilerType) periodicEventsSync(fromActionTime int64) (success bool) {

logs := reconciler.logger
eventsClient := reconciler.eventsClient
subaccountsSet := reconciler.getAllSubaccountIDsFromState()
success = true

logs.Info(fmt.Sprintf("Running CIS events synchronization from epoch: %d for %d subaccounts", fromActionTime, len(subaccountsSet)))

eventsOfInterest, err := eventsClient.getEventsForSubaccounts(fromActionTime, *logs, subaccountsSet)
if err != nil {
success = false
logs.Error(fmt.Sprintf("while getting subaccount events: %s", err))
// we will retry in the next run
}
Expand All @@ -125,6 +146,7 @@ func (reconciler *stateReconcilerType) periodicEventsSync(fromActionTime int64)
reconciler.eventWindow.UpdateToTime(event.ActionTime)
}
logs.Debug(fmt.Sprintf("Events synchronization finished, the most recent reconciled event time: %d", reconciler.eventWindow.lastToTime))
return success
}

func (reconciler *stateReconcilerType) getAllSubaccountIDsFromState() subaccountsSetType {
Expand All @@ -144,8 +166,12 @@ func (reconciler *stateReconcilerType) runCronJobs(cfg Config, ctx context.Conte
// establish actual time window
eventsFrom := reconciler.eventWindow.GetNextFromTime()

reconciler.periodicEventsSync(eventsFrom)
reconciler.metrics.cisRequests.With(prometheus.Labels{"endpoint": "events"}).Inc()
ok := reconciler.periodicEventsSync(eventsFrom)
if ok {
reconciler.metrics.cisRequests.With(prometheus.Labels{"endpoint": "events", "status": "success"}).Inc()
} else {
reconciler.metrics.cisRequests.With(prometheus.Labels{"endpoint": "events", "status": "failure"}).Inc()
}

reconciler.eventWindow.UpdateFromTime(eventsFrom)
logs.Debug(fmt.Sprintf("Running events synchronization from epoch: %d, lastFromTime: %d, lastToTime: %d", eventsFrom, reconciler.eventWindow.lastFromTime, reconciler.eventWindow.lastToTime))
Expand All @@ -155,8 +181,10 @@ func (reconciler *stateReconcilerType) runCronJobs(cfg Config, ctx context.Conte
}

_, err = s.Every(cfg.AccountsSyncInterval).Do(func() {
reconciler.periodicAccountsSync()
reconciler.metrics.cisRequests.With(prometheus.Labels{"endpoint": "accounts"}).Inc()
successes, failures := reconciler.periodicAccountsSync()

reconciler.metrics.cisRequests.With(prometheus.Labels{"endpoint": "accounts", "status": "failure"}).Add(float64(failures))
reconciler.metrics.cisRequests.With(prometheus.Labels{"endpoint": "accounts", "status": "success"}).Add(float64(successes))
})
if err != nil {
logs.Error(fmt.Sprintf("while scheduling accounts sync job: %s", err))
Expand Down Expand Up @@ -292,7 +320,7 @@ func (reconciler *stateReconcilerType) isResourceOutdated(state subaccountStateT
runtimes := state.resourcesState
cisState := state.cisState
for _, runtimeState := range runtimes {
outdated = outdated || runtimeState.betaEnabled == "" // label not set at all
outdated = outdated || runtimeState.betaEnabled == ""
outdated = outdated || (cisState.BetaEnabled && runtimeState.betaEnabled != "true")
outdated = outdated || (!cisState.BetaEnabled && runtimeState.betaEnabled != "false")
}
Expand Down
5 changes: 3 additions & 2 deletions internal/subaccountsync/subaccount_sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,11 @@ func (s *SyncService) Run() {
metrics.queue.Set(float64(queueSize))
metrics.queueOps.With(prometheus.Labels{"operation": "insert"}).Inc()
},
OnExtract: func(queueSize int, timeEnqueued int64) {
OnExtract: func(queueSize int, timeEnqueuedNano int64) {
metrics.queue.Set(float64(queueSize))
metrics.queueOps.With(prometheus.Labels{"operation": "extract"}).Inc()
metrics.timeInQueue.Set(float64(epochInMillis() - timeEnqueued))
timeEnqueuedMillis := timeEnqueuedNano / int64(time.Millisecond)
metrics.timeInQueue.Set(float64(timeEnqueuedMillis))
},
})

Expand Down
2 changes: 1 addition & 1 deletion internal/syncqueues/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (q *SubaccountAwarePriorityQueueWithCallbacks) Extract() (QueueElement, boo
q.siftDown()

if q.eventHandler != nil && q.eventHandler.OnExtract != nil {
q.eventHandler.OnExtract(q.size, e.entryTime-time.Now().UnixNano())
q.eventHandler.OnExtract(q.size, time.Now().UnixNano()-e.entryTime)
}
return e.QueueElement, true
}
Expand Down

0 comments on commit 045cc86

Please sign in to comment.