KAFKA-16516: Fix the controller node provider for broker to controller channel

Fix the code in the RaftControllerNodeProvider to query RaftManager to find Node information,
rather than consulting a static map. Add a RaftManager.voterNode function to supply this
information. In KRaftClusterTest, add testControllerFailover to get more coverage of controller
failovers.
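
As a rough illustration of the change (not the actual Kafka classes; Node, RaftManagerLike, and the two provider classes below are simplified stand-ins), the broker-side lookup moves from a map fixed at construction time to an on-demand query of the Raft layer:

object ControllerLookupSketch {
  final case class Node(id: Int, host: String, port: Int)

  // Before: the provider captured a fixed id -> Node map when constructed,
  // so it could only ever return the voters it was handed at startup.
  class StaticMapProvider(voters: Seq[Node]) {
    private val idToNode: Map[Int, Node] = voters.map(n => n.id -> n).toMap
    def controllerNode(leaderId: Option[Int]): Option[Node] =
      leaderId.flatMap(idToNode.get)
  }

  // After: the provider asks the Raft layer for the leader's Node on each call,
  // so it reflects whatever voter set the raft client currently knows about.
  trait RaftManagerLike {
    def voterNode(id: Int, listener: String): Option[Node]
  }
  class RaftQueryingProvider(raftManager: RaftManagerLike, listener: String) {
    def controllerNode(leaderId: Option[Int]): Option[Node] =
      leaderId.flatMap(id => raftManager.voterNode(id, listener))
  }
}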

Reviewers: José Armando García Sancio <jsancio@apache.org>
cmccabe committed May 24, 2024
1 parent 2432a18 commit 90892ae
Showing 9 changed files with 74 additions and 16 deletions.
13 changes: 9 additions & 4 deletions core/src/main/scala/kafka/raft/RaftManager.scala
@@ -30,9 +30,7 @@ import kafka.utils.CoreUtils
import kafka.utils.FileLock
import kafka.utils.Logging
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.{KafkaException, Node, TopicPartition, Uuid}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector}
@@ -41,14 +39,15 @@ import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.raft.{FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, RaftClient, QuorumConfig, ReplicatedLog}
import org.apache.kafka.raft.{FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, QuorumConfig, RaftClient, ReplicatedLog}
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.util.KafkaScheduler
import org.apache.kafka.server.fault.FaultHandler
import org.apache.kafka.server.util.timer.SystemTimer

import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters._

object KafkaRaftManager {
private def createLogDirectory(logDir: File, logDirName: String): File = {
@@ -133,6 +132,8 @@ trait RaftManager[T] {
def client: RaftClient[T]

def replicatedLog: ReplicatedLog

def voterNode(id: Int, listener: String): Option[Node]
}

class KafkaRaftManager[T](
@@ -313,4 +314,8 @@
override def leaderAndEpoch: LeaderAndEpoch = {
client.leaderAndEpoch
}

override def voterNode(id: Int, listener: String): Option[Node] = {
client.voterNode(id, listener).toScala
}
}
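
A side note on the added scala.jdk.OptionConverters._ import: it supplies the .toScala extension that voterNode uses to turn the Java Optional returned by the raft client into a Scala Option. A minimal, standalone illustration (not Kafka code):

import java.util.Optional
import scala.jdk.OptionConverters._

object OptionConvertersDemo {
  def main(args: Array[String]): Unit = {
    val present: Optional[String] = Optional.of("controller-3000")
    val absent: Optional[String]  = Optional.empty()
    // .toScala converts java.util.Optional[T] into scala.Option[T]
    println(present.toScala) // Some(controller-3000)
    println(absent.toScala)  // None
  }
}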
6 changes: 2 additions & 4 deletions core/src/main/scala/kafka/server/BrokerServer.scala
@@ -39,7 +39,6 @@ import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, Grou
import org.apache.kafka.coordinator.group.{CoordinatorRecord, GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, CoordinatorRecordSerde}
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, VersionRange}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.{AssignmentsManager, ClientMetricsManager, NodeToControllerChannelManager}
import org.apache.kafka.server.authorizer.Authorizer
@@ -218,12 +217,11 @@ class BrokerServer(
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)

val voterConnections = FutureUtils.waitWithLogging(logger.underlying, logIdent,
FutureUtils.waitWithLogging(logger.underlying, logIdent,
"controller quorum voters future",
sharedServer.controllerQuorumVotersFuture,
startupDeadline, time)
val controllerNodes = QuorumConfig.voterConnectionsToNodes(voterConnections).asScala
val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)
val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config)

clientToControllerChannelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider,
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ControllerServer.scala
@@ -436,7 +436,7 @@ class ControllerServer(
/**
* Start the KIP-919 controller registration manager.
*/
val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes.asScala)
val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config)
registrationChannelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider,
time,
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/server/KafkaServer.scala
@@ -446,8 +446,7 @@ class KafkaServer(
CompletableFuture.completedFuture(quorumVoters),
fatalFaultHandler = new LoggingFaultHandler("raftManager", () => shutdown())
)
val controllerNodes = QuorumConfig.voterConnectionsToNodes(quorumVoters).asScala
val quorumControllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)
val quorumControllerNodeProvider = RaftControllerNodeProvider(raftManager, config)
val brokerToQuorumChannelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider = quorumControllerNodeProvider,
time = time,
@@ -99,14 +99,12 @@ object RaftControllerNodeProvider {
def apply(
raftManager: RaftManager[ApiMessageAndVersion],
config: KafkaConfig,
controllerQuorumVoterNodes: Seq[Node]
): RaftControllerNodeProvider = {
val controllerListenerName = new ListenerName(config.controllerListenerNames.head)
val controllerSecurityProtocol = config.effectiveListenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value()))
val controllerSaslMechanism = config.saslMechanismControllerProtocol
new RaftControllerNodeProvider(
raftManager,
controllerQuorumVoterNodes,
controllerListenerName,
controllerSecurityProtocol,
controllerSaslMechanism
@@ -120,15 +118,15 @@
*/
class RaftControllerNodeProvider(
val raftManager: RaftManager[ApiMessageAndVersion],
controllerQuorumVoterNodes: Seq[Node],
val listenerName: ListenerName,
val securityProtocol: SecurityProtocol,
val saslMechanism: String
) extends ControllerNodeProvider with Logging {
private val idToNode = controllerQuorumVoterNodes.map(node => node.id() -> node).toMap

private def idToNode(id: Int): Option[Node] = raftManager.voterNode(id, listenerName.value())

override def getControllerInfo(): ControllerInformation =
ControllerInformation(raftManager.leaderAndEpoch.leaderId.asScala.map(idToNode),
ControllerInformation(raftManager.leaderAndEpoch.leaderId.asScala.flatMap(idToNode),
listenerName, securityProtocol, saslMechanism, isZkController = false)
}
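
The map-to-flatMap change in getControllerInfo follows from idToNode changing shape: it was a total Map lookup, and is now a function returning Option[Node] (None when the raft client does not know the voter or the listener). A small, self-contained illustration of why flatMap is the right combinator here, using assumed stand-in types:

object FlatMapVsMapSketch {
  final case class Node(id: Int, host: String, port: Int)
  private val lookup: Int => Option[Node] =
    id => if (id == 3000) Some(Node(3000, "controller0", 9093)) else None

  def main(args: Array[String]): Unit = {
    val leaderId: Option[Int] = Some(3000)
    // map over an Option-returning function nests the Options;
    // flatMap collapses the nesting and propagates None for unknown voters.
    val nested: Option[Option[Node]] = leaderId.map(lookup)
    val flat: Option[Node]           = leaderId.flatMap(lookup)
    println(nested) // Some(Some(Node(3000,controller0,9093)))
    println(flat)   // Some(Node(3000,controller0,9093))
  }
}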

@@ -1541,6 +1541,44 @@ class KRaftClusterTest {
cluster.close()
}
}

@Test
def testControllerFailover(): Unit = {
val cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(1).
setNumControllerNodes(5).build()).build()
try {
cluster.format()
cluster.startup()
cluster.waitForReadyBrokers()
TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState == BrokerState.RUNNING,
"Broker never made it to RUNNING state.")
TestUtils.waitUntilTrue(() => cluster.raftManagers().get(0).client.leaderAndEpoch().leaderId.isPresent,
"RaftManager was not initialized.")

val admin = Admin.create(cluster.clientProperties())
try {
// Create a test topic
admin.createTopics(Collections.singletonList(
new NewTopic("test-topic", 1, 1.toShort))).all().get()
waitForTopicListing(admin, Seq("test-topic"), Seq())

// Shut down active controller
val active = cluster.waitForActiveController()
cluster.raftManagers().get(active.asInstanceOf[QuorumController].nodeId()).shutdown()

// Create a test topic on the new active controller
admin.createTopics(Collections.singletonList(
new NewTopic("test-topic2", 1, 1.toShort))).all().get()
waitForTopicListing(admin, Seq("test-topic2"), Seq())
} finally {
admin.close()
}
} finally {
cluster.close()
}
}
}

class BadAuthorizer extends Authorizer {
5 changes: 5 additions & 0 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -17,6 +17,7 @@
package org.apache.kafka.raft;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
@@ -2548,6 +2549,10 @@ public OptionalLong highWatermark() {
}
}

public Optional<Node> voterNode(int id, String listener) {
return partitionState.lastVoterSet().voterNode(id, listener);
}

// Visible only for test
QuorumState quorum() {
// It's okay to return null since this method is only called by tests
1 change: 1 addition & 0 deletions raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -24,6 +24,7 @@
import java.util.OptionalInt;
import java.util.Random;
import java.util.function.Supplier;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
14 changes: 14 additions & 0 deletions raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
@@ -28,6 +28,8 @@
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.common.message.VotersRecord;
@@ -366,4 +368,16 @@ public static VoterSet fromInetSocketAddresses(String listener, Map<Integer, Ine

return new VoterSet(voterNodes);
}

public Optional<Node> voterNode(int id, String listener) {
VoterNode voterNode = voters.get(id);
if (voterNode == null) {
return Optional.empty();
}
InetSocketAddress address = voterNode.listeners.get(listener);
if (address == null) {
return Optional.empty();
}
return Optional.of(new Node(id, address.getHostString(), address.getPort()));
}
}
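
The VoterSet.voterNode method above is a two-step lookup: resolve the voter id, then resolve the address registered under the requested listener name, returning empty if either step fails. A hedged Scala sketch of the same logic with simplified stand-in types (Voter and its listeners map are assumptions for illustration, not the real VoterNode class):

object VoterLookupSketch {
  final case class Node(id: Int, host: String, port: Int)
  final case class Voter(id: Int, listeners: Map[String, (String, Int)])

  def voterNode(voters: Map[Int, Voter], id: Int, listener: String): Option[Node] =
    for {
      voter        <- voters.get(id)                 // unknown voter id -> None
      (host, port) <- voter.listeners.get(listener)  // listener not registered -> None
    } yield Node(id, host, port)

  def main(args: Array[String]): Unit = {
    val voters = Map(3000 -> Voter(3000, Map("CONTROLLER" -> ("controller0", 9093))))
    println(voterNode(voters, 3000, "CONTROLLER")) // Some(Node(3000,controller0,9093))
    println(voterNode(voters, 3000, "PLAINTEXT"))  // None
    println(voterNode(voters, 3001, "CONTROLLER")) // None
  }
}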
