Skip to content

Commit

Permalink
Refactor getUserTaskStatus with a common method for handling the response for both proposal and rebalance request
Browse files Browse the repository at this point in the history

Signed-off-by: Gantigmaa Selenge <tina.selenge@gmail.com>
  • Loading branch information
tinaselenge committed Oct 1, 2024
1 parent 042360d commit bf7827c
Showing 1 changed file with 76 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlApi;
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlApiImpl;
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlRebalanceResponse;
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlResponse;
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlRestException;
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.RebalanceOptions;
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.RemoveBrokerOptions;
Expand Down Expand Up @@ -745,43 +746,7 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onPendingProposal(
requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder).onSuccess(p::complete);
} else {
apiClient.getUserTaskStatus(reconciliation, host, cruiseControlPort, sessionId)
.onSuccess(cruiseControlResponse -> {
if (cruiseControlResponse.getJson().isEmpty()) {
// This may happen if:
// 1. Cruise Control restarted so resetting the state because the tasks queue is not persisted
// 2. Task's retention time expired, or the cache has become full
LOGGER.warnCr(reconciliation, "User task {} not found, going to generate a new proposal", sessionId);
requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder).onSuccess(p::complete);
} else {
JsonObject taskStatusJson = cruiseControlResponse.getJson();
CruiseControlUserTaskStatus taskStatus = CruiseControlUserTaskStatus.lookup(taskStatusJson.getString("Status"));
switch (taskStatus) {
case COMPLETED_WITH_ERROR:
LOGGER.errorCr(reconciliation, "Rebalance ({}) optimization proposal has failed to complete", sessionId);
p.complete(buildRebalanceStatus(sessionId, KafkaRebalanceState.NotReady, conditions));
break;
case COMPLETED:
case IN_EXECUTION:
// If the returned status has an optimization result then the rebalance proposal is ready.
if (taskStatusJson.containsKey(CruiseControlRebalanceKeys.LOAD_BEFORE_OPTIMIZATION.getKey()) &&
taskStatusJson.containsKey(CruiseControlRebalanceKeys.LOAD_AFTER_OPTIMIZATION.getKey())) {
LOGGER.infoCr(reconciliation, "Rebalance ({}) optimization proposal is now ready", sessionId);
System.out.println("Status" + taskStatusJson);
p.complete(buildRebalanceStatus(kafkaRebalance, sessionId, KafkaRebalanceState.ProposalReady, taskStatusJson, conditions));
break;
}
case ACTIVE: // Rebalance proposal is still being calculated
LOGGER.infoCr(reconciliation, "Rebalance ({}) optimization proposal is still being prepared", sessionId);
configMapOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName())
.onSuccess(loadmap -> p.complete(new MapAndStatus<>(loadmap, buildRebalanceStatusFromPreviousStatus(kafkaRebalance.getStatus(), conditions))));
break;
default:
LOGGER.errorCr(reconciliation, "Unexpected state {}", taskStatus);
p.fail("Unexpected state " + taskStatus);
break;
}
}
})
.onSuccess(cruiseControlResponse -> handleUserTaskStatusResponse(reconciliation, cruiseControlResponse, p, sessionId, conditions, kafkaRebalance, configMapOperator, true, host, apiClient, rebalanceOptionsBuilder))
.onFailure(e -> {
LOGGER.errorCr(reconciliation, "Cruise Control getting rebalance proposal status failed", e.getCause());
p.fail(new CruiseControlRestException("Cruise Control getting rebalance proposal status failed"));
Expand All @@ -791,6 +756,79 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onPendingProposal(
return p.future();
}

/**
 * Handles the response of a Cruise Control user task status request and completes (or fails) the given
 * promise with the resulting rebalance status. It is shared by the handling of a pending optimization
 * proposal ({@code dryRun == true}) and of an ongoing rebalance ({@code dryRun == false}).
 *
 * @param reconciliation            Reconciliation marker used for logging
 * @param cruiseControlResponse     Response returned by the Cruise Control user task status request
 * @param p                         Promise completed with the new ConfigMap and status, or failed on an unexpected task state
 * @param sessionId                 User task ID the status was requested for
 * @param conditions                Conditions to set into the resulting status
 * @param kafkaRebalance            KafkaRebalance resource being reconciled
 * @param configMapOperator         Operator used to read the ConfigMap named after the KafkaRebalance resource
 * @param dryRun                    {@code true} when the task is an optimization proposal request,
 *                                  {@code false} when it is an actual rebalance
 * @param host                      Cruise Control host
 * @param apiClient                 Cruise Control API client
 * @param rebalanceOptionsBuilder   Builder of the rebalance options used when a new proposal has to be requested
 */
private void handleUserTaskStatusResponse(Reconciliation reconciliation, CruiseControlResponse cruiseControlResponse,
                                          Promise<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> p, String sessionId,
                                          Set<Condition> conditions, KafkaRebalance kafkaRebalance,
                                          ConfigMapOperator configMapOperator, boolean dryRun,
                                          String host, CruiseControlApi apiClient,
                                          AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder<?, ?> rebalanceOptionsBuilder) {
    JsonObject taskStatusJson = cruiseControlResponse.getJson();

    if (taskStatusJson.isEmpty()) {
        // This may happen if:
        // 1. Cruise Control restarted so resetting the state because the tasks queue is not persisted
        // 2. Task's retention time expired, or the cache has become full
        LOGGER.warnCr(reconciliation, "User task {} not found, going to generate a new proposal", sessionId);
        // A new proposal (dryrun=true) is requested regardless of the dryRun flag of the lost task.
        // The failure is propagated as well, otherwise the promise would never be completed.
        requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder)
                .onSuccess(p::complete)
                .onFailure(p::fail);
        return;
    }

    CruiseControlUserTaskStatus taskStatus = CruiseControlUserTaskStatus.lookup(taskStatusJson.getString("Status"));
    switch (taskStatus) {
        case COMPLETED_WITH_ERROR:
            // TODO: There doesn't seem to be a way to retrieve the actual error message from the user tasks endpoint?
            //       We may need to propose an upstream PR for this.
            // TODO: Once we can get the error details we need to add an error field to the Rebalance Status to hold
            //       details of any issues while rebalancing.
            LOGGER.errorCr(reconciliation, "Rebalance ({}) optimization proposal has failed to complete", sessionId);
            p.complete(buildRebalanceStatus(sessionId, KafkaRebalanceState.NotReady, conditions));
            break;
        case COMPLETED:
        case IN_EXECUTION:
            if (dryRun) {
                // For the status of rebalance(dryrun=true) both task states are treated the same way:
                // if the returned status has an optimization result then the rebalance proposal is ready.
                if (taskStatusJson.containsKey(CruiseControlRebalanceKeys.LOAD_BEFORE_OPTIMIZATION.getKey())
                        && taskStatusJson.containsKey(CruiseControlRebalanceKeys.LOAD_AFTER_OPTIMIZATION.getKey())) {
                    LOGGER.infoCr(reconciliation, "Rebalance ({}) optimization proposal is now ready", sessionId);
                    p.complete(buildRebalanceStatus(kafkaRebalance, sessionId, KafkaRebalanceState.ProposalReady, taskStatusJson, conditions));
                } else {
                    LOGGER.infoCr(reconciliation, "Rebalance ({}) optimization proposal is still being prepared", sessionId);
                    completeWithPreviousStatus(p, kafkaRebalance, configMapOperator, conditions);
                }
            } else if (taskStatus == CruiseControlUserTaskStatus.COMPLETED) {
                LOGGER.infoCr(reconciliation, "Rebalance ({}) is now complete", sessionId);
                p.complete(buildRebalanceStatus(kafkaRebalance, null, KafkaRebalanceState.Ready, taskStatusJson, conditions));
            } else {
                LOGGER.infoCr(reconciliation, "Rebalance ({}) optimization proposal in execution", sessionId);
                // We need to check that the status has been updated with the ongoing optimisation proposal
                // The proposal field can be empty if a rebalance(dryrun=false) was called and the optimisation
                // proposal was still being prepared (in progress). In that case the rebalance will start when
                // the proposal is complete but the optimisation proposal summary will be missing.
                if (kafkaRebalance.getStatus().getOptimizationResult() == null
                        || kafkaRebalance.getStatus().getOptimizationResult().isEmpty()) {
                    LOGGER.infoCr(reconciliation, "Rebalance ({}) optimization proposal is now ready and has been added to the status", sessionId);
                    p.complete(buildRebalanceStatus(
                            kafkaRebalance, sessionId, KafkaRebalanceState.Rebalancing, taskStatusJson, conditions));
                } else {
                    // Rebalance is still in progress and the status already holds the proposal, so it is kept as it is.
                    completeWithPreviousStatus(p, kafkaRebalance, configMapOperator, conditions);
                }
                // TODO: Find out if there is any way to check the progress of a rebalance.
                //       We could parse the verbose proposal for total number of reassignments and compare to number completed (if available)?
                //       We can then update the status at this point.
            }
            break;
        case ACTIVE:
            // If a rebalance(dryrun=false) was called and the proposal is still being prepared then the task
            // will be in an ACTIVE state. When the proposal is ready it will shift to IN_EXECUTION and we will
            // check that the optimisation proposal is added to the status on the next reconcile.
            LOGGER.infoCr(reconciliation, "Rebalance ({}) optimization proposal is still being prepared", sessionId);
            completeWithPreviousStatus(p, kafkaRebalance, configMapOperator, conditions);
            break;
        default:
            LOGGER.errorCr(reconciliation, "Unexpected state {}", taskStatus);
            p.fail("Unexpected state " + taskStatus);
            break;
    }
}

/**
 * Completes the promise with the ConfigMap named after the KafkaRebalance resource and a status built from the
 * previous status of the resource. Fails the promise when the ConfigMap cannot be read.
 *
 * @param p                     Promise to complete
 * @param kafkaRebalance        KafkaRebalance resource being reconciled
 * @param configMapOperator     Operator used to read the ConfigMap
 * @param conditions            Conditions to set into the resulting status
 */
private void completeWithPreviousStatus(Promise<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> p,
                                        KafkaRebalance kafkaRebalance,
                                        ConfigMapOperator configMapOperator,
                                        Set<Condition> conditions) {
    configMapOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName())
            .onSuccess(loadmap -> p.complete(new MapAndStatus<>(loadmap, buildRebalanceStatusFromPreviousStatus(kafkaRebalance.getStatus(), conditions))))
            .onFailure(p::fail);
}

