Skip to content

Commit

Permalink
switch to using the real FIFO
Browse files Browse the repository at this point in the history
Kubernetes-commit: a9aab298b4738f4ea9111131cdf193a3b1ba14e5
  • Loading branch information
deads2k authored and k8s-publishing-bot committed Jan 10, 2025
1 parent 43bf1a1 commit f2d9cfb
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 15 deletions.
7 changes: 7 additions & 0 deletions features/known_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ const (
// alpha: v1.30
InformerResourceVersion Feature = "InformerResourceVersion"

// owner: @deads2k
// beta: v1.33
//
// Refactor informers to deliver watch stream events in order instead of out of order.
InOrderInformers Feature = "InOrderInformers"

// owner: @p0lyn0mial
// beta: v1.30
//
Expand All @@ -73,5 +79,6 @@ var defaultKubernetesFeatureGates = map[Feature]FeatureSpec{
ClientsAllowCBOR: {Default: false, PreRelease: Alpha},
ClientsPreferCBOR: {Default: false, PreRelease: Alpha},
InformerResourceVersion: {Default: false, PreRelease: Alpha},
InOrderInformers: {Default: true, PreRelease: Beta},
WatchListClient: {Default: false, PreRelease: Beta},
}
17 changes: 12 additions & 5 deletions tools/cache/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cache
import (
"context"
"errors"
clientgofeaturegate "k8s.io/client-go/features"
"sync"
"time"

Expand Down Expand Up @@ -592,11 +593,17 @@ func newInformer(clientState Store, options InformerOptions) Controller {
// This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
Transformer: options.Transform,
})

var fifo Queue
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
fifo = NewRealFIFO(MetaNamespaceKeyFunc, clientState, options.Transform)
} else {
fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
Transformer: options.Transform,
})
}

cfg := &Config{
Queue: fifo,
Expand Down
7 changes: 2 additions & 5 deletions tools/cache/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@ func Example() {
// This will hold incoming changes. Note how we pass downstream in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: MetaNamespaceKeyFunc,
KnownObjects: downstream,
})
fifo := NewRealFIFO(MetaNamespaceKeyFunc, downstream, nil)

// Let's do threadsafe output to get predictable test results.
deletionCounter := make(chan string, 1000)
Expand Down Expand Up @@ -87,7 +84,7 @@ func Example() {

// fifo's KeyOf is easiest, because it handles
// DeletedFinalStateUnknown markers.
key, err := fifo.KeyOf(newest.Object)
key, err := fifo.keyOf(newest.Object)
if err != nil {
return err
}
Expand Down
15 changes: 10 additions & 5 deletions tools/cache/shared_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,11 +540,16 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
s.startedLock.Lock()
defer s.startedLock.Unlock()

fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
Transformer: s.transform,
})
var fifo Queue
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
fifo = NewRealFIFO(MetaNamespaceKeyFunc, s.indexer, s.transform)
} else {
fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
Transformer: s.transform,
})
}

cfg := &Config{
Queue: fifo,
Expand Down

0 comments on commit f2d9cfb

Please sign in to comment.