Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cache: add SetSnapshot and GetSnapshot to LinearCache #437

Merged
merged 4 commits into from
Jun 25, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,45 @@ func (cache *LinearCache) CreateWatch(request *Request) (chan Response, func())
}
}

// SetResources replaces current resources with a new set of resources
//
// If we have individual watches for two resources (there are 2 DiscoveryRequest with resourceNames=res1 and resourceNames=res2)
// Then if you set new resources, res1 changed but res2 is not, we assume that all resources are changed so both watches will be triggered.
// That's why this function is well suited for a use case when resourceNames of DiscoveryRequest is empty.
// This way you don't have to call UpdateResource for every resource which will trigger watch of DiscoveryRequest(resourceNames="") twice,
// but it will only happen once regardless of how many resources are changed.
alecholmez marked this conversation as resolved.
Show resolved Hide resolved
func (cache *LinearCache) SetResources(resources map[string]types.Resource) {
cache.mu.Lock()
defer cache.mu.Unlock()

cache.version += 1

modified := map[string]struct{}{}
for name := range cache.resources {
if _, found := resources[name]; !found {
delete(cache.versionVector, name)
modified[name] = struct{}{}
}
}

cache.resources = resources
for name := range resources {
// We assume all resources passed to SetResources are changed.
// Otherwise we would have to do proto.Equal on resources which is pretty expensive operation
cache.versionVector[name] = cache.version
modified[name] = struct{}{}
}

cache.notifyAll(modified)
}

// GetResources returns current resources stored in the cache
func (cache *LinearCache) GetResources() map[string]types.Resource {
cache.mu.Lock()
jakubdyszkiewicz marked this conversation as resolved.
Show resolved Hide resolved
defer cache.mu.Unlock()
return cache.resources
}

jakubdyszkiewicz marked this conversation as resolved.
Show resolved Hide resolved
func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, st *stream.StreamState) (chan DeltaResponse, func()) {
return nil, nil
}
Expand Down
59 changes: 59 additions & 0 deletions pkg/cache/v3/linear_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package cache

import (
"fmt"
"reflect"
"testing"

wrappers "github.com/golang/protobuf/ptypes/wrappers"
Expand Down Expand Up @@ -129,6 +130,64 @@ func TestLinearBasic(t *testing.T) {
verifyResponse(t, w, "3", 2)
}

func TestLinearSetResources(t *testing.T) {
c := NewLinearCache(testType)

// Create new resources
w1, _ := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
mustBlock(t, w1)
w, _ := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
jakubdyszkiewicz marked this conversation as resolved.
Show resolved Hide resolved
mustBlock(t, w)
c.SetResources(map[string]types.Resource{
"a": testResource("a"),
"b": testResource("b"),
})
verifyResponse(t, w1, "1", 1)
verifyResponse(t, w, "1", 2) // the version was only incremented once for all resources

// Add another element and update the first, response should be different
w1, _ = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"})
mustBlock(t, w1)
w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"})
mustBlock(t, w)
c.SetResources(map[string]types.Resource{
"a": testResource("aa"),
"b": testResource("b"),
"c": testResource("c"),
})
verifyResponse(t, w1, "2", 1)
verifyResponse(t, w, "2", 3)

// Delete resource
w1, _ = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"})
mustBlock(t, w1)
w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"})
mustBlock(t, w)
c.SetResources(map[string]types.Resource{
"b": testResource("b"),
"c": testResource("c"),
})
verifyResponse(t, w1, "", 0) // removing a resource from the set triggers existing watches for deleted resources
verifyResponse(t, w, "3", 2)
}

func TestLinearGetResources(t *testing.T) {
c := NewLinearCache(testType)

expectedResources := map[string]types.Resource{
"a": testResource("a"),
"b": testResource("b"),
}

c.SetResources(expectedResources)

resources := c.GetResources()

if !reflect.DeepEqual(expectedResources, resources) {
t.Errorf("resources are not equal. got: %v want: %v", resources, expectedResources)
}
}

func TestLinearVersionPrefix(t *testing.T) {
c := NewLinearCache(testType, WithVersionPrefix("instance1-"))

Expand Down