Skip to content

Commit

Permalink
Check resource generation when processing updates of some resources t…
Browse files Browse the repository at this point in the history
…o skip config regeneration (nginx#1422)

Problem: When processing updates to cluster resources, for some
resources, we check their generation, so that we don't trigger state
change (graph rebuild) if the generation didn't change. This is a
performance optimization so that we don't rebuild the graph and as a
result do not regenerate NGINX config and reload it. This is not a
K8s-native solution, and may introduce complexity for the codebase.

Solution: Use `GenerationChangedPredicate` in controller-runtime to
filter out the resource events at the controller level.
  • Loading branch information
miledxz committed Jan 18, 2024
1 parent f9ee472 commit 9b31e87
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 148 deletions.
20 changes: 16 additions & 4 deletions internal/mode/static/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,22 +281,31 @@ func registerControllers(
{
objectType: &gatewayv1.GatewayClass{},
options: []controller.Option{
controller.WithK8sPredicate(predicate.GatewayClassPredicate{ControllerName: cfg.GatewayCtlrName}),
controller.WithK8sPredicate(k8spredicate.And(
k8spredicate.GenerationChangedPredicate{},
predicate.GatewayClassPredicate{ControllerName: cfg.GatewayCtlrName})),
},
},
{
objectType: &gatewayv1.Gateway{},
options: func() []controller.Option {
options := []controller.Option{
controller.WithK8sPredicate(k8spredicate.GenerationChangedPredicate{}),
}
if cfg.GatewayNsName != nil {
return []controller.Option{
options = append(
options,
controller.WithNamespacedNameFilter(filter.CreateSingleResourceFilter(*cfg.GatewayNsName)),
}
)
}
return nil
return options
}(),
},
{
objectType: &gatewayv1.HTTPRoute{},
options: []controller.Option{
controller.WithK8sPredicate(k8spredicate.GenerationChangedPredicate{}),
},
},
{
objectType: &apiv1.Service{},
Expand Down Expand Up @@ -334,6 +343,9 @@ func registerControllers(
},
{
objectType: &gatewayv1beta1.ReferenceGrant{},
options: []controller.Option{
controller.WithK8sPredicate(k8spredicate.GenerationChangedPredicate{}),
},
},
{
objectType: &crdWithGVK,
Expand Down
8 changes: 4 additions & 4 deletions internal/mode/static/state/change_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,22 +126,22 @@ func NewChangeProcessorImpl(cfg ChangeProcessorConfig) *ChangeProcessorImpl {
{
gvk: extractGVK(&v1.GatewayClass{}),
store: newObjectStoreMapAdapter(clusterStore.GatewayClasses),
predicate: generationChangedPredicate{},
predicate: alwaysProcess{},
},
{
gvk: extractGVK(&v1.Gateway{}),
store: newObjectStoreMapAdapter(clusterStore.Gateways),
predicate: generationChangedPredicate{},
predicate: alwaysProcess{},
},
{
gvk: extractGVK(&v1.HTTPRoute{}),
store: newObjectStoreMapAdapter(clusterStore.HTTPRoutes),
predicate: generationChangedPredicate{},
predicate: alwaysProcess{},
},
{
gvk: extractGVK(&v1beta1.ReferenceGrant{}),
store: newObjectStoreMapAdapter(clusterStore.ReferenceGrants),
predicate: generationChangedPredicate{},
predicate: alwaysProcess{},
},
{
gvk: extractGVK(&apiv1.Namespace{}),
Expand Down
51 changes: 1 addition & 50 deletions internal/mode/static/state/change_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,17 +703,6 @@ var _ = Describe("ChangeProcessor", func() {
Expect(helpers.Diff(expGraph, graphCfg)).To(BeEmpty())
})
})
When("the first HTTPRoute without a generation changed is processed", func() {
It("returns nil graph", func() {
hr1UpdatedSameGen := hr1.DeepCopy()
// hr1UpdatedSameGen.Generation has not been changed
processor.CaptureUpsertChange(hr1UpdatedSameGen)

changed, graphCfg := processor.Process()
Expect(changed).To(BeFalse())
Expect(graphCfg).To(BeNil())
})
})
When("the first HTTPRoute update with a generation changed is processed", func() {
It("returns populated graph", func() {
processor.CaptureUpsertChange(hr1Updated)
Expand All @@ -733,17 +722,6 @@ var _ = Describe("ChangeProcessor", func() {
},
)
})
When("the first Gateway update without generation changed is processed", func() {
It("returns nil graph", func() {
gwUpdatedSameGen := gw1.DeepCopy()
// gwUpdatedSameGen.Generation has not been changed
processor.CaptureUpsertChange(gwUpdatedSameGen)

changed, graphCfg := processor.Process()
Expect(changed).To(BeFalse())
Expect(graphCfg).To(BeNil())
})
})
When("the first Gateway update with a generation changed is processed", func() {
It("returns populated graph", func() {
processor.CaptureUpsertChange(gw1Updated)
Expand All @@ -758,17 +736,6 @@ var _ = Describe("ChangeProcessor", func() {
Expect(helpers.Diff(expGraph, graphCfg)).To(BeEmpty())
})
})
When("the GatewayClass update without generation change is processed", func() {
It("returns nil graph", func() {
gcUpdatedSameGen := gc.DeepCopy()
// gcUpdatedSameGen.Generation has not been changed
processor.CaptureUpsertChange(gcUpdatedSameGen)

changed, graphCfg := processor.Process()
Expect(changed).To(BeFalse())
Expect(graphCfg).To(BeNil())
})
})
When("the GatewayClass update with generation change is processed", func() {
It("returns populated graph", func() {
processor.CaptureUpsertChange(gcUpdated)
Expand Down Expand Up @@ -1590,15 +1557,6 @@ var _ = Describe("ChangeProcessor", func() {
changed, _ := processor.Process()
Expect(changed).To(BeTrue())
})
It("should report not changed after multiple Upserts of the resource with same generation", func() {
processor.CaptureUpsertChange(gc)
processor.CaptureUpsertChange(gw1)
processor.CaptureUpsertChange(hr1)
processor.CaptureUpsertChange(rg1)

changed, _ := processor.Process()
Expect(changed).To(BeFalse())
})
When("a upsert of updated resources is followed by an upsert of the same generation", func() {
It("should report changed", func() {
// these are changing changes
Expand Down Expand Up @@ -1737,14 +1695,7 @@ var _ = Describe("ChangeProcessor", func() {
Expect(changed).To(BeTrue())
})

It("should report not changed after multiple Upserts of unrelated and unchanged resources", func() {
// unchanged Gateway API resources
fakeRelationshipCapturer.ExistsReturns(false)
processor.CaptureUpsertChange(gc)
processor.CaptureUpsertChange(gw1)
processor.CaptureUpsertChange(hr1)
processor.CaptureUpsertChange(rg1)

It("should report not changed after multiple Upserts of unrelated resources", func() {
// unrelated Kubernetes API resources
fakeRelationshipCapturer.ExistsReturns(false)
processor.CaptureUpsertChange(svc)
Expand Down
20 changes: 4 additions & 16 deletions internal/mode/static/state/changed_predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,12 @@ func (f funcPredicate) delete(object client.Object) bool {
return f.stateChanged(object)
}

// generationChangedPredicate implements stateChangedPredicate based on the generation of the object.
// This predicate will return true on upsert if the object's generation has changed.
// It always returns true on delete.
type generationChangedPredicate struct{}

func (generationChangedPredicate) delete(_ client.Object) bool { return true }
// FIXME(kevin85421): We should remove this predicate and update changeTrackingUpdater once #1432 is merged.
type alwaysProcess struct{}

func (generationChangedPredicate) upsert(oldObject, newObject client.Object) bool {
if oldObject == nil {
return true
}
func (alwaysProcess) delete(_ client.Object) bool { return true }

if newObject == nil {
panic("Cannot determine if generation has changed on upsert because new object is nil")
}

return newObject.GetGeneration() != oldObject.GetGeneration()
}
func (alwaysProcess) upsert(_, _ client.Object) bool { return true }

// annotationChangedPredicate implements stateChangedPredicate based on the value of the annotation provided.
// This predicate will return true on upsert if the annotation's value has changed.
Expand Down
74 changes: 0 additions & 74 deletions internal/mode/static/state/changed_predicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,80 +20,6 @@ func TestFuncPredicate(t *testing.T) {
g.Expect(p.upsert(nil, nil)).To(BeTrue())
}

func TestGenerationChangedPredicate_Delete(t *testing.T) {
p := generationChangedPredicate{}

g := NewWithT(t)
g.Expect(p.delete(nil)).To(BeTrue())
}

func TestGenerationChangedPredicate_Update(t *testing.T) {
tests := []struct {
oldObj client.Object
newObj client.Object
name string
stateChanged bool
expPanic bool
}{
{
name: "generation has changed",
oldObj: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Generation: 1,
},
},
newObj: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Generation: 2,
},
},
stateChanged: true,
},
{
name: "generation has not changed",
oldObj: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Generation: 1,
},
},
newObj: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Generation: 1,
},
},
stateChanged: false,
},
{
name: "old object is nil",
oldObj: nil,
newObj: &v1.Pod{},
stateChanged: true,
},
{
name: "new object is nil",
oldObj: &v1.Pod{},
newObj: nil,
expPanic: true,
},
}

p := generationChangedPredicate{}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
g := NewWithT(t)
if test.expPanic {
upsert := func() {
p.upsert(test.oldObj, test.newObj)
}
g.Expect(upsert).Should(Panic())
} else {
g.Expect(p.upsert(test.oldObj, test.newObj)).To(Equal(test.stateChanged))
}
})
}
}

func TestAnnotationChangedPredicate_Delete(t *testing.T) {
p := annotationChangedPredicate{}

Expand Down

0 comments on commit 9b31e87

Please sign in to comment.