You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
brokers在zookeeper里的值为:
get /brokers/ids/1
{"listener_security_protocol_map":{"SASL_PLAINTEXT":"SASL_PLAINTEXT"},"endpoints":["SASL_PLAINTEXT://QL110:9092"],"jmx_port":9999,"host":null,"timestamp":"1666333001145","port":-1,"version":4}
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicAuthorizationException: Topic authorization failed.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at com.xiaojukeji.know.streaming.km.core.service.partition.impl.PartitionServiceImpl.batchGetPartitionOffsetFromKafkaAdminClient(PartitionServiceImpl.java:393)
at com.xiaojukeji.know.streaming.km.core.service.version.impl.VersionControlServiceImpl.doHandler(VersionControlServiceImpl.java:93)
at com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService.doVCHandler(BaseVersionControlService.java:62)
at com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService.doVCHandler(BaseKafkaVersionControlService.java:29)
at com.xiaojukeji.know.streaming.km.core.service.partition.impl.PartitionServiceImpl.getAllPartitionOffsetFromKafka(PartitionServiceImpl.java:195)
at com.xiaojukeji.know.streaming.km.core.service.cluster.impl.ClusterMetricServiceImpl.getMessageSize(ClusterMetricServiceImpl.java:352)
at com.xiaojukeji.know.streaming.km.core.service.version.impl.VersionControlServiceImpl.doHandler(VersionControlServiceImpl.java:93)
at com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService.doVCHandler(BaseVersionControlService.java:62)
at com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService.doVCHandler(BaseKafkaVersionControlService.java:29)
at com.xiaojukeji.know.streaming.km.core.service.cluster.impl.ClusterMetricServiceImpl.collectClusterMetricsFromKafka(ClusterMetricServiceImpl.java:208)
at com.xiaojukeji.know.streaming.km.collector.metric.kafka.ClusterMetricCollector.lambda$collectKafkaMetrics$0(ClusterMetricCollector.java:57)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Topic authorization failed.
kafka 集群配置如下:
broker.id=1
listeners=SASL_PLAINTEXT://QL110:9092
advertised.listeners=SASL_PLAINTEXT://QL110:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
default.replication.factor=3
log.cleanup.policy=delete
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.91.198.2:2181,10.91.198.1:2181,10.91.198.3:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin;User:tiuser
brokers在zookeeper里的值为:
get /brokers/ids/1
{"listener_security_protocol_map":{"SASL_PLAINTEXT":"SASL_PLAINTEXT"},"endpoints":["SASL_PLAINTEXT://QL110:9092"],"jmx_port":9999,"host":null,"timestamp":"1666333001145","port":-1,"version":4}
此时接入集群,正常填写zk、bootstrapServers、jmx、集群配置(SASL相关)配置,前端web response:
{"message":"成功","code":0,"data":{"jmxPort":9999,"kafkaVersion":null,"errList":[{"message":"连接Jmx失败","code":31,"data":null}],"zookeeper":"xxx.xxx.xxx.xx:2181"}}
如果此时忽略进行保存确定,后台会大量报:
2023-02-14 06:41:16.205 ERROR 48967 --- [-1-8-thread-484] c.x.k.s.k.c.s.p.i.PartitionServiceImpl : method=batchGetPartitionOffsetFromKafkaAdminClient||clusterPhyId=1||topicName=OGGTOORACLE||offsetSpec=com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec$KSLatestSpec@494c3a83||errMsg=exception!
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicAuthorizationException: Topic authorization failed.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at com.xiaojukeji.know.streaming.km.core.service.partition.impl.PartitionServiceImpl.batchGetPartitionOffsetFromKafkaAdminClient(PartitionServiceImpl.java:393)
at com.xiaojukeji.know.streaming.km.core.service.version.impl.VersionControlServiceImpl.doHandler(VersionControlServiceImpl.java:93)
at com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService.doVCHandler(BaseVersionControlService.java:62)
at com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService.doVCHandler(BaseKafkaVersionControlService.java:29)
at com.xiaojukeji.know.streaming.km.core.service.partition.impl.PartitionServiceImpl.getAllPartitionOffsetFromKafka(PartitionServiceImpl.java:195)
at com.xiaojukeji.know.streaming.km.core.service.cluster.impl.ClusterMetricServiceImpl.getMessageSize(ClusterMetricServiceImpl.java:352)
at com.xiaojukeji.know.streaming.km.core.service.version.impl.VersionControlServiceImpl.doHandler(VersionControlServiceImpl.java:93)
at com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService.doVCHandler(BaseVersionControlService.java:62)
at com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService.doVCHandler(BaseKafkaVersionControlService.java:29)
at com.xiaojukeji.know.streaming.km.core.service.cluster.impl.ClusterMetricServiceImpl.collectClusterMetricsFromKafka(ClusterMetricServiceImpl.java:208)
at com.xiaojukeji.know.streaming.km.collector.metric.kafka.ClusterMetricCollector.lambda$collectKafkaMetrics$0(ClusterMetricCollector.java:57)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Topic authorization failed.
解决方法:
com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.impl.KafkaZKDAOImpl.class 里的 getBrokerMetadata(String zkAddress) 方法体(55-56行之间)里增加BrokerMetadata.parseAndUpdateBrokerMetadata(metadata);逻辑:如果host为空,就取endpoint里的值,具体详见方法体,然后重新打包部署
The text was updated successfully, but these errors were encountered: