Skip to content

Commit

Permalink
Add the readiness-retries flag, allowing the user to configure how many
Browse files Browse the repository at this point in the history
attempts should be made to injest a resource into OPA while blocking the
webhook.

Previously, a failure to injest an object into the OPA cache would
strike that object from the ObjectTracker, a type tasked with blocking
the webhook from serving requests until all the necessary objects had
been observed on the API server.  This setup optimizes for availability
(the webhook serving requests ASAP) over security.

By adding retry functionality, we configure gatekeeper to block the
webhook for longer, but unblock with a more complete set of constraints.

Signed-off-by: juliankatz <juliankatz@google.com>

diff --git a/pkg/controller/constraint/constraint_controller.go b/pkg/controller/constraint/constraint_controller.go
index 7e5054f..f0c8884 100644
--- a/pkg/controller/constraint/constraint_controller.go
+++ b/pkg/controller/constraint/constraint_controller.go
@@ -387,7 +387,7 @@ func (r *ReconcileConstraint) cacheConstraint(instance *unstructured.Unstructure
 	unstructured.RemoveNestedField(obj.Object, "status")
 	_, err := r.opa.AddConstraint(context.Background(), obj)
 	if err != nil {
-		t.CancelExpect(obj)
+		t.TryCancelExpect(obj)
 		return err
 	}

diff --git a/pkg/readiness/noop_expectations.go b/pkg/readiness/noop_expectations.go
index 320cd84..e182cd0 100644
--- a/pkg/readiness/noop_expectations.go
+++ b/pkg/readiness/noop_expectations.go
@@ -27,6 +27,9 @@ func (n noopExpectations) Expect(o runtime.Object) {
 func (n noopExpectations) CancelExpect(o runtime.Object) {
 }

+func (n noopExpectations) TryCancelExpect(o runtime.Object) {
+}
+
 func (n noopExpectations) ExpectationsDone() {
 }

diff --git a/pkg/readiness/object_tracker.go b/pkg/readiness/object_tracker.go
index bb933c9..a0337b2 100644
--- a/pkg/readiness/object_tracker.go
+++ b/pkg/readiness/object_tracker.go
@@ -16,6 +16,7 @@ limitations under the License.
 package readiness

 import (
+	"flag"
 	"fmt"
 	"sync"

@@ -29,6 +30,8 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 )

+var readinessRetries = flag.Int("readiness-retries", 0, "The number of resource ingestion attempts allowed before the resource is disregarded")
+
 // Expectations tracks expectations for runtime.Objects.
 // A set of Expect() calls are made, demarcated by ExpectationsDone().
 // Expectations are satisfied by calls to Observe().
@@ -36,6 +39,7 @@ import (
 type Expectations interface {
 	Expect(o runtime.Object)
 	CancelExpect(o runtime.Object)
+	TryCancelExpect(o runtime.Object)
 	ExpectationsDone()
 	Observe(o runtime.Object)
 	Satisfied() bool
@@ -56,15 +60,20 @@ type objectTracker struct {
 	populated     bool                      // all expectations have been provided
 	allSatisfied  bool                      // true once all expectations have been satisfied. Acts as a circuit-breaker.
 	kindsSnapshot []schema.GroupVersionKind // Snapshot of kinds before freeing memory in Satisfied.
+	mutators      []objDataMutator          // functions that mutate objData types during their creation.  Allows for defaults.
 }

-func newObjTracker(gvk schema.GroupVersionKind) *objectTracker {
+func newObjTracker(gvk schema.GroupVersionKind, fns ...objDataMutator) *objectTracker {
+	// Make setRetriesFromFlag the default behavior.  It can be overridden for testing.
+	muts := append([]objDataMutator{setRetriesFromFlag}, fns...)
+
 	return &objectTracker{
 		gvk:       gvk,
 		cancelled: make(objSet),
 		expect:    make(objSet),
 		seen:      make(objSet),
 		satisfied: make(objSet),
+		mutators:  muts,
 	}
 }

@@ -99,11 +108,11 @@ func (t *objectTracker) Expect(o runtime.Object) {
 	if _, ok := t.seen[k]; ok {
 		delete(t.seen, k)
 		delete(t.expect, k)
-		t.satisfied[k] = struct{}{}
+		t.satisfied[k] = mutatedObjData(t.mutators...)
 		return
 	}

-	t.expect[k] = struct{}{}
+	t.expect[k] = mutatedObjData(t.mutators...)
 }

 // CancelExpect cancels an expectation and marks it so it
@@ -126,7 +135,30 @@ func (t *objectTracker) CancelExpect(o runtime.Object) {
 	delete(t.expect, k)
 	delete(t.seen, k)
 	delete(t.satisfied, k)
-	t.cancelled[k] = struct{}{}
+	t.cancelled[k] = mutatedObjData(t.mutators...)
+}
+
+func (t *objectTracker) TryCancelExpect(o runtime.Object) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
+	// Respect circuit-breaker.
+	if t.allSatisfied {
+		return
+	}
+
+	k, err := objKeyFromObject(o)
+	if err != nil {
+		log.Error(err, "skipping")
+		return
+	}
+
+	deleted := t.expect.decrementOrDelete(k)
+	if deleted {
+		delete(t.seen, k)
+		delete(t.satisfied, k)
+		t.cancelled[k] = mutatedObjData(t.mutators...)
+	}
 }

 // ExpectationsDone tells the tracker to stop accepting new expectations.
@@ -187,7 +219,7 @@ func (t *objectTracker) Observe(o runtime.Object) {
 		// Satisfy existing expectation
 		delete(t.seen, k)
 		delete(t.expect, k)
-		t.satisfied[k] = struct{}{}
+		t.satisfied[k] = mutatedObjData(t.mutators...)
 		return
 	case !wasExpecting && t.populated:
 		// Not expecting and no longer accepting expectations.
@@ -197,7 +229,7 @@ func (t *objectTracker) Observe(o runtime.Object) {
 	}

 	// Track for future expectation.
-	t.seen[k] = struct{}{}
+	t.seen[k] = mutatedObjData(t.mutators...)
 }

 func (t *objectTracker) Populated() bool {
@@ -249,7 +281,7 @@ func (t *objectTracker) Satisfied() bool {
 		}
 		delete(t.seen, k)
 		delete(t.expect, k)
-		t.satisfied[k] = struct{}{}
+		t.satisfied[k] = mutatedObjData(t.mutators...)
 		resolveCount++
 	}
 	log.V(1).Info("resolved pre-observations", "gvk", t.gvk, "count", resolveCount)
diff --git a/pkg/readiness/object_tracker_test.go b/pkg/readiness/object_tracker_test.go
index 8898b9f..afe9443 100644
--- a/pkg/readiness/object_tracker_test.go
+++ b/pkg/readiness/object_tracker_test.go
@@ -69,6 +69,7 @@ func Test_ObjectTracker_Multiple_Expectations(t *testing.T) {
 	}
 	g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "should not be satisfied before ExpectationsDone")
 	ot.ExpectationsDone()
+	g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "should not be satisfied after ExpectationsDone")

 	for i := 0; i < len(ct); i++ {
 		g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "should not be satisfied before observations are done")
@@ -243,3 +244,68 @@ func Test_ObjectTracker_kinds(t *testing.T) {
 	g.Expect(kindsBefore).ShouldNot(gomega.BeEmpty(), "expected non-empty kinds")
 	g.Expect(kindsAfter).Should(gomega.Equal(kindsBefore), "expected kinds to match")
 }
+
+// Verify that TryCancelExpect functions the same as regular CancelExpect if readinessRetries is set to 0
+func Test_ObjectTracker_TryCancelExpect_Default(t *testing.T) {
+	g := gomega.NewWithT(t)
+	ot := newObjTracker(schema.GroupVersionKind{}, func(o *objData) *objData {
+		o.retries = 0
+		return o
+	})
+
+	const count = 10
+	ct := makeCTSlice("ct-", count)
+	for i := 0; i < len(ct); i++ {
+		ot.Expect(ct[i])
+	}
+	g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "should not be satisfied before ExpectationsDone")
+	ot.ExpectationsDone()
+
+	// Skip the first two
+	for i := 2; i < len(ct); i++ {
+		g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "should not be satisfied before observations are done")
+		ot.Observe(ct[i])
+	}
+	g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "two expectation remain")
+
+	ot.TryCancelExpect(ct[0])
+	g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "one expectation remains")
+
+	ot.TryCancelExpect(ct[1])
+	g.Expect(ot.Satisfied()).To(gomega.BeTrue(), "should be satisfied")
+}
+
+// Verify that TryCancelExpect must be called multiple times before an expectation is cancelled
+func Test_ObjectTracker_TryCancelExpect_WithRetries(t *testing.T) {
+	retries := 2
+
+	g := gomega.NewWithT(t)
+	ot := newObjTracker(schema.GroupVersionKind{}, func(o *objData) *objData {
+		o.retries = retries
+		return o
+	})
+
+	const count = 10
+	ct := makeCTSlice("ct-", count)
+	for i := 0; i < len(ct); i++ {
+		ot.Expect(ct[i])
+	}
+	g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "should not be satisfied before ExpectationsDone")
+	ot.ExpectationsDone()
+
+	// Skip the first one
+	for i := 1; i < len(ct); i++ {
+		g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "should not be satisfied before observations are done")
+		ot.Observe(ct[i])
+	}
+	g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "one expectation remains with two retries")
+
+	ot.TryCancelExpect(ct[0])
+	g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "one expectation remains with one retries")
+
+	ot.TryCancelExpect(ct[0])
+	g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "one expectation remains with zero retries")
+
+	ot.TryCancelExpect(ct[0])
+	g.Expect(ot.Satisfied()).To(gomega.BeTrue(), "should be satisfied")
+}
diff --git a/pkg/readiness/objset.go b/pkg/readiness/objset.go
index 663c2d3..a1708fa 100644
--- a/pkg/readiness/objset.go
+++ b/pkg/readiness/objset.go
@@ -27,9 +27,39 @@ type objKey struct {
 	namespacedName types.NamespacedName
 }

