diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/ElasticJob.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/ElasticJob.java index 7a19010e7b..c34cbf0c75 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/ElasticJob.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/ElasticJob.java @@ -24,4 +24,5 @@ * * @author zhangliang */ -public interface ElasticJob extends Job { } +public interface ElasticJob extends Job, Stopable { +} diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/JobScheduler.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/JobScheduler.java index 176b2c2aac..dc3b890c05 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/JobScheduler.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/JobScheduler.java @@ -25,7 +25,6 @@ import org.quartz.CronTrigger; import org.quartz.JobBuilder; import org.quartz.JobDetail; -import org.quartz.JobExecutionContext; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.Trigger; @@ -39,7 +38,6 @@ import com.dangdang.ddframe.job.internal.execution.ExecutionContextService; import com.dangdang.ddframe.job.internal.execution.ExecutionService; import com.dangdang.ddframe.job.internal.failover.FailoverService; -import com.dangdang.ddframe.job.internal.job.AbstractElasticJob; import com.dangdang.ddframe.job.internal.listener.ListenerManager; import com.dangdang.ddframe.job.internal.monitor.MonitorService; import com.dangdang.ddframe.job.internal.offset.OffsetService; @@ -56,7 +54,8 @@ /** * 作业调度器. * - * @author zhangliang, caohao + * @author zhangliang + * @author caohao */ @Slf4j public class JobScheduler { @@ -125,7 +124,7 @@ public void init() { } catch (final SchedulerException ex) { throw new JobException(ex); } - JobRegistry.getInstance().addJob(jobConfiguration.getJobName(), this); + JobRegistry.getInstance().addJobScheduler(jobConfiguration.getJobName(), this); } private void registerElasticEnv() { @@ -224,11 +223,7 @@ public Date getNextFireTime() { */ public void stopJob() { try { - for (JobExecutionContext each : scheduler.getCurrentlyExecutingJobs()) { - if (each.getJobInstance() instanceof AbstractElasticJob) { - ((AbstractElasticJob) each.getJobInstance()).stop(); - } - } + JobRegistry.getInstance().getJobInstance(jobConfiguration.getJobName()).stop(); scheduler.pauseAll(); } catch (final SchedulerException ex) { throw new JobException(ex); @@ -243,8 +238,8 @@ public void resumeManualStopedJob() { if (scheduler.isShutdown()) { return; } + JobRegistry.getInstance().getJobInstance(jobConfiguration.getJobName()).resume(); scheduler.resumeAll(); - // TODO 恢复stoped=fasle状态 } catch (final SchedulerException ex) { throw new JobException(ex); } @@ -264,6 +259,7 @@ public void resumeCrashedJob() { if (serverService.isJobStopedManually()) { return; } + JobRegistry.getInstance().getJobInstance(jobConfiguration.getJobName()).resume(); try { scheduler.resumeAll(); } catch (final SchedulerException ex) { diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/Stopable.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/Stopable.java new file mode 100644 index 0000000000..c8aba249a3 --- /dev/null +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/Stopable.java @@ -0,0 +1,36 @@ +/** + * Copyright 1999-2015 dangdang.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package com.dangdang.ddframe.job.api; + +/** + * 可停止的作业或目标. + * + * @author caohao + */ +public interface Stopable { + + /** + * 停止运行中的作业或目标. + */ + void stop(); + + /** + * 恢复运行作业或目标. + */ + void resume(); +} diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/config/ConfigurationListenerManager.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/config/ConfigurationListenerManager.java index 9f0845d1a8..5e2b938e49 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/config/ConfigurationListenerManager.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/config/ConfigurationListenerManager.java @@ -56,7 +56,7 @@ class CronSettingChangedJobListener extends AbstractJobListener { protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) { if (configNode.isCronPath(path) && Type.NODE_UPDATED == event.getType()) { String cronExpression = new String(event.getData().getData()); - JobScheduler jobScheduler = JobRegistry.getInstance().getJob(jobName); + JobScheduler jobScheduler = JobRegistry.getInstance().getJobScheduler(jobName); if (null != jobScheduler) { jobScheduler.rescheduleJob(cronExpression); } diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/config/ConfigurationService.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/config/ConfigurationService.java index fa1ae56d83..0aae45f35d 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/config/ConfigurationService.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/config/ConfigurationService.java @@ -216,4 +216,14 @@ public String getJobShardingStrategyClass() { public int getMonitorPort() { return Integer.valueOf(jobNodeStorage.getJobNodeData(ConfigurationNode.MONITOR_PORT)); } + + + /** + * 获取作业名称. + * + * @return 作业名称 + */ + public String getJobName() { + return jobNodeStorage.getJobConfiguration().getJobName(); + } } diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/execution/ExecutionService.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/execution/ExecutionService.java index f30800db4f..9b53b7e794 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/execution/ExecutionService.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/execution/ExecutionService.java @@ -38,7 +38,8 @@ /** * 执行作业的服务. * - * @author zhangliang, caohao + * @author zhangliang + * @author caohao */ public class ExecutionService { @@ -71,7 +72,7 @@ public void registerJobBegin(final JobExecutionMultipleShardingContext jobExecut for (int each : jobExecutionShardingContext.getShardingItems()) { jobNodeStorage.fillEphemeralJobNode(ExecutionNode.getRunningNode(each), ""); jobNodeStorage.replaceJobNode(ExecutionNode.getLastBeginTimeNode(each), System.currentTimeMillis()); - JobScheduler jobScheduler = JobRegistry.getInstance().getJob(jobConfiguration.getJobName()); + JobScheduler jobScheduler = JobRegistry.getInstance().getJobScheduler(jobConfiguration.getJobName()); if (null == jobScheduler) { continue; } diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/failover/FailoverService.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/failover/FailoverService.java index 725a383b2c..9cf6da13df 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/failover/FailoverService.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/failover/FailoverService.java @@ -154,7 +154,7 @@ public void execute() { log.debug("Elastic job: failover job begin, crashed item:{}.", crashedItem); jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), localHostService.getIp()); jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem)); - JobRegistry.getInstance().getJob(jobConfiguration.getJobName()).triggerJob(); + JobRegistry.getInstance().getJobScheduler(jobConfiguration.getJobName()).triggerJob(); } } } diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/job/AbstractElasticJob.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/job/AbstractElasticJob.java index 74432a258a..33f22cd831 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/job/AbstractElasticJob.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/job/AbstractElasticJob.java @@ -17,11 +17,6 @@ package com.dangdang.ddframe.job.internal.job; -import lombok.AccessLevel; -import lombok.Getter; -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; - import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; @@ -32,12 +27,19 @@ import com.dangdang.ddframe.job.internal.execution.ExecutionService; import com.dangdang.ddframe.job.internal.failover.FailoverService; import com.dangdang.ddframe.job.internal.offset.OffsetService; +import com.dangdang.ddframe.job.internal.schedule.JobRegistry; import com.dangdang.ddframe.job.internal.sharding.ShardingService; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + /** * 弹性化分布式作业的基类. * - * @author zhangliang, caohao + * @author zhangliang + * @author caohao */ @Slf4j public abstract class AbstractElasticJob implements ElasticJob { @@ -45,7 +47,6 @@ public abstract class AbstractElasticJob implements ElasticJob { @Getter(AccessLevel.PROTECTED) private volatile boolean stoped; - @Setter @Getter(AccessLevel.PROTECTED) private ConfigurationService configService; @@ -106,10 +107,18 @@ private void executeJobInternal(final JobExecutionMultipleShardingContext shardi protected abstract void executeJob(final JobExecutionMultipleShardingContext shardingContext); - /** - * 停止运行中的作业. - */ - public void stop() { + @Override + public final void stop() { stoped = true; } + + @Override + public final void resume() { + stoped = false; + } + + public final void setConfigService(final ConfigurationService configService) { + this.configService = configService; + JobRegistry.getInstance().addJobInstance(configService.getJobName(), this); + } } diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/schedule/JobRegistry.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/schedule/JobRegistry.java index 4721965bc5..efb838064a 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/schedule/JobRegistry.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/schedule/JobRegistry.java @@ -17,21 +17,25 @@ package com.dangdang.ddframe.job.internal.schedule; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import com.dangdang.ddframe.job.api.ElasticJob; import com.dangdang.ddframe.job.api.JobScheduler; /** * 作业注册表. * * @author zhangliang + * @author caohao */ public final class JobRegistry { private static volatile JobRegistry instance; - private ConcurrentMap map = new ConcurrentHashMap<>(); + private Map schedulerMap = new ConcurrentHashMap<>(); + + private ConcurrentHashMap instanceMap = new ConcurrentHashMap<>(); private JobRegistry() { } @@ -48,21 +52,42 @@ public static JobRegistry getInstance() { } /** - * 添加作业. + * 添加作业控制器. * * @param jobName 作业名称 * @param jobScheduler 作业控制器 */ - public void addJob(final String jobName, final JobScheduler jobScheduler) { - map.put(jobName, jobScheduler); + public void addJobScheduler(final String jobName, final JobScheduler jobScheduler) { + schedulerMap.put(jobName, jobScheduler); + } + + /** + * 获取作业控制器. + * + * @param jobName 作业名称 + * @return 作业控制器 + */ + public JobScheduler getJobScheduler(final String jobName) { + return schedulerMap.get(jobName); + } + + /** + * 添加作业实例. + * + * @param jobName 作业名称 + * @param job 作业实例 + */ + public void addJobInstance(final String jobName, final ElasticJob job) { + instanceMap.putIfAbsent(jobName, job); } /** - * 获取作业. + * 获取作业实例. * * @param jobName 作业名称 + * @return 作业实例 */ - public JobScheduler getJob(final String jobName) { - return map.get(jobName); + public ElasticJob getJobInstance(final String jobName) { + return instanceMap.get(jobName); } } diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/server/JobOperationListenerManager.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/server/JobOperationListenerManager.java index aef1716bb1..b3e395dba0 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/server/JobOperationListenerManager.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/server/JobOperationListenerManager.java @@ -58,9 +58,9 @@ class ConnectionLostListener implements ConnectionStateListener { @Override public void stateChanged(final CuratorFramework client, final ConnectionState newState) { if (ConnectionState.LOST == newState) { - JobRegistry.getInstance().getJob(jobName).stopJob(); + JobRegistry.getInstance().getJobScheduler(jobName).stopJob(); } else if (ConnectionState.RECONNECTED == newState) { - JobRegistry.getInstance().getJob(jobName).resumeCrashedJob(); + JobRegistry.getInstance().getJobScheduler(jobName).resumeCrashedJob(); } } } @@ -72,7 +72,7 @@ protected void dataChanged(final CuratorFramework client, final TreeCacheEvent e if (!serverNode.isJobStopedPath(path)) { return; } - JobScheduler jobScheduler = JobRegistry.getInstance().getJob(jobName); + JobScheduler jobScheduler = JobRegistry.getInstance().getJobScheduler(jobName); if (null == jobScheduler) { return; } diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/reg/base/CoordinatorRegistryCenter.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/reg/base/CoordinatorRegistryCenter.java index e12f703f89..3ca707e402 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/reg/base/CoordinatorRegistryCenter.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/reg/base/CoordinatorRegistryCenter.java @@ -60,7 +60,7 @@ public interface CoordinatorRegistryCenter extends RegistryCenter { /** * 添加本地缓存. * - * @param watcherPath 需加入缓存的路径 + * @param cachePath 需加入缓存的路径 */ void addCacheData(String cachePath); diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/AbstractBaseStdJobTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/AbstractBaseStdJobTest.java index 08e02bca21..1b4f7ed0f6 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/AbstractBaseStdJobTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/integrate/AbstractBaseStdJobTest.java @@ -109,9 +109,9 @@ public void setUp() { @After public void tearDown() throws SchedulerException, NoSuchFieldException { ProcessCountStatistics.reset(jobName); - JobScheduler jobScheduler = JobRegistry.getInstance().getJob(jobName); + JobScheduler jobScheduler = JobRegistry.getInstance().getJobScheduler(jobName); if (null != jobScheduler) { - JobRegistry.getInstance().getJob(jobName).shutdown(); + JobRegistry.getInstance().getJobScheduler(jobName).shutdown(); } ReflectionUtils.setFieldValue(JobRegistry.getInstance(), "instance", null); } diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/config/ConfigurationListenerManagerTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/config/ConfigurationListenerManagerTest.java index 6b20437303..9ec7d98dc6 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/config/ConfigurationListenerManagerTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/config/ConfigurationListenerManagerTest.java @@ -81,7 +81,7 @@ configurationListenerManager.new CronSettingChangedJobListener().dataChanged(nul @Test public void assertCronSettingChangedJobListenerWhenIsCronPathAndUpdateAndFindJob() { - JobRegistry.getInstance().addJob("testJob", jobScheduler); + JobRegistry.getInstance().addJobScheduler("testJob", jobScheduler); configurationListenerManager.new CronSettingChangedJobListener().dataChanged(null, new TreeCacheEvent( TreeCacheEvent.Type.NODE_UPDATED, new ChildData("/testJob/config/cron", null, "*/10 * * * * *".getBytes())), "/testJob/config/cron"); verify(jobScheduler).rescheduleJob("*/10 * * * * *"); diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/execution/ExecutionServiceTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/execution/ExecutionServiceTest.java index ecfa36d389..b2a0645aef 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/execution/ExecutionServiceTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/execution/ExecutionServiceTest.java @@ -104,7 +104,7 @@ public void assertRegisterJobBeginWhenNotMonitorExecution() { public void assertRegisterJobBeginWithoutNextFireTime() { when(configService.isMonitorExecution()).thenReturn(true); when(jobScheduler.getNextFireTime()).thenReturn(null); - JobRegistry.getInstance().addJob("testJob", jobScheduler); + JobRegistry.getInstance().addJobScheduler("testJob", jobScheduler); JobExecutionMultipleShardingContext jobExecutionShardingContext = new JobExecutionMultipleShardingContext(); jobExecutionShardingContext.setShardingItems(Arrays.asList(0, 1, 2)); executionService.registerJobBegin(jobExecutionShardingContext); @@ -122,7 +122,7 @@ public void assertRegisterJobBeginWithoutNextFireTime() { public void assertRegisterJobBeginWithNextFireTime() { when(configService.isMonitorExecution()).thenReturn(true); when(jobScheduler.getNextFireTime()).thenReturn(new Date(0L)); - JobRegistry.getInstance().addJob("testJob", jobScheduler); + JobRegistry.getInstance().addJobScheduler("testJob", jobScheduler); JobExecutionMultipleShardingContext jobExecutionShardingContext = new JobExecutionMultipleShardingContext(); jobExecutionShardingContext.setShardingItems(Arrays.asList(0, 1, 2)); executionService.registerJobBegin(jobExecutionShardingContext); diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/failover/FailoverServiceTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/failover/FailoverServiceTest.java index 2971e03e5b..f8bc4939f4 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/failover/FailoverServiceTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/failover/FailoverServiceTest.java @@ -149,7 +149,7 @@ public void assertFailoverLeaderExecutionCallbackIfNecessary() { when(jobNodeStorage.isJobNodeExisted("leader/failover/items")).thenReturn(true); when(jobNodeStorage.getJobNodeChildrenKeys("leader/failover/items")).thenReturn(Arrays.asList("0", "1", "2")); when(serverService.isServerReady()).thenReturn(true); - JobRegistry.getInstance().addJob("testJob", jobScheduler); + JobRegistry.getInstance().addJobScheduler("testJob", jobScheduler); failoverService.new FailoverLeaderExecutionCallback().execute(); verify(jobNodeStorage).isJobNodeExisted("leader/failover/items"); verify(jobNodeStorage, times(2)).getJobNodeChildrenKeys("leader/failover/items"); diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/server/JobOperationListenerManagerTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/server/JobOperationListenerManagerTest.java index f58c3f4be5..0b17d1352d 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/server/JobOperationListenerManagerTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/server/JobOperationListenerManagerTest.java @@ -66,14 +66,14 @@ public void assertStart() { @Test public void assertConnectionLostListenerWhenConnectionStateIsLost() { - JobRegistry.getInstance().addJob("testJob", jobScheduler); + JobRegistry.getInstance().addJobScheduler("testJob", jobScheduler); jobOperationListenerManager.new ConnectionLostListener().stateChanged(null, ConnectionState.LOST); verify(jobScheduler).stopJob(); } @Test public void assertConnectionLostListenerWhenConnectionStateIsReconnected() { - JobRegistry.getInstance().addJob("testJob", jobScheduler); + JobRegistry.getInstance().addJobScheduler("testJob", jobScheduler); jobOperationListenerManager.new ConnectionLostListener().stateChanged(null, ConnectionState.RECONNECTED); verify(jobScheduler).resumeCrashedJob(); } @@ -96,7 +96,7 @@ jobOperationListenerManager.new JobStopedStatusJobListener().dataChanged(null, n @Test public void assertJobStopedStatusJobListenerWhenIsJobStopedPathAndUpdate() { - JobRegistry.getInstance().addJob("testJob", jobScheduler); + JobRegistry.getInstance().addJobScheduler("testJob", jobScheduler); jobOperationListenerManager.new JobStopedStatusJobListener().dataChanged(null, new TreeCacheEvent( TreeCacheEvent.Type.NODE_UPDATED, new ChildData("/testJob/servers/" + ip + "/stoped", null, "".getBytes())), "/testJob/servers/" + ip + "/stoped"); verify(jobScheduler, times(0)).stopJob(); @@ -105,7 +105,7 @@ jobOperationListenerManager.new JobStopedStatusJobListener().dataChanged(null, n @Test public void assertJobStopedStatusJobListenerWhenIsJobStopedPathAndAdd() { - JobRegistry.getInstance().addJob("testJob", jobScheduler); + JobRegistry.getInstance().addJobScheduler("testJob", jobScheduler); jobOperationListenerManager.new JobStopedStatusJobListener().dataChanged(null, new TreeCacheEvent( TreeCacheEvent.Type.NODE_ADDED, new ChildData("/testJob/servers/" + ip + "/stoped", null, "".getBytes())), "/testJob/servers/" + ip + "/stoped"); verify(jobScheduler).stopJob(); @@ -114,7 +114,7 @@ jobOperationListenerManager.new JobStopedStatusJobListener().dataChanged(null, n @Test public void assertJobStopedStatusJobListenerWhenIsJobStopedPathAndRemove() { - JobRegistry.getInstance().addJob("testJob", jobScheduler); + JobRegistry.getInstance().addJobScheduler("testJob", jobScheduler); jobOperationListenerManager.new JobStopedStatusJobListener().dataChanged(null, new TreeCacheEvent( TreeCacheEvent.Type.NODE_REMOVED, new ChildData("/testJob/servers/" + ip + "/stoped", null, "".getBytes())), "/testJob/servers/" + ip + "/stoped"); verify(jobScheduler, times(0)).stopJob(); diff --git a/elastic-job-spring/src/test/java/com/dangdang/ddframe/job/spring/AbstractJobSpringIntegrateTest.java b/elastic-job-spring/src/test/java/com/dangdang/ddframe/job/spring/AbstractJobSpringIntegrateTest.java index fcba356dc5..0a3035832d 100644 --- a/elastic-job-spring/src/test/java/com/dangdang/ddframe/job/spring/AbstractJobSpringIntegrateTest.java +++ b/elastic-job-spring/src/test/java/com/dangdang/ddframe/job/spring/AbstractJobSpringIntegrateTest.java @@ -50,8 +50,8 @@ public void reset() { @After public void tearDown() { - JobRegistry.getInstance().getJob("simpleElasticJob").shutdown(); - JobRegistry.getInstance().getJob("throughputDataFlowElasticJob").shutdown(); + JobRegistry.getInstance().getJobScheduler("simpleElasticJob").shutdown(); + JobRegistry.getInstance().getJobScheduler("throughputDataFlowElasticJob").shutdown(); WaitingUtils.waitingLongTime(); }