From bf7827c2a86624bec2225c3beb9d9ddbccd13c21 Mon Sep 17 00:00:00 2001
From: Gantigmaa Selenge
Date: Mon, 23 Sep 2024 16:36:23 +0100
Subject: [PATCH] Refactor getUserTaskStatus with a common method for handling
 the response for both proposal and rebalance request

Signed-off-by: Gantigmaa Selenge
---
Review notes (not part of the commit message):
- handleUserTaskStatusResponse() no longer prints the task status with
  System.out.println.
- The "is now complete" and "optimization proposal in execution" log messages
  previously emitted by onRebalancing() are kept for the dryRun=false path, so
  a rebalance in execution is no longer reported as "still being prepared".
- Intentional switch fall-throughs are marked with comments.
- The post-image blob id on the index line below belongs to the original
  commit and is stale for this revised patch.

 .../KafkaRebalanceAssemblyOperator.java       | 191 +++++++++---------
 1 file changed, 97 insertions(+), 94 deletions(-)

diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java
index 6ad21f760ec..5dd7a064e84 100644
--- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java
@@ -34,6 +34,7 @@
 import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlApi;
 import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlApiImpl;
 import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlRebalanceResponse;
+import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlResponse;
 import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlRestException;
 import io.strimzi.operator.cluster.operator.resource.cruisecontrol.RebalanceOptions;
 import io.strimzi.operator.cluster.operator.resource.cruisecontrol.RemoveBrokerOptions;
