From f2d9cfb8c84be89826a55782c904908e7783d004 Mon Sep 17 00:00:00 2001 From: David Eads Date: Fri, 10 Jan 2025 16:23:23 -0500 Subject: [PATCH] switch to using the real FIFO Kubernetes-commit: a9aab298b4738f4ea9111131cdf193a3b1ba14e5 --- features/known_features.go | 7 +++++++ tools/cache/controller.go | 17 ++++++++++++----- tools/cache/controller_test.go | 7 ++----- tools/cache/shared_informer.go | 15 ++++++++++----- 4 files changed, 31 insertions(+), 15 deletions(-) diff --git a/features/known_features.go b/features/known_features.go index a74f6a833..344d2ebb7 100644 --- a/features/known_features.go +++ b/features/known_features.go @@ -53,6 +53,12 @@ const ( // alpha: v1.30 InformerResourceVersion Feature = "InformerResourceVersion" + // owner: @deads2k + // beta: v1.33 + // + // Refactor informers to deliver watch stream events in order instead of out of order. + InOrderInformers Feature = "InOrderInformers" + // owner: @p0lyn0mial // beta: v1.30 // @@ -73,5 +79,6 @@ var defaultKubernetesFeatureGates = map[Feature]FeatureSpec{ ClientsAllowCBOR: {Default: false, PreRelease: Alpha}, ClientsPreferCBOR: {Default: false, PreRelease: Alpha}, InformerResourceVersion: {Default: false, PreRelease: Alpha}, + InOrderInformers: {Default: true, PreRelease: Beta}, WatchListClient: {Default: false, PreRelease: Beta}, } diff --git a/tools/cache/controller.go b/tools/cache/controller.go index 9ea1f494a..1497700d8 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -19,6 +19,7 @@ package cache import ( "context" "errors" + clientgofeaturegate "k8s.io/client-go/features" "sync" "time" @@ -592,11 +593,17 @@ func newInformer(clientState Store, options InformerOptions) Controller { // This will hold incoming changes. Note how we pass clientState in as a // KeyLister, that way resync operations will result in the correct set // of update/delete deltas. - fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ - KnownObjects: clientState, - EmitDeltaTypeReplaced: true, - Transformer: options.Transform, - }) + + var fifo Queue + if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) { + fifo = NewRealFIFO(MetaNamespaceKeyFunc, clientState, options.Transform) + } else { + fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KnownObjects: clientState, + EmitDeltaTypeReplaced: true, + Transformer: options.Transform, + }) + } cfg := &Config{ Queue: fifo, diff --git a/tools/cache/controller_test.go b/tools/cache/controller_test.go index edc1c24a4..054257920 100644 --- a/tools/cache/controller_test.go +++ b/tools/cache/controller_test.go @@ -49,10 +49,7 @@ func Example() { // This will hold incoming changes. Note how we pass downstream in as a // KeyLister, that way resync operations will result in the correct set // of update/delete deltas. - fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ - KeyFunction: MetaNamespaceKeyFunc, - KnownObjects: downstream, - }) + fifo := NewRealFIFO(MetaNamespaceKeyFunc, downstream, nil) // Let's do threadsafe output to get predictable test results. deletionCounter := make(chan string, 1000) @@ -87,7 +84,7 @@ func Example() { // fifo's KeyOf is easiest, because it handles // DeletedFinalStateUnknown markers. - key, err := fifo.KeyOf(newest.Object) + key, err := fifo.keyOf(newest.Object) if err != nil { return err } diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index c511a0ceb..a8156a286 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -540,11 +540,16 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) { s.startedLock.Lock() defer s.startedLock.Unlock() - fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ - KnownObjects: s.indexer, - EmitDeltaTypeReplaced: true, - Transformer: s.transform, - }) + var fifo Queue + if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) { + fifo = NewRealFIFO(MetaNamespaceKeyFunc, s.indexer, s.transform) + } else { + fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KnownObjects: s.indexer, + EmitDeltaTypeReplaced: true, + Transformer: s.transform, + }) + } cfg := &Config{ Queue: fifo,