Skip to content

Commit

Permalink
Delay removal of orphanPVC to avoid the removal of PVC in use (#67)
Browse files Browse the repository at this point in the history
* fix: delay removal of orphanPVC to avoid race condition

* fix: updated log messages

* fix: deletionTimeStamp label name changed and refactoring deleteOrphanPVC

* fix: removed else
  • Loading branch information
gurjotkaur20 committed Jun 22, 2023
1 parent 06fd4a6 commit 45b59e1
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 5 deletions.
85 changes: 80 additions & 5 deletions controllers/druid/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"fmt"
"regexp"
"sort"
"strconv"
"time"

autoscalev2 "k8s.io/api/autoscaling/v2"
networkingv1 "k8s.io/api/networking/v1"
Expand All @@ -31,6 +33,8 @@ import (
const (
druidOpResourceHash = "druidOpResourceHash"
defaultCommonConfigMountPath = "/druid/conf/druid/_common"
toBeDeletedLabel = "toBeDeleted"
deletionTSLabel = "deletionTS"
)

var logger = logf.Log.WithName("druid_operator_handler")
Expand Down Expand Up @@ -490,19 +494,90 @@ func deleteOrphanPVC(ctx context.Context, sdk client.Client, drd *v1alpha1.Druid
for i, pvc := range pvcList {

if !ContainsString(mountedPVC, pvc.GetName()) {
err := writers.Delete(ctx, sdk, drd, pvcList[i], emitEvents, &client.DeleteAllOfOptions{})
if err != nil {
return err

if _, ok := pvc.GetLabels()[toBeDeletedLabel]; ok {
err := checkPVCLabelsAndDelete(ctx, sdk, drd, emitEvents, pvcList[i])
if err != nil {
return err
}
} else {
msg := fmt.Sprintf("Deleted orphaned pvc [%s:%s] successfully", pvcList[i].GetName(), drd.Namespace)
logger.Info(msg, "name", drd.Name, "namespace", drd.Namespace)
// set labels when pvc comes for deletion for the first time
getPvcLabels := pvc.GetLabels()
getPvcLabels[toBeDeletedLabel] = "yes"
getPvcLabels[deletionTSLabel] = strconv.FormatInt(time.Now().Unix(), 10)

err = setPVCLabels(ctx, sdk, drd, emitEvents, pvcList[i], getPvcLabels, true)
if err != nil {
return err
}
}
} else {
// do not delete pvc
if _, ok := pvc.GetLabels()[toBeDeletedLabel]; ok {
getPvcLabels := pvc.GetLabels()
delete(getPvcLabels, toBeDeletedLabel)
delete(getPvcLabels, deletionTSLabel)

err = setPVCLabels(ctx, sdk, drd, emitEvents, pvcList[i], getPvcLabels, false)
if err != nil {
return err
}
}
}
}
}
return nil
}

func checkPVCLabelsAndDelete(ctx context.Context, sdk client.Client, drd *v1alpha1.Druid, emitEvents EventEmitter, pvc object) error {
deletionTS := pvc.GetLabels()[deletionTSLabel]

parsedDeletionTS, err := strconv.ParseInt(deletionTS, 10, 64)

if err != nil {
msg := fmt.Sprintf("Unable to parse label %s [%s:%s]", deletionTSLabel, deletionTS, pvc.GetName())
logger.Info(msg, "name", drd.Name, "namespace", drd.Namespace)
return err
}

timeNow := time.Now().Unix()
timeDiff := timeDifference(parsedDeletionTS, timeNow)

if timeDiff >= int64(time.Second/time.Second)*60 {
// delete pvc
err = writers.Delete(ctx, sdk, drd, pvc, emitEvents, &client.DeleteAllOfOptions{})
if err != nil {
return err
} else {
msg := fmt.Sprintf("Deleted orphaned pvc [%s:%s] successfully", pvc.GetName(), drd.Namespace)
logger.Info(msg, "name", drd.Name, "namespace", drd.Namespace)
}
} else {
// wait for 60s
msg := fmt.Sprintf("pvc [%s:%s] marked to be deleted after %ds", pvc.GetName(), drd.Namespace, 60-timeDiff)
logger.Info(msg, "name", drd.Name, "namespace", drd.Namespace)
}
return nil
}

func setPVCLabels(ctx context.Context, sdk client.Client, drd *v1alpha1.Druid, emitEvents EventEmitter, pvc object, labels map[string]string, isSetLabel bool) error {

pvc.SetLabels(labels)
_, err := writers.Update(ctx, sdk, drd, pvc, emitEvents)
if err != nil {
return err
} else {
if isSetLabel {
msg := fmt.Sprintf("marked pvc for deletion , added labels %s and %s successfully [%s]", toBeDeletedLabel, deletionTSLabel, pvc.GetName())
logger.Info(msg, "name", drd.Name, "namespace", drd.Namespace)
} else {
msg := fmt.Sprintf("unmarked pvc for deletion, removed labels %s and %s successfully in pvc [%s]", toBeDeletedLabel, deletionTSLabel, pvc.GetName())
logger.Info(msg, "name", drd.Name, "namespace", drd.Namespace)
}
}
return nil
}

func executeFinalizers(ctx context.Context, sdk client.Client, m *v1alpha1.Druid, emitEvents EventEmitter) error {

if ContainsString(m.ObjectMeta.Finalizers, finalizerName) {
Expand Down
10 changes: 10 additions & 0 deletions controllers/druid/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"reflect"
"strconv"
"strings"
"time"
)

func firstNonEmptyStr(s1 string, s2 string) string {
Expand Down Expand Up @@ -77,3 +78,12 @@ func Str2Int(s string) int {
}
return i
}

// to find the time difference between two epoch times
func timeDifference(epochTime1, epochTime2 int64) int64 {
t1 := time.Unix(epochTime1, 0)
t2 := time.Unix(epochTime2, 0)

diff := time.Duration(t2.Sub(t1))
return int64(diff.Seconds())
}

0 comments on commit 45b59e1

Please sign in to comment.