Skip to content

Commit

Permalink
[Feature][zeta] Add the UNKNOWABLE job status. (#5303)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Jia Fan <fanjiaeminem@qq.com>
  • Loading branch information
ic4y and Hisoka-X authored Aug 24, 2023
1 parent 9e033a8 commit 17482c8
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public static <T> T retryWithException(
backoff);
Thread.sleep(backoff);
} else {
log.debug(attemptMessage, ExceptionUtils.getMessage(e), i, retryTimes, 0);
log.info(attemptMessage, ExceptionUtils.getMessage(e), i, retryTimes, 0);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,9 +543,11 @@ public void testTwoPipelineStreamJobRestoreIn2NodeWorkerDown()
@Test
public void testTwoPipelineBatchJobRestoreIn2NodeMasterDown()
throws ExecutionException, InterruptedException {
String testCaseName = "testTwoPipelineBatchJobRestoreIn2NodeMasterDown";
String testCaseName =
"testTwoPipelineBatchJobRestoreIn2NodeMasterDown" + System.currentTimeMillis();
String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRestoreIn2NodeMasterDown";
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRestoreIn2NodeMasterDown"
+ System.currentTimeMillis();
long testRowNumber = 1000;
int testParallelism = 6;
HazelcastInstanceImpl node1 = null;
Expand Down Expand Up @@ -651,9 +653,11 @@ public void testTwoPipelineBatchJobRestoreIn2NodeMasterDown()
@Test
public void testTwoPipelineStreamJobRestoreIn2NodeMasterDown()
throws ExecutionException, InterruptedException {
String testCaseName = "testTwoPipelineStreamJobRestoreIn2NodeMasterDown";
String testCaseName =
"testTwoPipelineStreamJobRestoreIn2NodeMasterDown" + System.currentTimeMillis();
String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRestoreIn2NodeMasterDown";
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRestoreIn2NodeMasterDown"
+ System.currentTimeMillis();
long testRowNumber = 1000;
int testParallelism = 6;
HazelcastInstanceImpl node1 = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,28 @@ public void testGetErrorInfo() throws ExecutionException, InterruptedException {
Assertions.assertTrue(result.getError().startsWith("java.lang.NumberFormatException"));
}

/**
 * Checks that querying a job id that is not expected to exist reports {@code UNKNOWABLE}
 * instead of failing, both through {@code ClientJobProxy#waitForJobComplete} and through the
 * job client's {@code getJobStatus} lookup.
 *
 * <p>The job ids used here are taken from {@code System.currentTimeMillis()}; presumably no job
 * was ever submitted under such an id -- TODO confirm that ids cannot collide with real jobs.
 *
 * <p>NOTE(review): {@code engineClient} is not closed inside this method -- confirm whether
 * cleanup happens elsewhere in the test class.
 */
@Test
public void testGetUnKnownJobID() {

// Connect a client to the cluster name resolved for "JobExecutionIT".
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);

// Obtain a proxy for a timestamp-derived job id and wait for its completion asynchronously,
// so the polling assertion below can bound the total wait time.
ClientJobProxy newClientJobProxy =
engineClient.createJobClient().getJobProxy(System.currentTimeMillis());
CompletableFuture<JobStatus> waitForJobCompleteFuture =
CompletableFuture.supplyAsync(newClientJobProxy::waitForJobComplete);

// Poll for at most 20000 ms until the asynchronous wait yields JobStatus.UNKNOWABLE.
await().atMost(20000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.UNKNOWABLE, waitForJobCompleteFuture.get()));

// A direct status lookup for another fresh timestamp id is compared against the literal
// "UNKNOWABLE" (the expected value is a String, not the enum constant).
Assertions.assertEquals(
"UNKNOWABLE", engineClient.getJobClient().getJobStatus(System.currentTimeMillis()));
}

@Test
public void testExpiredJobWasDeleted() throws Exception {
Common.setDeployMode(DeployMode.CLIENT);
Expand All @@ -164,8 +186,8 @@ public void testExpiredJobWasDeleted() throws Exception {
await().atMost(65, TimeUnit.SECONDS)
.untilAsserted(
() ->
Assertions.assertThrowsExactly(
NullPointerException.class, clientJobProxy::getJobStatus));
Assertions.assertEquals(
JobStatus.UNKNOWABLE, clientJobProxy.getJobStatus()));
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ public enum JobStatus {
SUSPENDED(EndState.LOCALLY),

/** The job is currently reconciling and waits for task execution report to recover state. */
RECONCILING(EndState.NOT_END);
RECONCILING(EndState.NOT_END),

/** Cannot find the JobID or the job status has already been cleared. */
UNKNOWABLE(EndState.GLOBALLY);

// --------------------------------------------------------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,9 +505,22 @@ public PassiveCompletableFuture<Void> savePoint(long jobId) {
public PassiveCompletableFuture<JobResult> waitForJobComplete(long jobId) {
JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
if (runningJobMaster == null) {
JobHistoryService.JobState jobState = jobHistoryService.getJobDetailState(jobId);
// Because operations on Imap cannot be performed within Operation.
CompletableFuture<JobHistoryService.JobState> jobStateFuture =
CompletableFuture.supplyAsync(
() -> {
return jobHistoryService.getJobDetailState(jobId);
},
executorService);
JobHistoryService.JobState jobState = null;
try {
jobState = jobStateFuture.get();
} catch (Exception e) {
throw new SeaTunnelEngineException("get job state error", e);
}

CompletableFuture<JobResult> future = new CompletableFuture<>();
if (jobState == null) future.complete(new JobResult(JobStatus.FAILED, null));
if (jobState == null) future.complete(new JobResult(JobStatus.UNKNOWABLE, null));
else
future.complete(new JobResult(jobState.getJobStatus(), jobState.getErrorMessage()));
return new PassiveCompletableFuture<>(future);
Expand Down Expand Up @@ -537,7 +550,7 @@ public JobStatus getJobStatus(long jobId) {
JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
if (runningJobMaster == null) {
JobHistoryService.JobState jobDetailState = jobHistoryService.getJobDetailState(jobId);
return null == jobDetailState ? null : jobDetailState.getJobStatus();
return null == jobDetailState ? JobStatus.UNKNOWABLE : jobDetailState.getJobStatus();
}
return runningJobMaster.getJobStatus();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public boolean isMasterNode() {
// must retry until the cluster have master node
try {
return RetryUtils.retryWithException(
() -> nodeEngine.getMasterAddress().equals(nodeEngine.getThisAddress()),
() -> nodeEngine.getThisAddress().equals(nodeEngine.getMasterAddress()),
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
Expand Down

0 comments on commit 17482c8

Please sign in to comment.