diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala index 4e8c261e4f8fa..5a74febb4bd46 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala @@ -30,7 +30,6 @@ final class Broker private( val host: String, /** Broker's port */ val port: Int) extends Serializable { - override def equals(obj: Any): Boolean = obj match { case that: Broker => this.host == that.host && @@ -45,7 +44,6 @@ final class Broker private( override def toString(): String = { s"Broker($host, $port)" } - } /** diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java index a007c0bff8f97..9d2e1705c6c73 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java @@ -80,7 +80,8 @@ public void testKafkaRDD() throws InterruptedException { HashMap emptyLeaders = new HashMap(); HashMap leaders = new HashMap(); - Broker broker = Broker.create(suiteBase.brokerHost(), suiteBase.brokerPort()); + String[] hostAndPort = suiteBase.brokerAddress().split(":"); + Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1])); leaders.put(new TopicAndPartition(topic1, 0), broker); leaders.put(new TopicAndPartition(topic2, 0), broker); diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index afaf8f16c5d07..e4966eebb9b34 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -53,8 +53,8 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin private val zkConnectionTimeout = 6000 private val zkSessionTimeout = 6000 private var zookeeper: EmbeddedZookeeper = _ - protected val brokerHost = "localhost" - protected var brokerPort = 9092 + private val brokerHost = "localhost" + private var brokerPort = 9092 private var brokerConf: KafkaConfig = _ private var server: KafkaServer = _ private var producer: Producer[String, String] = _