From fe6bb404827ca5347fb5794b93a3cbdb49522001 Mon Sep 17 00:00:00 2001 From: terrymanu Date: Wed, 14 Oct 2015 17:02:01 +0800 Subject: [PATCH] =?UTF-8?q?[=E5=8A=9F=E8=83=BD=E6=8F=90=E5=8D=87]=E5=88=86?= =?UTF-8?q?=E7=89=87=E6=8E=92=E5=BA=8F=E7=AD=96=E7=95=A5=E5=8F=AF=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E5=8C=96=20#9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../job/console/domain/JobSettings.java | 2 + .../service/impl/JobDimensionServiceImpl.java | 2 + .../src/main/webapp/js/job_detail.js | 4 +- .../src/main/webapp/templates/job_detail.ftl | 7 ++++ .../ddframe/job/api/JobConfiguration.java | 9 +++++ ...ngStrategyClassConfigurationException.java | 32 ++++++++++++++++ .../internal/config/ConfigurationNode.java | 2 + .../internal/config/ConfigurationService.java | 10 +++++ .../internal/sharding/ShardingService.java | 15 ++++++-- .../AverageAllocationJobShardingStrategy.java | 8 ++-- .../{ => strategy}/JobShardingStrategy.java | 8 ++-- .../strategy/JobShardingStrategyFactory.java | 35 ++++++++++++++++++ .../strategy/JobShardingStrategyOption.java | 31 ++++++++++++++++ .../com/dangdang/ddframe/job/AllJobTests.java | 4 +- .../config/ConfigurationServiceTest.java | 9 +++++ ...rageAllocationJobShardingStrategyTest.java | 18 +++++---- .../JobShardingStrategyFactoryTest.java | 37 +++++++++++++++++++ .../fixture/InvalidJobShardingStrategy.java | 18 +++++++++ .../namespace/JobBeanDefinitionParser.java | 1 + .../main/resources/META-INF/namespace/job.xsd | 1 + 20 files changed, 232 insertions(+), 21 deletions(-) create mode 100644 elastic-job-core/src/main/java/com/dangdang/ddframe/job/exception/JobShardingStrategyClassConfigurationException.java rename elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/{ => strategy}/AverageAllocationJobShardingStrategy.java (92%) rename elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/{ => strategy}/JobShardingStrategy.java (85%) create mode 100644 elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/strategy/JobShardingStrategyFactory.java create mode 100644 elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/strategy/JobShardingStrategyOption.java rename elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/sharding/{ => strategy}/AverageAllocationJobShardingStrategyTest.java (80%) create mode 100644 elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/sharding/strategy/JobShardingStrategyFactoryTest.java create mode 100644 elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/sharding/strategy/fixture/InvalidJobShardingStrategy.java diff --git a/elastic-job-console/src/main/java/com/dangdang/ddframe/job/console/domain/JobSettings.java b/elastic-job-console/src/main/java/com/dangdang/ddframe/job/console/domain/JobSettings.java index 30447b0d78..b19f6ca00d 100644 --- a/elastic-job-console/src/main/java/com/dangdang/ddframe/job/console/domain/JobSettings.java +++ b/elastic-job-console/src/main/java/com/dangdang/ddframe/job/console/domain/JobSettings.java @@ -52,5 +52,7 @@ public final class JobSettings implements Serializable { private boolean misfire; + private String jobShardingStrategyClass; + private String description; } diff --git a/elastic-job-console/src/main/java/com/dangdang/ddframe/job/console/service/impl/JobDimensionServiceImpl.java b/elastic-job-console/src/main/java/com/dangdang/ddframe/job/console/service/impl/JobDimensionServiceImpl.java index cdf5b07d32..3c5e7d0df4 100644 --- a/elastic-job-console/src/main/java/com/dangdang/ddframe/job/console/service/impl/JobDimensionServiceImpl.java +++ b/elastic-job-console/src/main/java/com/dangdang/ddframe/job/console/service/impl/JobDimensionServiceImpl.java @@ -99,6 +99,7 @@ public JobSettings getJobSettings(final String jobName) { result.setFetchDataCount(Integer.parseInt(curatorRepository.getData(JobNodePath.getConfigNodePath(jobName, "fetchDataCount")))); result.setFailover(Boolean.valueOf(curatorRepository.getData(JobNodePath.getConfigNodePath(jobName, "failover")))); result.setMisfire(Boolean.valueOf(curatorRepository.getData(JobNodePath.getConfigNodePath(jobName, "misfire")))); + result.setJobShardingStrategyClass(curatorRepository.getData(JobNodePath.getConfigNodePath(jobName, "jobShardingStrategyClass"))); result.setDescription(curatorRepository.getData(JobNodePath.getConfigNodePath(jobName, "description"))); return result; } @@ -115,6 +116,7 @@ public void updateJobSettings(final JobSettings jobSettings) { updateIfchanged(jobSettings.getJobName(), "fetchDataCount", jobSettings.getFetchDataCount()); updateIfchanged(jobSettings.getJobName(), "failover", jobSettings.isFailover()); updateIfchanged(jobSettings.getJobName(), "misfire", jobSettings.isMisfire()); + updateIfchanged(jobSettings.getJobName(), "jobShardingStrategyClass", jobSettings.getJobShardingStrategyClass()); updateIfchanged(jobSettings.getJobName(), "description", jobSettings.getDescription()); } diff --git a/elastic-job-console/src/main/webapp/js/job_detail.js b/elastic-job-console/src/main/webapp/js/job_detail.js index 29ecc40f17..9b9bbc6b3a 100644 --- a/elastic-job-console/src/main/webapp/js/job_detail.js +++ b/elastic-job-console/src/main/webapp/js/job_detail.js @@ -29,6 +29,7 @@ function renderSettings() { $("#processCountIntervalSeconds").attr("value", data.processCountIntervalSeconds); $("#concurrentDataProcessThreadCount").attr("value", data.concurrentDataProcessThreadCount); $("#fetchDataCount").attr("value", data.fetchDataCount); + $("#jobShardingStrategyClass").attr("value", data.jobShardingStrategyClass); $("#description").text(data.description); if (!data.monitorExecution) { $("#execution_info_tab").addClass("disabled"); @@ -51,8 +52,9 @@ function bindSubmitJobSettingsForm() { var failover = $("#failover").prop("checked"); var misfire = $("#misfire").prop("checked"); var shardingItemParameters = $("#shardingItemParameters").val(); + var jobShardingStrategyClass = $("#jobShardingStrategyClass").val(); var description = $("#description").val(); - $.post("job/settings", {jobName: jobName, jobClass : jobClass, shardingTotalCount: shardingTotalCount, jobParameter: jobParameter, cron: cron, concurrentDataProcessThreadCount: concurrentDataProcessThreadCount, processCountIntervalSeconds: processCountIntervalSeconds, fetchDataCount: fetchDataCount, monitorExecution: monitorExecution, failover: failover, misfire: misfire,shardingItemParameters: shardingItemParameters, description: description}, function(data) { + $.post("job/settings", {jobName: jobName, jobClass : jobClass, shardingTotalCount: shardingTotalCount, jobParameter: jobParameter, cron: cron, concurrentDataProcessThreadCount: concurrentDataProcessThreadCount, processCountIntervalSeconds: processCountIntervalSeconds, fetchDataCount: fetchDataCount, monitorExecution: monitorExecution, failover: failover, misfire: misfire, shardingItemParameters: shardingItemParameters, jobShardingStrategyClass: jobShardingStrategyClass, description: description}, function(data) { showSuccessDialog(); if (monitorExecution) { $("#execution_info_tab").removeClass("disabled"); diff --git a/elastic-job-console/src/main/webapp/templates/job_detail.ftl b/elastic-job-console/src/main/webapp/templates/job_detail.ftl index 98068680c9..4cc65f1c00 100644 --- a/elastic-job-console/src/main/webapp/templates/job_detail.ftl +++ b/elastic-job-console/src/main/webapp/templates/job_detail.ftl @@ -74,6 +74,13 @@ +
+ +
+ +
+
+
diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/JobConfiguration.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/JobConfiguration.java index 8b559cfc85..2e33a9c613 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/JobConfiguration.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/JobConfiguration.java @@ -124,6 +124,15 @@ public class JobConfiguration { */ private boolean misfire = true; + /** + * 作业分片策略实现类全路径. + * + *

+ * 默认使用{@code com.dangdang.ddframe.job.internal.sharding.strategy.AverageAllocationJobShardingStrategy}. + *

+ */ + private String jobShardingStrategyClass = ""; + /** * 作业描述信息. */ diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/exception/JobShardingStrategyClassConfigurationException.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/exception/JobShardingStrategyClassConfigurationException.java new file mode 100644 index 0000000000..0a9008b49f --- /dev/null +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/exception/JobShardingStrategyClassConfigurationException.java @@ -0,0 +1,32 @@ +/** + * Copyright 1999-2015 dangdang.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package com.dangdang.ddframe.job.exception; + +/** + * 作业分片策略类配置错误抛出的异常. + * + * @author zhangliang + */ +public final class JobShardingStrategyClassConfigurationException extends JobException { + + private static final long serialVersionUID = -5017090222608389272L; + + public JobShardingStrategyClassConfigurationException(final Exception cause) { + super(cause); + } +} diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/config/ConfigurationNode.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/config/ConfigurationNode.java index 8a6b552839..e5b48e42df 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/config/ConfigurationNode.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/config/ConfigurationNode.java @@ -50,6 +50,8 @@ public final class ConfigurationNode { static final String MISFIRE = ROOT + "/misfire"; + static final String JOB_SHARDING_STRATEGY_CLASS = ROOT + "/jobShardingStrategyClass"; + static final String DESCRIPTION = ROOT + "/description"; private final JobNodePath jobNodePath; diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/config/ConfigurationService.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/config/ConfigurationService.java index f378334bb6..021bd0d96d 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/config/ConfigurationService.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/config/ConfigurationService.java @@ -71,6 +71,7 @@ private void registerJobInfo() { jobNodeStorage.fillJobNodeIfNullOrOverwrite(ConfigurationNode.FETCH_DATA_COUNT, jobNodeStorage.getJobConfiguration().getFetchDataCount()); jobNodeStorage.fillJobNodeIfNullOrOverwrite(ConfigurationNode.FAILOVER, jobNodeStorage.getJobConfiguration().isFailover()); jobNodeStorage.fillJobNodeIfNullOrOverwrite(ConfigurationNode.MISFIRE, jobNodeStorage.getJobConfiguration().isMisfire()); + jobNodeStorage.fillJobNodeIfNullOrOverwrite(ConfigurationNode.JOB_SHARDING_STRATEGY_CLASS, jobNodeStorage.getJobConfiguration().getJobShardingStrategyClass()); jobNodeStorage.fillJobNodeIfNullOrOverwrite(ConfigurationNode.DESCRIPTION, jobNodeStorage.getJobConfiguration().getDescription()); } @@ -180,4 +181,13 @@ public boolean isFailover() { public boolean isMisfire() { return Boolean.valueOf(jobNodeStorage.getJobNodeData(ConfigurationNode.MISFIRE)); } + + /** + * 获取作业分片策略实现类全路径. + * + * @return 作业分片策略实现类全路径 + */ + public String getJobShardingStrategyClass() { + return jobNodeStorage.getJobNodeData(ConfigurationNode.JOB_SHARDING_STRATEGY_CLASS); + } } diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/ShardingService.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/ShardingService.java index 8f081613db..4c7ec44e2e 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/ShardingService.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/ShardingService.java @@ -22,6 +22,8 @@ import java.util.Map; import java.util.Map.Entry; +import lombok.extern.slf4j.Slf4j; + import com.dangdang.ddframe.job.api.JobConfiguration; import com.dangdang.ddframe.job.internal.config.ConfigurationService; import com.dangdang.ddframe.job.internal.election.LeaderElectionService; @@ -29,13 +31,14 @@ import com.dangdang.ddframe.job.internal.env.RealLocalHostService; import com.dangdang.ddframe.job.internal.execution.ExecutionService; import com.dangdang.ddframe.job.internal.server.ServerService; +import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategy; +import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategyFactory; +import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategyOption; import com.dangdang.ddframe.job.internal.storage.JobNodeStorage; import com.dangdang.ddframe.job.internal.util.BlockUtils; import com.dangdang.ddframe.job.internal.util.ItemUtils; import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter; -import lombok.extern.slf4j.Slf4j; - /** * 作业分片服务. * @@ -44,6 +47,8 @@ @Slf4j public final class ShardingService { + private final String jobName; + private final JobNodeStorage jobNodeStorage; private final LocalHostService localHostService = new RealLocalHostService(); @@ -57,6 +62,7 @@ public final class ShardingService { private final ExecutionService executionService; public ShardingService(final CoordinatorRegistryCenter coordinatorRegistryCenter, final JobConfiguration jobConfiguration) { + jobName = jobConfiguration.getJobName(); jobNodeStorage = new JobNodeStorage(coordinatorRegistryCenter, jobConfiguration); leaderElectionService = new LeaderElectionService(coordinatorRegistryCenter, jobConfiguration); configService = new ConfigurationService(coordinatorRegistryCenter, jobConfiguration); @@ -96,8 +102,9 @@ public void shardingIfNecessary() { log.debug("Elastic job: sharding begin."); jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, ""); clearShardingInfo(); - JobShardingStrategy jobShardingStrategy = new AverageAllocationJobShardingStrategy(); - persistShardingInfo(jobShardingStrategy.sharding(serverService.getAvailableServers(), configService.getShardingTotalCount())); + JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(configService.getJobShardingStrategyClass()); + JobShardingStrategyOption option = new JobShardingStrategyOption(jobName, configService.getShardingTotalCount(), configService.getShardingItemParameters()); + persistShardingInfo(jobShardingStrategy.sharding(serverService.getAvailableServers(), option)); jobNodeStorage.removeJobNodeIfExisted(ShardingNode.NECESSARY); jobNodeStorage.removeJobNodeIfExisted(ShardingNode.PROCESSING); log.debug("Elastic job: sharding completed."); diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/AverageAllocationJobShardingStrategy.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/strategy/AverageAllocationJobShardingStrategy.java similarity index 92% rename from elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/AverageAllocationJobShardingStrategy.java rename to elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/strategy/AverageAllocationJobShardingStrategy.java index c7ee3043b8..63ce864379 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/AverageAllocationJobShardingStrategy.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/strategy/AverageAllocationJobShardingStrategy.java @@ -15,7 +15,7 @@ *

*/ -package com.dangdang.ddframe.job.internal.sharding; +package com.dangdang.ddframe.job.internal.sharding.strategy; import java.util.ArrayList; import java.util.Collections; @@ -40,12 +40,12 @@ public final class AverageAllocationJobShardingStrategy implements JobShardingStrategy { @Override - public Map> sharding(final List serversList, final int shardingTotalCount) { + public Map> sharding(final List serversList, final JobShardingStrategyOption option) { if (serversList.isEmpty()) { return Collections.emptyMap(); } - Map> result = shardingAliquot(serversList, shardingTotalCount); - addAliquant(serversList, shardingTotalCount, result); + Map> result = shardingAliquot(serversList, option.getShardingTotalCount()); + addAliquant(serversList, option.getShardingTotalCount(), result); return result; } diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/JobShardingStrategy.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/strategy/JobShardingStrategy.java similarity index 85% rename from elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/JobShardingStrategy.java rename to elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/strategy/JobShardingStrategy.java index 0fee66c1f5..a663c88ae9 100644 --- a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/JobShardingStrategy.java +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/strategy/JobShardingStrategy.java @@ -15,13 +15,13 @@ *

*/ -package com.dangdang.ddframe.job.internal.sharding; +package com.dangdang.ddframe.job.internal.sharding.strategy; import java.util.List; import java.util.Map; /** - * 作业的分片策略. + * 作业分片策略. * * @author zhangliang */ @@ -31,8 +31,8 @@ public interface JobShardingStrategy { * 进行作业分片. * * @param serversList 所有参与分片的服务器列表 - * @param shardingTotalCount 分片总数量 + * @param option 作业分片策略选项 * @return 分配分片的服务器IP和分片集合的映射 */ - Map> sharding(List serversList, int shardingTotalCount); + Map> sharding(List serversList, JobShardingStrategyOption option); } diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/strategy/JobShardingStrategyFactory.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/strategy/JobShardingStrategyFactory.java new file mode 100644 index 0000000000..4cb10cb2f6 --- /dev/null +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/strategy/JobShardingStrategyFactory.java @@ -0,0 +1,35 @@ +package com.dangdang.ddframe.job.internal.sharding.strategy; + +import com.dangdang.ddframe.job.exception.JobShardingStrategyClassConfigurationException; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; + +/** + * 作业分片策略工厂. + * + * @author zhangliang + */ +public final class JobShardingStrategyFactory { + + private JobShardingStrategyFactory() { + } + + /** + * 获取 作业分片策略实例. + * + * @param jobShardingStrategyClassName 作业分片策略类名 + * @return 作业分片策略实例 + */ + public static JobShardingStrategy getStrategy(final String jobShardingStrategyClassName) { + if (Strings.isNullOrEmpty(jobShardingStrategyClassName)) { + return new AverageAllocationJobShardingStrategy(); + } + try { + Class jobShardingStrategyClass = Class.forName(jobShardingStrategyClassName); + Preconditions.checkState(JobShardingStrategy.class.isAssignableFrom(jobShardingStrategyClass), String.format("Class [%s] is not job strategy class", jobShardingStrategyClassName)); + return (JobShardingStrategy) jobShardingStrategyClass.newInstance(); + } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException ex) { + throw new JobShardingStrategyClassConfigurationException(ex); + } + } +} diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/strategy/JobShardingStrategyOption.java b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/strategy/JobShardingStrategyOption.java new file mode 100644 index 0000000000..611f686d96 --- /dev/null +++ b/elastic-job-core/src/main/java/com/dangdang/ddframe/job/internal/sharding/strategy/JobShardingStrategyOption.java @@ -0,0 +1,31 @@ +package com.dangdang.ddframe.job.internal.sharding.strategy; + +import java.util.Map; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * 作业分片策略选项. + * + * @author zhangliang + */ +@Getter +@RequiredArgsConstructor +public final class JobShardingStrategyOption { + + /** + * 作业名称. + */ + private final String jobName; + + /** + * 作业分片总数. + */ + private final int shardingTotalCount; + + /** + * 分片序列号和个性化参数对照表. + */ + private final Map shardingItemParameters; +} diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/AllJobTests.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/AllJobTests.java index 21d9f43bdf..7412539b60 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/AllJobTests.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/AllJobTests.java @@ -47,10 +47,11 @@ import com.dangdang.ddframe.job.internal.offset.OffsetServiceTest; import com.dangdang.ddframe.job.internal.server.ServerNodeTest; import com.dangdang.ddframe.job.internal.server.ServerServiceTest; -import com.dangdang.ddframe.job.internal.sharding.AverageAllocationJobShardingStrategyTest; import com.dangdang.ddframe.job.internal.sharding.ShardingListenerManagerTest; import com.dangdang.ddframe.job.internal.sharding.ShardingNodeTest; import com.dangdang.ddframe.job.internal.sharding.ShardingServiceTest; +import com.dangdang.ddframe.job.internal.sharding.strategy.AverageAllocationJobShardingStrategyTest; +import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategyFactoryTest; import com.dangdang.ddframe.job.internal.statistics.ProcessCountJobTest; import com.dangdang.ddframe.job.internal.statistics.ProcessCountStatisticsTest; import com.dangdang.ddframe.job.internal.storage.JobNodePathTest; @@ -80,6 +81,7 @@ FailoverListenerManagerTest.class, OffsetServiceTest.class, OffsetNodeTest.class, + JobShardingStrategyFactoryTest.class, AverageAllocationJobShardingStrategyTest.class, ProcessCountJobTest.class, ProcessCountStatisticsTest.class, diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/config/ConfigurationServiceTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/config/ConfigurationServiceTest.java index b76d91c4f5..e934e68e4c 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/config/ConfigurationServiceTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/config/ConfigurationServiceTest.java @@ -39,6 +39,7 @@ import com.dangdang.ddframe.job.exception.JobConflictException; import com.dangdang.ddframe.job.exception.ShardingItemParametersException; import com.dangdang.ddframe.job.internal.AbstractBaseJobTest; +import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategy; public final class ConfigurationServiceTest extends AbstractBaseJobTest { @@ -109,6 +110,7 @@ private void assertJobConfiguration(final JobConfiguration jobConfiguration) { assertThat(Integer.parseInt(getRegistryCenter().getDirectly("/testJob/config/fetchDataCount")), is(jobConfiguration.getFetchDataCount())); assertThat(Boolean.valueOf(getRegistryCenter().getDirectly("/testJob/config/failover")), is(jobConfiguration.isFailover())); assertThat(Boolean.valueOf(getRegistryCenter().getDirectly("/testJob/config/misfire")), is(jobConfiguration.isMisfire())); + assertThat(getRegistryCenter().getDirectly("/testJob/config/jobShardingStrategyClass"), is(jobConfiguration.getJobShardingStrategyClass())); assertThat(getRegistryCenter().getDirectly("/testJob/config/description"), is(jobConfiguration.getDescription())); } @@ -217,6 +219,13 @@ public void assertIsMisfire() { assertTrue(configService.isMisfire()); } + @Test + public void assertGetJobShardingStrategyClass() { + getJobConfig().setJobShardingStrategyClass(JobShardingStrategy.class.getName()); + configService.persistJobConfiguration(); + assertThat(configService.getJobShardingStrategyClass(), is(JobShardingStrategy.class.getName())); + } + class ConflictJob extends AbstractElasticJob { @Override diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/sharding/AverageAllocationJobShardingStrategyTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/sharding/strategy/AverageAllocationJobShardingStrategyTest.java similarity index 80% rename from elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/sharding/AverageAllocationJobShardingStrategyTest.java rename to elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/sharding/strategy/AverageAllocationJobShardingStrategyTest.java index c573128c43..da29fd97a3 100644 --- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/sharding/AverageAllocationJobShardingStrategyTest.java +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/sharding/strategy/AverageAllocationJobShardingStrategyTest.java @@ -15,7 +15,7 @@ *

*/ -package com.dangdang.ddframe.job.internal.sharding; +package com.dangdang.ddframe.job.internal.sharding.strategy; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; @@ -34,14 +34,14 @@ public final class AverageAllocationJobShardingStrategyTest { @Test public void shardingForZeroServer() { - assertThat(jobShardingStrategy.sharding(Collections.emptyList(), 3), is(Collections.EMPTY_MAP)); + assertThat(jobShardingStrategy.sharding(Collections.emptyList(), getJobShardingStrategyOption(3)), is(Collections.EMPTY_MAP)); } @Test public void shardingForOneServer() { Map> expected = new LinkedHashMap<>(1); expected.put("host0", Arrays.asList(0, 1, 2)); - assertThat(jobShardingStrategy.sharding(Arrays.asList("host0"), 3), is(expected)); + assertThat(jobShardingStrategy.sharding(Arrays.asList("host0"), getJobShardingStrategyOption(3)), is(expected)); } @Test @@ -50,7 +50,7 @@ public void shardingForServersMoreThanShardingCount() { expected.put("host0", Arrays.asList(0)); expected.put("host1", Arrays.asList(1)); expected.put("host2", Collections.emptyList()); - assertThat(jobShardingStrategy.sharding(Arrays.asList("host0", "host1", "host2"), 2), is(expected)); + assertThat(jobShardingStrategy.sharding(Arrays.asList("host0", "host1", "host2"), getJobShardingStrategyOption(2)), is(expected)); } @Test @@ -59,7 +59,7 @@ public void shardingForServersLessThanShardingCountAliquot() { expected.put("host0", Arrays.asList(0, 1, 2)); expected.put("host1", Arrays.asList(3, 4, 5)); expected.put("host2", Arrays.asList(6, 7, 8)); - assertThat(jobShardingStrategy.sharding(Arrays.asList("host0", "host1", "host2"), 9), is(expected)); + assertThat(jobShardingStrategy.sharding(Arrays.asList("host0", "host1", "host2"), getJobShardingStrategyOption(9)), is(expected)); } @Test @@ -68,7 +68,7 @@ public void shardingForServersLessThanShardingCountAliquantFor8ShardingCountAnd3 expected.put("host0", Arrays.asList(0, 1, 6)); expected.put("host1", Arrays.asList(2, 3, 7)); expected.put("host2", Arrays.asList(4, 5)); - assertThat(jobShardingStrategy.sharding(Arrays.asList("host0", "host1", "host2"), 8), is(expected)); + assertThat(jobShardingStrategy.sharding(Arrays.asList("host0", "host1", "host2"), getJobShardingStrategyOption(8)), is(expected)); } @Test @@ -77,6 +77,10 @@ public void shardingForServersLessThanShardingCountAliquantFor10ShardingCountAnd expected.put("host0", Arrays.asList(0, 1, 2, 9)); expected.put("host1", Arrays.asList(3, 4, 5)); expected.put("host2", Arrays.asList(6, 7, 8)); - assertThat(jobShardingStrategy.sharding(Arrays.asList("host0", "host1", "host2"), 10), is(expected)); + assertThat(jobShardingStrategy.sharding(Arrays.asList("host0", "host1", "host2"), getJobShardingStrategyOption(10)), is(expected)); + } + + private JobShardingStrategyOption getJobShardingStrategyOption(final int shardingTotalCount) { + return new JobShardingStrategyOption("testJob", shardingTotalCount, Collections.emptyMap()); } } diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/sharding/strategy/JobShardingStrategyFactoryTest.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/sharding/strategy/JobShardingStrategyFactoryTest.java new file mode 100644 index 0000000000..4253a6ce9e --- /dev/null +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/sharding/strategy/JobShardingStrategyFactoryTest.java @@ -0,0 +1,37 @@ +package com.dangdang.ddframe.job.internal.sharding.strategy; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertThat; + +import org.junit.Test; + +import com.dangdang.ddframe.job.exception.JobShardingStrategyClassConfigurationException; +import com.dangdang.ddframe.job.internal.sharding.strategy.fixture.InvalidJobShardingStrategy; + +public class JobShardingStrategyFactoryTest { + + @Test + public void assertGetDefaultStrategy() { + assertThat(JobShardingStrategyFactory.getStrategy(null), instanceOf(AverageAllocationJobShardingStrategy.class)); + } + + @Test(expected = JobShardingStrategyClassConfigurationException.class) + public void assertGetStrategyFailureWhenClassNotFound() { + JobShardingStrategyFactory.getStrategy("NotClass"); + } + + @Test(expected = IllegalStateException.class) + public void assertGetStrategyFailureWhenNotStrategyClass() { + JobShardingStrategyFactory.getStrategy(Object.class.getName()); + } + + @Test(expected = JobShardingStrategyClassConfigurationException.class) + public void assertGetStrategyFailureWhenStrategyClassInvalid() { + JobShardingStrategyFactory.getStrategy(InvalidJobShardingStrategy.class.getName()); + } + + @Test + public void assertGetStrategySuccess() { + assertThat(JobShardingStrategyFactory.getStrategy(AverageAllocationJobShardingStrategy.class.getName()), instanceOf(AverageAllocationJobShardingStrategy.class)); + } +} diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/sharding/strategy/fixture/InvalidJobShardingStrategy.java b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/sharding/strategy/fixture/InvalidJobShardingStrategy.java new file mode 100644 index 0000000000..9c6568952e --- /dev/null +++ b/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/sharding/strategy/fixture/InvalidJobShardingStrategy.java @@ -0,0 +1,18 @@ +package com.dangdang.ddframe.job.internal.sharding.strategy.fixture; + +import java.util.List; +import java.util.Map; + +import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategy; +import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategyOption; + +public class InvalidJobShardingStrategy implements JobShardingStrategy { + + public InvalidJobShardingStrategy(final String input) { + } + + @Override + public Map> sharding(final List serversList, final JobShardingStrategyOption option) { + return null; + } +} diff --git a/elastic-job-spring/src/main/java/com/dangdang/ddframe/job/spring/namespace/JobBeanDefinitionParser.java b/elastic-job-spring/src/main/java/com/dangdang/ddframe/job/spring/namespace/JobBeanDefinitionParser.java index 5dcbf2bed6..7a551a4231 100644 --- a/elastic-job-spring/src/main/java/com/dangdang/ddframe/job/spring/namespace/JobBeanDefinitionParser.java +++ b/elastic-job-spring/src/main/java/com/dangdang/ddframe/job/spring/namespace/JobBeanDefinitionParser.java @@ -60,6 +60,7 @@ private String createJobConfiguration(final Element element, final ParserContext addPropertyValueIfNotEmpty("fetchDataCount", element, factory); addPropertyValueIfNotEmpty("failover", element, factory); addPropertyValueIfNotEmpty("misfire", element, factory); + addPropertyValueIfNotEmpty("jobShardingStrategyClass", element, factory); addPropertyValueIfNotEmpty("description", element, factory); addPropertyValueIfNotEmpty("disabled", element, factory); addPropertyValueIfNotEmpty("overwrite", element, factory); diff --git a/elastic-job-spring/src/main/resources/META-INF/namespace/job.xsd b/elastic-job-spring/src/main/resources/META-INF/namespace/job.xsd index 24af30e494..693bb232c7 100644 --- a/elastic-job-spring/src/main/resources/META-INF/namespace/job.xsd +++ b/elastic-job-spring/src/main/resources/META-INF/namespace/job.xsd @@ -23,6 +23,7 @@ +