Skip to content

Commit

Permalink
[SPARK-45631][SS][PYSPARK] Remove @AbstractMethod from onQueryIdle in…
Browse files Browse the repository at this point in the history
… PySpark StreamingQueryListener

### What changes were proposed in this pull request?

Credit to anish-db for the initial investigation and the fix.

This PR proposes to remove `abstractmethod` annotation from `onQueryIdle` in PySpark StreamingQueryListener.

The function `onQueryIdle` was added with the annotation `abstractmethod`, which does not pick up default implementation and enforces users to implement the new method. This breaks all existing streaming query listener implementations and enforces them to add the dummy function implementation at least.

This PR re-allows existing implementations to work properly without explicitly adding a new function `onQueryIdle`.

### Why are the changes needed?

We broke backward compatibility in [SPARK-43183](https://issues.apache.org/jira/browse/SPARK-43183) and we want to fix it.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Modified tests. Now tests are verifying two different implementations covering old interface vs new interface.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43483 from HeartSaVioR/SPARK-45631.

Lead-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Co-authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit 75bc5ac)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
  • Loading branch information
HeartSaVioR and anishshri-db committed Oct 23, 2023
1 parent 75a38b9 commit 06f4885
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 39 deletions.
4 changes: 3 additions & 1 deletion python/pyspark/sql/streaming/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ def onQueryProgress(self, event: "QueryProgressEvent") -> None:
"""
pass

@abstractmethod
# NOTE: Do not mark this as abstract method, since we released this abstract class without
# this method in prior version and marking this as abstract method would break existing
# implementations.
def onQueryIdle(self, event: "QueryIdleEvent") -> None:
"""
Called when the query is idle and waiting for new data to process.
Expand Down
117 changes: 79 additions & 38 deletions python/pyspark/sql/tests/streaming/test_streaming_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,23 @@ def test_listener_events(self):
progress_event = None
terminated_event = None

class TestListener(StreamingQueryListener):
# V1: Initial interface of StreamingQueryListener containing methods `onQueryStarted`,
# `onQueryProgress`, `onQueryTerminated`. It is prior to Spark 3.5.
class TestListenerV1(StreamingQueryListener):
def onQueryStarted(self, event):
nonlocal start_event
start_event = event

def onQueryProgress(self, event):
nonlocal progress_event
progress_event = event

def onQueryTerminated(self, event):
nonlocal terminated_event
terminated_event = event

# V2: The interface after the method `onQueryIdle` is added. It is Spark 3.5+.
class TestListenerV2(StreamingQueryListener):
def onQueryStarted(self, event):
nonlocal start_event
start_event = event
Expand All @@ -267,48 +283,71 @@ def onQueryTerminated(self, event):
nonlocal terminated_event
terminated_event = event

test_listener = TestListener()
def verify(test_listener):
nonlocal start_event
nonlocal progress_event
nonlocal terminated_event

try:
self.spark.streams.addListener(test_listener)
start_event = None
progress_event = None
terminated_event = None

df = self.spark.readStream.format("rate").option("rowsPerSecond", 10).load()
try:
self.spark.streams.addListener(test_listener)

# check successful stateful query
df_stateful = df.groupBy().count() # make query stateful
q = (
df_stateful.writeStream.format("noop")
.queryName("test")
.outputMode("complete")
.start()
)
self.assertTrue(q.isActive)
time.sleep(10)
q.stop()
df = self.spark.readStream.format("rate").option("rowsPerSecond", 10).load()

# Make sure all events are empty
self.spark.sparkContext._jsc.sc().listenerBus().waitUntilEmpty()
# check successful stateful query
df_stateful = df.groupBy().count() # make query stateful
q = (
df_stateful.writeStream.format("noop")
.queryName("test")
.outputMode("complete")
.start()
)
self.assertTrue(q.isActive)
time.sleep(10)
q.stop()

self.check_start_event(start_event)
self.check_progress_event(progress_event)
self.check_terminated_event(terminated_event)
# Make sure all events are empty
self.spark.sparkContext._jsc.sc().listenerBus().waitUntilEmpty()

# Check query terminated with exception
from pyspark.sql.functions import col, udf
self.check_start_event(start_event)
self.check_progress_event(progress_event)
self.check_terminated_event(terminated_event)

bad_udf = udf(lambda x: 1 / 0)
q = df.select(bad_udf(col("value"))).writeStream.format("noop").start()
time.sleep(5)
q.stop()
self.spark.sparkContext._jsc.sc().listenerBus().waitUntilEmpty()
self.check_terminated_event(terminated_event, "ZeroDivisionError")
# Check query terminated with exception
from pyspark.sql.functions import col, udf

finally:
self.spark.streams.removeListener(test_listener)
bad_udf = udf(lambda x: 1 / 0)
q = df.select(bad_udf(col("value"))).writeStream.format("noop").start()
time.sleep(5)
q.stop()
self.spark.sparkContext._jsc.sc().listenerBus().waitUntilEmpty()
self.check_terminated_event(terminated_event, "ZeroDivisionError")

finally:
self.spark.streams.removeListener(test_listener)

verify(TestListenerV1())
verify(TestListenerV2())

def test_remove_listener(self):
# SPARK-38804: Test StreamingQueryManager.removeListener
class TestListener(StreamingQueryListener):
# V1: Initial interface of StreamingQueryListener containing methods `onQueryStarted`,
# `onQueryProgress`, `onQueryTerminated`. It is prior to Spark 3.5.
class TestListenerV1(StreamingQueryListener):
def onQueryStarted(self, event):
pass

def onQueryProgress(self, event):
pass

def onQueryTerminated(self, event):
pass

# V2: The interface after the method `onQueryIdle` is added. It is Spark 3.5+.
class TestListenerV2(StreamingQueryListener):
def onQueryStarted(self, event):
pass

Expand All @@ -321,13 +360,15 @@ def onQueryIdle(self, event):
def onQueryTerminated(self, event):
pass

test_listener = TestListener()
def verify(test_listener):
num_listeners = len(self.spark.streams._jsqm.listListeners())
self.spark.streams.addListener(test_listener)
self.assertEqual(num_listeners + 1, len(self.spark.streams._jsqm.listListeners()))
self.spark.streams.removeListener(test_listener)
self.assertEqual(num_listeners, len(self.spark.streams._jsqm.listListeners()))

num_listeners = len(self.spark.streams._jsqm.listListeners())
self.spark.streams.addListener(test_listener)
self.assertEqual(num_listeners + 1, len(self.spark.streams._jsqm.listListeners()))
self.spark.streams.removeListener(test_listener)
self.assertEqual(num_listeners, len(self.spark.streams._jsqm.listListeners()))
verify(TestListenerV1())
verify(TestListenerV2())

def test_query_started_event_fromJson(self):
start_event = """
Expand Down

0 comments on commit 06f4885

Please sign in to comment.