Here you have some useful commands for kafka.
Tested on Kafka 2.5
Goals
-
✓ Add most useful commands for kafka
-
✓ Add kafkacat commands
-
❏ Add commands output
-
✓ Add commands for Kafka on Kubernetes
Table of Contents
- Pre-req
- Zookeeper Operations
- Broker Operations
- Topic Operations
- List topics using kafka-topics.sh
- Describe topic
- Describe topic configs
- Delete topic config
- Copy topic to another topic in the same cluster
- Copy topic to another topic in another cluster
- Move topic to another broker
- Create topic
- Create topic with config
- Increase replication factor of __consumer_offsets
- Alter topic
- List topics under-replicated
- Delete topic
- Get earliest offset
- Get latest offset
- Partition Operations
- Consumer
- List consumer groups
- Describe consumer groups
- Describe all consumer groups
- Delete consumer group
- Active member in a consumer group
- Partition Assigned to each member
- Consumer Group State
- Consuming message
- Consuming message and formatting output
- Consuming message from the beginning
- Consuming message from the end
- Consuming message and show output in JSON
- Consuming and showing message key
- Read one message
- Read the last 2 messages from topic and then exit
- Read the last 2 messages from partition 0
- Read from __consumer_offsets
- Describe __consumer_offsets
- Read from __transaction_state
- Consume using consumer group
- Topics to which group is subscribed
- Reset offset
- Describe consumer group
- Check offset for consumer group
- Producer
- Quotas
- ACLs
- Mirror Maker
- Delegation Token
- Performance Test
First, set some kafka environment vars.
# For Kafka running on top of VMs/Bare Metal
KAFKA_BIN=/opt/kafka/bin
ZOOKEEPER_HOST=zookeeper-host:2181
BROKER_HOST=broker-host:9092
# For Kafka running on top of Kubernetes (Using strimzi)
KAFKA_NAMESPACE=kafka-demo
ZOOKEEPER_HOST=localhost:2181
BROKER_HOST=localhost:9092
ZOOKEEPER_POD=$(kubectl -n $KAFKA_NAMESPACE get pods -l app.kubernetes.io/name=zookeeper -o=jsonpath='{.items[0].metadata.name}')
KAFKA_BROKER_POD=$(kubectl -n $KAFKA_NAMESPACE get pods -l app.kubernetes.io/name=kafka -o=jsonpath='{.items[0].metadata.name}')
You need to whitelist all the commands bellow.
zookeeper.properties
4lw.commands.whitelist=stat,ruok,reqs,envi,dump,conf,cons,srvr,wchs,wchc,dirs,wchp,mntr,isro
-
If using Zookeeper Auth (SASL)
# Zookeeper Auth
export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/jaas.conf"
jass.conf
Client { org.apache.zookeeper.server.auth.DigestLoginModule required username="test" password="test"; };
-
If using SSL/TLS on Zookeeper + SASL
export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/jaas.conf -Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty -Dzookeeper.client.secure=true -Dzookeeper.ssl.trustStore.location=/tmp/kafka.server.truststore -Dzookeeper.ssl.trustStore.password=mypass -Dzookeeper.ssl.trustStore.type=PKCS12"
Note
|
Remember to change your zookeeper port on the ZOOKEEPER_HOST if necessary
|
# For VMs
echo conf | curl telnet://$ZOOKEEPER_HOST
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo conf | curl telnet://localhost:2181"
# For VMs
echo envi | curl telnet://$ZOOKEEPER_HOST
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo envi | curl telnet://localhost:2181"
# For VMs
echo stats | curl telnet://$ZOOKEEPER_HOST
echo ruok | curl telnet://$ZOOKEEPER_HOST
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo stats | curl telnet://localhost:2181"
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo ruok | curl telnet://localhost:2181"
# For VMs
echo reqs | curl telnet://$ZOOKEEPER_HOST
echo cons | curl telnet://$ZOOKEEPER_HOST
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo reqs | curl telnet://localhost:2181"
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo cons | curl telnet://localhost:2181"
# For VMs
echo srvr | curl telnet://$ZOOKEEPER_HOST
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo srvr | curl telnet://localhost:2181"
# For VMs
echo wchs | curl telnet://$ZOOKEEPER_HOST
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo wchs | curl telnet://localhost:2181"
# For VMs
echo wchc | curl telnet://$ZOOKEEPER_HOST
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo wchs | curl telnet://localhost:2181"
# For VMs
echo dirs | curl telnet://$ZOOKEEPER_HOST
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo dirs | curl telnet://localhost:2181"
# For VMs
echo mntr | curl telnet://$ZOOKEEPER_HOST
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo mntr | curl telnet://localhost:2181"
# For VMs
echo isro | curl telnet://$ZOOKEEPER_HOST
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo isro | curl telnet://localhost:2181"
# For VMs
$KAFKA_BIN/zookeeper-shell.sh $ZOOKEEPER_HOST ls /brokers/ids
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids
kafkacat -b $BROKER_HOST -L
# For VMs
$KAFKA_BIN/zookeeper-shell.sh $ZOOKEEPER_HOST get /controller
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/zookeeper-shell.sh localhost:2181 get /controller
# For VMs
$KAFKA_BIN/zookeeper-shell.sh $ZOOKEEPER_HOST ls /brokers/ids/{id}
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids/{id}
kafkacat -b $BROKER_HOST -L
# For VMs
$KAFKA_BIN/zookeeper-shell.sh $ZOOKEEPER_HOST ls /brokers/topics
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/zookeeper-shell.sh localhost:2181 ls /brokers/topics
kafkacat -b $BROKER_HOST -L -t <my-topic>
Change log cleaner threads.
# For VMs
$KAFKA_BIN/kafka-configs.sh \
--bootstrap-server $BROKER_HOST \
--entity-type brokers \
--entity-name <broker id> \
--alter \
--add-config log.cleaner.threads=2
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
--bootstrap-server $BROKER_HOST \
--entity-type brokers \
--entity-name <broker id> \
--alter \
--add-config log.cleaner.threads=2
# For VMs
$KAFKA_BIN/kafka-configs.sh \
--bootstrap-server $BROKER_HOST \
--entity-type brokers \
--entity-name <broker id> \
--describe
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
--bootstrap-server $BROKER_HOST \
--entity-type brokers \
--entity-name <broker id> \
--describe
# For VMs
$KAFKA_BIN/kafka-configs.sh \
--bootstrap-server $BROKER_HOST \
--entity-type brokers \
--entity-name <broker id> \
--alter \
--delete-config log.cleaner.threads
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
--bootstrap-server $BROKER_HOST \
--entity-type brokers \
--entity-name <broker id> \
--alter \
--delete-config log.cleaner.threads
# For VMs
$KAFKA_BIN/kafka-configs.sh \
--bootstrap-server $BROKER_HOST \
--entity-type brokers \
--entity-default \
--alter \
--add-config log.cleaner.threads=2
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
--bootstrap-server $BROKER_HOST \
--entity-type brokers \
--entity-default \
--alter \
--add-config log.cleaner.threads=2
# For VMs
$KAFKA_BIN/kafka-configs.sh \
--bootstrap-server $BROKER_HOST \
--entity-type brokers \
--entity-default \
--describe
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
--bootstrap-server $BROKER_HOST \
--entity-type brokers \
--entity-default \
--describe
# For VMs
$KAFKA_BIN/kafka-configs.sh \
--bootstrap-server $BROKER_HOST \
--entity-type brokers \
--entity-name <broker-id> \
--alter \
--add-config "listener.name.internal.ssl.endpoint.identification.algorithm="
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
--bootstrap-server $BROKER_HOST \
--entity-type brokers \
--entity-name <broker-id> \
--alter \
--add-config "listener.name.internal.ssl.endpoint.identification.algorithm="
# For VMs
$KAFKA_BIN/kafka-topics.sh \
--list \
--zookeeper $ZOOKEEPER_HOST
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
--list \
--zookeeper $ZOOKEEPER_HOST
# For VMs
$KAFKA_BIN/kafka-topics.sh \
--bootstrap-server $BROKER_HOST \
--list
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
--list \
--bootstrap-server $BROKER_HOST
# For VMs
$KAFKA_BIN/kafka-topics.sh \
--zookeeper $ZOOKEEPER_HOST \
--topic <topic_name> \
--describe
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
--zookeeper $ZOOKEEPER_HOST \
--topic <topic_name> \
--describe
kafkacat -b $BROKER_HOST -L -t <topic_name>
# For VMs
$KAFKA_BIN/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--entity-type topics \
--entity-name <topic_name> \
--describe
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--entity-type topics \
--entity-name <topic_name> \
--describe
# For VMs
$KAFKA_BIN/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--entity-type topics \
--entity-name <topic_name> \
--delete-config <config>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--entity-type topics \
--entity-name <topic_name> \
--delete-config <config>
kafkacat -C -b $BROKER_HOST -t <topic_name> -e | kafkacat -P -b $BROKER_HOST -t <topic-name2>
kafkacat -C -b $BROKER_HOST -t <topic_name> -e | kafkacat -P -b $BROKER_HOST2 -t <topic-name>
topics-to-move.json
{"topics": [{"topic": "topic1"},
{"topic": "topic2"}],
"version":1
}
generate plan to move to broker 5 and 6
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
--zookeeper $ZOOKEEPER_HOST \
--topics-to-move-json-file topics-to-move.json \
--broker-list "5,6" \
--generate
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--entity-type topics \
--entity-name <topic_name> \
--delete-config <config>
Note
|
save the results from the command above to cluster-reassignment.json
|
move to broker 5 and 6
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
--zookeeper $ZOOKEEPER_HOST \
--reassignment-json-file cluster-reassignment.json \
--execute
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
--zookeeper $ZOOKEEPER_HOST \
--reassignment-json-file cluster-reassignment.json \
--execute
verify status
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
--zookeeper $ZOOKEEPER_HOST \
--reassignment-json-file cluster-reassignment.json \
--verify
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
--zookeeper $ZOOKEEPER_HOST \
--reassignment-json-file cluster-reassignment.json \
--verify
# For VMs
$KAFKA_BIN/kafka-topics.sh \
--create \
--zookeeper $ZOOKEEPER_HOST \
--replication-factor 1 \
--partitions 1 \
--topic <topic_name>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
--create \
--zookeeper $ZOOKEEPER_HOST \
--replication-factor 1 \
--partitions 1 \
--topic <topic_name>
# For VMs
$KAFKA_BIN/kafka-topics.sh \
--bootstrap-server $BROKER_HOST \
--create \
--topic <topic_name> \
--partitions 1 \
--replication-factor 1 \
--config max.message.bytes=64000 \
--config flush.messages=1
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
--bootstrap-server $BROKER_HOST \
--create \
--topic <topic_name> \
--partitions 1 \
--replication-factor 1 \
--config max.message.bytes=64000 \
--config flush.messages=1
reassignment.json
{"version":1,
"partitions":[
{"topic":"__consumer_offsets", "partition":0, "replicas":[106,101,102,105]},
{"topic":"__consumer_offsets", "partition":1, "replicas":[106,101,102,105]},
{"topic":"__consumer_offsets", "partition":2, "replicas":[106,101,102,105]},
{"topic":"__consumer_offsets", "partition":3, "replicas":[106,101,102,105]},
{"topic":"__consumer_offsets", "partition":4, "replicas":[106,101,102,105]},
{"topic":"__consumer_offsets", "partition":5, "replicas":[106,101,102,105]},
{"topic":"__consumer_offsets", "partition":6, "replicas":[106,101,102,105]},
{"topic":"__consumer_offsets", "partition":7, "replicas":[106,101,102,105]},
{"topic":"__consumer_offsets", "partition":8, "replicas":[106,101,102,105]},
{"topic":"__consumer_offsets", "partition":9, "replicas":[106,101,102,105]},
{"topic":"__consumer_offsets", "partition":10, "replicas":[101,102,103,105]},
{"topic":"__consumer_offsets", "partition":11, "replicas":[101,102,103,105]},
{"topic":"__consumer_offsets", "partition":12, "replicas":[101,102,103,105]},
{"topic":"__consumer_offsets", "partition":13, "replicas":[101,102,103,105]},
{"topic":"__consumer_offsets", "partition":14, "replicas":[101,102,103,105]},
{"topic":"__consumer_offsets", "partition":15, "replicas":[101,102,103,105]},
{"topic":"__consumer_offsets", "partition":16, "replicas":[101,102,103,105]},
{"topic":"__consumer_offsets", "partition":17, "replicas":[101,102,103,105]},
{"topic":"__consumer_offsets", "partition":18, "replicas":[101,102,103,105]},
{"topic":"__consumer_offsets", "partition":19, "replicas":[101,102,103,105]},
{"topic":"__consumer_offsets", "partition":20, "replicas":[102,103,104,105]},
{"topic":"__consumer_offsets", "partition":21, "replicas":[102,103,104,105]},
{"topic":"__consumer_offsets", "partition":22, "replicas":[102,103,104,105]},
{"topic":"__consumer_offsets", "partition":23, "replicas":[102,103,104,105]},
{"topic":"__consumer_offsets", "partition":24, "replicas":[102,103,104,105]},
{"topic":"__consumer_offsets", "partition":25, "replicas":[102,103,104,105]},
{"topic":"__consumer_offsets", "partition":26, "replicas":[102,103,104,105]},
{"topic":"__consumer_offsets", "partition":27, "replicas":[102,103,104,105]},
{"topic":"__consumer_offsets", "partition":28, "replicas":[102,103,104,105]},
{"topic":"__consumer_offsets", "partition":29, "replicas":[102,103,104,105]},
{"topic":"__consumer_offsets", "partition":30, "replicas":[103,104,106,105]},
{"topic":"__consumer_offsets", "partition":31, "replicas":[103,104,106,105]},
{"topic":"__consumer_offsets", "partition":32, "replicas":[103,104,106,105]},
{"topic":"__consumer_offsets", "partition":33, "replicas":[103,104,106,105]},
{"topic":"__consumer_offsets", "partition":34, "replicas":[103,104,106,105]},
{"topic":"__consumer_offsets", "partition":35, "replicas":[103,104,106,105]},
{"topic":"__consumer_offsets", "partition":36, "replicas":[103,104,106,105]},
{"topic":"__consumer_offsets", "partition":37, "replicas":[103,104,106,105]},
{"topic":"__consumer_offsets", "partition":38, "replicas":[103,104,106,105]},
{"topic":"__consumer_offsets", "partition":39, "replicas":[103,104,106,105]},
{"topic":"__consumer_offsets", "partition":40, "replicas":[104,106,101,105]},
{"topic":"__consumer_offsets", "partition":41, "replicas":[104,106,101,105]},
{"topic":"__consumer_offsets", "partition":42, "replicas":[104,106,101,105]},
{"topic":"__consumer_offsets", "partition":43, "replicas":[104,106,101,105]},
{"topic":"__consumer_offsets", "partition":44, "replicas":[104,106,101,105]},
{"topic":"__consumer_offsets", "partition":45, "replicas":[104,106,101,105]},
{"topic":"__consumer_offsets", "partition":46, "replicas":[104,106,101,105]},
{"topic":"__consumer_offsets", "partition":47, "replicas":[104,106,101,105]},
{"topic":"__consumer_offsets", "partition":48, "replicas":[104,106,101,105]},
{"topic":"__consumer_offsets", "partition":49, "replicas":[104,106,101,105]}
]
}
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
--zookeeper $ZOOKEEPER_HOST \
--reassignment-json-file reassignment.json \
--execute
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
--zookeeper $ZOOKEEPER_HOST \
--reassignment-json-file reassignment.json \
--execute
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
--zookeeper $ZOOKEEPER_HOST \
--reassignment-json-file reassignment.json \
--verify
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
--zookeeper $ZOOKEEPER_HOST \
--reassignment-json-file reassignment.json \
--verify
# For VMs
$KAFKA_BIN/kafka-topics.sh \
--zookeeper $ZOOKEEPER_HOST \
--alter \
--topic <topic_name>\
--config retention.ms=1000
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
--zookeeper $ZOOKEEPER_HOST \
--alter \
--topic <topic_name>\
--config retention.ms=1000
# For VMs
$KAFKA_BIN/kafka-topics.sh \
--zookeeper $ZOOKEEPER_HOST \
--alter \
--topic <topic_name> \
--config min.insync.replicas=2
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
--zookeeper $ZOOKEEPER_HOST \
--alter \
--topic <topic_name> \
--config min.insync.replicas=2
# For VMs
$KAFKA_BIN/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--entity-type topics \
--entity-name <topic_name> \
--alter \
--add-config max.message.bytes=128000
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--entity-type topics \
--entity-name <topic_name> \
--alter \
--add-config max.message.bytes=128000
# For VMs
$KAFKA_BIN/kafka-topics.sh \
--zookeeper $ZOOKEEPER_HOST \
--alter \
--topic <topic_name> \
--delete-config retention.ms
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
--zookeeper $ZOOKEEPER_HOST \
--alter \
--topic <topic_name> \
--delete-config retention.ms
# For VMs
$KAFKA_BIN/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--entity-type topics \
--entity-name <topic_name> \
--alter \
--delete-config retention.ms
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--entity-type topics \
--entity-name <topic_name> \
--alter \
--delete-config retention.ms
# For VMs
$KAFKA_BIN/kafka-topics.sh \
--zookeeper $ZOOKEEPER_HOST \
--describe \
--under-replicated-partitions
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
--zookeeper $ZOOKEEPER_HOST \
--describe \
--under-replicated-partitions
# For VMs
$KAFKA_BIN/kafka-topics.sh \
--delete \
--zookeeper $ZOOKEEPER_HOST \
--topic <topic_name>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
--delete \
--zookeeper $ZOOKEEPER_HOST \
--topic <topic_name>
# For VMs
$KAFKA_BIN/kafka-topics.sh \
--bootstrap-server $BROKER_HOST \
--delete \
--topic <topic_name>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
--bootstrap-server $BROKER_HOST \
--delete \
--topic <topic_name>
# For VMs
$KAFKA_BIN/kafka-run-class.sh \
kafka.tools.GetOffsetShell \
--broker-list $BROKER_HOST \
--topic <topic_name> \
--time -2
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-run-class.sh \
kafka.tools.GetOffsetShell \
--broker-list $BROKER_HOST \
--topic <topic_name> \
--time -2
# For VMs
$KAFKA_BIN/kafka-run-class.sh \
kafka.tools.GetOffsetShell \
--broker-list $BROKER_HOST \
--topic <topic_name> \
--time -1
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-run-class.sh \
kafka.tools.GetOffsetShell \
--broker-list $BROKER_HOST \
--topic <topic_name> \
--time -1
# For VMs
$KAFKA_BIN/kafka-topics.sh \
--alter \
--topic <topic_name> \
--partitions 8
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
--alter \
--topic <topic_name> \
--partitions 8
topics.json
{
"topics": [
{
"topic": "test"
}
],
"version": 1
}
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
--zookeeper $ZOOKEEPER_HOST \
--generate \
--broker-list "401,402,601" \
--topics-to-move-json-file topics.json
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
--zookeeper $ZOOKEEPER_HOST \
--generate \
--broker-list "401,402,601" \
--topics-to-move-json-file topics.json
new-replication-factor.json
{"version":1,"partitions":[{"topic":"topic1","partition":0,"replicas":[5,6,7]}]}
execute new replication factor
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
--zookeeper $ZOOKEEPER_HOST \
--reassignment-json-file new-replication-factor.json \
--execute
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
--zookeeper $ZOOKEEPER_HOST \
--reassignment-json-file new-replication-factor.json \
--execute
verify status of partition reassignment
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
--zookeeper $ZOOKEEPER_HOST \
--reassignment-json-file new-replication-factor.json \
--verify
$KAFKA_BIN/kafka-topics.sh \
--bootstrap-server $ZOOKEEPER_HOST \
--topic <topic_name> \
--describe
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
--zookeeper $ZOOKEEPER_HOST \
--reassignment-json-file new-replication-factor.json \
--verify
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
--bootstrap-server $ZOOKEEPER_HOST \
--topic <topic_name> \
--describe
Create plan
topics.json
{
"topics": [
{
"topic": "test"
}
],
"version": 1
}
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
--zookeeper $ZOOKEEPER_HOST \
--generate \
--broker-list "401,402,601" \
--topics-to-move-json-file topics.json
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
--zookeeper $ZOOKEEPER_HOST \
--generate \
--broker-list "401,402,601" \
--topics-to-move-json-file topics.json
Save the result of the above command to a file named replicas.json
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
--zookeeper $ZOOKEEPER_HOST \
--reassignment-json-file replicas.json \
--execute
$KAFKA_BIN/kafka-reassign-partitions.sh \
--zookeeper $ZOOKEEPER_HOST \
--reassignment-json-file replicas.json \
--verify
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
--zookeeper $ZOOKEEPER_HOST \
--reassignment-json-file replicas.json \
--execute
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
--zookeeper $ZOOKEEPER_HOST \
--reassignment-json-file replicas.json \
--verify
# For VMs
$KAFKA_BIN/kafka-topics.sh \
--zookeeper $ZOOKEEPER_HOST \
--describe \
--unavailable-partitions
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
--zookeeper $ZOOKEEPER_HOST \
--describe \
--unavailable-partitions
# For VMs
$KAFKA_BIN/kafka-leader-election.sh \
--election-type preferred \
--bootstrap-server $BROKER_HOST \
--all-topic-partitions
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-leader-election.sh \
--election-type preferred \
--bootstrap-server $BROKER_HOST \
--all-topic-partitions
# For VMs
$KAFKA_BIN/kafka-leader-election.sh \
--election-type preferred \
--bootstrap-server $BROKER_HOST \
--topic <topic name> \
--partition <partition id>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-leader-election.sh \
--election-type preferred \
--bootstrap-server $BROKER_HOST \
--topic <topic name> \
--partition <partition id>
# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
--list \
--bootstrap-server $BROKER_HOST
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
--list \
--bootstrap-server $BROKER_HOST
# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
--describe \
--group <group_id> \
--bootstrap-server $BROKER_HOST
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
--describe \
--group <group_id> \
--bootstrap-server $BROKER_HOST
# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
--describe \
--bootstrap-server $BROKER_HOST \
--all-groups
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
--describe \
--bootstrap-server $BROKER_HOST \
--all-groups
# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST \
--delete \
--group <group-id-1> \
--group <group-id-2>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST \
--delete \
--group <group-id-1> \
--group <group-id-2>
# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST \
--describe \
--group <group-id> \
--members
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST \
--describe \
--group <group-id> \
--members
# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST \
--describe \
--group <group_id> \
--members \
--verbose
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST \
--describe \
--group <group_id> \
--members \
--verbose
# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST \
--describe \
--group <group-id> \
--state
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST \
--describe \
--group <group-id> \
--state
# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
--bootstrap-server $BROKER_HOST \
--topic <topic_name>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
--bootstrap-server $BROKER_HOST \
--topic <topic_name>
kafkacat -C -b $BROKER_HOST -t <topic_name>
kafkacat -C -b $BROKER_HOST -t <topic_name> -q -f 'Topic %t using partition %p at offset %o has key = %k and value = %S'
# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
--bootstrap-server $BROKER_HOST \
--topic <topic_name> \
--from-beginning
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
--bootstrap-server $BROKER_HOST \
--topic <topic_name> \
--from-beginning
# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
--bootstrap-server $BROKER_HOST \
--topic <topic_name>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
--bootstrap-server $BROKER_HOST \
--topic <topic_name>
# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
--bootstrap-server $BROKER_HOST \
--topic <topic_name> \
--property print.key=true \
--property key.separator=,
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
--bootstrap-server $BROKER_HOST \
--topic <topic_name> \
--property print.key=true \
--property key.separator=,
# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
--bootstrap-server $BROKER_HOST \
--topic <topic_name> \
--max-messages 1
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
--bootstrap-server $BROKER_HOST \
--topic <topic_name> \
--max-messages 1
kafkacat -C -b $BROKER_HOST -t <topic_name> -o -2 -e
# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
--bootstrap-server $BROKER_HOST \
--topic __consumer_offsets \
--formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter' \
--max-messages 1
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
--bootstrap-server $BROKER_HOST \
--topic __consumer_offsets \
--formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter' \
--max-messages 1
# For VMs
$KAFKA_BIN/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
--bootstrap-server $BROKER_HOST \
--group <group-id> \
--new-consumer \
--describe
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
--bootstrap-server $BROKER_HOST \
--group <group-id> \
--new-consumer \
--describe
# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
--bootstrap-server $BROKER_HOST \
--formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter" \
--topic __transaction_state \
--from-beginning
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
--bootstrap-server $BROKER_HOST \
--topic __transaction_state \
--from-beginning \
--formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter"
# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
--topic <topic_name> \
--bootstrap-server $BROKER_HOST \
--group <group-id>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
--topic <topic_name> \
--bootstrap-server $BROKER_HOST \
--group <group-id>
# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST \
--group <group_id> \
--describe
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST \
--group <group_id> \
--describe
# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST \
--reset-offsets \
--group <group-id> \
--topic topic1 \
--to-latest
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST \
--reset-offsets \
--group <group-id> \
--topic topic1 \
--to-latest
# For VMs
# There are many other resetting options
# --shift-by <positive_or_negative_integer> / --to-current / --to-latest / --to-offset <offset_integer>
# --to-datetime <datetime_string> --by-duration <duration_string>
$KAFKA_BIN/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST \
--group <group_id> \
--topic <topic_name> \
--reset-offsets \
--to-earliest \
--execute
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST \
--group <group_id> \
--topic <topic_name> \
--reset-offsets \
--to-earliest \
--execute
# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST \
--all-groups \
--reset-offsets \
--topic <topic_name> \
--to-earliest
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST \
--all-groups \
--reset-offsets \
--topic <topic_name> \
--to-earliest
# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST \
--group <groud_id> \
--reset-offsets \
--shift-by 2 \
--execute \
--topic <topic_name>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST \
--group <groud_id> \
--reset-offsets \
--shift-by 2 \
--execute \
--topic <topic_name>
# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST \
--group <groud_id> \
--reset-offsets \
--shift-by -2 \
--execute \
--topic <topic_name>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST \
--group <groud_id> \
--reset-offsets \
--shift-by -2 \
--execute \
--topic <topic_name>
# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST \
--describe \
--group <group_id>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST \
--describe \
--group <group_id>
# For VMs
$KAFKA_BIN/kafka-consumer-offset-checker.sh \
--zookeeper $ZOOKEEPER_HOST \
--group <group_id> \
--topic <topic_name>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-offset-checker.sh \
--zookeeper $ZOOKEEPER_HOST \
--group <group_id> \
--topic <topic_name>
# For VMs
$KAFKA_BIN/kafka-console-producer.sh \
--broker-list $BROKER_HOST \
--topic <topic_name> < messages.txt
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh \
--broker-list $BROKER_HOST \
--topic <topic_name> < messages.txt
kafkacat -P -l -b $BROKER_HOST -t <topic_name> messages.txt
# For VMs
$KAFKA_BIN/kafka-console-producer.sh \
--broker-list $BROKER_HOST \
--topic <topic_name>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh \
--broker-list $BROKER_HOST \
--topic <topic_name>
kafkacat -P -b $BROKER_HOST -t <topic_name>
# For VMs
echo "My Message" | $KAFKA_BIN/kafka-console-producer.sh \
--broker-list $BROKER_HOST \
--topic <topic_name>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh \
--broker-list $BROKER_HOST \
--topic <topic_name>
echo "My Message" | kafkacat -b $BROKER_HOST -t <topic_name>
echo "My Message" | kafkacat -b $BROKER_HOST -t <topic_name>
echo "My Message" | kafkacat -b $BROKER_HOST -H "header1=value1" -H "header2=value2"
# For VMs
$KAFKA_BIN/kafka-console-producer.sh \
--broker-list $BROKER_HOST \
--topic <topic_name> \
--producer-property acks=all
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh \
--broker-list $BROKER_HOST \
--topic <topic_name> \
--producer-property acks=all
# For VMs
$KAFKA_BIN/kafka-console-producer.sh \
--broker-list $BROKER_HOST \
--topic <topic_name> \
--property parse.key=true \
--property key.separator=,
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh \
--broker-list $BROKER_HOST \
--topic <topic_name> \
--property parse.key=true \
--property key.separator=,
Note
|
Your message should be: <mykey>,<message>. For example: Gus,1000. |
# For VMs
$KAFKA_BIN/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--alter \
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
--entity-type users \
--entity-name <user> \
--entity-type clients \
--entity-name <client-id>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--alter \
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
--entity-type users \
--entity-name <user> \
--entity-type clients \
--entity-name <client-id>
# For VMs
$KAFKA_BIN/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--alter \
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
--entity-type users \
--entity-name <user>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--alter \
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
--entity-type users \
--entity-name <user>
# For VMs
$KAFKA_BIN/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--alter \
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
--entity-type clients \
--entity-name <client-id>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--alter \
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
--entity-type clients \
--entity-name <client-id>
# For VMs
$KAFKA_BIN/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--alter \
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
--entity-type users \
--entity-name <user> \
--entity-type clients \
--entity-default
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--alter \
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
--entity-type users \
--entity-name <user> \
--entity-type clients \
--entity-default
# For VMs
$KAFKA_BIN/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--alter \
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
--entity-type users \
--entity-default
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--alter \
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
--entity-type users \
--entity-default
# For VMs
$KAFKA_BIN/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--alter \
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
--entity-type clients \
--entity-default
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--alter \
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
--entity-type clients \
--entity-default
# For VMs
$KAFKA_BIN/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--describe \
--entity-type users \
--entity-name <user> \
--entity-type clients \
--entity-name <cliente-id>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--describe \
--entity-type users \
--entity-name <user> \
--entity-type clients \
--entity-name <cliente-id>
# For VMs
$KAFKA_BIN/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--describe \
--entity-type users \
--entity-name <user>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--describe \
--entity-type users \
--entity-name <user>
# For VMs
$KAFKA_BIN/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--describe \
--entity-type clients \
--entity-name <client-id>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
--zookeeper $ZOOKEEPER_HOST \
--describe \
--entity-type clients \
--entity-name <client-id>
# For VMs
$KAFKA_BIN/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
--add \
--allow-principal User:<user1> \
--allow-principal User:<user2> \
--allow-host <ip-address1> \
--allow-host <ip-address2> \
--operation Read \
--operation Write \
--topic <topic_name>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
--add \
--allow-principal User:<user1> \
--allow-principal User:<user2> \
--allow-host <ip-address1> \
--allow-host <ip-address2> \
--operation Read \
--operation Write \
--topic <topic_name>
# For VMs
$KAFKA_BIN/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
--add \
--allow-principal User:* \
--allow-host * \
--deny-principal User:<user1> \
--deny-host <ip-address> \
--operation Read \
--topic <topic_name>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
--add \
--allow-principal User:* \
--allow-host * \
--deny-principal User:<user1> \
--deny-host <ip-address> \
--operation Read \
--topic <topic_name>
# For VMs
$KAFKA_BIN/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
--add \
--allow-principal User:<user1> \
--allow-host <ip-address> \
--producer --topic *
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
--add \
--allow-principal User:<user1> \
--allow-host <ip-address> \
--producer --topic *
# For VMs
$KAFKA_BIN/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
--add \
--allow-principal User:<user1> \
--allow-host <ip-address> \
--consume --topic *
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
--add \
--allow-principal User:<user1> \
--allow-host <ip-address> \
--consume --topic *
# For VMs
$KAFKA_BIN/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
--remove \
--allow-principal User:<user1> \
--allow-principal User:<user2> \
--allow-host <ip-address1> \
--allow-host <ip-address2> \
--operation Read \
--operation Write \
--topic <topic_name>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
--remove \
--allow-principal User:<user1> \
--allow-principal User:<user2> \
--allow-host <ip-address1> \
--allow-host <ip-address2> \
--operation Read \
--operation Write \
--topic <topic_name>
# For VMs
$KAFKA_BIN/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
--list \
--topic <topic_name>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
--list \
--topic <topic_name>
# For VMs
$KAFKA_BIN/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
--list \
--topic *
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
--list \
--topic *
# For VMs
$KAFKA_BIN/kafka-mirror-maker.sh \
--consumer.config consumer.properties \
--producer.config producer.properties \
--whitelist <topic_name>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-mirror-maker.sh \
--consumer.config consumer.properties \
--producer.config producer.properties \
--whitelist <topic_name>
# For VMs
$KAFKA_BIN/kafka-delegation-tokens.sh \
--bootstrap-server $BROKER_HOST \
--create \
--max-life-time-period -1 \
--command-config client.properties \
--renewer-principal User:<user>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-delegation-tokens.sh \
--bootstrap-server $BROKER_HOST \
--create \
--max-life-time-period -1 \
--command-config client.properties \
--renewer-principal User:<user>
# For VMs
$KAFKA_BIN/kafka-delegation-tokens.sh \
--bootstrap-server $BROKER_HOST \
--renew \
--renew-time-period -1 \
--command-config client.properties \
--hmac ABCDEFGHIJK
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-delegation-tokens.sh \
--bootstrap-server $BROKER_HOST \
--renew \
--renew-time-period -1 \
--command-config client.properties \
--hmac ABCDEFGHIJK
# For VMs
$KAFKA_BIN/kafka-delegation-tokens.sh \
--bootstrap-server $BROKER_HOST \
--expire \
--expiry-time-period -1 \
--command-config client.properties \
--hmac ABCDEFGHIJK
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-delegation-tokens.sh \
--bootstrap-server $BROKER_HOST \
--expire \
--expiry-time-period -1 \
--command-config client.properties \
--hmac ABCDEFGHIJK
# For VMs
$KAFKA_BIN/kafka-delegation-tokens.sh \
--bootstrap-server $BROKER_HOST \
--describe \
--command-config client.properties \
--owner-principal User:<user1>
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-delegation-tokens.sh \
--bootstrap-server $BROKER_HOST \
--describe \
--command-config client.properties \
--owner-principal User:<user1>
# For VMs
$KAFKA_BIN/kafka-producer-perf-test.sh \
--topic teste \
--num-records 50000000 \
--record-size 100 \
--throughput -1 \
--producer-props acks=all bootstrap.servers=$BROKER_HOST buffer.memory=67108864 batch.size=8196
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-producer-perf-test.sh \
--topic teste \
--num-records 50000000 \
--record-size 100 \
--throughput -1 \
--producer-props acks=all bootstrap.servers=$BROKER_HOST buffer.memory=67108864 batch.size=8196
# For VMs
$KAFKA_BIN/kafka-consumer-perf-test.sh \
--group grupo \
--print-metrics \
--show-detailed-stats \
--topic teste \
--messages 600000 \
--broker-list $BROKER_HOST \
--timeout 1000000
# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-perf-test.sh \
--group grupo \
--print-metrics \
--show-detailed-stats \
--topic teste \
--messages 600000 \
--broker-list $BROKER_HOST \
--timeout 1000000