Skip to content

Commit

Permalink
Refactor the implementation for getting status of rebalance proposal …
Browse files Browse the repository at this point in the history
…request

Signed-off-by: Gantigmaa Selenge <tina.selenge@gmail.com>
  • Loading branch information
tinaselenge committed Sep 19, 2024
1 parent 8e30be3 commit 06aed7a
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -739,27 +738,55 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onPendingProposal(
p.complete(buildRebalanceStatus(null, KafkaRebalanceState.Stopped, StatusUtils.validate(reconciliation, kafkaRebalance)));
} else {
LOGGER.infoCr(reconciliation, "Requesting a new proposal or checking the status for an already issued one");
requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder,
kafkaRebalance.getStatus().getSessionId())
.onSuccess(rebalanceMapAndStatus -> {
// If the returned status has an optimization result then the rebalance proposal is ready
KafkaRebalanceStatus status = rebalanceMapAndStatus.getStatus();
Set<Condition> conditions = new HashSet<>();
validateAnnotation(reconciliation, conditions, KafkaRebalanceState.PendingProposal, rebalanceAnnotation(kafkaRebalance), kafkaRebalance);
status.addConditions(conditions);
rebalanceMapAndStatus.setStatus(status);
if (rebalanceMapAndStatus.getStatus().getOptimizationResult() != null &&
!rebalanceMapAndStatus.getStatus().getOptimizationResult().isEmpty()) {
LOGGER.infoCr(reconciliation, "Optimization proposal ready");
} else {
LOGGER.infoCr(reconciliation, "Waiting for optimization proposal to be ready");
}
p.complete(rebalanceMapAndStatus);
})
.onFailure(e -> {
LOGGER.errorCr(reconciliation, "Cruise Control getting rebalance proposal failed");
p.fail(e);
});
Set<Condition> conditions = StatusUtils.validate(reconciliation, kafkaRebalance);
validateAnnotation(reconciliation, conditions, KafkaRebalanceState.PendingProposal, rebalanceAnnotation(kafkaRebalance), kafkaRebalance);
String sessionId = kafkaRebalance.getStatus().getSessionId();
if (sessionId == null) {
requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder).onSuccess(p::complete);
} else {
apiClient.getUserTaskStatus(reconciliation, host, cruiseControlPort, sessionId)
.onSuccess(cruiseControlResponse -> {
if (cruiseControlResponse.getJson().isEmpty()) {
// This may happen if:
// 1. Cruise Control restarted so resetting the state because the tasks queue is not persisted
// 2. Task's retention time expired, or the cache has become full
LOGGER.warnCr(reconciliation, "User task {} not found, going to generate a new proposal", sessionId);
requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder).onSuccess(p::complete);
} else {
JsonObject taskStatusJson = cruiseControlResponse.getJson();
CruiseControlUserTaskStatus taskStatus = CruiseControlUserTaskStatus.lookup(taskStatusJson.getString("Status"));
switch (taskStatus) {
case COMPLETED_WITH_ERROR:
LOGGER.errorCr(reconciliation, "Rebalance ({}) optimization proposal has failed to complete", sessionId);
p.complete(buildRebalanceStatus(sessionId, KafkaRebalanceState.NotReady, conditions));
break;
case COMPLETED:
case IN_EXECUTION:
// If the returned status has an optimization result then the rebalance proposal is ready.
if (taskStatusJson.containsKey(CruiseControlRebalanceKeys.LOAD_BEFORE_OPTIMIZATION.getKey()) &&
taskStatusJson.containsKey(CruiseControlRebalanceKeys.LOAD_AFTER_OPTIMIZATION.getKey())) {
LOGGER.infoCr(reconciliation, "Rebalance ({}) optimization proposal is now ready", sessionId);
System.out.println("Status" + taskStatusJson);
p.complete(buildRebalanceStatus(kafkaRebalance, sessionId, KafkaRebalanceState.ProposalReady, taskStatusJson, conditions));
break;
}
case ACTIVE: // Rebalance proposal is still being calculated
LOGGER.infoCr(reconciliation, "Rebalance ({}) optimization proposal is still being prepared", sessionId);
configMapOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName())
.onSuccess(loadmap -> p.complete(new MapAndStatus<>(loadmap, buildRebalanceStatusFromPreviousStatus(kafkaRebalance.getStatus(), conditions))));
break;
default:
LOGGER.errorCr(reconciliation, "Unexpected state {}", taskStatus);
p.fail("Unexpected state " + taskStatus);
break;
}
}
})
.onFailure(e -> {
LOGGER.errorCr(reconciliation, "Cruise Control getting rebalance proposal status failed", e.getCause());
p.fail(new CruiseControlRestException("Cruise Control getting rebalance proposal status failed"));
});
}
}
return p.future();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,8 @@ public void testNewToPendingProposalToProposalReadyRemoveBroker(VertxTestContext

private void krNewToPendingProposalToProposalReady(VertxTestContext context, CruiseControlEndpoints endpoint, KafkaRebalance kr) throws IOException, URISyntaxException {
// Set up the rebalance endpoint with the number of pending calls before a response is received.
cruiseControlServer.setupCCRebalanceResponse(2, endpoint);
cruiseControlServer.setupCCRebalanceResponse(1, endpoint);
cruiseControlServer.setupCCUserTasksResponseNoGoals(1, 0);

Crds.kafkaRebalanceOperation(client).inNamespace(namespace).resource(kr).create();
crdCreateKafka();
Expand Down Expand Up @@ -529,6 +530,7 @@ public void testNewToPendingProposalToStoppedAndRefreshRemoveBroker(VertxTestCon
private void krNewToPendingProposalToStoppedAndRefresh(VertxTestContext context, CruiseControlEndpoints endpoint, KafkaRebalance kr) throws IOException, URISyntaxException {
// Set up the rebalance endpoint with the number of pending calls before a response is received.
cruiseControlServer.setupCCRebalanceResponse(2, endpoint);
cruiseControlServer.setupCCUserTasksResponseNoGoals(0, 0);

Crds.kafkaRebalanceOperation(client).inNamespace(namespace).resource(kr).create();
crdCreateKafka();
Expand Down Expand Up @@ -1131,7 +1133,8 @@ public void testNewToPendingProposalDeleteRemoveBroker(VertxTestContext context)

private void krNewToPendingProposalDelete(VertxTestContext context, CruiseControlEndpoints endpoint, KafkaRebalance kr) throws IOException, URISyntaxException {
// Set up the rebalance endpoint with the number of pending calls before a response is received.
cruiseControlServer.setupCCRebalanceResponse(2, endpoint);
cruiseControlServer.setupCCRebalanceResponse(1, endpoint);
cruiseControlServer.setupCCUserTasksResponseNoGoals(1, 0);

Crds.kafkaRebalanceOperation(client).inNamespace(namespace).resource(kr).create();
crdCreateKafka();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ public void setupCCRebalanceResponse(int pendingCalls, int responseDelay, Cruise
.respond(
response()
.withBody(pendingJson)
.withHeaders(header("User-Task-ID", REBALANCE_NO_GOALS_RESPONSE_UTID))
.withHeaders(header("User-Task-ID", REBALANCE_NO_GOALS_VERBOSE_RESPONSE_UTID))
.withStatusCode(202)
.withDelay(TimeUnit.SECONDS, responseDelay));

Expand Down

0 comments on commit 06aed7a

Please sign in to comment.