diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index 53789213ddd50..fc46b22a38aba 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.utils.Exit
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.QuorumConfig
-import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ZkConfigs}
+import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.api.Assertions._
 
@@ -47,20 +47,20 @@ class KafkaConfigTest {
 
     // We should load configuration file without any arguments
     val config1 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile)))
-    assertEquals(1, config1.brokerId)
+    assertEquals(1, config1.nodeId)
 
     // We should be able to override given property on command line
-    val config2 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "broker.id=2")))
-    assertEquals(2, config2.brokerId)
+    val config2 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "node.id=2")))
+    assertEquals(2, config2.nodeId)
 
     // We should be also able to set completely new property
     val config3 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact")))
-    assertEquals(1, config3.brokerId)
+    assertEquals(1, config3.nodeId)
     assertEquals(util.Arrays.asList("compact"), config3.logCleanupPolicy)
 
     // We should be also able to set several properties
-    val config4 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact,delete", "--override", "broker.id=2")))
-    assertEquals(2, config4.brokerId)
+    val config4 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact,delete", "--override", "node.id=2")))
+    assertEquals(2, config4.nodeId)
     assertEquals(util.Arrays.asList("compact","delete"), config4.logCleanupPolicy)
   }
 
@@ -155,16 +155,6 @@ class KafkaConfigTest {
       |must contain the set of bootstrap controllers or controller.quorum.voters must contain a
       |parseable set of controllers.""".stripMargin.replace("\n", " ")
     )
-
-    // Ensure that if neither process.roles nor controller.quorum.voters is populated, then an exception is thrown if zookeeper.connect is not defined
-    propertiesFile.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "")
-    assertBadConfigContainingMessage(propertiesFile,
-      "Missing required configuration `zookeeper.connect` which has no default value.")
-
-    // Ensure that no exception is thrown once zookeeper.connect is defined (and we clear controller.listener.names)
-    propertiesFile.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
-    propertiesFile.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "")
-    KafkaConfig.fromProps(propertiesFile)
   }
 
   private def setListenerProps(props: Properties): Unit = {
@@ -244,7 +234,14 @@ class KafkaConfigTest {
   }
 
   def prepareDefaultConfig(): String = {
-    prepareConfig(Array("broker.id=1", "zookeeper.connect=somewhere"))
+    prepareConfig(Array(
+      "node.id=1",
+      "process.roles=controller",
+      "controller.listener.names=CONTROLLER",
+      "controller.quorum.voters=1@localhost:9093,2@localhost:9093",
+      "listeners=CONTROLLER://:9093",
+      "advertised.listeners=CONTROLLER://127.0.0.1:9093"
+    ))
   }
 
   def prepareConfig(lines : Array[String]): String = {
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 399828ddedc43..fefeb50e944a8 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
 import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
+import org.apache.kafka.server.config.KRaftConfigs
 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
@@ -2270,7 +2271,12 @@ class UnifiedLogTest {
 
   private def createKafkaConfigWithRLM: KafkaConfig = {
     val props = new Properties()
-    props.put("zookeeper.connect", "test")
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
+    props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "0")
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
+    props.setProperty("controller.quorum.bootstrap.servers", "localhost:9093")
+    props.setProperty("listeners", "CONTROLLER://:9093")
+    props.setProperty("advertised.listeners", "CONTROLLER://127.0.0.1:9093")
     props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
     props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
     props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 950975f9faf97..0a7529608edbc 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -58,7 +58,7 @@ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
 import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
 import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, MetadataVersion, OffsetAndEpoch, RequestLocal, StopPartition}
-import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
+import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
 import org.apache.kafka.server.log.remote.storage._
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 import org.apache.kafka.server.network.BrokerEndPoint
@@ -4101,7 +4101,12 @@ class ReplicaManagerTest {
     val tidp0 = new TopicIdPartition(topicId, tp0)
 
     val props = new Properties()
-    props.put("zookeeper.connect", "test")
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
+    props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "0")
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
+    props.setProperty("controller.quorum.bootstrap.servers", "localhost:9093")
+    props.setProperty("listeners", "CONTROLLER://:9093")
+    props.setProperty("advertised.listeners", "CONTROLLER://127.0.0.1:9093")
     props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString)
     props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
     props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
@@ -4209,7 +4214,12 @@ class ReplicaManagerTest {
     val tidp0 = new TopicIdPartition(topicId, tp0)
 
     val props = new Properties()
-    props.put("zookeeper.connect", "test")
+    props.setProperty("process.roles", "controller")
+    props.setProperty("node.id", "0")
+    props.setProperty("controller.listener.names", "CONTROLLER")
+    props.setProperty("controller.quorum.bootstrap.servers", "localhost:9093")
+    props.setProperty("listeners", "CONTROLLER://:9093")
+    props.setProperty("advertised.listeners", "CONTROLLER://127.0.0.1:9093")
     props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString)
     props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
     props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index ed8a30aade001..9355a65ceb2cb 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -294,19 +294,6 @@ Found problem:
       "Failed to find content in output: " + stream.toString())
   }
 
-  @Test
-  def testFormatFailsInZkMode(): Unit = {
-    val availableDirs = Seq(TestUtils.tempDir())
-    val properties = new Properties()
-    properties.setProperty("log.dirs", availableDirs.mkString(","))
-    properties.setProperty("zookeeper.connect", "localhost:2181")
-    val stream = new ByteArrayOutputStream()
-    assertEquals("The kafka configuration file appears to be for a legacy cluster. " +
-      "Formatting is only supported for clusters in KRaft mode.",
-      assertThrows(classOf[TerseFailure],
-        () => runFormatCommand(stream, properties)).getMessage)
-  }
-
   @Test
   def testFormatWithReleaseVersion(): Unit = {
     val availableDirs = Seq(TestUtils.tempDir())