Skip to content

Commit

Permalink
Fixed lock failure on periodic reconcile (strimzi#58)
Browse files Browse the repository at this point in the history
Updating NotReady state as a failure

Signed-off-by: Paolo Patierno <ppatierno@live.com>
  • Loading branch information
ppatierno authored May 6, 2020
1 parent e34198a commit 72fcd17
Showing 1 changed file with 23 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
* When a new {@code KafkaRebalance} custom resource is created, the operator sends a rebalance proposal
* request to the Cruise Control REST API and moves to the {@code PendingProposal} state. It stays in this state
* until a the rebalance proposal is ready, polling the related status on Cruise Control, and then finally moves
* to the {@ProposalReady} state. The status of the {@code KafkaRebalance} custom resource is updated with the
* to the {@code ProposalReady} state. The status of the {@code KafkaRebalance} custom resource is updated with the
* computed rebalance proposal so that the user can view it and making a decision to execute it or not.
* For starting the actual rebalancing on the cluster, the user annotate the custom resource with
* the {@code strimzi.io/rebalance=approve} annotation, triggering the operator to send a rebalance request to the
Expand All @@ -69,7 +69,7 @@
* to the {@code Ready} state when the rebalancing is done.
*
* The user is also able to stop an in-progress rebalance proposal computation or an actual rebalancing,
* annotating the custom resource with {@code strimzi.io/rebalance=stop} when it is in {@PendingProposal}
* annotating the custom resource with {@code strimzi.io/rebalance=stop} when it is in {@code PendingProposal}
* or {@code Rebalancing} state; the operator moves to the {@code Stopped} state and the user can request a new
* proposal applying the {@code strimzi.io/rebalance=refresh} annotation on the custom resource.
*
Expand Down Expand Up @@ -149,7 +149,8 @@ public void eventReceived(Action action, KafkaRebalance kafkaRebalance) {
kafkaRebalance.getStatus() != null ? kafkaRebalance.getStatus().getConditions().get(0).getType() : null,
ANNO_STRIMZI_IO_REBALANCE, rawRebalanceAnnotation(kafkaRebalance));

reconcileRebalance(reconciliation, action == Action.DELETED ? null : kafkaRebalance);
withLock(reconciliation, LOCK_TIMEOUT_MS,
() -> reconcileRebalance(reconciliation, action == Action.DELETED ? null : kafkaRebalance));
}

@Override
Expand Down Expand Up @@ -392,9 +393,12 @@ private Future<KafkaRebalanceStatus> onNotReady(Reconciliation reconciliation,
RebalanceAnnotation rebalanceAnnotation,
RebalanceOptions.RebalanceOptionsBuilder rebalanceOptionsBuilder) {
if (rebalanceAnnotation == RebalanceAnnotation.refresh) {
// the user fixed the error on the resource and want to "refresh", actually
// requesting a new rebalance proposal
return onNew(reconciliation, host, apiClient, rebalanceOptionsBuilder);
} else {
return Future.succeededFuture();
// stay in the current error state, actually failing the Future
return Future.failedFuture(clusterRebalance.getStatus().getConditions().get(0).getMessage());
}
}

