From 4845d52644b4b21ecee7f5a67724c4fc7e714e5e Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Fri, 19 Jul 2024 16:44:03 +0200 Subject: [PATCH] split JSON patch to avoid 10k limit Signed-off-by: Matthias Bertschy --- .../v1/applicationprofile_manager.go | 23 +------- pkg/networkmanager/v2/network_manager.go | 23 +------- pkg/storage/applicationprofile.go | 7 ++- pkg/storage/network.go | 7 ++- pkg/storage/storage_interface.go | 5 +- pkg/storage/v1/applicationprofile.go | 16 ++++- pkg/storage/v1/network.go | 16 ++++- pkg/storage/v1/storage.go | 2 + pkg/storage/v1/storage_test.go | 59 +++++++------------ pkg/utils/utils.go | 10 ++++ pkg/utils/utils_test.go | 39 ++++++++++++ tests/component_test.go | 1 + 12 files changed, 121 insertions(+), 87 deletions(-) diff --git a/pkg/applicationprofilemanager/v1/applicationprofile_manager.go b/pkg/applicationprofilemanager/v1/applicationprofile_manager.go index 7b6fdfe8..42d09d6c 100644 --- a/pkg/applicationprofilemanager/v1/applicationprofile_manager.go +++ b/pkg/applicationprofilemanager/v1/applicationprofile_manager.go @@ -2,7 +2,6 @@ package applicationprofilemanager import ( "context" - "encoding/json" "errors" "fmt" "runtime" @@ -311,18 +310,9 @@ func (am *ApplicationProfileManager) saveProfile(ctx context.Context, watchedCon Value: runtime.GOARCH, }) - patch, err := json.Marshal(operations) - if err != nil { - logger.L().Ctx(ctx).Error("ApplicationProfileManager - failed to marshal patch", helpers.Error(err), - helpers.String("slug", slug), - helpers.Int("container index", watchedContainer.ContainerIndex), - helpers.String("container ID", watchedContainer.ContainerID), - helpers.String("k8s workload", watchedContainer.K8sContainerID)) - return - } // 1. try to patch object var gotErr error - if err := am.storageClient.PatchApplicationProfile(slug, namespace, patch, watchedContainer.SyncChannel); err != nil { + if err := am.storageClient.PatchApplicationProfile(slug, namespace, operations, watchedContainer.SyncChannel); err != nil { if apierrors.IsNotFound(err) { // 2a. new object newObject := &v1beta1.ApplicationProfile{ @@ -479,16 +469,7 @@ func (am *ApplicationProfileManager) saveProfile(ctx context.Context, watchedCon }) } - patch, err := json.Marshal(replaceOperations) - if err != nil { - logger.L().Ctx(ctx).Error("ApplicationProfileManager - failed to marshal patch", helpers.Error(err), - helpers.String("slug", slug), - helpers.Int("container index", watchedContainer.ContainerIndex), - helpers.String("container ID", watchedContainer.ContainerID), - helpers.String("k8s workload", watchedContainer.K8sContainerID)) - return - } - if err := am.storageClient.PatchApplicationProfile(slug, namespace, patch, watchedContainer.SyncChannel); err != nil { + if err := am.storageClient.PatchApplicationProfile(slug, namespace, replaceOperations, watchedContainer.SyncChannel); err != nil { gotErr = err logger.L().Ctx(ctx).Error("ApplicationProfileManager - failed to patch application profile", helpers.Error(err), helpers.String("slug", slug), diff --git a/pkg/networkmanager/v2/network_manager.go b/pkg/networkmanager/v2/network_manager.go index 4cfa971c..647b3364 100644 --- a/pkg/networkmanager/v2/network_manager.go +++ b/pkg/networkmanager/v2/network_manager.go @@ -2,7 +2,6 @@ package v2 import ( "context" - "encoding/json" "errors" "fmt" "time" @@ -288,18 +287,9 @@ func (nm *NetworkManager) saveNetworkEvents(ctx context.Context, watchedContaine operations := utils.CreateNetworkPatchOperations(ingress, egress, watchedContainer.ContainerType.String(), watchedContainer.ContainerIndex) operations = utils.AppendStatusAnnotationPatchOperations(operations, watchedContainer) - patch, err := json.Marshal(operations) - if err != nil { - logger.L().Ctx(ctx).Error("NetworkManager - failed to marshal patch", helpers.Error(err), - helpers.String("slug", slug), - helpers.Int("container index", watchedContainer.ContainerIndex), - helpers.String("container ID", watchedContainer.ContainerID), - helpers.String("k8s workload", watchedContainer.K8sContainerID)) - return - } // 1. try to patch object var gotErr error - if err := nm.storageClient.PatchNetworkNeighborhood(slug, namespace, patch, watchedContainer.SyncChannel); err != nil { + if err := nm.storageClient.PatchNetworkNeighborhood(slug, namespace, operations, watchedContainer.SyncChannel); err != nil { if apierrors.IsNotFound(err) { // 2a. new object newObject := &v1beta1.NetworkNeighborhood{ @@ -404,16 +394,7 @@ func (nm *NetworkManager) saveNetworkEvents(ctx context.Context, watchedContaine replaceOperations = utils.AppendStatusAnnotationPatchOperations(replaceOperations, watchedContainer) - patch, err := json.Marshal(replaceOperations) - if err != nil { - logger.L().Ctx(ctx).Error("NetworkManager - failed to marshal patch", helpers.Error(err), - helpers.String("slug", slug), - helpers.Int("container index", watchedContainer.ContainerIndex), - helpers.String("container ID", watchedContainer.ContainerID), - helpers.String("k8s workload", watchedContainer.K8sContainerID)) - return - } - if err := nm.storageClient.PatchNetworkNeighborhood(slug, namespace, patch, watchedContainer.SyncChannel); err != nil { + if err := nm.storageClient.PatchNetworkNeighborhood(slug, namespace, replaceOperations, watchedContainer.SyncChannel); err != nil { gotErr = err logger.L().Ctx(ctx).Error("NetworkManager - failed to patch network neighborhood", helpers.Error(err), helpers.String("slug", slug), diff --git a/pkg/storage/applicationprofile.go b/pkg/storage/applicationprofile.go index 5f94009e..1664d540 100644 --- a/pkg/storage/applicationprofile.go +++ b/pkg/storage/applicationprofile.go @@ -6,6 +6,7 @@ import ( "fmt" jsonpatch "github.com/evanphx/json-patch" + "github.com/kubescape/node-agent/pkg/utils" "github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1" errors2 "k8s.io/apimachinery/pkg/api/errors" ) @@ -33,7 +34,7 @@ func (sc *StorageHttpClientMock) CreateApplicationProfile(profile *v1beta1.Appli return nil } -func (sc *StorageHttpClientMock) PatchApplicationProfile(name, _ string, patchJSON []byte, _ chan error) error { +func (sc *StorageHttpClientMock) PatchApplicationProfile(name, _ string, operations []utils.PatchOperation, _ chan error) error { if len(sc.ApplicationProfiles) == 0 { return errors2.NewNotFound(v1beta1.Resource("applicationprofile"), name) } @@ -42,6 +43,10 @@ func (sc *StorageHttpClientMock) PatchApplicationProfile(name, _ string, patchJS if err != nil { return fmt.Errorf("marshal last profile: %w", err) } + patchJSON, err := json.Marshal(operations) + if err != nil { + return fmt.Errorf("marshal patch: %w", err) + } patch, err := jsonpatch.DecodePatch(patchJSON) if err != nil { return fmt.Errorf("decode patch: %w", err) diff --git a/pkg/storage/network.go b/pkg/storage/network.go index c86d8e8e..c7a67bf0 100644 --- a/pkg/storage/network.go +++ b/pkg/storage/network.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/evanphx/json-patch" + "github.com/kubescape/node-agent/pkg/utils" "github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1" errors2 "k8s.io/apimachinery/pkg/api/errors" ) @@ -40,7 +41,7 @@ func (sc *StorageHttpClientMock) CreateNetworkNeighborhood(neighborhood *v1beta1 return nil } -func (sc *StorageHttpClientMock) PatchNetworkNeighborhood(name, _ string, patchJSON []byte, _ chan error) error { +func (sc *StorageHttpClientMock) PatchNetworkNeighborhood(name, _ string, operations []utils.PatchOperation, _ chan error) error { if len(sc.NetworkNeighborhoods) == 0 { return errors2.NewNotFound(v1beta1.Resource("networkneighborhood"), name) } @@ -49,6 +50,10 @@ func (sc *StorageHttpClientMock) PatchNetworkNeighborhood(name, _ string, patchJ if err != nil { return fmt.Errorf("marshal last neighborhood: %w", err) } + patchJSON, err := json.Marshal(operations) + if err != nil { + return fmt.Errorf("marshal patch: %w", err) + } patch, err := jsonpatch.DecodePatch(patchJSON) if err != nil { return fmt.Errorf("decode patch: %w", err) diff --git a/pkg/storage/storage_interface.go b/pkg/storage/storage_interface.go index 29fd5d2b..49bc0972 100644 --- a/pkg/storage/storage_interface.go +++ b/pkg/storage/storage_interface.go @@ -1,13 +1,14 @@ package storage import ( + "github.com/kubescape/node-agent/pkg/utils" "github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1" ) type StorageClient interface { CreateApplicationActivity(activity *v1beta1.ApplicationActivity, namespace string) error CreateApplicationProfile(profile *v1beta1.ApplicationProfile, namespace string) error - PatchApplicationProfile(name, namespace string, patch []byte, channel chan error) error + PatchApplicationProfile(name, namespace string, operations []utils.PatchOperation, channel chan error) error GetApplicationProfile(namespace, name string) (*v1beta1.ApplicationProfile, error) CreateFilteredSBOM(SBOM *v1beta1.SBOMSyftFiltered) error GetFilteredSBOM(name string) (*v1beta1.SBOMSyftFiltered, error) @@ -20,5 +21,5 @@ type StorageClient interface { PatchNetworkNeighborsIngressAndEgress(name, namespace string, networkNeighbors *v1beta1.NetworkNeighbors) error GetNetworkNeighborhood(namespace, name string) (*v1beta1.NetworkNeighborhood, error) CreateNetworkNeighborhood(neighborhood *v1beta1.NetworkNeighborhood, namespace string) error - PatchNetworkNeighborhood(name, namespace string, patch []byte, channel chan error) error + PatchNetworkNeighborhood(name, namespace string, operations []utils.PatchOperation, channel chan error) error } diff --git a/pkg/storage/v1/applicationprofile.go b/pkg/storage/v1/applicationprofile.go index f8245631..7585ae54 100644 --- a/pkg/storage/v1/applicationprofile.go +++ b/pkg/storage/v1/applicationprofile.go @@ -27,7 +27,21 @@ func (sc Storage) CreateApplicationProfile(profile *v1beta1.ApplicationProfile, return nil } -func (sc Storage) PatchApplicationProfile(name, namespace string, patch []byte, channel chan error) error { +func (sc Storage) PatchApplicationProfile(name, namespace string, operations []utils.PatchOperation, channel chan error) error { + // split operations into max JSON operations batches + for _, chunk := range utils.ChunkBy(operations, sc.maxJsonPatchOperations) { + if err := sc.patchApplicationProfile(name, namespace, chunk, channel); err != nil { + return err + } + } + return nil +} + +func (sc Storage) patchApplicationProfile(name, namespace string, operations []utils.PatchOperation, channel chan error) error { + patch, err := json.Marshal(operations) + if err != nil { + return fmt.Errorf("marshal patch: %w", err) + } profile, err := sc.StorageClient.ApplicationProfiles(namespace).Patch(context.Background(), name, types.JSONPatchType, patch, v1.PatchOptions{}) if err != nil { return fmt.Errorf("patch application profile: %w", err) diff --git a/pkg/storage/v1/network.go b/pkg/storage/v1/network.go index cd1eb95e..83063bc9 100644 --- a/pkg/storage/v1/network.go +++ b/pkg/storage/v1/network.go @@ -28,7 +28,21 @@ func (sc Storage) CreateNetworkNeighborhood(neighborhood *v1beta1.NetworkNeighbo return nil } -func (sc Storage) PatchNetworkNeighborhood(name, namespace string, patch []byte, channel chan error) error { +func (sc Storage) PatchNetworkNeighborhood(name, namespace string, operations []utils.PatchOperation, channel chan error) error { + // split operations into max JSON operations batches + for _, chunk := range utils.ChunkBy(operations, sc.maxJsonPatchOperations) { + if err := sc.patchNetworkNeighborhood(name, namespace, chunk, channel); err != nil { + return err + } + } + return nil +} + +func (sc Storage) patchNetworkNeighborhood(name, namespace string, operations []utils.PatchOperation, channel chan error) error { + patch, err := json.Marshal(operations) + if err != nil { + return fmt.Errorf("marshal patch: %w", err) + } neighborhood, err := sc.StorageClient.NetworkNeighborhoods(namespace).Patch(context.Background(), name, types.JSONPatchType, patch, v1.PatchOptions{}) if err != nil { return fmt.Errorf("patch application neighborhood: %w", err) diff --git a/pkg/storage/v1/storage.go b/pkg/storage/v1/storage.go index 98d47947..c14305e0 100644 --- a/pkg/storage/v1/storage.go +++ b/pkg/storage/v1/storage.go @@ -33,6 +33,7 @@ type Storage struct { StorageClient spdxv1beta1.SpdxV1beta1Interface maxApplicationProfileSize int maxNetworkNeighborhoodSize int + maxJsonPatchOperations int namespace string } @@ -79,6 +80,7 @@ func CreateStorage(namespace string) (*Storage, error) { StorageClient: clientset.SpdxV1beta1(), maxApplicationProfileSize: maxApplicationProfileSize, maxNetworkNeighborhoodSize: maxNetworkNeighborhoodSize, + maxJsonPatchOperations: 9999, namespace: namespace, }, nil } diff --git a/pkg/storage/v1/storage_test.go b/pkg/storage/v1/storage_test.go index c99475ad..bffc9c6e 100644 --- a/pkg/storage/v1/storage_test.go +++ b/pkg/storage/v1/storage_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/kubescape/node-agent/pkg/storage" + "github.com/kubescape/node-agent/pkg/utils" "github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1" "github.com/stretchr/testify/assert" @@ -208,8 +209,8 @@ func TestStorage_PatchNetworkNeighbors(t *testing.T) { func TestStorage_PatchApplicationProfile(t *testing.T) { type args struct { - name string - patch []byte + name string + operations []utils.PatchOperation } tests := []struct { name string @@ -221,38 +222,14 @@ func TestStorage_PatchApplicationProfile(t *testing.T) { name: "test", args: args{ name: storage.NginxKey, - patch: []byte(`[ - { - "op": "add", - "path": "/spec/containers/0/capabilities/-", - "value": "SYS_ADMIN" - }, - { - "op": "add", - "path": "/spec/containers/0/execs/-", - "value": {"path": "/usr/bin/test2"} - }, - { - "op": "add", - "path": "/spec/containers/0/execs/-", - "value": {"path": "/usr/bin/test3"} - }, - { - "op": "add", - "path": "/spec/containers/0/opens/-", - "value": {"path": "/usr/bin/test2"} - }, - { - "op": "add", - "path": "/spec/containers/0/opens/-", - "value": {"path": "/usr/bin/test3"} - }, - { - "op": "add", - "path": "/spec/containers/0/syscalls/-", - "value": "open" - } -]`), + operations: []utils.PatchOperation{ + {Op: "add", Path: "/spec/containers/0/capabilities/-", Value: "SYS_ADMIN"}, + {Op: "add", Path: "/spec/containers/0/execs/-", Value: v1beta1.ExecCalls{Path: "/usr/bin/test2"}}, + {Op: "add", Path: "/spec/containers/0/execs/-", Value: v1beta1.ExecCalls{Path: "/usr/bin/test3"}}, + {Op: "add", Path: "/spec/containers/0/opens/-", Value: v1beta1.OpenCalls{Path: "/usr/bin/test2"}}, + {Op: "add", Path: "/spec/containers/0/opens/-", Value: v1beta1.OpenCalls{Path: "/usr/bin/test3"}}, + {Op: "add", Path: "/spec/containers/0/syscalls/-", Value: "open"}, + }, }, want: &v1beta1.ApplicationProfile{ ObjectMeta: v1.ObjectMeta{ @@ -283,8 +260,10 @@ func TestStorage_PatchApplicationProfile(t *testing.T) { { name: "test", args: args{ - name: storage.NginxKey, - patch: []byte(`[{"op":"add","path":"/spec/initContainers","value":[{},{},{"name":"toto"}]}]`), + name: storage.NginxKey, + operations: []utils.PatchOperation{ + {Op: "add", Path: "/spec/initContainers", Value: []v1beta1.ApplicationProfileContainer{{}, {}, {Name: "toto"}}}, + }, }, want: &v1beta1.ApplicationProfile{ ObjectMeta: v1.ObjectMeta{ @@ -312,8 +291,10 @@ func TestStorage_PatchApplicationProfile(t *testing.T) { { name: "test", args: args{ - name: storage.NginxKey, - patch: []byte(`[{"op":"add","path":"/spec/ephemeralContainers","value":[{},{},{"name":"abc"}]}]`), + name: storage.NginxKey, + operations: []utils.PatchOperation{ + {Op: "add", Path: "/spec/ephemeralContainers", Value: []v1beta1.ApplicationProfileContainer{{}, {}, {Name: "abc"}}}, + }, }, want: &v1beta1.ApplicationProfile{ ObjectMeta: v1.ObjectMeta{ @@ -365,7 +346,7 @@ func TestStorage_PatchApplicationProfile(t *testing.T) { }, } _, _ = sc.StorageClient.ApplicationProfiles("default").Create(context.Background(), existingProfile, v1.CreateOptions{}) - if err := sc.PatchApplicationProfile(tt.args.name, "default", tt.args.patch, nil); (err != nil) != tt.wantErr { + if err := sc.PatchApplicationProfile(tt.args.name, "default", tt.args.operations, nil); (err != nil) != tt.wantErr { t.Errorf("PatchFilteredSBOM() error = %v, wantErr %v", err, tt.wantErr) } got, err := sc.StorageClient.ApplicationProfiles("default").Get(context.Background(), tt.args.name, v1.GetOptions{}) diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 64dbbcf4..383eacb8 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -693,3 +693,13 @@ func TrimRuntimePrefix(id string) string { func GetContainerStatuses(podStatus v1.PodStatus) []v1.ContainerStatus { return slices.Concat(podStatus.ContainerStatuses, podStatus.InitContainerStatuses, podStatus.EphemeralContainerStatuses) } + +func ChunkBy[T any](items []T, chunkSize int) [][]T { + var chunks [][]T + if chunkSize > 0 { + for chunkSize < len(items) { + items, chunks = items[chunkSize:], append(chunks, items[0:chunkSize:chunkSize]) + } + } + return append(chunks, items) +} diff --git a/pkg/utils/utils_test.go b/pkg/utils/utils_test.go index 8d0d0226..cc44eed7 100644 --- a/pkg/utils/utils_test.go +++ b/pkg/utils/utils_test.go @@ -357,3 +357,42 @@ func TestTrimRuntimePrefix(t *testing.T) { }) } } + +func TestChunkBy(t *testing.T) { + type testCase[T any] struct { + name string + items []T + chunkSize int + want [][]T + } + tests := []testCase[string]{ + { + name: "Test with empty items", + chunkSize: 2, + want: [][]string{nil}, + }, + { + name: "Test with chunk size greater than items length", + items: []string{"a", "b", "c"}, + chunkSize: 4, + want: [][]string{{"a", "b", "c"}}, + }, + { + name: "Test with chunk size equal to items length", + items: []string{"a", "b", "c"}, + chunkSize: 3, + want: [][]string{{"a", "b", "c"}}, + }, + { + name: "Test with chunk size less than items length", + items: []string{"a", "b", "c", "d", "e"}, + chunkSize: 2, + want: [][]string{{"a", "b"}, {"c", "d"}, {"e"}}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, ChunkBy(tt.items, tt.chunkSize)) + }) + } +} diff --git a/tests/component_test.go b/tests/component_test.go index c12f2e21..d1cf0172 100644 --- a/tests/component_test.go +++ b/tests/component_test.go @@ -495,6 +495,7 @@ func Test_08_ApplicationProfilePatching(t *testing.T) { patch, err := json.Marshal(patchOperations) assert.NoError(t, err) + // TODO use Storage abstraction? _, err = storageclient.ApplicationProfiles(ns.Name).Patch(context.Background(), name, types.JSONPatchType, patch, v1.PatchOptions{}) assert.NoError(t, err)