From 28522b47ccea55c720bdb0cd046b0ec0a95c9510 Mon Sep 17 00:00:00 2001 From: Yanjun Zhou Date: Tue, 27 Sep 2022 11:01:12 -0700 Subject: [PATCH] Add unit tests for NPR controller Signed-off-by: Yanjun Zhou --- .../networkpolicyrecommendation/controller.go | 26 +- .../controller_test.go | 352 ++++++++++++++++++ .../networkpolicyrecommendation/util.go | 123 +++--- 3 files changed, 427 insertions(+), 74 deletions(-) create mode 100644 pkg/controller/networkpolicyrecommendation/controller_test.go diff --git a/pkg/controller/networkpolicyrecommendation/controller.go b/pkg/controller/networkpolicyrecommendation/controller.go index 26920f01f..aa8beefca 100644 --- a/pkg/controller/networkpolicyrecommendation/controller.go +++ b/pkg/controller/networkpolicyrecommendation/controller.go @@ -50,8 +50,6 @@ const ( maxRetryDelay = 300 * time.Second // Default number of workers processing an Service change. defaultWorkers = 4 - // For NPR in scheduled or running state, check its status periodically - npRecommendationResyncPeriod = 10 * time.Second // Time format for parsing input time inputTimeFormat = "2006-01-02 15:04:05" // Time format for parsing time from ClickHouse @@ -68,6 +66,16 @@ const ( sparkPortName = "spark-driver-ui-port" ) +var ( + // Spark Application CRUD functions, for unit tests + CreateSparkApplication = createSparkApplication + DeleteSparkApplication = deleteSparkApplication + ListSparkApplication = listSparkApplication + GetSparkApplication = getSparkApplication + // For NPR in scheduled or running state, check its status periodically + npRecommendationResyncPeriod = 10 * time.Second +) + type NPRecommendationController struct { crdClient versioned.Interface kubeClient kubernetes.Interface @@ -303,7 +311,7 @@ func (c *NPRecommendationController) syncNPRecommendation(key apimachinerytypes. 
func (c *NPRecommendationController) cleanupNPRecommendation(namespace string, sparkApplicationId string) error { // Delete the Spark Application if exists - err := deleteSparkApplication(c.kubeClient, namespace, sparkApplicationId) + err := deleteSparkApplicationIfExists(c.kubeClient, namespace, sparkApplicationId) if err != nil { return fmt.Errorf("failed to delete the SparkApplication %s, error: %v", sparkApplicationId, err) } @@ -339,7 +347,7 @@ func (c *NPRecommendationController) updateResult(npReco *crdv1alpha1.NetworkPol } if npReco.Status.RecommendedNP == nil { - err := deleteSparkApplication(c.kubeClient, npReco.Namespace, npReco.Status.SparkApplication) + err := deleteSparkApplicationIfExists(c.kubeClient, npReco.Namespace, npReco.Status.SparkApplication) if err != nil { return fmt.Errorf("fail to delete Spark Application: %s, error: %v", npReco.Status.SparkApplication, err) } @@ -630,15 +638,7 @@ func (c *NPRecommendationController) startSparkApplication(npReco *crdv1alpha1.N }, }, } - response := &sparkv1.SparkApplication{} - err = c.kubeClient.CoreV1().RESTClient(). - Post(). - AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). - Namespace(npReco.Namespace). - Resource("sparkapplications"). - Body(recommendationApplication). - Do(context.TODO()). - Into(response) + err = CreateSparkApplication(c.kubeClient, npReco.Namespace, recommendationApplication) if err != nil { return fmt.Errorf("failed to create Spark Application: %v", err) } diff --git a/pkg/controller/networkpolicyrecommendation/controller_test.go b/pkg/controller/networkpolicyrecommendation/controller_test.go new file mode 100644 index 000000000..bdf80eb11 --- /dev/null +++ b/pkg/controller/networkpolicyrecommendation/controller_test.go @@ -0,0 +1,352 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package networkpolicyrecommendation + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apimachinerytypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/klog/v2" + + crdv1alpha1 "antrea.io/theia/pkg/apis/crd/v1alpha1" + "antrea.io/theia/pkg/client/clientset/versioned" + fakecrd "antrea.io/theia/pkg/client/clientset/versioned/fake" + crdinformers "antrea.io/theia/pkg/client/informers/externalversions" + "antrea.io/theia/third_party/sparkoperator/v1beta2" +) + +const informerDefaultResync = 30 * time.Second + +var ( + testNamespace = "controller-test" +) + +type fakeController struct { + *NPRecommendationController + crdClient versioned.Interface + kubeClient kubernetes.Interface + crdInformerFactory crdinformers.SharedInformerFactory +} + +func newFakeController() *fakeController { + kubeClient := fake.NewSimpleClientset() + setUpClient(kubeClient) + crdClient := fakecrd.NewSimpleClientset() + + crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, informerDefaultResync) + npRecommendationInformer := crdInformerFactory.Crd().V1alpha1().NetworkPolicyRecommendations() + recommendedNPInformer := crdInformerFactory.Crd().V1alpha1().RecommendedNetworkPolicies() + + nprController := 
NewNPRecommendationController(crdClient, kubeClient, npRecommendationInformer, recommendedNPInformer) + + return &fakeController{ + nprController, + crdClient, + kubeClient, + crdInformerFactory, + } +} + +func setUpClient(kubeClient kubernetes.Interface) { + clickHousePod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "clickhouse", + Namespace: testNamespace, + Labels: map[string]string{"app": "clickhouse"}, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + } + kubeClient.CoreV1().Pods(testNamespace).Create(context.TODO(), clickHousePod, metav1.CreateOptions{}) + + sparkOperatorPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "spark-operator", + Namespace: testNamespace, + Labels: map[string]string{ + "app.kubernetes.io/name": "spark-operator", + }, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + } + kubeClient.CoreV1().Pods(testNamespace).Create(context.TODO(), sparkOperatorPod, metav1.CreateOptions{}) + + clickhouseService := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "clickhouse-clickhouse", + Namespace: testNamespace, + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{{Name: "tcp", Port: 9000}}, + ClusterIP: "10.98.208.26", + }, + } + kubeClient.CoreV1().Services(testNamespace).Create(context.TODO(), clickhouseService, metav1.CreateOptions{}) + + clickhouseSecret := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "clickhouse-secret", + Namespace: testNamespace, + }, + Data: map[string][]byte{ + "username": []byte("clickhouse_operator"), + "password": []byte("clickhouse_operator_password"), + }, + } + kubeClient.CoreV1().Secrets(testNamespace).Create(context.TODO(), clickhouseSecret, metav1.CreateOptions{}) +} + +func createFakeSparkApplicationService(kubeClient kubernetes.Interface, id string) error { + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch strings.TrimSpace(r.URL.Path) { + case "/api/v1/applications": + responses := 
[]map[string]interface{}{ + {"id": id}, + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(responses) + case fmt.Sprintf("/api/v1/applications/%s/stages", id): + responses := []map[string]interface{}{ + {"status": "COMPLETE"}, + {"status": "COMPLETE"}, + {"status": "SKIPPED"}, + {"status": "PENDING"}, + {"status": "ACTIVE"}, + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(responses) + } + })) + + // the url format http://ip:port + urlArray := strings.Split(strings.Split(testServer.URL, "//")[1], ":") + ip := urlArray[0] + port, err := strconv.ParseInt(urlArray[1], 10, 32) + if err != nil { + return err + } + + serviceName := fmt.Sprintf("pr-%s-ui-svc", id) + sparkApplicationService := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: serviceName, + Namespace: testNamespace, + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{{Name: sparkPortName, Port: int32(port)}}, + ClusterIP: ip, + }, + } + kubeClient.CoreV1().Services(testNamespace).Create(context.TODO(), sparkApplicationService, metav1.CreateOptions{}) + return nil +} + +// mock Spark Applications +type fakeSparkApplicationClient struct { + sparkApplications map[apimachinerytypes.NamespacedName]*v1beta2.SparkApplication + mapMutex sync.Mutex +} + +func (f *fakeSparkApplicationClient) create(client kubernetes.Interface, namespace string, recommendationApplication *v1beta2.SparkApplication) error { + namespacedName := apimachinerytypes.NamespacedName{ + Namespace: namespace, + Name: recommendationApplication.Name, + } + klog.InfoS("Spark Application created", "name", recommendationApplication.ObjectMeta.Name, "namespace", namespace) + f.mapMutex.Lock() + defer f.mapMutex.Unlock() + f.sparkApplications[namespacedName] = recommendationApplication + return nil +} + +func (f *fakeSparkApplicationClient) delete(client kubernetes.Interface, name, namespace string) { + 
namespacedName := apimachinerytypes.NamespacedName{ + Namespace: namespace, + Name: name, + } + f.mapMutex.Lock() + defer f.mapMutex.Unlock() + delete(f.sparkApplications, namespacedName) +} + +func (f *fakeSparkApplicationClient) list(client kubernetes.Interface, namespace string) (*v1beta2.SparkApplicationList, error) { + f.mapMutex.Lock() + defer f.mapMutex.Unlock() + list := make([]v1beta2.SparkApplication, len(f.sparkApplications)) + index := 0 + for _, item := range f.sparkApplications { + list[index] = *item + index++ + } + saList := &v1beta2.SparkApplicationList{ + Items: list, + } + return saList, nil +} + +func (f *fakeSparkApplicationClient) get(client kubernetes.Interface, name, namespace string) (sparkApp v1beta2.SparkApplication, err error) { + namespacedName := apimachinerytypes.NamespacedName{ + Namespace: namespace, + Name: name, + } + f.mapMutex.Lock() + defer f.mapMutex.Unlock() + return *f.sparkApplications[namespacedName], nil +} + +func (f *fakeSparkApplicationClient) step(name, namespace string) { + namespacedName := apimachinerytypes.NamespacedName{ + Namespace: namespace, + Name: name, + } + f.mapMutex.Lock() + defer f.mapMutex.Unlock() + sa, ok := f.sparkApplications[namespacedName] + if !ok { + klog.InfoS("Spark Application not created yet", "name", name, "namespace", namespace) + return + } + switch sa.Status.AppState.State { + case v1beta2.NewState: + klog.InfoS("Spark Application setting from new to running") + sa.Status.AppState.State = v1beta2.RunningState + case v1beta2.RunningState: + klog.InfoS("Spark Application setting from running to completed") + sa.Status.AppState.State = v1beta2.CompletedState + } +} + +func TestNPRecommendation(t *testing.T) { + fakeSAClient := fakeSparkApplicationClient{ + sparkApplications: make(map[apimachinerytypes.NamespacedName]*v1beta2.SparkApplication), + } + CreateSparkApplication = fakeSAClient.create + DeleteSparkApplication = fakeSAClient.delete + ListSparkApplication = fakeSAClient.list + 
GetSparkApplication = fakeSAClient.get + + // Use a shorter resync period + npRecommendationResyncPeriod = 1 * time.Second + + nprController := newFakeController() + stopCh := make(chan struct{}) + + nprController.crdInformerFactory.Start(stopCh) + nprController.crdInformerFactory.WaitForCacheSync(stopCh) + + go nprController.Run(stopCh) + + npr := &crdv1alpha1.NetworkPolicyRecommendation{ + ObjectMeta: metav1.ObjectMeta{Name: "npr", Namespace: testNamespace}, + Spec: crdv1alpha1.NetworkPolicyRecommendationSpec{ + JobType: "initial", + PolicyType: "anp-deny-applied", + ExecutorInstances: 1, + DriverCoreRequest: "200m", + DriverMemory: "512M", + ExecutorCoreRequest: "200m", + ExecutorMemory: "512M", + ExcludeLabels: true, + ToServices: true, + StartInterval: metav1.NewTime(time.Now()), + EndInterval: metav1.NewTime(time.Now().Add(time.Second * 10)), + NSAllowList: []string{"kube-system", "flow-visibility"}, + }, + Status: crdv1alpha1.NetworkPolicyRecommendationStatus{}, + } + + npr, err := nprController.CreateNetworkPolicyRecommendation(testNamespace, npr) + assert.NoError(t, err) + + serviceCreated := false + // The step interval should be larger than resync period to ensure the progress is updated + stepInterval := 2 * time.Second + timeout := 30 * time.Second + wait.PollImmediate(stepInterval, timeout, func() (done bool, err error) { + npr, err = nprController.GetNetworkPolicyRecommendation(testNamespace, "npr") + if err != nil { + return false, nil + } + // Mocking ClickHouse results and Spark Monitor service requires + // the SparkApplication id. 
+ if !serviceCreated { + // Mock ClickHouse database + openSql = func(driverName, dataSourceName string) (*sql.DB, error) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), sqlmock.MonitorPingsOption(true)) + if err != nil { + return db, err + } + mock.ExpectPing() + recommendationRow := sqlmock.NewRows([]string{"type", "timeCreated", "yamls"}).AddRow("initial", "2022-10-01T12:30:10Z", "recommendations") + mock.ExpectQuery("SELECT type, timeCreated, yamls FROM recommendations WHERE id = (?);").WithArgs(npr.Status.SparkApplication).WillReturnRows(recommendationRow) + return db, err + } + // Create Spark Monitor service + err = createFakeSparkApplicationService(nprController.kubeClient, npr.Status.SparkApplication) + assert.NoError(t, err) + serviceCreated = true + } + if npr != nil { + fakeSAClient.step("pr-"+npr.Status.SparkApplication, testNamespace) + } + return !(npr.Status.RecommendedNP == nil), nil + }) + + assert.Equal(t, npr.Status.State, crdv1alpha1.NPRecommendationStateCompleted) + assert.Equal(t, npr.Status.CompletedStages, 3) + assert.Equal(t, npr.Status.TotalStages, 5) + assert.True(t, npr.Status.StartTime.Before(&npr.Status.EndTime)) + assert.Equal(t, npr.Status.RecommendedNP.Spec.Id, npr.Status.SparkApplication) + assert.Equal(t, npr.Status.RecommendedNP.Spec.Type, "initial") + expectedTimeCreated, err := time.Parse(clickHouseTimeFormat, "2022-10-01T12:30:10Z") + assert.NoError(t, err) + assert.Equal(t, npr.Status.RecommendedNP.Spec.TimeCreated, metav1.NewTime(expectedTimeCreated)) + assert.Equal(t, npr.Status.RecommendedNP.Spec.Yamls, "recommendations") + + openSql = func(driverName, dataSourceName string) (*sql.DB, error) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), sqlmock.MonitorPingsOption(true)) + if err != nil { + return db, err + } + mock.ExpectPing() + mock.ExpectExec("ALTER TABLE recommendations_local ON CLUSTER '{cluster}' DELETE WHERE id = 
(?);").WithArgs(npr.Status.SparkApplication).WillReturnResult(sqlmock.NewResult(0, 1)) + return db, err + } + err = nprController.DeleteNetworkPolicyRecommendation(testNamespace, "npr") + assert.NoError(t, err) + +} diff --git a/pkg/controller/networkpolicyrecommendation/util.go b/pkg/controller/networkpolicyrecommendation/util.go index ecfcbeaff..10434d5d7 100644 --- a/pkg/controller/networkpolicyrecommendation/util.go +++ b/pkg/controller/networkpolicyrecommendation/util.go @@ -33,52 +33,33 @@ import ( sparkv1 "antrea.io/theia/third_party/sparkoperator/v1beta2" ) +var openSql = sql.Open + func constStrToPointer(constStr string) *string { return &constStr } func validateCluster(client kubernetes.Interface, namespace string) error { - err := checkSparkOperatorPod(client, namespace) + err := checkPodByLabel(client, namespace, "app=clickhouse") if err != nil { - return err + return fmt.Errorf("failed to find the ClickHouse Pod, please check the deployment, error: %v", err) } - return checkClickHousePod(client, namespace) -} - -func checkClickHousePod(client kubernetes.Interface, namespace string) error { - // Check the ClickHouse deployment in flow-visibility namespace - pods, err := client.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{ - LabelSelector: "app=clickhouse", - }) + err = checkPodByLabel(client, namespace, "app.kubernetes.io/name=spark-operator") if err != nil { - return fmt.Errorf("error %v when finding the ClickHouse Pod, please check the deployment of the ClickHouse", err) - } - if len(pods.Items) < 1 { - return fmt.Errorf("can't find the ClickHouse Pod, please check the deployment of ClickHouse") - } - hasRunningPod := false - for _, pod := range pods.Items { - if pod.Status.Phase == "Running" { - hasRunningPod = true - break - } - } - if !hasRunningPod { - return fmt.Errorf("can't find a running ClickHouse Pod, please check the deployment of ClickHouse") + return fmt.Errorf("failed to find the Spark Operator Pod, please check the 
deployment, error: %v", err) } return nil } -func checkSparkOperatorPod(client kubernetes.Interface, namespace string) error { - // Check the deployment of Spark Operator in flow-visibility ns +func checkPodByLabel(client kubernetes.Interface, namespace string, label string) error { pods, err := client.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{ - LabelSelector: "app.kubernetes.io/name=spark-operator", + LabelSelector: label, }) if err != nil { - return fmt.Errorf("error %v when finding the policy-recommendation-spark-operator Pod, please check the deployment of the Spark Operator", err) + return fmt.Errorf("error %v when finding the Pod", err) } if len(pods.Items) < 1 { - return fmt.Errorf("can't find the policy-recommendation-spark-operator Pod, please check the deployment of the Spark Operator") + return fmt.Errorf("expected at least 1 pod, but found %d", len(pods.Items)) } hasRunningPod := false for _, pod := range pods.Items { @@ -88,28 +69,13 @@ func checkSparkOperatorPod(client kubernetes.Interface, namespace string) error } } if !hasRunningPod { - return fmt.Errorf("can't find a running Spark Operator Pod, please check the deployment of Spark") + return fmt.Errorf("can't find a running Pod") } return nil } -func getSparkAppByRecommendationID(client kubernetes.Interface, id string, namespace string) (sparkApp sparkv1.SparkApplication, err error) { - err = client.CoreV1().RESTClient(). - Get(). - AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). - Namespace(namespace). - Resource("sparkapplications"). - Name("pr-" + id). - Do(context.TODO()). 
- Into(&sparkApp) - if err != nil { - return sparkApp, err - } - return sparkApp, nil -} - func getPolicyRecommendationStatus(client kubernetes.Interface, id string, namespace string) (state string, errorMessage string, err error) { - sparkApplication, err := getSparkAppByRecommendationID(client, id, namespace) + sparkApplication, err := GetSparkApplication(client, "pr-"+id, namespace) if err != nil { return state, errorMessage, err } @@ -133,7 +99,7 @@ func getServiceAddr(client kubernetes.Interface, serviceName, serviceNamespace, } } if servicePort == 0 { - return serviceIP, servicePort, fmt.Errorf("error when finding the Service %s: no tcp service port", serviceName) + return serviceIP, servicePort, fmt.Errorf("error when finding the Service %s: no %s service port", serviceName, servicePortName) } return serviceIP, servicePort, nil } @@ -247,7 +213,7 @@ func connectClickHouse(url string) (*sql.DB, error) { if err := wait.PollImmediate(connRetryInterval, connTimeout, func() (bool, error) { // Open the database and ping it var err error - connect, err = sql.Open("clickhouse", url) + connect, err = openSql("clickhouse", url) if err != nil { connErr = fmt.Errorf("failed to open ClickHouse: %v", err) return false, nil @@ -300,13 +266,8 @@ func getPolicyRecommendationResult(client kubernetes.Interface, id string, names return recommendedNetworkPolicy, nil } -func deleteSparkApplication(client kubernetes.Interface, namespace string, id string) error { - sparkApplicationList := &sparkv1.SparkApplicationList{} - err := client.CoreV1().RESTClient().Get(). - AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). - Namespace(namespace). - Resource("sparkapplications"). 
- Do(context.TODO()).Into(sparkApplicationList) +func deleteSparkApplicationIfExists(client kubernetes.Interface, namespace string, id string) error { + sparkApplicationList, err := ListSparkApplication(client, namespace) if err != nil { return err } @@ -318,12 +279,7 @@ func deleteSparkApplication(client kubernetes.Interface, namespace string, id st } } if existed { - client.CoreV1().RESTClient().Delete(). - AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). - Namespace(namespace). - Resource("sparkapplications"). - Name("pr-" + id). - Do(context.TODO()) + DeleteSparkApplication(client, "pr-"+id, namespace) } return nil } @@ -340,3 +296,48 @@ func deletePolicyRecommendationResult(client kubernetes.Interface, namespace str } return nil } + +func getSparkApplication(client kubernetes.Interface, name string, namespace string) (sparkApp sparkv1.SparkApplication, err error) { + err = client.CoreV1().RESTClient().Get(). + AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). + Namespace(namespace). + Resource("sparkapplications"). + Name(name). + Do(context.TODO()). + Into(&sparkApp) + if err != nil { + return sparkApp, err + } + return sparkApp, nil +} + +func listSparkApplication(client kubernetes.Interface, namespace string) (*sparkv1.SparkApplicationList, error) { + sparkApplicationList := &sparkv1.SparkApplicationList{} + err := client.CoreV1().RESTClient().Get(). + AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). + Namespace(namespace). + Resource("sparkapplications"). + Do(context.TODO()).Into(sparkApplicationList) + return sparkApplicationList, err +} + +func deleteSparkApplication(client kubernetes.Interface, name string, namespace string) { + client.CoreV1().RESTClient().Delete(). + AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). + Namespace(namespace). + Resource("sparkapplications"). + Name(name).
+ Do(context.TODO()) +} + +func createSparkApplication(client kubernetes.Interface, namespace string, recommendationApplication *sparkv1.SparkApplication) error { + response := &sparkv1.SparkApplication{} + return client.CoreV1().RESTClient(). + Post(). + AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). + Namespace(namespace). + Resource("sparkapplications"). + Body(recommendationApplication). + Do(context.TODO()). + Into(response) +}