Add total SparkApplication count metric (kubeflow#856)
* Add total SparkApplication count metric

Total SparkApplication count is the total number of SparkApplications
that have been processed by the operator. This metric can be used to
track how many SparkApplications users have submitted to the K8s API
server, and it can also serve as the denominator when computing, for
example, the job success rate.
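For instance, a Prometheus query dividing `spark_app_success_count` by
`spark_app_count` (with the configured metric name prefix, if any) gives
the fraction of submitted applications that completed successfully.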

* Export SparkApp count metric in sparkapp_metrics.go

Invoke the export of the SparkApp count metric from exportMetrics() in
sparkapp_metrics.go rather than from syncSparkApplication() in controller.go,
to align with the metric-exporting convention used elsewhere in the code base.
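Concretely, the counter is incremented in exportMetrics() when an application
transitions out of the New state, alongside the existing per-state counters
(see the sparkapp_metrics.go hunk below).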
huskysun authored Apr 6, 2020
1 parent 8a1e259 commit e1d70af
Showing 4 changed files with 26 additions and 2 deletions.
docs/quick-start-guide.md (3 changes: 2 additions & 1 deletion)
@@ -183,7 +183,8 @@ If enabled, the operator generates the following metrics:
 #### Spark Application Metrics
 | Metric | Description |
 | ------------- | ------------- |
-| `spark_app_submit_count` | Total number of SparkApplication submitted by the Operator.|
+| `spark_app_count` | Total number of SparkApplication handled by the Operator.|
+| `spark_app_submit_count` | Total number of SparkApplication spark-submitted by the Operator.|
 | `spark_app_success_count` | Total number of SparkApplication which completed successfully.|
 | `spark_app_failure_count` | Total number of SparkApplication which failed to complete. |
 | `spark_app_running_count` | Total number of SparkApplication which are currently running.|
pkg/controller/sparkapplication/controller_test.go (4 changes: 4 additions & 0 deletions)
@@ -278,6 +278,7 @@ func TestSyncSparkApplication_SubmissionFailed(t *testing.T) {
 
 	assert.Equal(t, v1beta2.FailedSubmissionState, updatedApp.Status.AppState.State)
 	assert.Equal(t, int32(1), updatedApp.Status.SubmissionAttempts)
+	assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppCount, map[string]string{}))
 	assert.Equal(t, float64(0), fetchCounterValue(ctrl.metrics.sparkAppSubmitCount, map[string]string{}))
 	assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppFailedSubmissionCount, map[string]string{}))
 
@@ -607,6 +608,9 @@ func TestSyncSparkApplication_SubmissionSuccess(t *testing.T) {
 	updatedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(test.app.Namespace).Get(test.app.Name, metav1.GetOptions{})
 	assert.Nil(t, err)
 	assert.Equal(t, test.expectedState, updatedApp.Status.AppState.State)
+	if test.app.Status.AppState.State == v1beta2.NewState {
+		assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppCount, map[string]string{}))
+	}
 	if test.expectedState == v1beta2.SubmittedState {
 		assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppSubmitCount, map[string]string{}))
 	}
pkg/controller/sparkapplication/sparkapp_metrics.go (19 changes: 18 additions & 1 deletion)
@@ -30,6 +30,7 @@ type sparkAppMetrics struct {
 	labels []string
 	prefix string
 
+	sparkAppCount *prometheus.CounterVec
 	sparkAppSubmitCount *prometheus.CounterVec
 	sparkAppSuccessCount *prometheus.CounterVec
 	sparkAppFailureCount *prometheus.CounterVec
@@ -54,6 +55,13 @@ func newSparkAppMetrics(metricsConfig *util.MetricConfig) *sparkAppMetrics {
 		validLabels[i] = util.CreateValidMetricNameLabel("", label)
 	}
 
+	sparkAppCount := prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: util.CreateValidMetricNameLabel(prefix, "spark_app_count"),
+			Help: "Total Number of Spark Apps Handled by the Operator",
+		},
+		validLabels,
+	)
 	sparkAppSubmitCount := prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Name: util.CreateValidMetricNameLabel(prefix, "spark_app_submit_count"),
@@ -133,6 +141,7 @@ return &sparkAppMetrics{
 	return &sparkAppMetrics{
 		labels: validLabels,
 		prefix: prefix,
+		sparkAppCount: sparkAppCount,
 		sparkAppSubmitCount: sparkAppSubmitCount,
 		sparkAppRunningCount: sparkAppRunningCount,
 		sparkAppSuccessCount: sparkAppSuccessCount,
@@ -149,6 +158,7 @@ func newSparkAppMetrics(metricsConfig *util.MetricConfig) *sparkAppMetrics {
 }
 
 func (sm *sparkAppMetrics) registerMetrics() {
+	util.RegisterMetric(sm.sparkAppCount)
 	util.RegisterMetric(sm.sparkAppSubmitCount)
 	util.RegisterMetric(sm.sparkAppSuccessCount)
 	util.RegisterMetric(sm.sparkAppFailureCount)
@@ -168,6 +178,14 @@ func (sm *sparkAppMetrics) exportMetrics(oldApp, newApp *v1beta2.SparkApplicatio
 	oldState := oldApp.Status.AppState.State
 	newState := newApp.Status.AppState.State
 	if newState != oldState {
+		if oldState == v1beta2.NewState {
+			if m, err := sm.sparkAppCount.GetMetricWith(metricLabels); err != nil {
+				glog.Errorf("Error while exporting metrics: %v", err)
+			} else {
+				m.Inc()
+			}
+		}
+
 		switch newState {
 		case v1beta2.SubmittedState:
 			if m, err := sm.sparkAppSubmitCount.GetMetricWith(metricLabels); err != nil {
@@ -279,7 +297,6 @@ func (sm *sparkAppMetrics) exportJobStartLatencyMetrics(app *v1beta2.SparkApplic
 		} else {
 			m.Observe(float64(latency / time.Second))
 		}
-
 	}
 }
 
pkg/controller/sparkapplication/sparkapp_metrics_test.go (2 changes: 2 additions & 0 deletions)
@@ -40,6 +40,7 @@ func TestSparkAppMetrics(t *testing.T) {
 	wg.Add(1)
 	go func() {
 		for i := 0; i < 10; i++ {
+			metrics.sparkAppCount.With(app1).Inc()
 			metrics.sparkAppSubmitCount.With(app1).Inc()
 			metrics.sparkAppRunningCount.Inc(app1)
 			metrics.sparkAppSuccessCount.With(app1).Inc()
@@ -61,6 +62,7 @@ func TestSparkAppMetrics(t *testing.T) {
 	}()
 
 	wg.Wait()
+	assert.Equal(t, float64(10), fetchCounterValue(metrics.sparkAppCount, app1))
 	assert.Equal(t, float64(10), fetchCounterValue(metrics.sparkAppSubmitCount, app1))
 	assert.Equal(t, float64(5), metrics.sparkAppRunningCount.Value(app1))
 	assert.Equal(t, float64(10), fetchCounterValue(metrics.sparkAppSuccessCount, app1))