Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: persistent task worker queues #193

Merged
merged 1 commit into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pkg/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func NewLocalAdapter(cfg *config.Config) (*LocalAdapter, error) {
return nil, err
}

if err := task.Manager.Init(cfg.Provider.WorkDir); err != nil {
return nil, errors.Wrap(err, "task manager init")
}

worker, err := NewWorker(cfg.Converter.Worker)
if err != nil {
return nil, errors.Wrap(err, "create worker")
Expand Down Expand Up @@ -140,8 +144,10 @@ func (adp *LocalAdapter) Convert(ctx context.Context, source string) (*converter
}

func (adp *LocalAdapter) Dispatch(ctx context.Context, ref string, sync bool) error {
taskID := task.Manager.Create(ref)

taskID, err := task.Manager.Create(ref)
if err != nil {
return err
}
if sync {
// FIXME: The synchronous conversion task should also be
// executed in a limited worker queue.
Expand Down
114 changes: 98 additions & 16 deletions pkg/task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,40 @@
package task

import (
"encoding/json"
"path/filepath"
"sort"
"sync"
"time"

"github.com/goharbor/acceleration-service/pkg/converter"
"github.com/google/uuid"
"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
)

var bucketObjectTasks = []byte("tasks")

const taskMaximumKeepPeriod = time.Hour * 24

const StatusProcessing = "PROCESSING"
const StatusCompleted = "COMPLETED"
const StatusFailed = "FAILED"

type Task struct {
ID string `json:"id"`
Created time.Time `json:"created"`
Finished *time.Time `json:"finished"`
Source string `json:"source"`
SourceSize uint `json:"source_size"`
TargetSize uint `json:"target_size"`
Status string `json:"status"`
Reason string `json:"reason"`
ID string `json:"id"`
Created time.Time `json:"created"`
Finished time.Time `json:"finished"`
Source string `json:"source"`
SourceSize uint `json:"source_size"`
TargetSize uint `json:"target_size"`
Status string `json:"status"`
Reason string `json:"reason"`
}

type manager struct {
mutex sync.Mutex
db *bolt.DB
tasks map[string]*Task
}

Expand All @@ -62,27 +69,96 @@ func (t *Task) IsExpired() bool {
return false
}

func (m *manager) Create(source string) string {
// Init manager supported by boltdb.
func (m *manager) Init(workDir string) error {
bdb, err := bolt.Open(filepath.Join(workDir, "task.db"), 0655, nil)
if err != nil {
return errors.Wrap(err, "create task database")
}
m.db = bdb
return m.initDatabase()
}

// initDatabase loads tasks from the database into memory.
func (m *manager) initDatabase() error {
return m.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte("tasks"))
if bucket == nil {
return nil
}

return bucket.ForEach(func(k, v []byte) error {
var task Task
if err := json.Unmarshal(v, &task); err != nil {
return err
}
if task.Status == StatusProcessing {
return bucket.Delete([]byte(task.ID))
}
m.tasks[task.ID] = &task
Desiki-high marked this conversation as resolved.
Show resolved Hide resolved
return nil
})
})
}

// updateBucket updates task in bucket and creates a new bucket if it doesn't already exist.
func (m *manager) updateBucket(task *Task) error {
return m.db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(bucketObjectTasks)
if err != nil {
return err
}

taskJSON, err := json.Marshal(task)
if err != nil {
return err
}

if err := bucket.Put([]byte(task.ID), taskJSON); err != nil {
return err
}

return nil
})
}

// deleteBucket deletes a task in bucket
func (m *manager) deleteBucket(taskID string) error {
return m.db.Update(func(tx *bolt.Tx) error {
Desiki-high marked this conversation as resolved.
Show resolved Hide resolved
bucket := tx.Bucket(bucketObjectTasks)
if bucket == nil {
return nil
}
return bucket.Delete([]byte(taskID))
})
}

// Create new task
func (m *manager) Create(source string) (string, error) {
m.mutex.Lock()
defer m.mutex.Unlock()

id := uuid.NewString()

m.tasks[id] = &Task{
task := &Task{
ID: id,
Created: time.Now(),
Finished: nil,
Source: source,
SourceSize: 0,
TargetSize: 0,
Status: StatusProcessing,
Reason: "",
}

return id
m.tasks[id] = task
if err := m.updateBucket(task); err != nil {
return "", err
}
m.tasks[id] = task
return id, nil
}

func (m *manager) Finish(id string, metric *converter.Metric, err error) {
// Finish a task
func (m *manager) Finish(id string, metric *converter.Metric, err error) error {
m.mutex.Lock()
defer m.mutex.Unlock()

Expand All @@ -99,16 +175,22 @@ func (m *manager) Finish(id string, metric *converter.Metric, err error) {
task.SourceSize = uint(metric.SourceImageSize)
task.TargetSize = uint(metric.TargetImageSize)
}
now := time.Now()
task.Finished = &now
task.Finished = time.Now()
}
if err := m.updateBucket(task); err != nil {
return err
}

// Evict expired tasks.
for id, task := range m.tasks {
if task.IsExpired() {
delete(m.tasks, id)
if err := m.deleteBucket(id); err != nil {
return err
}
}
}
return nil
}

func (m *manager) List() []*Task {
Expand Down