Skip to content

Commit 008f603

Browse files
Merge pull request #558 from asm582/exp_upd
try to provide most recent copy of AW to enable fast deletion
2 parents ee36a02 + 3e86cab commit 008f603

File tree

1 file changed

+13
-3
lines changed

1 file changed

+13
-3
lines changed

pkg/controller/queuejob/queuejob_controller_ex.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -932,7 +932,7 @@ func (qjm *XController) ScheduleNext() {
932932
// the appwrapper from being added in syncjob
933933
defer qjm.schedulingAWAtomicSet(nil)
934934

935-
scheduleNextRetrier := retrier.New(retrier.ExponentialBackoff(10, 100*time.Millisecond), &EtcdErrorClassifier{})
935+
scheduleNextRetrier := retrier.New(retrier.ExponentialBackoff(1, 100*time.Millisecond), &EtcdErrorClassifier{})
936936
scheduleNextRetrier.SetJitter(0.05)
937937
// Retry the execution
938938
err = scheduleNextRetrier.Run(func() error {
@@ -1018,7 +1018,8 @@ func (qjm *XController) ScheduleNext() {
10181018
retryErr = qjm.updateStatusInEtcd(ctx, qj, "ScheduleNext - setHOL")
10191019
if retryErr != nil {
10201020
if apierrors.IsConflict(retryErr) {
1021-
klog.Warningf("[ScheduleNext] Conflict error detected when updating status in etcd for app wrapper '%s/%s, status = %+v. Retrying update.", qj.Namespace, qj.Name, qj.Status)
1021+
klog.Warningf("[ScheduleNext] Conflict error detected when updating status in etcd for app wrapper '%s/%s, status = %+v this may be due to appwrapper deletion.", qj.Namespace, qj.Name, qj.Status)
1022+
return nil
10221023
} else {
10231024
klog.Errorf("[ScheduleNext] Failed to updated status in etcd for app wrapper '%s/%s', status = %+v, err=%v", qj.Namespace, qj.Name, qj.Status, retryErr)
10241025
}
@@ -1068,6 +1069,8 @@ func (qjm *XController) ScheduleNext() {
10681069
}
10691070
return retryErr
10701071
}
1072+
//Remove stale copy
1073+
qjm.eventQueue.Delete(qj)
10711074
if err00 := qjm.eventQueue.Add(qj); err00 != nil { // unsuccessful add to eventQueue, add back to activeQ
10721075
klog.Errorf("[ScheduleNext] [Dispatcher Mode] Fail to add %s to eventQueue, activeQ.Add_toSchedulingQueue &qj=%p Version=%s Status=%+v err=%#v", qj.Name, qj, qj.ResourceVersion, qj.Status, err)
10731076
qjm.qjqueue.MoveToActiveQueueIfExists(qj)
@@ -1224,6 +1227,8 @@ func (qjm *XController) ScheduleNext() {
12241227
}
12251228
tempAW.DeepCopyInto(qj)
12261229
// add to eventQueue for dispatching to Etcd
1230+
// Remove stale copy
1231+
qjm.eventQueue.Delete(qj)
12271232
if err00 := qjm.eventQueue.Add(qj); err00 != nil { // unsuccessful add to eventQueue, add back to activeQ
12281233
klog.Errorf("[ScheduleNext] [Agent Mode] Failed to add '%s/%s' to eventQueue, activeQ.Add_toSchedulingQueue &qj=%p Version=%s Status=%+v err=%#v", qj.Namespace,
12291234
qj.Name, qj, qj.ResourceVersion, qj.Status, err)
@@ -1587,7 +1592,11 @@ func (cc *XController) deleteQueueJob(obj interface{}) {
15871592
accessor.SetDeletionTimestamp(&current_ts)
15881593
}
15891594
klog.V(3).Infof("[Informer-deleteQJ] %s enqueue deletion, deletion ts = %v", qj.Name, qj.GetDeletionTimestamp())
1590-
cc.enqueue(qj)
1595+
//Remove stale copy
1596+
cc.eventQueue.Delete(qj)
1597+
cc.qjqueue.Delete(qj)
1598+
//Add fresh copy
1599+
cc.eventQueue.Add(qj)
15911600
}
15921601

15931602
func (cc *XController) enqueue(obj interface{}) error {
@@ -2146,6 +2155,7 @@ func (cc *XController) Cleanup(ctx context.Context, appwrapper *arbv1.AppWrapper
21462155
}
21472156
func (cc *XController) getAppWrapper(namespace string, name string, caller string) (*arbv1.AppWrapper, error) {
21482157
klog.V(5).Infof("[getAppWrapper] getting a copy of '%s/%s' when called by '%s'.", namespace, name, caller)
2158+
21492159
apiCacheAWJob, err := cc.appWrapperLister.AppWrappers(namespace).Get(name)
21502160
if err != nil {
21512161
if !apierrors.IsNotFound(err) {

0 commit comments

Comments
 (0)