Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bbolt-cache: debug log when cleaning objects, and throttle to once per 1s #135

Merged
merged 1 commit into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/xgql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func main() { //nolint:gocyclo
var camid []clients.NewCacheMiddlewareFn
// wrap client.Cache in cache.*BBoltCache if cacheFile is specified.
if *cacheFile != "" {
camid = append(camid, cache.WithBBoltCache(*cacheFile))
camid = append(camid, cache.WithBBoltCache(*cacheFile, cache.WithLogger(log)))
}
// enable live queries
camid = append(camid, clients.WithLiveQueries)
Expand Down
12 changes: 11 additions & 1 deletion internal/cache/bbolt_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"

"github.com/crossplane/crossplane-runtime/pkg/errors"
"github.com/crossplane/crossplane-runtime/pkg/logging"

"github.com/upbound/xgql/internal/clients"
)
Expand Down Expand Up @@ -152,6 +153,13 @@ func wrapCacheTranform(prev, next toolscache.TransformFunc) toolscache.Transform
// Option is an option for a cache.
type Option func(*BBoltCache)

// WithLogger wires a logger into the bbolt cache.
func WithLogger(o logging.Logger) Option {
return func(c *BBoltCache) {
c.log = o
}
}

// Cache implements cache.Cache.
var _ cache.Cache = &BBoltCache{}

Expand All @@ -173,6 +181,8 @@ type BBoltCache struct {
unmarshalFn UnmarshalFn

running atomic.Bool

log logging.Logger
}

// NewBBoltCache creates a new cache.
Expand All @@ -196,7 +206,7 @@ func NewBBoltCache(cache cache.Cache, scheme *runtime.Scheme, file string, opts
ca.db = db
}
if ca.cleaner == nil {
ca.cleaner = NewCleaner[client.Object, string](getKey, ca.cleanup)
ca.cleaner = NewCleaner[client.Object, string](getKey, ca.cleanup, WithLoggerCleanerOpt[client.Object, string](ca.log))
}
return ca, nil
}
Expand Down
30 changes: 29 additions & 1 deletion internal/cache/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"time"

"k8s.io/utils/clock"

"github.com/crossplane/crossplane-runtime/pkg/logging"
)

// Cleaner cleans up objects after a given duration.
Expand Down Expand Up @@ -86,25 +88,45 @@ type cleaner[T any, K cmp.Ordered] struct {
keyFn func(T) K
cleanFn func([]T) error

tick time.Duration
clock clock.Clock
signal chan struct{}
running atomic.Bool
mu sync.Mutex
exps []expKey[K]
refs map[K]expRef[T]

log logging.Logger
}

type CleanerOpt[T any, K cmp.Ordered] func(*cleaner[T, K])

// WithLoggerCleanerOpt wires the logger into the cleaner.
func WithLoggerCleanerOpt[T any, K cmp.Ordered](log logging.Logger) CleanerOpt[T, K] {
return func(c *cleaner[T, K]) {
c.log = log
}
}

// WithTick sets the tick interval for the cleaner. Set it to zero to clean up
// as soon as possible.
func WithTick[T any, K cmp.Ordered](tick time.Duration) CleanerOpt[T, K] {
return func(c *cleaner[T, K]) {
c.tick = tick
}
}

// NewCleaner creates a cleaner for objects of type T, identified by comparable key K,
// using cleanFn for cleanup.
func NewCleaner[T any, K cmp.Ordered](keyFn func(T) K, cleanFn func([]T) error, opts ...CleanerOpt[T, K]) *cleaner[T, K] {
c := &cleaner[T, K]{
keyFn: keyFn,
cleanFn: cleanFn,
tick: time.Second,
clock: clock.RealClock{},
signal: make(chan struct{}),
refs: make(map[K]expRef[T]),
log: logging.NewNopLogger(),
}
for _, opt := range opts {
opt(c)
Expand Down Expand Up @@ -142,12 +164,18 @@ func (c *cleaner[T, K]) Start(ctx context.Context) error {
}
objs, exp := c.collect(c.clock.Now())
if len(objs) > 0 {
c.log.Debug("cleaning up objects", "count", len(objs), "next", exp)
if err := c.cleanFn(objs); err != nil {
// exit Start and stop the cache
return err
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we'd want to stop the process here normally.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

For context, it's printed here and the cache is removed:

	go func() {
		err := ca.Start(ctx)
		log.Debug("Cache stopped", "error", err)

		// Start blocks until ctx is closed, or it encounters an error. If we make
		// it here either the cache crashed, or the context was cancelled (e.g.
		// because our session expired).
		c.remove(id)
	}()

So, it's not the process in the unix sense that is stopped, but the cache.

}
}
if !exp.IsZero() {
wakeup = c.clock.After(exp.Sub(c.clock.Now()))
after := exp.Sub(c.clock.Now())
if after < c.tick {
after = c.tick
}
wakeup = c.clock.After(after)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions internal/cache/cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func TestCleaner_Schedule(t *testing.T) {
return nil
},
WithClock(clock),
WithTick[int, int](time.Duration(0)),
)
startedCh := make(chan struct{})
errCh := make(chan error, 1)
Expand Down
Loading