Skip to content

Commit

Permalink
Implement cache update deduplication per fetch cycle (#5509)
Browse files Browse the repository at this point in the history
Signed-off-by: Edwin Buck <edwbuck@gmail.com>
  • Loading branch information
edwbuck authored Oct 17, 2024
1 parent 20ad838 commit 5186212
Show file tree
Hide file tree
Showing 19 changed files with 4,181 additions and 664 deletions.
16 changes: 8 additions & 8 deletions pkg/common/telemetry/server/datastore/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import (
"github.com/spiffe/spire/pkg/common/telemetry"
)

// StartListRegistrationEntriesEventsCall return metric
// StartListRegistrationEntryEventsCall return metric
// for server's datastore, on listing registration entry events.
func StartListRegistrationEntriesEventsCall(m telemetry.Metrics) *telemetry.CallCounter {
func StartListRegistrationEntryEventsCall(m telemetry.Metrics) *telemetry.CallCounter {
return telemetry.StartCall(m, telemetry.Datastore, telemetry.RegistrationEntryEvent, telemetry.List)
}

// StartPruneRegistrationEntriesEventsCall return metric
// StartPruneRegistrationEntryEventsCall return metric
// for server's datastore, on pruning registration entry events.
func StartPruneRegistrationEntriesEventsCall(m telemetry.Metrics) *telemetry.CallCounter {
func StartPruneRegistrationEntryEventsCall(m telemetry.Metrics) *telemetry.CallCounter {
return telemetry.StartCall(m, telemetry.Datastore, telemetry.RegistrationEntryEvent, telemetry.Prune)
}

Expand All @@ -34,15 +34,15 @@ func StartFetchRegistrationEntryEventCall(m telemetry.Metrics) *telemetry.CallCo
return telemetry.StartCall(m, telemetry.Datastore, telemetry.RegistrationEntryEvent, telemetry.Fetch)
}

// StartListAttestedNodesEventsCall return metric
// StartListAttestedNodeEventsCall return metric
// for server's datastore, on listing attested node events.
func StartListAttestedNodesEventsCall(m telemetry.Metrics) *telemetry.CallCounter {
func StartListAttestedNodeEventsCall(m telemetry.Metrics) *telemetry.CallCounter {
return telemetry.StartCall(m, telemetry.Datastore, telemetry.NodeEvent, telemetry.List)
}

// StartPruneAttestedNodesEventsCall return metric
// StartPruneAttestedNodeEventsCall return metric
// for server's datastore, on pruning attested node events.
func StartPruneAttestedNodesEventsCall(m telemetry.Metrics) *telemetry.CallCounter {
func StartPruneAttestedNodeEventsCall(m telemetry.Metrics) *telemetry.CallCounter {
return telemetry.StartCall(m, telemetry.Datastore, telemetry.NodeEvent, telemetry.Prune)
}

Expand Down
24 changes: 12 additions & 12 deletions pkg/common/telemetry/server/datastore/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,10 @@ func (w metricsWrapper) ListAttestedNodes(ctx context.Context, req *datastore.Li
return w.ds.ListAttestedNodes(ctx, req)
}

func (w metricsWrapper) ListAttestedNodesEvents(ctx context.Context, req *datastore.ListAttestedNodesEventsRequest) (_ *datastore.ListAttestedNodesEventsResponse, err error) {
callCounter := StartListAttestedNodesEventsCall(w.m)
func (w metricsWrapper) ListAttestedNodeEvents(ctx context.Context, req *datastore.ListAttestedNodeEventsRequest) (_ *datastore.ListAttestedNodeEventsResponse, err error) {
callCounter := StartListAttestedNodeEventsCall(w.m)
defer callCounter.Done(&err)
return w.ds.ListAttestedNodesEvents(ctx, req)
return w.ds.ListAttestedNodeEvents(ctx, req)
}

func (w metricsWrapper) ListBundles(ctx context.Context, req *datastore.ListBundlesRequest) (_ *datastore.ListBundlesResponse, err error) {
Expand All @@ -203,10 +203,10 @@ func (w metricsWrapper) ListRegistrationEntries(ctx context.Context, req *datast
return w.ds.ListRegistrationEntries(ctx, req)
}

func (w metricsWrapper) ListRegistrationEntriesEvents(ctx context.Context, req *datastore.ListRegistrationEntriesEventsRequest) (_ *datastore.ListRegistrationEntriesEventsResponse, err error) {
callCounter := StartListRegistrationEntriesEventsCall(w.m)
func (w metricsWrapper) ListRegistrationEntryEvents(ctx context.Context, req *datastore.ListRegistrationEntryEventsRequest) (_ *datastore.ListRegistrationEntryEventsResponse, err error) {
callCounter := StartListRegistrationEntryEventsCall(w.m)
defer callCounter.Done(&err)
return w.ds.ListRegistrationEntriesEvents(ctx, req)
return w.ds.ListRegistrationEntryEvents(ctx, req)
}

func (w metricsWrapper) CountAttestedNodes(ctx context.Context, req *datastore.CountAttestedNodesRequest) (_ int32, err error) {
Expand All @@ -227,10 +227,10 @@ func (w metricsWrapper) CountRegistrationEntries(ctx context.Context, req *datas
return w.ds.CountRegistrationEntries(ctx, req)
}

func (w metricsWrapper) PruneAttestedNodesEvents(ctx context.Context, olderThan time.Duration) (err error) {
callCounter := StartPruneAttestedNodesEventsCall(w.m)
func (w metricsWrapper) PruneAttestedNodeEvents(ctx context.Context, olderThan time.Duration) (err error) {
callCounter := StartPruneAttestedNodeEventsCall(w.m)
defer callCounter.Done(&err)
return w.ds.PruneAttestedNodesEvents(ctx, olderThan)
return w.ds.PruneAttestedNodeEvents(ctx, olderThan)
}

func (w metricsWrapper) PruneBundle(ctx context.Context, trustDomainID string, expiresBefore time.Time) (_ bool, err error) {
Expand All @@ -251,10 +251,10 @@ func (w metricsWrapper) PruneRegistrationEntries(ctx context.Context, expiresBef
return w.ds.PruneRegistrationEntries(ctx, expiresBefore)
}

func (w metricsWrapper) PruneRegistrationEntriesEvents(ctx context.Context, olderThan time.Duration) (err error) {
callCounter := StartPruneRegistrationEntriesEventsCall(w.m)
func (w metricsWrapper) PruneRegistrationEntryEvents(ctx context.Context, olderThan time.Duration) (err error) {
callCounter := StartPruneRegistrationEntryEventsCall(w.m)
defer callCounter.Done(&err)
return w.ds.PruneRegistrationEntriesEvents(ctx, olderThan)
return w.ds.PruneRegistrationEntryEvents(ctx, olderThan)
}

func (w metricsWrapper) SetBundle(ctx context.Context, bundle *common.Bundle) (_ *common.Bundle, err error) {
Expand Down
20 changes: 10 additions & 10 deletions pkg/common/telemetry/server/datastore/wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func TestWithMetrics(t *testing.T) {
},
{
key: "datastore.node_event.list",
methodName: "ListAttestedNodesEvents",
methodName: "ListAttestedNodeEvents",
},
{
key: "datastore.bundle.list",
Expand All @@ -167,15 +167,15 @@ func TestWithMetrics(t *testing.T) {
},
{
key: "datastore.registration_entry_event.list",
methodName: "ListRegistrationEntriesEvents",
methodName: "ListRegistrationEntryEvents",
},
{
key: "datastore.federation_relationship.list",
methodName: "ListFederationRelationships",
},
{
key: "datastore.node_event.prune",
methodName: "PruneAttestedNodesEvents",
methodName: "PruneAttestedNodeEvents",
},
{
key: "datastore.bundle.prune",
Expand All @@ -191,7 +191,7 @@ func TestWithMetrics(t *testing.T) {
},
{
key: "datastore.registration_entry_event.prune",
methodName: "PruneRegistrationEntriesEvents",
methodName: "PruneRegistrationEntryEvents",
},
{
key: "datastore.bundle.set",
Expand Down Expand Up @@ -445,8 +445,8 @@ func (ds *fakeDataStore) ListAttestedNodes(context.Context, *datastore.ListAttes
return &datastore.ListAttestedNodesResponse{}, ds.err
}

func (ds *fakeDataStore) ListAttestedNodesEvents(context.Context, *datastore.ListAttestedNodesEventsRequest) (*datastore.ListAttestedNodesEventsResponse, error) {
return &datastore.ListAttestedNodesEventsResponse{}, ds.err
func (ds *fakeDataStore) ListAttestedNodeEvents(context.Context, *datastore.ListAttestedNodeEventsRequest) (*datastore.ListAttestedNodeEventsResponse, error) {
return &datastore.ListAttestedNodeEventsResponse{}, ds.err
}

func (ds *fakeDataStore) ListBundles(context.Context, *datastore.ListBundlesRequest) (*datastore.ListBundlesResponse, error) {
Expand All @@ -461,11 +461,11 @@ func (ds *fakeDataStore) ListRegistrationEntries(context.Context, *datastore.Lis
return &datastore.ListRegistrationEntriesResponse{}, ds.err
}

func (ds *fakeDataStore) ListRegistrationEntriesEvents(context.Context, *datastore.ListRegistrationEntriesEventsRequest) (*datastore.ListRegistrationEntriesEventsResponse, error) {
return &datastore.ListRegistrationEntriesEventsResponse{}, ds.err
func (ds *fakeDataStore) ListRegistrationEntryEvents(context.Context, *datastore.ListRegistrationEntryEventsRequest) (*datastore.ListRegistrationEntryEventsResponse, error) {
return &datastore.ListRegistrationEntryEventsResponse{}, ds.err
}

func (ds *fakeDataStore) PruneAttestedNodesEvents(context.Context, time.Duration) error {
func (ds *fakeDataStore) PruneAttestedNodeEvents(context.Context, time.Duration) error {
return ds.err
}

Expand All @@ -481,7 +481,7 @@ func (ds *fakeDataStore) PruneRegistrationEntries(context.Context, time.Time) er
return ds.err
}

func (ds *fakeDataStore) PruneRegistrationEntriesEvents(context.Context, time.Duration) error {
func (ds *fakeDataStore) PruneRegistrationEntryEvents(context.Context, time.Duration) error {
return ds.err
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/server/authorizedentries/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,8 @@ func (c *Cache) removeEntry(entryID string) {
}
}

func (c *Cache) Stats() cacheStats {
return cacheStats{
func (c *Cache) Stats() CacheStats {
return CacheStats{
AgentsByID: c.agentsByID.Len(),
AgentsByExpiresAt: c.agentsByExpiresAt.Len(),
AliasesByEntryID: c.aliasesByEntryID.Len(),
Expand All @@ -286,7 +286,7 @@ func isNodeAlias(e *types.Entry) bool {
return e.ParentId.Path == idutil.ServerIDPath
}

type cacheStats struct {
type CacheStats struct {
AgentsByID int
AgentsByExpiresAt int
AliasesByEntryID int
Expand Down
16 changes: 8 additions & 8 deletions pkg/server/authorizedentries/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,27 +186,27 @@ func TestCacheInternalStats(t *testing.T) {

cache := NewCache(clk)
cache.UpdateEntry(entry1)
require.Equal(t, cacheStats{
require.Equal(t, CacheStats{
EntriesByEntryID: 1,
EntriesByParentID: 1,
}, cache.Stats())

cache.UpdateEntry(entry2a)
require.Equal(t, cacheStats{
require.Equal(t, CacheStats{
EntriesByEntryID: 2,
EntriesByParentID: 2,
}, cache.Stats())

cache.UpdateEntry(entry2b)
require.Equal(t, cacheStats{
require.Equal(t, CacheStats{
EntriesByEntryID: 1,
EntriesByParentID: 1,
AliasesByEntryID: 2, // one for each selector
AliasesBySelector: 2, // one for each selector
}, cache.Stats())

cache.RemoveEntry(entry1.Id)
require.Equal(t, cacheStats{
require.Equal(t, CacheStats{
AliasesByEntryID: 2, // one for each selector
AliasesBySelector: 2, // one for each selector
}, cache.Stats())
Expand All @@ -222,25 +222,25 @@ func TestCacheInternalStats(t *testing.T) {
t.Run("agents", func(t *testing.T) {
cache := NewCache(clk)
cache.UpdateAgent(agent1.String(), now.Add(time.Hour), []*types.Selector{sel1})
require.Equal(t, cacheStats{
require.Equal(t, CacheStats{
AgentsByID: 1,
AgentsByExpiresAt: 1,
}, cache.Stats())

cache.UpdateAgent(agent2.String(), now.Add(time.Hour*2), []*types.Selector{sel2})
require.Equal(t, cacheStats{
require.Equal(t, CacheStats{
AgentsByID: 2,
AgentsByExpiresAt: 2,
}, cache.Stats())

cache.UpdateAgent(agent2.String(), now.Add(time.Hour*3), []*types.Selector{sel2})
require.Equal(t, cacheStats{
require.Equal(t, CacheStats{
AgentsByID: 2,
AgentsByExpiresAt: 2,
}, cache.Stats())

cache.RemoveAgent(agent1.String())
require.Equal(t, cacheStats{
require.Equal(t, CacheStats{
AgentsByID: 1,
AgentsByExpiresAt: 1,
}, cache.Stats())
Expand Down
16 changes: 8 additions & 8 deletions pkg/server/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ type DataStore interface {
UpdateRegistrationEntry(context.Context, *common.RegistrationEntry, *common.RegistrationEntryMask) (*common.RegistrationEntry, error)

// Entries Events
ListRegistrationEntriesEvents(ctx context.Context, req *ListRegistrationEntriesEventsRequest) (*ListRegistrationEntriesEventsResponse, error)
PruneRegistrationEntriesEvents(ctx context.Context, olderThan time.Duration) error
ListRegistrationEntryEvents(ctx context.Context, req *ListRegistrationEntryEventsRequest) (*ListRegistrationEntryEventsResponse, error)
PruneRegistrationEntryEvents(ctx context.Context, olderThan time.Duration) error
FetchRegistrationEntryEvent(ctx context.Context, eventID uint) (*RegistrationEntryEvent, error)
CreateRegistrationEntryEventForTesting(ctx context.Context, event *RegistrationEntryEvent) error
DeleteRegistrationEntryEventForTesting(ctx context.Context, eventID uint) error
Expand All @@ -55,8 +55,8 @@ type DataStore interface {
UpdateAttestedNode(context.Context, *common.AttestedNode, *common.AttestedNodeMask) (*common.AttestedNode, error)

// Nodes Events
ListAttestedNodesEvents(ctx context.Context, req *ListAttestedNodesEventsRequest) (*ListAttestedNodesEventsResponse, error)
PruneAttestedNodesEvents(ctx context.Context, olderThan time.Duration) error
ListAttestedNodeEvents(ctx context.Context, req *ListAttestedNodeEventsRequest) (*ListAttestedNodeEventsResponse, error)
PruneAttestedNodeEvents(ctx context.Context, olderThan time.Duration) error
FetchAttestedNodeEvent(ctx context.Context, eventID uint) (*AttestedNodeEvent, error)
CreateAttestedNodeEventForTesting(ctx context.Context, event *AttestedNodeEvent) error
DeleteAttestedNodeEventForTesting(ctx context.Context, eventID uint) error
Expand Down Expand Up @@ -169,7 +169,7 @@ type ListAttestedNodesResponse struct {
Pagination *Pagination
}

type ListAttestedNodesEventsRequest struct {
type ListAttestedNodeEventsRequest struct {
GreaterThanEventID uint
LessThanEventID uint
}
Expand All @@ -179,7 +179,7 @@ type AttestedNodeEvent struct {
SpiffeID string
}

type ListAttestedNodesEventsResponse struct {
type ListAttestedNodeEventsResponse struct {
Events []AttestedNodeEvent
}

Expand Down Expand Up @@ -223,7 +223,7 @@ type ListRegistrationEntriesResponse struct {
Pagination *Pagination
}

type ListRegistrationEntriesEventsRequest struct {
type ListRegistrationEntryEventsRequest struct {
GreaterThanEventID uint
LessThanEventID uint
}
Expand All @@ -233,7 +233,7 @@ type RegistrationEntryEvent struct {
EntryID string
}

type ListRegistrationEntriesEventsResponse struct {
type ListRegistrationEntryEventsResponse struct {
Events []RegistrationEntryEvent
}

Expand Down
Loading

0 comments on commit 5186212

Please sign in to comment.