diff --git a/.github/workflows/upload_release_assets.yml b/.github/workflows/upload_release_assets.yml index 19a7ada9f..21471c5aa 100644 --- a/.github/workflows/upload_release_assets.yml +++ b/.github/workflows/upload_release_assets.yml @@ -29,3 +29,48 @@ jobs: asset_path: ./assets/flow-visibility.yml asset_name: flow-visibility.yml asset_content_type: application/octet-stream + - name: Upload theia-darwin-x86_64 + uses: actions/upload-release-asset@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + upload_url: ${{ github.event.release.upload_url }} + asset_path: ./assets/theia-darwin-x86_64 + asset_name: theia-darwin-x86_64 + asset_content_type: application/octet-stream + - name: Upload theia-linux-arm + uses: actions/upload-release-asset@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + upload_url: ${{ github.event.release.upload_url }} + asset_path: ./assets/theia-linux-arm + asset_name: theia-linux-arm + asset_content_type: application/octet-stream + - name: Upload theia-linux-arm64 + uses: actions/upload-release-asset@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + upload_url: ${{ github.event.release.upload_url }} + asset_path: ./assets/theia-linux-arm64 + asset_name: theia-linux-arm64 + asset_content_type: application/octet-stream + - name: Upload theia-linux-x86_64 + uses: actions/upload-release-asset@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + upload_url: ${{ github.event.release.upload_url }} + asset_path: ./assets/theia-linux-x86_64 + asset_name: theia-linux-x86_64 + asset_content_type: application/octet-stream + - name: Upload theia-windows-x86_64.exe + uses: actions/upload-release-asset@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + upload_url: ${{ github.event.release.upload_url }} + asset_path: ./assets/theia-windows-x86_64.exe + asset_name: theia-windows-x86_64.exe + asset_content_type: application/octet-stream diff --git a/Makefile b/Makefile index b47244ec8..384d02e1f 100644 --- a/Makefile +++ b/Makefile @@ -1,13 +1,14 @@ -SHELL := /bin/bash +SHELL := /bin/bash # go options -GO ?= go -LDFLAGS := -GOFLAGS := -BINDIR ?= $(CURDIR)/bin -GO_FILES := $(shell find . -type d -name '.cache' -prune -o -type f -name '*.go' -print) -GOPATH ?= $$($(GO) env GOPATH) -DOCKER_CACHE := $(CURDIR)/.cache -GO_VERSION := $(shell head -n 1 build/images/deps/go-version) +GO ?= go +LDFLAGS := +GOFLAGS := +BINDIR ?= $(CURDIR)/bin +GO_FILES := $(shell find . 
-type d -name '.cache' -prune -o -type f -name '*.go' -print) +GOPATH ?= $$($(GO) env GOPATH) +DOCKER_CACHE := $(CURDIR)/.cache +THEIA_BINARY_NAME ?= theia +GO_VERSION := $(shell head -n 1 build/images/deps/go-version) DOCKER_BUILD_ARGS = --build-arg GO_VERSION=$(GO_VERSION) @@ -165,3 +166,19 @@ policy-recommendation: docker tag antrea/theia-policy-recommendation:$(DOCKER_IMG_VERSION) antrea/theia-policy-recommendation docker tag antrea/theia-policy-recommendation:$(DOCKER_IMG_VERSION) projects.registry.vmware.com/antrea/theia-policy-recommendation docker tag antrea/theia-policy-recommendation:$(DOCKER_IMG_VERSION) projects.registry.vmware.com/antrea/theia-policy-recommendation:$(DOCKER_IMG_VERSION) + +THEIA_BINARIES := theia-darwin theia-linux theia-windows +$(THEIA_BINARIES): theia-%: + @GOOS=$* $(GO) build -o $(BINDIR)/$@ $(GOFLAGS) -ldflags '$(LDFLAGS)' antrea.io/theia/pkg/theia + @if [[ $@ != *windows ]]; then \ + chmod 0755 $(BINDIR)/$@; \ + else \ + mv $(BINDIR)/$@ $(BINDIR)/$@.exe; \ + fi + +.PHONY: theia +theia: $(THEIA_BINARIES) + +.PHONY: theia-release +theia-release: + @$(GO) build -o $(BINDIR)/$(THEIA_BINARY_NAME) $(GOFLAGS) -ldflags '-s -w $(LDFLAGS)' antrea.io/theia/pkg/theia diff --git a/go.mod b/go.mod index 21f4f033d..f2612e0dd 100644 --- a/go.mod +++ b/go.mod @@ -7,8 +7,10 @@ require ( github.com/ClickHouse/clickhouse-go v1.5.4 github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/containernetworking/plugins v0.8.7 + github.com/google/uuid v1.1.2 github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd github.com/sirupsen/logrus v1.8.1 + github.com/spf13/cobra v1.4.0 github.com/stretchr/testify v1.7.0 github.com/vmware/go-ipfix v0.5.12 golang.org/x/crypto v0.0.0-20210503195802-e9a32991a82e @@ -60,7 +62,6 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/google/go-cmp v0.5.5 // indirect github.com/google/gofuzz v1.1.0 // indirect - github.com/google/uuid v1.1.2 // indirect github.com/googleapis/gnostic v0.5.5 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect @@ -88,7 +89,6 @@ require ( github.com/safchain/ethtool v0.0.0-20190326074333-42ed695e3de8 // indirect github.com/satori/go.uuid v1.2.0 // indirect github.com/spf13/afero v1.6.0 // indirect - github.com/spf13/cobra v1.4.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/streamrail/concurrent-map v0.0.0-20160823150647-8bf1e9bacbf6 // indirect github.com/vishvananda/netlink v1.1.1-0.20210510164352-d17758a128bf // indirect diff --git a/hack/release/prepare-assets.sh b/hack/release/prepare-assets.sh index 59ab9da49..6236984aa 100755 --- a/hack/release/prepare-assets.sh +++ b/hack/release/prepare-assets.sh @@ -40,6 +40,27 @@ pushd $THIS_DIR/../.. > /dev/null mkdir -p "$1" OUTPUT_DIR=$(cd "$1" && pwd) +THEIA_BUILDS=( + "linux amd64 linux-x86_64" + "linux arm64 linux-arm64" + "linux arm linux-arm" + "windows amd64 windows-x86_64.exe" + "darwin amd64 darwin-x86_64" +) + +for build in "${THEIA_BUILDS[@]}"; do + args=($build) + os="${args[0]}" + arch="${args[1]}" + suffix="${args[2]}" + + # cgo is disabled by default when cross-compiling, but enabled by default + # for native builds. We ensure it is always disabled for portability since + # these binaries will be distributed as release assets. 
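+    # THEIA_BINARY_NAME and BINDIR override the Makefile defaults ("theia" and
+    # $(CURDIR)/bin), so the theia-release target writes each cross-compiled
+    # binary into the release assets directory with a platform-specific suffix.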
+ GOOS=$os GOARCH=$arch CGO_ENABLED=0 THEIA_BINARY_NAME="theia-$suffix" BINDIR="$OUTPUT_DIR"/ make theia-release +done + + export IMG_TAG=$VERSION export IMG_NAME=projects.registry.vmware.com/antrea/theia-clickhouse-monitor diff --git a/pkg/theia/commands/policy_recommendation.go b/pkg/theia/commands/policy_recommendation.go new file mode 100644 index 000000000..f89658f3b --- /dev/null +++ b/pkg/theia/commands/policy_recommendation.go @@ -0,0 +1,43 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package commands + +import ( + "fmt" + + "github.com/spf13/cobra" +) + +// policyRecommendationCmd represents the policy recommendation command group +var policyRecommendationCmd = &cobra.Command{ + Use: "policy-recommendation", + Aliases: []string{"pr"}, + Short: "Commands of Theia policy recommendation feature", + Long: `Command group of Theia policy recommendation feature. +Must specify a subcommand like run, status or retrieve.`, + Run: func(cmd *cobra.Command, args []string) { + fmt.Println("Error: must also specify a subcommand like run, status or retrieve") + }, +} + +func init() { + rootCmd.AddCommand(policyRecommendationCmd) + rootCmd.PersistentFlags().StringP( + "kubeconfig", + "k", + "", + "absolute path to the k8s config file, will use $KUBECONFIG if not specified", + ) +} diff --git a/pkg/theia/commands/policy_recommendation_retrieve.go b/pkg/theia/commands/policy_recommendation_retrieve.go new file mode 100644 index 000000000..587fe6b85 --- /dev/null +++ b/pkg/theia/commands/policy_recommendation_retrieve.go @@ -0,0 +1,241 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package commands + +import ( + "context" + "database/sql" + "fmt" + "io/ioutil" + "net/url" + "time" + + "github.com/ClickHouse/clickhouse-go" + "github.com/google/uuid" + "github.com/spf13/cobra" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" +) + +// policyRecommendationRetrieveCmd represents the policy-recommendation retrieve command +var policyRecommendationRetrieveCmd = &cobra.Command{ + Use: "retrieve", + Short: "Get the recommendation result of a policy recommendation Spark job", + Long: `Get the recommendation result of a policy recommendation Spark job by ID. 
+It will return the recommended NetworkPolicies described in yaml.`, + Args: cobra.RangeArgs(0, 1), + Example: ` +Get the recommendation result with job ID e998433e-accb-4888-9fc8-06563f073e86 +$ theia policy-recommendation retrieve --id e998433e-accb-4888-9fc8-06563f073e86 +Or +$ theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86 +Use a customized ClickHouse endpoint when connecting to ClickHouse to getting the result +$ theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86 --clickhouse-endpoint 10.10.1.1 +Use Service ClusterIP when connecting to ClickHouse to getting the result +$ theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86 --use-cluster-ip +Save the recommendation result to file +$ theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86 --use-cluster-ip --file output.yaml +`, + RunE: func(cmd *cobra.Command, args []string) error { + // Parse the flags + recoID, err := cmd.Flags().GetString("id") + if err != nil { + return err + } + if recoID == "" && len(args) == 1 { + recoID = args[0] + } + _, err = uuid.Parse(recoID) + if err != nil { + return fmt.Errorf("failed to decode input id %s into a UUID, err: %v", recoID, err) + } + kubeconfig, err := ResolveKubeConfig(cmd) + if err != nil { + return err + } + endpoint, err := cmd.Flags().GetString("clickhouse-endpoint") + if err != nil { + return err + } + if endpoint != "" { + _, err := url.ParseRequestURI(endpoint) + if err != nil { + return fmt.Errorf("failed to decode input endpoint %s into a url, err: %v", endpoint, err) + } + } + useClusterIP, err := cmd.Flags().GetBool("use-cluster-ip") + if err != nil { + return err + } + filePath, err := cmd.Flags().GetString("file") + if err != nil { + return err + } + + // Verify Clickhouse is running + clientset, err := CreateK8sClient(kubeconfig) + if err != nil { + return fmt.Errorf("couldn't create k8s client using given kubeconfig: %v", err) + } + if err := CheckClickHousePod(clientset); err != nil { + return err + } + + recoResult, err := getPolicyRecommendationResult(clientset, kubeconfig, endpoint, useClusterIP, filePath, recoID) + if err != nil { + return err + } else { + if recoResult != "" { + fmt.Print(recoResult) + } + } + return nil + }, +} + +func getPolicyRecommendationResult(clientset kubernetes.Interface, kubeconfig string, endpoint string, useClusterIP bool, filePath string, recoID string) (recoResult string, err error) { + if endpoint == "" { + service := "clickhouse-clickhouse" + if useClusterIP { + serviceIP, servicePort, err := GetServiceAddr(clientset, service) + if err != nil { + return "", fmt.Errorf("error when getting the ClickHouse Service address: %v", err) + } + endpoint = fmt.Sprintf("tcp://%s:%d", serviceIP, servicePort) + } else { + listenAddress := "localhost" + listenPort := 9000 + _, servicePort, err := GetServiceAddr(clientset, service) + if err != nil { + return "", fmt.Errorf("error when getting the ClickHouse Service port: %v", err) + } + // Forward the ClickHouse service port + pf, err := StartPortForward(kubeconfig, service, servicePort, listenAddress, listenPort) + if err != nil { + return "", fmt.Errorf("error when forwarding port: %v", err) + } + defer pf.Stop() + endpoint = fmt.Sprintf("tcp://%s:%d", listenAddress, listenPort) + } + } + + // Connect to ClickHouse and get the result + username, password, err := getClickHouseSecret(clientset) + if err != nil { + return "", err + } + url := fmt.Sprintf("%s?debug=false&username=%s&password=%s", endpoint, username, 
password) + connect, err := connectClickHouse(clientset, url) + if err != nil { + return "", fmt.Errorf("error when connecting to ClickHouse, %v", err) + } + recoResult, err = getResultFromClickHouse(connect, recoID) + if err != nil { + return "", fmt.Errorf("error when connecting to ClickHouse, %v", err) + } + if filePath != "" { + if err := ioutil.WriteFile(filePath, []byte(recoResult), 0600); err != nil { + return "", fmt.Errorf("error when writing recommendation result to file: %v", err) + } + } else { + return recoResult, nil + } + return "", nil +} + +func getClickHouseSecret(clientset kubernetes.Interface) (username []byte, password []byte, err error) { + secret, err := clientset.CoreV1().Secrets(flowVisibilityNS).Get(context.TODO(), "clickhouse-secret", metav1.GetOptions{}) + if err != nil { + return username, password, fmt.Errorf("error %v when finding the ClickHouse secret, please check the deployment of ClickHouse", err) + } + username, ok := secret.Data["username"] + if !ok { + return username, password, fmt.Errorf("error when getting the ClickHouse username") + } + password, ok = secret.Data["password"] + if !ok { + return username, password, fmt.Errorf("error when getting the ClickHouse password") + } + return username, password, nil +} + +func connectClickHouse(clientset kubernetes.Interface, url string) (*sql.DB, error) { + var connect *sql.DB + var connErr error + connRetryInterval := 1 * time.Second + connTimeout := 10 * time.Second + + // Connect to ClickHouse in a loop + if err := wait.PollImmediate(connRetryInterval, connTimeout, func() (bool, error) { + // Open the database and ping it + var err error + connect, err = sql.Open("clickhouse", url) + if err != nil { + connErr = fmt.Errorf("failed to ping ClickHouse: %v", err) + return false, nil + } + if err := connect.Ping(); err != nil { + if exception, ok := err.(*clickhouse.Exception); ok { + connErr = fmt.Errorf("failed to ping ClickHouse: %v", exception.Message) + } else { + connErr = fmt.Errorf("failed to ping ClickHouse: %v", err) + } + return false, nil + } else { + return true, nil + } + }); err != nil { + return nil, fmt.Errorf("failed to connect to ClickHouse after %s: %v", connTimeout, connErr) + } + return connect, nil +} + +func getResultFromClickHouse(connect *sql.DB, id string) (string, error) { + var recoResult string + query := "SELECT yamls FROM recommendations WHERE id = (?);" + err := connect.QueryRow(query, id).Scan(&recoResult) + if err != nil { + return recoResult, fmt.Errorf("failed to get recommendation result with id %s: %v", id, err) + } + return recoResult, nil +} + +func init() { + policyRecommendationCmd.AddCommand(policyRecommendationRetrieveCmd) + policyRecommendationRetrieveCmd.Flags().StringP( + "id", + "i", + "", + "ID of the policy recommendation Spark job.", + ) + policyRecommendationRetrieveCmd.Flags().String( + "clickhouse-endpoint", + "", + "The ClickHouse service endpoint.", + ) + policyRecommendationRetrieveCmd.Flags().Bool( + "use-cluster-ip", + false, + `Enable this option will use Service ClusterIP instead of port forwarding when connecting to the ClickHouse service. 
+It can only be used when running theia in cluster.`, + ) + policyRecommendationRetrieveCmd.Flags().StringP( + "file", + "f", + "", + "The file path where you want to save the result.", + ) +} diff --git a/pkg/theia/commands/policy_recommendation_retrieve_test.go b/pkg/theia/commands/policy_recommendation_retrieve_test.go new file mode 100644 index 000000000..59c7443a9 --- /dev/null +++ b/pkg/theia/commands/policy_recommendation_retrieve_test.go @@ -0,0 +1,57 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package commands + +import ( + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +func TestGetClickHouseSecret(t *testing.T) { + var fakeClientset = fake.NewSimpleClientset( + &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "clickhouse-secret", + Namespace: flowVisibilityNS, + }, + Data: map[string][]byte{ + "password": []byte("clickhouse_operator_password"), + "username": []byte("clickhouse_operator"), + }, + }, + ) + username, password, err := getClickHouseSecret(fakeClientset) + assert.NoError(t, err) + assert.Equal(t, "clickhouse_operator", string(username)) + assert.Equal(t, "clickhouse_operator_password", string(password)) +} + +func TestGetResultFromClickHouse(t *testing.T) { + recoID := "db2134ea-7169-46f8-b56d-d643d4751d1d" + expectedResult := "recommend-allow-acnp-kube-system-rpeal" + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + assert.NoError(t, err) + defer db.Close() + resultRow := sqlmock.NewRows([]string{"yamls"}).AddRow(expectedResult) + mock.ExpectQuery("SELECT yamls FROM recommendations WHERE id = (?);").WithArgs(recoID).WillReturnRows(resultRow) + result, err := getResultFromClickHouse(db, recoID) + assert.NoError(t, err) + assert.Equal(t, expectedResult, result) +} diff --git a/pkg/theia/commands/policy_recommendation_run.go b/pkg/theia/commands/policy_recommendation_run.go new file mode 100644 index 000000000..d64226846 --- /dev/null +++ b/pkg/theia/commands/policy_recommendation_run.go @@ -0,0 +1,476 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package commands + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + "regexp" + "strconv" + "strings" + "time" + + "github.com/google/uuid" + "github.com/spf13/cobra" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + + sparkv1 "antrea.io/theia/third_party/sparkoperator/v1beta2" +) + +const ( + flowVisibilityNS = "flow-visibility" + k8sQuantitiesReg = "^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$" + sparkImage = "antrea/theia-policy-recommendation:latest" + sparkImagePullPolicy = "IfNotPresent" + sparkAppFile = "local:///opt/spark/work-dir/policy_recommendation_job.py" + sparkServiceAccount = "policy-recommendation-spark" + sparkVersion = "3.1.1" + statusCheckPollInterval = 5 * time.Second + statusCheckPollTimeout = 60 * time.Minute +) + +type SparkResourceArgs struct { + executorInstances int32 + driverCoreRequest string + driverMemory string + executorCoreRequest string + executorMemory string +} + +// policyRecommendationRunCmd represents the policy recommendation run command +var policyRecommendationRunCmd = &cobra.Command{ + Use: "run", + Short: "Run a new policy recommendation Spark job", + Long: `Run a new policy recommendation Spark job. +Must finish the deployment of Theia first`, + Example: `Run a policy recommendation Spark job with default configuration +$ theia policy-recommendation run +Run an initial policy recommendation Spark job with policy type anp-deny-applied and limit on last 10k flow records +$ theia policy-recommendation run --type initial --policy-type anp-deny-applied --limit 10000 +Run an initial policy recommendation Spark job with policy type anp-deny-applied and limit on flow records from 2022-01-01 00:00:00 to 2022-01-31 23:59:59. +$ theia policy-recommendation run --type initial --policy-type anp-deny-applied --start-time '2022-01-01 00:00:00' --end-time '2022-01-31 23:59:59' +Run a policy recommendation Spark job with default configuration but doesn't recommend toServices ANPs +$ theia policy-recommendation run --to-services=false +`, + RunE: func(cmd *cobra.Command, args []string) error { + var recoJobArgs []string + sparkResourceArgs := SparkResourceArgs{} + + recoType, err := cmd.Flags().GetString("type") + if err != nil { + return err + } + if recoType != "initial" && recoType != "subsequent" { + return fmt.Errorf("recommendation type should be 'initial' or 'subsequent'") + } + recoJobArgs = append(recoJobArgs, "--type", recoType) + + limit, err := cmd.Flags().GetInt("limit") + if err != nil { + return err + } + if limit < 0 { + return fmt.Errorf("limit should be an integer >= 0") + } + recoJobArgs = append(recoJobArgs, "--limit", strconv.Itoa(limit)) + + policyType, err := cmd.Flags().GetString("policy-type") + if err != nil { + return err + } + var policyTypeArg int + if policyType == "anp-deny-applied" { + policyTypeArg = 1 + } else if policyType == "anp-deny-all" { + policyTypeArg = 2 + } else if policyType == "k8s-np" { + policyTypeArg = 3 + } else { + return fmt.Errorf(`type of generated NetworkPolicy should be +anp-deny-applied or anp-deny-all or k8s-np`) + } + recoJobArgs = append(recoJobArgs, "--option", strconv.Itoa(policyTypeArg)) + + startTime, err := cmd.Flags().GetString("start-time") + if err != nil { + return err + } + var startTimeObj time.Time + if startTime != "" { + startTimeObj, err = time.Parse("2006-01-02 15:04:05", startTime) + if err != nil { + return fmt.Errorf(`parsing start-time: %v, start-time should be in +'YYYY-MM-DD hh:mm:ss' format, for example: 2006-01-02 15:04:05`, 
err) + } + recoJobArgs = append(recoJobArgs, "--start_time", startTime) + } + + endTime, err := cmd.Flags().GetString("end-time") + if err != nil { + return err + } + if endTime != "" { + endTimeObj, err := time.Parse("2006-01-02 15:04:05", endTime) + if err != nil { + return fmt.Errorf(`parsing end-time: %v, end-time should be in +'YYYY-MM-DD hh:mm:ss' format, for example: 2006-01-02 15:04:05`, err) + } + endAfterStart := endTimeObj.After(startTimeObj) + if !endAfterStart { + return fmt.Errorf("end-time should be after start-time") + } + recoJobArgs = append(recoJobArgs, "--end_time", endTime) + } + + nsAllowList, err := cmd.Flags().GetString("ns-allow-list") + if err != nil { + return err + } + if nsAllowList != "" { + var parsedNsAllowList []string + err := json.Unmarshal([]byte(nsAllowList), &parsedNsAllowList) + if err != nil { + return fmt.Errorf(`parsing ns-allow-list: %v, ns-allow-list should +be a list of namespace string, for example: '["kube-system","flow-aggregator","flow-visibility"]'`, err) + } + recoJobArgs = append(recoJobArgs, "--ns_allow_list", nsAllowList) + } + + excludeLabels, err := cmd.Flags().GetBool("exclude-labels") + if err != nil { + return err + } + recoJobArgs = append(recoJobArgs, "--rm_labels", strconv.FormatBool(excludeLabels)) + + toServices, err := cmd.Flags().GetBool("to-services") + if err != nil { + return err + } + recoJobArgs = append(recoJobArgs, "--to_services", strconv.FormatBool(toServices)) + + executorInstances, err := cmd.Flags().GetInt32("executor-instances") + if err != nil { + return err + } + if executorInstances < 0 { + return fmt.Errorf("executor-instances should be an integer >= 0") + } + sparkResourceArgs.executorInstances = executorInstances + + driverCoreRequest, err := cmd.Flags().GetString("driver-core-request") + if err != nil { + return err + } + matchResult, err := regexp.MatchString(k8sQuantitiesReg, driverCoreRequest) + if err != nil || !matchResult { + return fmt.Errorf("driver-core-request should conform to the Kubernetes resource quantity convention") + } + sparkResourceArgs.driverCoreRequest = driverCoreRequest + + driverMemory, err := cmd.Flags().GetString("driver-memory") + if err != nil { + return err + } + matchResult, err = regexp.MatchString(k8sQuantitiesReg, driverMemory) + if err != nil || !matchResult { + return fmt.Errorf("driver-memory should conform to the Kubernetes resource quantity convention") + } + sparkResourceArgs.driverMemory = driverMemory + + executorCoreRequest, err := cmd.Flags().GetString("executor-core-request") + if err != nil { + return err + } + matchResult, err = regexp.MatchString(k8sQuantitiesReg, executorCoreRequest) + if err != nil || !matchResult { + return fmt.Errorf("executor-core-request should conform to the Kubernetes resource quantity convention") + } + sparkResourceArgs.executorCoreRequest = executorCoreRequest + + executorMemory, err := cmd.Flags().GetString("executor-memory") + if err != nil { + return err + } + matchResult, err = regexp.MatchString(k8sQuantitiesReg, executorMemory) + if err != nil || !matchResult { + return fmt.Errorf("executor-memory should conform to the Kubernetes resource quantity convention") + } + sparkResourceArgs.executorMemory = executorMemory + + kubeconfig, err := ResolveKubeConfig(cmd) + if err != nil { + return err + } + clientset, err := CreateK8sClient(kubeconfig) + if err != nil { + return fmt.Errorf("couldn't create k8s client using given kubeconfig, %v", err) + } + + waitFlag, err := cmd.Flags().GetBool("wait") + if err != nil { + return err + 
} + + err = PolicyRecoPreCheck(clientset) + if err != nil { + return err + } + + recommendationID := uuid.New().String() + recoJobArgs = append(recoJobArgs, "--id", recommendationID) + recommendationApplication := &sparkv1.SparkApplication{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "sparkoperator.k8s.io/v1beta2", + Kind: "SparkApplication", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pr-" + recommendationID, + Namespace: flowVisibilityNS, + }, + Spec: sparkv1.SparkApplicationSpec{ + Type: "Python", + SparkVersion: sparkVersion, + Mode: "cluster", + Image: ConstStrToPointer(sparkImage), + ImagePullPolicy: ConstStrToPointer(sparkImagePullPolicy), + MainApplicationFile: ConstStrToPointer(sparkAppFile), + Arguments: recoJobArgs, + Driver: sparkv1.DriverSpec{ + CoreRequest: &driverCoreRequest, + SparkPodSpec: sparkv1.SparkPodSpec{ + Memory: &driverMemory, + Labels: map[string]string{ + "version": sparkVersion, + }, + EnvSecretKeyRefs: map[string]sparkv1.NameKey{ + "CH_USERNAME": { + Name: "clickhouse-secret", + Key: "username", + }, + "CH_PASSWORD": { + Name: "clickhouse-secret", + Key: "password", + }, + }, + ServiceAccount: ConstStrToPointer(sparkServiceAccount), + }, + }, + Executor: sparkv1.ExecutorSpec{ + CoreRequest: &executorCoreRequest, + SparkPodSpec: sparkv1.SparkPodSpec{ + Memory: &executorMemory, + Labels: map[string]string{ + "version": sparkVersion, + }, + EnvSecretKeyRefs: map[string]sparkv1.NameKey{ + "CH_USERNAME": { + Name: "clickhouse-secret", + Key: "username", + }, + "CH_PASSWORD": { + Name: "clickhouse-secret", + Key: "password", + }, + }, + }, + Instances: &sparkResourceArgs.executorInstances, + }, + }, + } + response := &sparkv1.SparkApplication{} + err = clientset.CoreV1().RESTClient(). + Post(). + AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). + Namespace(flowVisibilityNS). + Resource("sparkapplications"). + Body(recommendationApplication). + Do(context.TODO()). + Into(response) + if err != nil { + return err + } + if waitFlag { + err = wait.Poll(statusCheckPollInterval, statusCheckPollTimeout, func() (bool, error) { + state, err := getPolicyRecommendationStatus(clientset, recommendationID) + if err != nil { + return false, err + } + if state == "COMPLETED" { + return true, nil + } + if state == "FAILED" || state == "SUBMISSION_FAILED" || state == "FAILING" || state == "INVALIDATING" { + return false, fmt.Errorf("policy recommendation job failed, state: %s", state) + } else { + return false, nil + } + }) + if err != nil { + if strings.Contains(err.Error(), "timed out") { + return fmt.Errorf(`Spark job with ID %s wait timeout of 60 minutes expired. +Job is still running. 
Please check completion status for job via CLI later.`, recommendationID) + } + return err + } + + endpoint, err := cmd.Flags().GetString("clickhouse-endpoint") + if err != nil { + return err + } + if endpoint != "" { + _, err := url.ParseRequestURI(endpoint) + if err != nil { + return fmt.Errorf("failed to decode input endpoint %s into a url, err: %v", endpoint, err) + } + } + useClusterIP, err := cmd.Flags().GetBool("use-cluster-ip") + if err != nil { + return err + } + filePath, err := cmd.Flags().GetString("file") + if err != nil { + return err + } + if err := CheckClickHousePod(clientset); err != nil { + return err + } + recoResult, err := getPolicyRecommendationResult(clientset, kubeconfig, endpoint, useClusterIP, filePath, recommendationID) + if err != nil { + return err + } else { + if recoResult != "" { + fmt.Print(recoResult) + } + } + return nil + } else { + fmt.Printf("Successfully created policy recommendation job with ID %s\n", recommendationID) + } + return nil + }, +} + +func init() { + policyRecommendationCmd.AddCommand(policyRecommendationRunCmd) + policyRecommendationRunCmd.Flags().StringP( + "type", + "t", + "initial", + "{initial|subsequent} Indicates this recommendation is an initial recommendion or a subsequent recommendation job.", + ) + policyRecommendationRunCmd.Flags().IntP( + "limit", + "l", + 0, + "The limit on the number of flow records read from the database. 0 means no limit.", + ) + policyRecommendationRunCmd.Flags().StringP( + "policy-type", + "p", + "anp-deny-applied", + `Types of generated NetworkPolicy. +Currently we have 3 generated NetworkPolicy types: +anp-deny-applied: Recommending allow ANP/ACNP policies, with default deny rules only on Pods which have an allow rule applied. +anp-deny-all: Recommending allow ANP/ACNP policies, with default deny rules for whole cluster. +k8s-np: Recommending allow K8s NetworkPolicies.`, + ) + policyRecommendationRunCmd.Flags().StringP( + "start-time", + "s", + "", + `The start time of the flow records considered for the policy recommendation. +Format is YYYY-MM-DD hh:mm:ss in UTC timezone. No limit of the start time of flow records by default.`, + ) + policyRecommendationRunCmd.Flags().StringP( + "end-time", + "e", + "", + `The end time of the flow records considered for the policy recommendation. +Format is YYYY-MM-DD hh:mm:ss in UTC timezone. No limit of the end time of flow records by default.`, + ) + policyRecommendationRunCmd.Flags().StringP( + "ns-allow-list", + "n", + "", + `List of default allow Namespaces. +If no Namespaces provided, Traffic inside Antrea CNI related Namespaces: ['kube-system', 'flow-aggregator', +'flow-visibility'] will be allowed by default.`, + ) + policyRecommendationRunCmd.Flags().Bool( + "exclude-labels", + true, + `Enable this option will exclude automatically generated Pod labels including 'pod-template-hash', +'controller-revision-hash', 'pod-template-generation' during policy recommendation.`, + ) + policyRecommendationRunCmd.Flags().Bool( + "to-services", + true, + `Use the toServices feature in ANP and recommendation toServices rules for Pod-to-Service flows, +only works when option is anp-deny-applied or anp-deny-all.`, + ) + policyRecommendationRunCmd.Flags().Int32( + "executor-instances", + 1, + "Specify the number of executors for the Spark application. Example values include 1, 2, 8, etc.", + ) + policyRecommendationRunCmd.Flags().String( + "driver-core-request", + "200m", + `Specify the CPU request for the driver Pod. 
Values conform to the Kubernetes resource quantity convention. +Example values include 0.1, 500m, 1.5, 5, etc.`, + ) + policyRecommendationRunCmd.Flags().String( + "driver-memory", + "512M", + `Specify the memory request for the driver Pod. Values conform to the Kubernetes resource quantity convention. +Example values include 512M, 1G, 8G, etc.`, + ) + policyRecommendationRunCmd.Flags().String( + "executor-core-request", + "200m", + `Specify the CPU request for the executor Pod. Values conform to the Kubernetes resource quantity convention. +Example values include 0.1, 500m, 1.5, 5, etc.`, + ) + policyRecommendationRunCmd.Flags().String( + "executor-memory", + "512M", + `Specify the memory request for the executor Pod. Values conform to the Kubernetes resource quantity convention. +Example values include 512M, 1G, 8G, etc.`, + ) + policyRecommendationRunCmd.Flags().Bool( + "wait", + false, + "Enable this option will hold and wait the whole policy recommendation job finishes.", + ) + policyRecommendationRunCmd.Flags().String( + "clickhouse-endpoint", + "", + "The ClickHouse Service endpoint. It can only be used when wait is enabled.", + ) + policyRecommendationRunCmd.Flags().Bool( + "use-cluster-ip", + false, + `Enable this option will use ClusterIP instead of port forwarding when connecting to the ClickHouse Service. +It can only be used when running in cluster and when wait is enabled.`, + ) + policyRecommendationRunCmd.Flags().StringP( + "file", + "f", + "", + "The file path where you want to save the result. It can only be used when wait is enabled.", + ) +} diff --git a/pkg/theia/commands/policy_recommendation_status.go b/pkg/theia/commands/policy_recommendation_status.go new file mode 100644 index 000000000..38adee0cb --- /dev/null +++ b/pkg/theia/commands/policy_recommendation_status.go @@ -0,0 +1,218 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package commands + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strings" + "time" + + "github.com/google/uuid" + "github.com/spf13/cobra" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + + sparkv1 "antrea.io/theia/third_party/sparkoperator/v1beta2" +) + +// policyRecommendationStatusCmd represents the policy-recommendation status command +var policyRecommendationStatusCmd = &cobra.Command{ + Use: "status", + Short: "Check the status of a policy recommendation Spark job", + Long: `Check the current status of a policy recommendation Spark job by ID. 
+It will return the status of this Spark application like SUBMITTED, RUNNING, COMPLETED, or FAILED.`, + Args: cobra.RangeArgs(0, 1), + Example: ` +Check the current status of job with ID e998433e-accb-4888-9fc8-06563f073e86 +$ theia policy-recommendation status --id e998433e-accb-4888-9fc8-06563f073e86 +Or +$ theia policy-recommendation status e998433e-accb-4888-9fc8-06563f073e86 +Use Service ClusterIP when checking the current status of job with ID e998433e-accb-4888-9fc8-06563f073e86 +$ theia policy-recommendation status e998433e-accb-4888-9fc8-06563f073e86 --use-cluster-ip +`, + RunE: func(cmd *cobra.Command, args []string) error { + recoID, err := cmd.Flags().GetString("id") + if err != nil { + return err + } + if recoID == "" && len(args) == 1 { + recoID = args[0] + } + _, err = uuid.Parse(recoID) + if err != nil { + return fmt.Errorf("failed to decode input id %s into a UUID, err: %v", recoID, err) + } + kubeconfig, err := ResolveKubeConfig(cmd) + if err != nil { + return err + } + clientset, err := CreateK8sClient(kubeconfig) + if err != nil { + return fmt.Errorf("couldn't create k8s client using given kubeconfig, %v", err) + } + useClusterIP, err := cmd.Flags().GetBool("use-cluster-ip") + if err != nil { + return err + } + + err = PolicyRecoPreCheck(clientset) + if err != nil { + return err + } + + state, err := getPolicyRecommendationStatus(clientset, recoID) + if err != nil { + return err + } + if state == "RUNNING" { + var endpoint string + service := fmt.Sprintf("pr-%s-ui-svc", recoID) + if useClusterIP { + serviceIP, servicePort, err := GetServiceAddr(clientset, service) + if err != nil { + klog.V(2).ErrorS(err, "error when getting the progress of the job, cannot get Spark Monitor Service address") + } else { + endpoint = fmt.Sprintf("tcp://%s:%d", serviceIP, servicePort) + } + } else { + servicePort := 4040 + listenAddress := "localhost" + listenPort := 4040 + pf, err := StartPortForward(kubeconfig, service, servicePort, listenAddress, listenPort) + if err != nil { + klog.V(2).ErrorS(err, "error when getting the progress of the job, cannot forward port") + } else { + endpoint = fmt.Sprintf("http://%s:%d", listenAddress, listenPort) + defer pf.Stop() + } + } + // Check the working progress of running recommendation job + if endpoint != "" { + stateProgress, err := getPolicyRecommendationProgress(endpoint) + if err != nil { + klog.V(2).ErrorS(err, "failed to get the progress of the job") + } + state += stateProgress + } + } + fmt.Printf("Status of this policy recommendation job is %s\n", state) + return nil + }, +} + +func getPolicyRecommendationStatus(clientset kubernetes.Interface, recoID string) (string, error) { + sparkApplication := &sparkv1.SparkApplication{} + err := clientset.CoreV1().RESTClient(). + Get(). + AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). + Namespace(flowVisibilityNS). + Resource("sparkapplications"). + Name("pr-" + recoID). + Do(context.TODO()). 
+ Into(sparkApplication) + if err != nil { + return "", err + } + state := strings.TrimSpace(string(sparkApplication.Status.AppState.State)) + return state, nil +} + +func getPolicyRecommendationProgress(baseUrl string) (string, error) { + // Get the id of current Spark application + url := fmt.Sprintf("%s/api/v1/applications", baseUrl) + response, err := getResponseFromSparkMonitoringSvc(url) + if err != nil { + return "", fmt.Errorf("failed to get response from the Spark Monitoring Service: %v", err) + } + var getAppsResult []map[string]interface{} + json.Unmarshal([]byte(response), &getAppsResult) + if len(getAppsResult) != 1 { + return "", fmt.Errorf("wrong Spark Application number, expected 1, got %d", len(getAppsResult)) + } + sparkAppID := getAppsResult[0]["id"] + // Check the percentage of completed stages + url = fmt.Sprintf("%s/api/v1/applications/%s/stages", baseUrl, sparkAppID) + response, err = getResponseFromSparkMonitoringSvc(url) + if err != nil { + return "", fmt.Errorf("failed to get response from the Spark Monitoring Service: %v", err) + } + var getStagesResult []map[string]interface{} + json.Unmarshal([]byte(response), &getStagesResult) + NumStageResult := len(getStagesResult) + if NumStageResult < 1 { + return "", fmt.Errorf("wrong Spark Application stages number, expected at least 1, got %d", NumStageResult) + } + completedStages := 0 + for _, stage := range getStagesResult { + if stage["status"] == "COMPLETE" || stage["status"] == "SKIPPED" { + completedStages++ + } + } + return fmt.Sprintf(": %d/%d (%d%%) stages completed", completedStages, NumStageResult, completedStages*100/NumStageResult), nil +} + +func getResponseFromSparkMonitoringSvc(url string) ([]byte, error) { + sparkMonitoringClient := http.Client{} + request, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return nil, err + } + var res *http.Response + var getErr error + connRetryInterval := 1 * time.Second + connTimeout := 10 * time.Second + if err := wait.PollImmediate(connRetryInterval, connTimeout, func() (bool, error) { + res, err = sparkMonitoringClient.Do(request) + if err != nil { + getErr = err + return false, nil + } + return true, nil + }); err != nil { + return nil, getErr + } + if res == nil { + return nil, fmt.Errorf("response is nil") + } + if res.Body != nil { + defer res.Body.Close() + } + body, readErr := ioutil.ReadAll(res.Body) + if readErr != nil { + return nil, readErr + } + return body, nil +} + +func init() { + policyRecommendationCmd.AddCommand(policyRecommendationStatusCmd) + policyRecommendationStatusCmd.Flags().StringP( + "id", + "i", + "", + "ID of the policy recommendation Spark job.", + ) + policyRecommendationStatusCmd.Flags().Bool( + "use-cluster-ip", + false, + `Enable this option will use Service ClusterIP instead of port forwarding when connecting to the Spark Monitoring Service. +It can only be used when running theia in cluster.`, + ) +} diff --git a/pkg/theia/commands/policy_recommendation_status_test.go b/pkg/theia/commands/policy_recommendation_status_test.go new file mode 100644 index 000000000..bf658bdc8 --- /dev/null +++ b/pkg/theia/commands/policy_recommendation_status_test.go @@ -0,0 +1,57 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package commands + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetPolicyRecommendationProgress(t *testing.T) { + sparkAppID := "spark-0fa6cc19ae23439794747a306d5ad705" + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch strings.TrimSpace(r.URL.Path) { + case "/api/v1/applications": + responses := []map[string]interface{}{ + {"id": sparkAppID}, + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(responses) + case fmt.Sprintf("/api/v1/applications/%s/stages", sparkAppID): + responses := []map[string]interface{}{ + {"status": "COMPLETE"}, + {"status": "COMPLETE"}, + {"status": "SKIPPED"}, + {"status": "PENDING"}, + {"status": "ACTIVE"}, + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(responses) + } + })) + defer server.Close() + expectedProgress := ": 3/5 (60%) stages completed" + progress, err := getPolicyRecommendationProgress(server.URL) + assert.NoError(t, err) + assert.Equal(t, expectedProgress, progress) +} diff --git a/pkg/theia/commands/root.go b/pkg/theia/commands/root.go new file mode 100644 index 000000000..2e0380779 --- /dev/null +++ b/pkg/theia/commands/root.go @@ -0,0 +1,56 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package commands + +import ( + "fmt" + "os" + + "github.com/spf13/cobra" + "k8s.io/klog/v2" +) + +// rootCmd represents the base command when called without any subcommands +var ( + verbose = 0 + rootCmd = &cobra.Command{ + Use: "theia", + Short: "theia is the command line tool for Theia", + Long: `theia is the command line tool for Theia which provides access +to Theia network flow visibility capabilities`, + PersistentPreRunE: func(cmd *cobra.Command, args []string) error { + verboseLevel, err := cmd.Flags().GetInt("verbose") + if err != nil { + return err + } + var l klog.Level + l.Set(fmt.Sprint(verboseLevel)) + return nil + }, + } +) + +// Execute adds all child commands to the root command and sets flags appropriately. +// This is called by main.main(). It only needs to happen once to the rootCmd. 
+func Execute() { + err := rootCmd.Execute() + if err != nil { + os.Exit(1) + } +} + +func init() { + rootCmd.PersistentFlags().IntVarP(&verbose, "verbose", "v", 0, "set verbose level") +} diff --git a/pkg/theia/commands/utils.go b/pkg/theia/commands/utils.go new file mode 100644 index 000000000..dc8dc5c6b --- /dev/null +++ b/pkg/theia/commands/utils.go @@ -0,0 +1,158 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package commands + +import ( + "context" + "fmt" + "os" + "strings" + + "github.com/spf13/cobra" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + + "antrea.io/theia/pkg/theia/portforwarder" +) + +func CreateK8sClient(kubeconfig string) (kubernetes.Interface, error) { + config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + return nil, err + } + // creates the clientset + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + return clientset, nil +} + +func PolicyRecoPreCheck(clientset kubernetes.Interface) error { + err := CheckSparkOperatorPod(clientset) + if err != nil { + return err + } + err = CheckClickHousePod(clientset) + if err != nil { + return err + } + return nil +} + +func CheckSparkOperatorPod(clientset kubernetes.Interface) error { + // Check the deployment of Spark Operator in flow-visibility ns + pods, err := clientset.CoreV1().Pods(flowVisibilityNS).List(context.TODO(), metav1.ListOptions{ + LabelSelector: "app.kubernetes.io/instance=policy-recommendation,app.kubernetes.io/name=spark-operator", + }) + if err != nil { + return fmt.Errorf("error %v when finding the policy-recommendation-spark-operator Pod, please check the deployment of the Spark Operator", err) + } + if len(pods.Items) < 1 { + return fmt.Errorf("can't find the policy-recommendation-spark-operator Pod, please check the deployment of the Spark Operator") + } + hasRunningPod := false + for _, pod := range pods.Items { + if pod.Status.Phase == "Running" { + hasRunningPod = true + break + } + } + if !hasRunningPod { + return fmt.Errorf("can't find a running ClickHouse Pod, please check the deployment of ClickHouse") + } + return nil +} + +func CheckClickHousePod(clientset kubernetes.Interface) error { + // Check the ClickHouse deployment in flow-visibility namespace + pods, err := clientset.CoreV1().Pods(flowVisibilityNS).List(context.TODO(), metav1.ListOptions{ + LabelSelector: "app=clickhouse", + }) + if err != nil { + return fmt.Errorf("error %v when finding the ClickHouse Pod, please check the deployment of the ClickHouse", err) + } + if len(pods.Items) < 1 { + return fmt.Errorf("can't find the ClickHouse Pod, please check the deployment of ClickHouse") + } + hasRunningPod := false + for _, pod := range pods.Items { + if pod.Status.Phase == "Running" { + hasRunningPod = true + break + } + } + if !hasRunningPod { + return fmt.Errorf("can't find a running ClickHouse Pod, please check the deployment of ClickHouse") + } + 
return nil +} + +func ConstStrToPointer(constStr string) *string { + return &constStr +} + +func GetServiceAddr(clientset kubernetes.Interface, serviceName string) (string, int, error) { + var serviceIP string + var servicePort int + service, err := clientset.CoreV1().Services(flowVisibilityNS).Get(context.TODO(), serviceName, metav1.GetOptions{}) + if err != nil { + return serviceIP, servicePort, fmt.Errorf("error when finding the Service %s: %v", serviceName, err) + } + serviceIP = service.Spec.ClusterIP + for _, port := range service.Spec.Ports { + if port.Name == "tcp" { + servicePort = int(port.Port) + } + } + if servicePort == 0 { + return serviceIP, servicePort, fmt.Errorf("error when finding the Service %s: %v", serviceName, err) + } + return serviceIP, servicePort, nil +} + +func StartPortForward(kubeconfig string, service string, servicePort int, listenAddress string, listenPort int) (*portforwarder.PortForwarder, error) { + config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + return nil, err + } + // Forward the policy recommendation service port + pf, err := portforwarder.NewServicePortForwarder(config, flowVisibilityNS, service, servicePort, listenAddress, listenPort) + if err != nil { + return nil, err + } + err = pf.Start() + if err != nil { + return nil, err + } + return pf, nil +} + +func ResolveKubeConfig(cmd *cobra.Command) (string, error) { + var err error + kubeconfigPath, err := cmd.Flags().GetString("kubeconfig") + if err != nil { + return "", err + } + if len(kubeconfigPath) == 0 { + var hasIt bool + kubeconfigPath, hasIt = os.LookupEnv("KUBECONFIG") + if !hasIt || len(strings.TrimSpace(kubeconfigPath)) == 0 { + kubeconfigPath = clientcmd.RecommendedHomeFile + } + } + return kubeconfigPath, nil +} diff --git a/pkg/theia/commands/utils_test.go b/pkg/theia/commands/utils_test.go new file mode 100644 index 000000000..497943fe4 --- /dev/null +++ b/pkg/theia/commands/utils_test.go @@ -0,0 +1,72 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package commands + +import ( + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +var fakeClientset = fake.NewSimpleClientset( + &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "clickhouse", + Namespace: flowVisibilityNS, + Labels: map[string]string{"app": "clickhouse"}, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + }, + &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "spark-operator", + Namespace: flowVisibilityNS, + Labels: map[string]string{ + "app.kubernetes.io/instance": "policy-recommendation", + "app.kubernetes.io/name": "spark-operator", + }, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + }, + &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "clickhouse-clickhouse", + Namespace: flowVisibilityNS, + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{{Name: "tcp", Port: 9000}}, + ClusterIP: "10.98.208.26", + }, + }, +) + +func TestGetServiceAddr(t *testing.T) { + ip, port, err := GetServiceAddr(fakeClientset, "clickhouse-clickhouse") + assert.NoError(t, err) + assert.Equal(t, 9000, port) + assert.Equal(t, "10.98.208.26", ip) +} + +func TestPolicyRecoPreCheck(t *testing.T) { + err := PolicyRecoPreCheck(fakeClientset) + assert.NoError(t, err) +} diff --git a/pkg/theia/portforwarder/portforwarder.go b/pkg/theia/portforwarder/portforwarder.go new file mode 100644 index 000000000..279783c03 --- /dev/null +++ b/pkg/theia/portforwarder/portforwarder.go @@ -0,0 +1,189 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package portforwarder + +import ( + "context" + "fmt" + "io/ioutil" + "net/http" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/portforward" + "k8s.io/client-go/transport/spdy" + "k8s.io/klog/v2" +) + +type PortForwarder struct { + config *rest.Config + clientset kubernetes.Interface + namespace string + name string + targetPort int + listenAddress string + listenPort int + stopCh chan struct{} +} + +// This function creates Port Forwarder for a Pod +// After creating Port Forwarder object, call Start() on it to start forwarding +// channel and Stop() to terminate it +func NewPortForwarder(config *rest.Config, namespace string, pod string, targetPort int, listenAddress string, listenPort int) (*PortForwarder, error) { + klog.V(2).Infof("Port forwarder requested for pod %s/%s: %s:%d -> %d", namespace, pod, listenAddress, listenPort, targetPort) + + pf := &PortForwarder{ + config: config, + namespace: namespace, + name: pod, + targetPort: targetPort, + listenAddress: listenAddress, + listenPort: listenPort, + } + + var err error + pf.clientset, err = kubernetes.NewForConfig(pf.config) + if err != nil { + return pf, fmt.Errorf("could not create kubernetes client: %v", err) + } + + return pf, nil +} + +// This function creates Port Forwarder for a Service by finding first Pod +// that belongs to the Service, and target Port for requested Service Port. +// This code is based upon kubectl port-forward implementation +// After creating Port Forwarder object, call Start() on it to start forwarding +// channel and Stop() to terminate it +func NewServicePortForwarder(config *rest.Config, namespace string, service string, servicePort int, listenAddress string, listenPort int) (*PortForwarder, error) { + pf := &PortForwarder{ + config: config, + namespace: namespace, + listenAddress: listenAddress, + listenPort: listenPort, + } + + var err error + pf.clientset, err = kubernetes.NewForConfig(pf.config) + if err != nil { + return pf, fmt.Errorf("could not create kubernetes client: %v", err) + } + + serviceObj, err := pf.clientset.CoreV1().Services(pf.namespace).Get(context.TODO(), service, metav1.GetOptions{}) + if err != nil { + return pf, fmt.Errorf("failed to read Service %s: %v", service, err) + } + + // find container port that corresponds to requested service port + pf.targetPort, err = getContainerPortByServicePort(serviceObj, servicePort) + if err != nil { + return pf, err + } + + klog.V(2).Infof("Port forwarder requested for service %s/%s: %s:%d -> %d", namespace, service, listenAddress, listenPort, pf.targetPort) + + selector := labels.SelectorFromSet(serviceObj.Spec.Selector) + listOptions := metav1.ListOptions{ + LabelSelector: selector.String(), + } + + // for target Pod - take first Pod for the Service + pods, err := pf.clientset.CoreV1().Pods(pf.namespace).List(context.TODO(), listOptions) + + if err != nil { + return pf, fmt.Errorf("failed to read Pods for Service %s: %v", service, err) + } + if len(pods.Items) == 0 { + return pf, fmt.Errorf("no Pods found for Service %s: %v", service, err) + } + + pod := pods.Items[0] + pf.name = pod.Name + + return pf, nil +} + +// get Container Port by Service Port, based on Service configuration +// This code is based upon kubectl port-forward implementation +func getContainerPortByServicePort(svc *v1.Service, port int) (int, error) { + for _, portspec := range 
svc.Spec.Ports { + if int(portspec.Port) != port { + continue + } + if svc.Spec.ClusterIP == v1.ClusterIPNone { + return port, nil + } + if portspec.TargetPort.Type == intstr.Int { + if portspec.TargetPort.IntValue() == 0 { + return int(portspec.Port), nil + } + return portspec.TargetPort.IntValue(), nil + } + } + return port, fmt.Errorf("service %s does not have Port %d", svc.Name, port) +} + +// Start Port Forwarding channel +func (p *PortForwarder) Start() error { + p.stopCh = make(chan struct{}, 1) + readyCh := make(chan struct{}) + errCh := make(chan error, 1) + + url := p.clientset.CoreV1().RESTClient().Post(). + Resource("pods"). + Namespace(p.namespace). + Name(p.name). + SubResource("portforward").URL() + + transport, upgrader, err := spdy.RoundTripperFor(p.config) + if err != nil { + return fmt.Errorf("failed to create dialer: %v", err) + } + + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url) + + ports := []string{ + fmt.Sprintf("%d:%d", p.listenPort, p.targetPort), + } + + addresses := []string{ + p.listenAddress, + } + + pf, err := portforward.NewOnAddresses(dialer, addresses, ports, p.stopCh, readyCh, ioutil.Discard, ioutil.Discard) + if err != nil { + return fmt.Errorf("port forward request failed: %v", err) + } + + go func() { + errCh <- pf.ForwardPorts() + }() + + select { + case err = <-errCh: + return fmt.Errorf("port forward request failed: %v", err) + case <-readyCh: + return nil + } +} + +// Stop Port Forwarding channel +func (p *PortForwarder) Stop() { + p.stopCh <- struct{}{} +} diff --git a/pkg/theia/theia.go b/pkg/theia/theia.go new file mode 100644 index 000000000..8746255db --- /dev/null +++ b/pkg/theia/theia.go @@ -0,0 +1,21 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import "antrea.io/theia/pkg/theia/commands" + +func main() { + commands.Execute() +} diff --git a/plugins/policy-recommendation/policy_recommendation_job.py b/plugins/policy-recommendation/policy_recommendation_job.py index 9f6858f39..d611862a8 100644 --- a/plugins/policy-recommendation/policy_recommendation_job.py +++ b/plugins/policy-recommendation/policy_recommendation_job.py @@ -747,7 +747,7 @@ def main(argv): Options: -h, --help: Show help message. -t, --type=initial: {initial|subsequent} Indicates this recommendation is an initial recommendion or a subsequent recommendation job. - -d, --db_jdbc_url=None: The JDBC URL used by Spark jobs connect to the ClickHouse database for reading flow records and writing results. + -d, --db_jdbc_url=None: The JDBC URL used by Spark jobs connect to the ClickHouse database for reading flow records and writing result. jdbc:clickhouse://clickhouse-clickhouse.flow-visibility.svc:8123 is the ClickHouse JDBC URL used by default. -l, --limit=0: The limit on the number of flow records read from the database. 0 means no limit. -o, --option=1: Option of network isolation preference in policy recommendation. 
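The portforwarder package added above is intended to be used the way its doc comments describe: construct a forwarder with NewPortForwarder or NewServicePortForwarder, call Start() to open the local forwarding channel, and call Stop() to tear it down. The following is a minimal usage sketch, not part of this patch; the kubeconfig path is an assumption, while the flow-visibility Namespace, the clickhouse-clickhouse Service, and port 9000 mirror the values used in the unit tests above.

package main

import (
	"fmt"

	"k8s.io/client-go/tools/clientcmd"

	"antrea.io/theia/pkg/theia/portforwarder"
)

func main() {
	// Build a rest.Config from a local kubeconfig; the path here is an
	// illustrative assumption.
	config, err := clientcmd.BuildConfigFromFlags("", "/home/user/.kube/config")
	if err != nil {
		panic(err)
	}
	// Forward localhost:9000 to the ClickHouse Service port 9000 in the
	// flow-visibility Namespace, similar to how the theia CLI reaches ClickHouse.
	pf, err := portforwarder.NewServicePortForwarder(config, "flow-visibility", "clickhouse-clickhouse", 9000, "localhost", 9000)
	if err != nil {
		panic(err)
	}
	if err := pf.Start(); err != nil {
		panic(err)
	}
	// Stop the forwarding channel when the caller is done with it.
	defer pf.Stop()
	fmt.Println("ClickHouse reachable at localhost:9000")
}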
diff --git a/third_party/sparkoperator/README b/third_party/sparkoperator/README new file mode 100644 index 000000000..fe28bd0e3 --- /dev/null +++ b/third_party/sparkoperator/README @@ -0,0 +1,2 @@ +These API files are copied from [spark-on-k8s-operator](https://github.com/GoogleCloudPlatform/spark-on-k8s-operator) +to avoid the version discrepancy of k8s.io/kubernetes if importing this module directly. diff --git a/third_party/sparkoperator/v1beta2/defaults.go b/third_party/sparkoperator/v1beta2/defaults.go new file mode 100644 index 000000000..f722a36cf --- /dev/null +++ b/third_party/sparkoperator/v1beta2/defaults.go @@ -0,0 +1,75 @@ +/* +Copyright 2017 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1beta2 + +// SetSparkApplicationDefaults sets default values for certain fields of a SparkApplication. +func SetSparkApplicationDefaults(app *SparkApplication) { + if app == nil { + return + } + + if app.Spec.Mode == "" { + app.Spec.Mode = ClusterMode + } + + if app.Spec.RestartPolicy.Type == "" { + app.Spec.RestartPolicy.Type = Never + } + + if app.Spec.RestartPolicy.Type != Never { + // Default to 5 sec if the RestartPolicy is OnFailure or Always and these values aren't specified. + if app.Spec.RestartPolicy.OnFailureRetryInterval == nil { + app.Spec.RestartPolicy.OnFailureRetryInterval = new(int64) + *app.Spec.RestartPolicy.OnFailureRetryInterval = 5 + } + + if app.Spec.RestartPolicy.OnSubmissionFailureRetryInterval == nil { + app.Spec.RestartPolicy.OnSubmissionFailureRetryInterval = new(int64) + *app.Spec.RestartPolicy.OnSubmissionFailureRetryInterval = 5 + } + } + + setDriverSpecDefaults(&app.Spec.Driver, app.Spec.SparkConf) + setExecutorSpecDefaults(&app.Spec.Executor, app.Spec.SparkConf) +} + +func setDriverSpecDefaults(spec *DriverSpec, sparkConf map[string]string) { + + if _, exists := sparkConf["spark.driver.cores"]; !exists && spec.Cores == nil { + spec.Cores = new(int32) + *spec.Cores = 1 + } + if _, exists := sparkConf["spark.driver.memory"]; !exists && spec.Memory == nil { + spec.Memory = new(string) + *spec.Memory = "1g" + } +} + +func setExecutorSpecDefaults(spec *ExecutorSpec, sparkConf map[string]string) { + if _, exists := sparkConf["spark.executor.cores"]; !exists && spec.Cores == nil { + spec.Cores = new(int32) + *spec.Cores = 1 + } + if _, exists := sparkConf["spark.executor.memory"]; !exists && spec.Memory == nil { + spec.Memory = new(string) + *spec.Memory = "1g" + } + if _, exists := sparkConf["spark.executor.instances"]; !exists && spec.Instances == nil { + spec.Instances = new(int32) + *spec.Instances = 1 + } +} diff --git a/third_party/sparkoperator/v1beta2/types.go b/third_party/sparkoperator/v1beta2/types.go new file mode 100644 index 000000000..bd4aee27e --- /dev/null +++ b/third_party/sparkoperator/v1beta2/types.go @@ -0,0 +1,751 @@ +/* +Copyright 2017 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1beta2 + +import ( + apiv1 "k8s.io/api/core/v1" + networkingv1 "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// SparkApplicationType describes the type of a Spark application. +type SparkApplicationType string + +// Different types of Spark applications. +const ( + JavaApplicationType SparkApplicationType = "Java" + ScalaApplicationType SparkApplicationType = "Scala" + PythonApplicationType SparkApplicationType = "Python" + RApplicationType SparkApplicationType = "R" +) + +// DeployMode describes the type of deployment of a Spark application. +type DeployMode string + +// Different types of deployments. +const ( + ClusterMode DeployMode = "cluster" + ClientMode DeployMode = "client" + InClusterClientMode DeployMode = "in-cluster-client" +) + +// RestartPolicy is the policy of if and in which conditions the controller should restart a terminated application. +// This completely defines actions to be taken on any kind of Failures during an application run. +type RestartPolicy struct { + // Type specifies the RestartPolicyType. + // +kubebuilder:validation:Enum={Never,Always,OnFailure} + Type RestartPolicyType `json:"type,omitempty"` + + // OnSubmissionFailureRetries is the number of times to retry submitting an application before giving up. + // This is best effort and actual retry attempts can be >= the value specified due to caching. + // These are required if RestartPolicy is OnFailure. + // +kubebuilder:validation:Minimum=0 + // +optional + OnSubmissionFailureRetries *int32 `json:"onSubmissionFailureRetries,omitempty"` + + // OnFailureRetries the number of times to retry running an application before giving up. + // +kubebuilder:validation:Minimum=0 + // +optional + OnFailureRetries *int32 `json:"onFailureRetries,omitempty"` + + // OnSubmissionFailureRetryInterval is the interval in seconds between retries on failed submissions. + // +kubebuilder:validation:Minimum=1 + // +optional + OnSubmissionFailureRetryInterval *int64 `json:"onSubmissionFailureRetryInterval,omitempty"` + + // OnFailureRetryInterval is the interval in seconds between retries on failed runs. + // +kubebuilder:validation:Minimum=1 + // +optional + OnFailureRetryInterval *int64 `json:"onFailureRetryInterval,omitempty"` +} + +type RestartPolicyType string + +const ( + Never RestartPolicyType = "Never" + OnFailure RestartPolicyType = "OnFailure" + Always RestartPolicyType = "Always" +) + +// +genclient +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +k8s:defaulter-gen=true +// +kubebuilder:subresource:status +// +kubebuilder:resource:scope=Namespaced,shortName=scheduledsparkapp,singular=scheduledsparkapplication + +type ScheduledSparkApplication struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata"` + Spec ScheduledSparkApplicationSpec `json:"spec"` + Status ScheduledSparkApplicationStatus `json:"status,omitempty"` +} + +type ConcurrencyPolicy string + +const ( + // ConcurrencyAllow allows SparkApplications to run concurrently. 
+ ConcurrencyAllow ConcurrencyPolicy = "Allow" + // ConcurrencyForbid forbids concurrent runs of SparkApplications, skipping the next run if the previous + // one hasn't finished yet. + ConcurrencyForbid ConcurrencyPolicy = "Forbid" + // ConcurrencyReplace kills the currently running SparkApplication instance and replaces it with a new one. + ConcurrencyReplace ConcurrencyPolicy = "Replace" +) + +type ScheduledSparkApplicationSpec struct { + // Schedule is a cron schedule on which the application should run. + Schedule string `json:"schedule"` + // Template is a template from which SparkApplication instances can be created. + Template SparkApplicationSpec `json:"template"` + // Suspend is a flag telling the controller to suspend subsequent runs of the application if set to true. + // +optional + // Defaults to false. + Suspend *bool `json:"suspend,omitempty"` + // ConcurrencyPolicy is the policy governing concurrent SparkApplication runs. + ConcurrencyPolicy ConcurrencyPolicy `json:"concurrencyPolicy,omitempty"` + // SuccessfulRunHistoryLimit is the number of past successful runs of the application to keep. + // +optional + // Defaults to 1. + SuccessfulRunHistoryLimit *int32 `json:"successfulRunHistoryLimit,omitempty"` + // FailedRunHistoryLimit is the number of past failed runs of the application to keep. + // +optional + // Defaults to 1. + FailedRunHistoryLimit *int32 `json:"failedRunHistoryLimit,omitempty"` +} + +type ScheduleState string + +const ( + FailedValidationState ScheduleState = "FailedValidation" + ScheduledState ScheduleState = "Scheduled" +) + +type ScheduledSparkApplicationStatus struct { + // LastRun is the time when the last run of the application started. + // +nullable + LastRun metav1.Time `json:"lastRun,omitempty"` + // NextRun is the time when the next run of the application will start. + // +nullable + NextRun metav1.Time `json:"nextRun,omitempty"` + // LastRunName is the name of the SparkApplication for the most recent run of the application. + LastRunName string `json:"lastRunName,omitempty"` + // PastSuccessfulRunNames keeps the names of SparkApplications for past successful runs. + PastSuccessfulRunNames []string `json:"pastSuccessfulRunNames,omitempty"` + // PastFailedRunNames keeps the names of SparkApplications for past failed runs. + PastFailedRunNames []string `json:"pastFailedRunNames,omitempty"` + // ScheduleState is the current scheduling state of the application. + ScheduleState ScheduleState `json:"scheduleState,omitempty"` + // Reason tells why the ScheduledSparkApplication is in the particular ScheduleState. + Reason string `json:"reason,omitempty"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// ScheduledSparkApplicationList carries a list of ScheduledSparkApplication objects. +type ScheduledSparkApplicationList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []ScheduledSparkApplication `json:"items,omitempty"` +} + +// +genclient +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +k8s:defaulter-gen=true +// +kubebuilder:subresource:status +// +kubebuilder:resource:scope=Namespaced,shortName=sparkapp,singular=sparkapplication + +// SparkApplication represents a Spark application running on and using Kubernetes as a cluster manager. 
+type SparkApplication struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata"` + Spec SparkApplicationSpec `json:"spec"` + Status SparkApplicationStatus `json:"status,omitempty"` +} + +// SparkApplicationSpec describes the specification of a Spark application using Kubernetes as a cluster manager. +// It carries every pieces of information a spark-submit command takes and recognizes. +type SparkApplicationSpec struct { + // Type tells the type of the Spark application. + // +kubebuilder:validation:Enum={Java,Python,Scala,R} + Type SparkApplicationType `json:"type"` + // SparkVersion is the version of Spark the application uses. + SparkVersion string `json:"sparkVersion"` + // Mode is the deployment mode of the Spark application. + // +kubebuilder:validation:Enum={cluster,client} + Mode DeployMode `json:"mode,omitempty"` + // ProxyUser specifies the user to impersonate when submitting the application. + // It maps to the command-line flag "--proxy-user" in spark-submit. + // +optional + ProxyUser *string `json:"proxyUser,omitempty"` + // Image is the container image for the driver, executor, and init-container. Any custom container images for the + // driver, executor, or init-container takes precedence over this. + // +optional + Image *string `json:"image,omitempty"` + // ImagePullPolicy is the image pull policy for the driver, executor, and init-container. + // +optional + ImagePullPolicy *string `json:"imagePullPolicy,omitempty"` + // ImagePullSecrets is the list of image-pull secrets. + // +optional + ImagePullSecrets []string `json:"imagePullSecrets,omitempty"` + // MainClass is the fully-qualified main class of the Spark application. + // This only applies to Java/Scala Spark applications. + // +optional + MainClass *string `json:"mainClass,omitempty"` + // MainFile is the path to a bundled JAR, Python, or R file of the application. + // +optional + MainApplicationFile *string `json:"mainApplicationFile"` + // Arguments is a list of arguments to be passed to the application. + // +optional + Arguments []string `json:"arguments,omitempty"` + // SparkConf carries user-specified Spark configuration properties as they would use the "--conf" option in + // spark-submit. + // +optional + SparkConf map[string]string `json:"sparkConf,omitempty"` + // HadoopConf carries user-specified Hadoop configuration properties as they would use the the "--conf" option + // in spark-submit. The SparkApplication controller automatically adds prefix "spark.hadoop." to Hadoop + // configuration properties. + // +optional + HadoopConf map[string]string `json:"hadoopConf,omitempty"` + // SparkConfigMap carries the name of the ConfigMap containing Spark configuration files such as log4j.properties. + // The controller will add environment variable SPARK_CONF_DIR to the path where the ConfigMap is mounted to. + // +optional + SparkConfigMap *string `json:"sparkConfigMap,omitempty"` + // HadoopConfigMap carries the name of the ConfigMap containing Hadoop configuration files such as core-site.xml. + // The controller will add environment variable HADOOP_CONF_DIR to the path where the ConfigMap is mounted to. + // +optional + HadoopConfigMap *string `json:"hadoopConfigMap,omitempty"` + // Volumes is the list of Kubernetes volumes that can be mounted by the driver and/or executors. + // +optional + Volumes []apiv1.Volume `json:"volumes,omitempty"` + // Driver is the driver specification. + Driver DriverSpec `json:"driver"` + // Executor is the executor specification. 
+ Executor ExecutorSpec `json:"executor"` + // Deps captures all possible types of dependencies of a Spark application. + // +optional + Deps Dependencies `json:"deps,omitempty"` + // RestartPolicy defines the policy on if and in which conditions the controller should restart an application. + RestartPolicy RestartPolicy `json:"restartPolicy,omitempty"` + // NodeSelector is the Kubernetes node selector to be added to the driver and executor pods. + // This field is mutually exclusive with nodeSelector at podSpec level (driver or executor). + // This field will be deprecated in future versions (at SparkApplicationSpec level). + // +optional + NodeSelector map[string]string `json:"nodeSelector,omitempty"` + // FailureRetries is the number of times to retry a failed application before giving up. + // This is best effort and actual retry attempts can be >= the value specified. + // +optional + FailureRetries *int32 `json:"failureRetries,omitempty"` + // RetryInterval is the unit of intervals in seconds between submission retries. + // +optional + RetryInterval *int64 `json:"retryInterval,omitempty"` + // This sets the major Python version of the docker + // image used to run the driver and executor containers. Can either be 2 or 3, default 2. + // +optional + // +kubebuilder:validation:Enum={"2","3"} + PythonVersion *string `json:"pythonVersion,omitempty"` + // This sets the Memory Overhead Factor that will allocate memory to non-JVM memory. + // For JVM-based jobs this value will default to 0.10, for non-JVM jobs 0.40. Value of this field will + // be overridden by `Spec.Driver.MemoryOverhead` and `Spec.Executor.MemoryOverhead` if they are set. + // +optional + MemoryOverheadFactor *string `json:"memoryOverheadFactor,omitempty"` + // Monitoring configures how monitoring is handled. + // +optional + Monitoring *MonitoringSpec `json:"monitoring,omitempty"` + // BatchScheduler configures which batch scheduler will be used for scheduling + // +optional + BatchScheduler *string `json:"batchScheduler,omitempty"` + // TimeToLiveSeconds defines the Time-To-Live (TTL) duration in seconds for this SparkApplication + // after its termination. + // The SparkApplication object will be garbage collected if the current time is more than the + // TimeToLiveSeconds since its termination. + // +optional + TimeToLiveSeconds *int64 `json:"timeToLiveSeconds,omitempty"` + // BatchSchedulerOptions provides fine-grained control on how to batch scheduling. + // +optional + BatchSchedulerOptions *BatchSchedulerConfiguration `json:"batchSchedulerOptions,omitempty"` + // SparkUIOptions allows configuring the Service and the Ingress to expose the sparkUI + // +optional + SparkUIOptions *SparkUIConfiguration `json:"sparkUIOptions,omitempty"` + // DynamicAllocation configures dynamic allocation that becomes available for the Kubernetes + // scheduler backend since Spark 3.0. + // +optional + DynamicAllocation *DynamicAllocation `json:"dynamicAllocation,omitempty"` +} + +// BatchSchedulerConfiguration used to configure how to batch scheduling Spark Application +type BatchSchedulerConfiguration struct { + // Queue stands for the resource queue which the application belongs to, it's being used in Volcano batch scheduler. + // +optional + Queue *string `json:"queue,omitempty"` + // PriorityClassName stands for the name of k8s PriorityClass resource, it's being used in Volcano batch scheduler. 
+ // +optional + PriorityClassName *string `json:"priorityClassName,omitempty"` + // Resources stands for the resource list custom request for. Usually it is used to define the lower-bound limit. + // If specified, volcano scheduler will consider it as the resources requested. + // +optional + Resources apiv1.ResourceList `json:"resources,omitempty"` +} + +// SparkUIConfiguration is for driver UI specific configuration parameters. +type SparkUIConfiguration struct { + // ServicePort allows configuring the port at service level that might be different from the targetPort. + // TargetPort should be the same as the one defined in spark.ui.port + // +optional + ServicePort *int32 `json:"servicePort"` + // ServicePortName allows configuring the name of the service port. + // This may be useful for sidecar proxies like Envoy injected by Istio which require specific ports names to treat traffic as proper HTTP. + // Defaults to spark-driver-ui-port. + // +optional + ServicePortName *string `json:"servicePortName"` + // ServiceType allows configuring the type of the service. Defaults to ClusterIP. + // +optional + ServiceType *apiv1.ServiceType `json:"serviceType"` + // ServiceAnnotations is a map of key,value pairs of annotations that might be added to the service object. + // +optional + ServiceAnnotations map[string]string `json:"serviceAnnotations,omitempty"` + // IngressAnnotations is a map of key,value pairs of annotations that might be added to the ingress object. i.e. specify nginx as ingress.class + // +optional + IngressAnnotations map[string]string `json:"ingressAnnotations,omitempty"` + // TlsHosts is useful If we need to declare SSL certificates to the ingress object + // +optional + IngressTLS []networkingv1.IngressTLS `json:"ingressTLS,omitempty"` +} + +// ApplicationStateType represents the type of the current state of an application. +type ApplicationStateType string + +// Different states an application may have. +const ( + NewState ApplicationStateType = "" + SubmittedState ApplicationStateType = "SUBMITTED" + RunningState ApplicationStateType = "RUNNING" + CompletedState ApplicationStateType = "COMPLETED" + FailedState ApplicationStateType = "FAILED" + FailedSubmissionState ApplicationStateType = "SUBMISSION_FAILED" + PendingRerunState ApplicationStateType = "PENDING_RERUN" + InvalidatingState ApplicationStateType = "INVALIDATING" + SucceedingState ApplicationStateType = "SUCCEEDING" + FailingState ApplicationStateType = "FAILING" + UnknownState ApplicationStateType = "UNKNOWN" +) + +// ApplicationState tells the current state of the application and an error message in case of failures. +type ApplicationState struct { + State ApplicationStateType `json:"state"` + ErrorMessage string `json:"errorMessage,omitempty"` +} + +// DriverState tells the current state of a spark driver. +type DriverState string + +// Different states a spark driver may have. +const ( + DriverPendingState DriverState = "PENDING" + DriverRunningState DriverState = "RUNNING" + DriverCompletedState DriverState = "COMPLETED" + DriverFailedState DriverState = "FAILED" + DriverUnknownState DriverState = "UNKNOWN" +) + +// ExecutorState tells the current state of an executor. +type ExecutorState string + +// Different states an executor may have. 
+const ( + ExecutorPendingState ExecutorState = "PENDING" + ExecutorRunningState ExecutorState = "RUNNING" + ExecutorCompletedState ExecutorState = "COMPLETED" + ExecutorFailedState ExecutorState = "FAILED" + ExecutorUnknownState ExecutorState = "UNKNOWN" +) + +// SparkApplicationStatus describes the current status of a Spark application. +type SparkApplicationStatus struct { + // SparkApplicationID is set by the spark-distribution(via spark.app.id config) on the driver and executor pods + SparkApplicationID string `json:"sparkApplicationId,omitempty"` + // SubmissionID is a unique ID of the current submission of the application. + SubmissionID string `json:"submissionID,omitempty"` + // LastSubmissionAttemptTime is the time for the last application submission attempt. + // +nullable + LastSubmissionAttemptTime metav1.Time `json:"lastSubmissionAttemptTime,omitempty"` + // CompletionTime is the time when the application runs to completion if it does. + // +nullable + TerminationTime metav1.Time `json:"terminationTime,omitempty"` + // DriverInfo has information about the driver. + DriverInfo DriverInfo `json:"driverInfo"` + // AppState tells the overall application state. + AppState ApplicationState `json:"applicationState,omitempty"` + // ExecutorState records the state of executors by executor Pod names. + ExecutorState map[string]ExecutorState `json:"executorState,omitempty"` + // ExecutionAttempts is the total number of attempts to run a submitted application to completion. + // Incremented upon each attempted run of the application and reset upon invalidation. + ExecutionAttempts int32 `json:"executionAttempts,omitempty"` + // SubmissionAttempts is the total number of attempts to submit an application to run. + // Incremented upon each attempted submission of the application and reset upon invalidation and rerun. + SubmissionAttempts int32 `json:"submissionAttempts,omitempty"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// SparkApplicationList carries a list of SparkApplication objects. +type SparkApplicationList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []SparkApplication `json:"items,omitempty"` +} + +// Dependencies specifies all possible types of dependencies of a Spark application. +type Dependencies struct { + // Jars is a list of JAR files the Spark application depends on. + // +optional + Jars []string `json:"jars,omitempty"` + // Files is a list of files the Spark application depends on. + // +optional + Files []string `json:"files,omitempty"` + // PyFiles is a list of Python files the Spark application depends on. + // +optional + PyFiles []string `json:"pyFiles,omitempty"` + // Packages is a list of maven coordinates of jars to include on the driver and executor + // classpaths. This will search the local maven repo, then maven central and any additional + // remote repositories given by the "repositories" option. + // Each package should be of the form "groupId:artifactId:version". + // +optional + Packages []string `json:"packages,omitempty"` + // ExcludePackages is a list of "groupId:artifactId", to exclude while resolving the + // dependencies provided in Packages to avoid dependency conflicts. + // +optional + ExcludePackages []string `json:"excludePackages,omitempty"` + // Repositories is a list of additional remote repositories to search for the maven coordinate + // given with the "packages" option. 
+ // +optional + Repositories []string `json:"repositories,omitempty"` +} + +// SparkPodSpec defines common things that can be customized for a Spark driver or executor pod. +// TODO: investigate if we should use v1.PodSpec and limit what can be set instead. +type SparkPodSpec struct { + // Cores maps to `spark.driver.cores` or `spark.executor.cores` for the driver and executors, respectively. + // +optional + // +kubebuilder:validation:Minimum=1 + Cores *int32 `json:"cores,omitempty"` + // CoreLimit specifies a hard limit on CPU cores for the pod. + // Optional + CoreLimit *string `json:"coreLimit,omitempty"` + // Memory is the amount of memory to request for the pod. + // +optional + Memory *string `json:"memory,omitempty"` + // MemoryOverhead is the amount of off-heap memory to allocate in cluster mode, in MiB unless otherwise specified. + // +optional + MemoryOverhead *string `json:"memoryOverhead,omitempty"` + // GPU specifies GPU requirement for the pod. + // +optional + GPU *GPUSpec `json:"gpu,omitempty"` + // Image is the container image to use. Overrides Spec.Image if set. + // +optional + Image *string `json:"image,omitempty"` + // ConfigMaps carries information of other ConfigMaps to add to the pod. + // +optional + ConfigMaps []NamePath `json:"configMaps,omitempty"` + // Secrets carries information of secrets to add to the pod. + // +optional + Secrets []SecretInfo `json:"secrets,omitempty"` + // Env carries the environment variables to add to the pod. + // +optional + Env []apiv1.EnvVar `json:"env,omitempty"` + // EnvVars carries the environment variables to add to the pod. + // Deprecated. Consider using `env` instead. + // +optional + EnvVars map[string]string `json:"envVars,omitempty"` + // EnvFrom is a list of sources to populate environment variables in the container. + // +optional + EnvFrom []apiv1.EnvFromSource `json:"envFrom,omitempty"` + // EnvSecretKeyRefs holds a mapping from environment variable names to SecretKeyRefs. + // Deprecated. Consider using `env` instead. + // +optional + EnvSecretKeyRefs map[string]NameKey `json:"envSecretKeyRefs,omitempty"` + // Labels are the Kubernetes labels to be added to the pod. + // +optional + Labels map[string]string `json:"labels,omitempty"` + // Annotations are the Kubernetes annotations to be added to the pod. + // +optional + Annotations map[string]string `json:"annotations,omitempty"` + // VolumeMounts specifies the volumes listed in ".spec.volumes" to mount into the main container's filesystem. + // +optional + VolumeMounts []apiv1.VolumeMount `json:"volumeMounts,omitempty"` + // Affinity specifies the affinity/anti-affinity settings for the pod. + // +optional + Affinity *apiv1.Affinity `json:"affinity,omitempty"` + // Tolerations specifies the tolerations listed in ".spec.tolerations" to be applied to the pod. + // +optional + Tolerations []apiv1.Toleration `json:"tolerations,omitempty"` + // PodSecurityContext specifies the PodSecurityContext to apply. + // +optional + PodSecurityContext *apiv1.PodSecurityContext `json:"podSecurityContext,omitempty"` + // SecurityContext specifies the container's SecurityContext to apply. + // +optional + SecurityContext *apiv1.SecurityContext `json:"securityContext,omitempty"` + // SchedulerName specifies the scheduler that will be used for scheduling + // +optional + SchedulerName *string `json:"schedulerName,omitempty"` + // Sidecars is a list of sidecar containers that run along side the main Spark container. 
+ // +optional + Sidecars []apiv1.Container `json:"sidecars,omitempty"` + // InitContainers is a list of init-containers that run to completion before the main Spark container. + // +optional + InitContainers []apiv1.Container `json:"initContainers,omitempty"` + // HostNetwork indicates whether to request host networking for the pod or not. + // +optional + HostNetwork *bool `json:"hostNetwork,omitempty"` + // NodeSelector is the Kubernetes node selector to be added to the driver and executor pods. + // This field is mutually exclusive with nodeSelector at SparkApplication level (which will be deprecated). + // +optional + NodeSelector map[string]string `json:"nodeSelector,omitempty"` + // DnsConfig dns settings for the pod, following the Kubernetes specifications. + // +optional + DNSConfig *apiv1.PodDNSConfig `json:"dnsConfig,omitempty"` + // Termination grace period seconds for the pod + // +optional + TerminationGracePeriodSeconds *int64 `json:"terminationGracePeriodSeconds,omitempty"` + // ServiceAccount is the name of the custom Kubernetes service account used by the pod. + // +optional + ServiceAccount *string `json:"serviceAccount,omitempty"` + // HostAliases settings for the pod, following the Kubernetes specifications. + // +optional + HostAliases []apiv1.HostAlias `json:"hostAliases,omitempty"` + // ShareProcessNamespace settings for the pod, following the Kubernetes specifications. + // +optional + ShareProcessNamespace *bool `json:"shareProcessNamespace,omitempty"` +} + +// DriverSpec is specification of the driver. +type DriverSpec struct { + SparkPodSpec `json:",inline"` + // PodName is the name of the driver pod that the user creates. This is used for the + // in-cluster client mode in which the user creates a client pod where the driver of + // the user application runs. It's an error to set this field if Mode is not + // in-cluster-client. + // +optional + // +kubebuilder:validation:Pattern=[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)* + PodName *string `json:"podName,omitempty"` + // CoreRequest is the physical CPU core request for the driver. + // Maps to `spark.kubernetes.driver.request.cores` that is available since Spark 3.0. + // +optional + CoreRequest *string `json:"coreRequest,omitempty"` + // JavaOptions is a string of extra JVM options to pass to the driver. For instance, + // GC settings or other logging. + // +optional + JavaOptions *string `json:"javaOptions,omitempty"` + // Lifecycle for running preStop or postStart commands + // +optional + Lifecycle *apiv1.Lifecycle `json:"lifecycle,omitempty"` + // KubernetesMaster is the URL of the Kubernetes master used by the driver to manage executor pods and + // other Kubernetes resources. Default to https://kubernetes.default.svc. + // +optional + KubernetesMaster *string `json:"kubernetesMaster,omitempty"` + // ServiceAnnotations defines the annotations to be added to the Kubernetes headless service used by + // executors to connect to the driver. + // +optional + ServiceAnnotations map[string]string `json:"serviceAnnotations,omitempty"` + // Ports settings for the pods, following the Kubernetes specifications. + // +optional + Ports []Port `json:"ports,omitempty"` +} + +// ExecutorSpec is specification of the executor. +type ExecutorSpec struct { + SparkPodSpec `json:",inline"` + // Instances is the number of executor instances. 
+ // +optional + // +kubebuilder:validation:Minimum=1 + Instances *int32 `json:"instances,omitempty"` + // CoreRequest is the physical CPU core request for the executors. + // Maps to `spark.kubernetes.executor.request.cores` that is available since Spark 2.4. + // +optional + CoreRequest *string `json:"coreRequest,omitempty"` + // JavaOptions is a string of extra JVM options to pass to the executors. For instance, + // GC settings or other logging. + // +optional + JavaOptions *string `json:"javaOptions,omitempty"` + // DeleteOnTermination specify whether executor pods should be deleted in case of failure or normal termination. + // Maps to `spark.kubernetes.executor.deleteOnTermination` that is available since Spark 3.0. + // +optional + DeleteOnTermination *bool `json:"deleteOnTermination,omitempty"` + // Ports settings for the pods, following the Kubernetes specifications. + // +optional + Ports []Port `json:"ports,omitempty"` +} + +// NamePath is a pair of a name and a path to which the named objects should be mounted to. +type NamePath struct { + Name string `json:"name"` + Path string `json:"path"` +} + +// SecretType tells the type of a secret. +type SecretType string + +// An enumeration of secret types supported. +const ( + // GCPServiceAccountSecret is for secrets from a GCP service account Json key file that needs + // the environment variable GOOGLE_APPLICATION_CREDENTIALS. + GCPServiceAccountSecret SecretType = "GCPServiceAccount" + // HadoopDelegationTokenSecret is for secrets from an Hadoop delegation token that needs the + // environment variable HADOOP_TOKEN_FILE_LOCATION. + HadoopDelegationTokenSecret SecretType = "HadoopDelegationToken" + // GenericType is for secrets that needs no special handling. + GenericType SecretType = "Generic" +) + +// DriverInfo captures information about the driver. +type DriverInfo struct { + WebUIServiceName string `json:"webUIServiceName,omitempty"` + // UI Details for the UI created via ClusterIP service accessible from within the cluster. + WebUIPort int32 `json:"webUIPort,omitempty"` + WebUIAddress string `json:"webUIAddress,omitempty"` + // Ingress Details if an ingress for the UI was created. + WebUIIngressName string `json:"webUIIngressName,omitempty"` + WebUIIngressAddress string `json:"webUIIngressAddress,omitempty"` + PodName string `json:"podName,omitempty"` +} + +// SecretInfo captures information of a secret. +type SecretInfo struct { + Name string `json:"name"` + Path string `json:"path"` + Type SecretType `json:"secretType"` +} + +// NameKey represents the name and key of a SecretKeyRef. +type NameKey struct { + Name string `json:"name"` + Key string `json:"key"` +} + +// Port represents the port definition in the pods objects. +type Port struct { + Name string `json:"name"` + Protocol string `json:"protocol"` + ContainerPort int32 `json:"containerPort"` +} + +// MonitoringSpec defines the monitoring specification. +type MonitoringSpec struct { + // ExposeDriverMetrics specifies whether to expose metrics on the driver. + ExposeDriverMetrics bool `json:"exposeDriverMetrics"` + // ExposeExecutorMetrics specifies whether to expose metrics on the executors. + ExposeExecutorMetrics bool `json:"exposeExecutorMetrics"` + // MetricsProperties is the content of a custom metrics.properties for configuring the Spark metric system. + // +optional + // If not specified, the content in spark-docker/conf/metrics.properties will be used. 
+ MetricsProperties *string `json:"metricsProperties,omitempty"` + // MetricsPropertiesFile is the container local path of file metrics.properties for configuring + //the Spark metric system. If not specified, value /etc/metrics/conf/metrics.properties will be used. + // +optional + MetricsPropertiesFile *string `json:"metricsPropertiesFile,omitempty"` + // Prometheus is for configuring the Prometheus JMX exporter. + // +optional + Prometheus *PrometheusSpec `json:"prometheus,omitempty"` +} + +// PrometheusSpec defines the Prometheus specification when Prometheus is to be used for +// collecting and exposing metrics. +type PrometheusSpec struct { + // JmxExporterJar is the path to the Prometheus JMX exporter jar in the container. + JmxExporterJar string `json:"jmxExporterJar"` + // Port is the port of the HTTP server run by the Prometheus JMX exporter. + // If not specified, 8090 will be used as the default. + // +kubebuilder:validation:Minimum=1024 + // +kubebuilder:validation:Maximum=49151 + // +optional + Port *int32 `json:"port,omitempty"` + // PortName is the port name of prometheus JMX exporter port. + // If not specified, jmx-exporter will be used as the default. + // +optional + PortName *string `json:"portName,omitempty"` + // ConfigFile is the path to the custom Prometheus configuration file provided in the Spark image. + // ConfigFile takes precedence over Configuration, which is shown below. + // +optional + ConfigFile *string `json:"configFile,omitempty"` + // Configuration is the content of the Prometheus configuration needed by the Prometheus JMX exporter. + // If not specified, the content in spark-docker/conf/prometheus.yaml will be used. + // Configuration has no effect if ConfigFile is set. + // +optional + Configuration *string `json:"configuration,omitempty"` +} + +type GPUSpec struct { + // Name is GPU resource name, such as: nvidia.com/gpu or amd.com/gpu + Name string `json:"name"` + // Quantity is the number of GPUs to request for driver or executor. + Quantity int64 `json:"quantity"` +} + +// DynamicAllocation contains configuration options for dynamic allocation. +type DynamicAllocation struct { + // Enabled controls whether dynamic allocation is enabled or not. + Enabled bool `json:"enabled,omitempty"` + // InitialExecutors is the initial number of executors to request. If .spec.executor.instances + // is also set, the initial number of executors is set to the bigger of that and this option. + // +optional + InitialExecutors *int32 `json:"initialExecutors,omitempty"` + // MinExecutors is the lower bound for the number of executors if dynamic allocation is enabled. + // +optional + MinExecutors *int32 `json:"minExecutors,omitempty"` + // MaxExecutors is the upper bound for the number of executors if dynamic allocation is enabled. + // +optional + MaxExecutors *int32 `json:"maxExecutors,omitempty"` + // ShuffleTrackingTimeout controls the timeout in milliseconds for executors that are holding + // shuffle data if shuffle tracking is enabled (true by default if dynamic allocation is enabled). + // +optional + ShuffleTrackingTimeout *int64 `json:"shuffleTrackingTimeout,omitempty"` +} + +// PrometheusMonitoringEnabled returns if Prometheus monitoring is enabled or not. +func (s *SparkApplication) PrometheusMonitoringEnabled() bool { + return s.Spec.Monitoring != nil && s.Spec.Monitoring.Prometheus != nil +} + +// HasPrometheusConfigFile returns if Prometheus monitoring uses a configuration file in the container. 
+func (s *SparkApplication) HasPrometheusConfigFile() bool { + return s.PrometheusMonitoringEnabled() && + s.Spec.Monitoring.Prometheus.ConfigFile != nil && + *s.Spec.Monitoring.Prometheus.ConfigFile != "" +} + +// HasPrometheusConfig returns if Prometheus monitoring defines metricsProperties in the spec. +func (s *SparkApplication) HasMetricsProperties() bool { + return s.PrometheusMonitoringEnabled() && + s.Spec.Monitoring.MetricsProperties != nil && + *s.Spec.Monitoring.MetricsProperties != "" +} + +// HasPrometheusConfigFile returns if Monitoring defines metricsPropertiesFile in the spec. +func (s *SparkApplication) HasMetricsPropertiesFile() bool { + return s.PrometheusMonitoringEnabled() && + s.Spec.Monitoring.MetricsPropertiesFile != nil && + *s.Spec.Monitoring.MetricsPropertiesFile != "" +} + +// ExposeDriverMetrics returns if driver metrics should be exposed. +func (s *SparkApplication) ExposeDriverMetrics() bool { + return s.Spec.Monitoring != nil && s.Spec.Monitoring.ExposeDriverMetrics +} + +// ExposeExecutorMetrics returns if executor metrics should be exposed. +func (s *SparkApplication) ExposeExecutorMetrics() bool { + return s.Spec.Monitoring != nil && s.Spec.Monitoring.ExposeExecutorMetrics +} diff --git a/third_party/sparkoperator/v1beta2/zz_generated.deepcopy.go b/third_party/sparkoperator/v1beta2/zz_generated.deepcopy.go new file mode 100644 index 000000000..80995147a --- /dev/null +++ b/third_party/sparkoperator/v1beta2/zz_generated.deepcopy.go @@ -0,0 +1,1044 @@ +// +build !ignore_autogenerated + +// Code generated by k8s code-generator DO NOT EDIT. + +/* +Copyright 2018 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by deepcopy-gen. DO NOT EDIT. + +package v1beta2 + +import ( + v1 "k8s.io/api/core/v1" + networkingv1 "k8s.io/api/networking/v1" + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ApplicationState) DeepCopyInto(out *ApplicationState) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApplicationState. +func (in *ApplicationState) DeepCopy() *ApplicationState { + if in == nil { + return nil + } + out := new(ApplicationState) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *BatchSchedulerConfiguration) DeepCopyInto(out *BatchSchedulerConfiguration) { + *out = *in + if in.Queue != nil { + in, out := &in.Queue, &out.Queue + *out = new(string) + **out = **in + } + if in.PriorityClassName != nil { + in, out := &in.PriorityClassName, &out.PriorityClassName + *out = new(string) + **out = **in + } + if in.Resources != nil { + in, out := &in.Resources, &out.Resources + *out = make(v1.ResourceList, len(*in)) + for key, val := range *in { + (*out)[key] = val.DeepCopy() + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BatchSchedulerConfiguration. +func (in *BatchSchedulerConfiguration) DeepCopy() *BatchSchedulerConfiguration { + if in == nil { + return nil + } + out := new(BatchSchedulerConfiguration) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Dependencies) DeepCopyInto(out *Dependencies) { + *out = *in + if in.Jars != nil { + in, out := &in.Jars, &out.Jars + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.Files != nil { + in, out := &in.Files, &out.Files + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.PyFiles != nil { + in, out := &in.PyFiles, &out.PyFiles + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.Packages != nil { + in, out := &in.Packages, &out.Packages + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.ExcludePackages != nil { + in, out := &in.ExcludePackages, &out.ExcludePackages + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.Repositories != nil { + in, out := &in.Repositories, &out.Repositories + *out = make([]string, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Dependencies. +func (in *Dependencies) DeepCopy() *Dependencies { + if in == nil { + return nil + } + out := new(Dependencies) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DriverInfo) DeepCopyInto(out *DriverInfo) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DriverInfo. +func (in *DriverInfo) DeepCopy() *DriverInfo { + if in == nil { + return nil + } + out := new(DriverInfo) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *DriverSpec) DeepCopyInto(out *DriverSpec) { + *out = *in + in.SparkPodSpec.DeepCopyInto(&out.SparkPodSpec) + if in.PodName != nil { + in, out := &in.PodName, &out.PodName + *out = new(string) + **out = **in + } + if in.CoreRequest != nil { + in, out := &in.CoreRequest, &out.CoreRequest + *out = new(string) + **out = **in + } + if in.JavaOptions != nil { + in, out := &in.JavaOptions, &out.JavaOptions + *out = new(string) + **out = **in + } + if in.Lifecycle != nil { + in, out := &in.Lifecycle, &out.Lifecycle + *out = new(v1.Lifecycle) + (*in).DeepCopyInto(*out) + } + if in.KubernetesMaster != nil { + in, out := &in.KubernetesMaster, &out.KubernetesMaster + *out = new(string) + **out = **in + } + if in.ServiceAnnotations != nil { + in, out := &in.ServiceAnnotations, &out.ServiceAnnotations + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.Ports != nil { + in, out := &in.Ports, &out.Ports + *out = make([]Port, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DriverSpec. +func (in *DriverSpec) DeepCopy() *DriverSpec { + if in == nil { + return nil + } + out := new(DriverSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DynamicAllocation) DeepCopyInto(out *DynamicAllocation) { + *out = *in + if in.InitialExecutors != nil { + in, out := &in.InitialExecutors, &out.InitialExecutors + *out = new(int32) + **out = **in + } + if in.MinExecutors != nil { + in, out := &in.MinExecutors, &out.MinExecutors + *out = new(int32) + **out = **in + } + if in.MaxExecutors != nil { + in, out := &in.MaxExecutors, &out.MaxExecutors + *out = new(int32) + **out = **in + } + if in.ShuffleTrackingTimeout != nil { + in, out := &in.ShuffleTrackingTimeout, &out.ShuffleTrackingTimeout + *out = new(int64) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamicAllocation. +func (in *DynamicAllocation) DeepCopy() *DynamicAllocation { + if in == nil { + return nil + } + out := new(DynamicAllocation) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ExecutorSpec) DeepCopyInto(out *ExecutorSpec) { + *out = *in + in.SparkPodSpec.DeepCopyInto(&out.SparkPodSpec) + if in.Instances != nil { + in, out := &in.Instances, &out.Instances + *out = new(int32) + **out = **in + } + if in.CoreRequest != nil { + in, out := &in.CoreRequest, &out.CoreRequest + *out = new(string) + **out = **in + } + if in.JavaOptions != nil { + in, out := &in.JavaOptions, &out.JavaOptions + *out = new(string) + **out = **in + } + if in.DeleteOnTermination != nil { + in, out := &in.DeleteOnTermination, &out.DeleteOnTermination + *out = new(bool) + **out = **in + } + if in.Ports != nil { + in, out := &in.Ports, &out.Ports + *out = make([]Port, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExecutorSpec. +func (in *ExecutorSpec) DeepCopy() *ExecutorSpec { + if in == nil { + return nil + } + out := new(ExecutorSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *GPUSpec) DeepCopyInto(out *GPUSpec) {
+	*out = *in
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GPUSpec.
+func (in *GPUSpec) DeepCopy() *GPUSpec {
+	if in == nil {
+		return nil
+	}
+	out := new(GPUSpec)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *MonitoringSpec) DeepCopyInto(out *MonitoringSpec) {
+	*out = *in
+	if in.MetricsProperties != nil {
+		in, out := &in.MetricsProperties, &out.MetricsProperties
+		*out = new(string)
+		**out = **in
+	}
+	if in.MetricsPropertiesFile != nil {
+		in, out := &in.MetricsPropertiesFile, &out.MetricsPropertiesFile
+		*out = new(string)
+		**out = **in
+	}
+	if in.Prometheus != nil {
+		in, out := &in.Prometheus, &out.Prometheus
+		*out = new(PrometheusSpec)
+		(*in).DeepCopyInto(*out)
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MonitoringSpec.
+func (in *MonitoringSpec) DeepCopy() *MonitoringSpec {
+	if in == nil {
+		return nil
+	}
+	out := new(MonitoringSpec)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *NameKey) DeepCopyInto(out *NameKey) {
+	*out = *in
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NameKey.
+func (in *NameKey) DeepCopy() *NameKey {
+	if in == nil {
+		return nil
+	}
+	out := new(NameKey)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *NamePath) DeepCopyInto(out *NamePath) {
+	*out = *in
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NamePath.
+func (in *NamePath) DeepCopy() *NamePath {
+	if in == nil {
+		return nil
+	}
+	out := new(NamePath)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *Port) DeepCopyInto(out *Port) {
+	*out = *in
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Port.
+func (in *Port) DeepCopy() *Port {
+	if in == nil {
+		return nil
+	}
+	out := new(Port)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *PrometheusSpec) DeepCopyInto(out *PrometheusSpec) {
+	*out = *in
+	if in.Port != nil {
+		in, out := &in.Port, &out.Port
+		*out = new(int32)
+		**out = **in
+	}
+	if in.PortName != nil {
+		in, out := &in.PortName, &out.PortName
+		*out = new(string)
+		**out = **in
+	}
+	if in.ConfigFile != nil {
+		in, out := &in.ConfigFile, &out.ConfigFile
+		*out = new(string)
+		**out = **in
+	}
+	if in.Configuration != nil {
+		in, out := &in.Configuration, &out.Configuration
+		*out = new(string)
+		**out = **in
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PrometheusSpec.
+func (in *PrometheusSpec) DeepCopy() *PrometheusSpec {
+	if in == nil {
+		return nil
+	}
+	out := new(PrometheusSpec)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *RestartPolicy) DeepCopyInto(out *RestartPolicy) {
+	*out = *in
+	if in.OnSubmissionFailureRetries != nil {
+		in, out := &in.OnSubmissionFailureRetries, &out.OnSubmissionFailureRetries
+		*out = new(int32)
+		**out = **in
+	}
+	if in.OnFailureRetries != nil {
+		in, out := &in.OnFailureRetries, &out.OnFailureRetries
+		*out = new(int32)
+		**out = **in
+	}
+	if in.OnSubmissionFailureRetryInterval != nil {
+		in, out := &in.OnSubmissionFailureRetryInterval, &out.OnSubmissionFailureRetryInterval
+		*out = new(int64)
+		**out = **in
+	}
+	if in.OnFailureRetryInterval != nil {
+		in, out := &in.OnFailureRetryInterval, &out.OnFailureRetryInterval
+		*out = new(int64)
+		**out = **in
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RestartPolicy.
+func (in *RestartPolicy) DeepCopy() *RestartPolicy {
+	if in == nil {
+		return nil
+	}
+	out := new(RestartPolicy)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ScheduledSparkApplication) DeepCopyInto(out *ScheduledSparkApplication) {
+	*out = *in
+	out.TypeMeta = in.TypeMeta
+	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
+	in.Spec.DeepCopyInto(&out.Spec)
+	in.Status.DeepCopyInto(&out.Status)
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScheduledSparkApplication.
+func (in *ScheduledSparkApplication) DeepCopy() *ScheduledSparkApplication {
+	if in == nil {
+		return nil
+	}
+	out := new(ScheduledSparkApplication)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
+func (in *ScheduledSparkApplication) DeepCopyObject() runtime.Object {
+	if c := in.DeepCopy(); c != nil {
+		return c
+	}
+	return nil
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ScheduledSparkApplicationList) DeepCopyInto(out *ScheduledSparkApplicationList) {
+	*out = *in
+	out.TypeMeta = in.TypeMeta
+	in.ListMeta.DeepCopyInto(&out.ListMeta)
+	if in.Items != nil {
+		in, out := &in.Items, &out.Items
+		*out = make([]ScheduledSparkApplication, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScheduledSparkApplicationList.
+func (in *ScheduledSparkApplicationList) DeepCopy() *ScheduledSparkApplicationList {
+	if in == nil {
+		return nil
+	}
+	out := new(ScheduledSparkApplicationList)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
+func (in *ScheduledSparkApplicationList) DeepCopyObject() runtime.Object {
+	if c := in.DeepCopy(); c != nil {
+		return c
+	}
+	return nil
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ScheduledSparkApplicationSpec) DeepCopyInto(out *ScheduledSparkApplicationSpec) {
+	*out = *in
+	in.Template.DeepCopyInto(&out.Template)
+	if in.Suspend != nil {
+		in, out := &in.Suspend, &out.Suspend
+		*out = new(bool)
+		**out = **in
+	}
+	if in.SuccessfulRunHistoryLimit != nil {
+		in, out := &in.SuccessfulRunHistoryLimit, &out.SuccessfulRunHistoryLimit
+		*out = new(int32)
+		**out = **in
+	}
+	if in.FailedRunHistoryLimit != nil {
+		in, out := &in.FailedRunHistoryLimit, &out.FailedRunHistoryLimit
+		*out = new(int32)
+		**out = **in
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScheduledSparkApplicationSpec.
+func (in *ScheduledSparkApplicationSpec) DeepCopy() *ScheduledSparkApplicationSpec {
+	if in == nil {
+		return nil
+	}
+	out := new(ScheduledSparkApplicationSpec)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ScheduledSparkApplicationStatus) DeepCopyInto(out *ScheduledSparkApplicationStatus) {
+	*out = *in
+	in.LastRun.DeepCopyInto(&out.LastRun)
+	in.NextRun.DeepCopyInto(&out.NextRun)
+	if in.PastSuccessfulRunNames != nil {
+		in, out := &in.PastSuccessfulRunNames, &out.PastSuccessfulRunNames
+		*out = make([]string, len(*in))
+		copy(*out, *in)
+	}
+	if in.PastFailedRunNames != nil {
+		in, out := &in.PastFailedRunNames, &out.PastFailedRunNames
+		*out = make([]string, len(*in))
+		copy(*out, *in)
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScheduledSparkApplicationStatus.
+func (in *ScheduledSparkApplicationStatus) DeepCopy() *ScheduledSparkApplicationStatus {
+	if in == nil {
+		return nil
+	}
+	out := new(ScheduledSparkApplicationStatus)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *SecretInfo) DeepCopyInto(out *SecretInfo) {
+	*out = *in
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SecretInfo.
+func (in *SecretInfo) DeepCopy() *SecretInfo {
+	if in == nil {
+		return nil
+	}
+	out := new(SecretInfo)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *SparkApplication) DeepCopyInto(out *SparkApplication) {
+	*out = *in
+	out.TypeMeta = in.TypeMeta
+	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
+	in.Spec.DeepCopyInto(&out.Spec)
+	in.Status.DeepCopyInto(&out.Status)
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SparkApplication.
+func (in *SparkApplication) DeepCopy() *SparkApplication {
+	if in == nil {
+		return nil
+	}
+	out := new(SparkApplication)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
+func (in *SparkApplication) DeepCopyObject() runtime.Object {
+	if c := in.DeepCopy(); c != nil {
+		return c
+	}
+	return nil
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *SparkApplicationList) DeepCopyInto(out *SparkApplicationList) {
+	*out = *in
+	out.TypeMeta = in.TypeMeta
+	in.ListMeta.DeepCopyInto(&out.ListMeta)
+	if in.Items != nil {
+		in, out := &in.Items, &out.Items
+		*out = make([]SparkApplication, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SparkApplicationList.
+func (in *SparkApplicationList) DeepCopy() *SparkApplicationList {
+	if in == nil {
+		return nil
+	}
+	out := new(SparkApplicationList)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
+func (in *SparkApplicationList) DeepCopyObject() runtime.Object {
+	if c := in.DeepCopy(); c != nil {
+		return c
+	}
+	return nil
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *SparkApplicationSpec) DeepCopyInto(out *SparkApplicationSpec) {
+	*out = *in
+	if in.ProxyUser != nil {
+		in, out := &in.ProxyUser, &out.ProxyUser
+		*out = new(string)
+		**out = **in
+	}
+	if in.Image != nil {
+		in, out := &in.Image, &out.Image
+		*out = new(string)
+		**out = **in
+	}
+	if in.ImagePullPolicy != nil {
+		in, out := &in.ImagePullPolicy, &out.ImagePullPolicy
+		*out = new(string)
+		**out = **in
+	}
+	if in.ImagePullSecrets != nil {
+		in, out := &in.ImagePullSecrets, &out.ImagePullSecrets
+		*out = make([]string, len(*in))
+		copy(*out, *in)
+	}
+	if in.MainClass != nil {
+		in, out := &in.MainClass, &out.MainClass
+		*out = new(string)
+		**out = **in
+	}
+	if in.MainApplicationFile != nil {
+		in, out := &in.MainApplicationFile, &out.MainApplicationFile
+		*out = new(string)
+		**out = **in
+	}
+	if in.Arguments != nil {
+		in, out := &in.Arguments, &out.Arguments
+		*out = make([]string, len(*in))
+		copy(*out, *in)
+	}
+	if in.SparkConf != nil {
+		in, out := &in.SparkConf, &out.SparkConf
+		*out = make(map[string]string, len(*in))
+		for key, val := range *in {
+			(*out)[key] = val
+		}
+	}
+	if in.HadoopConf != nil {
+		in, out := &in.HadoopConf, &out.HadoopConf
+		*out = make(map[string]string, len(*in))
+		for key, val := range *in {
+			(*out)[key] = val
+		}
+	}
+	if in.SparkConfigMap != nil {
+		in, out := &in.SparkConfigMap, &out.SparkConfigMap
+		*out = new(string)
+		**out = **in
+	}
+	if in.HadoopConfigMap != nil {
+		in, out := &in.HadoopConfigMap, &out.HadoopConfigMap
+		*out = new(string)
+		**out = **in
+	}
+	if in.Volumes != nil {
+		in, out := &in.Volumes, &out.Volumes
+		*out = make([]v1.Volume, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+	in.Driver.DeepCopyInto(&out.Driver)
+	in.Executor.DeepCopyInto(&out.Executor)
+	in.Deps.DeepCopyInto(&out.Deps)
+	in.RestartPolicy.DeepCopyInto(&out.RestartPolicy)
+	if in.NodeSelector != nil {
+		in, out := &in.NodeSelector, &out.NodeSelector
+		*out = make(map[string]string, len(*in))
+		for key, val := range *in {
+			(*out)[key] = val
+		}
+	}
+	if in.FailureRetries != nil {
+		in, out := &in.FailureRetries, &out.FailureRetries
+		*out = new(int32)
+		**out = **in
+	}
+	if in.RetryInterval != nil {
+		in, out := &in.RetryInterval, &out.RetryInterval
+		*out = new(int64)
+		**out = **in
+	}
+	if in.PythonVersion != nil {
+		in, out := &in.PythonVersion, &out.PythonVersion
+		*out = new(string)
+		**out = **in
+	}
+	if in.MemoryOverheadFactor != nil {
+		in, out := &in.MemoryOverheadFactor, &out.MemoryOverheadFactor
+		*out = new(string)
+		**out = **in
+	}
+	if in.Monitoring != nil {
+		in, out := &in.Monitoring, &out.Monitoring
+		*out = new(MonitoringSpec)
+		(*in).DeepCopyInto(*out)
+	}
+	if in.BatchScheduler != nil {
+		in, out := &in.BatchScheduler, &out.BatchScheduler
+		*out = new(string)
+		**out = **in
+	}
+	if in.TimeToLiveSeconds != nil {
+		in, out := &in.TimeToLiveSeconds, &out.TimeToLiveSeconds
+		*out = new(int64)
+		**out = **in
+	}
+	if in.BatchSchedulerOptions != nil {
+		in, out := &in.BatchSchedulerOptions, &out.BatchSchedulerOptions
+		*out = new(BatchSchedulerConfiguration)
+		(*in).DeepCopyInto(*out)
+	}
+	if in.SparkUIOptions != nil {
+		in, out := &in.SparkUIOptions, &out.SparkUIOptions
+		*out = new(SparkUIConfiguration)
+		(*in).DeepCopyInto(*out)
+	}
+	if in.DynamicAllocation != nil {
+		in, out := &in.DynamicAllocation, &out.DynamicAllocation
+		*out = new(DynamicAllocation)
+		(*in).DeepCopyInto(*out)
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SparkApplicationSpec.
+func (in *SparkApplicationSpec) DeepCopy() *SparkApplicationSpec {
+	if in == nil {
+		return nil
+	}
+	out := new(SparkApplicationSpec)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *SparkApplicationStatus) DeepCopyInto(out *SparkApplicationStatus) {
+	*out = *in
+	in.LastSubmissionAttemptTime.DeepCopyInto(&out.LastSubmissionAttemptTime)
+	in.TerminationTime.DeepCopyInto(&out.TerminationTime)
+	out.DriverInfo = in.DriverInfo
+	out.AppState = in.AppState
+	if in.ExecutorState != nil {
+		in, out := &in.ExecutorState, &out.ExecutorState
+		*out = make(map[string]ExecutorState, len(*in))
+		for key, val := range *in {
+			(*out)[key] = val
+		}
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SparkApplicationStatus.
+func (in *SparkApplicationStatus) DeepCopy() *SparkApplicationStatus {
+	if in == nil {
+		return nil
+	}
+	out := new(SparkApplicationStatus)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *SparkPodSpec) DeepCopyInto(out *SparkPodSpec) {
+	*out = *in
+	if in.Cores != nil {
+		in, out := &in.Cores, &out.Cores
+		*out = new(int32)
+		**out = **in
+	}
+	if in.CoreLimit != nil {
+		in, out := &in.CoreLimit, &out.CoreLimit
+		*out = new(string)
+		**out = **in
+	}
+	if in.Memory != nil {
+		in, out := &in.Memory, &out.Memory
+		*out = new(string)
+		**out = **in
+	}
+	if in.MemoryOverhead != nil {
+		in, out := &in.MemoryOverhead, &out.MemoryOverhead
+		*out = new(string)
+		**out = **in
+	}
+	if in.GPU != nil {
+		in, out := &in.GPU, &out.GPU
+		*out = new(GPUSpec)
+		**out = **in
+	}
+	if in.Image != nil {
+		in, out := &in.Image, &out.Image
+		*out = new(string)
+		**out = **in
+	}
+	if in.ConfigMaps != nil {
+		in, out := &in.ConfigMaps, &out.ConfigMaps
+		*out = make([]NamePath, len(*in))
+		copy(*out, *in)
+	}
+	if in.Secrets != nil {
+		in, out := &in.Secrets, &out.Secrets
+		*out = make([]SecretInfo, len(*in))
+		copy(*out, *in)
+	}
+	if in.Env != nil {
+		in, out := &in.Env, &out.Env
+		*out = make([]v1.EnvVar, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+	if in.EnvVars != nil {
+		in, out := &in.EnvVars, &out.EnvVars
+		*out = make(map[string]string, len(*in))
+		for key, val := range *in {
+			(*out)[key] = val
+		}
+	}
+	if in.EnvFrom != nil {
+		in, out := &in.EnvFrom, &out.EnvFrom
+		*out = make([]v1.EnvFromSource, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+	if in.EnvSecretKeyRefs != nil {
+		in, out := &in.EnvSecretKeyRefs, &out.EnvSecretKeyRefs
+		*out = make(map[string]NameKey, len(*in))
+		for key, val := range *in {
+			(*out)[key] = val
+		}
+	}
+	if in.Labels != nil {
+		in, out := &in.Labels, &out.Labels
+		*out = make(map[string]string, len(*in))
+		for key, val := range *in {
+			(*out)[key] = val
+		}
+	}
+	if in.Annotations != nil {
+		in, out := &in.Annotations, &out.Annotations
+		*out = make(map[string]string, len(*in))
+		for key, val := range *in {
+			(*out)[key] = val
+		}
+	}
+	if in.VolumeMounts != nil {
+		in, out := &in.VolumeMounts, &out.VolumeMounts
+		*out = make([]v1.VolumeMount, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+	if in.Affinity != nil {
+		in, out := &in.Affinity, &out.Affinity
+		*out = new(v1.Affinity)
+		(*in).DeepCopyInto(*out)
+	}
+	if in.Tolerations != nil {
+		in, out := &in.Tolerations, &out.Tolerations
+		*out = make([]v1.Toleration, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+	if in.PodSecurityContext != nil {
+		in, out := &in.PodSecurityContext, &out.PodSecurityContext
+		*out = new(v1.PodSecurityContext)
+		(*in).DeepCopyInto(*out)
+	}
+	if in.SecurityContext != nil {
+		in, out := &in.SecurityContext, &out.SecurityContext
+		*out = new(v1.SecurityContext)
+		(*in).DeepCopyInto(*out)
+	}
+	if in.SchedulerName != nil {
+		in, out := &in.SchedulerName, &out.SchedulerName
+		*out = new(string)
+		**out = **in
+	}
+	if in.Sidecars != nil {
+		in, out := &in.Sidecars, &out.Sidecars
+		*out = make([]v1.Container, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+	if in.InitContainers != nil {
+		in, out := &in.InitContainers, &out.InitContainers
+		*out = make([]v1.Container, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+	if in.HostNetwork != nil {
+		in, out := &in.HostNetwork, &out.HostNetwork
+		*out = new(bool)
+		**out = **in
+	}
+	if in.NodeSelector != nil {
+		in, out := &in.NodeSelector, &out.NodeSelector
+		*out = make(map[string]string, len(*in))
+		for key, val := range *in {
+			(*out)[key] = val
+		}
+	}
+	if in.DNSConfig != nil {
+		in, out := &in.DNSConfig, &out.DNSConfig
+		*out = new(v1.PodDNSConfig)
+		(*in).DeepCopyInto(*out)
+	}
+	if in.TerminationGracePeriodSeconds != nil {
+		in, out := &in.TerminationGracePeriodSeconds, &out.TerminationGracePeriodSeconds
+		*out = new(int64)
+		**out = **in
+	}
+	if in.ServiceAccount != nil {
+		in, out := &in.ServiceAccount, &out.ServiceAccount
+		*out = new(string)
+		**out = **in
+	}
+	if in.HostAliases != nil {
+		in, out := &in.HostAliases, &out.HostAliases
+		*out = make([]v1.HostAlias, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+	if in.ShareProcessNamespace != nil {
+		in, out := &in.ShareProcessNamespace, &out.ShareProcessNamespace
+		*out = new(bool)
+		**out = **in
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SparkPodSpec.
+func (in *SparkPodSpec) DeepCopy() *SparkPodSpec {
+	if in == nil {
+		return nil
+	}
+	out := new(SparkPodSpec)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *SparkUIConfiguration) DeepCopyInto(out *SparkUIConfiguration) {
+	*out = *in
+	if in.ServicePort != nil {
+		in, out := &in.ServicePort, &out.ServicePort
+		*out = new(int32)
+		**out = **in
+	}
+	if in.ServicePortName != nil {
+		in, out := &in.ServicePortName, &out.ServicePortName
+		*out = new(string)
+		**out = **in
+	}
+	if in.ServiceType != nil {
+		in, out := &in.ServiceType, &out.ServiceType
+		*out = new(v1.ServiceType)
+		**out = **in
+	}
+	if in.ServiceAnnotations != nil {
+		in, out := &in.ServiceAnnotations, &out.ServiceAnnotations
+		*out = make(map[string]string, len(*in))
+		for key, val := range *in {
+			(*out)[key] = val
+		}
+	}
+	if in.IngressAnnotations != nil {
+		in, out := &in.IngressAnnotations, &out.IngressAnnotations
+		*out = make(map[string]string, len(*in))
+		for key, val := range *in {
+			(*out)[key] = val
+		}
+	}
+	if in.IngressTLS != nil {
+		in, out := &in.IngressTLS, &out.IngressTLS
+		*out = make([]networkingv1.IngressTLS, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SparkUIConfiguration.
+func (in *SparkUIConfiguration) DeepCopy() *SparkUIConfiguration {
+	if in == nil {
+		return nil
+	}
+	out := new(SparkUIConfiguration)
+	in.DeepCopyInto(out)
+	return out
+}