From 72fcd17f64761a8bd4c852466f8d875ed9a7e638 Mon Sep 17 00:00:00 2001 From: Paolo Patierno Date: Wed, 6 May 2020 19:47:33 +0200 Subject: [PATCH] Fixed lock failure on periodic reconcile (#58) Updating NotReady state as a failure Signed-off-by: Paolo Patierno --- .../KafkaRebalanceAssemblyOperator.java | 44 ++++++++++--------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java index 21ba01b82e6..b43e4aa83a2 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java @@ -60,7 +60,7 @@ * When a new {@code KafkaRebalance} custom resource is created, the operator sends a rebalance proposal * request to the Cruise Control REST API and moves to the {@code PendingProposal} state. It stays in this state * until a the rebalance proposal is ready, polling the related status on Cruise Control, and then finally moves - * to the {@ProposalReady} state. The status of the {@code KafkaRebalance} custom resource is updated with the + * to the {@code ProposalReady} state. The status of the {@code KafkaRebalance} custom resource is updated with the * computed rebalance proposal so that the user can view it and making a decision to execute it or not. * For starting the actual rebalancing on the cluster, the user annotate the custom resource with * the {@code strimzi.io/rebalance=approve} annotation, triggering the operator to send a rebalance request to the @@ -69,7 +69,7 @@ * to the {@code Ready} state when the rebalancing is done. 
* * The user is also able to stop an in-progress rebalance proposal computation or an actual rebalancing, - * annotating the custom resource with {@code strimzi.io/rebalance=stop} when it is in {@PendingProposal} + * annotating the custom resource with {@code strimzi.io/rebalance=stop} when it is in {@code PendingProposal} * or {@code Rebalancing} state; the operator moves to the {@code Stopped} state and the user can request a new * proposal applying the {@code strimzi.io/rebalance=refresh} annotation on the custom resource. * @@ -149,7 +149,8 @@ public void eventReceived(Action action, KafkaRebalance kafkaRebalance) { kafkaRebalance.getStatus() != null ? kafkaRebalance.getStatus().getConditions().get(0).getType() : null, ANNO_STRIMZI_IO_REBALANCE, rawRebalanceAnnotation(kafkaRebalance)); - reconcileRebalance(reconciliation, action == Action.DELETED ? null : kafkaRebalance); + withLock(reconciliation, LOCK_TIMEOUT_MS, + () -> reconcileRebalance(reconciliation, action == Action.DELETED ? null : kafkaRebalance)); } @Override @@ -392,9 +393,12 @@ private Future onNotReady(Reconciliation reconciliation, RebalanceAnnotation rebalanceAnnotation, RebalanceOptions.RebalanceOptionsBuilder rebalanceOptionsBuilder) { if (rebalanceAnnotation == RebalanceAnnotation.refresh) { + // the user fixed the error on the resource and wants to "refresh", actually + // requesting a new rebalance proposal return onNew(reconciliation, host, apiClient, rebalanceOptionsBuilder); } else { - return Future.succeededFuture(); + // stay in the current error state, actually failing the Future + return Future.failedFuture(clusterRebalance.getStatus().getConditions().get(0).getMessage()); } } @@ -706,48 +710,46 @@ private Future reconcileRebalance(Reconciliation reconciliation, KafkaReba } else { String clusterName = kafkaRebalance.getMetadata().getLabels() == null ? 
null : kafkaRebalance.getMetadata().getLabels().get(Labels.STRIMZI_CLUSTER_LABEL); String clusterNamespace = kafkaRebalance.getMetadata().getNamespace(); - KafkaRebalanceStatus desiredStatus = new KafkaRebalanceStatus(); if (clusterName != null) { return kafkaOperator.getAsync(clusterNamespace, clusterName) .compose(kafka -> { if (kafka == null) { log.warn("{}: Kafka resource '{}' identified by label '{}' does not exist in namespace {}.", reconciliation, clusterName, Labels.STRIMZI_CLUSTER_LABEL, clusterNamespace); - return updateStatus(kafkaRebalance, desiredStatus, kafkaRebalanceOperator, + return updateStatus(kafkaRebalance, new KafkaRebalanceStatus(), kafkaRebalanceOperator, new NoSuchResourceException("Kafka resource '" + clusterName + "' identified by label '" + Labels.STRIMZI_CLUSTER_LABEL + "' does not exist in namespace " + clusterNamespace + ".")).mapEmpty(); } else if (kafka.getSpec().getCruiseControl() != null) { CruiseControlApi apiClient = cruiseControlClientProvider.apply(vertx); - return withLock(reconciliation, LOCK_TIMEOUT_MS, - () -> kafkaRebalanceOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName()) - .compose(fetchedClusterRebalance -> { - KafkaRebalanceStatus clusterRebalanceStatus = fetchedClusterRebalance.getStatus(); - // cluster rebalance is new or it is in one of others states - State currentState = clusterRebalanceStatus == null ? State.New : - State.valueOf(clusterRebalanceStatus.getConditions().get(0).getType()); - // check annotation - RebalanceAnnotation rebalanceAnnotation = rebalanceAnnotation(fetchedClusterRebalance); - return reconcile(reconciliation, ccHost == null ? 
CruiseControlResources.serviceName(clusterName) : ccHost, apiClient, fetchedClusterRebalance, currentState, rebalanceAnnotation).mapEmpty(); - }, exception -> Future.failedFuture(exception).mapEmpty()) - ); + return kafkaRebalanceOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName()) + .compose(fetchedClusterRebalance -> { + KafkaRebalanceStatus clusterRebalanceStatus = fetchedClusterRebalance.getStatus(); + // cluster rebalance is new or it is in one of others states + State currentState = clusterRebalanceStatus == null ? State.New : + State.valueOf(clusterRebalanceStatus.getConditions().get(0).getType()); + // check annotation + RebalanceAnnotation rebalanceAnnotation = rebalanceAnnotation(fetchedClusterRebalance); + return reconcile(reconciliation, ccHost == null ? CruiseControlResources.serviceName(clusterName) : ccHost, apiClient, fetchedClusterRebalance, currentState, rebalanceAnnotation).mapEmpty(); + }, exception -> Future.failedFuture(exception).mapEmpty()); } else { log.warn("{}: Kafka resouce lacks 'cruiseControl' declaration : No deployed Cruise Control for doing a rebalance.", reconciliation); - return updateStatus(kafkaRebalance, desiredStatus, kafkaRebalanceOperator, + return updateStatus(kafkaRebalance, new KafkaRebalanceStatus(), kafkaRebalanceOperator, new InvalidResourceException("Kafka resouce lacks 'cruiseControl' declaration " + ": No deployed Cruise Control for doing a rebalance.")).mapEmpty(); } - }, exception -> updateStatus(kafkaRebalance, desiredStatus, kafkaRebalanceOperator, exception).mapEmpty()); + }, exception -> updateStatus(kafkaRebalance, new KafkaRebalanceStatus(), kafkaRebalanceOperator, exception).mapEmpty()); } else { log.warn("{}: Resource lacks label '{}': No cluster related to a possible rebalance.", reconciliation, Labels.STRIMZI_CLUSTER_LABEL); - return updateStatus(kafkaRebalance, desiredStatus, kafkaRebalanceOperator, + return updateStatus(kafkaRebalance, new 
KafkaRebalanceStatus(), kafkaRebalanceOperator, new InvalidResourceException("Resource lacks label '" + Labels.STRIMZI_CLUSTER_LABEL + "': No cluster related to a possible rebalance.")).mapEmpty(); } } + } private Future requestRebalance(Reconciliation reconciliation,