-type objSet map[objKey]struct{}
-
 func (k objKey) String() string {
 	return fmt.Sprintf("%s [%s]", k.namespacedName.String(), k.gvk.String())
+}
+
+type objData struct {
+	retries int
+}
+
+type objDataMutator func(*objData) *objData
+
+func setRetriesFromFlag(o *objData) *objData {
+	o.retries = *readinessRetries
+	return o
+}
+
+func mutatedObjData(muts ...objDataMutator) objData {
+	out := objData{}
+	for _, m := range muts {
+		m(&out)
+	}
+	return out
+}
+
+type objSet map[objKey]objData

+func (o objSet) decrementOrDelete(key objKey) bool {
+	val := o[key]
+	if val.retries > 0 {
+		val.retries--
+		o[key] = val
+		return false
+	} else {
+		delete(o, key)
+		return true
+	}
 }
  • Loading branch information
julianKatz committed Dec 9, 2020
1 parent d6c5389 commit 875c024
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 10 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/constraint/constraint_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func (r *ReconcileConstraint) cacheConstraint(instance *unstructured.Unstructure
unstructured.RemoveNestedField(obj.Object, "status")
_, err := r.opa.AddConstraint(context.Background(), obj)
if err != nil {
t.CancelExpect(obj)
t.TryCancelExpect(obj)
return err
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/readiness/noop_expectations.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ func (n noopExpectations) Expect(o runtime.Object) {
func (n noopExpectations) CancelExpect(o runtime.Object) {
}

func (n noopExpectations) TryCancelExpect(o runtime.Object) {
}

func (n noopExpectations) ExpectationsDone() {
}

Expand Down
46 changes: 39 additions & 7 deletions pkg/readiness/object_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ limitations under the License.
package readiness

import (
"flag"
"fmt"
"sync"

Expand All @@ -29,13 +30,16 @@ import (
"k8s.io/apimachinery/pkg/types"
)

var readinessRetries = flag.Int("readiness-retries", 0, "The number of resource ingestion attempts allowed before the resource is disregarded")

// Expectations tracks expectations for runtime.Objects.
// A set of Expect() calls are made, demarcated by ExpectationsDone().
// Expectations are satisfied by calls to Observe().
// Once all expectations are satisfied, Satisfied() will begin returning true.
type Expectations interface {
Expect(o runtime.Object)
CancelExpect(o runtime.Object)
TryCancelExpect(o runtime.Object)
ExpectationsDone()
Observe(o runtime.Object)
Satisfied() bool
Expand All @@ -56,15 +60,20 @@ type objectTracker struct {
populated bool // all expectations have been provided
allSatisfied bool // true once all expectations have been satisfied. Acts as a circuit-breaker.
kindsSnapshot []schema.GroupVersionKind // Snapshot of kinds before freeing memory in Satisfied.
mutators []objDataMutator // functions that mutate objData types during their creation. Allows for defaults.
}

func newObjTracker(gvk schema.GroupVersionKind) *objectTracker {
func newObjTracker(gvk schema.GroupVersionKind, fns ...objDataMutator) *objectTracker {
// Make setRetriesFromFlag the default behavior. It can be overridden for testing.
muts := append([]objDataMutator{setRetriesFromFlag}, fns...)

return &objectTracker{
gvk: gvk,
cancelled: make(objSet),
expect: make(objSet),
seen: make(objSet),
satisfied: make(objSet),
mutators: muts,
}
}

Expand Down Expand Up @@ -99,11 +108,11 @@ func (t *objectTracker) Expect(o runtime.Object) {
if _, ok := t.seen[k]; ok {
delete(t.seen, k)
delete(t.expect, k)
t.satisfied[k] = struct{}{}
t.satisfied[k] = mutatedObjData(t.mutators...)
return
}

t.expect[k] = struct{}{}
t.expect[k] = mutatedObjData(t.mutators...)
}

// CancelExpect cancels an expectation and marks it so it
Expand All @@ -126,7 +135,30 @@ func (t *objectTracker) CancelExpect(o runtime.Object) {
delete(t.expect, k)
delete(t.seen, k)
delete(t.satisfied, k)
t.cancelled[k] = struct{}{}
t.cancelled[k] = mutatedObjData(t.mutators...)
}

func (t *objectTracker) TryCancelExpect(o runtime.Object) {
t.mu.Lock()
defer t.mu.Unlock()

// Respect circuit-breaker.
if t.allSatisfied {
return
}

k, err := objKeyFromObject(o)
if err != nil {
log.Error(err, "skipping")
return
}

deleted := t.expect.decrementOrDelete(k)
if deleted {
delete(t.seen, k)
delete(t.satisfied, k)
t.cancelled[k] = mutatedObjData(t.mutators...)
}
}

// ExpectationsDone tells the tracker to stop accepting new expectations.
Expand Down Expand Up @@ -187,7 +219,7 @@ func (t *objectTracker) Observe(o runtime.Object) {
// Satisfy existing expectation
delete(t.seen, k)
delete(t.expect, k)
t.satisfied[k] = struct{}{}
t.satisfied[k] = mutatedObjData(t.mutators...)
return
case !wasExpecting && t.populated:
// Not expecting and no longer accepting expectations.
Expand All @@ -197,7 +229,7 @@ func (t *objectTracker) Observe(o runtime.Object) {
}

// Track for future expectation.
t.seen[k] = struct{}{}
t.seen[k] = mutatedObjData(t.mutators...)
}

func (t *objectTracker) Populated() bool {
Expand Down Expand Up @@ -249,7 +281,7 @@ func (t *objectTracker) Satisfied() bool {
}
delete(t.seen, k)
delete(t.expect, k)
t.satisfied[k] = struct{}{}
t.satisfied[k] = mutatedObjData(t.mutators...)
resolveCount++
}
log.V(1).Info("resolved pre-observations", "gvk", t.gvk, "count", resolveCount)
Expand Down
66 changes: 66 additions & 0 deletions pkg/readiness/object_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func Test_ObjectTracker_Multiple_Expectations(t *testing.T) {
}
g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "should not be satisfied before ExpectationsDone")
ot.ExpectationsDone()
g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "should not be satisfied after ExpectationsDone")

for i := 0; i < len(ct); i++ {
g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "should not be satisfied before observations are done")
Expand Down Expand Up @@ -243,3 +244,68 @@ func Test_ObjectTracker_kinds(t *testing.T) {
g.Expect(kindsBefore).ShouldNot(gomega.BeEmpty(), "expected non-empty kinds")
g.Expect(kindsAfter).Should(gomega.Equal(kindsBefore), "expected kinds to match")
}

// Verify that TryCancelExpect functions the same as regular CancelExpect if readinessRetries is set to 0
func Test_ObjectTracker_TryCancelExpect_Default(t *testing.T) {
g := gomega.NewWithT(t)
ot := newObjTracker(schema.GroupVersionKind{}, func(o *objData) *objData {
o.retries = 0
return o
})

const count = 10
ct := makeCTSlice("ct-", count)
for i := 0; i < len(ct); i++ {
ot.Expect(ct[i])
}
g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "should not be satisfied before ExpectationsDone")
ot.ExpectationsDone()

// Skip the first two
for i := 2; i < len(ct); i++ {
g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "should not be satisfied before observations are done")
ot.Observe(ct[i])
}
g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "two expectation remain")

ot.TryCancelExpect(ct[0])
g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "one expectation remains")

ot.TryCancelExpect(ct[1])
g.Expect(ot.Satisfied()).To(gomega.BeTrue(), "should be satisfied")
}

