
[SPARK-38564][SS] Support collecting metrics from streaming sinks #35872

Status: Closed (5 commits)

Conversation

@jerrypeng (Contributor) commented Mar 16, 2022

What changes were proposed in this pull request?

Add the capability for streaming sinks to report custom metrics, just as streaming sources can.

Why are the changes needed?

Allowing streaming sinks to report custom metrics is useful and achieves feature parity with streaming sources.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New unit test.
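For illustration, a minimal sketch of what a sink using the proposed mix-in might look like. Everything here is hypothetical: the trait name `ReportsSinkMetrics` and the `metrics(): java.util.Map[String, String]` signature are assumptions inferred from the interface Javadoc and the test code quoted later in this thread, and `CountingSink` is an invented example class.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

// Assumed shape of the proposed mix-in (see the Javadoc reviewed below);
// the trait name and signature are inferred, not copied from the PR.
trait ReportsSinkMetrics {
  def metrics(): JMap[String, String]
}

// Hypothetical sink that counts rows and exposes the count as a metric.
class CountingSink extends ReportsSinkMetrics {
  @volatile private var rowsWritten = 0L

  // Sink-specific write path, reduced to the part relevant for metrics.
  def write(rows: Seq[Any]): Unit = {
    rowsWritten += rows.size
  }

  override def metrics(): JMap[String, String] = {
    val m = new JHashMap[String, String]()
    m.put("rowsWritten", rowsWritten.toString)
    m
  }
}
```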

@HyukjinKwon HyukjinKwon changed the title [SPARK-38564] Support collecting metrics from streaming sinks [SPARK-38564][ Support collecting metrics from streaming sinks Mar 17, 2022
@HyukjinKwon HyukjinKwon changed the title [SPARK-38564][ Support collecting metrics from streaming sinks [SPARK-38564][SS] Support collecting metrics from streaming sinks Mar 17, 2022
@HyukjinKwon (Member) commented:

cc @HeartSaVioR FYI

@AmplabJenkins commented:

Can one of the admins verify this patch?

@HeartSaVioR (Contributor) reviewed:

The code change looks OK, given that the proposed interface is symmetric with the source one.

Shall we add some tests for this new feature? We would like to be very sure about the functional behavior.

Thanks in advance!

```scala
 * A mix-in interface for streaming sinks to signal that they can report
 * metrics.
 *
 * @since 3.3.0
```
Contributor:

nit: 3.4.0, as we missed the train for 3.3.0.

Contributor:

It seems to have been missed. Could you please update this?

Author (@jerrypeng):

Actually updated this but forgot to include it in the commit :(

@jerrypeng jerrypeng requested a review from HeartSaVioR March 20, 2022 08:41
@jerrypeng (Author) commented:

@HeartSaVioR thanks for the review! Please take another look!

```scala
}

def createRelation(
sqlContext: SQLContext,
```
Contributor:

nit: indent

Author (@jerrypeng):

will fix


```scala
inputData.addData(1, 2, 3)

var metricsMap: java.util.Map[String, String] = null
```
Contributor:

It would be safer to register the listener before executing the query. Since you've started the query and also added the data here, the execution of the streaming query and the registration of the listener run concurrently.

I'd move the registration of the listener out before the try statement, and remove the listener in the finally statement as we do for StreamingListenerQuerySuite, in the spirit of defensive programming.
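A sketch of the structure being suggested, assuming a `StreamingQueryListener` instance named `listener` and the `spark`/`df`/`inputData` values from the test; the point is that registration happens before any query activity, and removal is guaranteed by `finally`:

```scala
// Register first, so no query event can fire before the listener exists.
spark.streams.addListener(listener)
try {
  val query = df.writeStream
    .outputMode("append")
    .format("org.apache.spark.sql.streaming.TestSinkProvider")
    .start()
  inputData.addData(1, 2, 3)
  query.processAllAvailable()
  query.stop()
} finally {
  // Always deregister, even if an assertion above throws.
  spark.streams.removeListener(listener)
}
```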

Author (@jerrypeng):

will fix

```scala
df.writeStream
  .outputMode("append")
  .format("org.apache.spark.sql.streaming.TestSinkProvider")
  .option("checkPointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
```
Contributor:

nit: let's use withTempDir, following the practice in the test code.

Author (@jerrypeng):

will fix

```scala
 * A mix-in interface for streaming sinks to signal that they can report
 * metrics.
 *
 * @since 3.3.0
```
Contributor:

It seems to have been missed. Could you please update this?

@jerrypeng jerrypeng requested a review from HeartSaVioR March 21, 2022 06:52
@jerrypeng (Author) commented:

@HeartSaVioR thanks for the review again! PTAL!

```scala
def createRelation(sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
```
Contributor:

Probably the last nit: the indentation rule Spark uses is quite different from others, so you may want to refer to the Scala style guide.

https://github.com/databricks/scala-style-guide

https://github.com/databricks/scala-style-guide#spacing-and-indentation

For method declarations, use 4 space indentation for their parameters and put each in each line when the parameters don't fit in two lines. Return types can be either on the same line as the last parameter, or start a new line with 2 space indent.

Below is the correct indentation for this case:

```scala
def createRelation(
    sqlContext: SQLContext,
    mode: SaveMode,
    parameters: Map[String, String],
    data: DataFrame): BaseRelation = {
```

Author (@jerrypeng):

fix

@jerrypeng jerrypeng requested a review from HeartSaVioR March 21, 2022 21:41
@jerrypeng (Author) commented:

@HeartSaVioR thanks for the review again! PTAL!

@HeartSaVioR (Contributor) reviewed:

+1

@HeartSaVioR (Contributor) commented:

Thanks! Merging to master!

```scala
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class ReportSinkMetricsSuite extends StreamTest {
```
Member:

The tests added here seem flaky:

```
ReportSinkMetricsSuite:
- test ReportSinkMetrics *** FAILED *** (244 milliseconds)
  Expected null, but got {"metrics-1"="value-1", "metrics-2"="value-2"} (ReportSinkMetricsSuite.scala:75)
  org.scalatest.exceptions.TestFailedException:
  at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
  at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
  at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1563)
  at org.scalatest.Assertions.assertResult(Assertions.scala:867)
  at org.scalatest.Assertions.assertResult$(Assertions.scala:863)
  at org.scalatest.funsuite.AnyFunSuite.assertResult(AnyFunSuite.scala:1563)
  at org.apache.spark.sql.streaming.ReportSinkMetricsSuite.$anonfun$new$2(ReportSinkMetricsSuite.scala:75)
  at org.apache.spark.sql.streaming.ReportSinkMetricsSuite.$anonfun$new$2$adapted(ReportSinkMetricsSuite.scala:60)
  at org.apache.spark.sql.test.SQLTestUtils.$anonfun$withTempDir$1(SQLTestUtils.scala:79)
  at org.apache.spark.sql.test.SQLTestUtils.$anonfun$withTempDir$1$adapted(SQLTestUtils.scala:78)
  at org.apache.spark.SparkFunSuite.withTempDir(SparkFunSuite.scala:221)
  at org.apache.spark.sql.streaming.ReportSinkMetricsSuite.org$apache$spark$sql$test$SQLTestUtils$$super$withTempDir(ReportSinkMetricsSuite.scala:35)
  at org.apache.spark.sql.test.SQLTestUtils.withTempDir(SQLTestUtils.scala:78)
  at org.apache.spark.sql.test.SQLTestUtils.withTempDir$(SQLTestUtils.scala:77)
  at org.apache.spark.sql.streaming.ReportSinkMetricsSuite.withTempDir(ReportSinkMetricsSuite.scala:35)
  at org.apache.spark.sql.streaming.ReportSinkMetricsSuite.$anonfun$new$1(ReportSinkMetricsSuite.scala:60)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
```

https://github.com/apache/spark/runs/5646670314?check_suite_focus=true

Contributor:

Thanks for reporting. Seems odd. @jerrypeng Could you please check this?

Member:

Actually, the fix seems pretty simple. I made a quick follow-up: #35945

```scala
query.processAllAvailable()
}

assertResult(metricsMap) {
```
Contributor:

OK, I think we missed that the listener callback happens on a different thread than the stream thread.

@jerrypeng
We may need to add sc.listenerBus.waitUntilEmpty(), or wrap this with eventually. Could you please create a follow-up PR? Thanks in advance!
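Sketches of the two options mentioned above, assuming the no-arg `waitUntilEmpty()` available to Spark's own tests; `expectedMetrics` is a placeholder name, not the actual test code:

```scala
// Option 1: drain the listener bus so the callback has definitely run
// before asserting (listenerBus is private[spark], so this only works
// from test code inside the org.apache.spark package).
spark.sparkContext.listenerBus.waitUntilEmpty()
assertResult(expectedMetrics)(metricsMap)

// Option 2: retry the assertion until the listener thread catches up,
// using ScalaTest's Eventually.
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

eventually(timeout(30.seconds)) {
  assertResult(expectedMetrics)(metricsMap)
}
```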

HyukjinKwon added a commit that referenced this pull request Mar 23, 2022
…csSuite

### What changes were proposed in this pull request?

The test is flaky:

```
ReportSinkMetricsSuite:
- test ReportSinkMetrics *** FAILED *** (244 milliseconds)
  Expected null, but got {"metrics-1"="value-1", "metrics-2"="value-2"} (ReportSinkMetricsSuite.scala:75)
  org.scalatest.exceptions.TestFailedException:
  at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
  at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
  at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1563)
  at org.scalatest.Assertions.assertResult(Assertions.scala:867)
  at org.scalatest.Assertions.assertResult$(Assertions.scala:863)
  at org.scalatest.funsuite.AnyFunSuite.assertResult(AnyFunSuite.scala:1563)
  at org.apache.spark.sql.streaming.ReportSinkMetricsSuite.$anonfun$new$2(ReportSinkMetricsSuite.scala:75)
  at org.apache.spark.sql.streaming.ReportSinkMetricsSuite.$anonfun$new$2$adapted(ReportSinkMetricsSuite.scala:60)
  at org.apache.spark.sql.test.SQLTestUtils.$anonfun$withTempDir$1(SQLTestUtils.scala:79)
  at org.apache.spark.sql.test.SQLTestUtils.$anonfun$withTempDir$1$adapted(SQLTestUtils.scala:78)
  at org.apache.spark.SparkFunSuite.withTempDir(SparkFunSuite.scala:221)
  at org.apache.spark.sql.streaming.ReportSinkMetricsSuite.org$apache$spark$sql$test$SQLTestUtils$$super$withTempDir(ReportSinkMetricsSuite.scala:35)
  at org.apache.spark.sql.test.SQLTestUtils.withTempDir(SQLTestUtils.scala:78)
  at org.apache.spark.sql.test.SQLTestUtils.withTempDir$(SQLTestUtils.scala:77)
  at org.apache.spark.sql.streaming.ReportSinkMetricsSuite.withTempDir(ReportSinkMetricsSuite.scala:35)
  at org.apache.spark.sql.streaming.ReportSinkMetricsSuite.$anonfun$new$1(ReportSinkMetricsSuite.scala:60)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
```

We should wait for all events to be processed.

See  #35872 (comment).

### Why are the changes needed?

To make the test not flaky.

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

Existing tests. CI in this PR should test it out.

Closes #35945 from HyukjinKwon/SPARK-38564.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>