Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.17 Manual backport: Current month inconsistencies #28088

Merged
merged 5 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog/27547.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
```release-note:improvement
activity log: Changes how new client counts in the current month are estimated, in order to return more
visibly sensible totals.
```
3 changes: 3 additions & 0 deletions changelog/28042.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
activity: The sys/internal/counters/activity endpoint will return current month data when the end_date parameter is set to a future date.
```
3 changes: 3 additions & 0 deletions changelog/28062.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
core/activity: Ensure client count queries that include the current month return consistent results by sorting the clients before performing estimation
```
117 changes: 23 additions & 94 deletions vault/activity_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -1715,30 +1715,6 @@ type ResponseNamespace struct {
Mounts []*ResponseMount `json:"mounts"`
}

// Add adds the namespace counts to the existing record, then either adds the
// mount counts to the existing mount (if it exists) or appends the mount to the
// list of mounts
func (r *ResponseNamespace) Add(newRecord *ResponseNamespace) {
// Create a map of the existing mounts, so we don't duplicate them
mountMap := make(map[string]*ResponseCounts)
for _, erm := range r.Mounts {
mountMap[erm.MountPath] = erm.Counts
}

r.Counts.Add(&newRecord.Counts)

// Check the current month mounts against the existing mounts and if there are matches, update counts
// accordingly. If there is no match, append the new mount to the existing mounts, so it will be counted
// later.
for _, newRecordMount := range newRecord.Mounts {
if existingRecordMountCounts, ok := mountMap[newRecordMount.MountPath]; ok {
existingRecordMountCounts.Add(newRecordMount.Counts)
} else {
r.Mounts = append(r.Mounts, newRecordMount)
}
}
}

