Skip to content

Commit

Permalink
🐛 Fix Fail when Resource Quota exceeded. (#627)
Browse files Browse the repository at this point in the history
Detect resource quota exceeded and leave the task.State=Ready.

closes #625

---------

Signed-off-by: Jeff Ortel <jortel@redhat.com>
  • Loading branch information
jortel authored Apr 25, 2024
1 parent 719a452 commit cdc096f
Showing 1 changed file with 64 additions and 16 deletions.
80 changes: 64 additions & 16 deletions task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"path"
"strconv"
"strings"
"time"

"github.com/golang-jwt/jwt/v4"
Expand Down Expand Up @@ -64,6 +65,36 @@ func (e *AddonNotFound) Is(err error) (matched bool) {
return
}

// QuotaExceeded report quota exceeded.
type QuotaExceeded struct {
Reason string
}

// Match returns true when the error is Forbidden due to quota exceeded.
func (e *QuotaExceeded) Match(err error) (matched bool) {
if k8serr.IsForbidden(err) {
matched = true
e.Reason = err.Error()
for _, s := range []string{"quota", "exceeded"} {
matched = strings.Contains(e.Reason, s)
if !matched {
break
}
}
}
return
}

func (e *QuotaExceeded) Error() (s string) {
return e.Reason
}

func (e *QuotaExceeded) Is(err error) (matched bool) {
var inst *QuotaExceeded
matched = errors.As(err, &inst)
return
}

// Manager provides task management.
type Manager struct {
// DB
Expand Down Expand Up @@ -142,28 +173,28 @@ func (m *Manager) startReady() {
if m.postpone(ready, list) {
ready.State = Postponed
Log.Info("Task postponed.", "id", ready.ID)
sErr := m.DB.Save(ready).Error
Log.Error(sErr, "")
err := m.DB.Save(ready).Error
Log.Error(err, "")
continue
}
if ready.Retries == 0 {
metrics.TasksInitiated.Inc()
}
rt := Task{ready}
err := rt.Run(m.Client)
started, err := rt.Run(m.Client)
if err != nil {
if errors.Is(err, &AddonNotFound{}) {
ready.Error("Error", err.Error())
ready.State = Failed
sErr := m.DB.Save(ready).Error
Log.Error(sErr, "")
}
Log.Error(err, "")
ready.Error("Error", err.Error())
ready.State = Failed
err = m.DB.Save(ready).Error
Log.Error(err, "")
continue
}
Log.Info("Task started.", "id", ready.ID)
err = m.DB.Save(ready).Error
Log.Error(err, "")
if started {
Log.Info("Task started.", "id", ready.ID)
if ready.Retries == 0 {
metrics.TasksInitiated.Inc()
}
}
default:
// Ignored.
// Other states included to support
Expand Down Expand Up @@ -256,13 +287,23 @@ type Task struct {
}

// Run the specified task.
func (r *Task) Run(client k8s.Client) (err error) {
func (r *Task) Run(client k8s.Client) (started bool, err error) {
mark := time.Now()
defer func() {
if err != nil {
if err == nil {
return
}
if errors.Is(err, &QuotaExceeded{}) {
Log.V(1).Info(err.Error())
err = nil
return
}
if errors.Is(err, &AddonNotFound{}) {
r.Error("Error", err.Error())
r.Terminated = &mark
r.State = Failed
err = nil
return
}
}()
addon, err := r.findAddon(client, r.Addon)
Expand All @@ -288,6 +329,10 @@ func (r *Task) Run(client k8s.Client) (err error) {
pod := r.pod(addon, owner, &secret)
err = client.Create(context.TODO(), &pod)
if err != nil {
qe := &QuotaExceeded{err.Error()}
if qe.Match(err) {
err = qe
}
err = liberr.Wrap(err)
return
}
Expand All @@ -309,6 +354,7 @@ func (r *Task) Run(client k8s.Client) (err error) {
err = liberr.Wrap(err)
return
}
started = true
r.Started = &mark
r.State = Pending
r.Pod = path.Join(
Expand All @@ -329,7 +375,9 @@ func (r *Task) Reflect(client k8s.Client) (err error) {
pod)
if err != nil {
if k8serr.IsNotFound(err) {
err = r.Run(client)
r.Pod = ""
r.State = Ready
err = nil
} else {
err = liberr.Wrap(err)
}
Expand Down

0 comments on commit cdc096f

Please sign in to comment.