From 7ad03804b1c61419730c924e3d1ed643686b952b Mon Sep 17 00:00:00 2001 From: Ryan Phillips Date: Wed, 16 Jun 2021 12:10:47 -0500 Subject: [PATCH 1/2] UPSTREAM: 86320: Add UID precondition to kubelet pod status patch updates --- .../pkg/kubelet/status/status_manager.go | 4 ++-- vendor/k8s.io/kubernetes/pkg/util/pod/pod.go | 12 +++++++----- .../kubernetes/pkg/util/pod/pod_test.go | 19 +++++++++++-------- 3 files changed, 20 insertions(+), 15 deletions(-) diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/status/status_manager.go b/vendor/k8s.io/kubernetes/pkg/kubelet/status/status_manager.go index 1dbfc6aa5e7b..e4d2a369f137 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/status/status_manager.go +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/status/status_manager.go @@ -25,7 +25,7 @@ import ( clientset "k8s.io/client-go/kubernetes" "github.com/golang/glog" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -492,7 +492,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { } oldStatus := pod.Status.DeepCopy() - newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, *oldStatus, mergePodStatus(*oldStatus, status.status)) + newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status)) glog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes) if err != nil { glog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err) diff --git a/vendor/k8s.io/kubernetes/pkg/util/pod/pod.go b/vendor/k8s.io/kubernetes/pkg/util/pod/pod.go index 81d4304fa2be..73b1a3834343 100644 --- a/vendor/k8s.io/kubernetes/pkg/util/pod/pod.go +++ b/vendor/k8s.io/kubernetes/pkg/util/pod/pod.go @@ -20,15 +20,16 @@ import ( "encoding/json" "fmt" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" clientset "k8s.io/client-go/kubernetes" ) // PatchPodStatus patches pod status. -func PatchPodStatus(c clientset.Interface, namespace, name string, oldPodStatus, newPodStatus v1.PodStatus) (*v1.Pod, []byte, error) { - patchBytes, err := preparePatchBytesforPodStatus(namespace, name, oldPodStatus, newPodStatus) +func PatchPodStatus(c clientset.Interface, namespace, name string, uid types.UID, oldPodStatus, newPodStatus v1.PodStatus) (*v1.Pod, []byte, error) { + patchBytes, err := preparePatchBytesforPodStatus(namespace, name, uid, oldPodStatus, newPodStatus) if err != nil { return nil, nil, err } @@ -40,7 +41,7 @@ func PatchPodStatus(c clientset.Interface, namespace, name string, oldPodStatus, return updatedPod, patchBytes, nil } -func preparePatchBytesforPodStatus(namespace, name string, oldPodStatus, newPodStatus v1.PodStatus) ([]byte, error) { +func preparePatchBytesforPodStatus(namespace, name string, uid types.UID, oldPodStatus, newPodStatus v1.PodStatus) ([]byte, error) { oldData, err := json.Marshal(v1.Pod{ Status: oldPodStatus, }) @@ -49,7 +50,8 @@ func preparePatchBytesforPodStatus(namespace, name string, oldPodStatus, newPodS } newData, err := json.Marshal(v1.Pod{ - Status: newPodStatus, + ObjectMeta: metav1.ObjectMeta{UID: uid}, // only put the uid in the new object to ensure it appears in the patch as a precondition + Status: newPodStatus, }) if err != nil { return nil, fmt.Errorf("failed to Marshal newData for pod %q/%q: %v", namespace, name, err) diff --git a/vendor/k8s.io/kubernetes/pkg/util/pod/pod_test.go b/vendor/k8s.io/kubernetes/pkg/util/pod/pod_test.go index af0278fa0902..9c2b7e4f11af 100644 --- a/vendor/k8s.io/kubernetes/pkg/util/pod/pod_test.go +++ b/vendor/k8s.io/kubernetes/pkg/util/pod/pod_test.go @@ -17,18 +17,21 @@ limitations under the License. package pod import ( + "fmt" "testing" - "fmt" - "k8s.io/api/core/v1" + "reflect" + + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/fake" - "reflect" ) func TestPatchPodStatus(t *testing.T) { ns := "ns" name := "name" + uid := types.UID("myuid") client := &fake.Clientset{} client.CoreV1().Pods(ns).Create(&v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -45,7 +48,7 @@ func TestPatchPodStatus(t *testing.T) { { "no change", func(input v1.PodStatus) v1.PodStatus { return input }, - []byte(fmt.Sprintf(`{}`)), + []byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"}}`)), }, { "message change", @@ -53,7 +56,7 @@ func TestPatchPodStatus(t *testing.T) { input.Message = "random message" return input }, - []byte(fmt.Sprintf(`{"status":{"message":"random message"}}`)), + []byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"},"status":{"message":"random message"}}`)), }, { "pod condition change", @@ -61,7 +64,7 @@ func TestPatchPodStatus(t *testing.T) { input.Conditions[0].Status = v1.ConditionFalse return input }, - []byte(fmt.Sprintf(`{"status":{"$setElementOrder/conditions":[{"type":"Ready"},{"type":"PodScheduled"}],"conditions":[{"status":"False","type":"Ready"}]}}`)), + []byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"},"status":{"$setElementOrder/conditions":[{"type":"Ready"},{"type":"PodScheduled"}],"conditions":[{"status":"False","type":"Ready"}]}}`)), }, { "additional init container condition", @@ -74,11 +77,11 @@ func TestPatchPodStatus(t *testing.T) { } return input }, - []byte(fmt.Sprintf(`{"status":{"initContainerStatuses":[{"image":"","imageID":"","lastState":{},"name":"init-container","ready":true,"restartCount":0,"state":{}}]}}`)), + []byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"},"status":{"initContainerStatuses":[{"image":"","imageID":"","lastState":{},"name":"init-container","ready":true,"restartCount":0,"state":{}}]}}`)), }, } for _, tc := range testCases { - _, patchBytes, err := PatchPodStatus(client, ns, name, getPodStatus(), tc.mutate(getPodStatus())) + _, patchBytes, err := PatchPodStatus(client, ns, name, uid, getPodStatus(), tc.mutate(getPodStatus())) if err != nil { t.Errorf("unexpected error: %v", err) } From a365637ef4f0a3c2b50186c2c3abc71dc545adba Mon Sep 17 00:00:00 2001 From: Ryan Phillips Date: Wed, 16 Jun 2021 12:15:04 -0500 Subject: [PATCH 2/2] UPSTREAM: 88591: Avoid sending no-op patches --- .../pkg/kubelet/status/status_manager.go | 10 +++++--- .../pkg/kubelet/status/status_manager_test.go | 20 +++++++++------- vendor/k8s.io/kubernetes/pkg/util/pod/pod.go | 24 +++++++++++-------- .../kubernetes/pkg/util/pod/pod_test.go | 24 +++++++++++++------ 4 files changed, 50 insertions(+), 28 deletions(-) diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/status/status_manager.go b/vendor/k8s.io/kubernetes/pkg/kubelet/status/status_manager.go index e4d2a369f137..73bb2260b3b8 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/status/status_manager.go +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/status/status_manager.go @@ -492,15 +492,19 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { } oldStatus := pod.Status.DeepCopy() - newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status)) + newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status)) glog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes) if err != nil { glog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err) return } - pod = newPod + if unchanged { + glog.V(3).Infof("Status for pod %q is up-to-date: (%d)", format.Pod(pod), status.version) + } else { + glog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status) + pod = newPod + } - glog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status) m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version // We don't handle graceful deletion of mirror pods. diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/status/status_manager_test.go b/vendor/k8s.io/kubernetes/pkg/kubelet/status/status_manager_test.go index 03f79b2a1bb5..7ca1ee13aecd 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/status/status_manager_test.go +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/status/status_manager_test.go @@ -96,6 +96,7 @@ func getRandomPodStatus() v1.PodStatus { } func verifyActions(t *testing.T, manager *manager, expectedActions []core.Action) { + t.Helper() manager.consumeUpdates() actions := manager.kubeClient.(*fake.Clientset).Actions() defer manager.kubeClient.(*fake.Clientset).ClearActions() @@ -401,17 +402,17 @@ func TestStaleUpdates(t *testing.T) { t.Logf("Nothing left in the channel to sync") verifyActions(t, m, []core.Action{}) - t.Log("Unchanged status should not send an update.") + t.Log("Unchanged status should not send an update") m.SetPodStatus(pod, status) verifyUpdates(t, m, 0) - t.Log("... unless it's stale.") + t.Log("... even if it's stale as long as nothing changes") mirrorPodUID := kubetypes.MirrorPodUID(pod.UID) m.apiStatusVersions[mirrorPodUID] = m.apiStatusVersions[mirrorPodUID] - 1 m.SetPodStatus(pod, status) m.syncBatch() - verifyActions(t, m, []core.Action{getAction(), patchAction()}) + verifyActions(t, m, []core.Action{getAction()}) t.Logf("Nothing stuck in the pipe.") verifyUpdates(t, m, 0) @@ -737,8 +738,9 @@ func TestReconcilePodStatus(t *testing.T) { t.Logf("If the pod status is the same, a reconciliation is not needed and syncBatch should do nothing") syncer.podManager.UpdatePod(testPod) if syncer.needsReconcile(testPod.UID, podStatus) { - t.Errorf("Pod status is the same, a reconciliation is not needed") + t.Fatalf("Pod status is the same, a reconciliation is not needed") } + syncer.SetPodStatus(testPod, podStatus) syncer.syncBatch() verifyActions(t, syncer, []core.Action{}) @@ -751,17 +753,19 @@ func TestReconcilePodStatus(t *testing.T) { testPod.Status.StartTime = &normalizedStartTime syncer.podManager.UpdatePod(testPod) if syncer.needsReconcile(testPod.UID, podStatus) { - t.Errorf("Pod status only differs for timestamp format, a reconciliation is not needed") + t.Fatalf("Pod status only differs for timestamp format, a reconciliation is not needed") } + syncer.SetPodStatus(testPod, podStatus) syncer.syncBatch() verifyActions(t, syncer, []core.Action{}) t.Logf("If the pod status is different, a reconciliation is needed, syncBatch should trigger an update") - testPod.Status = getRandomPodStatus() + changedPodStatus := getRandomPodStatus() syncer.podManager.UpdatePod(testPod) - if !syncer.needsReconcile(testPod.UID, podStatus) { - t.Errorf("Pod status is different, a reconciliation is needed") + if !syncer.needsReconcile(testPod.UID, changedPodStatus) { + t.Fatalf("Pod status is different, a reconciliation is needed") } + syncer.SetPodStatus(testPod, changedPodStatus) syncer.syncBatch() verifyActions(t, syncer, []core.Action{getAction(), patchAction()}) } diff --git a/vendor/k8s.io/kubernetes/pkg/util/pod/pod.go b/vendor/k8s.io/kubernetes/pkg/util/pod/pod.go index 73b1a3834343..c9bb4e385306 100644 --- a/vendor/k8s.io/kubernetes/pkg/util/pod/pod.go +++ b/vendor/k8s.io/kubernetes/pkg/util/pod/pod.go @@ -17,6 +17,7 @@ limitations under the License. package pod import ( + "bytes" "encoding/json" "fmt" @@ -28,25 +29,28 @@ import ( ) // PatchPodStatus patches pod status. -func PatchPodStatus(c clientset.Interface, namespace, name string, uid types.UID, oldPodStatus, newPodStatus v1.PodStatus) (*v1.Pod, []byte, error) { - patchBytes, err := preparePatchBytesforPodStatus(namespace, name, uid, oldPodStatus, newPodStatus) +func PatchPodStatus(c clientset.Interface, namespace, name string, uid types.UID, oldPodStatus, newPodStatus v1.PodStatus) (*v1.Pod, []byte, bool, error) { + patchBytes, unchanged, err := preparePatchBytesforPodStatus(namespace, name, uid, oldPodStatus, newPodStatus) if err != nil { - return nil, nil, err + return nil, nil, false, err + } + if unchanged { + return nil, patchBytes, true, nil } updatedPod, err := c.CoreV1().Pods(namespace).Patch(name, types.StrategicMergePatchType, patchBytes, "status") if err != nil { - return nil, nil, fmt.Errorf("failed to patch status %q for pod %q/%q: %v", patchBytes, namespace, name, err) + return nil, nil, false, fmt.Errorf("failed to patch status %q for pod %q/%q: %v", patchBytes, namespace, name, err) } - return updatedPod, patchBytes, nil + return updatedPod, patchBytes, false, nil } -func preparePatchBytesforPodStatus(namespace, name string, uid types.UID, oldPodStatus, newPodStatus v1.PodStatus) ([]byte, error) { +func preparePatchBytesforPodStatus(namespace, name string, uid types.UID, oldPodStatus, newPodStatus v1.PodStatus) ([]byte, bool, error) { oldData, err := json.Marshal(v1.Pod{ Status: oldPodStatus, }) if err != nil { - return nil, fmt.Errorf("failed to Marshal oldData for pod %q/%q: %v", namespace, name, err) + return nil, false, fmt.Errorf("failed to Marshal oldData for pod %q/%q: %v", namespace, name, err) } newData, err := json.Marshal(v1.Pod{ @@ -54,12 +58,12 @@ func preparePatchBytesforPodStatus(namespace, name string, uid types.UID, oldPod Status: newPodStatus, }) if err != nil { - return nil, fmt.Errorf("failed to Marshal newData for pod %q/%q: %v", namespace, name, err) + return nil, false, fmt.Errorf("failed to Marshal newData for pod %q/%q: %v", namespace, name, err) } patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{}) if err != nil { - return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for pod %q/%q: %v", namespace, name, err) + return nil, false, fmt.Errorf("failed to CreateTwoWayMergePatch for pod %q/%q: %v", namespace, name, err) } - return patchBytes, nil + return patchBytes, bytes.Equal(patchBytes, []byte(fmt.Sprintf(`{"metadata":{"uid":%q}}`, uid))), nil } diff --git a/vendor/k8s.io/kubernetes/pkg/util/pod/pod_test.go b/vendor/k8s.io/kubernetes/pkg/util/pod/pod_test.go index 9c2b7e4f11af..b438f0a7493c 100644 --- a/vendor/k8s.io/kubernetes/pkg/util/pod/pod_test.go +++ b/vendor/k8s.io/kubernetes/pkg/util/pod/pod_test.go @@ -43,11 +43,13 @@ func TestPatchPodStatus(t *testing.T) { testCases := []struct { description string mutate func(input v1.PodStatus) v1.PodStatus + expectUnchanged bool expectedPatchBytes []byte }{ { "no change", func(input v1.PodStatus) v1.PodStatus { return input }, + true, []byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"}}`)), }, { @@ -56,6 +58,7 @@ func TestPatchPodStatus(t *testing.T) { input.Message = "random message" return input }, + false, []byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"},"status":{"message":"random message"}}`)), }, { @@ -64,6 +67,7 @@ func TestPatchPodStatus(t *testing.T) { input.Conditions[0].Status = v1.ConditionFalse return input }, + false, []byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"},"status":{"$setElementOrder/conditions":[{"type":"Ready"},{"type":"PodScheduled"}],"conditions":[{"status":"False","type":"Ready"}]}}`)), }, { @@ -77,17 +81,23 @@ func TestPatchPodStatus(t *testing.T) { } return input }, + false, []byte(fmt.Sprintf(`{"metadata":{"uid":"myuid"},"status":{"initContainerStatuses":[{"image":"","imageID":"","lastState":{},"name":"init-container","ready":true,"restartCount":0,"state":{}}]}}`)), }, } for _, tc := range testCases { - _, patchBytes, err := PatchPodStatus(client, ns, name, uid, getPodStatus(), tc.mutate(getPodStatus())) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if !reflect.DeepEqual(patchBytes, tc.expectedPatchBytes) { - t.Errorf("for test case %q, expect patchBytes: %q, got: %q\n", tc.description, tc.expectedPatchBytes, patchBytes) - } + t.Run(tc.description, func(t *testing.T) { + _, patchBytes, unchanged, err := PatchPodStatus(client, ns, name, uid, getPodStatus(), tc.mutate(getPodStatus())) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if unchanged != tc.expectUnchanged { + t.Errorf("unexpected change: %t", unchanged) + } + if !reflect.DeepEqual(patchBytes, tc.expectedPatchBytes) { + t.Errorf("expect patchBytes: %q, got: %q\n", tc.expectedPatchBytes, patchBytes) + } + }) } }