Added TTL for SparkApplications
liyinan926 committed Sep 13, 2019
1 parent 55a1eeb commit fca3a46
Showing 7 changed files with 98 additions and 5 deletions.
2 changes: 0 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default.

12 changes: 12 additions & 0 deletions docs/user-guide.md
@@ -36,6 +36,7 @@ The Kubernetes Operator for Apache Spark ships with a command-line tool called `
* [Checking a SparkApplication](#checking-a-sparkapplication)
* [Configuring Automatic Application Restart](#configuring-automatic-application-restart)
* [Configuring Automatic Application Re-submission on Submission Failures](#configuring-automatic-application-re-submission-on-submission-failures)
* [Setting TTL for a SparkApplication](#setting-ttl-for-a-sparkapplication)
* [Running Spark Applications on a Schedule using a ScheduledSparkApplication](#running-spark-applications-on-a-schedule-using-a-scheduledsparkapplication)
* [Enabling Leader Election for High Availability](#enabling-leader-election-for-high-availability)
* [Enabling Resource Quota Enforcement](#enabling-resource-quota-enforcement)
@@ -509,6 +510,17 @@ the operator retries submitting the application using a linear backoff with the
Old resources such as the driver pod and the UI service/ingress are deleted if they still exist before the new run is submitted, and a new driver pod is created by the submission
client, so the driver is effectively restarted.
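
For reference, this retry behavior is configured through the restart policy on the application spec. The sketch below shows plausible settings per the operator's `v1beta2` API; the retry counts and intervals are illustrative:

```yaml
spec:
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3                   # retries after application failures
    onFailureRetryInterval: 10            # seconds; base of the linear backoff
    onSubmissionFailureRetries: 5         # retries after submission failures
    onSubmissionFailureRetryInterval: 20  # seconds; base of the linear backoff
```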

### Setting TTL for a SparkApplication

The `v1beta2` version of the `SparkApplication` API adds TTL support for `SparkApplication`s through a new optional field named `TimeToLiveSeconds`, which, if set, defines the Time-To-Live (TTL) duration in seconds for a `SparkApplication` after its termination. The `SparkApplication` object will be garbage collected once more than `TimeToLiveSeconds` seconds have elapsed since its termination. The example below illustrates how to use the field:

```yaml
spec:
  timeToLiveSeconds: 3600
```
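
With this setting, the `SparkApplication` object is garbage collected one hour after its termination.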

Note that this feature requires the informer cache resync to be enabled, which it is by default with a resync interval of 30 seconds. You can change the resync interval by setting the flag `-resync-interval=<interval>`.
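
For example, assuming the operator runs as a standard Kubernetes Deployment, the flag can be passed via the container args. This is a sketch only; the container name and image tag below are illustrative:

```yaml
spec:
  containers:
    - name: sparkoperator
      image: gcr.io/spark-operator/spark-operator:latest  # image tag is illustrative
      args:
        - -resync-interval=60  # resync the informer cache every 60 seconds
```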

## Running Spark Applications on a Schedule using a ScheduledSparkApplication

The operator supports running a Spark application on a standard [cron](https://en.wikipedia.org/wiki/Cron) schedule using objects of the `ScheduledSparkApplication` custom resource type. A `ScheduledSparkApplication` object specifies a cron schedule on which the application should run and a `SparkApplication` template from which a `SparkApplication` object for each run of the application is created. The following is an example `ScheduledSparkApplication`:
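
A minimal sketch of such an object follows; the name, schedule, and template fields are illustrative:

```yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: ScheduledSparkApplication
metadata:
  name: spark-pi-scheduled
  namespace: default
spec:
  schedule: "@every 5m"
  concurrencyPolicy: Allow
  template:                  # a regular SparkApplication spec
    type: Scala
    mode: cluster
    image: gcr.io/spark-operator/spark:v2.4.0  # image is illustrative
    mainClass: org.apache.spark.examples.SparkPi
    mainApplicationFile: local:///opt/spark/examples/jars/spark-examples.jar  # path is illustrative
    restartPolicy:
      type: Never
```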

Some generated files are not rendered by default.

6 changes: 6 additions & 0 deletions pkg/apis/sparkoperator.k8s.io/v1beta2/types.go
@@ -257,6 +257,12 @@ type SparkApplicationSpec struct {
    // BatchScheduler configures which batch scheduler will be used for scheduling
    // Optional.
    BatchScheduler *string `json:"batchScheduler,omitempty"`
    // TimeToLiveSeconds defines the Time-To-Live (TTL) duration in seconds for this SparkApplication
    // after its termination.
    // The SparkApplication object will be garbage collected once more than TimeToLiveSeconds seconds
    // have elapsed since its termination.
    // Optional.
    TimeToLiveSeconds *int64 `json:"timeToLiveSeconds,omitempty"`
}

// ApplicationStateType represents the type of the current state of an application.
10 changes: 10 additions & 0 deletions pkg/apis/sparkoperator.k8s.io/v1beta2/zz_generated.deepcopy.go

Some generated files are not rendered by default.

24 changes: 22 additions & 2 deletions pkg/controller/sparkapplication/controller.go
@@ -35,14 +35,14 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/listers/core/v1"
v1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"

"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/batchscheduler"
"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/batchscheduler/interface"
schedulerinterface "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/batchscheduler/interface"
crdclientset "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/clientset/versioned"
crdscheme "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/clientset/versioned/scheme"
crdinformers "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/informers/externalversions"
@@ -569,6 +569,11 @@ func (c *Controller) syncSparkApplication(key string) error {
        if err := c.getAndUpdateAppState(appToUpdate); err != nil {
            return err
        }
    case v1beta2.CompletedState, v1beta2.FailedState:
        if c.hasApplicationExpired(app) {
            glog.Infof("Garbage collecting expired SparkApplication %s/%s", app.Namespace, app.Name)
            return c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Delete(app.Name, metav1.NewDeleteOptions(0))
        }
    }

    if appToUpdate != nil {
@@ -950,3 +955,18 @@ func (c *Controller) clearStatus(status *v1beta2.SparkApplicationStatus) {
        status.ExecutorState = nil
    }
}

func (c *Controller) hasApplicationExpired(app *v1beta2.SparkApplication) bool {
    // The application has no TTL defined and will never expire.
    if app.Spec.TimeToLiveSeconds == nil {
        return false
    }

    ttl := time.Duration(*app.Spec.TimeToLiveSeconds) * time.Second
    now := time.Now()
    if !app.Status.TerminationTime.IsZero() && now.Sub(app.Status.TerminationTime.Time) > ttl {
        return true
    }

    return false
}
48 changes: 48 additions & 0 deletions pkg/controller/sparkapplication/controller_test.go
@@ -28,6 +28,7 @@ import (
    prometheus_model "github.com/prometheus/client_model/go"
    "github.com/stretchr/testify/assert"
    apiv1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/informers"
    kubeclientfake "k8s.io/client-go/kubernetes/fake"
@@ -1211,6 +1212,53 @@ func TestSyncSparkApplication_ExecutingState(t *testing.T) {
    }
}

func TestSyncSparkApplication_ApplicationExpired(t *testing.T) {
    os.Setenv(kubernetesServiceHostEnvVar, "localhost")
    os.Setenv(kubernetesServicePortEnvVar, "443")

    appName := "foo"
    driverPodName := appName + "-driver"

    now := time.Now()
    terminationTime := now.Add(-2 * time.Second)
    app := &v1beta2.SparkApplication{
        ObjectMeta: metav1.ObjectMeta{
            Name:      appName,
            Namespace: "test",
        },
        Spec: v1beta2.SparkApplicationSpec{
            RestartPolicy: v1beta2.RestartPolicy{
                Type: v1beta2.Never,
            },
            TimeToLiveSeconds: int64ptr(1),
        },
        Status: v1beta2.SparkApplicationStatus{
            AppState: v1beta2.ApplicationState{
                State:        v1beta2.CompletedState,
                ErrorMessage: "",
            },
            DriverInfo: v1beta2.DriverInfo{
                PodName: driverPodName,
            },
            TerminationTime: metav1.Time{
                Time: terminationTime,
            },
            ExecutorState: map[string]v1beta2.ExecutorState{"exec-1": v1beta2.ExecutorCompletedState},
        },
    }

    ctrl, _ := newFakeController(app)
    _, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Create(app)
    if err != nil {
        t.Fatal(err)
    }
    err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
    assert.Nil(t, err)

    _, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(app.Name, metav1.GetOptions{})
    assert.True(t, errors.IsNotFound(err))
}

func TestHasRetryIntervalPassed(t *testing.T) {
    // Failure cases.
    assert.False(t, hasRetryIntervalPassed(nil, 3, metav1.Time{Time: metav1.Now().Add(-100 * time.Second)}))
