KAFKA-18365 Remove zookeeper.connect in Test #18353

Merged (2 commits) on Jan 4, 2025
core/src/test/scala/unit/kafka/KafkaConfigTest.scala (33 changes: 15 additions, 18 deletions)
@@ -29,7 +29,7 @@ import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.utils.Exit
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
-import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ZkConfigs}
+import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.api.Assertions._

@@ -47,20 +47,20 @@ class KafkaConfigTest {

// We should load configuration file without any arguments
val config1 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile)))
-assertEquals(1, config1.brokerId)
+assertEquals(1, config1.nodeId)

// We should be able to override given property on command line
-val config2 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "broker.id=2")))
-assertEquals(2, config2.brokerId)
+val config2 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "node.id=2")))
+assertEquals(2, config2.nodeId)

// We should be also able to set completely new property
val config3 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact")))
-assertEquals(1, config3.brokerId)
+assertEquals(1, config3.nodeId)
assertEquals(util.Arrays.asList("compact"), config3.logCleanupPolicy)

// We should be also able to set several properties
-val config4 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact,delete", "--override", "broker.id=2")))
-assertEquals(2, config4.brokerId)
+val config4 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact,delete", "--override", "node.id=2")))
+assertEquals(2, config4.nodeId)
assertEquals(util.Arrays.asList("compact","delete"), config4.logCleanupPolicy)
}

@@ -155,16 +155,6 @@ class KafkaConfigTest {
|must contain the set of bootstrap controllers or controller.quorum.voters must contain a
|parseable set of controllers.""".stripMargin.replace("\n", " ")
)

-// Ensure that if neither process.roles nor controller.quorum.voters is populated, then an exception is thrown if zookeeper.connect is not defined
-propertiesFile.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "")
-assertBadConfigContainingMessage(propertiesFile,
-  "Missing required configuration `zookeeper.connect` which has no default value.")
-
-// Ensure that no exception is thrown once zookeeper.connect is defined (and we clear controller.listener.names)
-propertiesFile.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
-propertiesFile.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "")
-KafkaConfig.fromProps(propertiesFile)
}

private def setListenerProps(props: Properties): Unit = {
@@ -244,7 +234,14 @@ }
}

def prepareDefaultConfig(): String = {
prepareConfig(Array("broker.id=1", "zookeeper.connect=somewhere"))
prepareConfig(Array(
"node.id=1",
"process.roles=controller",
"controller.listener.names=CONTROLLER",
"controller.quorum.voters=1@localhost:9093,2@localhost:9093",
"listeners=CONTROLLER://:9093",
"advertised.listeners=CONTROLLER://127.0.0.1:9093"
))
}

def prepareConfig(lines : Array[String]): String = {
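For context, here is a minimal sketch (not part of the diff) of how the KRaft-only defaults produced by prepareDefaultConfig are consumed. It reuses the exact keys added above and assumes only KafkaConfig.fromProps, which the test already exercises via Kafka.getPropsFromArgs.

```scala
import java.util.Properties
import kafka.server.KafkaConfig

// Controller-only KRaft configuration mirroring prepareDefaultConfig;
// no zookeeper.connect entry is required any more.
val props = new Properties()
props.setProperty("node.id", "1")
props.setProperty("process.roles", "controller")
props.setProperty("controller.listener.names", "CONTROLLER")
props.setProperty("controller.quorum.voters", "1@localhost:9093,2@localhost:9093")
props.setProperty("listeners", "CONTROLLER://:9093")
props.setProperty("advertised.listeners", "CONTROLLER://127.0.0.1:9093")

// Same entry point the test goes through; node.id replaces the old broker.id assertions.
val config = KafkaConfig.fromProps(props)
assert(config.nodeId == 1)
```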
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala (8 changes: 7 additions, 1 deletion)
@@ -33,6 +33,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
+import org.apache.kafka.server.config.KRaftConfigs
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
@@ -2270,7 +2271,12 @@ class UnifiedLogTest {

private def createKafkaConfigWithRLM: KafkaConfig = {
val props = new Properties()
props.put("zookeeper.connect", "test")
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "0")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty("controller.quorum.bootstrap.servers", "localhost:9093")
props.setProperty("listeners", "CONTROLLER://:9093")
props.setProperty("advertised.listeners", "CONTROLLER://127.0.0.1:9093")
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
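The ReplicaManagerTest changes below follow the same bootstrap pattern as createKafkaConfigWithRLM above. A consolidated sketch, assuming only the property keys that appear in these diffs:

```scala
import java.util.Properties
import kafka.server.KafkaConfig
import org.apache.kafka.server.config.KRaftConfigs
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig}

// Controller-role bootstrap that replaces the old zookeeper.connect entry.
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "0")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty("controller.quorum.bootstrap.servers", "localhost:9093")
props.setProperty("listeners", "CONTROLLER://:9093")
props.setProperty("advertised.listeners", "CONTROLLER://127.0.0.1:9093")

// Remote-log-manager settings are unchanged by this PR.
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)

val config = KafkaConfig.fromProps(props)
```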
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (16 changes: 13 additions, 3 deletions)
@@ -58,7 +58,7 @@ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, MetadataVersion, OffsetAndEpoch, RequestLocal, StopPartition}
-import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
+import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage._
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.network.BrokerEndPoint
@@ -4101,7 +4101,12 @@ class ReplicaManagerTest {
val tidp0 = new TopicIdPartition(topicId, tp0)

val props = new Properties()
props.put("zookeeper.connect", "test")
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "0")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty("controller.quorum.bootstrap.servers", "localhost:9093")
props.setProperty("listeners", "CONTROLLER://:9093")
props.setProperty("advertised.listeners", "CONTROLLER://127.0.0.1:9093")
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString)
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
@@ -4209,7 +4214,12 @@ class ReplicaManagerTest {
val tidp0 = new TopicIdPartition(topicId, tp0)

val props = new Properties()
props.put("zookeeper.connect", "test")
props.setProperty("process.roles", "controller")
props.setProperty("node.id", "0")
props.setProperty("controller.listener.names", "CONTROLLER")
props.setProperty("controller.quorum.bootstrap.servers", "localhost:9093")
props.setProperty("listeners", "CONTROLLER://:9093")
props.setProperty("advertised.listeners", "CONTROLLER://127.0.0.1:9093")
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString)
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala (13 changes: 0 additions, 13 deletions)
@@ -294,19 +294,6 @@ Found problem:
"Failed to find content in output: " + stream.toString())
}

-@Test
-def testFormatFailsInZkMode(): Unit = {
-  val availableDirs = Seq(TestUtils.tempDir())
-  val properties = new Properties()
-  properties.setProperty("log.dirs", availableDirs.mkString(","))
-  properties.setProperty("zookeeper.connect", "localhost:2181")
-  val stream = new ByteArrayOutputStream()
-  assertEquals("The kafka configuration file appears to be for a legacy cluster. " +
-    "Formatting is only supported for clusters in KRaft mode.",
-    assertThrows(classOf[TerseFailure],
-      () => runFormatCommand(stream, properties)).getMessage)
-}

@Test
def testFormatWithReleaseVersion(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())