From d2d444d7ccf5ee748d6abe2f179294fdee792b14 Mon Sep 17 00:00:00 2001
From: Jan Safranek
Date: Wed, 2 Oct 2019 10:41:43 +0200
Subject: [PATCH] Update sig-storage-lib-external-provisioner to v4.0.1

---
 Gopkg.lock                     |   6 +-
 Gopkg.toml                     |   2 +-
 .../controller/controller.go   | 111 +++++++++---------
 .../controller/volume_store.go |  35 +++++-
 4 files changed, 91 insertions(+), 63 deletions(-)

diff --git a/Gopkg.lock b/Gopkg.lock
index 9468a27a10..9474f15169 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -813,7 +813,7 @@
   revision = "6c36bc71fc4aeb1f49801054e71aebdaef1fbeb4"
 
 [[projects]]
-  digest = "1:1628303a766eb8b6a1185756a064843933f453851720ad75e219489356dc26fa"
+  digest = "1:4e2ac2a0870f97ddaf096354f4417c39344a607df9f86efc474d38990d4c5274"
   name = "sigs.k8s.io/sig-storage-lib-external-provisioner"
   packages = [
     "controller",
@@ -821,8 +821,8 @@
     "util",
   ]
   pruneopts = "NUT"
-  revision = "d22b74e900af4bf90174d259c3e52c3680b41ab4"
-  version = "v4.0.0"
+  revision = "c525773885fccef89a1edd22bb3f813d50548ed1"
+  version = "v4.0.1"
 
 [[projects]]
   digest = "1:8730e0150dfb2b7e173890c8b9868e7a273082ef8e39f4940e3506a481cf895c"
diff --git a/Gopkg.toml b/Gopkg.toml
index 2198172467..252d5d234b 100644
--- a/Gopkg.toml
+++ b/Gopkg.toml
@@ -35,7 +35,7 @@
 
 [[constraint]]
   name = "sigs.k8s.io/sig-storage-lib-external-provisioner"
-  version = "v4.0.0"
+  version = "v4.0.1"
 
 # TODO: remove when released
 [[constraint]]
diff --git a/vendor/sigs.k8s.io/sig-storage-lib-external-provisioner/controller/controller.go b/vendor/sigs.k8s.io/sig-storage-lib-external-provisioner/controller/controller.go
index 2e2c3bba0e..094d5b2133 100644
--- a/vendor/sigs.k8s.io/sig-storage-lib-external-provisioner/controller/controller.go
+++ b/vendor/sigs.k8s.io/sig-storage-lib-external-provisioner/controller/controller.go
@@ -617,29 +617,6 @@ func NewProvisionController(
 	controller.claimQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
 	controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "volumes")
 
-	if controller.createProvisionerPVLimiter != nil {
-		glog.V(2).Infof("Using saving PVs to API server in background")
-		controller.volumeStore = NewVolumeStoreQueue(client, controller.createProvisionerPVLimiter)
-	} else {
-		if controller.createProvisionedPVBackoff == nil {
-			// Use linear backoff with createProvisionedPVInterval and createProvisionedPVRetryCount by default.
-			if controller.createProvisionedPVInterval == 0 {
-				controller.createProvisionedPVInterval = DefaultCreateProvisionedPVInterval
-			}
-			if controller.createProvisionedPVRetryCount == 0 {
-				controller.createProvisionedPVRetryCount = DefaultCreateProvisionedPVRetryCount
-			}
-			controller.createProvisionedPVBackoff = &wait.Backoff{
-				Duration: controller.createProvisionedPVInterval,
-				Factor:   1, // linear backoff
-				Steps:    controller.createProvisionedPVRetryCount,
-				//Cap:      controller.createProvisionedPVInterval,
-			}
-		}
-		glog.V(2).Infof("Using blocking saving PVs to API server")
-		controller.volumeStore = NewBackoffStore(client, controller.eventRecorder, controller.createProvisionedPVBackoff, controller)
-	}
-
 	informer := informers.NewSharedInformerFactory(client, controller.resyncPeriod)
 
 	// ----------------------
@@ -698,6 +675,30 @@ func NewProvisionController(
 		}
 	}
 	controller.classes = controller.classInformer.GetStore()
+
+	if controller.createProvisionerPVLimiter != nil {
+		glog.V(2).Infof("Using saving PVs to API server in background")
+		controller.volumeStore = NewVolumeStoreQueue(client, controller.createProvisionerPVLimiter, controller.claimsIndexer, controller.eventRecorder)
+	} else {
+		if controller.createProvisionedPVBackoff == nil {
+			// Use linear backoff with createProvisionedPVInterval and createProvisionedPVRetryCount by default.
+			if controller.createProvisionedPVInterval == 0 {
+				controller.createProvisionedPVInterval = DefaultCreateProvisionedPVInterval
+			}
+			if controller.createProvisionedPVRetryCount == 0 {
+				controller.createProvisionedPVRetryCount = DefaultCreateProvisionedPVRetryCount
+			}
+			controller.createProvisionedPVBackoff = &wait.Backoff{
+				Duration: controller.createProvisionedPVInterval,
+				Factor:   1, // linear backoff
+				Steps:    controller.createProvisionedPVRetryCount,
+				//Cap:      controller.createProvisionedPVInterval,
+			}
+		}
+		glog.V(2).Infof("Using blocking saving PVs to API server")
+		controller.volumeStore = NewBackoffStore(client, controller.eventRecorder, controller.createProvisionedPVBackoff, controller)
+	}
+
 	return controller
 }
 
@@ -881,7 +882,7 @@ func (ctrl *ProvisionController) processNextClaimWorkItem() bool {
 			return fmt.Errorf("expected string in workqueue but got %#v", obj)
 		}
 
-		if _, err := ctrl.syncClaimHandler(key); err != nil {
+		if err := ctrl.syncClaimHandler(key); err != nil {
 			if ctrl.failedProvisionThreshold == 0 {
 				glog.Warningf("Retrying syncing claim %q, failure %v", key, ctrl.claimQueue.NumRequeues(obj))
 				ctrl.claimQueue.AddRateLimited(obj)
@@ -899,7 +900,8 @@ func (ctrl *ProvisionController) processNextClaimWorkItem() bool {
 		}
 
 		ctrl.claimQueue.Forget(obj)
-		glog.V(2).Infof("Provisioning succeeded, removing PVC %s from claims in progress", key)
+		// Silently remove the PVC from list of volumes in progress. The provisioning either succeeded
+		// or the PVC was ignored by this provisioner.
 		ctrl.claimsInProgress.Delete(key)
 		return nil
 	}(obj)
@@ -957,10 +959,10 @@ func (ctrl *ProvisionController) processNextVolumeWorkItem() bool {
 }
 
 // syncClaimHandler gets the claim from informer's cache then calls syncClaim
-func (ctrl *ProvisionController) syncClaimHandler(key string) (ProvisioningState, error) {
+func (ctrl *ProvisionController) syncClaimHandler(key string) error {
 	objs, err := ctrl.claimsIndexer.ByIndex(uidIndex, key)
 	if err != nil {
-		return ProvisioningFinished, err
+		return err
 	}
 	var claimObj interface{}
 	if len(objs) > 0 {
@@ -969,28 +971,11 @@ func (ctrl *ProvisionController) syncClaimHandler(key string) (ProvisioningState, error) {
 		obj, found := ctrl.claimsInProgress.Load(key)
 		if !found {
 			utilruntime.HandleError(fmt.Errorf("claim %q in work queue no longer exists", key))
-			return ProvisioningFinished, nil
+			return nil
 		}
 		claimObj = obj
 	}
-	status, err := ctrl.syncClaim(claimObj)
-	if err == nil || status == ProvisioningFinished {
-		// Provisioning is 100% finished / not in progress.
-		glog.V(2).Infof("Final error received, removing PVC %s from claims in progress", key)
-		ctrl.claimsInProgress.Delete(key)
-		return status, err
-	}
-	if status == ProvisioningInBackground {
-		// Provisioning is in progress in background.
-		glog.V(2).Infof("Temporary error received, adding PVC %s to claims in progress", key)
-		ctrl.claimsInProgress.Store(key, claimObj)
-	} else {
-		// status == ProvisioningNoChange.
-		// Don't change claimsInProgress:
-		// - the claim is already there if previous status was ProvisioningInBackground.
-		// - the claim is not there if if previous status was ProvisioningFinished.
-	}
-	return status, err
+	return ctrl.syncClaim(claimObj)
 }
 
 // syncVolumeHandler gets the volume from informer's cache then calls syncVolume
@@ -1009,25 +994,43 @@ func (ctrl *ProvisionController) syncVolumeHandler(key string) error {
 }
 
 // syncClaim checks if the claim should have a volume provisioned for it and
 // provisions one if so.
-func (ctrl *ProvisionController) syncClaim(obj interface{}) (ProvisioningState, error) {
+func (ctrl *ProvisionController) syncClaim(obj interface{}) error {
 	claim, ok := obj.(*v1.PersistentVolumeClaim)
 	if !ok {
-		return ProvisioningFinished, fmt.Errorf("expected claim but got %+v", obj)
+		return fmt.Errorf("expected claim but got %+v", obj)
 	}
 
 	should, err := ctrl.shouldProvision(claim)
 	if err != nil {
-		return ProvisioningFinished, err
+		return err
 	} else if should {
 		startTime := time.Now()
-		var status ProvisioningState
-		var err error
-		status, err = ctrl.provisionClaimOperation(claim)
+		status, err := ctrl.provisionClaimOperation(claim)
 		ctrl.updateProvisionStats(claim, err, startTime)
-		return status, err
+		if err == nil || status == ProvisioningFinished {
+			// Provisioning is 100% finished / not in progress.
+			if err == nil {
+				glog.V(5).Infof("Claim processing succeeded, removing PVC %s from claims in progress", claim.UID)
+			} else {
+				glog.V(2).Infof("Final error received, removing PVC %s from claims in progress", claim.UID)
+			}
+			ctrl.claimsInProgress.Delete(string(claim.UID))
+			return err
+		}
+		if status == ProvisioningInBackground {
+			// Provisioning is in progress in background.
+			glog.V(2).Infof("Temporary error received, adding PVC %s to claims in progress", claim.UID)
+			ctrl.claimsInProgress.Store(string(claim.UID), claim)
+		} else {
+			// status == ProvisioningNoChange.
+			// Don't change claimsInProgress:
+			// - the claim is already there if previous status was ProvisioningInBackground.
+			// - the claim is not there if if previous status was ProvisioningFinished.
+		}
+		return err
 	}
-	return ProvisioningFinished, nil
+	return nil
 }
 
 // syncVolume checks if the volume should be deleted and deletes if so
diff --git a/vendor/sigs.k8s.io/sig-storage-lib-external-provisioner/controller/volume_store.go b/vendor/sigs.k8s.io/sig-storage-lib-external-provisioner/controller/volume_store.go
index 1d47ca490a..d535f2ac8c 100644
--- a/vendor/sigs.k8s.io/sig-storage-lib-external-provisioner/controller/volume_store.go
+++ b/vendor/sigs.k8s.io/sig-storage-lib-external-provisioner/controller/volume_store.go
@@ -24,11 +24,12 @@ import (
 
 	"k8s.io/client-go/tools/record"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	apierrs "k8s.io/apimachinery/pkg/api/errors"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
 )
@@ -56,8 +57,10 @@ type VolumeStore interface {
 // PVs to API server using a workqueue running in its own goroutine(s).
 // After failed save, volume is re-qeueued with exponential backoff.
 type queueStore struct {
-	client kubernetes.Interface
-	queue  workqueue.RateLimitingInterface
+	client        kubernetes.Interface
+	queue         workqueue.RateLimitingInterface
+	eventRecorder record.EventRecorder
+	claimsIndexer cache.Indexer
 
 	volumes sync.Map
 }
@@ -68,11 +71,15 @@ var _ VolumeStore = &queueStore{}
 func NewVolumeStoreQueue(
 	client kubernetes.Interface,
 	limiter workqueue.RateLimiter,
+	claimsIndexer cache.Indexer,
+	eventRecorder record.EventRecorder,
 ) VolumeStore {
 
 	return &queueStore{
-		client: client,
-		queue:  workqueue.NewNamedRateLimitingQueue(limiter, "unsavedpvs"),
+		client:        client,
+		queue:         workqueue.NewNamedRateLimitingQueue(limiter, "unsavedpvs"),
+		claimsIndexer: claimsIndexer,
+		eventRecorder: eventRecorder,
 	}
 }
 
@@ -148,11 +155,29 @@ func (q *queueStore) doSaveVolume(volume *v1.PersistentVolume) error {
 	_, err := q.client.CoreV1().PersistentVolumes().Create(volume)
 	if err == nil || apierrs.IsAlreadyExists(err) {
 		klog.V(5).Infof("Volume %s saved", volume.Name)
+		q.sendSuccessEvent(volume)
 		return nil
 	}
 	return fmt.Errorf("error saving volume %s: %s", volume.Name, err)
 }
 
+func (q *queueStore) sendSuccessEvent(volume *v1.PersistentVolume) {
+	claimObjs, err := q.claimsIndexer.ByIndex(uidIndex, string(volume.Spec.ClaimRef.UID))
+	if err != nil {
+		klog.V(2).Infof("Error sending event to claim %s: %s", volume.Spec.ClaimRef.UID, err)
+		return
+	}
+	if len(claimObjs) != 1 {
+		return
+	}
+	claim, ok := claimObjs[0].(*v1.PersistentVolumeClaim)
+	if !ok {
+		return
+	}
+	msg := fmt.Sprintf("Successfully provisioned volume %s", volume.Name)
+	q.eventRecorder.Event(claim, v1.EventTypeNormal, "ProvisioningSucceeded", msg)
+}
+
 // backoffStore is implementation of VolumeStore that blocks and tries to save
 // a volume to API server with configurable backoff. If saving fails,
 // StoreVolume() deletes the storage asset in the end and returns appropriate
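
Illustrative sketch (not part of the patch): the hunks above wire a claimsIndexer and an eventRecorder into the background volume store so that, once a PV finally reaches the API server, a ProvisioningSucceeded event is posted on the bound PVC. The standalone Go program below mimics that flow with invented names (miniStore, StoreVolume, processOne) and assumes the same client-go era as the vendored code (PersistentVolumes().Create without a context argument); it is a sketch of the pattern, not the library's actual implementation.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	apierrs "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
)

// miniStore is a stripped-down stand-in for the library's queueStore: it keeps
// unsaved PVs, retries their creation via a rate-limited workqueue, and knows
// enough about the owning PVC to send it an event on success.
type miniStore struct {
	client   kubernetes.Interface
	queue    workqueue.RateLimitingInterface
	recorder record.EventRecorder
	claims   map[string]*v1.PersistentVolumeClaim // keyed by PVC UID; stands in for claimsIndexer
	volumes  map[string]*v1.PersistentVolume      // keyed by PV name; stands in for sync.Map
}

// StoreVolume remembers the claim/volume pair and queues the PV for saving.
func (s *miniStore) StoreVolume(claim *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) {
	s.claims[string(claim.UID)] = claim
	s.volumes[pv.Name] = pv
	s.queue.Add(pv.Name)
}

// processOne pops one PV off the queue and tries to create it. On failure it
// re-queues the PV with backoff; on success it emits ProvisioningSucceeded on
// the bound PVC, which is the behaviour this vendor bump picks up.
func (s *miniStore) processOne() {
	key, quit := s.queue.Get()
	if quit {
		return
	}
	defer s.queue.Done(key)

	pv := s.volumes[key.(string)]
	if _, err := s.client.CoreV1().PersistentVolumes().Create(pv); err != nil && !apierrs.IsAlreadyExists(err) {
		s.queue.AddRateLimited(key) // retry later with exponential backoff
		return
	}
	s.queue.Forget(key)

	if claim, ok := s.claims[string(pv.Spec.ClaimRef.UID)]; ok {
		s.recorder.Event(claim, v1.EventTypeNormal, "ProvisioningSucceeded",
			fmt.Sprintf("Successfully provisioned volume %s", pv.Name))
	}
}

func main() {
	claim := &v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{Name: "pvc-a", Namespace: "default", UID: "uid-1"},
	}
	pv := &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{Name: "pv-a"},
		Spec:       v1.PersistentVolumeSpec{ClaimRef: &v1.ObjectReference{Name: "pvc-a", Namespace: "default", UID: "uid-1"}},
	}

	rec := record.NewFakeRecorder(10)
	store := &miniStore{
		client:   fake.NewSimpleClientset(),
		queue:    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "unsavedpvs"),
		recorder: rec,
		claims:   map[string]*v1.PersistentVolumeClaim{},
		volumes:  map[string]*v1.PersistentVolume{},
	}

	store.StoreVolume(claim, pv)
	store.processOne()
	fmt.Println(<-rec.Events) // "Normal ProvisioningSucceeded Successfully provisioned volume pv-a"
}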