// Verify that TryCancelExpect must be called multiple times before an expectation is cancelled
func Test_ObjectTracker_TryCancelExpect_WithRetries(t *testing.T) {
retries := 2

g := gomega.NewWithT(t)
ot := newObjTracker(schema.GroupVersionKind{}, func(o *objData) *objData {
o.retries = retries
return o
})

const count = 10
ct := makeCTSlice("ct-", count)
for i := 0; i < len(ct); i++ {
ot.Expect(ct[i])
}
g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "should not be satisfied before ExpectationsDone")
ot.ExpectationsDone()

// Skip the first one
for i := 1; i < len(ct); i++ {
g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "should not be satisfied before observations are done")
ot.Observe(ct[i])
}
g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "one expectation remains with two retries")

ot.TryCancelExpect(ct[0])
g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "one expectation remains with one retries")

ot.TryCancelExpect(ct[0])
g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "one expectation remains with zero retries")

ot.TryCancelExpect(ct[0])
g.Expect(ot.Satisfied()).To(gomega.BeTrue(), "should be satisfied")
}
34 changes: 32 additions & 2 deletions pkg/readiness/objset.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,39 @@ type objKey struct {
namespacedName types.NamespacedName
}

type objSet map[objKey]struct{}

func (k objKey) String() string {
return fmt.Sprintf("%s [%s]", k.namespacedName.String(), k.gvk.String())
}

type objData struct {
retries int
}

type objDataMutator func(*objData) *objData

func setRetriesFromFlag(o *objData) *objData {
o.retries = *readinessRetries
return o
}

func mutatedObjData(muts ...objDataMutator) objData {
out := objData{}
for _, m := range muts {
m(&out)
}
return out
}

type objSet map[objKey]objData

func (o objSet) decrementOrDelete(key objKey) bool {
val := o[key]
if val.retries > 0 {
val.retries--
o[key] = val
return false
} else {
delete(o, key)
return true
}
}

0 comments on commit 875c024

Please sign in to comment.