diff --git a/Makefile b/Makefile index bb2e88e6c..16e92bb5e 100644 --- a/Makefile +++ b/Makefile @@ -116,6 +116,11 @@ clean: @rm -rf $(DOCKER_CACHE) @rm -rf .golangci-bin +.PHONY: codegen +codegen: + @echo "===> Updating generated code <===" + $(CURDIR)/hack/update-codegen.sh + .PHONY: manifest manifest: @echo "===> Generating dev manifest for Theia <===" diff --git a/build/charts/theia/README.md b/build/charts/theia/README.md index 4b55158f1..fbfe8d742 100644 --- a/build/charts/theia/README.md +++ b/build/charts/theia/README.md @@ -71,7 +71,7 @@ Kubernetes: `>= 1.16.0-0` | theiaManager.apiServer.tlsMinVersion | string | `""` | TLS min version from: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13. | | theiaManager.enable | bool | `false` | Determine whether to install Theia Manager. | | theiaManager.image | object | `{"pullPolicy":"IfNotPresent","repository":"projects.registry.vmware.com/antrea/theia-manager","tag":""}` | Container image used by Theia Manager. | -| theiaManager.logVerbosity | int | `0` | | +| theiaManager.logVerbosity | int | `0` | Log verbosity switch for Theia Manager. | ---------------------------------------------- Autogenerated from chart metadata using [helm-docs v1.7.0](https://github.com/norwoodj/helm-docs/releases/v1.7.0) diff --git a/build/charts/theia/crds/network-policy-recommendation-crd.yaml b/build/charts/theia/crds/network-policy-recommendation-crd.yaml index fb198a4c5..708d8d58a 100644 --- a/build/charts/theia/crds/network-policy-recommendation-crd.yaml +++ b/build/charts/theia/crds/network-policy-recommendation-crd.yaml @@ -20,6 +20,12 @@ spec: type: object required: - jobType + - policyType + - executorInstances + - driverCoreRequest + - driverMemory + - executorCoreRequest + - executorMemory properties: jobType: type: string @@ -27,10 +33,10 @@ spec: type: integer policyType: type: string - startTime: + startInterval: type: string format: datetime - endTime: + endInterval: type: string format: datetime nsAllowList: @@ -56,6 +62,33 @@ spec: properties: state: type: string + sparkApplication: + type: string + completedStages: + type: integer + totalStages: + type: integer + startTime: + type: string + format: datetime + endTime: + type: string + format: datetime + recommendedNetworkPolicy: + type: object + properties: + spec: + type: object + properties: + id: + type: string + resultType: + type: string + timeCreated: + type: string + format: datetime + yamls: + type: string additionalPrinterColumns: - description: Current state of the job jsonPath: .status.state @@ -70,3 +103,48 @@ spec: kind: NetworkPolicyRecommendation shortNames: - npr +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: recommendednetworkpolicies.crd.theia.antrea.io + labels: + app: theia +spec: + group: crd.theia.antrea.io + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + required: + - id + - timeCreated + - resultType + - yamls + properties: + id: + type: string + timeCreated: + type: string + format: datetime + resultType: + type: string + yamls: + type: string + subresources: + status: {} + scope: Namespaced + names: + plural: recommendednetworkpolicies + singular: recommendednetworkpolicy + kind: RecommendedNetworkPolicy + shortNames: + - rnp diff --git a/build/charts/theia/templates/theia-manager/clusterrole.yaml b/build/charts/theia/templates/theia-manager/clusterrole.yaml index 651bc60dc..2a77170dd 100644 --- a/build/charts/theia/templates/theia-manager/clusterrole.yaml +++ b/build/charts/theia/templates/theia-manager/clusterrole.yaml @@ -44,6 +44,18 @@ rules: resources: ["configmaps"] verbs: ["get", "list", "watch"] - apiGroups: ["crd.theia.antrea.io"] - resources: ["networkpolicyrecommendations"] - verbs: ["get", "list", "watch"] + resources: ["networkpolicyrecommendations", "recommendednetworkpolicies"] + verbs: ["get", "list", "watch", "create", "delete"] + - apiGroups: ["crd.theia.antrea.io"] + resources: ["networkpolicyrecommendations/status"] + verbs: ["update"] + - apiGroups: [ "" ] + resources: [ "pods" ] + verbs: ["list"] + - apiGroups: [ "" ] + resources: [ "services", "secrets" ] + verbs: ["get"] + - apiGroups: ["sparkoperator.k8s.io"] + resources: ["sparkapplications"] + verbs: ["create", "delete", "get", "list"] {{- end }} diff --git a/build/charts/theia/values.yaml b/build/charts/theia/values.yaml index 6b380484e..886691f44 100644 --- a/build/charts/theia/values.yaml +++ b/build/charts/theia/values.yaml @@ -233,5 +233,5 @@ theiaManager: tlsCipherSuites: "" # -- TLS min version from: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13. tlsMinVersion: "" - ## -- Log verbosity switch for Theia Manager. + # -- Log verbosity switch for Theia Manager. logVerbosity: 0 diff --git a/cmd/theia-manager/theia-manager.go b/cmd/theia-manager/theia-manager.go index c65688197..a019e77a7 100644 --- a/cmd/theia-manager/theia-manager.go +++ b/cmd/theia-manager/theia-manager.go @@ -27,6 +27,7 @@ import ( "antrea.io/antrea/pkg/util/cipher" genericapiserver "k8s.io/apiserver/pkg/server" genericoptions "k8s.io/apiserver/pkg/server/options" + "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/klog/v2" @@ -111,9 +112,9 @@ func run(o *Options) error { if err != nil { return fmt.Errorf("error when generating KubeConfig: %v", err) } - client, err := clientset.NewForConfig(kubeConfig) + kubeClient, err := kubernetes.NewForConfig(kubeConfig) if err != nil { - return fmt.Errorf("error when generating k8s client: %v", err) + return fmt.Errorf("error when generating kubernetes client: %v", err) } crdClient, err := crdclientset.NewForConfig(kubeConfig) if err != nil { @@ -121,7 +122,8 @@ func run(o *Options) error { } crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, informerDefaultResync) npRecommendationInformer := crdInformerFactory.Crd().V1alpha1().NetworkPolicyRecommendations() - npRecoController := networkpolicyrecommendation.NewNPRecommendationController(crdClient, npRecommendationInformer) + recommendedNPInformer := crdInformerFactory.Crd().V1alpha1().RecommendedNetworkPolicies() + npRecoController := networkpolicyrecommendation.NewNPRecommendationController(crdClient, kubeClient, npRecommendationInformer, recommendedNPInformer) cipherSuites, err := cipher.GenerateCipherSuitesList(o.config.APIServer.TLSCipherSuites) if err != nil { @@ -129,7 +131,7 @@ func run(o *Options) error { } apiServerConfig, err := createAPIServerConfig( - client, + kubeClient, *o.config.APIServer.SelfSignedCert, o.config.APIServer.APIPort, cipherSuites, diff --git a/pkg/apis/crd/v1alpha1/register.go b/pkg/apis/crd/v1alpha1/register.go index 88cf80667..f797d99b7 100644 --- a/pkg/apis/crd/v1alpha1/register.go +++ b/pkg/apis/crd/v1alpha1/register.go @@ -52,6 +52,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { SchemeGroupVersion, &NetworkPolicyRecommendation{}, &NetworkPolicyRecommendationList{}, + &RecommendedNetworkPolicy{}, + &RecommendedNetworkPolicyList{}, ) metav1.AddToGroupVersion( diff --git a/pkg/apis/crd/v1alpha1/types.go b/pkg/apis/crd/v1alpha1/types.go index ecebc6c6d..fc32df9d8 100644 --- a/pkg/apis/crd/v1alpha1/types.go +++ b/pkg/apis/crd/v1alpha1/types.go @@ -23,6 +23,14 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +const ( + NPRecommendationStateNew string = "NEW" + NPRecommendationStateScheduled string = "SCHEDULED" + NPRecommendationStateRunning string = "RUNNING" + NPRecommendationStateCompleted string = "COMPLETED" + NPRecommendationStateFailed string = "FAILED" +) + // +genclient // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -35,11 +43,11 @@ type NetworkPolicyRecommendation struct { } type NetworkPolicyRecommendationSpec struct { - Type string `json:"type,omitempty"` + JobType string `json:"jobType,omitempty"` Limit int `json:"limit,omitempty"` PolicyType string `json:"policyType,omitempty"` - StartTime metav1.Time `json:"startTime,omitempty"` - EndTime metav1.Time `json:"endTime,omitempty"` + StartInterval metav1.Time `json:"startInterval,omitempty"` + EndInterval metav1.Time `json:"endInterval,omitempty"` NSAllowList []string `json:"nsAllowList,omitempty"` ExcludeLabels bool `json:"excludeLabels,omitempty"` ToServices bool `json:"toServices,omitempty"` @@ -51,7 +59,14 @@ type NetworkPolicyRecommendationSpec struct { } type NetworkPolicyRecommendationStatus struct { - State string `json:"state,omitempty"` + State string `json:"state,omitempty"` + SparkApplication string `json:"sparkApplication,omitempty"` + CompletedStages int `json:"completedStages,omitempty"` + TotalStages int `json:"totalStages,omitempty"` + RecommendedNP *RecommendedNetworkPolicy `json:"recommendedNetworkPolicy,omitempty"` + ErrorMsg string `json:"errorMsg,omitempty"` + StartTime metav1.Time `json:"startTime,omitempty"` + EndTime metav1.Time `json:"endTime,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -61,3 +76,28 @@ type NetworkPolicyRecommendationList struct { metav1.ListMeta `json:"metadata,omitempty"` Items []NetworkPolicyRecommendation `json:"items"` } + +// +genclient +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +type RecommendedNetworkPolicy struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec RecommendedNetworkPolicySpec `json:"spec,omitempty"` +} + +type RecommendedNetworkPolicySpec struct { + Id string `json:"id,omitempty"` + Type string `json:"resultType,omitempty"` + TimeCreated metav1.Time `json:"timeCreated,omitempty"` + Yamls string `json:"yamls,omitempty"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +type RecommendedNetworkPolicyList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []RecommendedNetworkPolicy `json:"items"` +} diff --git a/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go index e11ec8131..90a423319 100644 --- a/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go @@ -29,7 +29,7 @@ func (in *NetworkPolicyRecommendation) DeepCopyInto(out *NetworkPolicyRecommenda out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) return } @@ -87,8 +87,8 @@ func (in *NetworkPolicyRecommendationList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *NetworkPolicyRecommendationSpec) DeepCopyInto(out *NetworkPolicyRecommendationSpec) { *out = *in - in.StartTime.DeepCopyInto(&out.StartTime) - in.EndTime.DeepCopyInto(&out.EndTime) + in.StartInterval.DeepCopyInto(&out.StartInterval) + in.EndInterval.DeepCopyInto(&out.EndInterval) if in.NSAllowList != nil { in, out := &in.NSAllowList, &out.NSAllowList *out = make([]string, len(*in)) @@ -110,6 +110,13 @@ func (in *NetworkPolicyRecommendationSpec) DeepCopy() *NetworkPolicyRecommendati // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *NetworkPolicyRecommendationStatus) DeepCopyInto(out *NetworkPolicyRecommendationStatus) { *out = *in + if in.RecommendedNP != nil { + in, out := &in.RecommendedNP, &out.RecommendedNP + *out = new(RecommendedNetworkPolicy) + (*in).DeepCopyInto(*out) + } + in.StartTime.DeepCopyInto(&out.StartTime) + in.EndTime.DeepCopyInto(&out.EndTime) return } @@ -122,3 +129,80 @@ func (in *NetworkPolicyRecommendationStatus) DeepCopy() *NetworkPolicyRecommenda in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RecommendedNetworkPolicy) DeepCopyInto(out *RecommendedNetworkPolicy) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RecommendedNetworkPolicy. +func (in *RecommendedNetworkPolicy) DeepCopy() *RecommendedNetworkPolicy { + if in == nil { + return nil + } + out := new(RecommendedNetworkPolicy) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *RecommendedNetworkPolicy) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RecommendedNetworkPolicyList) DeepCopyInto(out *RecommendedNetworkPolicyList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]RecommendedNetworkPolicy, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RecommendedNetworkPolicyList. +func (in *RecommendedNetworkPolicyList) DeepCopy() *RecommendedNetworkPolicyList { + if in == nil { + return nil + } + out := new(RecommendedNetworkPolicyList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *RecommendedNetworkPolicyList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RecommendedNetworkPolicySpec) DeepCopyInto(out *RecommendedNetworkPolicySpec) { + *out = *in + in.TimeCreated.DeepCopyInto(&out.TimeCreated) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RecommendedNetworkPolicySpec. +func (in *RecommendedNetworkPolicySpec) DeepCopy() *RecommendedNetworkPolicySpec { + if in == nil { + return nil + } + out := new(RecommendedNetworkPolicySpec) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/apiserver/registry/intelligence/networkpolicyrecommendation/rest.go b/pkg/apiserver/registry/intelligence/networkpolicyrecommendation/rest.go index 67da70338..de26a8488 100644 --- a/pkg/apiserver/registry/intelligence/networkpolicyrecommendation/rest.go +++ b/pkg/apiserver/registry/intelligence/networkpolicyrecommendation/rest.go @@ -55,11 +55,11 @@ func (r *REST) getNetworkPolicyRecommendation(name string) *intelligence.Network job := new(intelligence.NetworkPolicyRecommendation) job.Name = npReco.Name - job.Type = npReco.Spec.Type + job.Type = npReco.Spec.JobType job.Limit = npReco.Spec.Limit job.PolicyType = npReco.Spec.PolicyType - job.StartInterval = npReco.Spec.StartTime - job.EndInterval = npReco.Spec.EndTime + job.StartInterval = npReco.Spec.StartInterval + job.EndInterval = npReco.Spec.EndInterval job.NSAllowList = npReco.Spec.NSAllowList job.ExcludeLabels = npReco.Spec.ExcludeLabels job.ToServices = npReco.Spec.ToServices diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/crd_client.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/crd_client.go index f511973b1..997049e63 100644 --- a/pkg/client/clientset/versioned/typed/crd/v1alpha1/crd_client.go +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/crd_client.go @@ -27,6 +27,7 @@ import ( type CrdV1alpha1Interface interface { RESTClient() rest.Interface NetworkPolicyRecommendationsGetter + RecommendedNetworkPoliciesGetter } // CrdV1alpha1Client is used to interact with features provided by the crd.theia.antrea.io group. @@ -38,6 +39,10 @@ func (c *CrdV1alpha1Client) NetworkPolicyRecommendations(namespace string) Netwo return newNetworkPolicyRecommendations(c, namespace) } +func (c *CrdV1alpha1Client) RecommendedNetworkPolicies(namespace string) RecommendedNetworkPolicyInterface { + return newRecommendedNetworkPolicies(c, namespace) +} + // NewForConfig creates a new CrdV1alpha1Client for the given config. // NewForConfig is equivalent to NewForConfigAndClient(c, httpClient), // where httpClient was generated with rest.HTTPClientFor(c). diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_crd_client.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_crd_client.go index 75fb73799..ca1f3d620 100644 --- a/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_crd_client.go +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_crd_client.go @@ -30,6 +30,10 @@ func (c *FakeCrdV1alpha1) NetworkPolicyRecommendations(namespace string) v1alpha return &FakeNetworkPolicyRecommendations{c, namespace} } +func (c *FakeCrdV1alpha1) RecommendedNetworkPolicies(namespace string) v1alpha1.RecommendedNetworkPolicyInterface { + return &FakeRecommendedNetworkPolicies{c, namespace} +} + // RESTClient returns a RESTClient that is used to communicate // with API server by this client implementation. func (c *FakeCrdV1alpha1) RESTClient() rest.Interface { diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_recommendednetworkpolicy.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_recommendednetworkpolicy.go new file mode 100644 index 000000000..08e2acbb0 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_recommendednetworkpolicy.go @@ -0,0 +1,128 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + "context" + + v1alpha1 "antrea.io/theia/pkg/apis/crd/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeRecommendedNetworkPolicies implements RecommendedNetworkPolicyInterface +type FakeRecommendedNetworkPolicies struct { + Fake *FakeCrdV1alpha1 + ns string +} + +var recommendednetworkpoliciesResource = schema.GroupVersionResource{Group: "crd.theia.antrea.io", Version: "v1alpha1", Resource: "recommendednetworkpolicies"} + +var recommendednetworkpoliciesKind = schema.GroupVersionKind{Group: "crd.theia.antrea.io", Version: "v1alpha1", Kind: "RecommendedNetworkPolicy"} + +// Get takes name of the recommendedNetworkPolicy, and returns the corresponding recommendedNetworkPolicy object, and an error if there is any. +func (c *FakeRecommendedNetworkPolicies) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.RecommendedNetworkPolicy, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(recommendednetworkpoliciesResource, c.ns, name), &v1alpha1.RecommendedNetworkPolicy{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.RecommendedNetworkPolicy), err +} + +// List takes label and field selectors, and returns the list of RecommendedNetworkPolicies that match those selectors. +func (c *FakeRecommendedNetworkPolicies) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.RecommendedNetworkPolicyList, err error) { + obj, err := c.Fake. + Invokes(testing.NewListAction(recommendednetworkpoliciesResource, recommendednetworkpoliciesKind, c.ns, opts), &v1alpha1.RecommendedNetworkPolicyList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.RecommendedNetworkPolicyList{ListMeta: obj.(*v1alpha1.RecommendedNetworkPolicyList).ListMeta} + for _, item := range obj.(*v1alpha1.RecommendedNetworkPolicyList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested recommendedNetworkPolicies. +func (c *FakeRecommendedNetworkPolicies) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(recommendednetworkpoliciesResource, c.ns, opts)) + +} + +// Create takes the representation of a recommendedNetworkPolicy and creates it. Returns the server's representation of the recommendedNetworkPolicy, and an error, if there is any. +func (c *FakeRecommendedNetworkPolicies) Create(ctx context.Context, recommendedNetworkPolicy *v1alpha1.RecommendedNetworkPolicy, opts v1.CreateOptions) (result *v1alpha1.RecommendedNetworkPolicy, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(recommendednetworkpoliciesResource, c.ns, recommendedNetworkPolicy), &v1alpha1.RecommendedNetworkPolicy{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.RecommendedNetworkPolicy), err +} + +// Update takes the representation of a recommendedNetworkPolicy and updates it. Returns the server's representation of the recommendedNetworkPolicy, and an error, if there is any. +func (c *FakeRecommendedNetworkPolicies) Update(ctx context.Context, recommendedNetworkPolicy *v1alpha1.RecommendedNetworkPolicy, opts v1.UpdateOptions) (result *v1alpha1.RecommendedNetworkPolicy, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(recommendednetworkpoliciesResource, c.ns, recommendedNetworkPolicy), &v1alpha1.RecommendedNetworkPolicy{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.RecommendedNetworkPolicy), err +} + +// Delete takes name of the recommendedNetworkPolicy and deletes it. Returns an error if one occurs. +func (c *FakeRecommendedNetworkPolicies) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteActionWithOptions(recommendednetworkpoliciesResource, c.ns, name, opts), &v1alpha1.RecommendedNetworkPolicy{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeRecommendedNetworkPolicies) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + action := testing.NewDeleteCollectionAction(recommendednetworkpoliciesResource, c.ns, listOpts) + + _, err := c.Fake.Invokes(action, &v1alpha1.RecommendedNetworkPolicyList{}) + return err +} + +// Patch applies the patch and returns the patched recommendedNetworkPolicy. +func (c *FakeRecommendedNetworkPolicies) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.RecommendedNetworkPolicy, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(recommendednetworkpoliciesResource, c.ns, name, pt, data, subresources...), &v1alpha1.RecommendedNetworkPolicy{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.RecommendedNetworkPolicy), err +} diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/generated_expansion.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/generated_expansion.go index f87a38c7d..dc32da68a 100644 --- a/pkg/client/clientset/versioned/typed/crd/v1alpha1/generated_expansion.go +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/generated_expansion.go @@ -17,3 +17,5 @@ package v1alpha1 type NetworkPolicyRecommendationExpansion interface{} + +type RecommendedNetworkPolicyExpansion interface{} diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/recommendednetworkpolicy.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/recommendednetworkpolicy.go new file mode 100644 index 000000000..6f7db7961 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/recommendednetworkpolicy.go @@ -0,0 +1,176 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + "time" + + v1alpha1 "antrea.io/theia/pkg/apis/crd/v1alpha1" + scheme "antrea.io/theia/pkg/client/clientset/versioned/scheme" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// RecommendedNetworkPoliciesGetter has a method to return a RecommendedNetworkPolicyInterface. +// A group's client should implement this interface. +type RecommendedNetworkPoliciesGetter interface { + RecommendedNetworkPolicies(namespace string) RecommendedNetworkPolicyInterface +} + +// RecommendedNetworkPolicyInterface has methods to work with RecommendedNetworkPolicy resources. +type RecommendedNetworkPolicyInterface interface { + Create(ctx context.Context, recommendedNetworkPolicy *v1alpha1.RecommendedNetworkPolicy, opts v1.CreateOptions) (*v1alpha1.RecommendedNetworkPolicy, error) + Update(ctx context.Context, recommendedNetworkPolicy *v1alpha1.RecommendedNetworkPolicy, opts v1.UpdateOptions) (*v1alpha1.RecommendedNetworkPolicy, error) + Delete(ctx context.Context, name string, opts v1.DeleteOptions) error + DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error + Get(ctx context.Context, name string, opts v1.GetOptions) (*v1alpha1.RecommendedNetworkPolicy, error) + List(ctx context.Context, opts v1.ListOptions) (*v1alpha1.RecommendedNetworkPolicyList, error) + Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) + Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.RecommendedNetworkPolicy, err error) + RecommendedNetworkPolicyExpansion +} + +// recommendedNetworkPolicies implements RecommendedNetworkPolicyInterface +type recommendedNetworkPolicies struct { + client rest.Interface + ns string +} + +// newRecommendedNetworkPolicies returns a RecommendedNetworkPolicies +func newRecommendedNetworkPolicies(c *CrdV1alpha1Client, namespace string) *recommendedNetworkPolicies { + return &recommendedNetworkPolicies{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the recommendedNetworkPolicy, and returns the corresponding recommendedNetworkPolicy object, and an error if there is any. +func (c *recommendedNetworkPolicies) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.RecommendedNetworkPolicy, err error) { + result = &v1alpha1.RecommendedNetworkPolicy{} + err = c.client.Get(). + Namespace(c.ns). + Resource("recommendednetworkpolicies"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(ctx). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of RecommendedNetworkPolicies that match those selectors. +func (c *recommendedNetworkPolicies) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.RecommendedNetworkPolicyList, err error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + result = &v1alpha1.RecommendedNetworkPolicyList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("recommendednetworkpolicies"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Do(ctx). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested recommendedNetworkPolicies. +func (c *recommendedNetworkPolicies) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("recommendednetworkpolicies"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Watch(ctx) +} + +// Create takes the representation of a recommendedNetworkPolicy and creates it. Returns the server's representation of the recommendedNetworkPolicy, and an error, if there is any. +func (c *recommendedNetworkPolicies) Create(ctx context.Context, recommendedNetworkPolicy *v1alpha1.RecommendedNetworkPolicy, opts v1.CreateOptions) (result *v1alpha1.RecommendedNetworkPolicy, err error) { + result = &v1alpha1.RecommendedNetworkPolicy{} + err = c.client.Post(). + Namespace(c.ns). + Resource("recommendednetworkpolicies"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(recommendedNetworkPolicy). + Do(ctx). + Into(result) + return +} + +// Update takes the representation of a recommendedNetworkPolicy and updates it. Returns the server's representation of the recommendedNetworkPolicy, and an error, if there is any. +func (c *recommendedNetworkPolicies) Update(ctx context.Context, recommendedNetworkPolicy *v1alpha1.RecommendedNetworkPolicy, opts v1.UpdateOptions) (result *v1alpha1.RecommendedNetworkPolicy, err error) { + result = &v1alpha1.RecommendedNetworkPolicy{} + err = c.client.Put(). + Namespace(c.ns). + Resource("recommendednetworkpolicies"). + Name(recommendedNetworkPolicy.Name). + VersionedParams(&opts, scheme.ParameterCodec). + Body(recommendedNetworkPolicy). + Do(ctx). + Into(result) + return +} + +// Delete takes name of the recommendedNetworkPolicy and deletes it. Returns an error if one occurs. +func (c *recommendedNetworkPolicies) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("recommendednetworkpolicies"). + Name(name). + Body(&opts). + Do(ctx). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *recommendedNetworkPolicies) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + var timeout time.Duration + if listOpts.TimeoutSeconds != nil { + timeout = time.Duration(*listOpts.TimeoutSeconds) * time.Second + } + return c.client.Delete(). + Namespace(c.ns). + Resource("recommendednetworkpolicies"). + VersionedParams(&listOpts, scheme.ParameterCodec). + Timeout(timeout). + Body(&opts). + Do(ctx). + Error() +} + +// Patch applies the patch and returns the patched recommendedNetworkPolicy. +func (c *recommendedNetworkPolicies) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.RecommendedNetworkPolicy, err error) { + result = &v1alpha1.RecommendedNetworkPolicy{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("recommendednetworkpolicies"). + Name(name). + SubResource(subresources...). + VersionedParams(&opts, scheme.ParameterCodec). + Body(data). + Do(ctx). + Into(result) + return +} diff --git a/pkg/client/informers/externalversions/crd/v1alpha1/interface.go b/pkg/client/informers/externalversions/crd/v1alpha1/interface.go index b6d2424cb..10fd8bc78 100644 --- a/pkg/client/informers/externalversions/crd/v1alpha1/interface.go +++ b/pkg/client/informers/externalversions/crd/v1alpha1/interface.go @@ -24,6 +24,8 @@ import ( type Interface interface { // NetworkPolicyRecommendations returns a NetworkPolicyRecommendationInformer. NetworkPolicyRecommendations() NetworkPolicyRecommendationInformer + // RecommendedNetworkPolicies returns a RecommendedNetworkPolicyInformer. + RecommendedNetworkPolicies() RecommendedNetworkPolicyInformer } type version struct { @@ -41,3 +43,8 @@ func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakList func (v *version) NetworkPolicyRecommendations() NetworkPolicyRecommendationInformer { return &networkPolicyRecommendationInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} } + +// RecommendedNetworkPolicies returns a RecommendedNetworkPolicyInformer. +func (v *version) RecommendedNetworkPolicies() RecommendedNetworkPolicyInformer { + return &recommendedNetworkPolicyInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} diff --git a/pkg/client/informers/externalversions/crd/v1alpha1/recommendednetworkpolicy.go b/pkg/client/informers/externalversions/crd/v1alpha1/recommendednetworkpolicy.go new file mode 100644 index 000000000..eff27e7c0 --- /dev/null +++ b/pkg/client/informers/externalversions/crd/v1alpha1/recommendednetworkpolicy.go @@ -0,0 +1,88 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + time "time" + + crdv1alpha1 "antrea.io/theia/pkg/apis/crd/v1alpha1" + versioned "antrea.io/theia/pkg/client/clientset/versioned" + internalinterfaces "antrea.io/theia/pkg/client/informers/externalversions/internalinterfaces" + v1alpha1 "antrea.io/theia/pkg/client/listers/crd/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// RecommendedNetworkPolicyInformer provides access to a shared informer and lister for +// RecommendedNetworkPolicies. +type RecommendedNetworkPolicyInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.RecommendedNetworkPolicyLister +} + +type recommendedNetworkPolicyInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewRecommendedNetworkPolicyInformer constructs a new informer for RecommendedNetworkPolicy type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewRecommendedNetworkPolicyInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredRecommendedNetworkPolicyInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredRecommendedNetworkPolicyInformer constructs a new informer for RecommendedNetworkPolicy type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredRecommendedNetworkPolicyInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.CrdV1alpha1().RecommendedNetworkPolicies(namespace).List(context.TODO(), options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.CrdV1alpha1().RecommendedNetworkPolicies(namespace).Watch(context.TODO(), options) + }, + }, + &crdv1alpha1.RecommendedNetworkPolicy{}, + resyncPeriod, + indexers, + ) +} + +func (f *recommendedNetworkPolicyInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredRecommendedNetworkPolicyInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *recommendedNetworkPolicyInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&crdv1alpha1.RecommendedNetworkPolicy{}, f.defaultInformer) +} + +func (f *recommendedNetworkPolicyInformer) Lister() v1alpha1.RecommendedNetworkPolicyLister { + return v1alpha1.NewRecommendedNetworkPolicyLister(f.Informer().GetIndexer()) +} diff --git a/pkg/client/informers/externalversions/generic.go b/pkg/client/informers/externalversions/generic.go index 2002cd83b..309437377 100644 --- a/pkg/client/informers/externalversions/generic.go +++ b/pkg/client/informers/externalversions/generic.go @@ -53,6 +53,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource // Group=crd.theia.antrea.io, Version=v1alpha1 case v1alpha1.SchemeGroupVersion.WithResource("networkpolicyrecommendations"): return &genericInformer{resource: resource.GroupResource(), informer: f.Crd().V1alpha1().NetworkPolicyRecommendations().Informer()}, nil + case v1alpha1.SchemeGroupVersion.WithResource("recommendednetworkpolicies"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Crd().V1alpha1().RecommendedNetworkPolicies().Informer()}, nil } diff --git a/pkg/client/listers/crd/v1alpha1/expansion_generated.go b/pkg/client/listers/crd/v1alpha1/expansion_generated.go index 0ee2c5fd0..7b7320953 100644 --- a/pkg/client/listers/crd/v1alpha1/expansion_generated.go +++ b/pkg/client/listers/crd/v1alpha1/expansion_generated.go @@ -23,3 +23,11 @@ type NetworkPolicyRecommendationListerExpansion interface{} // NetworkPolicyRecommendationNamespaceListerExpansion allows custom methods to be added to // NetworkPolicyRecommendationNamespaceLister. type NetworkPolicyRecommendationNamespaceListerExpansion interface{} + +// RecommendedNetworkPolicyListerExpansion allows custom methods to be added to +// RecommendedNetworkPolicyLister. +type RecommendedNetworkPolicyListerExpansion interface{} + +// RecommendedNetworkPolicyNamespaceListerExpansion allows custom methods to be added to +// RecommendedNetworkPolicyNamespaceLister. +type RecommendedNetworkPolicyNamespaceListerExpansion interface{} diff --git a/pkg/client/listers/crd/v1alpha1/recommendednetworkpolicy.go b/pkg/client/listers/crd/v1alpha1/recommendednetworkpolicy.go new file mode 100644 index 000000000..8b76bb894 --- /dev/null +++ b/pkg/client/listers/crd/v1alpha1/recommendednetworkpolicy.go @@ -0,0 +1,97 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "antrea.io/theia/pkg/apis/crd/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// RecommendedNetworkPolicyLister helps list RecommendedNetworkPolicies. +// All objects returned here must be treated as read-only. +type RecommendedNetworkPolicyLister interface { + // List lists all RecommendedNetworkPolicies in the indexer. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1alpha1.RecommendedNetworkPolicy, err error) + // RecommendedNetworkPolicies returns an object that can list and get RecommendedNetworkPolicies. + RecommendedNetworkPolicies(namespace string) RecommendedNetworkPolicyNamespaceLister + RecommendedNetworkPolicyListerExpansion +} + +// recommendedNetworkPolicyLister implements the RecommendedNetworkPolicyLister interface. +type recommendedNetworkPolicyLister struct { + indexer cache.Indexer +} + +// NewRecommendedNetworkPolicyLister returns a new RecommendedNetworkPolicyLister. +func NewRecommendedNetworkPolicyLister(indexer cache.Indexer) RecommendedNetworkPolicyLister { + return &recommendedNetworkPolicyLister{indexer: indexer} +} + +// List lists all RecommendedNetworkPolicies in the indexer. +func (s *recommendedNetworkPolicyLister) List(selector labels.Selector) (ret []*v1alpha1.RecommendedNetworkPolicy, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.RecommendedNetworkPolicy)) + }) + return ret, err +} + +// RecommendedNetworkPolicies returns an object that can list and get RecommendedNetworkPolicies. +func (s *recommendedNetworkPolicyLister) RecommendedNetworkPolicies(namespace string) RecommendedNetworkPolicyNamespaceLister { + return recommendedNetworkPolicyNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// RecommendedNetworkPolicyNamespaceLister helps list and get RecommendedNetworkPolicies. +// All objects returned here must be treated as read-only. +type RecommendedNetworkPolicyNamespaceLister interface { + // List lists all RecommendedNetworkPolicies in the indexer for a given namespace. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1alpha1.RecommendedNetworkPolicy, err error) + // Get retrieves the RecommendedNetworkPolicy from the indexer for a given namespace and name. + // Objects returned here must be treated as read-only. + Get(name string) (*v1alpha1.RecommendedNetworkPolicy, error) + RecommendedNetworkPolicyNamespaceListerExpansion +} + +// recommendedNetworkPolicyNamespaceLister implements the RecommendedNetworkPolicyNamespaceLister +// interface. +type recommendedNetworkPolicyNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all RecommendedNetworkPolicies in the indexer for a given namespace. +func (s recommendedNetworkPolicyNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.RecommendedNetworkPolicy, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.RecommendedNetworkPolicy)) + }) + return ret, err +} + +// Get retrieves the RecommendedNetworkPolicy from the indexer for a given namespace and name. +func (s recommendedNetworkPolicyNamespaceLister) Get(name string) (*v1alpha1.RecommendedNetworkPolicy, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("recommendednetworkpolicy"), name) + } + return obj.(*v1alpha1.RecommendedNetworkPolicy), nil +} diff --git a/pkg/controller/networkpolicyrecommendation/controller.go b/pkg/controller/networkpolicyrecommendation/controller.go index e60f0a1cd..073c61573 100644 --- a/pkg/controller/networkpolicyrecommendation/controller.go +++ b/pkg/controller/networkpolicyrecommendation/controller.go @@ -15,11 +15,23 @@ package networkpolicyrecommendation import ( + "context" + "database/sql" + "fmt" + "reflect" + "regexp" + "strconv" + "strings" + "sync" "time" + "github.com/google/uuid" apimachineryerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" apimachinerytypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" @@ -28,6 +40,7 @@ import ( "antrea.io/theia/pkg/client/clientset/versioned" crdv1a1informers "antrea.io/theia/pkg/client/informers/externalversions/crd/v1alpha1" "antrea.io/theia/pkg/client/listers/crd/v1alpha1" + sparkv1 "antrea.io/theia/third_party/sparkoperator/v1beta2" ) const ( @@ -39,44 +52,116 @@ const ( maxRetryDelay = 300 * time.Second // Default number of workers processing an Service change. defaultWorkers = 4 + // Time format for parsing input time + inputTimeFormat = "2006-01-02 15:04:05" + // Time format for parsing time from ClickHouse + clickHouseTimeFormat = "2006-01-02T15:04:05Z" + // SparkApplication id index name for RecommendedNetworkPolicy + idIndex = "id" + k8sQuantitiesReg = "^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$" + // Spark related parameters + sparkImage = "projects.registry.vmware.com/antrea/theia-policy-recommendation:latest" + sparkImagePullPolicy = "IfNotPresent" + sparkAppFile = "local:///opt/spark/work-dir/policy_recommendation_job.py" + sparkServiceAccount = "policy-recommendation-spark" + sparkVersion = "3.1.1" + sparkPort = 4040 +) + +var ( + // Spark Application CRUD functions, for unit tests + CreateSparkApplication = createSparkApplication + DeleteSparkApplication = deleteSparkApplication + ListSparkApplication = listSparkApplication + GetSparkApplication = getSparkApplication + GetSparkMonitoringSvcDNS = getSparkMonitoringSvcDNS + // For NPR in scheduled or running state, check its status periodically + npRecommendationResyncPeriod = 10 * time.Second ) type NPRecommendationController struct { - crdClient versioned.Interface + crdClient versioned.Interface + kubeClient kubernetes.Interface npRecommendationInformer cache.SharedIndexInformer npRecommendationLister v1alpha1.NetworkPolicyRecommendationLister npRecommendationSynced cache.InformerSynced + recommendedNPInformer cache.SharedIndexInformer // queue maintains the Service objects that need to be synced. - queue workqueue.RateLimitingInterface + queue workqueue.RateLimitingInterface + deletionQueue workqueue.RateLimitingInterface + periodicResyncSetMutex sync.Mutex + periodicResyncSet map[apimachinerytypes.NamespacedName]struct{} + clickhouseConnect *sql.DB +} + +type NamespacedId struct { + Namespace string + Id string } func NewNPRecommendationController( crdClient versioned.Interface, + kubeClient kubernetes.Interface, npRecommendationInformer crdv1a1informers.NetworkPolicyRecommendationInformer, + recommendedNPInformer crdv1a1informers.RecommendedNetworkPolicyInformer, ) *NPRecommendationController { c := &NPRecommendationController{ crdClient: crdClient, + kubeClient: kubeClient, queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "npRecommendation"), + deletionQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "npRecommendationCleanup"), npRecommendationInformer: npRecommendationInformer.Informer(), npRecommendationLister: npRecommendationInformer.Lister(), npRecommendationSynced: npRecommendationInformer.Informer().HasSynced, + recommendedNPInformer: recommendedNPInformer.Informer(), + periodicResyncSet: make(map[apimachinerytypes.NamespacedName]struct{}), } c.npRecommendationInformer.AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: c.addNPRecommendation, + UpdateFunc: c.updateNPRecommendation, DeleteFunc: c.deleteNPRecommendation, }, resyncPeriod, ) + // Add SparkApplication ID index for RecommendedNetworkPolicy + c.recommendedNPInformer.AddIndexers(cache.Indexers{idIndex: rnpIdIndexFunc}) + return c } +func rnpIdIndexFunc(obj interface{}) ([]string, error) { + recommendedNP, ok := obj.(*crdv1alpha1.RecommendedNetworkPolicy) + if !ok { + return nil, fmt.Errorf("obj is not RecommendedNetworkPolicy: %v", obj) + } + return []string{recommendedNP.Spec.Id}, nil +} + func (c *NPRecommendationController) addNPRecommendation(obj interface{}) { - npReco, _ := obj.(*crdv1alpha1.NetworkPolicyRecommendation) - klog.V(2).Infof("Processing NP Recommendation %s ADD event, labels: %v", npReco.Name, npReco.Labels) + npReco, ok := obj.(*crdv1alpha1.NetworkPolicyRecommendation) + if !ok { + klog.ErrorS(nil, "fail to convert to NetworkPolicyRecommendation", "object", obj) + return + } + klog.V(2).InfoS("Processing NP Recommendation ADD event", "name", npReco.Name, "labels", npReco.Labels) + namespacedName := apimachinerytypes.NamespacedName{ + Namespace: npReco.Namespace, + Name: npReco.Name, + } + c.queue.Add(namespacedName) +} + +func (c *NPRecommendationController) updateNPRecommendation(_, new interface{}) { + npReco, ok := new.(*crdv1alpha1.NetworkPolicyRecommendation) + if !ok { + klog.ErrorS(nil, "fail to convert to NetworkPolicyRecommendation", "object", new) + return + } + klog.V(2).InfoS("Processing NP Recommendation UPDATE event", "name", npReco.Name, "labels", npReco.Labels) namespacedName := apimachinerytypes.NamespacedName{ Namespace: npReco.Namespace, Name: npReco.Name, @@ -89,21 +174,29 @@ func (c *NPRecommendationController) deleteNPRecommendation(old interface{}) { if !ok { tombstone, ok := old.(cache.DeletedFinalStateUnknown) if !ok { - klog.Errorf("Error decoding object when deleting NP Recommendation, invalid type: %v", old) + klog.ErrorS(nil, "Error decoding object when deleting NP Recommendation", "oldObject", old) return } npReco, ok = tombstone.Obj.(*crdv1alpha1.NetworkPolicyRecommendation) if !ok { - klog.Errorf("Error decoding object tombstone when deleting NP Recommendation, invalid type: %v", tombstone.Obj) + klog.ErrorS(nil, "Error decoding object tombstone when deleting NP Recommendation", "tombstone", tombstone.Obj) return } } - klog.V(2).Infof("Processing NP Recommendation %s DELETE event, labels: %v", npReco.Name, npReco.Labels) - namespacedName := apimachinerytypes.NamespacedName{ + klog.V(2).InfoS("Processing NP Recommendation DELETE event", "name", npReco.Name, "labels", npReco.Labels) + // remove NPRecommendation from periodic synchronization list in case it is deleted before completing + c.stopPeriodicSync(apimachinerytypes.NamespacedName{ Namespace: npReco.Namespace, Name: npReco.Name, + }) + // Add SparkApplication and Namespace information to deletionQueue for cleanup + if npReco.Status.SparkApplication != "" { + namespacedId := NamespacedId{ + Namespace: npReco.Namespace, + Id: npReco.Status.SparkApplication, + } + c.deletionQueue.Add(namespacedId) } - c.queue.Add(namespacedName) } // Run will create defaultWorkers workers (go routines) which will process the Service events from the @@ -118,12 +211,45 @@ func (c *NPRecommendationController) Run(stopCh <-chan struct{}) { return } + go func() { + wait.Until(c.resyncNPRecommendation, npRecommendationResyncPeriod, stopCh) + }() + + go wait.Until(c.deletionworker, time.Second, stopCh) + for i := 0; i < defaultWorkers; i++ { go wait.Until(c.worker, time.Second, stopCh) } <-stopCh } +func (c *NPRecommendationController) deletionworker() { + for c.processNextDeletionWorkItem() { + } +} + +func (c *NPRecommendationController) processNextDeletionWorkItem() bool { + obj, quit := c.deletionQueue.Get() + if quit { + return false + } + defer c.deletionQueue.Done(obj) + if key, ok := obj.(NamespacedId); !ok { + c.queue.Forget(obj) + klog.ErrorS(nil, "Expected Spark Application namespaced id in work queue", "got", obj) + return true + } else if err := c.cleanupNPRecommendation(key.Namespace, key.Id); err == nil { + // If no error occurs we forget this item so it does not get queued again until + // another change happens. + c.deletionQueue.Forget(key) + } else { + // Put the item back on the workqueue to handle any transient errors. + c.deletionQueue.AddRateLimited(key) + klog.ErrorS(err, "Error when cleaning Spark Application, requeuing", "key", key) + } + return true +} + // worker is a long-running function that will continually call the processNextWorkItem function in // order to read and process a message on the workqueue. func (c *NPRecommendationController) worker() { @@ -131,6 +257,18 @@ func (c *NPRecommendationController) worker() { } } +func (c *NPRecommendationController) resyncNPRecommendation() { + c.periodicResyncSetMutex.Lock() + nprs := make([]apimachinerytypes.NamespacedName, 0, len(c.periodicResyncSet)) + for nprNamespacedName := range c.periodicResyncSet { + nprs = append(nprs, nprNamespacedName) + } + c.periodicResyncSetMutex.Unlock() + for _, nprNamespacedName := range nprs { + c.queue.Add(nprNamespacedName) + } +} + func (c *NPRecommendationController) processNextWorkItem() bool { obj, quit := c.queue.Get() if quit { @@ -139,16 +277,16 @@ func (c *NPRecommendationController) processNextWorkItem() bool { defer c.queue.Done(obj) if key, ok := obj.(apimachinerytypes.NamespacedName); !ok { c.queue.Forget(obj) - klog.Errorf("Expected NP Recommendation in work queue but got %#v", obj) + klog.ErrorS(nil, "Expected NP Recommendation in work queue", "got", obj) return true } else if err := c.syncNPRecommendation(key); err == nil { - // If no error occurs we Forget this item so it does not get queued again until + // If no error occurs we forget this item so it does not get queued again until // another change happens. c.queue.Forget(key) } else { // Put the item back on the workqueue to handle any transient errors. c.queue.AddRateLimited(key) - klog.Errorf("Error syncing NP Recommendation %s, requeuing. Error: %v", key, err) + klog.ErrorS(err, "Error when syncing NP Recommendation, requeuing", "key", key) } return true } @@ -156,7 +294,7 @@ func (c *NPRecommendationController) processNextWorkItem() bool { func (c *NPRecommendationController) syncNPRecommendation(key apimachinerytypes.NamespacedName) error { startTime := time.Now() defer func() { - klog.V(4).Infof("Finished syncing NP Recommendation for %s. (%v)", key, time.Since(startTime)) + klog.V(4).InfoS("Finished syncing NP Recommendation", "key", key, "time", time.Since(startTime)) }() npReco, err := c.npRecommendationLister.NetworkPolicyRecommendations(key.Namespace).Get(key.Name) @@ -168,11 +306,427 @@ func (c *NPRecommendationController) syncNPRecommendation(key apimachinerytypes. return err } - klog.V(4).Infof("Syncing NP Recommendation %v", npReco) - // TODO (wshaoquan): add handling logic here + klog.V(4).Infof("Syncing NP Recommendation", "npReco", npReco) + + switch npReco.Status.State { + case "", crdv1alpha1.NPRecommendationStateNew: + err = c.startJob(npReco) + case crdv1alpha1.NPRecommendationStateScheduled: + _, err = c.checkSparkApplicationStatus(npReco) + case crdv1alpha1.NPRecommendationStateRunning: + err = c.updateProgress(npReco) + case crdv1alpha1.NPRecommendationStateCompleted: + err = c.updateResult(npReco) + } + return err +} + +func (c *NPRecommendationController) cleanupNPRecommendation(namespace string, sparkApplicationId string) error { + // Delete the Spark Application if exists + err := deleteSparkApplicationIfExists(c.kubeClient, namespace, sparkApplicationId) + if err != nil { + return fmt.Errorf("failed to delete the SparkApplication %s, error: %v", sparkApplicationId, err) + } + // Delete the RNP if exists + rnps, err := c.recommendedNPInformer.GetIndexer().ByIndex(idIndex, sparkApplicationId) + if err != nil { + return fmt.Errorf("failed to get the RecommendedNetworkPolicy when deleting, error: %v", err) + } + var undeletedRnpNames []string + var errorList []error + for _, obj := range rnps { + recommendedNetworkPolicy := obj.(*crdv1alpha1.RecommendedNetworkPolicy) + err = c.crdClient.CrdV1alpha1().RecommendedNetworkPolicies(namespace).Delete(context.TODO(), recommendedNetworkPolicy.Name, metav1.DeleteOptions{}) + if err != nil { + undeletedRnpNames = append(undeletedRnpNames, recommendedNetworkPolicy.Name) + errorList = append(errorList, err) + } + } + if len(errorList) > 0 { + return fmt.Errorf("failed to deleted RecommendedNetworkPolicies: %v, error: %v", undeletedRnpNames, errorList) + } + // Delete the result from the ClickHouse + if c.clickhouseConnect == nil { + c.clickhouseConnect, err = setupClickHouseConnection(c.kubeClient, namespace) + if err != nil { + return err + } + } + return deletePolicyRecommendationResult(c.clickhouseConnect, sparkApplicationId) +} + +func (c *NPRecommendationController) updateResult(npReco *crdv1alpha1.NetworkPolicyRecommendation) error { + namespacedName := apimachinerytypes.NamespacedName{ + Name: npReco.Name, + Namespace: npReco.Namespace, + } + // Stop periodical job + c.stopPeriodicSync(namespacedName) + // Delete related SparkApplication CR + if npReco.Status.SparkApplication == "" { + return c.updateNPRecommendationStatus( + npReco, + crdv1alpha1.NetworkPolicyRecommendationStatus{ + State: crdv1alpha1.NPRecommendationStateFailed, + ErrorMsg: "Spark Application should be started before updating results", + }, + ) + } + + if npReco.Status.RecommendedNP == nil { + err := deleteSparkApplicationIfExists(c.kubeClient, npReco.Namespace, npReco.Status.SparkApplication) + if err != nil { + return fmt.Errorf("fail to delete Spark Application: %s, error: %v", npReco.Status.SparkApplication, err) + } + var recommendedNetworkPolicy *crdv1alpha1.RecommendedNetworkPolicy + // Check if RecommendedNetworkPolicy CR is created + rnps, err := c.recommendedNPInformer.GetIndexer().ByIndex(idIndex, npReco.Status.SparkApplication) + if err == nil && len(rnps) > 0 { + // Use the existing RecommendedNetworkPolicy CR + recommendedNetworkPolicy = rnps[0].(*crdv1alpha1.RecommendedNetworkPolicy) + if len(rnps) > 1 { + klog.V(4).InfoS("More than 1 RecommendedNetworkPolicy", "id", npReco.Status.SparkApplication) + } + } else { + if err != nil { + klog.V(4).InfoS("Failed to find RecommendedNetworkPolicy", "id", npReco.Status.SparkApplication) + } + // Get result from database + if c.clickhouseConnect == nil { + c.clickhouseConnect, err = setupClickHouseConnection(c.kubeClient, npReco.Namespace) + if err != nil { + return err + } + } + recommendedNetworkPolicy, err = getPolicyRecommendationResult(c.clickhouseConnect, npReco.Status.SparkApplication) + if err != nil { + return err + } + // Create RecommendedNetworkPolicy CR + recommendedNetworkPolicy, err = c.crdClient.CrdV1alpha1().RecommendedNetworkPolicies(npReco.Namespace).Create(context.TODO(), recommendedNetworkPolicy, metav1.CreateOptions{}) + if err != nil { + return err + } + klog.V(2).InfoS("Created RecommendedNetworkPolicy", "RecommendedNetworkPolicy", recommendedNetworkPolicy.Name, "NetworkRecommendationPolicy", npReco.Name) + } + return c.updateNPRecommendationStatus(npReco, crdv1alpha1.NetworkPolicyRecommendationStatus{ + State: crdv1alpha1.NPRecommendationStateCompleted, + RecommendedNP: recommendedNetworkPolicy, + EndTime: metav1.NewTime(time.Now()), + }) + } return nil } +func (c *NPRecommendationController) updateProgress(npReco *crdv1alpha1.NetworkPolicyRecommendation) error { + // Check the status before checking the progress in case the job is failed or completed + state, err := c.checkSparkApplicationStatus(npReco) + if err != nil { + return err + } + if state != crdv1alpha1.NPRecommendationStateRunning { + return nil + } + endpoint := GetSparkMonitoringSvcDNS(npReco.Status.SparkApplication, npReco.Namespace) + completedStages, totalStages, err := getPolicyRecommendationProgress(endpoint) + if err != nil { + // The Spark Monitoring Service may not start or closed at this point due to the async + // between Spark operator and this controller. + // As we periodically check the progress, we do not need to requeue this failure. + klog.V(4).ErrorS(err, "Failed to get the progress of the policy recommendation job") + return nil + } + klog.V(4).InfoS("Got Spark Application progress", "completedStages", completedStages, "totalStages", totalStages, "NetworkRecommendationPolicy", npReco.Name) + return c.updateNPRecommendationStatus( + npReco, + crdv1alpha1.NetworkPolicyRecommendationStatus{ + State: crdv1alpha1.NPRecommendationStateRunning, + CompletedStages: completedStages, + TotalStages: totalStages, + }, + ) +} + +func (c *NPRecommendationController) checkSparkApplicationStatus(npReco *crdv1alpha1.NetworkPolicyRecommendation) (string, error) { + if npReco.Status.SparkApplication == "" { + return "", c.updateNPRecommendationStatus( + npReco, + crdv1alpha1.NetworkPolicyRecommendationStatus{ + State: crdv1alpha1.NPRecommendationStateFailed, + ErrorMsg: "Spark Application should be started before status checking", + }, + ) + } + + state, errorMessage, err := getPolicyRecommendationStatus(c.kubeClient, npReco.Status.SparkApplication, npReco.Namespace) + if err != nil { + return state, err + } + klog.V(4).InfoS("Got Spark Application state", "state", state, "NetworkRecommendationPolicy", npReco.Name) + if state == "RUNNING" { + return state, c.updateNPRecommendationStatus( + npReco, + crdv1alpha1.NetworkPolicyRecommendationStatus{ + State: crdv1alpha1.NPRecommendationStateRunning, + ErrorMsg: errorMessage, + }, + ) + } else if state == "COMPLETED" { + return state, c.updateNPRecommendationStatus( + npReco, + crdv1alpha1.NetworkPolicyRecommendationStatus{ + State: crdv1alpha1.NPRecommendationStateCompleted, + ErrorMsg: errorMessage, + }, + ) + } else if state == "FAILED" || state == "SUBMISSION_FAILED" || state == "FAILING" || state == "INVALIDATING" { + return state, c.updateNPRecommendationStatus( + npReco, + crdv1alpha1.NetworkPolicyRecommendationStatus{ + State: crdv1alpha1.NPRecommendationStateFailed, + ErrorMsg: fmt.Sprintf("policy recommendation job failed, state: %s, error message: %v", state, errorMessage), + }, + ) + } + return state, nil +} + +func (c *NPRecommendationController) startJob(npReco *crdv1alpha1.NetworkPolicyRecommendation) error { + // Validate Cluster readiness + if err := validateCluster(c.kubeClient, npReco.Namespace); err != nil { + return err + } + err := c.startSparkApplication(npReco) + // Mark the NetworkPolicyRecommendation as failed and not retry if it failed due to illeagel arguments in request + if err != nil && reflect.TypeOf(err) == reflect.TypeOf(IlleagelArguementError{}) { + return c.updateNPRecommendationStatus( + npReco, + crdv1alpha1.NetworkPolicyRecommendationStatus{ + State: crdv1alpha1.NPRecommendationStateFailed, + ErrorMsg: fmt.Sprintf("error in creating NetworkPolicyRecommendation: %v", err), + }, + ) + } + // Schedule periodical resync for successful starting + if err == nil { + c.addPeriodicSync(apimachinerytypes.NamespacedName{ + Name: npReco.Name, + Namespace: npReco.Namespace, + }) + } + return err +} + +func (c *NPRecommendationController) startSparkApplication(npReco *crdv1alpha1.NetworkPolicyRecommendation) error { + var recoJobArgs []string + if npReco.Spec.JobType != "initial" && npReco.Spec.JobType != "subsequent" { + return IlleagelArguementError{fmt.Errorf("invalid request: recommendation type should be 'initial' or 'subsequent'")} + } + recoJobArgs = append(recoJobArgs, "--type", npReco.Spec.JobType) + + if npReco.Spec.Limit < 0 { + return IlleagelArguementError{fmt.Errorf("invalid request: limit should be an integer >= 0")} + } + recoJobArgs = append(recoJobArgs, "--limit", strconv.Itoa(npReco.Spec.Limit)) + + var policyTypeArg int + if npReco.Spec.PolicyType == "anp-deny-applied" { + policyTypeArg = 1 + } else if npReco.Spec.PolicyType == "anp-deny-all" { + policyTypeArg = 2 + } else if npReco.Spec.PolicyType == "k8s-np" { + policyTypeArg = 3 + } else { + return IlleagelArguementError{fmt.Errorf("invalid request: type of generated NetworkPolicy should be anp-deny-applied or anp-deny-all or k8s-np")} + } + recoJobArgs = append(recoJobArgs, "--option", strconv.Itoa(policyTypeArg)) + + if !npReco.Spec.StartInterval.IsZero() { + recoJobArgs = append(recoJobArgs, "--start_time", npReco.Spec.StartInterval.Format(inputTimeFormat)) + } + if !npReco.Spec.EndInterval.IsZero() { + endAfterStart := npReco.Spec.EndInterval.After(npReco.Spec.StartInterval.Time) + if !endAfterStart { + return IlleagelArguementError{fmt.Errorf("invalid request: EndInterval should be after StartInterval")} + } + recoJobArgs = append(recoJobArgs, "--end_time", npReco.Spec.EndInterval.Format(inputTimeFormat)) + } + + if len(npReco.Spec.NSAllowList) > 0 { + nsAllowListStr := strings.Join(npReco.Spec.NSAllowList, "\",\"") + nsAllowListStr = "[\"" + nsAllowListStr + "\"]" + recoJobArgs = append(recoJobArgs, "--ns_allow_list", nsAllowListStr) + } + + recoJobArgs = append(recoJobArgs, "--rm_labels", strconv.FormatBool(npReco.Spec.ExcludeLabels)) + recoJobArgs = append(recoJobArgs, "--to_services", strconv.FormatBool(npReco.Spec.ToServices)) + + sparkResourceArgs := struct { + executorInstances int32 + driverCoreRequest string + driverMemory string + executorCoreRequest string + executorMemory string + }{} + + if npReco.Spec.ExecutorInstances < 0 { + return IlleagelArguementError{fmt.Errorf("invalid request: ExecutorInstances should be an integer >= 0")} + } + sparkResourceArgs.executorInstances = int32(npReco.Spec.ExecutorInstances) + + matchResult, err := regexp.MatchString(k8sQuantitiesReg, npReco.Spec.DriverCoreRequest) + if err != nil || !matchResult { + return IlleagelArguementError{fmt.Errorf("invalid request: DriverCoreRequest should conform to the Kubernetes resource quantity convention")} + } + sparkResourceArgs.driverCoreRequest = npReco.Spec.DriverCoreRequest + + matchResult, err = regexp.MatchString(k8sQuantitiesReg, npReco.Spec.DriverMemory) + if err != nil || !matchResult { + return IlleagelArguementError{fmt.Errorf("invalid request: DriverMemory should conform to the Kubernetes resource quantity convention")} + } + sparkResourceArgs.driverMemory = npReco.Spec.DriverMemory + + matchResult, err = regexp.MatchString(k8sQuantitiesReg, npReco.Spec.ExecutorCoreRequest) + if err != nil || !matchResult { + return IlleagelArguementError{fmt.Errorf("invalid request: ExecutorCoreRequest should conform to the Kubernetes resource quantity convention")} + } + sparkResourceArgs.executorCoreRequest = npReco.Spec.ExecutorCoreRequest + + matchResult, err = regexp.MatchString(k8sQuantitiesReg, npReco.Spec.ExecutorMemory) + if err != nil || !matchResult { + return IlleagelArguementError{fmt.Errorf("invalid request: ExecutorMemory should conform to the Kubernetes resource quantity convention")} + } + sparkResourceArgs.executorMemory = npReco.Spec.ExecutorMemory + + recommendationID := uuid.New().String() + recoJobArgs = append(recoJobArgs, "--id", recommendationID) + recommendationApplication := &sparkv1.SparkApplication{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "sparkoperator.k8s.io/v1beta2", + Kind: "SparkApplication", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pr-" + recommendationID, + Namespace: npReco.Namespace, + }, + Spec: sparkv1.SparkApplicationSpec{ + Type: "Python", + SparkVersion: sparkVersion, + Mode: "cluster", + Image: constStrToPointer(sparkImage), + ImagePullPolicy: constStrToPointer(sparkImagePullPolicy), + MainApplicationFile: constStrToPointer(sparkAppFile), + Arguments: recoJobArgs, + Driver: sparkv1.DriverSpec{ + CoreRequest: &npReco.Spec.DriverCoreRequest, + SparkPodSpec: sparkv1.SparkPodSpec{ + Memory: &npReco.Spec.DriverMemory, + Labels: map[string]string{ + "version": sparkVersion, + }, + EnvSecretKeyRefs: map[string]sparkv1.NameKey{ + "CH_USERNAME": { + Name: "clickhouse-secret", + Key: "username", + }, + "CH_PASSWORD": { + Name: "clickhouse-secret", + Key: "password", + }, + }, + ServiceAccount: constStrToPointer(sparkServiceAccount), + }, + }, + Executor: sparkv1.ExecutorSpec{ + CoreRequest: &npReco.Spec.ExecutorCoreRequest, + SparkPodSpec: sparkv1.SparkPodSpec{ + Memory: &npReco.Spec.ExecutorMemory, + Labels: map[string]string{ + "version": sparkVersion, + }, + EnvSecretKeyRefs: map[string]sparkv1.NameKey{ + "CH_USERNAME": { + Name: "clickhouse-secret", + Key: "username", + }, + "CH_PASSWORD": { + Name: "clickhouse-secret", + Key: "password", + }, + }, + }, + Instances: &sparkResourceArgs.executorInstances, + }, + }, + } + err = CreateSparkApplication(c.kubeClient, npReco.Namespace, recommendationApplication) + if err != nil { + return fmt.Errorf("failed to create Spark Application: %v", err) + } + klog.V(2).InfoS("Start SparkApplication", "id", recommendationID, "NetworkPolicyRecommendation", npReco.Name) + + return c.updateNPRecommendationStatus( + npReco, + crdv1alpha1.NetworkPolicyRecommendationStatus{ + State: crdv1alpha1.NPRecommendationStateScheduled, + SparkApplication: recommendationID, + StartTime: metav1.NewTime(time.Now()), + }, + ) +} + +func (c *NPRecommendationController) updateNPRecommendationStatus(npReco *crdv1alpha1.NetworkPolicyRecommendation, status crdv1alpha1.NetworkPolicyRecommendationStatus) error { + update := npReco.DeepCopy() + update.Status.State = status.State + if status.SparkApplication != "" { + update.Status.SparkApplication = status.SparkApplication + } + if status.CompletedStages != 0 { + update.Status.CompletedStages = status.CompletedStages + } + if status.TotalStages != 0 { + update.Status.TotalStages = status.TotalStages + } + if status.RecommendedNP != nil { + update.Status.RecommendedNP = status.RecommendedNP + } + if status.ErrorMsg != "" { + update.Status.ErrorMsg = status.ErrorMsg + } + if !status.StartTime.IsZero() { + update.Status.StartTime = status.StartTime + } + if !status.EndTime.IsZero() { + update.Status.EndTime = status.EndTime + } + _, err := c.crdClient.CrdV1alpha1().NetworkPolicyRecommendations(npReco.Namespace).UpdateStatus(context.TODO(), update, metav1.UpdateOptions{}) + return err +} + +func (c *NPRecommendationController) addPeriodicSync(key apimachinerytypes.NamespacedName) { + c.periodicResyncSetMutex.Lock() + defer c.periodicResyncSetMutex.Unlock() + c.periodicResyncSet[key] = struct{}{} +} + +func (c *NPRecommendationController) stopPeriodicSync(key apimachinerytypes.NamespacedName) { + c.periodicResyncSetMutex.Lock() + defer c.periodicResyncSetMutex.Unlock() + delete(c.periodicResyncSet, key) +} + func (c *NPRecommendationController) GetNetworkPolicyRecommendation(namespace, name string) (*crdv1alpha1.NetworkPolicyRecommendation, error) { return c.npRecommendationLister.NetworkPolicyRecommendations(namespace).Get(name) } + +func (c *NPRecommendationController) ListNetworkPolicyRecommendation(namespace string) ([]*crdv1alpha1.NetworkPolicyRecommendation, error) { + return c.npRecommendationLister.NetworkPolicyRecommendations(namespace).List(labels.Everything()) +} + +func (c *NPRecommendationController) DeleteNetworkPolicyRecommendation(namespace, name string) error { + return c.crdClient.CrdV1alpha1().NetworkPolicyRecommendations(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) +} + +func (c *NPRecommendationController) CreateNetworkPolicyRecommendation(namespace string, networkPolicyRecommendation *crdv1alpha1.NetworkPolicyRecommendation) (*crdv1alpha1.NetworkPolicyRecommendation, error) { + return c.crdClient.CrdV1alpha1().NetworkPolicyRecommendations(namespace).Create(context.TODO(), networkPolicyRecommendation, metav1.CreateOptions{}) +} diff --git a/pkg/controller/networkpolicyrecommendation/controller_test.go b/pkg/controller/networkpolicyrecommendation/controller_test.go new file mode 100644 index 000000000..e97eee868 --- /dev/null +++ b/pkg/controller/networkpolicyrecommendation/controller_test.go @@ -0,0 +1,665 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package networkpolicyrecommendation + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + apimachinerytypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" + "k8s.io/klog/v2" + + crdv1alpha1 "antrea.io/theia/pkg/apis/crd/v1alpha1" + "antrea.io/theia/pkg/client/clientset/versioned" + fakecrd "antrea.io/theia/pkg/client/clientset/versioned/fake" + crdinformers "antrea.io/theia/pkg/client/informers/externalversions" + "antrea.io/theia/third_party/sparkoperator/v1beta2" +) + +const informerDefaultResync = 30 * time.Second + +var ( + testNamespace = "controller-test" +) + +type fakeController struct { + *NPRecommendationController + crdClient versioned.Interface + kubeClient kubernetes.Interface + crdInformerFactory crdinformers.SharedInformerFactory +} + +func newFakeController() *fakeController { + kubeClient := fake.NewSimpleClientset() + createClickHousePod(kubeClient) + createSparkOperatorPod(kubeClient) + createClickHouseService(kubeClient) + createClickHouseSecret(kubeClient) + crdClient := fakecrd.NewSimpleClientset() + + crdClient.PrependReactor("create", "recommendednetworkpolicies", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + rnp := action.(k8stesting.CreateAction).GetObject().(*crdv1alpha1.RecommendedNetworkPolicy) + rnp.Name = fmt.Sprintf("%s%s", rnp.GenerateName, rand.String(8)) + return false, rnp, nil + }) + + crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, informerDefaultResync) + npRecommendationInformer := crdInformerFactory.Crd().V1alpha1().NetworkPolicyRecommendations() + recommendedNPInformer := crdInformerFactory.Crd().V1alpha1().RecommendedNetworkPolicies() + + nprController := NewNPRecommendationController(crdClient, kubeClient, npRecommendationInformer, recommendedNPInformer) + + return &fakeController{ + nprController, + crdClient, + kubeClient, + crdInformerFactory, + } +} + +func createClickHousePod(kubeClient kubernetes.Interface) { + clickHousePod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "clickhouse", + Namespace: testNamespace, + Labels: map[string]string{"app": "clickhouse"}, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + } + kubeClient.CoreV1().Pods(testNamespace).Create(context.TODO(), clickHousePod, metav1.CreateOptions{}) +} + +func createSparkOperatorPod(kubeClient kubernetes.Interface) { + sparkOperatorPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "spark-operator", + Namespace: testNamespace, + Labels: map[string]string{ + "app.kubernetes.io/name": "spark-operator", + }, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + } + kubeClient.CoreV1().Pods(testNamespace).Create(context.TODO(), sparkOperatorPod, metav1.CreateOptions{}) +} + +func createClickHouseService(kubeClient kubernetes.Interface) { + clickhouseService := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "clickhouse-clickhouse", + Namespace: testNamespace, + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{{Name: "tcp", Port: 9000}}, + ClusterIP: "10.98.208.26", + }, + } + kubeClient.CoreV1().Services(testNamespace).Create(context.TODO(), clickhouseService, metav1.CreateOptions{}) +} + +func createClickHouseSecret(kubeClient kubernetes.Interface) { + clickhouseSecret := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "clickhouse-secret", + Namespace: testNamespace, + }, + Data: map[string][]byte{ + "username": []byte("clickhouse_operator"), + "password": []byte("clickhouse_operator_password"), + }, + } + kubeClient.CoreV1().Secrets(testNamespace).Create(context.TODO(), clickhouseSecret, metav1.CreateOptions{}) +} + +func createFakeSparkApplicationService(kubeClient kubernetes.Interface, id string) error { + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch strings.TrimSpace(r.URL.Path) { + case "/api/v1/applications": + responses := []map[string]interface{}{ + {"id": id}, + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(responses) + case fmt.Sprintf("/api/v1/applications/%s/stages", id): + responses := []map[string]interface{}{ + {"status": "COMPLETE"}, + {"status": "COMPLETE"}, + {"status": "SKIPPED"}, + {"status": "PENDING"}, + {"status": "ACTIVE"}, + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(responses) + } + })) + + GetSparkMonitoringSvcDNS = func(id, namespace string) string { + return testServer.URL + } + return nil +} + +// mock Spark Applications +type fakeSparkApplicationClient struct { + sparkApplications map[apimachinerytypes.NamespacedName]*v1beta2.SparkApplication + mapMutex sync.Mutex +} + +func (f *fakeSparkApplicationClient) create(client kubernetes.Interface, namespace string, recommendationApplication *v1beta2.SparkApplication) error { + namespacedName := apimachinerytypes.NamespacedName{ + Namespace: namespace, + Name: recommendationApplication.Name, + } + klog.InfoS("Spark Application created", "name", recommendationApplication.ObjectMeta.Name, "namespace", namespace) + f.mapMutex.Lock() + defer f.mapMutex.Unlock() + f.sparkApplications[namespacedName] = recommendationApplication + return nil +} + +func (f *fakeSparkApplicationClient) delete(client kubernetes.Interface, name, namespace string) { + namespacedName := apimachinerytypes.NamespacedName{ + Namespace: namespace, + Name: name, + } + f.mapMutex.Lock() + defer f.mapMutex.Unlock() + delete(f.sparkApplications, namespacedName) +} + +func (f *fakeSparkApplicationClient) list(client kubernetes.Interface, namespace string) (*v1beta2.SparkApplicationList, error) { + f.mapMutex.Lock() + defer f.mapMutex.Unlock() + list := make([]v1beta2.SparkApplication, len(f.sparkApplications)) + index := 0 + for _, item := range f.sparkApplications { + list[index] = *item + index++ + } + saList := &v1beta2.SparkApplicationList{ + Items: list, + } + return saList, nil +} + +func (f *fakeSparkApplicationClient) get(client kubernetes.Interface, name, namespace string) (sparkApp v1beta2.SparkApplication, err error) { + namespacedName := apimachinerytypes.NamespacedName{ + Namespace: namespace, + Name: name, + } + f.mapMutex.Lock() + defer f.mapMutex.Unlock() + return *f.sparkApplications[namespacedName], nil +} + +func (f *fakeSparkApplicationClient) step(name, namespace string) { + namespacedName := apimachinerytypes.NamespacedName{ + Namespace: namespace, + Name: name, + } + f.mapMutex.Lock() + defer f.mapMutex.Unlock() + sa, ok := f.sparkApplications[namespacedName] + if !ok { + klog.InfoS("Spark Application not created yet", "name", name, "namespace", namespace) + return + } + switch sa.Status.AppState.State { + case v1beta2.NewState: + klog.InfoS("Spark Application setting from new to running") + sa.Status.AppState.State = v1beta2.RunningState + case v1beta2.RunningState: + klog.InfoS("Spark Application setting from running to completed") + sa.Status.AppState.State = v1beta2.CompletedState + } +} + +func TestNPRecommendation(t *testing.T) { + fakeSAClient := fakeSparkApplicationClient{ + sparkApplications: make(map[apimachinerytypes.NamespacedName]*v1beta2.SparkApplication), + } + CreateSparkApplication = fakeSAClient.create + DeleteSparkApplication = fakeSAClient.delete + ListSparkApplication = fakeSAClient.list + GetSparkApplication = fakeSAClient.get + + // Use a shorter resync period + npRecommendationResyncPeriod = 500 * time.Millisecond + + nprController := newFakeController() + stopCh := make(chan struct{}) + + nprController.crdInformerFactory.Start(stopCh) + nprController.crdInformerFactory.WaitForCacheSync(stopCh) + + go nprController.Run(stopCh) + + t.Run("NormalNetworkPolicyRecommendation", func(t *testing.T) { + npr := &crdv1alpha1.NetworkPolicyRecommendation{ + ObjectMeta: metav1.ObjectMeta{Name: "npr", Namespace: testNamespace}, + Spec: crdv1alpha1.NetworkPolicyRecommendationSpec{ + JobType: "initial", + PolicyType: "anp-deny-applied", + ExecutorInstances: 1, + DriverCoreRequest: "200m", + DriverMemory: "512M", + ExecutorCoreRequest: "200m", + ExecutorMemory: "512M", + ExcludeLabels: true, + ToServices: true, + StartInterval: metav1.NewTime(time.Now()), + EndInterval: metav1.NewTime(time.Now().Add(time.Second * 10)), + NSAllowList: []string{"kube-system", "flow-visibility"}, + }, + Status: crdv1alpha1.NetworkPolicyRecommendationStatus{}, + } + + npr, err := nprController.CreateNetworkPolicyRecommendation(testNamespace, npr) + assert.NoError(t, err) + + serviceCreated := false + // The step interval should be larger than resync period to ensure the progress is updated + stepInterval := 1 * time.Second + timeout := 30 * time.Second + + wait.PollImmediate(stepInterval, timeout, func() (done bool, err error) { + npr, err = nprController.GetNetworkPolicyRecommendation(testNamespace, "npr") + if err != nil { + return false, nil + } + // Mocking ClickHouse results and Spark Monitor service requires + // the SparkApplication id. + if !serviceCreated { + // Mock ClickHouse database + openSql = func(driverName, dataSourceName string) (*sql.DB, error) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), sqlmock.MonitorPingsOption(true)) + if err != nil { + return db, err + } + mock.ExpectPing() + recommendationRow := sqlmock.NewRows([]string{"type", "timeCreated", "yamls"}).AddRow("initial", "2022-10-01T12:30:10Z", "recommendations") + mock.ExpectQuery("SELECT type, timeCreated, yamls FROM recommendations WHERE id = (?);").WithArgs(npr.Status.SparkApplication).WillReturnRows(recommendationRow) + return db, err + } + // Create Spark Monitor service + err = createFakeSparkApplicationService(nprController.kubeClient, npr.Status.SparkApplication) + assert.NoError(t, err) + serviceCreated = true + } + if npr != nil { + fakeSAClient.step("pr-"+npr.Status.SparkApplication, testNamespace) + } + return !(npr.Status.RecommendedNP == nil), nil + }) + + assert.Equal(t, crdv1alpha1.NPRecommendationStateCompleted, npr.Status.State) + assert.Equal(t, 3, npr.Status.CompletedStages) + assert.Equal(t, 5, npr.Status.TotalStages) + assert.True(t, npr.Status.StartTime.Before(&npr.Status.EndTime)) + assert.Equal(t, npr.Status.SparkApplication, npr.Status.RecommendedNP.Spec.Id) + assert.Equal(t, "initial", npr.Status.RecommendedNP.Spec.Type) + expectedTimeCreated, err := time.Parse(clickHouseTimeFormat, "2022-10-01T12:30:10Z") + assert.NoError(t, err) + assert.Equal(t, metav1.NewTime(expectedTimeCreated), npr.Status.RecommendedNP.Spec.TimeCreated) + assert.Equal(t, "recommendations", npr.Status.RecommendedNP.Spec.Yamls) + + nprList, err := nprController.ListNetworkPolicyRecommendation(testNamespace) + assert.NoError(t, err) + assert.Equal(t, 1, len(nprList), "Expected exactly one NetworkPolicyRecommendation, got %d", len(nprList)) + assert.Equal(t, npr, nprList[0]) + + openSql = func(driverName, dataSourceName string) (*sql.DB, error) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), sqlmock.MonitorPingsOption(true)) + if err != nil { + return db, err + } + mock.ExpectPing() + mock.ExpectExec("ALTER TABLE recommendations_local ON CLUSTER '{cluster}' DELETE WHERE id = (?);").WithArgs(npr.Status.SparkApplication).WillReturnResult(sqlmock.NewResult(0, 1)) + return db, err + } + err = nprController.DeleteNetworkPolicyRecommendation(testNamespace, "npr") + assert.NoError(t, err) + }) + + testCases := []struct { + name string + nprName string + npr *crdv1alpha1.NetworkPolicyRecommendation + expectedErrorMsg string + }{ + { + name: "invalid JobType", + nprName: "npr-invalid-job-type", + npr: &crdv1alpha1.NetworkPolicyRecommendation{ + ObjectMeta: metav1.ObjectMeta{Name: "npr-invalid-job-type", Namespace: testNamespace}, + Spec: crdv1alpha1.NetworkPolicyRecommendationSpec{ + JobType: "nonexistent-job-type", + }, + }, + expectedErrorMsg: "invalid request: recommendation type should be 'initial' or 'subsequent'", + }, + { + name: "invalid Limit", + nprName: "npr-invalid-limit", + npr: &crdv1alpha1.NetworkPolicyRecommendation{ + ObjectMeta: metav1.ObjectMeta{Name: "npr-invalid-limit", Namespace: testNamespace}, + Spec: crdv1alpha1.NetworkPolicyRecommendationSpec{ + JobType: "initial", + Limit: -1, + }, + }, + expectedErrorMsg: "invalid request: limit should be an integer >= 0", + }, + { + name: "invalid PolicyType", + nprName: "npr-invalid-policy-type", + npr: &crdv1alpha1.NetworkPolicyRecommendation{ + ObjectMeta: metav1.ObjectMeta{Name: "npr-invalid-policy-type", Namespace: testNamespace}, + Spec: crdv1alpha1.NetworkPolicyRecommendationSpec{ + JobType: "initial", + PolicyType: "nonexistent-policy-type", + }, + }, + expectedErrorMsg: "invalid request: type of generated NetworkPolicy should be anp-deny-applied or anp-deny-all or k8s-np", + }, + { + name: "invalid EndInterval", + nprName: "npr-invalid-end-interval", + npr: &crdv1alpha1.NetworkPolicyRecommendation{ + ObjectMeta: metav1.ObjectMeta{Name: "npr-invalid-end-interval", Namespace: testNamespace}, + Spec: crdv1alpha1.NetworkPolicyRecommendationSpec{ + JobType: "initial", + PolicyType: "anp-deny-all", + StartInterval: metav1.NewTime(time.Now().Add(time.Second * 10)), + EndInterval: metav1.NewTime(time.Now()), + }, + }, + expectedErrorMsg: "invalid request: EndInterval should be after StartInterval", + }, + { + name: "invalid ExecutorInstances", + nprName: "npr-invalid-executor-instances", + npr: &crdv1alpha1.NetworkPolicyRecommendation{ + ObjectMeta: metav1.ObjectMeta{Name: "npr-invalid-executor-instances", Namespace: testNamespace}, + Spec: crdv1alpha1.NetworkPolicyRecommendationSpec{ + JobType: "initial", + PolicyType: "k8s-np", + ExecutorInstances: -1, + }, + }, + expectedErrorMsg: "invalid request: ExecutorInstances should be an integer >= 0", + }, + { + name: "invalid DriverCoreRequest", + nprName: "npr-invalid-driver-core-request", + npr: &crdv1alpha1.NetworkPolicyRecommendation{ + ObjectMeta: metav1.ObjectMeta{Name: "npr-invalid-driver-core-request", Namespace: testNamespace}, + Spec: crdv1alpha1.NetworkPolicyRecommendationSpec{ + JobType: "initial", + PolicyType: "k8s-np", + ExecutorInstances: 1, + DriverCoreRequest: "m200", + }, + }, + expectedErrorMsg: "invalid request: DriverCoreRequest should conform to the Kubernetes resource quantity convention", + }, + { + name: "invalid DriverMemory", + nprName: "npr-invalid-driver-memory", + npr: &crdv1alpha1.NetworkPolicyRecommendation{ + ObjectMeta: metav1.ObjectMeta{Name: "npr-invalid-driver-memory", Namespace: testNamespace}, + Spec: crdv1alpha1.NetworkPolicyRecommendationSpec{ + JobType: "initial", + PolicyType: "k8s-np", + ExecutorInstances: 1, + DriverCoreRequest: "200m", + DriverMemory: "m512", + }, + }, + expectedErrorMsg: "invalid request: DriverMemory should conform to the Kubernetes resource quantity convention", + }, + { + name: "invalid ExecutorCoreRequest", + nprName: "npr-invalid-executor-core-request", + npr: &crdv1alpha1.NetworkPolicyRecommendation{ + ObjectMeta: metav1.ObjectMeta{Name: "npr-invalid-executor-core-request", Namespace: testNamespace}, + Spec: crdv1alpha1.NetworkPolicyRecommendationSpec{ + JobType: "initial", + PolicyType: "k8s-np", + ExecutorInstances: 1, + DriverCoreRequest: "200m", + DriverMemory: "512M", + ExecutorCoreRequest: "m200", + }, + }, + expectedErrorMsg: "invalid request: ExecutorCoreRequest should conform to the Kubernetes resource quantity convention", + }, + { + name: "invalid ExecutorMemory", + nprName: "npr-invalid-executor-memory", + npr: &crdv1alpha1.NetworkPolicyRecommendation{ + ObjectMeta: metav1.ObjectMeta{Name: "npr-invalid-executor-memory", Namespace: testNamespace}, + Spec: crdv1alpha1.NetworkPolicyRecommendationSpec{ + JobType: "initial", + PolicyType: "k8s-np", + ExecutorInstances: 1, + DriverCoreRequest: "200m", + DriverMemory: "512M", + ExecutorCoreRequest: "200m", + ExecutorMemory: "m512", + }, + }, + expectedErrorMsg: "invalid request: ExecutorMemory should conform to the Kubernetes resource quantity convention", + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + npr, err := nprController.CreateNetworkPolicyRecommendation(testNamespace, tc.npr) + assert.NoError(t, err) + stepInterval := 100 * time.Millisecond + timeout := 30 * time.Second + wait.PollImmediate(stepInterval, timeout, func() (done bool, err error) { + npr, err = nprController.GetNetworkPolicyRecommendation(testNamespace, tc.nprName) + if err != nil { + return false, nil + } + if npr.Status.State == crdv1alpha1.NPRecommendationStateFailed { + assert.Contains(t, npr.Status.ErrorMsg, tc.expectedErrorMsg) + return true, nil + } + return false, nil + }) + }) + } +} + +func TestValidateCluster(t *testing.T) { + testCases := []struct { + name string + setupClient func(kubernetes.Interface) + expectedErrorMsg string + }{ + { + name: "clickhouse pod not found", + setupClient: func(i kubernetes.Interface) {}, + expectedErrorMsg: "failed to find the ClickHouse Pod, please check the deployment", + }, + { + name: "spark operator pod not found", + setupClient: func(client kubernetes.Interface) { + createClickHousePod(client) + }, + expectedErrorMsg: "failed to find the Spark Operator Pod, please check the deployment", + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + kubeClient := fake.NewSimpleClientset() + tc.setupClient(kubeClient) + err := validateCluster(kubeClient, testNamespace) + assert.Contains(t, err.Error(), tc.expectedErrorMsg) + }) + } +} + +func TestGetPolicyRecommendationProgress(t *testing.T) { + sparkAppID := "spark-application-id" + testCases := []struct { + name string + testServer *httptest.Server + expectedErrorMsg string + }{ + { + name: "more than one spark application", + testServer: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch strings.TrimSpace(r.URL.Path) { + case "/api/v1/applications": + responses := []map[string]interface{}{ + {"id": sparkAppID}, + {"id": sparkAppID}, + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(responses) + } + })), + expectedErrorMsg: "wrong Spark Application number, expected 1, got 2", + }, + { + name: "no spark monitor service", + testServer: nil, + expectedErrorMsg: "failed to get response from the Spark Monitoring Service", + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var err error + if tc.testServer != nil { + defer tc.testServer.Close() + _, _, err = getPolicyRecommendationProgress(tc.testServer.URL) + } else { + _, _, err = getPolicyRecommendationProgress("http://127.0.0.1") + } + assert.Contains(t, err.Error(), tc.expectedErrorMsg) + }) + } +} + +func TestGetPolicyRecommendationResult(t *testing.T) { + sparkAppID := "spark-application-id" + var db *sql.DB + + testCases := []struct { + name string + setup func(kubernetes.Interface) + expectedErrorMsg string + }{ + { + name: "no ClickHouse service", + setup: func(client kubernetes.Interface) {}, + expectedErrorMsg: "error when getting the ClickHouse Service address: error when finding the Service clickhouse-clickhouse", + }, + { + name: "no ClickHouse secret", + setup: func(client kubernetes.Interface) { + createClickHouseService(client) + }, + expectedErrorMsg: "error when finding the ClickHouse secret", + }, + { + name: "connection error for ClickHouse", + setup: func(client kubernetes.Interface) { + createClickHouseService(client) + createClickHouseSecret(client) + openSql = func(driverName, dataSourceName string) (*sql.DB, error) { + return nil, fmt.Errorf("connection error") + } + }, + expectedErrorMsg: "failed to open ClickHouse: connection error", + }, + { + name: "ping error for ClickHouse", + setup: func(client kubernetes.Interface) { + createClickHouseService(client) + createClickHouseSecret(client) + openSql = func(driverName, dataSourceName string) (*sql.DB, error) { + var err error + db, _, err = sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), sqlmock.MonitorPingsOption(true)) + return db, err + } + }, + expectedErrorMsg: "error when connecting to ClickHouse, failed to ping ClickHouse", + }, + { + name: "no result in ClickHouse", + setup: func(client kubernetes.Interface) { + createClickHouseService(client) + createClickHouseSecret(client) + openSql = func(driverName, dataSourceName string) (*sql.DB, error) { + var err error + var mock sqlmock.Sqlmock + db, mock, err = sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), sqlmock.MonitorPingsOption(true)) + if err != nil { + return db, err + } + mock.ExpectPing() + mock.ExpectQuery("SELECT type, timeCreated, yamls FROM recommendations WHERE id = (?);").WithArgs(sparkAppID).WillReturnError(sql.ErrNoRows) + return db, err + } + }, + expectedErrorMsg: fmt.Sprintf("failed to get recommendation result with id %s", sparkAppID), + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + kubeClient := fake.NewSimpleClientset() + tc.setup(kubeClient) + if db != nil { + defer db.Close() + } + connect, err := setupClickHouseConnection(kubeClient, testNamespace) + if err != nil { + assert.Contains(t, err.Error(), tc.expectedErrorMsg) + } else { + _, err := getPolicyRecommendationResult(connect, sparkAppID) + assert.Contains(t, err.Error(), tc.expectedErrorMsg) + } + }) + } +} diff --git a/pkg/controller/networkpolicyrecommendation/util.go b/pkg/controller/networkpolicyrecommendation/util.go new file mode 100644 index 000000000..42b7aac09 --- /dev/null +++ b/pkg/controller/networkpolicyrecommendation/util.go @@ -0,0 +1,318 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package networkpolicyrecommendation + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "time" + + "github.com/ClickHouse/clickhouse-go" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + + crdv1alpha1 "antrea.io/theia/pkg/apis/crd/v1alpha1" + sparkv1 "antrea.io/theia/third_party/sparkoperator/v1beta2" +) + +var openSql = sql.Open + +func constStrToPointer(constStr string) *string { + return &constStr +} + +func validateCluster(client kubernetes.Interface, namespace string) error { + err := checkPodByLabel(client, namespace, "app=clickhouse") + if err != nil { + return fmt.Errorf("failed to find the ClickHouse Pod, please check the deployment, error: %v", err) + } + err = checkPodByLabel(client, namespace, "app.kubernetes.io/name=spark-operator") + if err != nil { + return fmt.Errorf("failed to find the Spark Operator Pod, please check the deployment, error: %v", err) + } + return nil +} + +func checkPodByLabel(client kubernetes.Interface, namespace string, label string) error { + pods, err := client.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{ + LabelSelector: label, + }) + if err != nil { + return fmt.Errorf("error %v when finding the Pod", err) + } + if len(pods.Items) < 1 { + return fmt.Errorf("expected at least 1 pod, but found %d", len(pods.Items)) + } + hasRunningPod := false + for _, pod := range pods.Items { + if pod.Status.Phase == "Running" { + hasRunningPod = true + break + } + } + if !hasRunningPod { + return fmt.Errorf("can't find a running Pod") + } + return nil +} + +func getPolicyRecommendationStatus(client kubernetes.Interface, id string, namespace string) (state string, errorMessage string, err error) { + sparkApplication, err := GetSparkApplication(client, "pr-"+id, namespace) + if err != nil { + return state, errorMessage, err + } + state = strings.TrimSpace(string(sparkApplication.Status.AppState.State)) + errorMessage = strings.TrimSpace(string(sparkApplication.Status.AppState.ErrorMessage)) + + return state, errorMessage, nil +} + +func getServiceAddr(client kubernetes.Interface, serviceName, serviceNamespace, servicePortName string) (string, int, error) { + var serviceIP string + var servicePort int + service, err := client.CoreV1().Services(serviceNamespace).Get(context.TODO(), serviceName, metav1.GetOptions{}) + if err != nil { + return serviceIP, servicePort, fmt.Errorf("error when finding the Service %s: %v", serviceName, err) + } + serviceIP = service.Spec.ClusterIP + for _, port := range service.Spec.Ports { + if port.Name == servicePortName { + servicePort = int(port.Port) + } + } + if servicePort == 0 { + return serviceIP, servicePort, fmt.Errorf("error when finding the Service %s: no %s service port", serviceName, servicePortName) + } + return serviceIP, servicePort, nil +} + +func getResponseFromSparkMonitoringSvc(url string) ([]byte, error) { + sparkMonitoringClient := http.Client{} + request, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return nil, err + } + res, err := sparkMonitoringClient.Do(request) + if err != nil { + return nil, err + } + if res == nil { + return nil, fmt.Errorf("response is nil") + } + if res.Body != nil { + defer res.Body.Close() + } + body, readErr := io.ReadAll(res.Body) + if readErr != nil { + return nil, readErr + } + return body, nil +} + +func getPolicyRecommendationProgress(baseUrl string) (completedStages int, totalStages int, err error) { + // Get the id of current Spark application + url := fmt.Sprintf("%s/api/v1/applications", baseUrl) + response, err := getResponseFromSparkMonitoringSvc(url) + if err != nil { + return completedStages, totalStages, fmt.Errorf("failed to get response from the Spark Monitoring Service: %v", err) + } + var getAppsResult []map[string]interface{} + json.Unmarshal([]byte(response), &getAppsResult) + if len(getAppsResult) != 1 { + return completedStages, totalStages, fmt.Errorf("wrong Spark Application number, expected 1, got %d", len(getAppsResult)) + } + sparkAppID := getAppsResult[0]["id"] + // Check the percentage of completed stages + url = fmt.Sprintf("%s/api/v1/applications/%s/stages", baseUrl, sparkAppID) + response, err = getResponseFromSparkMonitoringSvc(url) + if err != nil { + return completedStages, totalStages, fmt.Errorf("failed to get response from the Spark Monitoring Service: %v", err) + } + var getStagesResult []map[string]interface{} + json.Unmarshal([]byte(response), &getStagesResult) + // totalStages can be 0 when the SparkApplication just starts and the stages have not be determined + totalStages = len(getStagesResult) + completedStages = 0 + for _, stage := range getStagesResult { + if stage["status"] == "COMPLETE" || stage["status"] == "SKIPPED" { + completedStages++ + } + } + return completedStages, totalStages, nil +} + +func getPolicyRecommendationResult(connect *sql.DB, id string) (*crdv1alpha1.RecommendedNetworkPolicy, error) { + var resultType, resultTimeCreatedStr, resultYamls string + query := "SELECT type, timeCreated, yamls FROM recommendations WHERE id = (?);" + err := connect.QueryRow(query, id).Scan(&resultType, &resultTimeCreatedStr, &resultYamls) + if err != nil { + return nil, fmt.Errorf("failed to get recommendation result with id %s: %v", id, err) + } + resultTimeCreated, err := time.Parse(clickHouseTimeFormat, resultTimeCreatedStr) + if err != nil { + return nil, fmt.Errorf("failed to parse timeCreated of the result for %s: %v", id, err) + } + recommendedNetworkPolicy := &crdv1alpha1.RecommendedNetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "rnp-", + }, + Spec: crdv1alpha1.RecommendedNetworkPolicySpec{ + Id: id, + Type: resultType, + TimeCreated: metav1.NewTime(resultTimeCreated), + Yamls: resultYamls, + }, + } + return recommendedNetworkPolicy, nil +} + +func getClickHouseSecret(client kubernetes.Interface, namespace string) (username []byte, password []byte, err error) { + secret, err := client.CoreV1().Secrets(namespace).Get(context.TODO(), "clickhouse-secret", metav1.GetOptions{}) + if err != nil { + return username, password, fmt.Errorf("error when finding the ClickHouse secret. Error: %v", err) + } + username, ok := secret.Data["username"] + if !ok { + return username, password, fmt.Errorf("error when getting the ClickHouse username") + } + password, ok = secret.Data["password"] + if !ok { + return username, password, fmt.Errorf("error when getting the ClickHouse password") + } + return username, password, nil +} + +func connectClickHouse(url string) (*sql.DB, error) { + var connect *sql.DB + // Open the database and ping it + var err error + connect, err = openSql("clickhouse", url) + if err != nil { + return connect, fmt.Errorf("failed to open ClickHouse: %v", err) + } + if err := connect.Ping(); err != nil { + if exception, ok := err.(*clickhouse.Exception); ok { + return connect, fmt.Errorf("failed to ping ClickHouse: %v", exception.Message) + } else { + return connect, fmt.Errorf("failed to ping ClickHouse: %v", err) + } + } + return connect, nil +} + +func setupClickHouseConnection(client kubernetes.Interface, namespace string) (connect *sql.DB, err error) { + serviceIP, servicePort, err := getServiceAddr(client, "clickhouse-clickhouse", namespace, "tcp") + if err != nil { + return connect, fmt.Errorf("error when getting the ClickHouse Service address: %v", err) + } + endpoint := fmt.Sprintf("tcp://%s:%d", serviceIP, servicePort) + + // Connect to ClickHouse and execute query + username, password, err := getClickHouseSecret(client, namespace) + if err != nil { + return nil, err + } + url := fmt.Sprintf("%s?debug=false&username=%s&password=%s", endpoint, username, password) + connect, err = connectClickHouse(url) + if err != nil { + return nil, fmt.Errorf("error when connecting to ClickHouse, %v", err) + } + return connect, nil +} + +func deleteSparkApplicationIfExists(client kubernetes.Interface, namespace string, id string) error { + sparkApplicationList, err := ListSparkApplication(client, namespace) + if err != nil { + return err + } + existed := false + for _, sparkApplication := range sparkApplicationList.Items { + existedId := sparkApplication.ObjectMeta.Name[3:] + if existedId == id { + existed = true + } + } + if existed { + DeleteSparkApplication(client, "pr-"+id, namespace) + } + return nil +} + +func deletePolicyRecommendationResult(connect *sql.DB, id string) (err error) { + query := "ALTER TABLE recommendations_local ON CLUSTER '{cluster}' DELETE WHERE id = (?);" + _, err = connect.Exec(query, id) + if err != nil { + return fmt.Errorf("failed to delete recommendation result with id %s: %v", id, err) + } + return nil +} + +func getSparkApplication(client kubernetes.Interface, name string, namespace string) (sparkApp sparkv1.SparkApplication, err error) { + err = client.CoreV1().RESTClient().Get(). + AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). + Namespace(namespace). + Resource("sparkapplications"). + Name(name). + Do(context.TODO()). + Into(&sparkApp) + if err != nil { + return sparkApp, err + } + return sparkApp, nil +} + +func listSparkApplication(client kubernetes.Interface, namespace string) (*sparkv1.SparkApplicationList, error) { + sparkApplicationList := &sparkv1.SparkApplicationList{} + err := client.CoreV1().RESTClient().Get(). + AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). + Namespace(namespace). + Resource("sparkapplications"). + Do(context.TODO()).Into(sparkApplicationList) + return sparkApplicationList, err +} + +func deleteSparkApplication(client kubernetes.Interface, name string, namespace string) { + client.CoreV1().RESTClient().Delete(). + AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). + Namespace(namespace). + Resource("sparkapplications"). + Name(name). + Do(context.TODO()) +} + +func createSparkApplication(client kubernetes.Interface, namespace string, recommendationApplication *sparkv1.SparkApplication) error { + response := &sparkv1.SparkApplication{} + return client.CoreV1().RESTClient(). + Post(). + AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). + Namespace(namespace). + Resource("sparkapplications"). + Body(recommendationApplication). + Do(context.TODO()). + Into(response) +} + +type IlleagelArguementError struct { + error +} + +func getSparkMonitoringSvcDNS(id string, namespace string) string { + return fmt.Sprintf("http://pr-%s-ui-svc.%s.svc:%d", id, namespace, sparkPort) +} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 26d885c09..bc114d2e1 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -20,4 +20,7 @@ import ( type NPRecommendationQuerier interface { GetNetworkPolicyRecommendation(namespace, name string) (*v1alpha1.NetworkPolicyRecommendation, error) + ListNetworkPolicyRecommendation(namespace string) ([]*v1alpha1.NetworkPolicyRecommendation, error) + DeleteNetworkPolicyRecommendation(namespace, name string) error + CreateNetworkPolicyRecommendation(namespace string, networkPolicyRecommendation *v1alpha1.NetworkPolicyRecommendation) (*v1alpha1.NetworkPolicyRecommendation, error) }