From 17482c81aa7baddad4153ecd77d8a95971608133 Mon Sep 17 00:00:00 2001 From: ic4y <83933160+ic4y@users.noreply.github.com> Date: Thu, 24 Aug 2023 18:03:37 +0800 Subject: [PATCH] [Feature][zeta] Add the UNKNOWABLE job status. (#5303) --------- Co-authored-by: Jia Fan --- .../seatunnel/common/utils/RetryUtils.java | 2 +- .../ClusterFaultToleranceTwoPipelineIT.java | 12 ++++++--- .../seatunnel/engine/e2e/JobExecutionIT.java | 26 +++++++++++++++++-- .../seatunnel/engine/core/job/JobStatus.java | 5 +++- .../engine/server/CoordinatorService.java | 19 +++++++++++--- .../engine/server/SeaTunnelServer.java | 2 +- 6 files changed, 54 insertions(+), 12 deletions(-) diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java index aa1bbd5934b..e8ee03a5013 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java @@ -66,7 +66,7 @@ public static T retryWithException( backoff); Thread.sleep(backoff); } else { - log.debug(attemptMessage, ExceptionUtils.getMessage(e), i, retryTimes, 0); + log.info(attemptMessage, ExceptionUtils.getMessage(e), i, retryTimes, 0); } } } diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java index 608871dd561..3c677b45f3d 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java @@ -543,9 +543,11 @@ public void testTwoPipelineStreamJobRestoreIn2NodeWorkerDown() @Test public void testTwoPipelineBatchJobRestoreIn2NodeMasterDown() throws ExecutionException, InterruptedException { - String testCaseName = "testTwoPipelineBatchJobRestoreIn2NodeMasterDown"; + String testCaseName = + "testTwoPipelineBatchJobRestoreIn2NodeMasterDown" + System.currentTimeMillis(); String testClusterName = - "ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRestoreIn2NodeMasterDown"; + "ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRestoreIn2NodeMasterDown" + + System.currentTimeMillis(); long testRowNumber = 1000; int testParallelism = 6; HazelcastInstanceImpl node1 = null; @@ -651,9 +653,11 @@ public void testTwoPipelineBatchJobRestoreIn2NodeMasterDown() @Test public void testTwoPipelineStreamJobRestoreIn2NodeMasterDown() throws ExecutionException, InterruptedException { - String testCaseName = "testTwoPipelineStreamJobRestoreIn2NodeMasterDown"; + String testCaseName = + "testTwoPipelineStreamJobRestoreIn2NodeMasterDown" + System.currentTimeMillis(); String testClusterName = - "ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRestoreIn2NodeMasterDown"; + "ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRestoreIn2NodeMasterDown" + + System.currentTimeMillis(); long testRowNumber = 1000; int testParallelism = 6; HazelcastInstanceImpl node1 = null; diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java index cba498e9992..4ecee663ae5 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java @@ -145,6 +145,28 @@ public void testGetErrorInfo() throws ExecutionException, InterruptedException { Assertions.assertTrue(result.getError().startsWith("java.lang.NumberFormatException")); } + @Test + public void testGetUnKnownJobID() { + + ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); + clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT")); + SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig); + + ClientJobProxy newClientJobProxy = + engineClient.createJobClient().getJobProxy(System.currentTimeMillis()); + CompletableFuture waitForJobCompleteFuture = + CompletableFuture.supplyAsync(newClientJobProxy::waitForJobComplete); + + await().atMost(20000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + JobStatus.UNKNOWABLE, waitForJobCompleteFuture.get())); + + Assertions.assertEquals( + "UNKNOWABLE", engineClient.getJobClient().getJobStatus(System.currentTimeMillis())); + } + @Test public void testExpiredJobWasDeleted() throws Exception { Common.setDeployMode(DeployMode.CLIENT); @@ -164,8 +186,8 @@ public void testExpiredJobWasDeleted() throws Exception { await().atMost(65, TimeUnit.SECONDS) .untilAsserted( () -> - Assertions.assertThrowsExactly( - NullPointerException.class, clientJobProxy::getJobStatus)); + Assertions.assertEquals( + JobStatus.UNKNOWABLE, clientJobProxy.getJobStatus())); } @AfterEach diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java index f9dbfb4c6cc..7c50744dba0 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java @@ -60,7 +60,10 @@ public enum JobStatus { SUSPENDED(EndState.LOCALLY), /** The job is currently reconciling and waits for task execution report to recover state. */ - RECONCILING(EndState.NOT_END); + RECONCILING(EndState.NOT_END), + + /** Cannot find the JobID or the job status has already been cleared. */ + UNKNOWABLE(EndState.GLOBALLY); // -------------------------------------------------------------------------------------------- diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index 5293fe8bf99..89a2258ce2d 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -505,9 +505,22 @@ public PassiveCompletableFuture savePoint(long jobId) { public PassiveCompletableFuture waitForJobComplete(long jobId) { JobMaster runningJobMaster = runningJobMasterMap.get(jobId); if (runningJobMaster == null) { - JobHistoryService.JobState jobState = jobHistoryService.getJobDetailState(jobId); + // Because operations on Imap cannot be performed within Operation. + CompletableFuture jobStateFuture = + CompletableFuture.supplyAsync( + () -> { + return jobHistoryService.getJobDetailState(jobId); + }, + executorService); + JobHistoryService.JobState jobState = null; + try { + jobState = jobStateFuture.get(); + } catch (Exception e) { + throw new SeaTunnelEngineException("get job state error", e); + } + CompletableFuture future = new CompletableFuture<>(); - if (jobState == null) future.complete(new JobResult(JobStatus.FAILED, null)); + if (jobState == null) future.complete(new JobResult(JobStatus.UNKNOWABLE, null)); else future.complete(new JobResult(jobState.getJobStatus(), jobState.getErrorMessage())); return new PassiveCompletableFuture<>(future); @@ -537,7 +550,7 @@ public JobStatus getJobStatus(long jobId) { JobMaster runningJobMaster = runningJobMasterMap.get(jobId); if (runningJobMaster == null) { JobHistoryService.JobState jobDetailState = jobHistoryService.getJobDetailState(jobId); - return null == jobDetailState ? null : jobDetailState.getJobStatus(); + return null == jobDetailState ? JobStatus.UNKNOWABLE : jobDetailState.getJobStatus(); } return runningJobMaster.getJobStatus(); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java index 56b0e5ec008..88ee1afc9dd 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java @@ -230,7 +230,7 @@ public boolean isMasterNode() { // must retry until the cluster have master node try { return RetryUtils.retryWithException( - () -> nodeEngine.getMasterAddress().equals(nodeEngine.getThisAddress()), + () -> nodeEngine.getThisAddress().equals(nodeEngine.getMasterAddress()), new RetryUtils.RetryMaterial( Constant.OPERATION_RETRY_TIME, true,