Skip to content

Commit d916d4d

Browse files
authored
Fix strimzi#9270: Incorrect TO ResourceConflict message (strimzi#9617)
Signed-off-by: Tom Bentley <tbentley@redhat.com>
1 parent 6c6c683 commit d916d4d

File tree

4 files changed

+107
-20
lines changed

4 files changed

+107
-20
lines changed

topic-operator/src/main/java/io/strimzi/operator/topic/v2/BatchingLoop.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ private boolean runOnce(int batchId, Batch batch) {
191191
try {
192192
synchronized (BatchingLoop.this) {
193193
// remove the old batch from the inflight set and reset the batch
194-
LOGGER.debugOp("[Batch #{}] Removing batch from inflight", batchId - 1);
194+
LOGGER.traceOp("[Batch #{}] Removing batch from inflight", batchId - 1);
195195
batch.toUpdate.stream().map(TopicEvent::toRef).forEach(inFlight::remove);
196196
batch.toDelete.stream().map(TopicEvent::toRef).forEach(inFlight::remove);
197197
batch.clear();
@@ -210,7 +210,7 @@ private boolean runOnce(int batchId, Batch batch) {
210210
}
211211
LOGGER.debugOp("[Batch #{}] Reconciled batch", batchId);
212212
} else {
213-
LOGGER.debugOp("[Batch #{}] Empty batch", batchId);
213+
LOGGER.traceOp("[Batch #{}] Empty batch", batchId);
214214
}
215215
} catch (InterruptedException e) {
216216
LOGGER.infoOp("[Batch #{}] Interrupted", batchId);

topic-operator/src/main/java/io/strimzi/operator/topic/v2/BatchingTopicController.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444

4545
import java.io.InterruptedIOException;
4646
import java.time.Instant;
47+
import java.util.ArrayList;
4748
import java.util.Collection;
4849
import java.util.Comparator;
4950
import java.util.HashMap;
@@ -80,7 +81,7 @@ public class BatchingTopicController {
8081
private final KubernetesClient kubeClient;
8182

8283
// Key: topic name, Value: The KafkaTopics known to manage that topic
83-
/* test */ final Map<String, Set<KubeRef>> topics = new HashMap<>();
84+
/* test */ final Map<String, List<KubeRef>> topics = new HashMap<>();
8485

8586
private final TopicOperatorMetricsHolder metrics;
8687
private final String namespace;
@@ -201,9 +202,11 @@ private Either<TopicOperatorException, Boolean> validate(ReconcilableTopic recon
201202

202203
private boolean rememberTopic(ReconcilableTopic reconcilableTopic) {
203204
String tn = reconcilableTopic.topicName();
204-
var existing = topics.computeIfAbsent(tn, k -> new HashSet<>());
205+
var existing = topics.computeIfAbsent(tn, k -> new ArrayList<>(1));
205206
KubeRef thisRef = new KubeRef(reconcilableTopic.kt());
206-
existing.add(thisRef);
207+
if (!existing.contains(thisRef)) {
208+
existing.add(thisRef);
209+
}
207210
return true;
208211
}
209212

topic-operator/src/test/java/io/strimzi/operator/topic/v2/TopicControllerIT.java

+94-15
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.junit.jupiter.api.BeforeAll;
5050
import org.junit.jupiter.api.BeforeEach;
5151
import org.junit.jupiter.api.Disabled;
52+
import org.junit.jupiter.api.RepeatedTest;
5253
import org.junit.jupiter.api.Test;
5354
import org.junit.jupiter.api.TestInfo;
5455
import org.junit.jupiter.api.extension.ExtendWith;
@@ -68,7 +69,10 @@
6869
import java.util.Set;
6970
import java.util.Stack;
7071
import java.util.UUID;
72+
import java.util.concurrent.CountDownLatch;
7173
import java.util.concurrent.ExecutionException;
74+
import java.util.concurrent.ExecutorService;
75+
import java.util.concurrent.Executors;
7276
import java.util.concurrent.Future;
7377
import java.util.concurrent.TimeUnit;
7478
import java.util.concurrent.TimeoutException;
@@ -80,6 +84,7 @@
8084

8185
import static io.strimzi.api.ResourceAnnotations.ANNO_STRIMZI_IO_PAUSE_RECONCILIATION;
8286
import static io.strimzi.operator.topic.v2.BatchingTopicController.isPaused;
87+
import static io.strimzi.operator.topic.v2.TopicOperatorTestUtil.findKafkaTopicByName;
8388
import static java.lang.String.format;
8489
import static org.junit.jupiter.api.Assertions.assertEquals;
8590
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -476,6 +481,42 @@ private KafkaTopic createTopic(KafkaCluster kc, KafkaTopic kt, Predicate<KafkaTo
476481
return waitUntil(created, condition);
477482
}
478483

484+
private List<KafkaTopic> createTopicsConcurrently(KafkaCluster kc, KafkaTopic... kts) throws InterruptedException, ExecutionException {
485+
if (kts == null || kts.length == 0) {
486+
throw new IllegalArgumentException("You need pass at least one topic to be created");
487+
}
488+
String ns = namespace(kts[0].getMetadata().getNamespace());
489+
maybeStartOperator(topicOperatorConfig(ns, kc));
490+
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
491+
CountDownLatch latch = new CountDownLatch(kts.length);
492+
List<KafkaTopic> result = new ArrayList<>();
493+
for (KafkaTopic kt : kts) {
494+
executor.submit(() -> {
495+
try {
496+
var created = Crds.topicOperation(client).resource(kt).create();
497+
LOGGER.info("Test created KafkaTopic {} with creationTimestamp {}",
498+
created.getMetadata().getName(),
499+
created.getMetadata().getCreationTimestamp());
500+
var reconciled = waitUntil(created, readyIsTrueOrFalse());
501+
result.add(reconciled);
502+
} catch (Exception e) {
503+
throw new RuntimeException(e);
504+
}
505+
latch.countDown();
506+
});
507+
}
508+
latch.await(1, TimeUnit.MINUTES);
509+
try {
510+
executor.shutdown();
511+
executor.awaitTermination(5, TimeUnit.SECONDS);
512+
} catch (InterruptedException e) {
513+
if (!executor.isTerminated()) {
514+
executor.shutdownNow();
515+
}
516+
}
517+
return result;
518+
}
519+
479520
private KafkaTopic pauseTopic(String namespace, String topicName) {
480521
var current = Crds.topicOperation(client).inNamespace(namespace).withName(topicName).get();
481522
var paused = Crds.topicOperation(client).resource(new KafkaTopicBuilder(current)
@@ -686,7 +727,7 @@ public void shouldNotUpdateTopicInKafkaWhenKafkaTopicBecomesUnselected(
686727
assertEquals(Set.of(kt.getSpec().getReplicas()), replicationFactors(topicDescription));
687728
assertEquals(Map.of(), topicConfigMap(expectedTopicName));
688729

689-
Map<String, Set<KubeRef>> topics = new HashMap<>(operator.controller.topics);
730+
Map<String, List<KubeRef>> topics = new HashMap<>(operator.controller.topics);
690731
assertFalse(topics.containsKey("foo")
691732
|| topics.containsKey("FOO"),
692733
"Transition to a non-selected resource should result in removal from topics map: " + topics);
@@ -1834,20 +1875,15 @@ public void shouldFailDeleteIfNoTopicAuthz(KafkaTopic kt,
18341875

18351876
@Test
18361877
public void shouldFailIfNumPartitionsDivergedWithConfigChange(@BrokerConfig(name = "auto.create.topics.enable", value = "false")
1837-
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException, TimeoutException {
1838-
// Scenario from https://github.com/strimzi/strimzi-kafka-operator/pull/8627#pullrequestreview-1477513413
1839-
1840-
// given
1878+
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException, TimeoutException {
1879+
// scenario from https://github.com/strimzi/strimzi-kafka-operator/pull/8627#pullrequestreview-1477513413
18411880

18421881
// create foo
18431882
var topicName = randomTopicName();
18441883
LOGGER.info("Create foo");
18451884
var foo = kafkaTopic(NAMESPACE, "foo", null, null, 1, 1);
18461885
var createdFoo = createTopicAndAssertSuccess(kafkaCluster, foo);
18471886

1848-
// TODO remove after fixing https://github.com/strimzi/strimzi-kafka-operator/issues/9270
1849-
Thread.sleep(1000);
1850-
18511887
// create conflicting bar
18521888
LOGGER.info("Create conflicting bar");
18531889
var bar = kafkaTopic(NAMESPACE, "bar", SELECTOR, null, null, "foo", 1, 1,
@@ -1861,29 +1897,72 @@ public void shouldFailIfNumPartitionsDivergedWithConfigChange(@BrokerConfig(name
18611897
// increase partitions of foo
18621898
LOGGER.info("Increase partitions of foo");
18631899
var editedFoo = modifyTopicAndAwait(createdFoo, theKt ->
1864-
new KafkaTopicBuilder(theKt).editSpec().withPartitions(3).endSpec().build(),
1865-
readyIsTrue());
1900+
new KafkaTopicBuilder(theKt).editSpec().withPartitions(3).endSpec().build(),
1901+
readyIsTrue());
18661902

18671903
// unmanage foo
18681904
LOGGER.info("Unmanage foo");
18691905
var unmanagedFoo = modifyTopicAndAwait(editedFoo, theKt ->
1870-
new KafkaTopicBuilder(theKt).editMetadata().addToAnnotations(BatchingTopicController.MANAGED, "false").endMetadata().build(),
1871-
readyIsTrue());
1906+
new KafkaTopicBuilder(theKt).editMetadata().addToAnnotations(BatchingTopicController.MANAGED, "false").endMetadata().build(),
1907+
readyIsTrue());
18721908

18731909
// when: delete foo
18741910
LOGGER.info("Delete foo");
18751911
Crds.topicOperation(client).resource(unmanagedFoo).delete();
18761912
LOGGER.info("Test deleted KafkaTopic {} with resourceVersion {}",
1877-
unmanagedFoo.getMetadata().getName(), BatchingTopicController.resourceVersion(unmanagedFoo));
1913+
unmanagedFoo.getMetadata().getName(), BatchingTopicController.resourceVersion(unmanagedFoo));
18781914
Resource<KafkaTopic> resource = Crds.topicOperation(client).resource(unmanagedFoo);
18791915
TopicOperatorTestUtil.waitUntilCondition(resource, Objects::isNull);
18801916

18811917
// then: expect bar's unreadiness to be due to mismatching #partitions
18821918
waitUntil(createdBar, readyIsFalseAndReasonIs(
1883-
TopicOperatorException.Reason.NOT_SUPPORTED.reason,
1884-
"Decreasing partitions not supported"));
1919+
TopicOperatorException.Reason.NOT_SUPPORTED.reason,
1920+
"Decreasing partitions not supported"));
1921+
}
1922+
@RepeatedTest(10)
1923+
public void shouldDetectConflictingKafkaTopicCreations(
1924+
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
1925+
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException {
1926+
var foo = kafkaTopic("ns", "foo", null, null, 1, 1);
1927+
var bar = kafkaTopic("ns", "bar", SELECTOR, null, null, "foo", 1, 1,
1928+
Map.of(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy"));
1929+
1930+
LOGGER.info("Create conflicting topics: foo and bar");
1931+
var reconciledTopics = createTopicsConcurrently(kafkaCluster, foo, bar);
1932+
var reconciledFoo = findKafkaTopicByName(reconciledTopics, "foo");
1933+
var reconciledBar = findKafkaTopicByName(reconciledTopics, "bar");
1934+
1935+
// only one resource with the same topicName should be reconciled
1936+
var fooFailed = readyIsFalse().test(reconciledFoo);
1937+
var barFailed = readyIsFalse().test(reconciledBar);
1938+
assertTrue(fooFailed ^ barFailed);
1939+
1940+
if (fooFailed) {
1941+
assertKafkaTopicConflict(reconciledFoo, reconciledBar);
1942+
} else {
1943+
assertKafkaTopicConflict(reconciledBar, reconciledFoo);
1944+
}
18851945
}
18861946

1947+
private void assertKafkaTopicConflict(KafkaTopic failed, KafkaTopic ready) {
1948+
// the error message should refer to the ready resource name
1949+
var condition = assertExactlyOneCondition(failed);
1950+
assertEquals(TopicOperatorException.Reason.RESOURCE_CONFLICT.reason, condition.getReason());
1951+
assertEquals(format("Managed by Ref{namespace='ns', name='%s'}", ready.getMetadata().getName()), condition.getMessage());
1952+
1953+
// the failed resource should become ready after we unmanage and delete the other
1954+
LOGGER.info("Unmanage {}", ready.getMetadata().getName());
1955+
var unmanagedBar = modifyTopicAndAwait(ready, theKt -> new KafkaTopicBuilder(theKt)
1956+
.editMetadata().addToAnnotations(BatchingTopicController.MANAGED, "false").endMetadata().build(),
1957+
readyIsTrue());
1958+
1959+
LOGGER.info("Delete {}", ready.getMetadata().getName());
1960+
Crds.topicOperation(client).resource(unmanagedBar).delete();
1961+
Resource<KafkaTopic> resource = Crds.topicOperation(client).resource(unmanagedBar);
1962+
TopicOperatorTestUtil.waitUntilCondition(resource, Objects::isNull);
1963+
1964+
waitUntil(failed, readyIsTrue());
1965+
}
18871966
private static <T> KafkaFuture<T> failedFuture(Throwable error) {
18881967
var future = new KafkaFutureImpl<T>();
18891968
future.completeExceptionally(error);

topic-operator/src/test/java/io/strimzi/operator/topic/v2/TopicOperatorTestUtil.java

+5
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.logging.log4j.Logger;
2121
import org.junit.jupiter.api.TestInfo;
2222

23+
import java.util.List;
2324
import java.util.Objects;
2425
import java.util.concurrent.TimeUnit;
2526
import java.util.function.Predicate;
@@ -124,4 +125,8 @@ static <T> T waitUntilCondition(Resource<T> resource, Predicate<T> condition) {
124125
}
125126
}
126127
}
128+
129+
static KafkaTopic findKafkaTopicByName(List<KafkaTopic> topics, String name) {
130+
return topics.stream().filter(kt -> kt.getMetadata().getName().equals(name)).findFirst().orElse(null);
131+
}
127132
}

0 commit comments

Comments
 (0)