diff --git a/.github/workflows/upload_release_assets.yml b/.github/workflows/upload_release_assets.yml
index 19a7ada9f..7689c7ad2 100644
--- a/.github/workflows/upload_release_assets.yml
+++ b/.github/workflows/upload_release_assets.yml
@@ -29,3 +29,48 @@ jobs:
           asset_path: ./assets/flow-visibility.yml
           asset_name: flow-visibility.yml
           asset_content_type: application/octet-stream
+      - name: Upload theiactl-darwin-x86_64
+        uses: actions/upload-release-asset@v1
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+        with:
+          upload_url: ${{ github.event.release.upload_url }}
+          asset_path: ./assets/theiactl-darwin-x86_64
+          asset_name: theiactl-darwin-x86_64
+          asset_content_type: application/octet-stream
+      - name: Upload theiactl-linux-arm
+        uses: actions/upload-release-asset@v1
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+        with:
+          upload_url: ${{ github.event.release.upload_url }}
+          asset_path: ./assets/theiactl-linux-arm
+          asset_name: theiactl-linux-arm
+          asset_content_type: application/octet-stream
+      - name: Upload theiactl-linux-arm64
+        uses: actions/upload-release-asset@v1
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+        with:
+          upload_url: ${{ github.event.release.upload_url }}
+          asset_path: ./assets/theiactl-linux-arm64
+          asset_name: theiactl-linux-arm64
+          asset_content_type: application/octet-stream
+      - name: Upload theiactl-linux-x86_64
+        uses: actions/upload-release-asset@v1
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+        with:
+          upload_url: ${{ github.event.release.upload_url }}
+          asset_path: ./assets/theiactl-linux-x86_64
+          asset_name: theiactl-linux-x86_64
+          asset_content_type: application/octet-stream
+      - name: Upload theiactl-windows-x86_64.exe
+        uses: actions/upload-release-asset@v1
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+        with:
+          upload_url: ${{ github.event.release.upload_url }}
+          asset_path: ./assets/theiactl-windows-x86_64.exe
+          asset_name: theiactl-windows-x86_64.exe
+          asset_content_type: application/octet-stream
diff --git a/Makefile b/Makefile
index 2b35d02ee..5c7a0075f 100644
--- a/Makefile
+++ b/Makefile
@@ -1,13 +1,14 @@
-SHELL := /bin/bash
+SHELL                := /bin/bash
 # go options
-GO ?= go
-LDFLAGS :=
-GOFLAGS :=
-BINDIR ?= $(CURDIR)/bin
-GO_FILES := $(shell find . -type d -name '.cache' -prune -o -type f -name '*.go' -print)
-GOPATH ?= $$($(GO) env GOPATH)
-DOCKER_CACHE := $(CURDIR)/.cache
-GO_VERSION := $(shell head -n 1 build/images/deps/go-version)
+GO                   ?= go
+LDFLAGS              :=
+GOFLAGS              :=
+BINDIR               ?= $(CURDIR)/bin
+GO_FILES             := $(shell find . -type d -name '.cache' -prune -o -type f -name '*.go' -print)
+GOPATH               ?= $$($(GO) env GOPATH)
+DOCKER_CACHE         := $(CURDIR)/.cache
+THEIACTL_BINARY_NAME ?= theiactl
+GO_VERSION           := $(shell head -n 1 build/images/deps/go-version)
 DOCKER_BUILD_ARGS = --build-arg GO_VERSION=$(GO_VERSION)
@@ -168,3 +169,7 @@ theiactl:
 theiactl-darwin:
 	@mkdir -p $(BINDIR)
 	GOOS=darwin $(GO) build -o $(BINDIR) $(GOFLAGS) -ldflags '$(LDFLAGS)' antrea.io/theia/pkg/theiactl
+
+.PHONY: theiactl-release
+theiactl-release:
+	@$(GO) build -o $(BINDIR)/$(THEIACTL_BINARY_NAME) $(GOFLAGS) -ldflags '-s -w $(LDFLAGS)' antrea.io/theia/pkg/theiactl
diff --git a/hack/release/prepare-assets.sh b/hack/release/prepare-assets.sh
index 59ab9da49..4d5da8444 100755
--- a/hack/release/prepare-assets.sh
+++ b/hack/release/prepare-assets.sh
@@ -40,6 +40,27 @@
 pushd $THIS_DIR/../.. > /dev/null
 mkdir -p "$1"
 OUTPUT_DIR=$(cd "$1" && pwd)
 
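+# Each THEIACTL_BUILDS entry is "<GOOS> <GOARCH> <asset suffix>"; the suffix
+# becomes part of the binary name, which must match the asset names uploaded
+# by upload_release_assets.yml.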
+THEIACTL_BUILDS=(
+    "linux amd64 linux-x86_64"
+    "linux arm64 linux-arm64"
+    "linux arm linux-arm"
+    "windows amd64 windows-x86_64.exe"
+    "darwin amd64 darwin-x86_64"
+)
+
+for build in "${THEIACTL_BUILDS[@]}"; do
+    args=($build)
+    os="${args[0]}"
+    arch="${args[1]}"
+    suffix="${args[2]}"
+
+    # cgo is disabled by default when cross-compiling, but enabled by default
+    # for native builds. We ensure it is always disabled for portability, since
+    # these binaries will be distributed as release assets.
+    GOOS=$os GOARCH=$arch CGO_ENABLED=0 THEIACTL_BINARY_NAME="theiactl-$suffix" BINDIR="$OUTPUT_DIR"/ make theiactl-release
+done
+
+
 export IMG_TAG=$VERSION
 export IMG_NAME=projects.registry.vmware.com/antrea/theia-clickhouse-monitor
diff --git a/pkg/theiactl/commands/check.go b/pkg/theiactl/commands/check.go
index 7b3f402a0..cc511935c 100644
--- a/pkg/theiactl/commands/check.go
+++ b/pkg/theiactl/commands/check.go
@@ -16,11 +16,17 @@ package commands
 import (
 	"context"
+	"encoding/json"
 	"fmt"
+	"io/ioutil"
+	"net/http"
+	"os/exec"
 	"strings"
+	"time"
 
 	"github.com/google/uuid"
 	"github.com/spf13/cobra"
+	"k8s.io/apimachinery/pkg/util/wait"
 
 	sparkv1 "antrea.io/theia/third_party/sparkoperator/v1beta2"
 )
 
@@ -72,12 +78,92 @@ $ theiactl policyreco check --id e998433e-accb-4888-9fc8-06563f073e86
 			return err
 		}
 		state := strings.TrimSpace(string(sparkApplication.Status.AppState.State))
+		if state == "RUNNING" {
+			// The job is still running: check its progress through the Spark
+			// Monitoring Service, reached by forwarding the UI port of the
+			// policy recommendation Service.
+			portForwardCmd := exec.Command("kubectl", "port-forward", fmt.Sprintf("service/policy-reco-%s-ui-svc", recoID), "-n", flowVisibilityNS, "4040:4040")
+			if err := portForwardCmd.Start(); err != nil {
+				return fmt.Errorf("failed to forward port for the policy recommendation service: %v", err)
+			}
+			defer portForwardCmd.Process.Kill()
+			stateProgress, err := getPolicyRecommendationProgress()
+			if err != nil {
+				return fmt.Errorf("failed to get the status of job with id %s: %v", recoID, err)
+			}
+			state += stateProgress
+		}
 		fmt.Printf("Status of this policy recommendation job is %s\n", state)
 		return nil
-		// TODO: add implementation of checking work progress through Spark Monitoring Service after port forwarder finished
 	},
 }
 
+func getPolicyRecommendationProgress() (string, error) {
+	// Get the id of the current Spark application
+	url := "http://localhost:4040/api/v1/applications"
+	response, err := getResponseFromSparkMonitoringSvc(url)
+	if err != nil {
+		return "", fmt.Errorf("failed to get response from the Spark Monitoring Service: %v", err)
+	}
+	var getAppsResult []map[string]interface{}
+	if err := json.Unmarshal(response, &getAppsResult); err != nil {
+		return "", fmt.Errorf("failed to unmarshal the list of Spark applications: %v", err)
+	}
+	if len(getAppsResult) != 1 {
+		return "", fmt.Errorf("wrong number of Spark applications, expected 1, got %d", len(getAppsResult))
+	}
+	sparkAppID := getAppsResult[0]["id"]
+	// Check the percentage of completed stages
+	url = fmt.Sprintf("http://localhost:4040/api/v1/applications/%s/stages", sparkAppID)
+	response, err = getResponseFromSparkMonitoringSvc(url)
+	if err != nil {
+		return "", fmt.Errorf("failed to get response from the Spark Monitoring Service: %v", err)
+	}
+	var getStagesResult []map[string]interface{}
+	if err := json.Unmarshal(response, &getStagesResult); err != nil {
+		return "", fmt.Errorf("failed to unmarshal the list of Spark stages: %v", err)
+	}
+	numStages := len(getStagesResult)
+	if numStages < 1 {
+		return "", fmt.Errorf("wrong number of Spark application stages, expected at least 1, got %d", numStages)
+	}
+	completedStages := 0
+	for _, stage := range getStagesResult {
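+		// Spark reports stages that were skipped entirely as "SKIPPED" rather
+		// than "COMPLETE", so count both toward finished work.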
+ if stage["status"] == "COMPLETE" || stage["status"] == "SKIPPED" { + completedStages++ + } + } + return fmt.Sprintf(": %d/%d (%d%%) stages completed", completedStages, NumStageResult, completedStages*100/NumStageResult), nil +} + +func getResponseFromSparkMonitoringSvc(url string) ([]byte, error) { + sparkMonitoringClient := http.Client{} + request, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return nil, err + } + var res *http.Response + var getErr error + connRetryInterval := 1 * time.Second + connTimeout := 10 * time.Second + if err := wait.PollImmediate(connRetryInterval, connTimeout, func() (bool, error) { + res, err = sparkMonitoringClient.Do(request) + if err != nil { + getErr = err + return false, nil + } + return true, nil + }); err != nil { + return nil, getErr + } + if res == nil { + return nil, fmt.Errorf("response is nil") + } + if res.Body != nil { + defer res.Body.Close() + } + body, readErr := ioutil.ReadAll(res.Body) + if readErr != nil { + return nil, readErr + } + return body, nil +} + func init() { policyrecoCmd.AddCommand(checkCmd) checkCmd.Flags().StringP( diff --git a/pkg/theiactl/commands/result.go b/pkg/theiactl/commands/result.go index f79e2d133..7419567fb 100644 --- a/pkg/theiactl/commands/result.go +++ b/pkg/theiactl/commands/result.go @@ -15,10 +15,19 @@ package commands import ( + "context" + "database/sql" "fmt" + "net/url" + "os/exec" + "time" + "github.com/ClickHouse/clickhouse-go" "github.com/google/uuid" "github.com/spf13/cobra" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" ) // resultCmd represents the result command @@ -32,6 +41,7 @@ Get the recommendation result with job ID e998433e-accb-4888-9fc8-06563f073e86 $ theiactl policyreco result --id e998433e-accb-4888-9fc8-06563f073e86 `, RunE: func(cmd *cobra.Command, args []string) error { + // Parse the flags recoID, err := cmd.Flags().GetString("id") if err != nil { return err @@ -40,22 +50,137 @@ $ theiactl policyreco result --id e998433e-accb-4888-9fc8-06563f073e86 if err != nil { return fmt.Errorf("failed to decode input id %s into a UUID, err: %v", recoID, err) } - kubeconfig, err := cmd.Flags().GetString("kubeconfig") if err != nil { return err } + endpoint, err := cmd.Flags().GetString("endpoint") + if err != nil { + return err + } + if endpoint != "" { + _, err := url.ParseRequestURI(endpoint) + if err != nil { + return fmt.Errorf("failed to decode input endpoint %s into a url, err: %v", endpoint, err) + } + } + // Verify Clickhouse is running clientset, err := CreateK8sClient(kubeconfig) if err != nil { - return fmt.Errorf("couldn't create k8s client using given kubeconfig, %v", err) + return fmt.Errorf("couldn't create k8s client using given kubeconfig: %v", err) + } + if err := CheckClickHousePod(clientset); err != nil { + return err + } + + if endpoint == "" { + port, err := getClickHouseServicePort(clientset) + if err != nil { + return err + } + // Forward the ClickHouse service port + // TODO: use Theia port forwarding instead of kubectl port-forward + portForwardCmd := exec.Command("kubectl", "port-forward", "service/clickhouse-clickhouse", "-n", flowVisibilityNS, fmt.Sprintf("%d:%d", port, port)) + if err := portForwardCmd.Start(); err != nil { + return fmt.Errorf("failed to forward port for the ClickHouse Service: %v", err) + } + defer portForwardCmd.Process.Kill() + endpoint = fmt.Sprintf("tcp://localhost:%d", port) + } + + // Connect to ClickHouse and get the result + username, password, 
+		username, password, err := getClickHouseSecret(clientset)
+		if err != nil {
+			return err
+		}
+		connectURL := fmt.Sprintf("%s?debug=false&username=%s&password=%s", endpoint, username, password)
+		connect, err := connectClickHouse(clientset, connectURL)
+		if err != nil {
+			return fmt.Errorf("error when connecting to ClickHouse: %v", err)
+		}
+		recoResult, err := getResultFromClickHouse(connect, recoID)
+		if err != nil {
+			return fmt.Errorf("error when getting the recommendation result from ClickHouse: %v", err)
+		}
-		// TODO: setup port forwarder using clientset.
-		// TODO: get the recommendation result from ClickHouse using recoID
-		fmt.Printf("recoID: %v, clientset: %v\n", recoID, clientset)
+		fmt.Print(recoResult)
 		return nil
 	},
 }
 
+func getClickHouseServicePort(clientset kubernetes.Interface) (int32, error) {
+	var servicePort int32
+	service, err := clientset.CoreV1().Services(flowVisibilityNS).Get(context.TODO(), "clickhouse-clickhouse", metav1.GetOptions{})
+	if err != nil {
+		return servicePort, fmt.Errorf("error %v when finding the ClickHouse Service, please check the deployment of ClickHouse", err)
+	}
+	for _, port := range service.Spec.Ports {
+		if port.Name == "tcp" {
+			servicePort = port.Port
+		}
+	}
+	if servicePort == 0 {
+		return servicePort, fmt.Errorf("could not find a port named 'tcp' on the ClickHouse Service, please check the deployment of ClickHouse")
+	}
+	return servicePort, nil
+}
+
+func getClickHouseSecret(clientset kubernetes.Interface) (username []byte, password []byte, err error) {
+	secret, err := clientset.CoreV1().Secrets(flowVisibilityNS).Get(context.TODO(), "clickhouse-secret", metav1.GetOptions{})
+	if err != nil {
+		return username, password, fmt.Errorf("error %v when finding the ClickHouse secret, please check the deployment of ClickHouse", err)
+	}
+	username, ok := secret.Data["username"]
+	if !ok {
+		return username, password, fmt.Errorf("no username found in the ClickHouse secret")
+	}
+	password, ok = secret.Data["password"]
+	if !ok {
+		return username, password, fmt.Errorf("no password found in the ClickHouse secret")
+	}
+	return username, password, nil
+}
+
+func connectClickHouse(clientset kubernetes.Interface, url string) (*sql.DB, error) {
+	var connect *sql.DB
+	var connErr error
+	connRetryInterval := 1 * time.Second
+	connTimeout := 10 * time.Second
+
+	// Connect to ClickHouse in a loop
+	if err := wait.PollImmediate(connRetryInterval, connTimeout, func() (bool, error) {
+		// Open the database and ping it
+		var err error
+		connect, err = sql.Open("clickhouse", url)
+		if err != nil {
+			connErr = fmt.Errorf("failed to open a connection to ClickHouse: %v", err)
+			return false, nil
+		}
+		if err := connect.Ping(); err != nil {
+			if exception, ok := err.(*clickhouse.Exception); ok {
+				connErr = fmt.Errorf("failed to ping ClickHouse: %v", exception.Message)
+			} else {
+				connErr = fmt.Errorf("failed to ping ClickHouse: %v", err)
+			}
+			return false, nil
+		}
+		return true, nil
+	}); err != nil {
+		return nil, fmt.Errorf("failed to connect to ClickHouse after %s: %v", connTimeout, connErr)
+	}
+	return connect, nil
+}
+
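+// getResultFromClickHouse looks up the recommended policies (as YAML) that the
+// policy recommendation job stored in the recommendations table under this id.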
recommendation Spark job", ) + resultCmd.Flags().StringP( + "endpoint", + "e", + "", + "The ClickHouse service endpoint", + ) } diff --git a/pkg/theiactl/commands/result_test.go b/pkg/theiactl/commands/result_test.go new file mode 100644 index 000000000..06aa82e14 --- /dev/null +++ b/pkg/theiactl/commands/result_test.go @@ -0,0 +1,80 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package commands + +import ( + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +var fakeClientset = fake.NewSimpleClientset( + &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "clickhouse", + Namespace: flowVisibilityNS, + Labels: map[string]string{"app": "clickhouse"}, + }, + }, + &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "clickhouse-clickhouse", + Namespace: flowVisibilityNS, + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{{Name: "tcp", Port: 9000}}, + }, + }, + &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "clickhouse-secret", + Namespace: flowVisibilityNS, + }, + Data: map[string][]byte{ + "password": []byte("clickhouse_operator_password"), + "username": []byte("clickhouse_operator"), + }, + }, +) + +func TestGetClickHouseServicePort(t *testing.T) { + port, err := getClickHouseServicePort(fakeClientset) + assert.NoError(t, err) + assert.Equal(t, int32(9000), port) +} + +func TestGetClickHouseSecret(t *testing.T) { + username, password, err := getClickHouseSecret(fakeClientset) + assert.NoError(t, err) + assert.Equal(t, "clickhouse_operator", string(username)) + assert.Equal(t, "clickhouse_operator_password", string(password)) +} + +func TestGetResultFromClickHouse(t *testing.T) { + recoID := "db2134ea-7169-46f8-b56d-d643d4751d1d" + expectedResult := "recommend-allow-acnp-kube-system-rpeal" + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + assert.NoError(t, err) + defer db.Close() + resultRow := sqlmock.NewRows([]string{"yamls"}).AddRow(expectedResult) + mock.ExpectQuery("SELECT yamls FROM recommendations WHERE id = (?);").WithArgs(recoID).WillReturnRows(resultRow) + result, err := getResultFromClickHouse(db, recoID) + assert.NoError(t, err) + assert.Equal(t, expectedResult, result) +} diff --git a/pkg/theiactl/commands/utils.go b/pkg/theiactl/commands/utils.go index bbe5b03c9..4bdc90474 100644 --- a/pkg/theiactl/commands/utils.go +++ b/pkg/theiactl/commands/utils.go @@ -23,7 +23,7 @@ import ( "k8s.io/client-go/tools/clientcmd" ) -func CreateK8sClient(kubeconfig string) (*kubernetes.Clientset, error) { +func CreateK8sClient(kubeconfig string) (kubernetes.Interface, error) { var err error config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) if err != nil { @@ -37,7 +37,7 @@ func CreateK8sClient(kubeconfig string) (*kubernetes.Clientset, error) { return clientset, nil } -func PolicyRecoPreCheck(clientset *kubernetes.Clientset) error { +func 
+func PolicyRecoPreCheck(clientset kubernetes.Interface) error {
 	// Check the deployment of Spark Operator in flow-visibility ns
 	pods, err := clientset.CoreV1().Pods(flowVisibilityNS).List(context.TODO(), metav1.ListOptions{
 		LabelSelector: "app.kubernetes.io/instance=policy-reco,app.kubernetes.io/name=spark-operator",
@@ -48,7 +48,31 @@
 	if len(pods.Items) < 1 {
 		return fmt.Errorf("can't find the policy-reco-spark-operator Pod, please check the deployment of the Spark Operator")
 	}
-	// TODO: add check the ClickHouse deployment and ping ClickHouse DB after port forwarder finished
+	if err := CheckClickHousePod(clientset); err != nil {
+		return err
+	}
 	return nil
 }
+
+func CheckClickHousePod(clientset kubernetes.Interface) error {
+	// Check the ClickHouse deployment in the flow-visibility namespace
+	pods, err := clientset.CoreV1().Pods(flowVisibilityNS).List(context.TODO(), metav1.ListOptions{
+		LabelSelector: "app=clickhouse",
+	})
+	if err != nil {
+		return fmt.Errorf("error %v when finding the ClickHouse Pod, please check the deployment of ClickHouse", err)
+	}
+	if len(pods.Items) < 1 {
+		return fmt.Errorf("can't find the ClickHouse Pod, please check the deployment of ClickHouse")
+	}
+	// The deployment may exist while no replica is ready yet, so require at
+	// least one Pod in the Running phase.
+	hasRunningPod := false
+	for _, pod := range pods.Items {
+		if pod.Status.Phase == "Running" {
+			hasRunningPod = true
+			break
+		}
+	}
+	if !hasRunningPod {
+		return fmt.Errorf("can't find a running ClickHouse Pod, please check the deployment of ClickHouse")
+	}
+	return nil
+}