Skip to content

Commit

Permalink
fix #16
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Feb 5, 2016
1 parent d9d56ff commit e4a7466
Show file tree
Hide file tree
Showing 45 changed files with 392 additions and 199 deletions.
20 changes: 15 additions & 5 deletions elastic-job-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,22 @@
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>

<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-test</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.unitils</groupId>
<artifactId>unitils-core</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,12 @@ private void executeJobInternal(final JobExecutionMultipleShardingContext shardi
} catch (final Exception ex) {
//CHECKSTYLE:ON
handleJobExecutionException(new JobExecutionException(ex));
}
// TODO 如果抛出异常则作业状态将不正确,待考虑如何保持作业完整性
executionService.registerJobCompleted(shardingContext);
if (configService.isFailover()) {
failoverService.updateFailoverComplete(shardingContext.getShardingItems());
} finally {
// TODO 考虑增加作业失败的状态,并且考虑如何处理作业失败的整体回路
executionService.registerJobCompleted(shardingContext);
if (configService.isFailover()) {
failoverService.updateFailoverComplete(shardingContext.getShardingItems());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* </p>
*/

package com.dangdang.ddframe.test;
package com.dangdang.ddframe.reg.zookeeper;

import java.io.File;
import java.io.IOException;
Expand All @@ -24,39 +24,27 @@

import org.apache.curator.test.TestingServer;

import com.dangdang.ddframe.reg.exception.RegExceptionHandler;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;

/**
* 启动用于测试的内嵌Zookeeper服务.
* 内嵌的Zookeeper服务器.
*
* <p>
* 可以根据不同的端口号启动多个Zookeeper服务.
* 但每个相同的端口号共用一个服务实例.
* </p>
*
* <p>
* 整个测试结束后, 随着JVM的关闭而关闭内嵌Zookeeper服务器.
* 之所以不调用close方法关闭Zookeeper容器并清理资源, 原因是Curator连接时会不停的扫描Zookeeper, 如果Zookeeper先于Curator关闭, Curator会不停的重连Zookeeper容器, 导致测试用例不能继续进行.
* 所以只能采用这种方式关闭, 目前已知的问题是: 测试的文件夹test_zk_data不能在测试结束后自动删除.
* </p>
*
* @author zhangliang
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class NestedZookeeperServers {

/**
* 内嵌Zookeeper的连接地址.
*/
public static final String ZK_CONNECTION_STRING = String.format("localhost:%s", NestedZookeeperServers.DEFAULT_PORT);

private static final int DEFAULT_PORT = 3181;

private static final String TEST_TEMP_DIRECTORY = String.format("target/test_zk_data/%s/", System.nanoTime());

private static NestedZookeeperServers instance = new NestedZookeeperServers();

private static ConcurrentMap<Integer, TestingServer> testingServers = new ConcurrentHashMap<>();

private NestedZookeeperServers() {
}
private static ConcurrentMap<Integer, TestingServer> nestedServers = new ConcurrentHashMap<>();

/**
* 获取单例实例.
Expand All @@ -68,39 +56,44 @@ public static NestedZookeeperServers getInstance() {
}

/**
* 启动内嵌的端口号为3181的Zookeeper服务.
* 启动内嵌的Zookeeper服务.
*
* @param port 端口号
*
* <p>
* 如果该端口号的Zookeeper服务未启动, 则启动服务.
* 如果该端口号的Zookeeper服务已启动, 则不做任何操作.
* </p>
*/
public void startServerIfNotStarted() {
startServerIfNotStarted(DEFAULT_PORT);
}

private synchronized void startServerIfNotStarted(final int port) {
if (!testingServers.containsKey(port)) {
TestingServer testingServer;
public synchronized void startServerIfNotStarted(final int port, final String dataDir) {
if (!nestedServers.containsKey(port)) {
TestingServer testingServer = null;
try {
testingServer = new TestingServer(port, new File(TEST_TEMP_DIRECTORY + port));
testingServer = new TestingServer(port, new File(dataDir));
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
throw new TestEnvironmentException(ex);
RegExceptionHandler.handleException(ex);
}
testingServers.putIfAbsent(port, testingServer);
nestedServers.putIfAbsent(port, testingServer);
}
}

/**
* 关闭内嵌的端口号为3181的Zookeeper服务.
* 关闭内嵌的Zookeeper服务.
*
* @param port 端口号
*/
public void closeServer() {
public void closeServer(final int port) {
TestingServer nestedServer = nestedServers.get(port);
if (null == nestedServer) {
return;
}
try {
testingServers.get(DEFAULT_PORT).close();
nestedServer.close();
nestedServers.remove(port);
} catch (final IOException ex) {
throw new TestEnvironmentException(ex);
RegExceptionHandler.handleException(ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import lombok.Setter;

import com.dangdang.ddframe.reg.base.AbstractRegistryCenterConfiguration;
import com.google.common.base.Strings;

/**
* 基于Zookeeper的注册中心配置.
Expand Down Expand Up @@ -81,6 +82,17 @@ public class ZookeeperConfiguration extends AbstractRegistryCenterConfiguration
*/
private String digest;

/**
* 内嵌Zookeeper的端口号.
* -1表示不开启内嵌Zookeeper.
*/
private int nestedPort = -1;

/**
* 内嵌Zookeeper的数据存储路径.
*/
private String nestedDataDir;

/**
* 包含了必需属性的构造器.
*
Expand All @@ -97,4 +109,13 @@ public ZookeeperConfiguration(final String serverLists, final String namespace,
this.maxSleepTimeMilliseconds = maxSleepTimeMilliseconds;
this.maxRetries = maxRetries;
}

/**
 * Determines whether an embedded (nested) Zookeeper server should be started.
 *
 * <p>
 * Both conditions must hold: a nested port other than the -1 sentinel has been
 * configured, and a non-empty data directory has been provided.
 * </p>
 *
 * @return true if the embedded Zookeeper server is configured for use
 */
public boolean isUseNestedZookeeper() {
    boolean portConfigured = -1 != nestedPort;
    boolean dataDirConfigured = null != nestedDataDir && !nestedDataDir.isEmpty();
    return portConfigured && dataDirConfigured;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public ZookeeperRegistryCenter(final ZookeeperConfiguration zkConfig) {
}

public void init() {
if (zkConfig.isUseNestedZookeeper()) {
NestedZookeeperServers.getInstance().startServerIfNotStarted(zkConfig.getNestedPort(), zkConfig.getNestedDataDir());
}
log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
Builder builder = CuratorFrameworkFactory.builder()
.connectString(zkConfig.getServerLists())
Expand Down Expand Up @@ -143,6 +146,9 @@ public void close() {
}
waitForCacheClose();
CloseableUtils.closeQuietly(client);
if (zkConfig.isUseNestedZookeeper()) {
NestedZookeeperServers.getInstance().closeServer(zkConfig.getNestedPort());
}
}

/* TODO 等待500ms, cache先关闭再关闭client, 否则会抛异常
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
import org.junit.runners.Suite.SuiteClasses;

import com.dangdang.ddframe.job.AllJobTests;
import com.dangdang.ddframe.reg.AbstractNestedZookeeperBaseTest;
import com.dangdang.ddframe.reg.AllRegTests;
import com.dangdang.ddframe.test.NestedZookeeperServers;
import com.dangdang.ddframe.reg.zookeeper.NestedZookeeperServers;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
Expand All @@ -39,6 +40,6 @@ public final class AllTests {

/**
 * Shuts down the embedded Zookeeper server shared by the whole test suite.
 *
 * <p>
 * Runs once after every test in the suite has finished (@AfterClass), closing
 * the nested server listening on the suite-wide test port.
 * </p>
 */
@AfterClass
public static void clear() {
NestedZookeeperServers.getInstance().closeServer(AbstractNestedZookeeperBaseTest.PORT);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,20 @@
import com.dangdang.ddframe.job.internal.schedule.JobRegistry;
import com.dangdang.ddframe.job.internal.server.ServerStatus;
import com.dangdang.ddframe.job.internal.statistics.ProcessCountStatistics;
import com.dangdang.ddframe.reg.AbstractNestedZookeeperBaseTest;
import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenter;
import com.dangdang.ddframe.test.NestedZookeeperServers;

import lombok.AccessLevel;
import lombok.Getter;

public abstract class AbstractBaseStdJobTest {
public abstract class AbstractBaseStdJobTest extends AbstractNestedZookeeperBaseTest {

protected static final CoordinatorRegistryCenter REG_CENTER = new ZookeeperRegistryCenter(
new ZookeeperConfiguration(NestedZookeeperServers.ZK_CONNECTION_STRING, "zkRegTestCenter", 1000, 3000, 3));
private static ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZK_CONNECTION_STRING, "zkRegTestCenter", 1000, 3000, 3);

@Getter(value = AccessLevel.PROTECTED)
private static CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zkConfig);

@Getter(AccessLevel.PROTECTED)
private final LocalHostService localHostService = new LocalHostService();
Expand All @@ -68,24 +70,25 @@ public abstract class AbstractBaseStdJobTest {

protected AbstractBaseStdJobTest(final Class<? extends ElasticJob> elasticJobClass, final boolean disabled) {
jobConfig = new JobConfiguration(jobName, elasticJobClass, 3, "0/1 * * * * ?");
jobScheduler = new JobScheduler(REG_CENTER, jobConfig);
jobScheduler = new JobScheduler(regCenter, jobConfig);
this.disabled = disabled;
monitorPort = -1;
leaderElectionService = new LeaderElectionService(REG_CENTER, jobConfig);
leaderElectionService = new LeaderElectionService(regCenter, jobConfig);
}

protected AbstractBaseStdJobTest(final Class<? extends ElasticJob> elasticJobClass, final int monitorPort) {
jobConfig = new JobConfiguration(jobName, elasticJobClass, 3, "0/1 * * * * ?");
jobScheduler = new JobScheduler(REG_CENTER, jobConfig);
jobScheduler = new JobScheduler(regCenter, jobConfig);
disabled = false;
this.monitorPort = monitorPort;
leaderElectionService = new LeaderElectionService(REG_CENTER, jobConfig);
leaderElectionService = new LeaderElectionService(regCenter, jobConfig);
}

@BeforeClass
public static void init() {
NestedZookeeperServers.getInstance().startServerIfNotStarted();
REG_CENTER.init();
zkConfig.setNestedPort(PORT);
zkConfig.setNestedDataDir(TEST_TEMP_DIRECTORY);
regCenter.init();
}

@Before
Expand All @@ -95,7 +98,7 @@ public void setUp() {
jobConfig.setDisabled(disabled);
jobConfig.setMonitorPort(monitorPort);
jobConfig.setOverwrite(true);
REG_CENTER.init();
regCenter.init();
}

@After
Expand All @@ -113,19 +116,19 @@ protected void initJob() {
}

protected void assertRegCenterCommonInfo() {
assertThat(REG_CENTER.get("/" + jobName + "/leader/election/host"), is(localHostService.getIp()));
assertThat(REG_CENTER.get("/" + jobName + "/config/shardingTotalCount"), is("3"));
assertThat(REG_CENTER.get("/" + jobName + "/config/shardingItemParameters"), is("0=A,1=B,2=C"));
assertThat(REG_CENTER.get("/" + jobName + "/config/cron"), is("0/1 * * * * ?"));
assertThat(REG_CENTER.get("/" + jobName + "/servers/" + localHostService.getIp() + "/hostName"), is(localHostService.getHostName()));
assertThat(regCenter.get("/" + jobName + "/leader/election/host"), is(localHostService.getIp()));
assertThat(regCenter.get("/" + jobName + "/config/shardingTotalCount"), is("3"));
assertThat(regCenter.get("/" + jobName + "/config/shardingItemParameters"), is("0=A,1=B,2=C"));
assertThat(regCenter.get("/" + jobName + "/config/cron"), is("0/1 * * * * ?"));
assertThat(regCenter.get("/" + jobName + "/servers/" + localHostService.getIp() + "/hostName"), is(localHostService.getHostName()));
if (disabled) {
assertTrue(REG_CENTER.isExisted("/" + jobName + "/servers/" + localHostService.getIp() + "/disabled"));
assertTrue(regCenter.isExisted("/" + jobName + "/servers/" + localHostService.getIp() + "/disabled"));
} else {
assertFalse(REG_CENTER.isExisted("/" + jobName + "/servers/" + localHostService.getIp() + "/disabled"));
assertFalse(regCenter.isExisted("/" + jobName + "/servers/" + localHostService.getIp() + "/disabled"));
}
assertFalse(REG_CENTER.isExisted("/" + jobName + "/servers/" + localHostService.getIp() + "/stoped"));
assertThat(REG_CENTER.get("/" + jobName + "/servers/" + localHostService.getIp() + "/status"), is(ServerStatus.READY.name()));
REG_CENTER.remove("/" + jobName + "/leader/election");
assertFalse(regCenter.isExisted("/" + jobName + "/servers/" + localHostService.getIp() + "/stoped"));
assertThat(regCenter.get("/" + jobName + "/servers/" + localHostService.getIp() + "/status"), is(ServerStatus.READY.name()));
regCenter.remove("/" + jobName + "/leader/election");
assertTrue(leaderElectionService.isLeader());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,18 @@
* </p>
*/

package com.dangdang.ddframe.test;
package com.dangdang.ddframe.job.integrate;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class WaitingUtils {

private WaitingUtils() {
}

/**
 * Sleeps for a short interval (300ms) so asynchronous operations can settle
 * before the test proceeds.
 */
public static void waitingShortTime() {
sleep(300L);
}

/**
 * Sleeps for a longer interval (1500ms) for operations that need more time to
 * complete, e.g. waiting for a scheduled job cycle.
 */
public static void waitingLongTime() {
sleep(1500L);
}

private static void sleep(final long millis) {
try {
Thread.sleep(millis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@

import com.dangdang.ddframe.job.api.JobConfiguration;
import com.dangdang.ddframe.job.integrate.AbstractBaseStdJobAutoInitTest;
import com.dangdang.ddframe.job.integrate.WaitingUtils;
import com.dangdang.ddframe.job.integrate.fixture.dataflow.sequence.OneOffSequenceDataFlowElasticJob;
import com.dangdang.ddframe.job.internal.statistics.ProcessCountStatistics;
import com.dangdang.ddframe.test.WaitingUtils;

public final class OneOffSequenceDataFlowElasticJobTest extends AbstractBaseStdJobAutoInitTest {

Expand All @@ -53,7 +53,7 @@ public void assertJobInit() {
while (!OneOffSequenceDataFlowElasticJob.isCompleted()) {
WaitingUtils.waitingShortTime();
}
assertTrue(REG_CENTER.isExisted("/" + getJobName() + "/execution"));
assertTrue(getRegCenter().isExisted("/" + getJobName() + "/execution"));
assertThat(ProcessCountStatistics.getProcessSuccessCount(getJobName()), is(30));
assertThat(ProcessCountStatistics.getProcessFailureCount(getJobName()), is(0));
}
Expand Down
Loading

0 comments on commit e4a7466

Please sign in to comment.