
[WIP] Refactor operator to use the controller runtime with multi-version support #658

Closed

Conversation

@skonto skonto commented Oct 11, 2019

The aim is to solve #547.
Important: This is a big refactoring and does not cover everything at the moment. It is a work in progress; the goal is to get community feedback before moving forward with the refactoring.
Some important changes are:
a) proper support for the status subresource
b) support for controller-runtime 0.2.0 GA
c) support for logr/zap logging
d) use cert-manager for injecting the webhook CA
e) use a hash to overcome the controller-runtime restriction on accessing previous state, as described in kubernetes-sigs/kubebuilder#37 (comment). Predicates could be used instead but might be too much work.
f) go mod support
g) multi-namespace support
Note: the CRDs require some work to become structural schemas; I noticed several violations when running kubectl describe crd.
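Point (e), storing a hash of the spec in the status subresource so the reconciler can detect spec changes without access to the previous object, can be sketched roughly as follows. The `sparkAppSpec` type and its fields are illustrative stand-ins, not the operator's actual `SparkApplicationSpec`:

```go
package main

import (
	"crypto/sha256"
	"encoding/json"
	"fmt"
)

// sparkAppSpec is a hypothetical, trimmed-down stand-in for the real
// SparkApplicationSpec type in the operator.
type sparkAppSpec struct {
	Image        string `json:"image"`
	MainClass    string `json:"mainClass"`
	SparkVersion string `json:"sparkVersion"`
}

// specHash serializes the spec to JSON and returns the hex-encoded
// SHA-256 digest. Persisting this in the status lets the reconciler
// compare the stored hash against the incoming spec's hash to detect
// changes, without needing the previous object itself.
func specHash(spec sparkAppSpec) string {
	b, _ := json.Marshal(spec)
	return fmt.Sprintf("%x", sha256.Sum256(b))
}

func main() {
	a := sparkAppSpec{Image: "gcr.io/spark-operator/spark:v2.4.4", SparkVersion: "2.4.4"}
	b := a
	fmt.Println(specHash(a) == specHash(b)) // identical specs hash equal: true
	b.SparkVersion = "3.0.0"
	fmt.Println(specHash(a) == specHash(b)) // a changed spec hashes differently: false
}
```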
In order to test this:

$docker build --build-arg SPARK_IMAGE=gcr.io/spark-operator/spark:v2.4.4  -t skonto/operator:test -f Dockerfile .
# or use the following, which requires GOPATH to be set, the go binary on PATH, and GO111MODULE=on exported
$make docker-build SPARK-IMAGE=gcr.io/spark-operator/spark:v2.4.4 IMG=skonto/operator:test
$docker push skonto/operator:test
#setup minikube latest

$kubectl create namespace cert-manager
$kubectl label namespace cert-manager certmanager.k8s.io/disable-validation=true
$kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/v0.10.1/cert-manager.yaml

# check in namespace cert-manager that all pods are running
# create crds
$kubectl apply -f manifest/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml
$kubectl apply -f manifest/crds/sparkoperator.k8s.io_sparkapplications.yaml
# create the Spark operator pod, service, etc.
$kubectl apply -f all.yaml 
# check operator has started in namespace sparkoperator
# enable webhook service
$kubectl apply -f webhook.yaml

Run an application with the following spec, spark-pi.yaml:

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.4"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar"
  arguments:
    - "100"
  sparkVersion: "2.4.4"
  sparkConf: {}
  restartPolicy:
    type: Never
  driver:
    memory: "1g"
    labels:
      version: 2.4.4
    serviceAccount: spark-sa
    secrets:
      - name: mysecret
        path: /etc/config-secret
        secretType: generic
    configMaps:
      - name: myconfigmap
        path: /etc/config
  executor:
    instances: 2
    memory: "500m"
    labels:
      version: 2.4.4

To create the secret and config map referenced above, use:

apiVersion: v1
kind: Secret
metadata:
  name: mysecret
  namespace: spark
type: Opaque
data:
  username: YWRtaW4=
  password: MWYyZDFlMmU2N2Rm

---
kind: ConfigMap
apiVersion: v1
metadata:
  name: myconfigmap
  namespace: spark
data:
  database: mysql
  database_uri: blablabla

# download https://github.com/apache/spark/blob/master/resource-managers/kubernetes/integration-tests/dev/spark-rbac.yaml
$kubectl apply -f spark-rbac.yaml
$kubectl apply -f spark-pi.yaml

Log output of the operator for the app running to completion can be found [here](https://gist.github.com/skonto/919a59d662b38fb4eb1dbc54d5765f72).

kubectl describe sparkapplications  -n spark
Name:         spark-pi
Namespace:    spark
Labels:       <none>
Annotations:  kubectl.kubernetes.io/last-applied-configuration:
                {"apiVersion":"sparkoperator.k8s.io/v1beta2","kind":"SparkApplication","metadata":{"annotations":{},"name":"spark-pi","namespace":"spark"}...
API Version:  sparkoperator.k8s.io/v1beta2
Kind:         SparkApplication
Metadata:
  Creation Timestamp:  2019-10-11T00:27:14Z
  Generation:          2
  Resource Version:    220909
  Self Link:           /apis/sparkoperator.k8s.io/v1beta2/namespaces/spark/sparkapplications/spark-pi
  UID:                 7d959c9a-acca-4764-baae-ce8fec588033
Spec:
  Arguments:
    100
  Driver:
    Config Maps:
      Name:  myconfigmap
      Path:  /etc/config
    Labels:
      Version:  2.4.4
    Memory:     1g
    Secrets:
      Name:           mysecret
      Path:           /etc/config-secret
      Secret Type:    generic
    Service Account:  spark-sa
  Executor:
    Instances:  2
    Labels:
      Version:            2.4.4
    Memory:               500m
  Image:                  lightbend/spark:2.1.2-OpenShift-2.4.4-rh-2.12
  Image Pull Policy:      Always
  Main Application File:  local:///opt/spark/examples/jars/spark-examples_2.12-2.1.2-2.4.4-lightbend.jar
  Main Class:             org.apache.spark.examples.SparkPi
  Mode:                   cluster
  Restart Policy:
    Type:  Never
  Spark Conf:
  Spark Version:  2.4.4
  Type:           Scala
Status:
  Apphash:  04f14ac0b65a58627350f019adcbe3b9a034d9adef34e47c5b21d42e14ea3d92
  Application State:
    State:  COMPLETED
  Driver Info:
    Pod Name:                    spark-pi-driver
    Web UI Address:              10.96.42.237:4040
    Web UI Port:                 4040
    Web UI Service Name:         spark-pi-ui-svc
  Execution Attempts:            1
  Last Submission Attempt Time:  2019-10-11T00:27:50Z
  Spark Application Id:          spark-f7789c37ae884e16832a478bb7cee04d
  Submission Attempts:           1
  Submission ID:                 0b4a134a-cd53-4a84-8622-218b5882cca4
  Termination Time:              <nil>
Events:
  Type     Reason                        Age                 From            Message
  ----     ------                        ----                ----            -------
  Normal   SparkApplicationAdded         2m                  spark-operator  SparkApplication spark-pi was added, enqueuing it for submission
  Normal   SparkExecutorRunning          95s                 spark-operator  Executor spark-pi-1570753637441-exec-1 is running
  Normal   SparkExecutorRunning          95s                 spark-operator  Executor spark-pi-1570753637441-exec-2 is running
  Warning  SparkApplicationPendingRerun  92s                 spark-operator  SparkApplication spark-pi is pending rerun
  Normal   SparkApplicationSubmitted     84s (x2 over 112s)  spark-operator  SparkApplication spark-pi was submitted successfully
  Normal   SparkDriverRunning            79s (x2 over 108s)  spark-operator  Driver spark-pi-driver is running
  Normal   SparkDriverCompleted          65s                 spark-operator  Driver spark-pi-driver completed
  Normal   SparkApplicationCompleted     65s                 spark-operator  SparkApplication spark-pi completed

The following pass:

go vet ./...
go fmt ./...
go test ./...

Just export GOPATH to point at your top-level directory, export GO111MODULE=on, and add the go binary to your PATH.
@liyinan926 please review.

Most of the code has been migrated.
Pending issues:

  1. use the controller-runtime metrics (similar to what the operator implements right now, but it hides the details of setting up your own HTTP endpoint for metrics, etc.)
  2. enable resource usage capabilities
  3. build sparkctl with the Makefile
  4. extend controller tests with the subJobManager details
  5. remove dependencies on crdclientset
  6. verify all functionality is working

liyinan926 and others added 5 commits September 25, 2019 13:35
* Add resource request and limits for spark-submit pod

* Minor changes for submission-job-manager + add tests

* Remove un-used fields from spec

* deepcopy funcs

* Update auto-generated code

* PR feedback

* PR comments

* Handling submission failure
skonto commented Oct 21, 2019

@liyinan926 gentle ping.

@liyinan926

@skonto sorry, I was busy with other things. Will take a look ASAP.

skonto commented Oct 27, 2019

thanks @liyinan926

@liyinan926 left a comment
I got some time to briefly go over the PR. I think this needs extensive testing to make sure all functionality still works as expected.

.SILENT:
.PHONY: clean-sparkctl

# Image URL to use all building/pushing image targets
IMG ?= skonto/operator:test
liyinan926:
Please change this to gcr.io/spark-operator/spark-operator:latest

annotations:
certmanager.k8s.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME)
---
#apiVersion: admissionregistration.k8s.io/v1beta1
liyinan926:
Let's delete this def.

}

var batchSchedulerMgr *batchscheduler.SchedulerManager
liyinan926:
Has this been moved somewhere else?

@@ -304,6 +301,8 @@ type SparkApplicationStatus struct {
// ExecutionAttempts is the total number of attempts to run a submitted application to completion.
// Incremented upon each attempted run of the application and reset upon invalidation.
ExecutionAttempts int32 `json:"executionAttempts,omitempty"`
// Information about the spec, we should know if we are targeting the correct state
AppHash string `json:"appHash,omitempty"`
liyinan926:
Consider renaming this to SpecHash instead.

@@ -340,6 +332,8 @@ type SparkApplicationStatus struct {
// SubmissionAttempts is the total number of attempts to submit an application to run.
// Incremented upon each attempted submission of the application and reset upon invalidation and rerun.
SubmissionAttempts int32 `json:"submissionAttempts,omitempty"`
// Information about the spec, we should know if we are targeting the correct state
AppHash string `json:"appHash,omitempty"`
liyinan926:
Ditto.

if err != nil {
return err
}

var completedRuns []string
var failedRuns []string
for _, a := range sortedApps {
for _, a := range sortedApps.Items {
liyinan926:
Ditto.


// add adds a new Controller to mgr with r as the reconcile.Reconciler
func add(mgr manager.Manager, r reconcile.Reconciler, controllerThreads int) error {
//p := predicate.Funcs{
liyinan926:
Please delete this.

UpdateFunc: controller.onUpdate,
DeleteFunc: controller.onDelete,
// Watch for Pod created by SparkApplication
err = c.Watch(&source.Kind{Type: &apiv1.Pod{}}, &handler.EnqueueRequestForOwner{
liyinan926:
This only covers driver pods.

glog.V(2).Infof("Trying to update SparkApplication %s/%s, from: [%v] to [%v]", app.Namespace, app.Name, app.Status, appToUpdate.Status)
err = c.updateStatusAndExportMetrics(app, appToUpdate)
if appToUpdate.Status.AppHash == "" {
appToUpdate.Status.AppHash = r.calculateSparkAppSpecHash((*appToUpdate).Spec)
liyinan926:
You can reuse calculatedHash.

appToUpdate.Status.ExecutionAttempts++
if completionTime != nil {
appToUpdate.Status.SubmissionTime = *completionTime
} else {
liyinan926:
Can we refactor this such that we can return early in if shouldInvalidate {}? Then we don't need to have this huge else branch.
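The early-return shape suggested here can be sketched generically; `processStatus` and its parameters are illustrative names, not the operator's actual code:

```go
package main

import "fmt"

// processStatus shows the refactoring suggested in the review: handle
// the shouldInvalidate case first and return, so the main path is not
// nested inside a large else branch.
func processStatus(shouldInvalidate bool, attempts int32) int32 {
	if shouldInvalidate {
		// invalidate and bail out early
		return 0
	}
	// main path, no longer wrapped in an else branch
	return attempts + 1
}

func main() {
	fmt.Println(processStatus(true, 3))  // reset on invalidation: 0
	fmt.Println(processStatus(false, 3)) // otherwise increment: 4
}
```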

skonto commented Nov 9, 2019

@liyinan926 yes, it needs testing. I will try to complete the rest of the functionality. How do you think we should proceed with testing? Do you mean integration tests? Should we discuss this on Slack?

@liyinan926

@skonto if necessary, we can set up a quick Hangout meeting. Are you going to KubeCon next week? If so, we can also have an F2F meeting.

@liyinan926 liyinan926 force-pushed the multi-version-support branch 2 times, most recently from ba0ea5e to b7055ea Compare December 16, 2019 17:51
@liyinan926 liyinan926 force-pushed the multi-version-support branch from 831f414 to c832ee0 Compare January 22, 2020 23:30
@liyinan926 liyinan926 force-pushed the multi-version-support branch from ee55c01 to ed5b884 Compare February 5, 2020 19:11
@liyinan926 liyinan926 force-pushed the multi-version-support branch 2 times, most recently from 4cf368a to 0600e96 Compare April 18, 2020 18:21
skonto commented Apr 21, 2020

@liyinan926 sorry, I missed the conversation here. I now have free time to revive the effort. A lot has happened lately, especially with the virus situation (I hope you are staying safe).
By the way, are you still available for a quick chat?

skonto commented May 18, 2020

@liyinan926 gentle ping.