@@ -745,43 +746,7 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onPendingProposal(
                 requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder).onSuccess(p::complete);
             } else {
                 apiClient.getUserTaskStatus(reconciliation, host, cruiseControlPort, sessionId)
-                        .onSuccess(cruiseControlResponse -> {
-                            if (cruiseControlResponse.getJson().isEmpty()) {
-                                // This may happen if:
-                                // 1. Cruise Control restarted so resetting the state because the tasks queue is not persisted
-                                // 2. Task's retention time expired, or the cache has become full
-                                LOGGER.warnCr(reconciliation, "User task {} not found, going to generate a new proposal", sessionId);
-                                requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder).onSuccess(p::complete);
-                            } else {
-                                JsonObject taskStatusJson = cruiseControlResponse.getJson();
-                                CruiseControlUserTaskStatus taskStatus = CruiseControlUserTaskStatus.lookup(taskStatusJson.getString("Status"));
-                                switch (taskStatus) {
-                                    case COMPLETED_WITH_ERROR:
-                                        LOGGER.errorCr(reconciliation, "Rebalance ({}) optimization proposal has failed to complete", sessionId);
-                                        p.complete(buildRebalanceStatus(sessionId, KafkaRebalanceState.NotReady, conditions));
-                                        break;
-                                    case COMPLETED:
-                                    case IN_EXECUTION:
-                                        // If the returned status has an optimization result then the rebalance proposal is ready.
-                                        if (taskStatusJson.containsKey(CruiseControlRebalanceKeys.LOAD_BEFORE_OPTIMIZATION.getKey()) &&
-                                                taskStatusJson.containsKey(CruiseControlRebalanceKeys.LOAD_AFTER_OPTIMIZATION.getKey())) {
-                                            LOGGER.infoCr(reconciliation, "Rebalance ({}) optimization proposal is now ready", sessionId);
-                                            System.out.println("Status" + taskStatusJson);
-                                            p.complete(buildRebalanceStatus(kafkaRebalance, sessionId, KafkaRebalanceState.ProposalReady, taskStatusJson, conditions));
-                                            break;
-                                        }
-                                    case ACTIVE: // Rebalance proposal is still being calculated
-                                        LOGGER.infoCr(reconciliation, "Rebalance ({}) optimization proposal is still being prepared", sessionId);
-                                        configMapOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName())
-                                                .onSuccess(loadmap -> p.complete(new MapAndStatus<>(loadmap, buildRebalanceStatusFromPreviousStatus(kafkaRebalance.getStatus(), conditions))));
-                                        break;
-                                    default:
-                                        LOGGER.errorCr(reconciliation, "Unexpected state {}", taskStatus);
-                                        p.fail("Unexpected state " + taskStatus);
-                                        break;
-                                }
-                            }
-                        })
+                        .onSuccess(cruiseControlResponse -> handleUserTaskStatusResponse(reconciliation, cruiseControlResponse, p, sessionId, conditions, kafkaRebalance, configMapOperator, true, host, apiClient, rebalanceOptionsBuilder))
                         .onFailure(e -> {
                             LOGGER.errorCr(reconciliation, "Cruise Control getting rebalance proposal status failed", e.getCause());
                             p.fail(new CruiseControlRestException("Cruise Control getting rebalance proposal status failed"));
@@ -791,6 +756,100 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onPendingProposal(
         return p.future();
     }
 
+    /**
+     * Handles the response of a Cruise Control user task status request. Shared by the handling of a pending
+     * optimization proposal (dryRun is true) and of an ongoing rebalance (dryRun is false): completes or fails
+     * the given promise according to the user task status found in the response.
+     *
+     * @param reconciliation            Reconciliation marker
+     * @param cruiseControlResponse     Response returned by the user task status request
+     * @param p                         Promise completed with the resulting status, or failed on an unexpected task state
+     * @param sessionId                 ID of the user task whose status was requested
+     * @param conditions                Conditions passed on to the status that is built
+     * @param kafkaRebalance            KafkaRebalance resource being reconciled
+     * @param configMapOperator         Operator used to get the ConfigMap with the namespace and name of the KafkaRebalance
+     * @param dryRun                    True when checking a rebalance(dryrun=true) proposal, false when checking a rebalance
+     * @param host                      Cruise Control host, used when a new proposal has to be requested
+     * @param apiClient                 Cruise Control API client, used when a new proposal has to be requested
+     * @param rebalanceOptionsBuilder   Rebalance options builder, used when a new proposal has to be requested
+     */
+    private void handleUserTaskStatusResponse(Reconciliation reconciliation, CruiseControlResponse cruiseControlResponse,
+                                              Promise<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> p, String sessionId,
+                                              Set<Condition> conditions, KafkaRebalance kafkaRebalance,
+                                              ConfigMapOperator configMapOperator, boolean dryRun,
+                                              String host, CruiseControlApi apiClient,
+                                              AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder<?, ?> rebalanceOptionsBuilder) {
+        if (cruiseControlResponse.getJson().isEmpty()) {
+            // This may happen if:
+            // 1. Cruise Control restarted so resetting the state because the tasks queue is not persisted
+            // 2. Task's retention time expired, or the cache has become full
+            LOGGER.warnCr(reconciliation, "User task {} not found, going to generate a new proposal", sessionId);
+            requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder).onSuccess(p::complete);
+            return;
+        }
+
+        JsonObject taskStatusJson = cruiseControlResponse.getJson();
+        CruiseControlUserTaskStatus taskStatus = CruiseControlUserTaskStatus.lookup(taskStatusJson.getString("Status"));
+        switch (taskStatus) {
+            case COMPLETED_WITH_ERROR:
+                // TODO: There doesn't seem to be a way to retrieve the actual error message from the user tasks endpoint?
+                //       We may need to propose an upstream PR for this.
+                // TODO: Once we can get the error details we need to add an error field to the Rebalance Status to hold
+                //       details of any issues while rebalancing.
+                LOGGER.errorCr(reconciliation, "Rebalance ({}) optimization proposal has failed to complete", sessionId);
+                p.complete(buildRebalanceStatus(sessionId, KafkaRebalanceState.NotReady, conditions));
+                break;
+            case COMPLETED:
+                if (!dryRun) {
+                    LOGGER.infoCr(reconciliation, "Rebalance ({}) is now complete", sessionId);
+                    p.complete(buildRebalanceStatus(kafkaRebalance, null, KafkaRebalanceState.Ready, taskStatusJson, conditions));
+                    break;
+                }
+                // A completed rebalance(dryrun=true) falls through so that the optimization load is checked
+            case IN_EXECUTION:
+                if (!dryRun) {
+                    LOGGER.infoCr(reconciliation, "Rebalance ({}) optimization proposal in execution", sessionId);
+                    // We need to check that the status has been updated with the ongoing optimisation proposal
+                    // The proposal field can be empty if a rebalance(dryrun=false) was called and the optimisation
+                    // proposal was still being prepared (in progress). In that case the rebalance will start when
+                    // the proposal is complete but the optimisation proposal summary will be missing.
+                    if (kafkaRebalance.getStatus().getOptimizationResult() == null ||
+                            kafkaRebalance.getStatus().getOptimizationResult().isEmpty()) {
+                        LOGGER.infoCr(reconciliation, "Rebalance ({}) optimization proposal is now ready and has been added to the status", sessionId);
+                        p.complete(buildRebalanceStatus(
+                                kafkaRebalance, sessionId, KafkaRebalanceState.Rebalancing, taskStatusJson, conditions));
+                    } else {
+                        configMapOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName())
+                                .onSuccess(loadmap -> p.complete(new MapAndStatus<>(loadmap, buildRebalanceStatusFromPreviousStatus(kafkaRebalance.getStatus(), conditions))));
+                    }
+                    // TODO: Find out if there is any way to check the progress of a rebalance.
+                    //       We could parse the verbose proposal for total number of reassignments and compare to number completed (if available)?
+                    //       We can then update the status at this point.
+                    break;
+                }
+                // If the returned status has an optimization result then the rebalance proposal is ready.
+                if (taskStatusJson.containsKey(CruiseControlRebalanceKeys.LOAD_BEFORE_OPTIMIZATION.getKey()) &&
+                        taskStatusJson.containsKey(CruiseControlRebalanceKeys.LOAD_AFTER_OPTIMIZATION.getKey())) {
+                    LOGGER.infoCr(reconciliation, "Rebalance ({}) optimization proposal is now ready", sessionId);
+                    p.complete(buildRebalanceStatus(kafkaRebalance, sessionId, KafkaRebalanceState.ProposalReady, taskStatusJson, conditions));
+                    break;
+                }
+                // No optimization result yet: fall through and treat the proposal as still being prepared
+            case ACTIVE:
+                // If a rebalance(dryrun=false) was called and the proposal is still being prepared then the task
+                // will be in an ACTIVE state. When the proposal is ready it will shift to IN_EXECUTION and we will
+                // check that the optimisation proposal is added to the status on the next reconcile.
+                LOGGER.infoCr(reconciliation, "Rebalance ({}) optimization proposal is still being prepared", sessionId);
+                configMapOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName())
+                        .onSuccess(loadmap -> p.complete(new MapAndStatus<>(loadmap, buildRebalanceStatusFromPreviousStatus(kafkaRebalance.getStatus(), conditions))));
+                break;
+            default:
+                LOGGER.errorCr(reconciliation, "Unexpected state {}", taskStatus);
+                p.fail("Unexpected state " + taskStatus);
+                break;
+        }
+    }
+
     /**
      * This method handles the transition from {@code ProposalReady} state.
      * It is related to the value that the user apply to the strimzi.io/rebalance annotation.
@@ -882,63 +941,7 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onRebalancing(Reco
             Set<Condition> conditions = StatusUtils.validate(reconciliation, kafkaRebalance);
             validateAnnotation(reconciliation, conditions, KafkaRebalanceState.Rebalancing, rebalanceAnnotation(kafkaRebalance), kafkaRebalance);
             apiClient.getUserTaskStatus(reconciliation, host, cruiseControlPort, sessionId)
-                    .onSuccess(cruiseControlResponse -> {
-                        if (cruiseControlResponse.getJson().isEmpty()) {
-                            // This may happen if:
-                            // 1. Cruise Control restarted so resetting the state because the tasks queue is not persisted
-                            // 2. Task's retention time expired, or the cache has become full
-                            LOGGER.warnCr(reconciliation, "User task {} not found, going to generate a new proposal", sessionId);
-                            requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder).onSuccess(p::complete);
-                        } else {
-                            JsonObject taskStatusJson = cruiseControlResponse.getJson();
-                            CruiseControlUserTaskStatus taskStatus = CruiseControlUserTaskStatus.lookup(taskStatusJson.getString("Status"));
-                            switch (taskStatus) {
-                                case COMPLETED:
-                                    LOGGER.infoCr(reconciliation, "Rebalance ({}) is now complete", sessionId);
-                                    p.complete(buildRebalanceStatus(kafkaRebalance, null, KafkaRebalanceState.Ready, taskStatusJson, conditions));
-                                    break;
-                                case COMPLETED_WITH_ERROR:
-                                    // TODO: There doesn't seem to be a way to retrieve the actual error message from the user tasks endpoint?
-                                    //       We may need to propose an upstream PR for this.
-                                    // TODO: Once we can get the error details we need to add an error field to the Rebalance Status to hold
-                                    //       details of any issues while rebalancing.
-                                    LOGGER.errorCr(reconciliation, "Rebalance ({}) optimization proposal has failed to complete", sessionId);
-                                    p.complete(buildRebalanceStatus(sessionId, KafkaRebalanceState.NotReady, conditions));
-                                    break;
-                                case IN_EXECUTION: // Rebalance is still in progress
-                                    LOGGER.infoCr(reconciliation, "Rebalance ({}) optimization proposal in execution", sessionId);
-                                    // We need to check that the status has been updated with the ongoing optimisation proposal
-                                    // The proposal field can be empty if a rebalance(dryrun=false) was called and the optimisation
-                                    // proposal was still being prepared (in progress). In that case the rebalance will start when
-                                    // the proposal is complete but the optimisation proposal summary will be missing.
-                                    if (kafkaRebalance.getStatus().getOptimizationResult() == null ||
-                                            kafkaRebalance.getStatus().getOptimizationResult().isEmpty()) {
-                                        LOGGER.infoCr(reconciliation, "Rebalance ({}) optimization proposal is now ready and has been added to the status", sessionId);
-                                        p.complete(buildRebalanceStatus(
-                                                kafkaRebalance, sessionId, KafkaRebalanceState.Rebalancing, taskStatusJson, conditions));
-                                    } else {
-                                        configMapOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName())
-                                                .onSuccess(loadmap -> p.complete(new MapAndStatus<>(loadmap, buildRebalanceStatusFromPreviousStatus(kafkaRebalance.getStatus(), conditions))));
-                                    }
-                                    // TODO: Find out if there is any way to check the progress of a rebalance.
-                                    //       We could parse the verbose proposal for total number of reassignments and compare to number completed (if available)?
-                                    //       We can then update the status at this point.
-                                    break;
-                                case ACTIVE: // Rebalance proposal is still being calculated
-                                    // If a rebalance(dryrun=false) was called and the proposal is still being prepared then the task
-                                    // will be in an ACTIVE state. When the proposal is ready it will shift to IN_EXECUTION and we will
-                                    // check that the optimisation proposal is added to the status on the next reconcile.
-                                    LOGGER.infoCr(reconciliation, "Rebalance ({}) optimization proposal is still being prepared", sessionId);
-                                    configMapOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName())
-                                            .onSuccess(loadmap -> p.complete(new MapAndStatus<>(loadmap, buildRebalanceStatusFromPreviousStatus(kafkaRebalance.getStatus(), conditions))));
-                                    break;
-                                default:
-                                    LOGGER.errorCr(reconciliation, "Unexpected state {}", taskStatus);
-                                    p.fail("Unexpected state " + taskStatus);
-                                    break;
-                            }
-                        }
-                    })
+                    .onSuccess(cruiseControlResponse -> handleUserTaskStatusResponse(reconciliation, cruiseControlResponse, p, sessionId, conditions, kafkaRebalance, configMapOperator, false, host, apiClient, rebalanceOptionsBuilder))
                     .onFailure(e -> {
                         LOGGER.errorCr(reconciliation, "Cruise Control getting rebalance task status failed", e.getCause());
                         p.fail(new CruiseControlRestException("Cruise Control getting rebalance task status failed"));