Skip to content

Commit

Permalink
[功能提升]分片排序策略可配置化 #9
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Oct 14, 2015
1 parent 2d1e3de commit fe6bb40
Show file tree
Hide file tree
Showing 20 changed files with 232 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,7 @@ public final class JobSettings implements Serializable {

private boolean misfire;

private String jobShardingStrategyClass;

private String description;
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public JobSettings getJobSettings(final String jobName) {
result.setFetchDataCount(Integer.parseInt(curatorRepository.getData(JobNodePath.getConfigNodePath(jobName, "fetchDataCount"))));
result.setFailover(Boolean.valueOf(curatorRepository.getData(JobNodePath.getConfigNodePath(jobName, "failover"))));
result.setMisfire(Boolean.valueOf(curatorRepository.getData(JobNodePath.getConfigNodePath(jobName, "misfire"))));
result.setJobShardingStrategyClass(curatorRepository.getData(JobNodePath.getConfigNodePath(jobName, "jobShardingStrategyClass")));
result.setDescription(curatorRepository.getData(JobNodePath.getConfigNodePath(jobName, "description")));
return result;
}
Expand All @@ -115,6 +116,7 @@ public void updateJobSettings(final JobSettings jobSettings) {
updateIfchanged(jobSettings.getJobName(), "fetchDataCount", jobSettings.getFetchDataCount());
updateIfchanged(jobSettings.getJobName(), "failover", jobSettings.isFailover());
updateIfchanged(jobSettings.getJobName(), "misfire", jobSettings.isMisfire());
updateIfchanged(jobSettings.getJobName(), "jobShardingStrategyClass", jobSettings.getJobShardingStrategyClass());
updateIfchanged(jobSettings.getJobName(), "description", jobSettings.getDescription());
}

Expand Down
4 changes: 3 additions & 1 deletion elastic-job-console/src/main/webapp/js/job_detail.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ function renderSettings() {
$("#processCountIntervalSeconds").attr("value", data.processCountIntervalSeconds);
$("#concurrentDataProcessThreadCount").attr("value", data.concurrentDataProcessThreadCount);
$("#fetchDataCount").attr("value", data.fetchDataCount);
$("#jobShardingStrategyClass").attr("value", data.jobShardingStrategyClass);
$("#description").text(data.description);
if (!data.monitorExecution) {
$("#execution_info_tab").addClass("disabled");
Expand All @@ -51,8 +52,9 @@ function bindSubmitJobSettingsForm() {
var failover = $("#failover").prop("checked");
var misfire = $("#misfire").prop("checked");
var shardingItemParameters = $("#shardingItemParameters").val();
var jobShardingStrategyClass = $("#jobShardingStrategyClass").val();
var description = $("#description").val();
$.post("job/settings", {jobName: jobName, jobClass : jobClass, shardingTotalCount: shardingTotalCount, jobParameter: jobParameter, cron: cron, concurrentDataProcessThreadCount: concurrentDataProcessThreadCount, processCountIntervalSeconds: processCountIntervalSeconds, fetchDataCount: fetchDataCount, monitorExecution: monitorExecution, failover: failover, misfire: misfire,shardingItemParameters: shardingItemParameters, description: description}, function(data) {
$.post("job/settings", {jobName: jobName, jobClass : jobClass, shardingTotalCount: shardingTotalCount, jobParameter: jobParameter, cron: cron, concurrentDataProcessThreadCount: concurrentDataProcessThreadCount, processCountIntervalSeconds: processCountIntervalSeconds, fetchDataCount: fetchDataCount, monitorExecution: monitorExecution, failover: failover, misfire: misfire, shardingItemParameters: shardingItemParameters, jobShardingStrategyClass: jobShardingStrategyClass, description: description}, function(data) {
showSuccessDialog();
if (monitorExecution) {
$("#execution_info_tab").removeClass("disabled");
Expand Down
7 changes: 7 additions & 0 deletions elastic-job-console/src/main/webapp/templates/job_detail.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@
</div>
</div>

<div class="form-group">
<label for="jobShardingStrategyClass" class="col-sm-2 control-label">作业分片策略实现类全路径</label>
<div class="col-sm-9">
<input type="text" id="jobShardingStrategyClass" name="jobShardingStrategyClass" class="form-control" data-toggle="tooltip" data-placement="bottom" title="默认使用按照IP地址顺序分片策略,可参照文档定制化分片策略" />
</div>
</div>

<div class="form-group">
<label for="description" class="col-sm-2 control-label">作业描述信息</label>
<div class="col-sm-9">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,15 @@ public class JobConfiguration {
*/
private boolean misfire = true;

/**
* 作业分片策略实现类全路径.
*
* <p>
* 默认使用{@code com.dangdang.ddframe.job.internal.sharding.strategy.AverageAllocationJobShardingStrategy}.
* </p>
*/
private String jobShardingStrategyClass = "";

/**
* 作业描述信息.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.job.exception;

/**
* 作业分片策略类配置错误抛出的异常.
*
* @author zhangliang
*/
public final class JobShardingStrategyClassConfigurationException extends JobException {

private static final long serialVersionUID = -5017090222608389272L;

public JobShardingStrategyClassConfigurationException(final Exception cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public final class ConfigurationNode {

static final String MISFIRE = ROOT + "/misfire";

static final String JOB_SHARDING_STRATEGY_CLASS = ROOT + "/jobShardingStrategyClass";

static final String DESCRIPTION = ROOT + "/description";

private final JobNodePath jobNodePath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ private void registerJobInfo() {
jobNodeStorage.fillJobNodeIfNullOrOverwrite(ConfigurationNode.FETCH_DATA_COUNT, jobNodeStorage.getJobConfiguration().getFetchDataCount());
jobNodeStorage.fillJobNodeIfNullOrOverwrite(ConfigurationNode.FAILOVER, jobNodeStorage.getJobConfiguration().isFailover());
jobNodeStorage.fillJobNodeIfNullOrOverwrite(ConfigurationNode.MISFIRE, jobNodeStorage.getJobConfiguration().isMisfire());
jobNodeStorage.fillJobNodeIfNullOrOverwrite(ConfigurationNode.JOB_SHARDING_STRATEGY_CLASS, jobNodeStorage.getJobConfiguration().getJobShardingStrategyClass());
jobNodeStorage.fillJobNodeIfNullOrOverwrite(ConfigurationNode.DESCRIPTION, jobNodeStorage.getJobConfiguration().getDescription());
}

Expand Down Expand Up @@ -180,4 +181,13 @@ public boolean isFailover() {
public boolean isMisfire() {
return Boolean.valueOf(jobNodeStorage.getJobNodeData(ConfigurationNode.MISFIRE));
}

/**
* 获取作业分片策略实现类全路径.
*
* @return 作业分片策略实现类全路径
*/
public String getJobShardingStrategyClass() {
return jobNodeStorage.getJobNodeData(ConfigurationNode.JOB_SHARDING_STRATEGY_CLASS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,23 @@
import java.util.Map;
import java.util.Map.Entry;

import lombok.extern.slf4j.Slf4j;

import com.dangdang.ddframe.job.api.JobConfiguration;
import com.dangdang.ddframe.job.internal.config.ConfigurationService;
import com.dangdang.ddframe.job.internal.election.LeaderElectionService;
import com.dangdang.ddframe.job.internal.env.LocalHostService;
import com.dangdang.ddframe.job.internal.env.RealLocalHostService;
import com.dangdang.ddframe.job.internal.execution.ExecutionService;
import com.dangdang.ddframe.job.internal.server.ServerService;
import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategy;
import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategyFactory;
import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategyOption;
import com.dangdang.ddframe.job.internal.storage.JobNodeStorage;
import com.dangdang.ddframe.job.internal.util.BlockUtils;
import com.dangdang.ddframe.job.internal.util.ItemUtils;
import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter;

import lombok.extern.slf4j.Slf4j;

/**
* 作业分片服务.
*
Expand All @@ -44,6 +47,8 @@
@Slf4j
public final class ShardingService {

private final String jobName;

private final JobNodeStorage jobNodeStorage;

private final LocalHostService localHostService = new RealLocalHostService();
Expand All @@ -57,6 +62,7 @@ public final class ShardingService {
private final ExecutionService executionService;

public ShardingService(final CoordinatorRegistryCenter coordinatorRegistryCenter, final JobConfiguration jobConfiguration) {
jobName = jobConfiguration.getJobName();
jobNodeStorage = new JobNodeStorage(coordinatorRegistryCenter, jobConfiguration);
leaderElectionService = new LeaderElectionService(coordinatorRegistryCenter, jobConfiguration);
configService = new ConfigurationService(coordinatorRegistryCenter, jobConfiguration);
Expand Down Expand Up @@ -96,8 +102,9 @@ public void shardingIfNecessary() {
log.debug("Elastic job: sharding begin.");
jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
clearShardingInfo();
JobShardingStrategy jobShardingStrategy = new AverageAllocationJobShardingStrategy();
persistShardingInfo(jobShardingStrategy.sharding(serverService.getAvailableServers(), configService.getShardingTotalCount()));
JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(configService.getJobShardingStrategyClass());
JobShardingStrategyOption option = new JobShardingStrategyOption(jobName, configService.getShardingTotalCount(), configService.getShardingItemParameters());
persistShardingInfo(jobShardingStrategy.sharding(serverService.getAvailableServers(), option));
jobNodeStorage.removeJobNodeIfExisted(ShardingNode.NECESSARY);
jobNodeStorage.removeJobNodeIfExisted(ShardingNode.PROCESSING);
log.debug("Elastic job: sharding completed.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* </p>
*/

package com.dangdang.ddframe.job.internal.sharding;
package com.dangdang.ddframe.job.internal.sharding.strategy;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -40,12 +40,12 @@
public final class AverageAllocationJobShardingStrategy implements JobShardingStrategy {

@Override
public Map<String, List<Integer>> sharding(final List<String> serversList, final int shardingTotalCount) {
public Map<String, List<Integer>> sharding(final List<String> serversList, final JobShardingStrategyOption option) {
if (serversList.isEmpty()) {
return Collections.emptyMap();
}
Map<String, List<Integer>> result = shardingAliquot(serversList, shardingTotalCount);
addAliquant(serversList, shardingTotalCount, result);
Map<String, List<Integer>> result = shardingAliquot(serversList, option.getShardingTotalCount());
addAliquant(serversList, option.getShardingTotalCount(), result);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
* </p>
*/

package com.dangdang.ddframe.job.internal.sharding;
package com.dangdang.ddframe.job.internal.sharding.strategy;

import java.util.List;
import java.util.Map;

/**
* 作业的分片策略.
* 作业分片策略.
*
* @author zhangliang
*/
Expand All @@ -31,8 +31,8 @@ public interface JobShardingStrategy {
* 进行作业分片.
*
* @param serversList 所有参与分片的服务器列表
* @param shardingTotalCount 分片总数量
* @param option 作业分片策略选项
* @return 分配分片的服务器IP和分片集合的映射
*/
Map<String, List<Integer>> sharding(List<String> serversList, int shardingTotalCount);
Map<String, List<Integer>> sharding(List<String> serversList, JobShardingStrategyOption option);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.dangdang.ddframe.job.internal.sharding.strategy;

import com.dangdang.ddframe.job.exception.JobShardingStrategyClassConfigurationException;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;

/**
* 作业分片策略工厂.
*
* @author zhangliang
*/
public final class JobShardingStrategyFactory {

private JobShardingStrategyFactory() {
}

/**
* 获取 作业分片策略实例.
*
* @param jobShardingStrategyClassName 作业分片策略类名
* @return 作业分片策略实例
*/
public static JobShardingStrategy getStrategy(final String jobShardingStrategyClassName) {
if (Strings.isNullOrEmpty(jobShardingStrategyClassName)) {
return new AverageAllocationJobShardingStrategy();
}
try {
Class<?> jobShardingStrategyClass = Class.forName(jobShardingStrategyClassName);
Preconditions.checkState(JobShardingStrategy.class.isAssignableFrom(jobShardingStrategyClass), String.format("Class [%s] is not job strategy class", jobShardingStrategyClassName));
return (JobShardingStrategy) jobShardingStrategyClass.newInstance();
} catch (final ClassNotFoundException | InstantiationException | IllegalAccessException ex) {
throw new JobShardingStrategyClassConfigurationException(ex);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.dangdang.ddframe.job.internal.sharding.strategy;

import java.util.Map;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
* 作业分片策略选项.
*
* @author zhangliang
*/
@Getter
@RequiredArgsConstructor
public final class JobShardingStrategyOption {

/**
* 作业名称.
*/
private final String jobName;

/**
* 作业分片总数.
*/
private final int shardingTotalCount;

/**
* 分片序列号和个性化参数对照表.
*/
private final Map<Integer, String> shardingItemParameters;
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@
import com.dangdang.ddframe.job.internal.offset.OffsetServiceTest;
import com.dangdang.ddframe.job.internal.server.ServerNodeTest;
import com.dangdang.ddframe.job.internal.server.ServerServiceTest;
import com.dangdang.ddframe.job.internal.sharding.AverageAllocationJobShardingStrategyTest;
import com.dangdang.ddframe.job.internal.sharding.ShardingListenerManagerTest;
import com.dangdang.ddframe.job.internal.sharding.ShardingNodeTest;
import com.dangdang.ddframe.job.internal.sharding.ShardingServiceTest;
import com.dangdang.ddframe.job.internal.sharding.strategy.AverageAllocationJobShardingStrategyTest;
import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategyFactoryTest;
import com.dangdang.ddframe.job.internal.statistics.ProcessCountJobTest;
import com.dangdang.ddframe.job.internal.statistics.ProcessCountStatisticsTest;
import com.dangdang.ddframe.job.internal.storage.JobNodePathTest;
Expand Down Expand Up @@ -80,6 +81,7 @@
FailoverListenerManagerTest.class,
OffsetServiceTest.class,
OffsetNodeTest.class,
JobShardingStrategyFactoryTest.class,
AverageAllocationJobShardingStrategyTest.class,
ProcessCountJobTest.class,
ProcessCountStatisticsTest.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.dangdang.ddframe.job.exception.JobConflictException;
import com.dangdang.ddframe.job.exception.ShardingItemParametersException;
import com.dangdang.ddframe.job.internal.AbstractBaseJobTest;
import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategy;

public final class ConfigurationServiceTest extends AbstractBaseJobTest {

Expand Down Expand Up @@ -109,6 +110,7 @@ private void assertJobConfiguration(final JobConfiguration jobConfiguration) {
assertThat(Integer.parseInt(getRegistryCenter().getDirectly("/testJob/config/fetchDataCount")), is(jobConfiguration.getFetchDataCount()));
assertThat(Boolean.valueOf(getRegistryCenter().getDirectly("/testJob/config/failover")), is(jobConfiguration.isFailover()));
assertThat(Boolean.valueOf(getRegistryCenter().getDirectly("/testJob/config/misfire")), is(jobConfiguration.isMisfire()));
assertThat(getRegistryCenter().getDirectly("/testJob/config/jobShardingStrategyClass"), is(jobConfiguration.getJobShardingStrategyClass()));
assertThat(getRegistryCenter().getDirectly("/testJob/config/description"), is(jobConfiguration.getDescription()));
}

Expand Down Expand Up @@ -217,6 +219,13 @@ public void assertIsMisfire() {
assertTrue(configService.isMisfire());
}

@Test
public void assertGetJobShardingStrategyClass() {
getJobConfig().setJobShardingStrategyClass(JobShardingStrategy.class.getName());
configService.persistJobConfiguration();
assertThat(configService.getJobShardingStrategyClass(), is(JobShardingStrategy.class.getName()));
}

class ConflictJob extends AbstractElasticJob {

@Override
Expand Down
Loading

0 comments on commit fe6bb40

Please sign in to comment.