/**
* This method handles the transition from {@code ProposalReady} state.
* It is related to the value that the user apply to the strimzi.io/rebalance annotation.
Expand Down Expand Up @@ -882,63 +920,7 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onRebalancing(Reco
Set<Condition> conditions = StatusUtils.validate(reconciliation, kafkaRebalance);
validateAnnotation(reconciliation, conditions, KafkaRebalanceState.Rebalancing, rebalanceAnnotation(kafkaRebalance), kafkaRebalance);
apiClient.getUserTaskStatus(reconciliation, host, cruiseControlPort, sessionId)
.onSuccess(cruiseControlResponse -> {
if (cruiseControlResponse.getJson().isEmpty()) {
// This may happen if:
// 1. Cruise Control restarted so resetting the state because the tasks queue is not persisted
// 2. Task's retention time expired, or the cache has become full
LOGGER.warnCr(reconciliation, "User task {} not found, going to generate a new proposal", sessionId);
requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder).onSuccess(p::complete);
} else {
JsonObject taskStatusJson = cruiseControlResponse.getJson();
CruiseControlUserTaskStatus taskStatus = CruiseControlUserTaskStatus.lookup(taskStatusJson.getString("Status"));
switch (taskStatus) {
case COMPLETED:
LOGGER.infoCr(reconciliation, "Rebalance ({}) is now complete", sessionId);
p.complete(buildRebalanceStatus(kafkaRebalance, null, KafkaRebalanceState.Ready, taskStatusJson, conditions));
break;
case COMPLETED_WITH_ERROR:
// TODO: There doesn't seem to be a way to retrieve the actual error message from the user tasks endpoint?
// We may need to propose an upstream PR for this.
// TODO: Once we can get the error details we need to add an error field to the Rebalance Status to hold
// details of any issues while rebalancing.
LOGGER.errorCr(reconciliation, "Rebalance ({}) optimization proposal has failed to complete", sessionId);
p.complete(buildRebalanceStatus(sessionId, KafkaRebalanceState.NotReady, conditions));
break;
case IN_EXECUTION: // Rebalance is still in progress
LOGGER.infoCr(reconciliation, "Rebalance ({}) optimization proposal in execution", sessionId);
// We need to check that the status has been updated with the ongoing optimisation proposal
// The proposal field can be empty if a rebalance(dryrun=false) was called and the optimisation
// proposal was still being prepared (in progress). In that case the rebalance will start when
// the proposal is complete but the optimisation proposal summary will be missing.
if (kafkaRebalance.getStatus().getOptimizationResult() == null ||
kafkaRebalance.getStatus().getOptimizationResult().isEmpty()) {
LOGGER.infoCr(reconciliation, "Rebalance ({}) optimization proposal is now ready and has been added to the status", sessionId);
p.complete(buildRebalanceStatus(
kafkaRebalance, sessionId, KafkaRebalanceState.Rebalancing, taskStatusJson, conditions));
} else {
configMapOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName())
.onSuccess(loadmap -> p.complete(new MapAndStatus<>(loadmap, buildRebalanceStatusFromPreviousStatus(kafkaRebalance.getStatus(), conditions))));
}
// TODO: Find out if there is any way to check the progress of a rebalance.
// We could parse the verbose proposal for total number of reassignments and compare to number completed (if available)?
// We can then update the status at this point.
break;
case ACTIVE: // Rebalance proposal is still being calculated
// If a rebalance(dryrun=false) was called and the proposal is still being prepared then the task
// will be in an ACTIVE state. When the proposal is ready it will shift to IN_EXECUTION and we will
// check that the optimisation proposal is added to the status on the next reconcile.
LOGGER.infoCr(reconciliation, "Rebalance ({}) optimization proposal is still being prepared", sessionId);
configMapOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName())
.onSuccess(loadmap -> p.complete(new MapAndStatus<>(loadmap, buildRebalanceStatusFromPreviousStatus(kafkaRebalance.getStatus(), conditions))));
break;
default:
LOGGER.errorCr(reconciliation, "Unexpected state {}", taskStatus);
p.fail("Unexpected state " + taskStatus);
break;
}
}
})
.onSuccess(cruiseControlResponse -> handleUserTaskStatusResponse(reconciliation, cruiseControlResponse, p, sessionId, conditions, kafkaRebalance, configMapOperator, false, host, apiClient, rebalanceOptionsBuilder))
.onFailure(e -> {
LOGGER.errorCr(reconciliation, "Cruise Control getting rebalance task status failed", e.getCause());
p.fail(new CruiseControlRestException("Cruise Control getting rebalance task status failed"));
Expand Down

0 comments on commit bf7827c

Please sign in to comment.