Skip to content

Commit

Permalink
Fix the regression issue of get Spark job done on Spark 2.2
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Zhang <wezhang@outlook.com>
  • Loading branch information
wezhang committed Apr 25, 2018
1 parent 71c5969 commit 596878d
Showing 1 changed file with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -750,11 +750,11 @@ public boolean isActive() throws IOException {
public Observable<SimpleImmutableEntry<SparkBatchJobState, String>> getJobDoneObservable() {
return Observable.create((Subscriber<? super SimpleImmutableEntry<SparkBatchJobState, String>> ob) -> {
try {
boolean isJobActive = true;
boolean isJobActive;
SparkBatchJobState state = SparkBatchJobState.NOT_STARTED;
String diagnostics = "";

while (isJobActive) {
do {
HttpResponse httpResponse = this.getSubmission().getBatchSparkJobStatus(
this.getConnectUri().toString(), batchId);

Expand All @@ -768,17 +768,20 @@ public Observable<SimpleImmutableEntry<SparkBatchJobState, String>> getJobDoneOb
diagnostics = String.join("\n", jobResp.getLog());

isJobActive = !state.isJobDone();
} else {
isJobActive = false;
}


// Retry interval
sleep(1000);
}
} while (isJobActive);

ob.onNext(new SimpleImmutableEntry<>(state, diagnostics));
ob.onCompleted();
} catch (IOException ex) {
ob.onError(ex);
} catch (InterruptedException ignored) {
} finally {
ob.onCompleted();
}
});
Expand Down

0 comments on commit 596878d

Please sign in to comment.