diff --git a/pkg/adapter/adapter.go b/pkg/adapter/adapter.go index b1a81e27..0e32e74f 100644 --- a/pkg/adapter/adapter.go +++ b/pkg/adapter/adapter.go @@ -17,6 +17,7 @@ package adapter import ( "context" "strings" + "time" "github.com/containerd/containerd/namespaces" "github.com/pkg/errors" @@ -65,6 +66,8 @@ func NewLocalAdapter(cfg *config.Config) (*LocalAdapter, error) { if err != nil { return nil, errors.Wrap(err, "create content provider") } + // start scheduled gc task every hour + go startScheduledGC(content) cvt, err := converter.New( converter.WithProvider(provider), converter.WithDriver(cfg.Converter.Driver.Type, cfg.Converter.Driver.Config), @@ -94,6 +97,15 @@ func NewLocalAdapter(cfg *config.Config) (*LocalAdapter, error) { return handler, nil } +func startScheduledGC(content *content.Content) { + ticker := time.NewTicker(time.Hour) + for range ticker.C { + if err := content.GC(namespaces.WithNamespace(context.Background(), "acceleration-service"), content.Threshold/2); err != nil { + logrus.Error(errors.Wrap(err, "scheduled gc task")) + } + } +} + func (adp *LocalAdapter) Convert(ctx context.Context, source string) error { target, err := adp.rule.Map(source, TagSuffix) if err != nil { @@ -119,7 +131,7 @@ func (adp *LocalAdapter) Convert(ctx context.Context, source string) error { return err } adp.content.GcMutex.RUnlock() - if err := adp.content.GC(ctx); err != nil { + if err := adp.content.GC(ctx, adp.content.Threshold); err != nil { return err } return nil diff --git a/pkg/content/content.go b/pkg/content/content.go index 0349e3a3..2a6e7e6b 100644 --- a/pkg/content/content.go +++ b/pkg/content/content.go @@ -41,6 +41,8 @@ import ( "golang.org/x/sync/singleflight" ) +const gcPercent int64 = 80 + type Content struct { // db is the bolt database of content db *metadata.DB @@ -54,8 +56,8 @@ type Content struct { lc *leaseCache // store is the local content store wrapped inner db store ctrcontent.Store - // threshold is the maximum capacity of the local caches storage - threshold int64 + // Threshold is the maximum capacity of the local caches storage + Threshold int64 // remoteCache is the cache of remote layers remoteCache *RemoteCache } @@ -101,7 +103,7 @@ func NewContent(contentDir string, databaseDir string, threshold string, useRemo GcMutex: &sync.RWMutex{}, lc: lc, store: db.ContentStore(), - threshold: int64(t), + Threshold: int64(t), remoteCache: remoteCache, } return &content, nil @@ -133,13 +135,13 @@ func (content *Content) Size() (int64, error) { } // GC clean the local caches by cfg.Provider.GCPolicy configuration -func (content *Content) GC(ctx context.Context) error { +func (content *Content) GC(ctx context.Context, threshold int64) error { size, err := content.Size() if err != nil { return err } // if the local content size over eighty percent of threshold, gc start - if size > (content.threshold*int64(80))/100 { + if size > (threshold*gcPercent)/100 { if _, err, _ := content.gcSingleflight.Do(accelerationServiceNamespace, func() (interface{}, error) { content.GcMutex.Lock() defer content.GcMutex.Unlock() @@ -148,7 +150,7 @@ func (content *Content) GC(ctx context.Context) error { if err != nil { return nil, err } - return nil, content.garbageCollect(ctx, size-(content.threshold*int64(80))/100) + return nil, content.garbageCollect(ctx, size-(threshold*gcPercent)/100) }); err != nil { return err }