Skip to content

Commit

Permalink
Merge pull request #331 from kubescape/patch
Browse files Browse the repository at this point in the history
split JSON patch to avoid 10k limit
  • Loading branch information
matthyx authored Jul 24, 2024
2 parents bcad8aa + 4845d52 commit 9b3fa4a
Show file tree
Hide file tree
Showing 12 changed files with 121 additions and 87 deletions.
23 changes: 2 additions & 21 deletions pkg/applicationprofilemanager/v1/applicationprofile_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package applicationprofilemanager

import (
"context"
"encoding/json"
"errors"
"fmt"
"runtime"
Expand Down Expand Up @@ -311,18 +310,9 @@ func (am *ApplicationProfileManager) saveProfile(ctx context.Context, watchedCon
Value: runtime.GOARCH,
})

patch, err := json.Marshal(operations)
if err != nil {
logger.L().Ctx(ctx).Error("ApplicationProfileManager - failed to marshal patch", helpers.Error(err),
helpers.String("slug", slug),
helpers.Int("container index", watchedContainer.ContainerIndex),
helpers.String("container ID", watchedContainer.ContainerID),
helpers.String("k8s workload", watchedContainer.K8sContainerID))
return
}
// 1. try to patch object
var gotErr error
if err := am.storageClient.PatchApplicationProfile(slug, namespace, patch, watchedContainer.SyncChannel); err != nil {
if err := am.storageClient.PatchApplicationProfile(slug, namespace, operations, watchedContainer.SyncChannel); err != nil {
if apierrors.IsNotFound(err) {
// 2a. new object
newObject := &v1beta1.ApplicationProfile{
Expand Down Expand Up @@ -479,16 +469,7 @@ func (am *ApplicationProfileManager) saveProfile(ctx context.Context, watchedCon
})
}

patch, err := json.Marshal(replaceOperations)
if err != nil {
logger.L().Ctx(ctx).Error("ApplicationProfileManager - failed to marshal patch", helpers.Error(err),
helpers.String("slug", slug),
helpers.Int("container index", watchedContainer.ContainerIndex),
helpers.String("container ID", watchedContainer.ContainerID),
helpers.String("k8s workload", watchedContainer.K8sContainerID))
return
}
if err := am.storageClient.PatchApplicationProfile(slug, namespace, patch, watchedContainer.SyncChannel); err != nil {
if err := am.storageClient.PatchApplicationProfile(slug, namespace, replaceOperations, watchedContainer.SyncChannel); err != nil {
gotErr = err
logger.L().Ctx(ctx).Error("ApplicationProfileManager - failed to patch application profile", helpers.Error(err),
helpers.String("slug", slug),
Expand Down
23 changes: 2 additions & 21 deletions pkg/networkmanager/v2/network_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package v2

import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
Expand Down Expand Up @@ -288,18 +287,9 @@ func (nm *NetworkManager) saveNetworkEvents(ctx context.Context, watchedContaine
operations := utils.CreateNetworkPatchOperations(ingress, egress, watchedContainer.ContainerType.String(), watchedContainer.ContainerIndex)
operations = utils.AppendStatusAnnotationPatchOperations(operations, watchedContainer)

patch, err := json.Marshal(operations)
if err != nil {
logger.L().Ctx(ctx).Error("NetworkManager - failed to marshal patch", helpers.Error(err),
helpers.String("slug", slug),
helpers.Int("container index", watchedContainer.ContainerIndex),
helpers.String("container ID", watchedContainer.ContainerID),
helpers.String("k8s workload", watchedContainer.K8sContainerID))
return
}
// 1. try to patch object
var gotErr error
if err := nm.storageClient.PatchNetworkNeighborhood(slug, namespace, patch, watchedContainer.SyncChannel); err != nil {
if err := nm.storageClient.PatchNetworkNeighborhood(slug, namespace, operations, watchedContainer.SyncChannel); err != nil {
if apierrors.IsNotFound(err) {
// 2a. new object
newObject := &v1beta1.NetworkNeighborhood{
Expand Down Expand Up @@ -404,16 +394,7 @@ func (nm *NetworkManager) saveNetworkEvents(ctx context.Context, watchedContaine

replaceOperations = utils.AppendStatusAnnotationPatchOperations(replaceOperations, watchedContainer)

patch, err := json.Marshal(replaceOperations)
if err != nil {
logger.L().Ctx(ctx).Error("NetworkManager - failed to marshal patch", helpers.Error(err),
helpers.String("slug", slug),
helpers.Int("container index", watchedContainer.ContainerIndex),
helpers.String("container ID", watchedContainer.ContainerID),
helpers.String("k8s workload", watchedContainer.K8sContainerID))
return
}
if err := nm.storageClient.PatchNetworkNeighborhood(slug, namespace, patch, watchedContainer.SyncChannel); err != nil {
if err := nm.storageClient.PatchNetworkNeighborhood(slug, namespace, replaceOperations, watchedContainer.SyncChannel); err != nil {
gotErr = err
logger.L().Ctx(ctx).Error("NetworkManager - failed to patch network neighborhood", helpers.Error(err),
helpers.String("slug", slug),
Expand Down
7 changes: 6 additions & 1 deletion pkg/storage/applicationprofile.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

jsonpatch "github.com/evanphx/json-patch"
"github.com/kubescape/node-agent/pkg/utils"
"github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1"
errors2 "k8s.io/apimachinery/pkg/api/errors"
)
Expand Down Expand Up @@ -33,7 +34,7 @@ func (sc *StorageHttpClientMock) CreateApplicationProfile(profile *v1beta1.Appli
return nil
}

func (sc *StorageHttpClientMock) PatchApplicationProfile(name, _ string, patchJSON []byte, _ chan error) error {
func (sc *StorageHttpClientMock) PatchApplicationProfile(name, _ string, operations []utils.PatchOperation, _ chan error) error {
if len(sc.ApplicationProfiles) == 0 {
return errors2.NewNotFound(v1beta1.Resource("applicationprofile"), name)
}
Expand All @@ -42,6 +43,10 @@ func (sc *StorageHttpClientMock) PatchApplicationProfile(name, _ string, patchJS
if err != nil {
return fmt.Errorf("marshal last profile: %w", err)
}
patchJSON, err := json.Marshal(operations)
if err != nil {
return fmt.Errorf("marshal patch: %w", err)
}
patch, err := jsonpatch.DecodePatch(patchJSON)
if err != nil {
return fmt.Errorf("decode patch: %w", err)
Expand Down
7 changes: 6 additions & 1 deletion pkg/storage/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

"github.com/evanphx/json-patch"
"github.com/kubescape/node-agent/pkg/utils"
"github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1"
errors2 "k8s.io/apimachinery/pkg/api/errors"
)
Expand Down Expand Up @@ -40,7 +41,7 @@ func (sc *StorageHttpClientMock) CreateNetworkNeighborhood(neighborhood *v1beta1
return nil
}

func (sc *StorageHttpClientMock) PatchNetworkNeighborhood(name, _ string, patchJSON []byte, _ chan error) error {
func (sc *StorageHttpClientMock) PatchNetworkNeighborhood(name, _ string, operations []utils.PatchOperation, _ chan error) error {
if len(sc.NetworkNeighborhoods) == 0 {
return errors2.NewNotFound(v1beta1.Resource("networkneighborhood"), name)
}
Expand All @@ -49,6 +50,10 @@ func (sc *StorageHttpClientMock) PatchNetworkNeighborhood(name, _ string, patchJ
if err != nil {
return fmt.Errorf("marshal last neighborhood: %w", err)
}
patchJSON, err := json.Marshal(operations)
if err != nil {
return fmt.Errorf("marshal patch: %w", err)
}
patch, err := jsonpatch.DecodePatch(patchJSON)
if err != nil {
return fmt.Errorf("decode patch: %w", err)
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/storage_interface.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package storage

import (
"github.com/kubescape/node-agent/pkg/utils"
"github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1"
)

type StorageClient interface {
CreateApplicationActivity(activity *v1beta1.ApplicationActivity, namespace string) error
CreateApplicationProfile(profile *v1beta1.ApplicationProfile, namespace string) error
PatchApplicationProfile(name, namespace string, patch []byte, channel chan error) error
PatchApplicationProfile(name, namespace string, operations []utils.PatchOperation, channel chan error) error
GetApplicationProfile(namespace, name string) (*v1beta1.ApplicationProfile, error)
CreateFilteredSBOM(SBOM *v1beta1.SBOMSyftFiltered) error
GetFilteredSBOM(name string) (*v1beta1.SBOMSyftFiltered, error)
Expand All @@ -20,5 +21,5 @@ type StorageClient interface {
PatchNetworkNeighborsIngressAndEgress(name, namespace string, networkNeighbors *v1beta1.NetworkNeighbors) error
GetNetworkNeighborhood(namespace, name string) (*v1beta1.NetworkNeighborhood, error)
CreateNetworkNeighborhood(neighborhood *v1beta1.NetworkNeighborhood, namespace string) error
PatchNetworkNeighborhood(name, namespace string, patch []byte, channel chan error) error
PatchNetworkNeighborhood(name, namespace string, operations []utils.PatchOperation, channel chan error) error
}
16 changes: 15 additions & 1 deletion pkg/storage/v1/applicationprofile.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,21 @@ func (sc Storage) CreateApplicationProfile(profile *v1beta1.ApplicationProfile,
return nil
}

func (sc Storage) PatchApplicationProfile(name, namespace string, patch []byte, channel chan error) error {
func (sc Storage) PatchApplicationProfile(name, namespace string, operations []utils.PatchOperation, channel chan error) error {
// split operations into max JSON operations batches
for _, chunk := range utils.ChunkBy(operations, sc.maxJsonPatchOperations) {
if err := sc.patchApplicationProfile(name, namespace, chunk, channel); err != nil {
return err
}
}
return nil
}

func (sc Storage) patchApplicationProfile(name, namespace string, operations []utils.PatchOperation, channel chan error) error {
patch, err := json.Marshal(operations)
if err != nil {
return fmt.Errorf("marshal patch: %w", err)
}
profile, err := sc.StorageClient.ApplicationProfiles(namespace).Patch(context.Background(), name, types.JSONPatchType, patch, v1.PatchOptions{})
if err != nil {
return fmt.Errorf("patch application profile: %w", err)
Expand Down
16 changes: 15 additions & 1 deletion pkg/storage/v1/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,21 @@ func (sc Storage) CreateNetworkNeighborhood(neighborhood *v1beta1.NetworkNeighbo
return nil
}

func (sc Storage) PatchNetworkNeighborhood(name, namespace string, patch []byte, channel chan error) error {
func (sc Storage) PatchNetworkNeighborhood(name, namespace string, operations []utils.PatchOperation, channel chan error) error {
// split operations into max JSON operations batches
for _, chunk := range utils.ChunkBy(operations, sc.maxJsonPatchOperations) {
if err := sc.patchNetworkNeighborhood(name, namespace, chunk, channel); err != nil {
return err
}
}
return nil
}

func (sc Storage) patchNetworkNeighborhood(name, namespace string, operations []utils.PatchOperation, channel chan error) error {
patch, err := json.Marshal(operations)
if err != nil {
return fmt.Errorf("marshal patch: %w", err)
}
neighborhood, err := sc.StorageClient.NetworkNeighborhoods(namespace).Patch(context.Background(), name, types.JSONPatchType, patch, v1.PatchOptions{})
if err != nil {
return fmt.Errorf("patch application neighborhood: %w", err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/v1/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Storage struct {
StorageClient spdxv1beta1.SpdxV1beta1Interface
maxApplicationProfileSize int
maxNetworkNeighborhoodSize int
maxJsonPatchOperations int
namespace string
}

Expand Down Expand Up @@ -79,6 +80,7 @@ func CreateStorage(namespace string) (*Storage, error) {
StorageClient: clientset.SpdxV1beta1(),
maxApplicationProfileSize: maxApplicationProfileSize,
maxNetworkNeighborhoodSize: maxNetworkNeighborhoodSize,
maxJsonPatchOperations: 9999,
namespace: namespace,
}, nil
}
Expand Down
59 changes: 20 additions & 39 deletions pkg/storage/v1/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/kubescape/node-agent/pkg/storage"
"github.com/kubescape/node-agent/pkg/utils"

"github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -208,8 +209,8 @@ func TestStorage_PatchNetworkNeighbors(t *testing.T) {

func TestStorage_PatchApplicationProfile(t *testing.T) {
type args struct {
name string
patch []byte
name string
operations []utils.PatchOperation
}
tests := []struct {
name string
Expand All @@ -221,38 +222,14 @@ func TestStorage_PatchApplicationProfile(t *testing.T) {
name: "test",
args: args{
name: storage.NginxKey,
patch: []byte(`[
{
"op": "add",
"path": "/spec/containers/0/capabilities/-",
"value": "SYS_ADMIN"
},
{
"op": "add",
"path": "/spec/containers/0/execs/-",
"value": {"path": "/usr/bin/test2"}
},
{
"op": "add",
"path": "/spec/containers/0/execs/-",
"value": {"path": "/usr/bin/test3"}
},
{
"op": "add",
"path": "/spec/containers/0/opens/-",
"value": {"path": "/usr/bin/test2"}
},
{
"op": "add",
"path": "/spec/containers/0/opens/-",
"value": {"path": "/usr/bin/test3"}
},
{
"op": "add",
"path": "/spec/containers/0/syscalls/-",
"value": "open"
}
]`),
operations: []utils.PatchOperation{
{Op: "add", Path: "/spec/containers/0/capabilities/-", Value: "SYS_ADMIN"},
{Op: "add", Path: "/spec/containers/0/execs/-", Value: v1beta1.ExecCalls{Path: "/usr/bin/test2"}},
{Op: "add", Path: "/spec/containers/0/execs/-", Value: v1beta1.ExecCalls{Path: "/usr/bin/test3"}},
{Op: "add", Path: "/spec/containers/0/opens/-", Value: v1beta1.OpenCalls{Path: "/usr/bin/test2"}},
{Op: "add", Path: "/spec/containers/0/opens/-", Value: v1beta1.OpenCalls{Path: "/usr/bin/test3"}},
{Op: "add", Path: "/spec/containers/0/syscalls/-", Value: "open"},
},
},
want: &v1beta1.ApplicationProfile{
ObjectMeta: v1.ObjectMeta{
Expand Down Expand Up @@ -283,8 +260,10 @@ func TestStorage_PatchApplicationProfile(t *testing.T) {
{
name: "test",
args: args{
name: storage.NginxKey,
patch: []byte(`[{"op":"add","path":"/spec/initContainers","value":[{},{},{"name":"toto"}]}]`),
name: storage.NginxKey,
operations: []utils.PatchOperation{
{Op: "add", Path: "/spec/initContainers", Value: []v1beta1.ApplicationProfileContainer{{}, {}, {Name: "toto"}}},
},
},
want: &v1beta1.ApplicationProfile{
ObjectMeta: v1.ObjectMeta{
Expand Down Expand Up @@ -312,8 +291,10 @@ func TestStorage_PatchApplicationProfile(t *testing.T) {
{
name: "test",
args: args{
name: storage.NginxKey,
patch: []byte(`[{"op":"add","path":"/spec/ephemeralContainers","value":[{},{},{"name":"abc"}]}]`),
name: storage.NginxKey,
operations: []utils.PatchOperation{
{Op: "add", Path: "/spec/ephemeralContainers", Value: []v1beta1.ApplicationProfileContainer{{}, {}, {Name: "abc"}}},
},
},
want: &v1beta1.ApplicationProfile{
ObjectMeta: v1.ObjectMeta{
Expand Down Expand Up @@ -365,7 +346,7 @@ func TestStorage_PatchApplicationProfile(t *testing.T) {
},
}
_, _ = sc.StorageClient.ApplicationProfiles("default").Create(context.Background(), existingProfile, v1.CreateOptions{})
if err := sc.PatchApplicationProfile(tt.args.name, "default", tt.args.patch, nil); (err != nil) != tt.wantErr {
if err := sc.PatchApplicationProfile(tt.args.name, "default", tt.args.operations, nil); (err != nil) != tt.wantErr {
t.Errorf("PatchFilteredSBOM() error = %v, wantErr %v", err, tt.wantErr)
}
got, err := sc.StorageClient.ApplicationProfiles("default").Get(context.Background(), tt.args.name, v1.GetOptions{})
Expand Down
10 changes: 10 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,3 +693,13 @@ func TrimRuntimePrefix(id string) string {
func GetContainerStatuses(podStatus v1.PodStatus) []v1.ContainerStatus {
return slices.Concat(podStatus.ContainerStatuses, podStatus.InitContainerStatuses, podStatus.EphemeralContainerStatuses)
}

func ChunkBy[T any](items []T, chunkSize int) [][]T {
var chunks [][]T
if chunkSize > 0 {
for chunkSize < len(items) {
items, chunks = items[chunkSize:], append(chunks, items[0:chunkSize:chunkSize])
}
}
return append(chunks, items)
}
39 changes: 39 additions & 0 deletions pkg/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,3 +357,42 @@ func TestTrimRuntimePrefix(t *testing.T) {
})
}
}

func TestChunkBy(t *testing.T) {
type testCase[T any] struct {
name string
items []T
chunkSize int
want [][]T
}
tests := []testCase[string]{
{
name: "Test with empty items",
chunkSize: 2,
want: [][]string{nil},
},
{
name: "Test with chunk size greater than items length",
items: []string{"a", "b", "c"},
chunkSize: 4,
want: [][]string{{"a", "b", "c"}},
},
{
name: "Test with chunk size equal to items length",
items: []string{"a", "b", "c"},
chunkSize: 3,
want: [][]string{{"a", "b", "c"}},
},
{
name: "Test with chunk size less than items length",
items: []string{"a", "b", "c", "d", "e"},
chunkSize: 2,
want: [][]string{{"a", "b"}, {"c", "d"}, {"e"}},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.want, ChunkBy(tt.items, tt.chunkSize))
})
}
}
1 change: 1 addition & 0 deletions tests/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ func Test_08_ApplicationProfilePatching(t *testing.T) {
patch, err := json.Marshal(patchOperations)
assert.NoError(t, err)

// TODO use Storage abstraction?
_, err = storageclient.ApplicationProfiles(ns.Name).Patch(context.Background(), name, types.JSONPatchType, patch, v1.PatchOptions{})

assert.NoError(t, err)
Expand Down

0 comments on commit 9b3fa4a

Please sign in to comment.