diff --git a/.github/workflows/kind.yml b/.github/workflows/kind.yml index 58d9de1b4..d457313d6 100644 --- a/.github/workflows/kind.yml +++ b/.github/workflows/kind.yml @@ -144,6 +144,9 @@ jobs: - build-clickhouse-server-image runs-on: [ubuntu-latest] steps: + - name: Check disk space + run: | + df -h - name: Free disk space # https://github.com/actions/virtual-environments/issues/709 run: | diff --git a/build/charts/theia/templates/theia-manager/clusterrole.yaml b/build/charts/theia/templates/theia-manager/clusterrole.yaml index 3b1a67501..e4e46172a 100644 --- a/build/charts/theia/templates/theia-manager/clusterrole.yaml +++ b/build/charts/theia/templates/theia-manager/clusterrole.yaml @@ -19,10 +19,4 @@ rules: - apiGroups: ["crd.theia.antrea.io"] resources: ["networkpolicyrecommendations"] verbs: ["get", "list", "watch", "create", "delete"] - - apiGroups: [ "" ] - resources: [ "pods" ] - verbs: [ "list"] - - apiGroups: [ "" ] - resources: [ "services", "secrets" ] - verbs: [ "get" ] {{- end }} diff --git a/cmd/theia-manager/theia-manager.go b/cmd/theia-manager/theia-manager.go index d88dcb8b1..401bd5d40 100644 --- a/cmd/theia-manager/theia-manager.go +++ b/cmd/theia-manager/theia-manager.go @@ -21,7 +21,6 @@ import ( "antrea.io/antrea/pkg/log" "antrea.io/antrea/pkg/signals" "antrea.io/antrea/pkg/util/cipher" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/klog/v2" @@ -53,16 +52,9 @@ func run(o *Options) error { if err != nil { return fmt.Errorf("error when generating CRD client: %v", err) } - k8sClient, err := createK8sClient() - if err != nil { - return fmt.Errorf("error when creating K8s client: %v", err) - } crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, informerDefaultResync) npRecommendationInformer := crdInformerFactory.Crd().V1alpha1().NetworkPolicyRecommendations() - npRecoController, err := networkpolicyrecommendation.NewNPRecommendationController(crdClient, k8sClient, npRecommendationInformer) - if err != nil { - return fmt.Errorf("error when creating networkPolicyRecommendation controller: %v", err) - } + npRecoController := networkpolicyrecommendation.NewNPRecommendationController(crdClient, npRecommendationInformer) cipherSuites, err := cipher.GenerateCipherSuitesList(o.config.APIServer.TLSCipherSuites) if err != nil { return fmt.Errorf("error when generating Cipher Suite list: %v", err) @@ -84,15 +76,3 @@ func run(o *Options) error { klog.InfoS("Stopping theia manager") return nil } - -func createK8sClient() (kubernetes.Interface, error) { - config, err := rest.InClusterConfig() - if err != nil { - return nil, err - } - k8sClient, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - return k8sClient, nil -} diff --git a/pkg/apis/crd/v1alpha1/types.go b/pkg/apis/crd/v1alpha1/types.go index 856d63e5e..fc32df9d8 100644 --- a/pkg/apis/crd/v1alpha1/types.go +++ b/pkg/apis/crd/v1alpha1/types.go @@ -23,6 +23,14 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +const ( + NPRecommendationStateNew string = "NEW" + NPRecommendationStateScheduled string = "SCHEDULED" + NPRecommendationStateRunning string = "RUNNING" + NPRecommendationStateCompleted string = "COMPLETED" + NPRecommendationStateFailed string = "FAILED" +) + // +genclient // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -38,8 +46,8 @@ type NetworkPolicyRecommendationSpec struct { JobType string `json:"jobType,omitempty"` Limit int `json:"limit,omitempty"` PolicyType string `json:"policyType,omitempty"` - StartTime metav1.Time `json:"startTime,omitempty"` - EndTime metav1.Time `json:"endTime,omitempty"` + StartInterval metav1.Time `json:"startInterval,omitempty"` + EndInterval metav1.Time `json:"endInterval,omitempty"` NSAllowList []string `json:"nsAllowList,omitempty"` ExcludeLabels bool `json:"excludeLabels,omitempty"` ToServices bool `json:"toServices,omitempty"` @@ -51,7 +59,14 @@ type NetworkPolicyRecommendationSpec struct { } type NetworkPolicyRecommendationStatus struct { - State string `json:"state,omitempty"` + State string `json:"state,omitempty"` + SparkApplication string `json:"sparkApplication,omitempty"` + CompletedStages int `json:"completedStages,omitempty"` + TotalStages int `json:"totalStages,omitempty"` + RecommendedNP *RecommendedNetworkPolicy `json:"recommendedNetworkPolicy,omitempty"` + ErrorMsg string `json:"errorMsg,omitempty"` + StartTime metav1.Time `json:"startTime,omitempty"` + EndTime metav1.Time `json:"endTime,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -61,3 +76,28 @@ type NetworkPolicyRecommendationList struct { metav1.ListMeta `json:"metadata,omitempty"` Items []NetworkPolicyRecommendation `json:"items"` } + +// +genclient +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +type RecommendedNetworkPolicy struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec RecommendedNetworkPolicySpec `json:"spec,omitempty"` +} + +type RecommendedNetworkPolicySpec struct { + Id string `json:"id,omitempty"` + Type string `json:"resultType,omitempty"` + TimeCreated metav1.Time `json:"timeCreated,omitempty"` + Yamls string `json:"yamls,omitempty"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +type RecommendedNetworkPolicyList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []RecommendedNetworkPolicy `json:"items"` +} diff --git a/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go index e11ec8131..90a423319 100644 --- a/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go @@ -29,7 +29,7 @@ func (in *NetworkPolicyRecommendation) DeepCopyInto(out *NetworkPolicyRecommenda out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) return } @@ -87,8 +87,8 @@ func (in *NetworkPolicyRecommendationList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *NetworkPolicyRecommendationSpec) DeepCopyInto(out *NetworkPolicyRecommendationSpec) { *out = *in - in.StartTime.DeepCopyInto(&out.StartTime) - in.EndTime.DeepCopyInto(&out.EndTime) + in.StartInterval.DeepCopyInto(&out.StartInterval) + in.EndInterval.DeepCopyInto(&out.EndInterval) if in.NSAllowList != nil { in, out := &in.NSAllowList, &out.NSAllowList *out = make([]string, len(*in)) @@ -110,6 +110,13 @@ func (in *NetworkPolicyRecommendationSpec) DeepCopy() *NetworkPolicyRecommendati // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *NetworkPolicyRecommendationStatus) DeepCopyInto(out *NetworkPolicyRecommendationStatus) { *out = *in + if in.RecommendedNP != nil { + in, out := &in.RecommendedNP, &out.RecommendedNP + *out = new(RecommendedNetworkPolicy) + (*in).DeepCopyInto(*out) + } + in.StartTime.DeepCopyInto(&out.StartTime) + in.EndTime.DeepCopyInto(&out.EndTime) return } @@ -122,3 +129,80 @@ func (in *NetworkPolicyRecommendationStatus) DeepCopy() *NetworkPolicyRecommenda in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RecommendedNetworkPolicy) DeepCopyInto(out *RecommendedNetworkPolicy) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RecommendedNetworkPolicy. +func (in *RecommendedNetworkPolicy) DeepCopy() *RecommendedNetworkPolicy { + if in == nil { + return nil + } + out := new(RecommendedNetworkPolicy) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *RecommendedNetworkPolicy) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RecommendedNetworkPolicyList) DeepCopyInto(out *RecommendedNetworkPolicyList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]RecommendedNetworkPolicy, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RecommendedNetworkPolicyList. +func (in *RecommendedNetworkPolicyList) DeepCopy() *RecommendedNetworkPolicyList { + if in == nil { + return nil + } + out := new(RecommendedNetworkPolicyList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *RecommendedNetworkPolicyList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RecommendedNetworkPolicySpec) DeepCopyInto(out *RecommendedNetworkPolicySpec) { + *out = *in + in.TimeCreated.DeepCopyInto(&out.TimeCreated) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RecommendedNetworkPolicySpec. +func (in *RecommendedNetworkPolicySpec) DeepCopy() *RecommendedNetworkPolicySpec { + if in == nil { + return nil + } + out := new(RecommendedNetworkPolicySpec) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/apis/intelligence/v1alpha1/types.go b/pkg/apis/intelligence/v1alpha1/types.go index 91eb3e04f..b0a96e593 100644 --- a/pkg/apis/intelligence/v1alpha1/types.go +++ b/pkg/apis/intelligence/v1alpha1/types.go @@ -17,13 +17,13 @@ package v1alpha1 import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" const ( - NPRecommendationJobInitial string = "Initial" - NPRecommendationJobSubsequent string = "Subsequent" - NPRecommendationStateNew string = "NEW" - NPRecommendationStateScheduled string = "SCHEDULED" - NPRecommendationStateRunning string = "RUNNING" - NPRecommendationStateCompleted string = "COMPLETED" - NPRecommendationStateFailed string = "FAILED" + NPRecommendationJobInitial string = "Initial" + NPRecommendationJobSubsequent string = "Subsequent" + NPRecommendationJobNew string = "NEW" + NPRecommendationJobScheduled string = "SCHEDULED" + NPRecommendationJobRunning string = "RUNNING" + NPRecommendationJobCompleted string = "COMPLETED" + NPRecommendationJobFailed string = "FAILED" ) // +genclient @@ -37,8 +37,8 @@ type NetworkPolicyRecommendation struct { Type string `json:"jobType,omitempty"` Limit int `json:"limit,omitempty"` PolicyType string `json:"policyType,omitempty"` - IntervalStart metav1.Time `json:"intervalStart,omitempty"` - IntervalEnd metav1.Time `json:"intervalEnd,omitempty"` + StartInterval metav1.Time `json:"startInterval,omitempty"` + EndInterval metav1.Time `json:"endInterval,omitempty"` NSAllowList []string `json:"nsAllowList,omitempty"` ExcludeLabels bool `json:"excludeLabels,omitempty"` ToServices bool `json:"toServices,omitempty"` @@ -51,13 +51,15 @@ type NetworkPolicyRecommendation struct { } type NetworkPolicyRecommendationStatus struct { - State string `json:"state,omitempty"` - SparkApplication string `json:"sparkApplication,omitempty"` - CompletedStages int `json:"completedStages,omitempty"` - TotalStages int `json:"totalStages,omitempty"` - RecommendedNetworkPolicy string `json:"recommendedNetworkPolicy,omitempty"` - ErrorCode string `json:"errorCode,omitempty"` - ErrorMsg string `json:"errorMsg,omitempty"` + State string `json:"state,omitempty"` + SparkApplication string `json:"sparkApplication,omitempty"` + CompletedStages int `json:"completedStages,omitempty"` + TotalStages int `json:"totalStages,omitempty"` + RecommendedNetworkPolicy string `json:"recommendedNetworkPolicy,omitempty"` + ErrorCode string `json:"errorCode,omitempty"` + ErrorMsg string `json:"errorMsg,omitempty"` + StartTime metav1.Time `json:"startTime,omitempty"` + EndTime metav1.Time `json:"endTime,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/intelligence/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/intelligence/v1alpha1/zz_generated.deepcopy.go index c5fde9ce5..81c20a4e3 100644 --- a/pkg/apis/intelligence/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/intelligence/v1alpha1/zz_generated.deepcopy.go @@ -28,14 +28,14 @@ func (in *NetworkPolicyRecommendation) DeepCopyInto(out *NetworkPolicyRecommenda *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - in.IntervalStart.DeepCopyInto(&out.IntervalStart) - in.IntervalEnd.DeepCopyInto(&out.IntervalEnd) + in.StartInterval.DeepCopyInto(&out.StartInterval) + in.EndInterval.DeepCopyInto(&out.EndInterval) if in.NSAllowList != nil { in, out := &in.NSAllowList, &out.NSAllowList *out = make([]string, len(*in)) copy(*out, *in) } - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) return } @@ -93,6 +93,8 @@ func (in *NetworkPolicyRecommendationList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *NetworkPolicyRecommendationStatus) DeepCopyInto(out *NetworkPolicyRecommendationStatus) { *out = *in + in.StartTime.DeepCopyInto(&out.StartTime) + in.EndTime.DeepCopyInto(&out.EndTime) return } diff --git a/pkg/apiserver/registry/intelligence/networkpolicyrecommendation/rest.go b/pkg/apiserver/registry/intelligence/networkpolicyrecommendation/rest.go index 13ded82b9..ebf7cb40f 100644 --- a/pkg/apiserver/registry/intelligence/networkpolicyrecommendation/rest.go +++ b/pkg/apiserver/registry/intelligence/networkpolicyrecommendation/rest.go @@ -42,6 +42,8 @@ var ( _ rest.GracefulDeleter = &REST{} ) +const defaultNameSpace = "flow-visibility" + // NewREST returns a REST object that will work against API services. func NewREST(nprq querier.NPRecommendationQuerier) *REST { return &REST{npRecommendationQuerier: nprq} @@ -51,32 +53,33 @@ func (r *REST) New() runtime.Object { return &intelligence.NetworkPolicyRecommendation{} } -func (r *REST) getNetworkPolicyRecommendation(name string) *intelligence.NetworkPolicyRecommendation { - npReco, err := r.npRecommendationQuerier.GetNetworkPolicyRecommendation(name) +func (r *REST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { + npReco, err := r.npRecommendationQuerier.GetNetworkPolicyRecommendation(defaultNameSpace, name) if err != nil { - return nil - } - return npReco -} - -func (r *REST) Get(_ context.Context, name string, _ *metav1.GetOptions) (runtime.Object, error) { - job := r.getNetworkPolicyRecommendation(name) - if job == nil { return nil, errors.NewNotFound(intelligence.Resource("networkpolicyrecommendations"), name) } - return job, nil + intelliNPR := new(intelligence.NetworkPolicyRecommendation) + r.copyNetworkPolicyRecommendation(intelliNPR, npReco) + return intelliNPR, nil } func (r *REST) NewList() runtime.Object { return &intelligence.NetworkPolicyRecommendationList{} } -func (r *REST) List(_ context.Context, _ *internalversion.ListOptions) (runtime.Object, error) { - itemList, err := r.npRecommendationQuerier.ListNetworkPolicyRecommendation() +func (r *REST) List(ctx context.Context, options *internalversion.ListOptions) (runtime.Object, error) { + npRecoList, err := r.npRecommendationQuerier.ListNetworkPolicyRecommendation(defaultNameSpace) if err != nil { - return nil, errors.NewBadRequest(fmt.Sprintf("cannot retrieve npr from controller. err:%s", err)) + return nil, errors.NewBadRequest(fmt.Sprintf("error when getting NetworkPolicyRecommendationsList: %v", err)) } - return itemList, nil + items := make([]intelligence.NetworkPolicyRecommendation, 0, len(npRecoList)) + for _, npReco := range npRecoList { + intelliNPR := new(intelligence.NetworkPolicyRecommendation) + r.copyNetworkPolicyRecommendation(intelliNPR, npReco) + items = append(items, *intelliNPR) + } + list := &intelligence.NetworkPolicyRecommendationList{Items: items} + return list, nil } func (r *REST) NamespaceScoped() bool { @@ -87,12 +90,12 @@ func (r *REST) ConvertToTable(ctx context.Context, obj runtime.Object, tableOpti return rest.NewDefaultTableConvertor(intelligence.Resource("networkpolicyrecommendations")).ConvertToTable(ctx, obj, tableOptions) } -func (r *REST) Create(_ context.Context, obj runtime.Object, _ rest.ValidateObjectFunc, _ *metav1.CreateOptions) (runtime.Object, error) { +func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { npReco, ok := obj.(*intelligence.NetworkPolicyRecommendation) if !ok { return nil, errors.NewBadRequest(fmt.Sprintf("not a NetworkPolicyRecommendation object: %T", obj)) } - existNPReco, _ := r.npRecommendationQuerier.GetNetworkPolicyRecommendation(npReco.Name) + existNPReco, _ := r.npRecommendationQuerier.GetNetworkPolicyRecommendation(defaultNameSpace, npReco.Name) if existNPReco != nil { return nil, errors.NewBadRequest(fmt.Sprintf("networkPolicyRecommendation job exists, name: %s", npReco.Name)) } @@ -101,8 +104,8 @@ func (r *REST) Create(_ context.Context, obj runtime.Object, _ rest.ValidateObje job.Spec.JobType = npReco.Type job.Spec.Limit = npReco.Limit job.Spec.PolicyType = npReco.PolicyType - job.Spec.StartTime = npReco.IntervalStart - job.Spec.EndTime = npReco.IntervalEnd + job.Spec.StartInterval = npReco.StartInterval + job.Spec.EndInterval = npReco.EndInterval job.Spec.NSAllowList = npReco.NSAllowList job.Spec.ExcludeLabels = npReco.ExcludeLabels job.Spec.ToServices = npReco.ToServices @@ -111,21 +114,49 @@ func (r *REST) Create(_ context.Context, obj runtime.Object, _ rest.ValidateObje job.Spec.DriverMemory = npReco.DriverMemory job.Spec.ExecutorCoreRequest = npReco.ExecutorCoreRequest job.Spec.ExecutorMemory = npReco.ExecutorMemory - _, err := r.npRecommendationQuerier.CreateNetworkPolicyRecommendation(job) + _, err := r.npRecommendationQuerier.CreateNetworkPolicyRecommendation(defaultNameSpace, job) if err != nil { - return nil, err + return nil, errors.NewBadRequest(fmt.Sprintf("error when creating NetworkPolicyRecommendation CR: %v", err)) } return &metav1.Status{Status: metav1.StatusSuccess}, nil } -func (r *REST) Delete(_ context.Context, name string, _ rest.ValidateObjectFunc, _ *metav1.DeleteOptions) (runtime.Object, bool, error) { - _, err := r.npRecommendationQuerier.GetNetworkPolicyRecommendation(name) +func (r *REST) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) { + _, err := r.npRecommendationQuerier.GetNetworkPolicyRecommendation(defaultNameSpace, name) if err != nil { - return nil, false, errors.NewBadRequest(fmt.Sprintf("networkPolicyRecommendation job doesn't exist, name: %s", name)) + return nil, false, errors.NewBadRequest(fmt.Sprintf("NetworkPolicyRecommendation job doesn't exist, name: %s", name)) } - err = r.npRecommendationQuerier.DeleteNetworkPolicyRecommendation(name) + err = r.npRecommendationQuerier.DeleteNetworkPolicyRecommendation(defaultNameSpace, name) if err != nil { return nil, false, err } return &metav1.Status{Status: metav1.StatusSuccess}, false, nil } + +// copyNetworkPolicyRecommendation is used to copy NetworkPolicyRecommendation from crd to intelligence +func (r *REST) copyNetworkPolicyRecommendation(intelli *intelligence.NetworkPolicyRecommendation, crd *crdv1alpha1.NetworkPolicyRecommendation) error { + intelli.Name = crd.Name + intelli.Type = crd.Spec.JobType + intelli.Limit = crd.Spec.Limit + intelli.PolicyType = crd.Spec.PolicyType + intelli.StartInterval = crd.Spec.StartInterval + intelli.EndInterval = crd.Spec.EndInterval + intelli.NSAllowList = crd.Spec.NSAllowList + intelli.ExcludeLabels = crd.Spec.ExcludeLabels + intelli.ToServices = crd.Spec.ToServices + intelli.ExecutorInstances = crd.Spec.ExecutorInstances + intelli.DriverCoreRequest = crd.Spec.DriverCoreRequest + intelli.DriverMemory = crd.Spec.DriverMemory + intelli.ExecutorCoreRequest = crd.Spec.ExecutorCoreRequest + intelli.ExecutorMemory = crd.Spec.ExecutorMemory + intelli.Status.State = crd.Status.State + intelli.Status.SparkApplication = crd.Status.SparkApplication + intelli.Status.CompletedStages = crd.Status.CompletedStages + intelli.Status.TotalStages = crd.Status.TotalStages + intelli.Status.RecommendedNetworkPolicy = crd.Status.RecommendedNP.Spec.Yamls + intelli.Status.ErrorMsg = crd.Status.ErrorMsg + // todo: need to parse the error code + intelli.Status.StartTime = crd.Status.StartTime + intelli.Status.EndTime = crd.Status.EndTime + return nil +} diff --git a/pkg/apiserver/registry/intelligence/networkpolicyrecommendation/rest_test.go b/pkg/apiserver/registry/intelligence/networkpolicyrecommendation/rest_test.go index 79688babc..c4d82dee4 100644 --- a/pkg/apiserver/registry/intelligence/networkpolicyrecommendation/rest_test.go +++ b/pkg/apiserver/registry/intelligence/networkpolicyrecommendation/rest_test.go @@ -41,8 +41,8 @@ func TestREST_Get(t *testing.T) { }{ { name: "Not Found case", - nprName: "npr-1", - expectErr: errors.NewNotFound(intelligence.Resource("networkpolicyrecommendations"), "npr-1"), + nprName: "non-existent-npr", + expectErr: errors.NewNotFound(intelligence.Resource("networkpolicyrecommendations"), "non-existent-npr"), expectResult: nil, }, { @@ -74,8 +74,8 @@ func TestREST_Delete(t *testing.T) { }{ { name: "Job doesn't exist case", - nprName: "npr-1", - expectErr: errors.NewBadRequest(fmt.Sprintf("networkPolicyRecommendation job doesn't exist, name: %s", "npr-1")), + nprName: "non-existent-npr", + expectErr: errors.NewBadRequest(fmt.Sprintf("NetworkPolicyRecommendation job doesn't exist, name: %s", "non-existent-npr")), }, { name: "Successful Delete case", @@ -109,16 +109,16 @@ func TestREST_Create(t *testing.T) { name: "Job already exists case", obj: &intelligence.NetworkPolicyRecommendation{ TypeMeta: v1.TypeMeta{}, - ObjectMeta: v1.ObjectMeta{Name: "npr-2"}, + ObjectMeta: v1.ObjectMeta{Name: "existent-npr"}, }, - expectErr: errors.NewBadRequest(fmt.Sprintf("networkPolicyRecommendation job exists, name: %s", "npr-2")), + expectErr: errors.NewBadRequest(fmt.Sprintf("networkPolicyRecommendation job exists, name: %s", "existent-npr")), expectResult: nil, }, { name: "Successful Create case", obj: &intelligence.NetworkPolicyRecommendation{ TypeMeta: v1.TypeMeta{}, - ObjectMeta: v1.ObjectMeta{Name: "npr-1"}, + ObjectMeta: v1.ObjectMeta{Name: "non-existent-npr"}, }, expectErr: nil, expectResult: &v1.Status{Status: v1.StatusSuccess}, @@ -159,24 +159,30 @@ func TestREST_List(t *testing.T) { } } -func (c *fakeQuerier) GetNetworkPolicyRecommendation(name string) (*intelligence.NetworkPolicyRecommendation, error) { - if name == "npr-1" { +func (c *fakeQuerier) GetNetworkPolicyRecommendation(namespace, name string) (*v1alpha1.NetworkPolicyRecommendation, error) { + if name == "non-existent-npr" { return nil, fmt.Errorf("not found") } - return &intelligence.NetworkPolicyRecommendation{Type: "NPR", PolicyType: "Allow"}, nil + return &crdv1alpha1.NetworkPolicyRecommendation{ + Spec: crdv1alpha1.NetworkPolicyRecommendationSpec{ + JobType: "NPR", PolicyType: "Allow"}, + Status: crdv1alpha1.NetworkPolicyRecommendationStatus{ + RecommendedNP: &crdv1alpha1.RecommendedNetworkPolicy{}, + }, + }, nil } -func (c *fakeQuerier) CreateNetworkPolicyRecommendation(*v1alpha1.NetworkPolicyRecommendation) (*v1alpha1.NetworkPolicyRecommendation, error) { +func (c *fakeQuerier) CreateNetworkPolicyRecommendation(namespace string, networkPolicyRecommendation *v1alpha1.NetworkPolicyRecommendation) (*v1alpha1.NetworkPolicyRecommendation, error) { return nil, nil } -func (c *fakeQuerier) DeleteNetworkPolicyRecommendation(name string) error { +func (c *fakeQuerier) DeleteNetworkPolicyRecommendation(namespace, name string) error { return nil } -func (c *fakeQuerier) ListNetworkPolicyRecommendation() (*intelligence.NetworkPolicyRecommendationList, error) { - return &intelligence.NetworkPolicyRecommendationList{Items: []intelligence.NetworkPolicyRecommendation{ - {ObjectMeta: v1.ObjectMeta{Name: "npr-1"}}, - {ObjectMeta: v1.ObjectMeta{Name: "npr-2"}}, - }}, nil +func (c *fakeQuerier) ListNetworkPolicyRecommendation(namespace string) ([]*v1alpha1.NetworkPolicyRecommendation, error) { + return []*crdv1alpha1.NetworkPolicyRecommendation{ + {ObjectMeta: v1.ObjectMeta{Name: "npr-1"}, Status: crdv1alpha1.NetworkPolicyRecommendationStatus{RecommendedNP: &crdv1alpha1.RecommendedNetworkPolicy{}}}, + {ObjectMeta: v1.ObjectMeta{Name: "npr-2"}, Status: crdv1alpha1.NetworkPolicyRecommendationStatus{RecommendedNP: &crdv1alpha1.RecommendedNetworkPolicy{}}}, + }, nil } diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/crd_client.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/crd_client.go index f511973b1..997049e63 100644 --- a/pkg/client/clientset/versioned/typed/crd/v1alpha1/crd_client.go +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/crd_client.go @@ -27,6 +27,7 @@ import ( type CrdV1alpha1Interface interface { RESTClient() rest.Interface NetworkPolicyRecommendationsGetter + RecommendedNetworkPoliciesGetter } // CrdV1alpha1Client is used to interact with features provided by the crd.theia.antrea.io group. @@ -38,6 +39,10 @@ func (c *CrdV1alpha1Client) NetworkPolicyRecommendations(namespace string) Netwo return newNetworkPolicyRecommendations(c, namespace) } +func (c *CrdV1alpha1Client) RecommendedNetworkPolicies(namespace string) RecommendedNetworkPolicyInterface { + return newRecommendedNetworkPolicies(c, namespace) +} + // NewForConfig creates a new CrdV1alpha1Client for the given config. // NewForConfig is equivalent to NewForConfigAndClient(c, httpClient), // where httpClient was generated with rest.HTTPClientFor(c). diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_crd_client.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_crd_client.go index 75fb73799..ca1f3d620 100644 --- a/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_crd_client.go +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_crd_client.go @@ -30,6 +30,10 @@ func (c *FakeCrdV1alpha1) NetworkPolicyRecommendations(namespace string) v1alpha return &FakeNetworkPolicyRecommendations{c, namespace} } +func (c *FakeCrdV1alpha1) RecommendedNetworkPolicies(namespace string) v1alpha1.RecommendedNetworkPolicyInterface { + return &FakeRecommendedNetworkPolicies{c, namespace} +} + // RESTClient returns a RESTClient that is used to communicate // with API server by this client implementation. func (c *FakeCrdV1alpha1) RESTClient() rest.Interface { diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_recommendednetworkpolicy.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_recommendednetworkpolicy.go new file mode 100644 index 000000000..08e2acbb0 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_recommendednetworkpolicy.go @@ -0,0 +1,128 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + "context" + + v1alpha1 "antrea.io/theia/pkg/apis/crd/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeRecommendedNetworkPolicies implements RecommendedNetworkPolicyInterface +type FakeRecommendedNetworkPolicies struct { + Fake *FakeCrdV1alpha1 + ns string +} + +var recommendednetworkpoliciesResource = schema.GroupVersionResource{Group: "crd.theia.antrea.io", Version: "v1alpha1", Resource: "recommendednetworkpolicies"} + +var recommendednetworkpoliciesKind = schema.GroupVersionKind{Group: "crd.theia.antrea.io", Version: "v1alpha1", Kind: "RecommendedNetworkPolicy"} + +// Get takes name of the recommendedNetworkPolicy, and returns the corresponding recommendedNetworkPolicy object, and an error if there is any. +func (c *FakeRecommendedNetworkPolicies) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.RecommendedNetworkPolicy, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(recommendednetworkpoliciesResource, c.ns, name), &v1alpha1.RecommendedNetworkPolicy{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.RecommendedNetworkPolicy), err +} + +// List takes label and field selectors, and returns the list of RecommendedNetworkPolicies that match those selectors. +func (c *FakeRecommendedNetworkPolicies) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.RecommendedNetworkPolicyList, err error) { + obj, err := c.Fake. + Invokes(testing.NewListAction(recommendednetworkpoliciesResource, recommendednetworkpoliciesKind, c.ns, opts), &v1alpha1.RecommendedNetworkPolicyList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.RecommendedNetworkPolicyList{ListMeta: obj.(*v1alpha1.RecommendedNetworkPolicyList).ListMeta} + for _, item := range obj.(*v1alpha1.RecommendedNetworkPolicyList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested recommendedNetworkPolicies. +func (c *FakeRecommendedNetworkPolicies) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(recommendednetworkpoliciesResource, c.ns, opts)) + +} + +// Create takes the representation of a recommendedNetworkPolicy and creates it. Returns the server's representation of the recommendedNetworkPolicy, and an error, if there is any. +func (c *FakeRecommendedNetworkPolicies) Create(ctx context.Context, recommendedNetworkPolicy *v1alpha1.RecommendedNetworkPolicy, opts v1.CreateOptions) (result *v1alpha1.RecommendedNetworkPolicy, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(recommendednetworkpoliciesResource, c.ns, recommendedNetworkPolicy), &v1alpha1.RecommendedNetworkPolicy{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.RecommendedNetworkPolicy), err +} + +// Update takes the representation of a recommendedNetworkPolicy and updates it. Returns the server's representation of the recommendedNetworkPolicy, and an error, if there is any. +func (c *FakeRecommendedNetworkPolicies) Update(ctx context.Context, recommendedNetworkPolicy *v1alpha1.RecommendedNetworkPolicy, opts v1.UpdateOptions) (result *v1alpha1.RecommendedNetworkPolicy, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(recommendednetworkpoliciesResource, c.ns, recommendedNetworkPolicy), &v1alpha1.RecommendedNetworkPolicy{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.RecommendedNetworkPolicy), err +} + +// Delete takes name of the recommendedNetworkPolicy and deletes it. Returns an error if one occurs. +func (c *FakeRecommendedNetworkPolicies) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteActionWithOptions(recommendednetworkpoliciesResource, c.ns, name, opts), &v1alpha1.RecommendedNetworkPolicy{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeRecommendedNetworkPolicies) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + action := testing.NewDeleteCollectionAction(recommendednetworkpoliciesResource, c.ns, listOpts) + + _, err := c.Fake.Invokes(action, &v1alpha1.RecommendedNetworkPolicyList{}) + return err +} + +// Patch applies the patch and returns the patched recommendedNetworkPolicy. +func (c *FakeRecommendedNetworkPolicies) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.RecommendedNetworkPolicy, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(recommendednetworkpoliciesResource, c.ns, name, pt, data, subresources...), &v1alpha1.RecommendedNetworkPolicy{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.RecommendedNetworkPolicy), err +} diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/generated_expansion.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/generated_expansion.go index f87a38c7d..dc32da68a 100644 --- a/pkg/client/clientset/versioned/typed/crd/v1alpha1/generated_expansion.go +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/generated_expansion.go @@ -17,3 +17,5 @@ package v1alpha1 type NetworkPolicyRecommendationExpansion interface{} + +type RecommendedNetworkPolicyExpansion interface{} diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/recommendednetworkpolicy.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/recommendednetworkpolicy.go new file mode 100644 index 000000000..6f7db7961 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/recommendednetworkpolicy.go @@ -0,0 +1,176 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + "time" + + v1alpha1 "antrea.io/theia/pkg/apis/crd/v1alpha1" + scheme "antrea.io/theia/pkg/client/clientset/versioned/scheme" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// RecommendedNetworkPoliciesGetter has a method to return a RecommendedNetworkPolicyInterface. +// A group's client should implement this interface. +type RecommendedNetworkPoliciesGetter interface { + RecommendedNetworkPolicies(namespace string) RecommendedNetworkPolicyInterface +} + +// RecommendedNetworkPolicyInterface has methods to work with RecommendedNetworkPolicy resources. +type RecommendedNetworkPolicyInterface interface { + Create(ctx context.Context, recommendedNetworkPolicy *v1alpha1.RecommendedNetworkPolicy, opts v1.CreateOptions) (*v1alpha1.RecommendedNetworkPolicy, error) + Update(ctx context.Context, recommendedNetworkPolicy *v1alpha1.RecommendedNetworkPolicy, opts v1.UpdateOptions) (*v1alpha1.RecommendedNetworkPolicy, error) + Delete(ctx context.Context, name string, opts v1.DeleteOptions) error + DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error + Get(ctx context.Context, name string, opts v1.GetOptions) (*v1alpha1.RecommendedNetworkPolicy, error) + List(ctx context.Context, opts v1.ListOptions) (*v1alpha1.RecommendedNetworkPolicyList, error) + Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) + Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.RecommendedNetworkPolicy, err error) + RecommendedNetworkPolicyExpansion +} + +// recommendedNetworkPolicies implements RecommendedNetworkPolicyInterface +type recommendedNetworkPolicies struct { + client rest.Interface + ns string +} + +// newRecommendedNetworkPolicies returns a RecommendedNetworkPolicies +func newRecommendedNetworkPolicies(c *CrdV1alpha1Client, namespace string) *recommendedNetworkPolicies { + return &recommendedNetworkPolicies{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the recommendedNetworkPolicy, and returns the corresponding recommendedNetworkPolicy object, and an error if there is any. +func (c *recommendedNetworkPolicies) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.RecommendedNetworkPolicy, err error) { + result = &v1alpha1.RecommendedNetworkPolicy{} + err = c.client.Get(). + Namespace(c.ns). + Resource("recommendednetworkpolicies"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(ctx). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of RecommendedNetworkPolicies that match those selectors. +func (c *recommendedNetworkPolicies) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.RecommendedNetworkPolicyList, err error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + result = &v1alpha1.RecommendedNetworkPolicyList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("recommendednetworkpolicies"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Do(ctx). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested recommendedNetworkPolicies. +func (c *recommendedNetworkPolicies) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("recommendednetworkpolicies"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Watch(ctx) +} + +// Create takes the representation of a recommendedNetworkPolicy and creates it. Returns the server's representation of the recommendedNetworkPolicy, and an error, if there is any. +func (c *recommendedNetworkPolicies) Create(ctx context.Context, recommendedNetworkPolicy *v1alpha1.RecommendedNetworkPolicy, opts v1.CreateOptions) (result *v1alpha1.RecommendedNetworkPolicy, err error) { + result = &v1alpha1.RecommendedNetworkPolicy{} + err = c.client.Post(). + Namespace(c.ns). + Resource("recommendednetworkpolicies"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(recommendedNetworkPolicy). + Do(ctx). + Into(result) + return +} + +// Update takes the representation of a recommendedNetworkPolicy and updates it. Returns the server's representation of the recommendedNetworkPolicy, and an error, if there is any. +func (c *recommendedNetworkPolicies) Update(ctx context.Context, recommendedNetworkPolicy *v1alpha1.RecommendedNetworkPolicy, opts v1.UpdateOptions) (result *v1alpha1.RecommendedNetworkPolicy, err error) { + result = &v1alpha1.RecommendedNetworkPolicy{} + err = c.client.Put(). + Namespace(c.ns). + Resource("recommendednetworkpolicies"). + Name(recommendedNetworkPolicy.Name). + VersionedParams(&opts, scheme.ParameterCodec). + Body(recommendedNetworkPolicy). + Do(ctx). + Into(result) + return +} + +// Delete takes name of the recommendedNetworkPolicy and deletes it. Returns an error if one occurs. +func (c *recommendedNetworkPolicies) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("recommendednetworkpolicies"). + Name(name). + Body(&opts). + Do(ctx). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *recommendedNetworkPolicies) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + var timeout time.Duration + if listOpts.TimeoutSeconds != nil { + timeout = time.Duration(*listOpts.TimeoutSeconds) * time.Second + } + return c.client.Delete(). + Namespace(c.ns). + Resource("recommendednetworkpolicies"). + VersionedParams(&listOpts, scheme.ParameterCodec). + Timeout(timeout). + Body(&opts). + Do(ctx). + Error() +} + +// Patch applies the patch and returns the patched recommendedNetworkPolicy. +func (c *recommendedNetworkPolicies) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.RecommendedNetworkPolicy, err error) { + result = &v1alpha1.RecommendedNetworkPolicy{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("recommendednetworkpolicies"). + Name(name). + SubResource(subresources...). + VersionedParams(&opts, scheme.ParameterCodec). + Body(data). + Do(ctx). + Into(result) + return +} diff --git a/pkg/client/informers/externalversions/crd/v1alpha1/interface.go b/pkg/client/informers/externalversions/crd/v1alpha1/interface.go index b6d2424cb..10fd8bc78 100644 --- a/pkg/client/informers/externalversions/crd/v1alpha1/interface.go +++ b/pkg/client/informers/externalversions/crd/v1alpha1/interface.go @@ -24,6 +24,8 @@ import ( type Interface interface { // NetworkPolicyRecommendations returns a NetworkPolicyRecommendationInformer. NetworkPolicyRecommendations() NetworkPolicyRecommendationInformer + // RecommendedNetworkPolicies returns a RecommendedNetworkPolicyInformer. + RecommendedNetworkPolicies() RecommendedNetworkPolicyInformer } type version struct { @@ -41,3 +43,8 @@ func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakList func (v *version) NetworkPolicyRecommendations() NetworkPolicyRecommendationInformer { return &networkPolicyRecommendationInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} } + +// RecommendedNetworkPolicies returns a RecommendedNetworkPolicyInformer. +func (v *version) RecommendedNetworkPolicies() RecommendedNetworkPolicyInformer { + return &recommendedNetworkPolicyInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} diff --git a/pkg/client/informers/externalversions/crd/v1alpha1/recommendednetworkpolicy.go b/pkg/client/informers/externalversions/crd/v1alpha1/recommendednetworkpolicy.go new file mode 100644 index 000000000..eff27e7c0 --- /dev/null +++ b/pkg/client/informers/externalversions/crd/v1alpha1/recommendednetworkpolicy.go @@ -0,0 +1,88 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + time "time" + + crdv1alpha1 "antrea.io/theia/pkg/apis/crd/v1alpha1" + versioned "antrea.io/theia/pkg/client/clientset/versioned" + internalinterfaces "antrea.io/theia/pkg/client/informers/externalversions/internalinterfaces" + v1alpha1 "antrea.io/theia/pkg/client/listers/crd/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// RecommendedNetworkPolicyInformer provides access to a shared informer and lister for +// RecommendedNetworkPolicies. +type RecommendedNetworkPolicyInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.RecommendedNetworkPolicyLister +} + +type recommendedNetworkPolicyInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewRecommendedNetworkPolicyInformer constructs a new informer for RecommendedNetworkPolicy type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewRecommendedNetworkPolicyInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredRecommendedNetworkPolicyInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredRecommendedNetworkPolicyInformer constructs a new informer for RecommendedNetworkPolicy type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredRecommendedNetworkPolicyInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.CrdV1alpha1().RecommendedNetworkPolicies(namespace).List(context.TODO(), options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.CrdV1alpha1().RecommendedNetworkPolicies(namespace).Watch(context.TODO(), options) + }, + }, + &crdv1alpha1.RecommendedNetworkPolicy{}, + resyncPeriod, + indexers, + ) +} + +func (f *recommendedNetworkPolicyInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredRecommendedNetworkPolicyInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *recommendedNetworkPolicyInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&crdv1alpha1.RecommendedNetworkPolicy{}, f.defaultInformer) +} + +func (f *recommendedNetworkPolicyInformer) Lister() v1alpha1.RecommendedNetworkPolicyLister { + return v1alpha1.NewRecommendedNetworkPolicyLister(f.Informer().GetIndexer()) +} diff --git a/pkg/client/informers/externalversions/generic.go b/pkg/client/informers/externalversions/generic.go index 2002cd83b..309437377 100644 --- a/pkg/client/informers/externalversions/generic.go +++ b/pkg/client/informers/externalversions/generic.go @@ -53,6 +53,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource // Group=crd.theia.antrea.io, Version=v1alpha1 case v1alpha1.SchemeGroupVersion.WithResource("networkpolicyrecommendations"): return &genericInformer{resource: resource.GroupResource(), informer: f.Crd().V1alpha1().NetworkPolicyRecommendations().Informer()}, nil + case v1alpha1.SchemeGroupVersion.WithResource("recommendednetworkpolicies"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Crd().V1alpha1().RecommendedNetworkPolicies().Informer()}, nil } diff --git a/pkg/client/listers/crd/v1alpha1/expansion_generated.go b/pkg/client/listers/crd/v1alpha1/expansion_generated.go index 0ee2c5fd0..7b7320953 100644 --- a/pkg/client/listers/crd/v1alpha1/expansion_generated.go +++ b/pkg/client/listers/crd/v1alpha1/expansion_generated.go @@ -23,3 +23,11 @@ type NetworkPolicyRecommendationListerExpansion interface{} // NetworkPolicyRecommendationNamespaceListerExpansion allows custom methods to be added to // NetworkPolicyRecommendationNamespaceLister. type NetworkPolicyRecommendationNamespaceListerExpansion interface{} + +// RecommendedNetworkPolicyListerExpansion allows custom methods to be added to +// RecommendedNetworkPolicyLister. +type RecommendedNetworkPolicyListerExpansion interface{} + +// RecommendedNetworkPolicyNamespaceListerExpansion allows custom methods to be added to +// RecommendedNetworkPolicyNamespaceLister. +type RecommendedNetworkPolicyNamespaceListerExpansion interface{} diff --git a/pkg/client/listers/crd/v1alpha1/recommendednetworkpolicy.go b/pkg/client/listers/crd/v1alpha1/recommendednetworkpolicy.go new file mode 100644 index 000000000..8b76bb894 --- /dev/null +++ b/pkg/client/listers/crd/v1alpha1/recommendednetworkpolicy.go @@ -0,0 +1,97 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "antrea.io/theia/pkg/apis/crd/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// RecommendedNetworkPolicyLister helps list RecommendedNetworkPolicies. +// All objects returned here must be treated as read-only. +type RecommendedNetworkPolicyLister interface { + // List lists all RecommendedNetworkPolicies in the indexer. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1alpha1.RecommendedNetworkPolicy, err error) + // RecommendedNetworkPolicies returns an object that can list and get RecommendedNetworkPolicies. + RecommendedNetworkPolicies(namespace string) RecommendedNetworkPolicyNamespaceLister + RecommendedNetworkPolicyListerExpansion +} + +// recommendedNetworkPolicyLister implements the RecommendedNetworkPolicyLister interface. +type recommendedNetworkPolicyLister struct { + indexer cache.Indexer +} + +// NewRecommendedNetworkPolicyLister returns a new RecommendedNetworkPolicyLister. +func NewRecommendedNetworkPolicyLister(indexer cache.Indexer) RecommendedNetworkPolicyLister { + return &recommendedNetworkPolicyLister{indexer: indexer} +} + +// List lists all RecommendedNetworkPolicies in the indexer. +func (s *recommendedNetworkPolicyLister) List(selector labels.Selector) (ret []*v1alpha1.RecommendedNetworkPolicy, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.RecommendedNetworkPolicy)) + }) + return ret, err +} + +// RecommendedNetworkPolicies returns an object that can list and get RecommendedNetworkPolicies. +func (s *recommendedNetworkPolicyLister) RecommendedNetworkPolicies(namespace string) RecommendedNetworkPolicyNamespaceLister { + return recommendedNetworkPolicyNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// RecommendedNetworkPolicyNamespaceLister helps list and get RecommendedNetworkPolicies. +// All objects returned here must be treated as read-only. +type RecommendedNetworkPolicyNamespaceLister interface { + // List lists all RecommendedNetworkPolicies in the indexer for a given namespace. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1alpha1.RecommendedNetworkPolicy, err error) + // Get retrieves the RecommendedNetworkPolicy from the indexer for a given namespace and name. + // Objects returned here must be treated as read-only. + Get(name string) (*v1alpha1.RecommendedNetworkPolicy, error) + RecommendedNetworkPolicyNamespaceListerExpansion +} + +// recommendedNetworkPolicyNamespaceLister implements the RecommendedNetworkPolicyNamespaceLister +// interface. +type recommendedNetworkPolicyNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all RecommendedNetworkPolicies in the indexer for a given namespace. +func (s recommendedNetworkPolicyNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.RecommendedNetworkPolicy, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.RecommendedNetworkPolicy)) + }) + return ret, err +} + +// Get retrieves the RecommendedNetworkPolicy from the indexer for a given namespace and name. +func (s recommendedNetworkPolicyNamespaceLister) Get(name string) (*v1alpha1.RecommendedNetworkPolicy, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("recommendednetworkpolicy"), name) + } + return obj.(*v1alpha1.RecommendedNetworkPolicy), nil +} diff --git a/pkg/controller/networkpolicyrecommendation/controller.go b/pkg/controller/networkpolicyrecommendation/controller.go index d6437a7b0..b05fcbc49 100644 --- a/pkg/controller/networkpolicyrecommendation/controller.go +++ b/pkg/controller/networkpolicyrecommendation/controller.go @@ -16,25 +16,21 @@ package networkpolicyrecommendation import ( "context" - "database/sql" - "fmt" "time" apimachineryerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" apimachinerytypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" crdv1alpha1 "antrea.io/theia/pkg/apis/crd/v1alpha1" - intelligence "antrea.io/theia/pkg/apis/intelligence/v1alpha1" "antrea.io/theia/pkg/client/clientset/versioned" crdv1a1informers "antrea.io/theia/pkg/client/informers/externalversions/crd/v1alpha1" "antrea.io/theia/pkg/client/listers/crd/v1alpha1" - "antrea.io/theia/pkg/controller" ) const ( @@ -45,8 +41,7 @@ const ( minRetryDelay = 5 * time.Second maxRetryDelay = 300 * time.Second // Default number of workers processing an Service change. - defaultWorkers = 4 - defaultNameSpace = "flow-visibility" + defaultWorkers = 4 ) type NPRecommendationController struct { @@ -56,31 +51,19 @@ type NPRecommendationController struct { npRecommendationLister v1alpha1.NetworkPolicyRecommendationLister npRecommendationSynced cache.InformerSynced // queue maintains the Service objects that need to be synced. - queue workqueue.RateLimitingInterface - connect *sql.DB + queue workqueue.RateLimitingInterface } func NewNPRecommendationController( crdClient versioned.Interface, - k8sClient kubernetes.Interface, npRecommendationInformer crdv1a1informers.NetworkPolicyRecommendationInformer, -) (*NPRecommendationController, error) { - err := controller.CheckClickHousePod(k8sClient) - if err != nil { - return nil, fmt.Errorf("error when checking ClickHouse status: %v", err) - } - connect, err := controller.SetupClickHouseConnection(k8sClient) - if err != nil { - return nil, fmt.Errorf("error when connecting to ClickHouse: %v", err) - } - +) *NPRecommendationController { c := &NPRecommendationController{ crdClient: crdClient, queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "npRecommendation"), npRecommendationInformer: npRecommendationInformer.Informer(), npRecommendationLister: npRecommendationInformer.Lister(), npRecommendationSynced: npRecommendationInformer.Informer().HasSynced, - connect: connect, } c.npRecommendationInformer.AddEventHandlerWithResyncPeriod( @@ -91,7 +74,7 @@ func NewNPRecommendationController( resyncPeriod, ) - return c, nil + return c } func (c *NPRecommendationController) addNPRecommendation(obj interface{}) { @@ -193,100 +176,18 @@ func (c *NPRecommendationController) syncNPRecommendation(key apimachinerytypes. return nil } -func (c *NPRecommendationController) GetNetworkPolicyRecommendation(name string) (*intelligence.NetworkPolicyRecommendation, error) { - npReco, err := c.npRecommendationLister.NetworkPolicyRecommendations(defaultNameSpace).Get(name) - if err != nil { - return nil, fmt.Errorf("error when finding NetworkPolicyRecommendations CR: %v", err) - } - intelli := new(intelligence.NetworkPolicyRecommendation) - err = c.copyNetworkPolicyRecommendation(intelli, npReco) - if err != nil { - return nil, fmt.Errorf("error when copying NetworkPolicyRecommendations CR: %v", err) - } - return intelli, nil -} - -func (c *NPRecommendationController) CreateNetworkPolicyRecommendation(npReco *crdv1alpha1.NetworkPolicyRecommendation) (*crdv1alpha1.NetworkPolicyRecommendation, error) { - return c.crdClient.CrdV1alpha1().NetworkPolicyRecommendations(defaultNameSpace).Create(context.TODO(), npReco, metav1.CreateOptions{}) -} - -func (c *NPRecommendationController) DeleteNetworkPolicyRecommendation(name string) error { - // delete NetworkPolicyRecommendation and RecommendedNetworkPolicy - result, _ := c.getRecommendedNetworkPolicyResult(name) - if result != "" { - err := c.deleteRecommendedNetworkPolicyResult(name) - if err != nil { - return fmt.Errorf("error when delete result in ClickHouse: %v", err) - } - } - err := c.crdClient.CrdV1alpha1().NetworkPolicyRecommendations(defaultNameSpace).Delete(context.TODO(), name, metav1.DeleteOptions{}) - return err -} - -func (c *NPRecommendationController) ListNetworkPolicyRecommendation() (*intelligence.NetworkPolicyRecommendationList, error) { - npRecoItems, err := c.crdClient.CrdV1alpha1().NetworkPolicyRecommendations(defaultNameSpace).List(context.TODO(), metav1.ListOptions{}) - if err != nil { - return nil, fmt.Errorf("error when getting NetworkPolicyRecommendationsList: %v", err) - } - items := make([]intelligence.NetworkPolicyRecommendation, 0, len(npRecoItems.Items)) - for _, npReco := range npRecoItems.Items { - job := intelligence.NetworkPolicyRecommendation{} - err = c.copyNetworkPolicyRecommendation(&job, &npReco) - if err != nil { - return nil, fmt.Errorf("error when copying NetworkPolicyRecommendation CR: %v", err) - } - items = append(items, job) - } - list := &intelligence.NetworkPolicyRecommendationList{Items: items} - return list, nil +func (c *NPRecommendationController) GetNetworkPolicyRecommendation(namespace, name string) (*crdv1alpha1.NetworkPolicyRecommendation, error) { + return c.npRecommendationLister.NetworkPolicyRecommendations(namespace).Get(name) } -// getRecommendedNetworkPolicyResult is used to get Recommended Network Policy in ClickHouse -func (c *NPRecommendationController) getRecommendedNetworkPolicyResult(id string) (string, error) { - var recoResult string - query := "SELECT yamls FROM recommendations WHERE id = (?)" - err := c.connect.QueryRow(query, id).Scan(&recoResult) - if err != nil { - return recoResult, fmt.Errorf("failed to get Recommended Network Policy Result with id %s: %v", id, err) - } - return recoResult, nil +func (c *NPRecommendationController) ListNetworkPolicyRecommendation(namespace string) ([]*crdv1alpha1.NetworkPolicyRecommendation, error) { + return c.npRecommendationLister.NetworkPolicyRecommendations(namespace).List(labels.Everything()) } -// deleteRecommendedNetworkPolicyResult is used to delete Recommended Network Policy in ClickHouse -func (c *NPRecommendationController) deleteRecommendedNetworkPolicyResult(recoID string) error { - query := "ALTER TABLE recommendations_local ON CLUSTER '{cluster}' DELETE WHERE id = (?)" - _, err := c.connect.Exec(query, recoID) - if err != nil { - return fmt.Errorf("failed to delete Recommended Network Policy Result with id %s: %v", recoID, err) - } - return nil +func (c *NPRecommendationController) DeleteNetworkPolicyRecommendation(namespace, name string) error { + return c.crdClient.CrdV1alpha1().NetworkPolicyRecommendations(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) } -// copyNetworkPolicyRecommendation is used to copy NetworkPolicyRecommendation from crd to intelligence -func (c *NPRecommendationController) copyNetworkPolicyRecommendation(intelli *intelligence.NetworkPolicyRecommendation, crd *crdv1alpha1.NetworkPolicyRecommendation) error { - intelli.Name = crd.Name - intelli.Type = crd.Spec.JobType - intelli.Limit = crd.Spec.Limit - intelli.PolicyType = crd.Spec.PolicyType - intelli.IntervalStart = crd.Spec.StartTime - intelli.IntervalEnd = crd.Spec.EndTime - intelli.NSAllowList = crd.Spec.NSAllowList - intelli.ExcludeLabels = crd.Spec.ExcludeLabels - intelli.ToServices = crd.Spec.ToServices - intelli.ExecutorInstances = crd.Spec.ExecutorInstances - intelli.DriverCoreRequest = crd.Spec.DriverCoreRequest - intelli.DriverMemory = crd.Spec.DriverMemory - intelli.ExecutorCoreRequest = crd.Spec.ExecutorCoreRequest - intelli.ExecutorMemory = crd.Spec.ExecutorMemory - intelli.Status.State = crd.Status.State - // todo: need to check and add other status field. - if intelli.Status.State != "COMPLETE" { - return nil - } - result, err := c.getRecommendedNetworkPolicyResult(crd.Name) - if err != nil { - return fmt.Errorf("error when getting result from ClickHouse: %v", err) - } - intelli.Status.RecommendedNetworkPolicy = result - return nil +func (c *NPRecommendationController) CreateNetworkPolicyRecommendation(namespace string, networkPolicyRecommendation *crdv1alpha1.NetworkPolicyRecommendation) (*crdv1alpha1.NetworkPolicyRecommendation, error) { + return c.crdClient.CrdV1alpha1().NetworkPolicyRecommendations(namespace).Create(context.TODO(), networkPolicyRecommendation, metav1.CreateOptions{}) } diff --git a/pkg/controller/networkpolicyrecommendation/controller_test.go b/pkg/controller/networkpolicyrecommendation/controller_test.go index 9c15115cc..6d7f86242 100644 --- a/pkg/controller/networkpolicyrecommendation/controller_test.go +++ b/pkg/controller/networkpolicyrecommendation/controller_test.go @@ -1,11 +1,23 @@ +// Copyright 2022 Antrea Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package networkpolicyrecommendation import ( - "fmt" "testing" "time" - "github.com/DATA-DOG/go-sqlmock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -13,12 +25,16 @@ import ( "k8s.io/client-go/util/workqueue" crdv1alpha1 "antrea.io/theia/pkg/apis/crd/v1alpha1" - intelligence "antrea.io/theia/pkg/apis/intelligence/v1alpha1" "antrea.io/theia/pkg/client/clientset/versioned/fake" crdinformers "antrea.io/theia/pkg/client/informers/externalversions" ) -const informerDefaultResync = 12 * time.Hour +const ( + informerDefaultResync = 12 * time.Hour + defaultNameSpace = "flow-visibility" + defaultRetryPeriod = 100 * time.Millisecond + defaultMaxRetryTime = 3 * time.Second +) var ( npr1 = &crdv1alpha1.NetworkPolicyRecommendation{ @@ -45,53 +61,10 @@ var ( State: "COMPLETE", }, } - npr3 = &crdv1alpha1.NetworkPolicyRecommendation{ - ObjectMeta: metav1.ObjectMeta{ - Name: "npr3", - Namespace: defaultNameSpace, - }, - Spec: crdv1alpha1.NetworkPolicyRecommendationSpec{ - JobType: "Subsequent", - }, - Status: crdv1alpha1.NetworkPolicyRecommendationStatus{ - State: "COMPLETE", - }, - } - npr1Intelli = &intelligence.NetworkPolicyRecommendation{ - ObjectMeta: metav1.ObjectMeta{ - Name: "npr1", - }, - Type: "Initial", - Status: intelligence.NetworkPolicyRecommendationStatus{ - State: "Pending", - }, - } - npr2Intelli = &intelligence.NetworkPolicyRecommendation{ - ObjectMeta: metav1.ObjectMeta{ - Name: "npr2", - }, - Type: "Initial", - Status: intelligence.NetworkPolicyRecommendationStatus{ - State: "COMPLETE", - RecommendedNetworkPolicy: "RNP-test-npr2", - }, - } - npr3Intelli = &intelligence.NetworkPolicyRecommendation{ - ObjectMeta: metav1.ObjectMeta{ - Name: "npr3", - }, - Type: "Subsequent", - Status: intelligence.NetworkPolicyRecommendationStatus{ - State: "COMPLETE", - RecommendedNetworkPolicy: "RNP-test-npr3", - }, - } ) -func initTestObjects(t *testing.T, stopCh chan struct{}) (*NPRecommendationController, sqlmock.Sqlmock) { - connect, mockConnect, err := sqlmock.New() - assert.NoError(t, err) - fakeClient := fake.NewSimpleClientset(npr1, npr2, npr3) +func initTestObjects(t *testing.T, stopCh chan struct{}) *NPRecommendationController { + fakeClient := fake.NewSimpleClientset(npr1) crdInformerFactory := crdinformers.NewSharedInformerFactory(fakeClient, informerDefaultResync) npRecommendationInformer := crdInformerFactory.Crd().V1alpha1().NetworkPolicyRecommendations() @@ -101,18 +74,17 @@ func initTestObjects(t *testing.T, stopCh chan struct{}) (*NPRecommendationContr npRecommendationInformer: npRecommendationInformer.Informer(), npRecommendationLister: npRecommendationInformer.Lister(), npRecommendationSynced: npRecommendationInformer.Informer().HasSynced, - connect: connect, } crdInformerFactory.Start(stopCh) // Wait until npr propagates to the informer - err = waitPropagationToInformer("npr1", c) + err := waitPropagationToInformer("npr1", c) require.NoError(t, err) - return c, mockConnect + return c } func waitPropagationToInformer(name string, c *NPRecommendationController) error { // Wait until npr propagates to the informer - err := wait.PollImmediate(100*time.Millisecond, 3*time.Second, func() (bool, error) { + err := wait.PollImmediate(defaultRetryPeriod, defaultMaxRetryTime, func() (bool, error) { _, err := c.npRecommendationLister.NetworkPolicyRecommendations(defaultNameSpace).Get(name) if err != nil { return false, nil @@ -125,184 +97,47 @@ func waitPropagationToInformer(name string, c *NPRecommendationController) error func TestGetNetworkPolicyRecommendation(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - c, mockConnect := initTestObjects(t, stopCh) - - tests := []struct { - name string - nprName string - nprNameSpace string - expectedError error - expectedResult *intelligence.NetworkPolicyRecommendation - }{ - { - name: "NPR not exist case", - nprName: "empty", - expectedError: fmt.Errorf("error when finding NetworkPolicyRecommendations CR"), - expectedResult: nil, - }, - { - name: "Status Complete but no data in ClickHouse", - nprName: "npr3", - expectedError: fmt.Errorf("error when copying NetworkPolicyRecommendations CR"), - expectedResult: nil, - }, - { - name: "Successful case", - nprName: "npr1", - expectedError: nil, - expectedResult: npr1Intelli, - }, - { - name: "Successful case with Status Complete", - nprName: "npr2", - expectedError: nil, - expectedResult: npr2Intelli, - }, - } - rows := mockConnect.NewRows([]string{"RNP"}).AddRow("RNP-test-npr2") - query := "SELECT yamls FROM recommendations WHERE id = (?)" - mockConnect.ExpectQuery(query).WithArgs("npr2").WillReturnRows(rows) - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - intelli, err := c.GetNetworkPolicyRecommendation(tt.nprName) - if tt.expectedError != nil { - assert.Error(t, err) - assert.Contains(t, err.Error(), tt.expectedError.Error()) - } else { - assert.NoError(t, err) - assert.Equal(t, tt.expectedResult, intelli) - } - }) - } + c := initTestObjects(t, stopCh) + npr, err := c.GetNetworkPolicyRecommendation(defaultNameSpace, "npr1") + assert.NoError(t, err) + assert.Equal(t, npr1, npr) } func TestCreateNetworkPolicyRecommendation(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - c, _ := initTestObjects(t, stopCh) - - nprCreate := &crdv1alpha1.NetworkPolicyRecommendation{ - ObjectMeta: metav1.ObjectMeta{ - Name: "nprCreate", - Namespace: defaultNameSpace, - }, - Spec: crdv1alpha1.NetworkPolicyRecommendationSpec{ - JobType: "initial", - }, - Status: crdv1alpha1.NetworkPolicyRecommendationStatus{ - State: "RUNNING", - }, - } - tests := []struct { - name string - nprName string - CreateNPR *crdv1alpha1.NetworkPolicyRecommendation - expectedError error - expectedResult *crdv1alpha1.NetworkPolicyRecommendation - }{ - { - name: "Successful case", - nprName: "nprCreate", - CreateNPR: nprCreate, - expectedError: nil, - expectedResult: nprCreate, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - _, err := c.CreateNetworkPolicyRecommendation(nprCreate) - assert.NoError(t, err) - err = waitPropagationToInformer("nprCreate", c) - assert.NoError(t, err) - npr, err := c.npRecommendationLister.NetworkPolicyRecommendations(defaultNameSpace).Get(tt.nprName) - assert.NoError(t, err) - assert.Equal(t, tt.expectedResult, npr) - }) - } + c := initTestObjects(t, stopCh) + npr, err := c.CreateNetworkPolicyRecommendation(defaultNameSpace, npr2) + assert.NoError(t, err) + assert.Equal(t, npr2, npr) + err = waitPropagationToInformer("npr2", c) + assert.NoError(t, err) + npr, err = c.npRecommendationLister.NetworkPolicyRecommendations(defaultNameSpace).Get("npr2") + assert.NoError(t, err) + assert.Equal(t, npr2, npr) } func TestDeleteNetworkPolicyRecommendation(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - c, mockConnect := initTestObjects(t, stopCh) - - tests := []struct { - name string - nprName string - expectedError error - }{ - { - name: "No result in ClickHouse and CRD", - nprName: "empty", - expectedError: fmt.Errorf("networkpolicyrecommendations.crd.theia.antrea.io \"empty\" not found"), - }, - { - name: "Has result in CRD but No result in ClickHouse", - nprName: "npr1", - expectedError: nil, - }, - { - name: "Has result in CRD and ClickHouse", - nprName: "npr2", - expectedError: nil, - }, - } - rows := mockConnect.NewRows([]string{"RNP"}).AddRow("RNP-test") - queryGet := "SELECT yamls FROM recommendations WHERE id = (?)" - mockConnect.ExpectQuery(queryGet).WithArgs("npr2").WillReturnRows(rows) - query := "ALTER TABLE recommendations_local ON CLUSTER '{cluster}' DELETE WHERE id = (?)" - mockConnect.ExpectExec(query).WithArgs("npr2").WillReturnResult(sqlmock.NewResult(1, 1)) - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - err := c.DeleteNetworkPolicyRecommendation(tt.nprName) - if tt.expectedError != nil { - assert.Error(t, err) - assert.Contains(t, err.Error(), tt.expectedError.Error()) - } else { - assert.NoError(t, err) - err = wait.PollImmediate(100*time.Millisecond, 3*time.Second, func() (bool, error) { - _, err := c.npRecommendationLister.NetworkPolicyRecommendations(defaultNameSpace).Get(tt.nprName) - if err != nil { - return true, nil - } - return false, nil - }) - assert.NoError(t, err) - } - }) - } + c := initTestObjects(t, stopCh) + err := c.DeleteNetworkPolicyRecommendation(defaultNameSpace, "npr1") + assert.NoError(t, err) + err = wait.PollImmediate(defaultRetryPeriod, defaultMaxRetryTime, func() (bool, error) { + _, err = c.npRecommendationLister.NetworkPolicyRecommendations(defaultNameSpace).Get("npr1") + if err != nil { + return true, nil + } + return false, nil + }) + assert.NoError(t, err) } func TestListNetworkPolicyRecommendation(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - c, mockConnect := initTestObjects(t, stopCh) - - tests := []struct { - name string - nprName string - expectedResult *intelligence.NetworkPolicyRecommendationList - }{ - { - name: "Successful case", - expectedResult: &intelligence.NetworkPolicyRecommendationList{ - Items: []intelligence.NetworkPolicyRecommendation{ - *npr1Intelli, *npr2Intelli, *npr3Intelli, - }, - }, - }, - } - rows := mockConnect.NewRows([]string{"RNP"}).AddRow("RNP-test-npr2") - query := "SELECT yamls FROM recommendations WHERE id = (?)" - mockConnect.ExpectQuery(query).WithArgs("npr2").WillReturnRows(rows) - rows = mockConnect.NewRows([]string{"RNP"}).AddRow("RNP-test-npr3") - mockConnect.ExpectQuery(query).WithArgs("npr3").WillReturnRows(rows) - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - list, err := c.ListNetworkPolicyRecommendation() - assert.NoError(t, err) - assert.ElementsMatch(t, tt.expectedResult.Items, list.Items) - }) - } + c := initTestObjects(t, stopCh) + list, err := c.ListNetworkPolicyRecommendation(defaultNameSpace) + assert.NoError(t, err) + assert.ElementsMatch(t, []*crdv1alpha1.NetworkPolicyRecommendation{npr1}, list) } diff --git a/pkg/controller/utils.go b/pkg/controller/utils.go deleted file mode 100644 index 0f58522de..000000000 --- a/pkg/controller/utils.go +++ /dev/null @@ -1,140 +0,0 @@ -// Copyright 2022 Antrea Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package controller - -import ( - "context" - "database/sql" - "fmt" - "time" - - "github.com/ClickHouse/clickhouse-go" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" - - "antrea.io/theia/pkg/theia/commands/config" -) - -func CheckClickHousePod(clientset kubernetes.Interface) error { - // Check the ClickHouse deployment in flow-visibility namespace - pods, err := clientset.CoreV1().Pods(config.FlowVisibilityNS).List(context.TODO(), metav1.ListOptions{ - LabelSelector: "app=clickhouse", - }) - if err != nil { - return fmt.Errorf("error %v when finding the ClickHouse Pod, please check the deployment of the ClickHouse", err) - } - if len(pods.Items) < 1 { - return fmt.Errorf("can't find the ClickHouse Pod, please check the deployment of ClickHouse") - } - hasRunningPod := false - for _, pod := range pods.Items { - if pod.Status.Phase == "Running" { - hasRunningPod = true - break - } - } - if !hasRunningPod { - return fmt.Errorf("can't find a running ClickHouse Pod, please check the deployment of ClickHouse") - } - return nil -} - -func GetServiceAddr(clientset kubernetes.Interface, serviceName string) (string, int, error) { - var serviceIP string - var servicePort int - service, err := clientset.CoreV1().Services(config.FlowVisibilityNS).Get(context.TODO(), serviceName, metav1.GetOptions{}) - if err != nil { - return serviceIP, servicePort, fmt.Errorf("error when finding the Service %s: %v", serviceName, err) - } - serviceIP = service.Spec.ClusterIP - for _, port := range service.Spec.Ports { - if port.Name == "tcp" { - servicePort = int(port.Port) - } - } - if servicePort == 0 { - return serviceIP, servicePort, fmt.Errorf("error when finding the Service %s: %v", serviceName, err) - } - return serviceIP, servicePort, nil -} - -func getClickHouseSecret(clientset kubernetes.Interface) (username []byte, password []byte, err error) { - secret, err := clientset.CoreV1().Secrets(config.FlowVisibilityNS).Get(context.TODO(), "clickhouse-secret", metav1.GetOptions{}) - if err != nil { - return username, password, fmt.Errorf("error %v when finding the ClickHouse secret, please check the deployment of ClickHouse", err) - } - username, ok := secret.Data["username"] - if !ok { - return username, password, fmt.Errorf("error when getting the ClickHouse username") - } - password, ok = secret.Data["password"] - if !ok { - return username, password, fmt.Errorf("error when getting the ClickHouse password") - } - return username, password, nil -} - -func connectClickHouse(clientset kubernetes.Interface, url string) (*sql.DB, error) { - var connect *sql.DB - var connErr error - connRetryInterval := 1 * time.Second - connTimeout := 10 * time.Second - - // Connect to ClickHouse in a loop - if err := wait.PollImmediate(connRetryInterval, connTimeout, func() (bool, error) { - // Open the database and ping it - var err error - connect, err = sql.Open("clickhouse", url) - if err != nil { - connErr = fmt.Errorf("failed to open ClickHouse: %v", err) - return false, nil - } - if err := connect.Ping(); err != nil { - if exception, ok := err.(*clickhouse.Exception); ok { - connErr = fmt.Errorf("failed to ping ClickHouse: %v", exception.Message) - } else { - connErr = fmt.Errorf("failed to ping ClickHouse: %v", err) - } - return false, nil - } else { - return true, nil - } - }); err != nil { - return nil, fmt.Errorf("failed to connect to ClickHouse after %s: %v", connTimeout, connErr) - } - return connect, nil -} - -func SetupClickHouseConnection(clientset kubernetes.Interface) (connect *sql.DB, err error) { - service := "clickhouse-clickhouse" - serviceIP, servicePort, err := GetServiceAddr(clientset, service) - if err != nil { - return nil, fmt.Errorf("error when getting the ClickHouse Service address: %v", err) - } - endpoint := fmt.Sprintf("tcp://%s:%d", serviceIP, servicePort) - - // Connect to ClickHouse and execute query - username, password, err := getClickHouseSecret(clientset) - if err != nil { - return nil, err - } - url := fmt.Sprintf("%s?debug=false&username=%s&password=%s", endpoint, username, password) - connect, err = connectClickHouse(clientset, url) - if err != nil { - return nil, fmt.Errorf("error when connecting to ClickHouse, %v", err) - } - return connect, nil -} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 856d2877e..bc114d2e1 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -16,12 +16,11 @@ package querier import ( "antrea.io/theia/pkg/apis/crd/v1alpha1" - intelligence "antrea.io/theia/pkg/apis/intelligence/v1alpha1" ) type NPRecommendationQuerier interface { - GetNetworkPolicyRecommendation(name string) (*intelligence.NetworkPolicyRecommendation, error) - CreateNetworkPolicyRecommendation(*v1alpha1.NetworkPolicyRecommendation) (*v1alpha1.NetworkPolicyRecommendation, error) - DeleteNetworkPolicyRecommendation(name string) error - ListNetworkPolicyRecommendation() (*intelligence.NetworkPolicyRecommendationList, error) + GetNetworkPolicyRecommendation(namespace, name string) (*v1alpha1.NetworkPolicyRecommendation, error) + ListNetworkPolicyRecommendation(namespace string) ([]*v1alpha1.NetworkPolicyRecommendation, error) + DeleteNetworkPolicyRecommendation(namespace, name string) error + CreateNetworkPolicyRecommendation(namespace string, networkPolicyRecommendation *v1alpha1.NetworkPolicyRecommendation) (*v1alpha1.NetworkPolicyRecommendation, error) } diff --git a/pkg/theia/commands/policy_recommendation_status.go b/pkg/theia/commands/policy_recommendation_status.go index e8bfc11b1..c57b32228 100644 --- a/pkg/theia/commands/policy_recommendation_status.go +++ b/pkg/theia/commands/policy_recommendation_status.go @@ -15,9 +15,21 @@ package commands import ( + "context" + "encoding/json" "fmt" + "io" + "net/http" + "strings" + "time" "github.com/spf13/cobra" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + + "antrea.io/theia/pkg/theia/commands/config" + sparkv1 "antrea.io/theia/third_party/sparkoperator/v1beta2" ) // policyRecommendationStatusCmd represents the policy-recommendation status command @@ -35,61 +47,211 @@ $ theia policy-recommendation status e998433e-accb-4888-9fc8-06563f073e86 Use Service ClusterIP when checking the current status of job with ID e998433e-accb-4888-9fc8-06563f073e86 $ theia policy-recommendation status e998433e-accb-4888-9fc8-06563f073e86 --use-cluster-ip `, - RunE: policyRecommendationStatus, + RunE: func(cmd *cobra.Command, args []string) error { + recoID, err := cmd.Flags().GetString("id") + if err != nil { + return err + } + if recoID == "" && len(args) == 1 { + recoID = args[0] + } + err = ParseRecommendationID(recoID) + if err != nil { + return err + } + kubeconfig, err := ResolveKubeConfig(cmd) + if err != nil { + return err + } + clientset, err := CreateK8sClient(kubeconfig) + if err != nil { + return fmt.Errorf("couldn't create k8s client using given kubeconfig, %v", err) + } + endpoint, err := cmd.Flags().GetString("clickhouse-endpoint") + if err != nil { + return err + } + if endpoint != "" { + err = ParseEndpoint(endpoint) + if err != nil { + return err + } + } + useClusterIP, err := cmd.Flags().GetBool("use-cluster-ip") + if err != nil { + return err + } + + err = PolicyRecoPreCheck(clientset) + if err != nil { + return err + } + var state, errorMessage string + // Check the ClickHouse first because completed jobs will store results in ClickHouse + _, err = getPolicyRecommendationResult(clientset, kubeconfig, endpoint, useClusterIP, "", recoID) + if err != nil { + state, err = getPolicyRecommendationStatus(clientset, recoID) + if err != nil { + return err + } + if state == "" { + state = "NEW" + } + if state == "RUNNING" { + var endpoint string + service := fmt.Sprintf("pr-%s-ui-svc", recoID) + if useClusterIP { + serviceIP, servicePort, err := GetServiceAddr(clientset, service) + if err != nil { + klog.V(2).ErrorS(err, "error when getting the progress of the job, cannot get Spark Monitor Service address") + } else { + endpoint = fmt.Sprintf("tcp://%s:%d", serviceIP, servicePort) + } + } else { + servicePort := 4040 + listenAddress := "localhost" + listenPort := 4040 + pf, err := StartPortForward(kubeconfig, service, servicePort, listenAddress, listenPort) + if err != nil { + klog.V(2).ErrorS(err, "error when getting the progress of the job, cannot forward port") + } else { + endpoint = fmt.Sprintf("http://%s:%d", listenAddress, listenPort) + defer pf.Stop() + } + } + // Check the working progress of running recommendation job + if endpoint != "" { + stateProgress, err := getPolicyRecommendationProgress(endpoint) + if err != nil { + klog.V(2).ErrorS(err, "failed to get the progress of the job") + } + state += stateProgress + } + } + errorMessage, err = getPolicyRecommendationErrorMsg(clientset, recoID) + if err != nil { + return err + } + } else { + state = "COMPLETED" + } + fmt.Printf("Status of this policy recommendation job is %s\n", state) + if errorMessage != "" { + fmt.Printf("Error message: %s\n", errorMessage) + } + return nil + }, } -func init() { - policyRecommendationCmd.AddCommand(policyRecommendationStatusCmd) - policyRecommendationStatusCmd.Flags().StringP( - "name", - "", - "", - "Name of the policy recommendation job.", - ) +func getSparkAppByRecommendationID(clientset kubernetes.Interface, id string) (sparkApp sparkv1.SparkApplication, err error) { + err = clientset.CoreV1().RESTClient(). + Get(). + AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). + Namespace(config.FlowVisibilityNS). + Resource("sparkapplications"). + Name("pr-" + id). + Do(context.TODO()). + Into(&sparkApp) + if err != nil { + return sparkApp, err + } + return sparkApp, nil } -func policyRecommendationStatus(cmd *cobra.Command, args []string) error { - prName, err := cmd.Flags().GetString("name") +func getPolicyRecommendationStatus(clientset kubernetes.Interface, id string) (string, error) { + sparkApplication, err := getSparkAppByRecommendationID(clientset, id) if err != nil { - return err + return "", err } - if prName == "" && len(args) == 1 { - prName = args[0] + state := strings.TrimSpace(string(sparkApplication.Status.AppState.State)) + if state == "" { + state = "NEW" } - err = ParseRecommendationName(prName) + return state, nil +} + +func getPolicyRecommendationErrorMsg(clientset kubernetes.Interface, id string) (string, error) { + sparkApplication, err := getSparkAppByRecommendationID(clientset, id) if err != nil { - return err + return "", err } - useClusterIP, err := cmd.Flags().GetBool("use-cluster-ip") + errorMessage := strings.TrimSpace(string(sparkApplication.Status.AppState.ErrorMessage)) + return errorMessage, nil +} + +func getPolicyRecommendationProgress(baseUrl string) (string, error) { + // Get the id of current Spark application + url := fmt.Sprintf("%s/api/v1/applications", baseUrl) + response, err := getResponseFromSparkMonitoringSvc(url) if err != nil { - return err + return "", fmt.Errorf("failed to get response from the Spark Monitoring Service: %v", err) } - theiaClient, pf, err := SetupTheiaClientAndConnection(cmd, useClusterIP) + var getAppsResult []map[string]interface{} + json.Unmarshal([]byte(response), &getAppsResult) + if len(getAppsResult) != 1 { + return "", fmt.Errorf("wrong Spark Application number, expected 1, got %d", len(getAppsResult)) + } + sparkAppID := getAppsResult[0]["id"] + // Check the percentage of completed stages + url = fmt.Sprintf("%s/api/v1/applications/%s/stages", baseUrl, sparkAppID) + response, err = getResponseFromSparkMonitoringSvc(url) if err != nil { - return fmt.Errorf("couldn't setup theia client: %v", err) + return "", fmt.Errorf("failed to get response from the Spark Monitoring Service: %v", err) + } + var getStagesResult []map[string]interface{} + json.Unmarshal([]byte(response), &getStagesResult) + NumStageResult := len(getStagesResult) + if NumStageResult < 1 { + return "", fmt.Errorf("wrong Spark Application stages number, expected at least 1, got %d", NumStageResult) } - if pf != nil { - defer pf.Stop() + completedStages := 0 + for _, stage := range getStagesResult { + if stage["status"] == "COMPLETE" || stage["status"] == "SKIPPED" { + completedStages++ + } } - npr, err := getPolicyRecommendationByName(theiaClient, prName) + return fmt.Sprintf(": %d/%d (%d%%) stages completed", completedStages, NumStageResult, completedStages*100/NumStageResult), nil +} + +func getResponseFromSparkMonitoringSvc(url string) ([]byte, error) { + sparkMonitoringClient := http.Client{} + request, err := http.NewRequest(http.MethodGet, url, nil) if err != nil { - return fmt.Errorf("error when getting policy recommendation job by using job name: %v", err) + return nil, err } - state := npr.Status.State - if state == "RUNNING" { - completedStages := npr.Status.CompletedStages - totalStages := npr.Status.TotalStages - if totalStages < 1 { - return fmt.Errorf("wrong Spark Application stages number, expected at least 1, got %d", totalStages) + var res *http.Response + var getErr error + connRetryInterval := 1 * time.Second + connTimeout := 10 * time.Second + if err := wait.PollImmediate(connRetryInterval, connTimeout, func() (bool, error) { + res, err = sparkMonitoringClient.Do(request) + if err != nil { + getErr = err + return false, nil } - stateProgress := fmt.Sprintf(": %d/%d (%d%%) stages completed", completedStages, totalStages, completedStages*100/totalStages) - state += stateProgress + return true, nil + }); err != nil { + return nil, getErr } - errorMessage := npr.Status.ErrorMsg - - fmt.Printf("Status of this policy recommendation job is %s\n", state) - if errorMessage != "" { - fmt.Printf("Error message: %s\n", errorMessage) + if res == nil { + return nil, fmt.Errorf("response is nil") + } + if res.Body != nil { + defer res.Body.Close() + } + body, readErr := io.ReadAll(res.Body) + if readErr != nil { + return nil, readErr } - return nil + return body, nil +} + +func init() { + policyRecommendationCmd.AddCommand(policyRecommendationStatusCmd) + policyRecommendationStatusCmd.Flags().StringP( + "id", + "i", + "", + "ID of the policy recommendation Spark job.", + ) }