[SPARK-38564][SS] Support collecting metrics from streaming sinks #35872
Conversation
cc @HeartSaVioR FYI
Can one of the admins verify this patch?
The code change looks OK, given the proposed interface is symmetric with the source one.
Shall we add some tests for this new feature? We would like to be very sure about the functional behavior.
Thanks in advance!
 * A mix-in interface for streaming sinks to signal that they can report
 * metrics.
 *
 * @since 3.3.0
nit: 3.4.0, as we missed the train for 3.3.
It seems to be missed. Could you please update this?
Actually updated this but forgot to include it in the commit :(
@HeartSaVioR thanks for the review! Please take another look!
}

def createRelation(
  sqlContext: SQLContext,
nit: indent
will fix
inputData.addData(1, 2, 3)

var metricsMap: java.util.Map[String, String] = null
It would be safer to register the listener before executing the query. Since you've already started the query and added the data here, the execution of the streaming query and the registration of the listener run concurrently.
I'd move the registration of the listener out before the try statement, and remove the listener in the finally block, as we do in StreamingQueryListenerSuite, in the spirit of defensive programming.
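A minimal sketch of the suggested pattern (the listener body and query setup here are illustrative, not the actual test code; `metricsMap` is the variable the test declares above):

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Register before the query starts so no progress event can be missed,
// and always deregister in `finally` so a failing test cannot leak the
// listener into other tests.
val listener = new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    metricsMap = event.progress.sink.metrics
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
spark.streams.addListener(listener)
try {
  val query = df.writeStream
    .outputMode("append")
    .format("org.apache.spark.sql.streaming.TestSinkProvider")
    .start()
  inputData.addData(1, 2, 3)
  query.processAllAvailable()
} finally {
  spark.streams.removeListener(listener)
}
```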
will fix
df.writeStream
  .outputMode("append")
  .format("org.apache.spark.sql.streaming.TestSinkProvider")
  .option("checkPointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
nit: let's use withTempDir, following the usual practice in test code.
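For reference, a sketch of what that would look like with `withTempDir` (from `SQLTestUtils`), which creates the directory and cleans it up after the body runs, instead of leaving an unmanaged directory behind:

```scala
withTempDir { tempDir =>
  val query = df.writeStream
    .outputMode("append")
    .format("org.apache.spark.sql.streaming.TestSinkProvider")
    // checkpoint under the managed temp dir, removed automatically afterwards
    .option("checkPointLocation", tempDir.getCanonicalPath)
    .start()
  query.processAllAvailable()
  query.stop()
}
```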
will fix
@HeartSaVioR thanks for the review again! PTAL!
    mode: SaveMode,
    parameters: Map[String, String],
    data: DataFrame): BaseRelation = {
def createRelation(sqlContext: SQLContext,
probably the last nit: the indentation rule Spark uses is quite different from others', so you may want to refer to the Scala style guide:
https://github.com/databricks/scala-style-guide
https://github.com/databricks/scala-style-guide#spacing-and-indentation
"For method declarations, use 4 space indentation for their parameters and put each on its own line when the parameters don't fit in two lines. Return types can be either on the same line as the last parameter, or start a new line with 2 space indent."
Below is the correct indentation for this case:
def createRelation(
    sqlContext: SQLContext,
    mode: SaveMode,
    parameters: Map[String, String],
    data: DataFrame): BaseRelation = {
fix
@HeartSaVioR thanks for the review again! PTAL!
+1
Thanks! Merging to master!
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class ReportSinkMetricsSuite extends StreamTest {
The tests added here seem flaky:
ReportSinkMetricsSuite:
- test ReportSinkMetrics *** FAILED *** (244 milliseconds)
Expected null, but got {"metrics-1"="value-1", "metrics-2"="value-2"} (ReportSinkMetricsSuite.scala:75)
org.scalatest.exceptions.TestFailedException:
at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1563)
at org.scalatest.Assertions.assertResult(Assertions.scala:867)
at org.scalatest.Assertions.assertResult$(Assertions.scala:863)
at org.scalatest.funsuite.AnyFunSuite.assertResult(AnyFunSuite.scala:1563)
at org.apache.spark.sql.streaming.ReportSinkMetricsSuite.$anonfun$new$2(ReportSinkMetricsSuite.scala:75)
at org.apache.spark.sql.streaming.ReportSinkMetricsSuite.$anonfun$new$2$adapted(ReportSinkMetricsSuite.scala:60)
at org.apache.spark.sql.test.SQLTestUtils.$anonfun$withTempDir$1(SQLTestUtils.scala:79)
at org.apache.spark.sql.test.SQLTestUtils.$anonfun$withTempDir$1$adapted(SQLTestUtils.scala:78)
at org.apache.spark.SparkFunSuite.withTempDir(SparkFunSuite.scala:221)
at org.apache.spark.sql.streaming.ReportSinkMetricsSuite.org$apache$spark$sql$test$SQLTestUtils$$super$withTempDir(ReportSinkMetricsSuite.scala:35)
at org.apache.spark.sql.test.SQLTestUtils.withTempDir(SQLTestUtils.scala:78)
at org.apache.spark.sql.test.SQLTestUtils.withTempDir$(SQLTestUtils.scala:77)
at org.apache.spark.sql.streaming.ReportSinkMetricsSuite.withTempDir(ReportSinkMetricsSuite.scala:35)
at org.apache.spark.sql.streaming.ReportSinkMetricsSuite.$anonfun$new$1(ReportSinkMetricsSuite.scala:60)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
https://github.com/apache/spark/runs/5646670314?check_suite_focus=true
Thanks for reporting. Seems odd. @jerrypeng Could you please check this?
Actually, the fix seems pretty simple. I made a quick follow-up: #35945
I was a bit late lol https://github.com/apache/spark/pull/35872/files#r832810740
query.processAllAvailable()
}

assertResult(metricsMap) {
OK, I think we missed that the listener callback happens in a different thread than the stream thread.
@jerrypeng We may need to add sc.listenerBus.waitUntilEmpty(), or wrap this with eventually. Could you please create a follow-up PR? Thanks in advance!
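Either option might look roughly like this (a sketch; `expectedMetrics` stands in for the map the test expects, `streamingTimeout` is the patience value `StreamTest` already provides, and `listenerBus` is `private[spark]`, so this only compiles from test code under the org.apache.spark packages):

```scala
import org.scalatest.concurrent.Eventually._

// Option 1: drain the listener bus so the QueryProgressEvent has been
// delivered before the assertion runs.
spark.sparkContext.listenerBus.waitUntilEmpty()
assertResult(expectedMetrics)(metricsMap)

// Option 2: retry the assertion until the listener thread catches up.
eventually(timeout(streamingTimeout)) {
  assertResult(expectedMetrics)(metricsMap)
}
```

Both remove the race between the stream/listener threads and the test thread; the follow-up in #35945 took the first approach.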
…csSuite

What changes were proposed in this pull request?

The test is flaky:

```
ReportSinkMetricsSuite:
- test ReportSinkMetrics *** FAILED *** (244 milliseconds)
  Expected null, but got {"metrics-1"="value-1", "metrics-2"="value-2"} (ReportSinkMetricsSuite.scala:75)
  org.scalatest.exceptions.TestFailedException:
  at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
  at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
  at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1563)
  at org.scalatest.Assertions.assertResult(Assertions.scala:867)
  at org.scalatest.Assertions.assertResult$(Assertions.scala:863)
  at org.scalatest.funsuite.AnyFunSuite.assertResult(AnyFunSuite.scala:1563)
  at org.apache.spark.sql.streaming.ReportSinkMetricsSuite.$anonfun$new$2(ReportSinkMetricsSuite.scala:75)
  at org.apache.spark.sql.streaming.ReportSinkMetricsSuite.$anonfun$new$2$adapted(ReportSinkMetricsSuite.scala:60)
  at org.apache.spark.sql.test.SQLTestUtils.$anonfun$withTempDir$1(SQLTestUtils.scala:79)
  at org.apache.spark.sql.test.SQLTestUtils.$anonfun$withTempDir$1$adapted(SQLTestUtils.scala:78)
  at org.apache.spark.SparkFunSuite.withTempDir(SparkFunSuite.scala:221)
  at org.apache.spark.sql.streaming.ReportSinkMetricsSuite.org$apache$spark$sql$test$SQLTestUtils$$super$withTempDir(ReportSinkMetricsSuite.scala:35)
  at org.apache.spark.sql.test.SQLTestUtils.withTempDir(SQLTestUtils.scala:78)
  at org.apache.spark.sql.test.SQLTestUtils.withTempDir$(SQLTestUtils.scala:77)
  at org.apache.spark.sql.streaming.ReportSinkMetricsSuite.withTempDir(ReportSinkMetricsSuite.scala:35)
  at org.apache.spark.sql.streaming.ReportSinkMetricsSuite.$anonfun$new$1(ReportSinkMetricsSuite.scala:60)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
```

We should wait for all events to be processed. See #35872 (comment).

Why are the changes needed?

To make the test not flaky.

Does this PR introduce any user-facing change?

No, test-only.

How was this patch tested?

Existing tests. CI in this PR should test it out.

Closes #35945 from HyukjinKwon/SPARK-38564.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
What changes were proposed in this pull request?
Add the capability for streaming sinks to report custom metrics, just like streaming sources.
Why are the changes needed?
Allowing streaming sinks to report custom metrics is useful and achieves feature parity with streaming sources.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New unit test.
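To illustrate the feature end to end: a sink would mix in the new interface roughly as below. The interface name `ReportsSinkMetrics` and its single `metrics()` method are inferred from the fragments quoted in this review ("a mix-in interface for streaming sinks to signal that they can report metrics", a `java.util.Map[String, String]` result) and from the source-side `ReportsSourceMetrics` convention, so treat the exact names and package as assumptions, not the merged API:

```scala
import java.util.{Map => JMap}
import scala.collection.JavaConverters._

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

// Hypothetical sink: counts rows written and exposes the count as a custom
// metric. The returned map surfaces on each batch as
// StreamingQueryProgress.sink.metrics, which is what the suite's listener
// reads into metricsMap.
class CountingSink extends Sink with ReportsSinkMetrics {
  private var rowsWritten = 0L

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    rowsWritten += data.count()
  }

  override def metrics(): JMap[String, String] =
    Map("rowsWritten" -> rowsWritten.toString).asJava
}
```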