Skip to content

Commit

Permalink
[SPARK-44738][PYTHON][CONNECT] Add missing client metadata to calls
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

The refactoring for the re-attachable execution missed properly propagating the client metadata for the individual RPC calls.

### Why are the changes needed?
Compatibility.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing UT

Closes apache#42409 from grundprinzip/SPARK-44738.

Authored-by: Martin Grund <martin.grund@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit c73660c)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
grundprinzip authored and HyukjinKwon committed Aug 9, 2023
1 parent 7520fc1 commit 03dfd47
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions python/pyspark/sql/connect/client/reattach.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ def _has_next(self) -> bool:
if not attempt.is_first_try():
# on retry, the iterator is borked, so we need a new one
self._iterator = iter(
self._stub.ReattachExecute(self._create_reattach_execute_request())
self._stub.ReattachExecute(
self._create_reattach_execute_request(), metadata=self._metadata
)
)

if self._current is None:
Expand All @@ -154,7 +156,8 @@ def _has_next(self) -> bool:
while not has_next:
self._iterator = iter(
self._stub.ReattachExecute(
self._create_reattach_execute_request()
self._create_reattach_execute_request(),
metadata=self._metadata,
)
)
# shouldn't change
Expand Down Expand Up @@ -192,7 +195,7 @@ def target() -> None:
can_retry=SparkConnectClient.retry_exception, **self._retry_policy
):
with attempt:
self._stub.ReleaseExecute(request)
self._stub.ReleaseExecute(request, metadata=self._metadata)
except Exception as e:
warnings.warn(f"ReleaseExecute failed with exception: {e}.")

Expand Down Expand Up @@ -220,7 +223,7 @@ def target() -> None:
can_retry=SparkConnectClient.retry_exception, **self._retry_policy
):
with attempt:
self._stub.ReleaseExecute(request)
self._stub.ReleaseExecute(request, metadata=self._metadata)
except Exception as e:
warnings.warn(f"ReleaseExecute failed with exception: {e}.")

Expand Down

0 comments on commit 03dfd47

Please sign in to comment.