Skip to content

Commit

Permalink
Check node and broker id equality in KRaft mode on start up and not i…
Browse files Browse the repository at this point in the history
…n broker.id setter (#125)

Fixes #124

Signed-off-by: Fedor Dudinsky <fdudinsk@redhat.com>
  • Loading branch information
fedinskiy authored Dec 20, 2024
1 parent b697d95 commit 0f2e3f3
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 16 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@
<avoidCallsTo>org.apache.commons.logging</avoidCallsTo>
</avoidCallsTo>
<mutationThreshold>91</mutationThreshold>
<coverageThreshold>72</coverageThreshold>
<coverageThreshold>71</coverageThreshold>
<verbose>true</verbose>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,12 @@ protected void containerIsStarting(final InspectContainerResponse containerInfo,
this.nodeId = this.brokerId;
}

if (this.useKraft) {
if (this.brokerId != this.nodeId) {
throw new IllegalStateException("`broker.id` and `node.id` must have the same value!");
}
}

final String[] listenersConfig = this.buildListenersConfig(containerInfo);
final Properties defaultServerProperties = this.buildDefaultServerProperties(
listenersConfig[0],
Expand Down Expand Up @@ -654,10 +660,6 @@ public StrimziKafkaContainer withExternalZookeeperConnect(final String externalZ
* @return StrimziKafkaContainer instance
*/
public StrimziKafkaContainer withBrokerId(final int brokerId) {
if (this.useKraft && this.brokerId != this.nodeId) {
throw new IllegalStateException("`broker.id` and `node.id` must have same value!");
}

this.brokerId = brokerId;
return self();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,17 +178,6 @@ void testWithAuthenticationTypeNullDoesNotChangeType() {
assertThat(kafkaContainer.getAuthenticationType(), is(AuthenticationType.OAUTH_BEARER));
}

@Test
void testWithBrokerIdThrowsExceptionWhenKraftAndIdsMismatch() {
kafkaContainer.withKraft();
kafkaContainer.withNodeId(1);

IllegalStateException exception = assertThrows(IllegalStateException.class,
() -> kafkaContainer.withBrokerId(2));

assertThat(exception.getMessage(), containsString("`broker.id` and `node.id` must have same value!"));
}

@Test
void testWithNodeIdReturnsSelf() {
StrimziKafkaContainer result = kafkaContainer.withNodeId(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.testcontainers.containers.ContainerLaunchException;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;

import java.time.Duration;
Expand All @@ -37,6 +38,7 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

@SuppressWarnings("ClassDataAbstractionCoupling")
Expand Down Expand Up @@ -128,6 +130,26 @@ void testUnsupportedKRaftUsingImageName() {
assertThrows(UnsupportedKraftKafkaVersionException.class, () -> systemUnderTest.start());
}

@ParameterizedTest(name = "testStartContainerWithSomeConfiguration-{0}")
@MethodSource("retrieveKafkaVersionsFile")
void testUnsupportedKraftAndIdsMismatch(final String imageName, final String kafkaVersion) {
supportsKraftMode(imageName);
systemUnderTest = new StrimziKafkaContainer(imageName)
.withNodeId(1)
.withBrokerId(2)
.withKraft()
.waitForRunning();
ContainerLaunchException exception = assertThrows(ContainerLaunchException.class,
() -> systemUnderTest.start());
Throwable cause = exception;
while (cause.getCause() != null) {
cause = cause.getCause();
}
assertEquals(cause.getClass(), IllegalStateException.class);
assertThat(cause.getMessage(), containsString("`broker.id` and `node.id` must have the same value!"));
}


@Test
void testWithKafkaLog() {
systemUnderTest = new StrimziKafkaContainer()
Expand Down

0 comments on commit 0f2e3f3

Please sign in to comment.