Skip to content

Commit

Permalink
Merge pull request #179 from Desiki-high/gc-task
Browse files Browse the repository at this point in the history
gc: add scheduled gc task
  • Loading branch information
imeoer authored Sep 6, 2023
2 parents ef20453 + 65f8c55 commit a7aa579
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 7 deletions.
14 changes: 13 additions & 1 deletion pkg/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package adapter
import (
"context"
"strings"
"time"

"github.com/containerd/containerd/namespaces"
"github.com/pkg/errors"
Expand Down Expand Up @@ -65,6 +66,8 @@ func NewLocalAdapter(cfg *config.Config) (*LocalAdapter, error) {
if err != nil {
return nil, errors.Wrap(err, "create content provider")
}
// start scheduled gc task every hour
go startScheduledGC(content)
cvt, err := converter.New(
converter.WithProvider(provider),
converter.WithDriver(cfg.Converter.Driver.Type, cfg.Converter.Driver.Config),
Expand Down Expand Up @@ -94,6 +97,15 @@ func NewLocalAdapter(cfg *config.Config) (*LocalAdapter, error) {
return handler, nil
}

func startScheduledGC(content *content.Content) {
ticker := time.NewTicker(time.Hour)
for range ticker.C {
if err := content.GC(namespaces.WithNamespace(context.Background(), "acceleration-service"), content.Threshold/2); err != nil {
logrus.Error(errors.Wrap(err, "scheduled gc task"))
}
}
}

func (adp *LocalAdapter) Convert(ctx context.Context, source string) error {
target, err := adp.rule.Map(source, TagSuffix)
if err != nil {
Expand All @@ -119,7 +131,7 @@ func (adp *LocalAdapter) Convert(ctx context.Context, source string) error {
return err
}
adp.content.GcMutex.RUnlock()
if err := adp.content.GC(ctx); err != nil {
if err := adp.content.GC(ctx, adp.content.Threshold); err != nil {
return err
}
return nil
Expand Down
14 changes: 8 additions & 6 deletions pkg/content/content.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (
"golang.org/x/sync/singleflight"
)

const gcPercent int64 = 80

type Content struct {
// db is the bolt database of content
db *metadata.DB
Expand All @@ -54,8 +56,8 @@ type Content struct {
lc *leaseCache
// store is the local content store wrapped inner db
store ctrcontent.Store
// threshold is the maximum capacity of the local caches storage
threshold int64
// Threshold is the maximum capacity of the local caches storage
Threshold int64
// remoteCache is the cache of remote layers
remoteCache *RemoteCache
}
Expand Down Expand Up @@ -101,7 +103,7 @@ func NewContent(contentDir string, databaseDir string, threshold string, useRemo
GcMutex: &sync.RWMutex{},
lc: lc,
store: db.ContentStore(),
threshold: int64(t),
Threshold: int64(t),
remoteCache: remoteCache,
}
return &content, nil
Expand Down Expand Up @@ -133,13 +135,13 @@ func (content *Content) Size() (int64, error) {
}

// GC clean the local caches by cfg.Provider.GCPolicy configuration
func (content *Content) GC(ctx context.Context) error {
func (content *Content) GC(ctx context.Context, threshold int64) error {
size, err := content.Size()
if err != nil {
return err
}
// if the local content size over eighty percent of threshold, gc start
if size > (content.threshold*int64(80))/100 {
if size > (threshold*gcPercent)/100 {
if _, err, _ := content.gcSingleflight.Do(accelerationServiceNamespace, func() (interface{}, error) {
content.GcMutex.Lock()
defer content.GcMutex.Unlock()
Expand All @@ -148,7 +150,7 @@ func (content *Content) GC(ctx context.Context) error {
if err != nil {
return nil, err
}
return nil, content.garbageCollect(ctx, size-(content.threshold*int64(80))/100)
return nil, content.garbageCollect(ctx, size-(threshold*gcPercent)/100)
}); err != nil {
return err
}
Expand Down

0 comments on commit a7aa579

Please sign in to comment.