-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Refactor operator to use the controller runtime with multi-version support #658
Conversation
* Add resource request and limits for spark-submit pod * Minor changes for submission-job-manager + add tests * Remove un-used fields from spec * deepcopy funcs * Update auto-generated code * PR feedback * PR comments * Handling submission failure
@liyinan926 gentle ping. |
@skonto sorry was busying with other stuffs. Will take a look ASAP. |
thanks @liyinan926 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I got some time to briefly go over the PR. I think this needs extensive testing to make sure all functionalities still work as expected.
.SILENT: | ||
.PHONY: clean-sparkctl | ||
|
||
# Image URL to use all building/pushing image targets | ||
IMG ?= skonto/operator:test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please change this to gcr.io/spark-operator/spark-operator:latest
annotations: | ||
certmanager.k8s.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) | ||
--- | ||
#apiVersion: admissionregistration.k8s.io/v1beta1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's delete this def.
} | ||
|
||
var batchSchedulerMgr *batchscheduler.SchedulerManager |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Has this been moved somewhere else?
@@ -304,6 +301,8 @@ type SparkApplicationStatus struct { | |||
// ExecutionAttempts is the total number of attempts to run a submitted application to completion. | |||
// Incremented upon each attempted run of the application and reset upon invalidation. | |||
ExecutionAttempts int32 `json:"executionAttempts,omitempty"` | |||
// Information about the spec, we should know if we are targeting the correct state | |||
AppHash string `json:"appHash,omitempty"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider renaming this to SpecHash
instead.
@@ -340,6 +332,8 @@ type SparkApplicationStatus struct { | |||
// SubmissionAttempts is the total number of attempts to submit an application to run. | |||
// Incremented upon each attempted submission of the application and reset upon invalidation and rerun. | |||
SubmissionAttempts int32 `json:"submissionAttempts,omitempty"` | |||
// Information about the spec, we should know if we are targeting the correct state | |||
AppHash string `json:"appHash,omitempty"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto.
if err != nil { | ||
return err | ||
} | ||
|
||
var completedRuns []string | ||
var failedRuns []string | ||
for _, a := range sortedApps { | ||
for _, a := range sortedApps.Items { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto.
|
||
// add adds a new Controller to mgr with r as the reconcile.Reconciler | ||
func add(mgr manager.Manager, r reconcile.Reconciler, controllerThreads int) error { | ||
//p := predicate.Funcs{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please delete this.
UpdateFunc: controller.onUpdate, | ||
DeleteFunc: controller.onDelete, | ||
// Watch for Pod created by SparkApplication | ||
err = c.Watch(&source.Kind{Type: &apiv1.Pod{}}, &handler.EnqueueRequestForOwner{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This only covers driver pods.
glog.V(2).Infof("Trying to update SparkApplication %s/%s, from: [%v] to [%v]", app.Namespace, app.Name, app.Status, appToUpdate.Status) | ||
err = c.updateStatusAndExportMetrics(app, appToUpdate) | ||
if appToUpdate.Status.AppHash == "" { | ||
appToUpdate.Status.AppHash = r.calculateSparkAppSpecHash((*appToUpdate).Spec) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can reuse calculatedHash
.
appToUpdate.Status.ExecutionAttempts++ | ||
if completionTime != nil { | ||
appToUpdate.Status.SubmissionTime = *completionTime | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we refactor this such that we can return early in if shouldInvalidate {}
? Then we don't need to have this huge else branch.
@liyinan926 yes it needs testing I will try to complete the rest of the functionality. How do you think we should proceed with testing, you mean integration tests? Should we discuss this on slack? |
@skonto if necessary, we can set up a quick Hangout meeting. Are you going to KubeCon next week? If so, we can also have a F2F meeting. |
ba0ea5e
to
b7055ea
Compare
831f414
to
c832ee0
Compare
ee55c01
to
ed5b884
Compare
4cf368a
to
0600e96
Compare
@liyinan926 sorry I missed the conversation here. I have now free time to revive the effort. A lot of things happened lately and also with this virus situation (hope you are staying safe). |
@liyinan926 gentle ping. |
Aim is to solve #547.
Important: _This a big refactoring and does not cover everything at the moment. This is work in progress, goal is to get community feedback for moving forward with the refactoring.
Some important changes are:
a) proper support for the status subresource
b) support for contr runtime 0.2.0 GA
c) support for logr -zap log
d) use the certification manager for injecting the webhook ca
e) use a hash to overcome the restriction that comes with the controller runtime about accessing previous state, as described here kubernetes-sigs/kubebuilder#37 (comment). Predicates could be used but might be too much work.
f) go mod support
h) multi-namespace support
Note: crds require some work to become of
StructuralSchema
I noticed several violations when runningkubectl describe crd
.In order to test this:
Run an application with the following spec spark-pi.yaml:
To create the secrets use:
Log output of the operator can be found [here]https://gist.github.com/skonto/919a59d662b38fb4eb1dbc54d5765f72) for the app running to completion.
The following pass:
just export GOPATH to be at your topdir,
export GO111MODULE=on
and add go binary to your path.@liyinan926 pls review.
Most stuff has been migreated.
Pending issues:
resourceusage
capabilities