Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup hosts marked as missing on startup #2240

Merged
merged 13 commits into from
Nov 18, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@
import java.util.Set;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class InactiveAgentManager extends CuratorManager {
private static final String ROOT_PATH = "/inactiveSlaves";
private static final Logger LOG = LoggerFactory.getLogger(InactiveAgentManager.class);

@Inject
public InactiveAgentManager(
Expand Down Expand Up @@ -43,11 +46,24 @@ private String pathOf(String host) {
return String.format("%s/%s", ROOT_PATH, host);
}

/**
 * Removes a single host's entry from the inactive agent list.
 * Does nothing when no entry exists at the host's path.
 *
 * @param host agent hostname whose inactive entry should be removed
 */
public void cleanInactiveAgent(String host) {
  final String hostPath = pathOf(host);
  if (!checkExists(hostPath).isPresent()) {
    // Nothing stored for this host, so there is nothing to delete or log.
    return;
  }
  delete(hostPath);
  LOG.debug("Deleted inactive host {}", host);
}

/**
 * Walks the inactive agent list and deletes every entry whose node
 * modification time is strictly earlier than {@code thresholdTime}.
 * Entries that no longer exist, or that were modified at or after the
 * threshold, are left alone.
 *
 * @param thresholdTime cutoff compared against each entry's {@code Stat#getMtime()}
 */
public void cleanInactiveAgentsList(long thresholdTime) {
  for (String host : getInactiveAgents()) {
    final String hostPath = pathOf(host);
    final Optional<Stat> nodeStat = checkExists(hostPath);
    if (!nodeStat.isPresent()) {
      continue;
    }
    if (nodeStat.get().getMtime() >= thresholdTime) {
      // Modified at or after the cutoff: keep it.
      continue;
    }
    delete(hostPath);
    LOG.debug("Deleted inactive host {}", host);
  }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.hubspot.singularity.helpers.MesosUtils;
import com.hubspot.singularity.mesos.SingularityAgentAndRackManager;
import com.hubspot.singularity.mesos.SingularityMesosScheduler;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -54,16 +55,16 @@ public class SingularityAgentReconciliationPoller extends SingularityLeaderOnlyP

@Override
public void runActionOnPoll() {
refereshSlavesAndRacks();
checkDeadSlaves();
refereshAgentsAndRacks();
checkInactiveAgents();
inactiveAgentManager.cleanInactiveAgentsList(
System.currentTimeMillis() -
TimeUnit.HOURS.toMillis(configuration.getCleanInactiveHostListEveryHours())
Comment on lines 60 to 62
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clean up anything in InactiveAgentManager's list that is 1) being deleted by the checks you already have above or 2) older than some amount of time (likely longer than the AgentManager threshold, more like a few days)

@ssalinas should this be addressing point 2 or are they different mechanisms?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, this looks like it should be doing the 'anything older than X hours' part already

);
clearOldSlaveHistory();
clearOldAgentHistory();
}

private void refereshSlavesAndRacks() {
private void refereshAgentsAndRacks() {
try {
Optional<MasterInfo> maybeMasterInfo = mesosScheduler.getMaster();
if (maybeMasterInfo.isPresent()) {
Expand All @@ -79,15 +80,28 @@ private void refereshSlavesAndRacks() {
}
}

private void checkDeadSlaves() {
private void checkInactiveAgents() {
final long start = System.currentTimeMillis();

final List<SingularityAgent> deadSlaves = agentManager.getObjectsFiltered(
// filter dead and missing on startup agents for cleanup
List<SingularityAgent> deadAgents = agentManager.getObjectsFiltered(
MachineState.DEAD
);

if (deadSlaves.isEmpty()) {
LOG.trace("No dead agents");
LOG.debug("Found {} dead agents", deadAgents.size());

List<SingularityAgent> missingOnStartupAgents = agentManager.getObjectsFiltered(
MachineState.MISSING_ON_STARTUP
);

LOG.debug("Found {} agents missing on startup", missingOnStartupAgents.size());

List<SingularityAgent> inactiveAgents = new ArrayList<>();
inactiveAgents.addAll(deadAgents);
inactiveAgents.addAll(missingOnStartupAgents);

if (inactiveAgents.isEmpty()) {
LOG.trace("No inactive agents");
return;
}

Expand All @@ -96,20 +110,19 @@ private void checkDeadSlaves() {
configuration.getDeleteDeadAgentsAfterHours()
);

for (SingularityAgent deadSlave : agentManager.getObjectsFiltered(
MachineState.DEAD
)) {
for (SingularityAgent inactiveAgent : inactiveAgents) {
final long duration =
System.currentTimeMillis() - deadSlave.getCurrentState().getTimestamp();
System.currentTimeMillis() - inactiveAgent.getCurrentState().getTimestamp();

if (duration > maxDuration) {
SingularityDeleteResult result = agentManager.deleteObject(deadSlave.getId());
SingularityDeleteResult result = agentManager.deleteObject(inactiveAgent.getId());
inactiveAgentManager.cleanInactiveAgent(inactiveAgent.getHost()); // delete agent from inactive list too

deleted++;

LOG.info(
"Removing dead agent {} ({}) after {} (max {})",
deadSlave.getId(),
"Removing inactive agent {} ({}) after {} (max {})",
inactiveAgent.getId(),
result,
JavaUtils.durationFromMillis(duration),
JavaUtils.durationFromMillis(maxDuration)
Expand All @@ -118,14 +131,14 @@ private void checkDeadSlaves() {
}

LOG.debug(
"Checked {} dead agents, deleted {} in {}",
deadSlaves.size(),
"Checked {} inactive agents, deleted {} in {}",
inactiveAgents.size(),
deleted,
JavaUtils.duration(start)
);
}

private void clearOldSlaveHistory() {
private void clearOldAgentHistory() {
for (SingularityAgent singularityAgent : agentManager.getObjects()) {
agentManager.clearOldHistory(singularityAgent.getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,16 @@ public SingularityMachinesTest() {
}

@Test
public void testDeadSlavesArePurged() {
SingularityAgent liveSlave = new SingularityAgent(
public void testDeadAgentsArePurged() {
long previousDeleteDeadAgentsAfterHours = configuration.getDeleteDeadAgentsAfterHours();
SingularityAgent liveAgent = new SingularityAgent(
"1",
"h1",
"r1",
ImmutableMap.of("uniqueAttribute", "1"),
Optional.empty()
);
SingularityAgent deadSlave = new SingularityAgent(
SingularityAgent deadAgent = new SingularityAgent(
"2",
"h1",
"r1",
Expand All @@ -74,8 +75,8 @@ public void testDeadSlavesArePurged() {

final long now = System.currentTimeMillis();

liveSlave =
liveSlave.changeState(
liveAgent =
liveAgent.changeState(
new SingularityMachineStateHistoryUpdate(
"1",
MachineState.ACTIVE,
Expand All @@ -84,8 +85,8 @@ public void testDeadSlavesArePurged() {
Optional.empty()
)
);
deadSlave =
deadSlave.changeState(
deadAgent =
deadAgent.changeState(
new SingularityMachineStateHistoryUpdate(
"2",
MachineState.DEAD,
Expand All @@ -95,9 +96,8 @@ public void testDeadSlavesArePurged() {
)
);

agentManager.saveObject(liveSlave);
agentManager.saveObject(deadSlave);

agentManager.saveObject(liveAgent);
agentManager.saveObject(deadAgent);
agentReconciliationPoller.runActionOnPoll();

Assertions.assertEquals(
Expand All @@ -108,13 +108,87 @@ public void testDeadSlavesArePurged() {

configuration.setDeleteDeadAgentsAfterHours(1);

agentReconciliationPoller.runActionOnPoll();
agentReconciliationPoller.runActionOnPoll(); // dead agent should be deleted

Assertions.assertEquals(
1,
agentManager.getObjectsFiltered(MachineState.ACTIVE).size()
);
Assertions.assertEquals(0, agentManager.getObjectsFiltered(MachineState.DEAD).size());

// reset config to previous value for subsequent tests
configuration.setDeleteDeadAgentsAfterHours(previousDeleteDeadAgentsAfterHours);
}

/**
 * Verifies that an agent in {@code MISSING_ON_STARTUP} is purged by the
 * reconciliation poller once it has been in that state longer than the
 * configured delete-after-hours threshold.
 *
 * The agent's state timestamp is set 10 hours in the past. The first poll runs
 * with the threshold already in the configuration and is expected to keep the
 * agent; the threshold is then lowered to 1 hour and the second poll is
 * expected to remove it. The ACTIVE agent must survive both polls.
 */
@Test
public void testMissingAgentsArePurged() {
  long previousDeleteDeadAgentsAfterHours = configuration.getDeleteDeadAgentsAfterHours();
  try {
    SingularityAgent liveAgent = new SingularityAgent(
      "3",
      "h1",
      "r1",
      ImmutableMap.of("uniqueAttribute", "3"),
      Optional.empty()
    );
    SingularityAgent missingAgent = new SingularityAgent(
      "4",
      "h1",
      "r1",
      ImmutableMap.of("uniqueAttribute", "4"),
      Optional.empty()
    );

    final long now = System.currentTimeMillis();

    liveAgent =
      liveAgent.changeState(
        new SingularityMachineStateHistoryUpdate(
          "3",
          MachineState.ACTIVE,
          100,
          Optional.empty(),
          Optional.empty()
        )
      );
    // Missing for 10 hours: longer than the 1-hour threshold applied below.
    missingAgent =
      missingAgent.changeState(
        new SingularityMachineStateHistoryUpdate(
          "4",
          MachineState.MISSING_ON_STARTUP,
          now - TimeUnit.HOURS.toMillis(10),
          Optional.empty(),
          Optional.empty()
        )
      );

    agentManager.saveObject(liveAgent);
    agentManager.saveObject(missingAgent);
    agentReconciliationPoller.runActionOnPoll();

    // First poll: both agents are expected to still be present.
    Assertions.assertEquals(
      1,
      agentManager.getObjectsFiltered(MachineState.ACTIVE).size()
    );
    Assertions.assertEquals(
      1,
      agentManager.getObjectsFiltered(MachineState.MISSING_ON_STARTUP).size()
    );

    configuration.setDeleteDeadAgentsAfterHours(1);

    agentReconciliationPoller.runActionOnPoll(); // missing agent should be deleted

    Assertions.assertEquals(
      1,
      agentManager.getObjectsFiltered(MachineState.ACTIVE).size()
    );
    Assertions.assertEquals(
      0,
      agentManager.getObjectsFiltered(MachineState.MISSING_ON_STARTUP).size()
    );
  } finally {
    // Restore the shared configuration even when an assertion above fails, so a
    // failure here cannot leak the lowered threshold into subsequent tests.
    configuration.setDeleteDeadAgentsAfterHours(previousDeleteDeadAgentsAfterHours);
  }
}

@Test
Expand Down Expand Up @@ -1028,6 +1102,31 @@ public void testReconcileSlaves() {
}
}

/**
 * Loads a three-agent master state as a startup load, then a two-agent state
 * as a second startup load, and checks that all three agents are still stored:
 * agent "2" as {@code MISSING_ON_STARTUP}, every other agent as {@code ACTIVE}.
 */
@Test
public void testReconcileSlavesOnStartup() {
  // First startup load: master reports 3 agents
  MesosMasterStateObject initialState = getMasterState(3);
  singularityAgentAndRackManager.loadAgentsAndRacksFromMaster(initialState, true);

  // Second startup load: master reports only 2 agents, the third has died
  MesosMasterStateObject reducedState = getMasterState(2);
  singularityAgentAndRackManager.loadAgentsAndRacksFromMaster(reducedState, true);

  List<SingularityAgent> storedAgents = agentManager.getObjects();
  Assertions.assertEquals(3, storedAgents.size());

  for (SingularityAgent stored : storedAgents) {
    // Only agent "2" is expected to have been flagged; the rest stay active.
    MachineState expectedState = stored.getId().equals("2")
      ? MachineState.MISSING_ON_STARTUP
      : MachineState.ACTIVE;
    Assertions.assertEquals(expectedState, stored.getCurrentState().getState());
  }
}

private MesosMasterStateObject getMasterState(int numSlaves) {
long now = System.currentTimeMillis();
Map<String, Object> resources = new HashMap<>();
Expand Down