diff --git a/task/manager.go b/task/manager.go index c2e66fa1d..96ca6d7f7 100644 --- a/task/manager.go +++ b/task/manager.go @@ -6,6 +6,7 @@ import ( "fmt" "path" "strconv" + "strings" "time" "github.com/golang-jwt/jwt/v4" @@ -64,6 +65,36 @@ func (e *AddonNotFound) Is(err error) (matched bool) { return } +// QuotaExceeded report quota exceeded. +type QuotaExceeded struct { + Reason string +} + +// Match returns true when the error is Forbidden due to quota exceeded. +func (e *QuotaExceeded) Match(err error) (matched bool) { + if k8serr.IsForbidden(err) { + matched = true + e.Reason = err.Error() + for _, s := range []string{"quota", "exceeded"} { + matched = strings.Contains(e.Reason, s) + if !matched { + break + } + } + } + return +} + +func (e *QuotaExceeded) Error() (s string) { + return e.Reason +} + +func (e *QuotaExceeded) Is(err error) (matched bool) { + var inst *QuotaExceeded + matched = errors.As(err, &inst) + return +} + // Manager provides task management. type Manager struct { // DB @@ -142,28 +173,28 @@ func (m *Manager) startReady() { if m.postpone(ready, list) { ready.State = Postponed Log.Info("Task postponed.", "id", ready.ID) - sErr := m.DB.Save(ready).Error - Log.Error(sErr, "") + err := m.DB.Save(ready).Error + Log.Error(err, "") continue } - if ready.Retries == 0 { - metrics.TasksInitiated.Inc() - } rt := Task{ready} - err := rt.Run(m.Client) + started, err := rt.Run(m.Client) if err != nil { - if errors.Is(err, &AddonNotFound{}) { - ready.Error("Error", err.Error()) - ready.State = Failed - sErr := m.DB.Save(ready).Error - Log.Error(sErr, "") - } + Log.Error(err, "") + ready.Error("Error", err.Error()) + ready.State = Failed + err = m.DB.Save(ready).Error Log.Error(err, "") continue } - Log.Info("Task started.", "id", ready.ID) err = m.DB.Save(ready).Error Log.Error(err, "") + if started { + Log.Info("Task started.", "id", ready.ID) + if ready.Retries == 0 { + metrics.TasksInitiated.Inc() + } + } default: // Ignored. // Other states included to support @@ -256,13 +287,23 @@ type Task struct { } // Run the specified task. -func (r *Task) Run(client k8s.Client) (err error) { +func (r *Task) Run(client k8s.Client) (started bool, err error) { mark := time.Now() defer func() { - if err != nil { + if err == nil { + return + } + if errors.Is(err, &QuotaExceeded{}) { + Log.V(1).Info(err.Error()) + err = nil + return + } + if errors.Is(err, &AddonNotFound{}) { r.Error("Error", err.Error()) r.Terminated = &mark r.State = Failed + err = nil + return } }() addon, err := r.findAddon(client, r.Addon) @@ -288,6 +329,10 @@ func (r *Task) Run(client k8s.Client) (err error) { pod := r.pod(addon, owner, &secret) err = client.Create(context.TODO(), &pod) if err != nil { + qe := &QuotaExceeded{err.Error()} + if qe.Match(err) { + err = qe + } err = liberr.Wrap(err) return } @@ -309,6 +354,7 @@ func (r *Task) Run(client k8s.Client) (err error) { err = liberr.Wrap(err) return } + started = true r.Started = &mark r.State = Pending r.Pod = path.Join( @@ -329,7 +375,9 @@ func (r *Task) Reflect(client k8s.Client) (err error) { pod) if err != nil { if k8serr.IsNotFound(err) { - err = r.Run(client) + r.Pod = "" + r.State = Ready + err = nil } else { err = liberr.Wrap(err) }