Skip to content

Commit

Permalink
Merge pull request #2478 from wallee94/remove-shard-validation-on-sts…
Browse files Browse the repository at this point in the history
…-create

fix(discovery): configure sharding every time MetricsHandler.Run runs
  • Loading branch information
k8s-ci-robot authored Oct 15, 2024
2 parents 77a99a5 + 166921b commit f3b7593
Show file tree
Hide file tree
Showing 4 changed files with 248 additions and 90 deletions.
22 changes: 2 additions & 20 deletions internal/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,6 @@ func (r *CRDiscoverer) PollForCacheUpdates(
) {
// The interval at which we will check the cache for updates.
t := time.NewTicker(Interval)
// Track previous context to allow refreshing cache.
olderContext, olderCancel := context.WithCancel(ctx)
// Prevent context leak (kill the last metric handler instance).
defer olderCancel()
generateMetrics := func() {
// Get families for discovered factories.
customFactories, err := factoryGenerator()
Expand Down Expand Up @@ -239,21 +235,8 @@ func (r *CRDiscoverer) PollForCacheUpdates(
r.SafeWrite(func() {
r.WasUpdated = false
})
// Run the metrics handler with updated configs.
olderContext, olderCancel = context.WithCancel(ctx)
go func() {
// Blocks indefinitely until the unbuffered context is cancelled to serve metrics for that duration.
err = m.Run(olderContext)
if err != nil {
// Check if context was cancelled.
select {
case <-olderContext.Done():
// Context cancelled, don't really need to log this though.
default:
klog.ErrorS(err, "failed to run metrics handler")
}
}
}()
// Update metric handler with the new configs.
m.BuildWriters(ctx)
}
go func() {
for range t.C {
Expand All @@ -269,7 +252,6 @@ func (r *CRDiscoverer) PollForCacheUpdates(
shouldGenerateMetrics = r.WasUpdated
})
if shouldGenerateMetrics {
olderCancel()
generateMetrics()
klog.InfoS("discovery finished, cache updated")
}
Expand Down
14 changes: 6 additions & 8 deletions pkg/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,12 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error {
opts.EnableGZIPEncoding,
)
// Run MetricsHandler
if config == nil {
ctxMetricsHandler, cancel := context.WithCancel(ctx)
g.Add(func() error {
return m.Run(ctxMetricsHandler)
}, func(error) {
cancel()
})
}
ctxMetricsHandler, cancel := context.WithCancel(ctx)
g.Add(func() error {
return m.Run(ctxMetricsHandler)
}, func(error) {
cancel()
})

tlsConfig := opts.TLSConfig

Expand Down
25 changes: 18 additions & 7 deletions pkg/metricshandler/metrics_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,24 +69,35 @@ func New(opts *options.Options, kubeClient kubernetes.Interface, storeBuilder ks
}
}

// ConfigureSharding (re-)configures sharding. Re-configuration can be done
// concurrently.
func (m *MetricsHandler) ConfigureSharding(ctx context.Context, shard int32, totalShards int) {
// BuildWriters builds the metrics writers, cancelling any previous context and passing a new one on every build.
// Build can be used multiple times and concurrently.
func (m *MetricsHandler) BuildWriters(ctx context.Context) {
m.mtx.Lock()
defer m.mtx.Unlock()

if m.cancel != nil {
m.cancel()
}
if totalShards != 1 {
klog.InfoS("Configuring sharding of this instance to be shard index (zero-indexed) out of total shards", "shard", shard, "totalShards", totalShards)
}
ctx, m.cancel = context.WithCancel(ctx)
m.storeBuilder.WithSharding(shard, totalShards)
m.storeBuilder.WithContext(ctx)
m.metricsWriters = m.storeBuilder.Build()
}

// ConfigureSharding configures sharding. Configuration can be used multiple times and
// concurrently.
func (m *MetricsHandler) ConfigureSharding(ctx context.Context, shard int32, totalShards int) {
m.mtx.Lock()

if totalShards != 1 {
klog.InfoS("Configuring sharding of this instance to be shard index (zero-indexed) out of total shards", "shard", shard, "totalShards", totalShards)
}
m.curShard = shard
m.curTotalShards = totalShards
m.storeBuilder.WithSharding(shard, totalShards)

// unlock because BuildWriters will hold a lock again
m.mtx.Unlock()
m.BuildWriters(ctx)
}

// Run configures the MetricsHandler's sharding and if autosharding is enabled
Expand Down
Loading

0 comments on commit f3b7593

Please sign in to comment.