Skip to content

Commit

Permalink
1. put clean sharding info in one transaction
Browse files Browse the repository at this point in the history
2. put allocate sharding info and clean sharding flag in one transaction
  • Loading branch information
terrymanu committed Mar 3, 2016
1 parent 54a6d54 commit 7db6392
Show file tree
Hide file tree
Showing 12 changed files with 287 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@

package com.dangdang.ddframe.job.internal.sharding;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import com.dangdang.ddframe.job.api.JobConfiguration;
import com.dangdang.ddframe.job.internal.config.ConfigurationService;
import com.dangdang.ddframe.job.internal.election.LeaderElectionService;
Expand All @@ -31,12 +26,20 @@
import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategy;
import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategyFactory;
import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategyOption;
import com.dangdang.ddframe.job.internal.storage.JobNodePath;
import com.dangdang.ddframe.job.internal.storage.JobNodeStorage;
import com.dangdang.ddframe.job.internal.storage.TransactionExecutionCallback;
import com.dangdang.ddframe.job.internal.util.BlockUtils;
import com.dangdang.ddframe.job.internal.util.ItemUtils;
import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/**
* 作业分片服务.
Expand All @@ -59,6 +62,8 @@ public class ShardingService {
private final ServerService serverService;

private final ExecutionService executionService;

private final JobNodePath jobNodePath;

public ShardingService(final CoordinatorRegistryCenter coordinatorRegistryCenter, final JobConfiguration jobConfiguration) {
jobName = jobConfiguration.getJobName();
Expand All @@ -67,6 +72,7 @@ public ShardingService(final CoordinatorRegistryCenter coordinatorRegistryCenter
configService = new ConfigurationService(coordinatorRegistryCenter, jobConfiguration);
serverService = new ServerService(coordinatorRegistryCenter, jobConfiguration);
executionService = new ExecutionService(coordinatorRegistryCenter, jobConfiguration);
jobNodePath = new JobNodePath(jobConfiguration.getJobName());
}

/**
Expand Down Expand Up @@ -100,12 +106,10 @@ public void shardingIfNecessary() {
}
log.debug("Elastic job: sharding begin.");
jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
clearShardingInfo();
jobNodeStorage.executeInTransaction(new ClearShardingInfoInfoTransactionExecutionCallback());
JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(configService.getJobShardingStrategyClass());
JobShardingStrategyOption option = new JobShardingStrategyOption(jobName, configService.getShardingTotalCount(), configService.getShardingItemParameters());
persistShardingInfo(jobShardingStrategy.sharding(serverService.getAvailableServers(), option));
jobNodeStorage.removeJobNodeIfExisted(ShardingNode.NECESSARY);
jobNodeStorage.removeJobNodeIfExisted(ShardingNode.PROCESSING);
jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(serverService.getAvailableServers(), option)));
log.debug("Elastic job: sharding completed.");
}

Expand All @@ -123,18 +127,6 @@ private void waitingOtherJobCompleted() {
}
}

private void clearShardingInfo() {
for (String each : serverService.getAllServers()) {
jobNodeStorage.removeJobNodeIfExisted(ShardingNode.getShardingNode(each));
}
}

private void persistShardingInfo(final Map<String, List<Integer>> shardingItems) {
for (Entry<String, List<Integer>> entry : shardingItems.entrySet()) {
jobNodeStorage.replaceJobNode(ShardingNode.getShardingNode(entry.getKey()), ItemUtils.toItemsString(entry.getValue()));
}
}

/**
* 获取运行在本作业服务器的分片序列号.
*
Expand All @@ -147,4 +139,30 @@ public List<Integer> getLocalHostShardingItems() {
}
return ItemUtils.toItemList(jobNodeStorage.getJobNodeDataDirectly(ShardingNode.getShardingNode(ip)));
}

class ClearShardingInfoInfoTransactionExecutionCallback implements TransactionExecutionCallback {

@Override
public void execute(final CuratorTransactionFinal curatorTransactionFinal) throws Exception {
for (String each : serverService.getAllServers()) {
String shardingNode = jobNodePath.getFullPath(ShardingNode.getShardingNode(each));
curatorTransactionFinal.check().forPath(shardingNode).and().delete().forPath(shardingNode).and();
}
}
}

@RequiredArgsConstructor
class PersistShardingInfoTransactionExecutionCallback implements TransactionExecutionCallback {

private final Map<String, List<Integer>> shardingItems;

@Override
public void execute(final CuratorTransactionFinal curatorTransactionFinal) throws Exception {
for (Entry<String, List<Integer>> entry : shardingItems.entrySet()) {
curatorTransactionFinal.create().forPath(jobNodePath.getFullPath(ShardingNode.getShardingNode(entry.getKey())), ItemUtils.toItemsString(entry.getValue()).getBytes()).and();
}
curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.NECESSARY)).and();
curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.PROCESSING)).and();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@

package com.dangdang.ddframe.job.internal.storage;

import java.util.List;

import com.dangdang.ddframe.job.api.JobConfiguration;
import com.dangdang.ddframe.job.exception.JobException;
import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.reg.exception.RegExceptionHandler;
import lombok.Getter;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.state.ConnectionStateListener;

import com.dangdang.ddframe.job.api.JobConfiguration;
import com.dangdang.ddframe.job.exception.JobException;
import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter;

import lombok.Getter;
import java.util.List;

/**
* 作业节点数据访问类.
Expand Down Expand Up @@ -158,6 +158,23 @@ public void updateJobNode(final String node, final Object value) {
public void replaceJobNode(final String node, final Object value) {
coordinatorRegistryCenter.persist(jobNodePath.getFullPath(node), value.toString());
}

/**
* 在事务中执行操作.
*
* @param callback 执行操作的回调
*/
public void executeInTransaction(final TransactionExecutionCallback callback) {
try {
CuratorTransactionFinal curatorTransactionFinal = getClient().inTransaction().check().forPath("/").and();
callback.execute(curatorTransactionFinal);
curatorTransactionFinal.commit();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
}

/**
* 在主节点执行操作.
Expand All @@ -173,11 +190,15 @@ public void executeInLeader(final String latchNode, final LeaderExecutionCallbac
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
if (ex instanceof InterruptedException) {
Thread.currentThread().interrupt();
} else {
throw new JobException(ex);
}
handleException(ex);
}
}

private void handleException(final Exception ex) {
if (ex instanceof InterruptedException) {
Thread.currentThread().interrupt();
} else {
throw new JobException(ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.job.internal.storage;

import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;

/**
* 事务执行操作的回调接口.
*
* @author zhangliang
*/
public interface TransactionExecutionCallback {

/**
* 事务执行的回调方法.
*
* @param curatorTransactionFinal 执行事务的上下文
* @throws Exception 处理中异常
*/
void execute(CuratorTransactionFinal curatorTransactionFinal) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public ZookeeperRegistryCenter(final ZookeeperConfiguration zkConfig) {
this.zkConfig = zkConfig;
}

@Override
public void init() {
if (zkConfig.isUseNestedZookeeper()) {
NestedZookeeperServers.getInstance().startServerIfNotStarted(zkConfig.getNestedPort(), zkConfig.getNestedDataDir());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@
@RunWith(Suite.class)
@SuiteClasses({
JobExecutionMultipleShardingContextTest.class,
JobSchedulerTest.class,
JobNodeStorageTest.class
JobSchedulerTest.class
})
public final class AllApiTests {
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@

package com.dangdang.ddframe.job.internal;

import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.junit.runners.Suite.SuiteClasses;

import com.dangdang.ddframe.job.internal.config.ConfigurationListenerManagerTest;
import com.dangdang.ddframe.job.internal.config.ConfigurationNodeTest;
import com.dangdang.ddframe.job.internal.config.ConfigurationServiceTest;
Expand Down Expand Up @@ -54,12 +50,17 @@
import com.dangdang.ddframe.job.internal.statistics.ProcessCountStatisticsTest;
import com.dangdang.ddframe.job.internal.statistics.StatisticsServiceTest;
import com.dangdang.ddframe.job.internal.storage.JobNodePathTest;
import com.dangdang.ddframe.job.internal.storage.JobNodeStorageTest;
import com.dangdang.ddframe.job.internal.util.ItemUtilsTest;
import com.dangdang.ddframe.job.internal.util.SensitiveInfoUtilsTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.junit.runners.Suite.SuiteClasses;

@RunWith(Suite.class)
@SuiteClasses({
JobNodePathTest.class,
JobNodeStorageTest.class,
ItemUtilsTest.class,
SensitiveInfoUtilsTest.class,
LocalHostServiceTest.class,
Expand Down
Loading

0 comments on commit 7db6392

Please sign in to comment.