type ResponseMonth struct {
Timestamp string `json:"timestamp"`
Counts *ResponseCounts `json:"counts"`
Expand Down Expand Up @@ -1793,12 +1769,20 @@ func (a *ActivityLog) handleQuery(ctx context.Context, startTime, endTime time.T
startTime = timeutil.StartOfMonth(startTime)
endTime = timeutil.EndOfMonth(endTime)

// At the max, we only want to return data up until the end of the current month.
// Adjust the end time be the current month if a future date has been provided.
endOfCurrentMonth := timeutil.EndOfMonth(a.clock.Now().UTC())
adjustedEndTime := endTime
if endTime.After(endOfCurrentMonth) {
adjustedEndTime = endOfCurrentMonth
}

// If the endTime of the query is the current month, request data from the queryStore
// with the endTime equal to the end of the last month, and add in the current month
// data.
precomputedQueryEndTime := endTime
if timeutil.IsCurrentMonth(endTime, a.clock.Now().UTC()) {
precomputedQueryEndTime = timeutil.EndOfMonth(timeutil.MonthsPreviousTo(1, timeutil.StartOfMonth(endTime)))
precomputedQueryEndTime := adjustedEndTime
if timeutil.IsCurrentMonth(adjustedEndTime, a.clock.Now().UTC()) {
precomputedQueryEndTime = timeutil.EndOfMonth(timeutil.MonthsPreviousTo(1, timeutil.StartOfMonth(adjustedEndTime)))
computePartial = true
}

Expand Down Expand Up @@ -1831,54 +1815,29 @@ func (a *ActivityLog) handleQuery(ctx context.Context, startTime, endTime time.T
pq = storedQuery
}

// Calculate the namespace response breakdowns and totals for entities and tokens from the initial
// namespace data.
totalCounts, byNamespaceResponse, err := a.calculateByNamespaceResponseForQuery(ctx, pq.Namespaces)
if err != nil {
return nil, err
}

// If we need to add the current month's client counts into the total, compute the namespace
// breakdown for the current month as well.
var partialByMonth map[int64]*processMonth
var partialByNamespace map[string]*processByNamespace
var byNamespaceResponseCurrent []*ResponseNamespace
var totalCurrentCounts *ResponseCounts
if computePartial {
// Traverse through current month's activitylog data and group clients
// into months and namespaces
a.fragmentLock.RLock()
partialByMonth, partialByNamespace = a.populateNamespaceAndMonthlyBreakdowns()
partialByMonth, _ = a.populateNamespaceAndMonthlyBreakdowns()
a.fragmentLock.RUnlock()

// Convert the byNamespace breakdowns into structs that are
// consumable by the /activity endpoint, so as to reuse code between these two
// endpoints.
byNamespaceComputation := a.transformALNamespaceBreakdowns(partialByNamespace)

// Calculate the namespace response breakdowns and totals for entities
// and tokens from current month namespace data.
totalCurrentCounts, byNamespaceResponseCurrent, err = a.calculateByNamespaceResponseForQuery(ctx, byNamespaceComputation)
// Estimate the current month totals. These record contains is complete with all the
// current month data, grouped by namespace and mounts
currentMonth, err := a.computeCurrentMonthForBillingPeriod(ctx, partialByMonth, startTime, adjustedEndTime)
if err != nil {
return nil, err
}

// Create a mapping of namespace id to slice index, so that we can efficiently update our results without
// having to traverse the entire namespace response slice every time.
nsrMap := make(map[string]int)
for i, nr := range byNamespaceResponse {
nsrMap[nr.NamespaceID] = i
}
// Combine the existing months precomputed query with the current month data
pq.CombineWithCurrentMonth(currentMonth)
}

// Rather than blindly appending, which will create duplicates, check our existing counts against the current
// month counts, and append or update as necessary. We also want to account for mounts and their counts.
for _, nrc := range byNamespaceResponseCurrent {
if ndx, ok := nsrMap[nrc.NamespaceID]; ok {
byNamespaceResponse[ndx].Add(nrc)
} else {
byNamespaceResponse = append(byNamespaceResponse, nrc)
}
}
// Convert the namespace data into a protobuf format that can be returned in the response
totalCounts, byNamespaceResponse, err := a.calculateByNamespaceResponseForQuery(ctx, pq.Namespaces)
if err != nil {
return nil, err
}

// Sort clients within each namespace
Expand All @@ -1888,34 +1847,6 @@ func (a *ActivityLog) handleQuery(ctx context.Context, startTime, endTime time.T
totalCounts, byNamespaceResponse = a.limitNamespacesInALResponse(byNamespaceResponse, limitNamespaces)
}

distinctEntitiesResponse := totalCounts.EntityClients
if computePartial {
currentMonth, err := a.computeCurrentMonthForBillingPeriod(ctx, partialByMonth, startTime, endTime)
if err != nil {
return nil, err
}

// Add the namespace attribution for the current month to the newly computed current month value. Note
// that transformMonthBreakdowns calculates a superstruct of the required namespace struct due to its
// primary use-case being for precomputedQueryWorker, but we will reuse this code for brevity and extract
// the namespaces from it.
currentMonthNamespaceAttribution := a.transformMonthBreakdowns(partialByMonth)

// Ensure that there is only one element in this list -- if not, warn.
if len(currentMonthNamespaceAttribution) > 1 {
a.logger.Warn("more than one month worth of namespace and mount attribution calculated for "+
"current month values", "number of months", len(currentMonthNamespaceAttribution))
}
if len(currentMonthNamespaceAttribution) == 0 {
a.logger.Warn("no month data found, returning query with no namespace attribution for current month")
} else {
currentMonth.Namespaces = currentMonthNamespaceAttribution[0].Namespaces
currentMonth.NewClients.Namespaces = currentMonthNamespaceAttribution[0].NewClients.Namespaces
}
pq.Months = append(pq.Months, currentMonth)
distinctEntitiesResponse += pq.Months[len(pq.Months)-1].NewClients.Counts.EntityClients
}

// Now populate the response based on breakdowns.
responseData := make(map[string]interface{})
responseData["start_time"] = pq.StartTime.Format(time.RFC3339)
Expand All @@ -1932,8 +1863,6 @@ func (a *ActivityLog) handleQuery(ctx context.Context, startTime, endTime time.T
}

responseData["by_namespace"] = byNamespaceResponse
totalCounts.Add(totalCurrentCounts)
totalCounts.DistinctEntities = distinctEntitiesResponse
responseData["total"] = totalCounts

// Create and populate the month response structs based on the monthly breakdown.
Expand All @@ -1946,7 +1875,7 @@ func (a *ActivityLog) handleQuery(ctx context.Context, startTime, endTime time.T
a.sortActivityLogMonthsResponse(months)

// Modify the final month output to make response more consumable based on API request
months = a.modifyResponseMonths(months, startTime, endTime)
months = a.modifyResponseMonths(months, startTime, adjustedEndTime)
responseData["months"] = months

return responseData, nil
Expand Down
58 changes: 55 additions & 3 deletions vault/activity_log_util_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"io"
"slices"
"sort"
"strings"
"time"
Expand Down Expand Up @@ -156,20 +157,30 @@ func (a *ActivityLog) computeCurrentMonthForBillingPeriodInternal(ctx context.Co
return nil, errors.New("malformed current month used to calculate current month's activity")
}

for nsID, namespace := range month.Namespaces {
namespaces := month.Namespaces.sort()
for _, n := range namespaces {
nsID := n.id
namespace := n.processByNamespace
namespaceActivity := &activity.MonthlyNamespaceRecord{NamespaceID: nsID, Counts: &activity.CountsRecord{}}
newNamespaceActivity := &activity.MonthlyNamespaceRecord{NamespaceID: nsID, Counts: &activity.CountsRecord{}}
mountsActivity := make([]*activity.MountRecord, 0)
newMountsActivity := make([]*activity.MountRecord, 0)

for mountAccessor, mount := range namespace.Mounts {
mounts := namespace.Mounts.sort()
for _, m := range mounts {
mountAccessor := m.accessor
mount := m.processMount
mountPath := a.mountAccessorToMountPath(mountAccessor)

mountCounts := &activity.CountsRecord{}
newMountCounts := &activity.CountsRecord{}

for _, typ := range ActivityClientTypes {
for clientID := range mount.Counts.clientsByType(typ) {
clients := mount.Counts.clientsByType(typ)
clientIDs := clients.sort()

// sort the client IDs before inserting
for _, clientID := range clientIDs {
hllByType[typ].Insert([]byte(clientID))

// increment the per mount, per namespace, and total counts
Expand Down Expand Up @@ -241,6 +252,47 @@ func (a *ActivityLog) incrementCount(c *activity.CountsRecord, num int, typ stri
}
}

type processByNamespaceID struct {
id string
*processByNamespace
}

func (s summaryByNamespace) sort() []*processByNamespaceID {
namespaces := make([]*processByNamespaceID, 0, len(s))
for nsID, namespace := range s {
namespaces = append(namespaces, &processByNamespaceID{id: nsID, processByNamespace: namespace})
}
slices.SortStableFunc(namespaces, func(a, b *processByNamespaceID) int {
return strings.Compare(a.id, b.id)
})
return namespaces
}

type processMountAccessor struct {
accessor string
*processMount
}

func (s summaryByMount) sort() []*processMountAccessor {
mounts := make([]*processMountAccessor, 0, len(s))
for mountAccessor, mount := range s {
mounts = append(mounts, &processMountAccessor{accessor: mountAccessor, processMount: mount})
}
slices.SortStableFunc(mounts, func(a, b *processMountAccessor) int {
return strings.Compare(a.accessor, b.accessor)
})
return mounts
}

func (c clientIDSet) sort() []string {
clientIDs := make([]string, 0, len(c))
for clientID := range c {
clientIDs = append(clientIDs, clientID)
}
sort.Strings(clientIDs)
return clientIDs
}

// sortALResponseNamespaces sorts the namespaces for activity log responses.
func (a *ActivityLog) sortALResponseNamespaces(byNamespaceResponse []*ResponseNamespace) {
sort.Slice(byNamespaceResponse, func(i, j int) bool {
Expand Down
Loading
Loading