Skip to content

Commit

Permalink
KAFKA-15750: KRaft support in KafkaMetricReporterExceptionHandlingTest (
Browse files Browse the repository at this point in the history
apache#14707)


Reviewers: Mickael Maison <mickael.maison@gmail.com>, Qichao Chu <qichao@uber.com>, Gantigmaa Selenge <gselenge@redhat.com>
  • Loading branch information
linzihao1999 committed Jan 15, 2024
1 parent 8d89e63 commit 3041151
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,25 @@

package kafka.integration

import java.io.File
import java.util
import java.util.Arrays
import kafka.server.QuorumTestHarness
import kafka.server._
import kafka.utils.TestUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}

import scala.collection.{Seq, mutable}
import scala.jdk.CollectionConverters._
import java.util.Properties
import kafka.utils.TestUtils.{createAdminClient, resource}
import org.apache.kafka.common.acl.AccessControlEntry
import org.apache.kafka.common.{KafkaException, Uuid}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{KafkaException, Uuid}
import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}

import java.io.File
import java.util
import java.util.{Arrays, Collections, Properties}
import scala.collection.{Seq, mutable}
import scala.jdk.CollectionConverters._

/**
* A test harness that brings up some number of broker nodes
Expand Down Expand Up @@ -385,4 +384,20 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
)
}
}

def changeClientIdConfig(sanitizedClientId: String, configs: Properties): Unit = {
if (isKRaftTest()) {
resource(createAdminClient(brokers, listenerName)) {
admin => {
admin.alterClientQuotas(Collections.singleton(
new ClientQuotaAlteration(
new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> (if (sanitizedClientId == "<default>") null else sanitizedClientId)).asJava),
configs.asScala.map { case (key, value) => new ClientQuotaAlteration.Op(key, value.toDouble) }.toList.asJava))).all().get()
}
}
}
else {
adminZkClient.changeClientIdConfig(sanitizedClientId, configs)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,26 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
* */

package kafka.server

import java.net.Socket
import java.util.{Collections, Properties}

import kafka.utils.TestUtils
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.common.config.internals.QuotaConfigs
import org.apache.kafka.common.message.ListGroupsRequestData
import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ListGroupsRequest, ListGroupsResponse}
import org.apache.kafka.common.metrics.MetricsReporter
import org.apache.kafka.common.metrics.KafkaMetric
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.protocol.Errors
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import java.util.concurrent.atomic.AtomicInteger
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

import org.apache.kafka.common.message.ListGroupsRequestData
import java.net.Socket
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Collections, Properties}

/*
* this test checks that a reporter that throws an exception will not affect other reporters
Expand All @@ -50,19 +50,21 @@ class KafkaMetricReporterExceptionHandlingTest extends BaseRequestTest {
// need a quota prop to register a "throttle-time" metrics after server startup
val quotaProps = new Properties()
quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "0.1")
adminZkClient.changeClientIdConfig("<default>", quotaProps)

changeClientIdConfig("<default>", quotaProps)
}

@AfterEach
override def tearDown(): Unit = {
KafkaMetricReporterExceptionHandlingTest.goodReporterRegistered.set(0)
KafkaMetricReporterExceptionHandlingTest.badReporterRegistered.set(0)

super.tearDown()
}

@Test
def testBothReportersAreInvoked(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testBothReportersAreInvoked(quorum: String): Unit = {
val port = anySocketServer.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
val socket = new Socket("localhost", port)
socket.setSoTimeout(10000)
Expand Down
15 changes: 1 addition & 14 deletions core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.kafka.common.message._
import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter}
import org.apache.kafka.common.quota.ClientQuotaFilter
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.{PatternType, ResourceType => AdminResourceType}
Expand Down Expand Up @@ -134,19 +134,6 @@ class RequestQuotaTest extends BaseRequestTest {
}
}

private def changeClientIdConfig(sanitizedClientId: String, configs: Properties): Unit = {
if (isKRaftTest()) {
val admin = createAdminClient()
admin.alterClientQuotas(Collections.singleton(
new ClientQuotaAlteration(
new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> (if (sanitizedClientId == "<default>") null else sanitizedClientId)).asJava),
configs.asScala.map { case (key, value) => new ClientQuotaAlteration.Op(key, value.toDouble) }.toList.asJava)
)).all().get()
} else {
adminZkClient.changeClientIdConfig(sanitizedClientId, configs)
}
}

@AfterEach
override def tearDown(): Unit = {
try executor.shutdownNow()
Expand Down

0 comments on commit 3041151

Please sign in to comment.