Skip to content

Commit

Permalink
fix #87
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed May 12, 2016
1 parent 54af210 commit b9f49c2
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.dangdang.ddframe.job.internal.listener.AbstractJobListener;
import com.dangdang.ddframe.job.internal.listener.AbstractListenerManager;
import com.dangdang.ddframe.job.internal.server.ServerNode;
import com.dangdang.ddframe.job.internal.server.ServerService;
import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -38,13 +39,16 @@ public class ElectionListenerManager extends AbstractListenerManager {

private final LeaderElectionService leaderElectionService;

private final ServerService serverService;

private final ElectionNode electionNode;

private final ServerNode serverNode;

public ElectionListenerManager(final CoordinatorRegistryCenter coordinatorRegistryCenter, final JobConfiguration jobConfiguration) {
super(coordinatorRegistryCenter, jobConfiguration);
leaderElectionService = new LeaderElectionService(coordinatorRegistryCenter, jobConfiguration);
serverService = new ServerService(coordinatorRegistryCenter, jobConfiguration);
electionNode = new ElectionNode(jobConfiguration.getJobName());
serverNode = new ServerNode(jobConfiguration.getJobName());
}
Expand All @@ -59,13 +63,13 @@ class LeaderElectionJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) {
EventHelper eventHelper = new EventHelper(path, event);
if ((eventHelper.isLeaderCrashed() || eventHelper.isServerEnabled() || eventHelper.isServerResumed()) && !leaderElectionService.hasLeader()) {
if (eventHelper.isLeaderCrashedOrServerOn() && !leaderElectionService.hasLeader() && !serverService.getAvailableServers().isEmpty()) {
log.debug("Elastic job: leader crashed, elect a new leader now.");
leaderElectionService.leaderElection();
log.debug("Elastic job: leader election completed.");
return;
}
if ((eventHelper.isServerDisabled() || eventHelper.isServerPaused() || eventHelper.isServerShutdown()) && leaderElectionService.isLeader()) {
if (eventHelper.isServerOff() && leaderElectionService.isLeader()) {
leaderElectionService.removeLeader();
}
}
Expand All @@ -76,28 +80,36 @@ final class EventHelper {
private final String path;

private final TreeCacheEvent event;

boolean isLeaderCrashed() {

boolean isLeaderCrashedOrServerOn() {
return isLeaderCrashed() || isServerEnabled() || isServerResumed();
}

private boolean isLeaderCrashed() {
return electionNode.isLeaderHostPath(path) && Type.NODE_REMOVED == event.getType();
}

boolean isServerEnabled() {
private boolean isServerEnabled() {
return serverNode.isLocalServerDisabledPath(path) && Type.NODE_REMOVED == event.getType();
}

boolean isServerResumed() {
private boolean isServerResumed() {
return serverNode.isLocalJobPausedPath(path) && Type.NODE_REMOVED == event.getType();
}

boolean isServerDisabled() {

boolean isServerOff() {
return isServerDisabled() || isServerPaused() || isServerShutdown();
}

private boolean isServerDisabled() {
return serverNode.isLocalServerDisabledPath(path) && Type.NODE_ADDED == event.getType();
}

boolean isServerPaused() {
private boolean isServerPaused() {
return serverNode.isLocalJobPausedPath(path) && Type.NODE_ADDED == event.getType();
}

boolean isServerShutdown() {
private boolean isServerShutdown() {
return serverNode.isLocalJobShutdownPath(path) && Type.NODE_ADDED == event.getType();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.dangdang.ddframe.job.fixture.TestJob;
import com.dangdang.ddframe.job.internal.election.ElectionListenerManager.LeaderElectionJobListener;
import com.dangdang.ddframe.job.internal.server.ServerNode;
import com.dangdang.ddframe.job.internal.server.ServerService;
import com.dangdang.ddframe.job.internal.storage.JobNodeStorage;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
Expand All @@ -31,6 +32,8 @@
import org.mockito.MockitoAnnotations;
import org.unitils.util.ReflectionUtils;

import java.util.Collections;

import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -46,6 +49,9 @@ public final class ElectionListenerManagerTest {
@Mock
private LeaderElectionService leaderElectionService;

@Mock
private ServerService serverService;

private final ElectionListenerManager electionListenerManager = new ElectionListenerManager(null, new JobConfiguration("testJob", TestJob.class, 3, "0/1 * * * * ?"));

@Before
Expand All @@ -54,6 +60,7 @@ public void setUp() throws NoSuchFieldException {
ReflectionUtils.setFieldValue(electionListenerManager, electionListenerManager.getClass().getSuperclass().getDeclaredField("jobNodeStorage"), jobNodeStorage);
ReflectionUtils.setFieldValue(electionListenerManager, "serverNode", serverNode);
ReflectionUtils.setFieldValue(electionListenerManager, "leaderElectionService", leaderElectionService);
ReflectionUtils.setFieldValue(electionListenerManager, "serverService", serverService);
}

@Test
Expand Down Expand Up @@ -86,14 +93,27 @@ electionListenerManager.new LeaderElectionJobListener().dataChanged(null, new Tr
}

@Test
public void assertLeaderElectionJobListenerWhenIsLeaderHostPathAndIsRemoveAndIsNotLeader() {
public void assertLeaderElectionJobListenerWhenIsLeaderHostPathAndIsRemoveAndIsNotLeaderWithAvailableServers() {
when(leaderElectionService.hasLeader()).thenReturn(false);
when(serverService.getAvailableServers()).thenReturn(Collections.singletonList("localhost"));
electionListenerManager.new LeaderElectionJobListener().dataChanged(null, new TreeCacheEvent(
TreeCacheEvent.Type.NODE_REMOVED, new ChildData("/testJob/leader/election/host", null, "localhost".getBytes())), "/testJob/leader/election/host");
verify(leaderElectionService).hasLeader();
verify(serverService).getAvailableServers();
verify(leaderElectionService).leaderElection();
}

@Test
public void assertLeaderElectionJobListenerWhenIsLeaderHostPathAndIsRemoveAndIsNotLeaderWithoutAvailableServers() {
when(leaderElectionService.hasLeader()).thenReturn(false);
when(serverService.getAvailableServers()).thenReturn(Collections.<String>emptyList());
electionListenerManager.new LeaderElectionJobListener().dataChanged(null, new TreeCacheEvent(
TreeCacheEvent.Type.NODE_REMOVED, new ChildData("/testJob/leader/election/host", null, "localhost".getBytes())), "/testJob/leader/election/host");
verify(leaderElectionService).hasLeader();
verify(serverService).getAvailableServers();
verify(leaderElectionService, times(0)).leaderElection();
}

@Test
public void assertLeaderElectionJobListenerWhenJobDisabledAndIsNotLeader() {
when(leaderElectionService.isLeader()).thenReturn(false);
Expand Down

0 comments on commit b9f49c2

Please sign in to comment.