Expand Down Expand Up @@ -706,48 +710,46 @@ private Future<Void> reconcileRebalance(Reconciliation reconciliation, KafkaReba
} else {
String clusterName = kafkaRebalance.getMetadata().getLabels() == null ? null : kafkaRebalance.getMetadata().getLabels().get(Labels.STRIMZI_CLUSTER_LABEL);
String clusterNamespace = kafkaRebalance.getMetadata().getNamespace();
KafkaRebalanceStatus desiredStatus = new KafkaRebalanceStatus();
if (clusterName != null) {
return kafkaOperator.getAsync(clusterNamespace, clusterName)
.compose(kafka -> {
if (kafka == null) {
log.warn("{}: Kafka resource '{}' identified by label '{}' does not exist in namespace {}.",
reconciliation, clusterName, Labels.STRIMZI_CLUSTER_LABEL, clusterNamespace);
return updateStatus(kafkaRebalance, desiredStatus, kafkaRebalanceOperator,
return updateStatus(kafkaRebalance, new KafkaRebalanceStatus(), kafkaRebalanceOperator,
new NoSuchResourceException("Kafka resource '" + clusterName
+ "' identified by label '" + Labels.STRIMZI_CLUSTER_LABEL
+ "' does not exist in namespace " + clusterNamespace + ".")).mapEmpty();
} else if (kafka.getSpec().getCruiseControl() != null) {
CruiseControlApi apiClient = cruiseControlClientProvider.apply(vertx);

return withLock(reconciliation, LOCK_TIMEOUT_MS,
() -> kafkaRebalanceOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName())
.compose(fetchedClusterRebalance -> {
KafkaRebalanceStatus clusterRebalanceStatus = fetchedClusterRebalance.getStatus();
// cluster rebalance is new or it is in one of others states
State currentState = clusterRebalanceStatus == null ? State.New :
State.valueOf(clusterRebalanceStatus.getConditions().get(0).getType());
// check annotation
RebalanceAnnotation rebalanceAnnotation = rebalanceAnnotation(fetchedClusterRebalance);
return reconcile(reconciliation, ccHost == null ? CruiseControlResources.serviceName(clusterName) : ccHost, apiClient, fetchedClusterRebalance, currentState, rebalanceAnnotation).mapEmpty();
}, exception -> Future.failedFuture(exception).mapEmpty())
);
return kafkaRebalanceOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName())
.compose(fetchedClusterRebalance -> {
KafkaRebalanceStatus clusterRebalanceStatus = fetchedClusterRebalance.getStatus();
// cluster rebalance is new or it is in one of others states
State currentState = clusterRebalanceStatus == null ? State.New :
State.valueOf(clusterRebalanceStatus.getConditions().get(0).getType());
// check annotation
RebalanceAnnotation rebalanceAnnotation = rebalanceAnnotation(fetchedClusterRebalance);
return reconcile(reconciliation, ccHost == null ? CruiseControlResources.serviceName(clusterName) : ccHost, apiClient, fetchedClusterRebalance, currentState, rebalanceAnnotation).mapEmpty();
}, exception -> Future.failedFuture(exception).mapEmpty());

} else {
log.warn("{}: Kafka resouce lacks 'cruiseControl' declaration : No deployed Cruise Control for doing a rebalance.", reconciliation);
return updateStatus(kafkaRebalance, desiredStatus, kafkaRebalanceOperator,
return updateStatus(kafkaRebalance, new KafkaRebalanceStatus(), kafkaRebalanceOperator,
new InvalidResourceException("Kafka resouce lacks 'cruiseControl' declaration "
+ ": No deployed Cruise Control for doing a rebalance.")).mapEmpty();
}
}, exception -> updateStatus(kafkaRebalance, desiredStatus, kafkaRebalanceOperator, exception).mapEmpty());
}, exception -> updateStatus(kafkaRebalance, new KafkaRebalanceStatus(), kafkaRebalanceOperator, exception).mapEmpty());
} else {
log.warn("{}: Resource lacks label '{}': No cluster related to a possible rebalance.", reconciliation, Labels.STRIMZI_CLUSTER_LABEL);
return updateStatus(kafkaRebalance, desiredStatus, kafkaRebalanceOperator,
return updateStatus(kafkaRebalance, new KafkaRebalanceStatus(), kafkaRebalanceOperator,
new InvalidResourceException("Resource lacks label '"
+ Labels.STRIMZI_CLUSTER_LABEL
+ "': No cluster related to a possible rebalance.")).mapEmpty();
}
}

}

private Future<KafkaRebalanceStatus> requestRebalance(Reconciliation reconciliation,
Expand Down

0 comments on commit 72fcd17

Please sign in to comment.