uReplicator User Guide
As introduced in the original uReplicator design, uReplicator aims to provide a Kafka replication solution with better scalability and stability than existing solutions. The first generation of uReplicator was open sourced in 2016 and used to replicate Uber’s Kafka traffic in production. As the scale of the Kafka infrastructure kept growing, we developed the second generation of uReplicator, which provides better scalability and simpler operations. The second generation (Federated uReplicator) was released in 2018 and replicates trillions of messages (petabytes of data) per day across hundreds of replication paths at Uber.
uReplicator is good at:
- High throughput
- uReplicator has a controller that assigns partitions to workers based on throughput in the source cluster, so each worker can achieve maximum throughput. (Currently this depends on Chaperone; in the future it will get workload information from JMX.)
- uReplicator checks lags on each worker and removes heavy traffic from lagging workers.
- High availability
- uReplicator uses a smart rebalance algorithm instead of high-level consumer rebalancing, which guarantees smooth replication.
- High scalability
- When the scale of your Kafka infrastructure increases, simply add more hosts to uReplicator and it will scale up automatically.
- Smart operation (Federated uReplicator)
- Federated uReplicator can set up replication route automatically.
- When a route has higher traffic or lag, Federated uReplicator can add workers automatically and release them afterwards.
- uReplicator can detect new partitions in the source cluster and start replicating them automatically.
The uReplicator Controller runs a customized rebalance algorithm to assign partitions to workers through Apache Helix. It distributes topic partitions to worker processes based on the number of partitions and on traffic/lag information. If a worker process goes down, the controller automatically redistributes its partitions to other workers.
A uReplicator Worker contains multiple Kafka consumers and producers to replicate data from the source to the destination cluster. It provides features like partition mapping to reduce contention in the destination cluster.
A uReplicator Route is a virtual group of controllers and workers. Each route normally contains 3 controllers and N workers (N depends on your traffic). Each route is responsible for replication from one source cluster to one destination cluster. If you want to replicate between multiple clusters, you need to either set up multiple routes or use Federated uReplicator.
uReplicator encounters scalability and operational limits as the numbers of partitions and deployments keep increasing. The new Federated uReplicator aims to address these issues and provide an extensible architecture for further scaling.
In order to limit the number of deployments, a uReplicator Deployment groups uReplicator Routes with the same use case together, e.g., all aggregation routes are under aggregation deployments. Each deployment has one manager (or more) to set up routes automatically.
The uReplicator Manager within each deployment handles the setup of uReplicator Routes and adds topics to the correct routes. A deployment might contain multiple source and destination Kafka clusters. The manager selects one controller from the controller pool and multiple workers from the worker pool to form one uReplicator Route (see below). The manager adds a topic to an existing route or starts a new route based on predefined rules, e.g., the number of partitions and total traffic. The manager only handles topic-level assignment (to uReplicator Routes) and worker allocation; partition-level assignment (to uReplicator Workers) is handled by the uReplicator Controller. Thus, the uReplicator Manager is the federation-level counterpart of the uReplicator Controller.
Federated uReplicator is backward compatible with the original uReplicator. If you want to use federation mode, just set a flag in the controller and worker configurations (see the sketch below); otherwise, deploy the controller and workers without a manager.
Federated uReplicator shines when you have many replication paths. For example, with cluster1->cluster4, cluster1->cluster5, cluster2->cluster4, ..., cluster3->cluster5, you would need six separate deployments, one per path, in non-federated mode; in federated mode you only need one deployment (cluster1|2|3 -> cluster4|5). If you expect to add more clusters in the future, we recommend federated mode to reduce operational cost.
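As a rough sketch of that flag difference (based on the configuration tables and production examples later in this guide; the values are placeholders): in non-federated mode the controller is started with -enableFederated false and the worker with -federated_enabled false, and no manager is needed. In federated mode the controller is started with
-enableFederated true -deploymentName <deployment>
the worker is started with -federated_enabled true, and the worker's Helix config sets
federated.enabled=true
federated.deployment.name=<deployment>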
Note: The quick start is for non-federated mode.
Check out the uReplicator project:
git clone git@github.com:uber/uReplicator.git
cd uReplicator
This project contains everything (controller/worker) you’ll need to run uReplicator.
Before you can run uReplicator, you need to build a package for it. This package is what your deployment tool uses to deploy uReplicator.
mvn clean package
Or, to skip tests (the previous command can take a long time to run):
mvn clean package -DskipTests
To test uReplicator locally, you need two systems: Kafka and ZooKeeper. The “grid” script helps you set up both.
- Modify permission for the scripts generated by Maven:
chmod u+x bin/pkg/*.sh
- The command below will download, install, and start ZooKeeper and Kafka (it starts two Kafka clusters: kafka1, used as the source cluster, and kafka2, used as the destination cluster):
bin/grid bootstrap
- Create a dummyTopic in kafka1 and produce some dummy data:
./bin/produce-data-to-kafka-topic-dummyTopic.sh
- Check if the data is successfully produced to kafka1 by opening another console tab and executing the command below:
./deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181/cluster1 --topic dummyTopic
- You should get this data:
Kafka topic dummy topic data 1
Kafka topic dummy topic data 2
Kafka topic dummy topic data 3
Kafka topic dummy topic data 4
…
Example 1: Copy data from source cluster to destination cluster
- Start uReplicator Controller (you should keep it running):
java -Dlog4j.configuration=file:config/tools-log4j.properties -Xms3g -Xmx3g -Xmn512m -XX:NewSize=512m -XX:MaxNewSize=512m -XX:+AlwaysPreTouch -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+DisableExplicitGC -XX:+PrintCommandLineFlags -XX:CMSInitiatingOccupancyFraction=80 -XX:SurvivorRatio=2 -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -Xloggc:/tmp/ureplicator-controller/gc-ureplicator-controller.log -server -cp uReplicator-Controller/target/uReplicator-Controller-2.0.2-SNAPSHOT-jar-with-dependencies.jar com.uber.stream.kafka.mirrormaker.controller.ControllerStarter -enableFederated false -mode auto -backUpToGit false -enableAutoTopicExpansion true -port 9000 -refreshTimeInSeconds 10 -srcKafkaZkPath localhost:2181/cluster1 -zookeeper localhost:2181 -destKafkaZkPath localhost:2181/cluster2 -helixClusterName testMirrorMaker
- Start uReplicator Worker (you should keep it running, and it’s normal if you see kafka.consumer.ConsumerTimeoutException at this moment, since no topic has been added for copying):
java -Dlog4j.configuration=file:config/test-log4j.properties -XX:MaxGCPauseMillis=100 -XX:InitiatingHeapOccupancyPercent=45 -verbose:gc -Xmx1g -Xms1g -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -Xloggc:gc-ureplicator-worker.log -server -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9089 -Dcom.sun.management.jmxremote.rmi.port=9089 -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -cp uReplicator-Worker/target/uReplicator-Worker-2.0.2-SNAPSHOT-jar-with-dependencies.jar com.uber.stream.ureplicator.worker.WorkerStarter -federated_enabled false -consumer_config config/consumer.properties -producer_config config/producer.properties -helix_config config/helix.properties -topic_mappings config/topicmapping.properties
- Add topic to uReplicator Controller to start copying from kafka1 to kafka2:
curl -X POST -d '{"topic":"dummyTopic", "numPartitions":"1"}' http://localhost:9000/topics
- To check if the data is successfully copied to kafka2, open another console tab and execute the command below (see the note after this example for why the destination topic is dummyTopic1):
./deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181/cluster2 --topic dummyTopic1
- And you will see the same messages produced in kafka1:
Kafka topic dummy topic data 1
Kafka topic dummy topic data 2
Kafka topic dummy topic data 3
Kafka topic dummy topic data 4
…
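Note: the destination topic is dummyTopic1 rather than dummyTopic because the worker in this example was started with -topic_mappings config/topicmapping.properties, which renames topics on the destination side. Assuming the mapping file uses one source/destination pair per line (an assumption based on this example, not a documented format), it would contain something like:
dummyTopic dummyTopic1
If you want to inspect how the controller assigned the topic, you can also query its REST endpoint (the same endpoint used in Example 4 below), assuming the controller is still listening on port 9000:
curl localhost:9000/topics/dummyTopic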
Example 2: Copy data from source Kafka to destination Kafka cluster without explicitly whitelisting topics
- Start uReplicator Controller (you should keep it running):
java -Dlog4j.configuration=file:config/tools-log4j.properties -Xms3g -Xmx3g -Xmn512m -XX:NewSize=512m -XX:MaxNewSize=512m -XX:+AlwaysPreTouch -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+DisableExplicitGC -XX:+PrintCommandLineFlags -XX:CMSInitiatingOccupancyFraction=80 -XX:SurvivorRatio=2 -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -Xloggc:/tmp/ureplicator-controller/gc-ureplicator-controller.log -server -cp uReplicator-Controller/target/uReplicator-Controller-2.0.2-SNAPSHOT-jar-with-dependencies.jar com.uber.stream.kafka.mirrormaker.controller.ControllerStarter -enableFederated false -mode auto -backUpToGit false -enableAutoTopicExpansion true -port 9000 -refreshTimeInSeconds 10 -srcKafkaZkPath localhost:2181/cluster1 -zookeeper localhost:2181 -destKafkaZkPath localhost:2181/cluster2 -helixClusterName testMirrorMaker -enableAutoWhitelist true
- Start uReplicator Worker (you should keep it running, and it’s normal if you see kafka.consumer.ConsumerTimeoutException at this moment since no topic has been added for copying):
java -Dlog4j.configuration=file:config/test-log4j.properties -XX:MaxGCPauseMillis=100 -XX:InitiatingHeapOccupancyPercent=45 -verbose:gc -Xmx1g -Xms1g -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -Xloggc:gc-ureplicator-worker.log -server -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9089 -Dcom.sun.management.jmxremote.rmi.port=9089 -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -cp uReplicator-Worker/target/uReplicator-Worker-2.0.2-SNAPSHOT-jar-with-dependencies.jar com.uber.stream.ureplicator.worker.WorkerStarter -federated_enabled false -consumer_config config/consumer.properties -producer_config config/producer.properties -helix_config config/helix.properties
- Create topic in kafka2. Example 2 enables topic auto-whitelisting, so you don't need to whitelist topics manually. If a topic is in both source and destination Kafka clusters, the controller auto-whitelists the topic and starts copying data.
./deploy/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181/cluster2 --topic dummyTopic --partitions 1 --replication-factor 1
- To check if the data is successfully copied to kafka2, open another console tab and execute this command (you might need to wait about 20 seconds for controller to refresh):
./deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181/cluster2 --topic dummyTopic
- And you should see the same messages produced in kafka1:
Kafka topic dummy topic data 1
Kafka topic dummy topic data 2
Kafka topic dummy topic data 3
Kafka topic dummy topic data 4
…
When you’re done, you can clean everything up:
./bin/pkg/stop-all.sh
Congratulations! You’ve now set up a local grid that includes Kafka and ZooKeeper, and you've run a uReplicator worker on it.
Example 3: Run uReplicator on Docker
- Build uReplicator devenv
docker build -t devenv devenv/.
- Start the uReplicator devenv container
docker run -d -p 2181:2181 -p 9093:9093 -p 9094:9094 --add-host devenv:127.0.0.1 --name devenv devenv
- Build the uReplicator image
docker build -t ureplicator .
- Start uReplicator Controller
docker run -d --link devenv:devenv -p 9000:9000 --name controller --expose=9000 ureplicator "controller" -mode customized \
-enableAutoWhitelist true \
-port 9000 \
-refreshTimeInSeconds 10 \
-srcKafkaZkPath devenv:2181/cluster1 \
-zookeeper devenv:2181 \
-destKafkaZkPath devenv:2181/cluster2 \
-helixClusterName testMirrorMaker
- Start uReplicator Worker
docker run -d --link devenv:devenv --name worker ureplicator "worker" \
--consumer_config example/example-consumer.properties \
--producer_config example/example-producer.properties \
--helix_config example/example-helix.properties
- Create topic in kafka2. Example 3 enables topic auto-whitelisting, so you don't need to whitelist topics manually. If a topic is in both source and destination Kafka clusters, the controller auto-whitelists the topic and starts copying data.
docker exec -it devenv bash
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181/cluster2 --topic dummyTopic --partitions 4 --replication-factor 1
- Produce test data
docker exec -it devenv bash
./produce-data-to-kafka-topic-dummyTopic.sh
- To check if the data is successfully copied to kafka2, open another console tab and execute this command
docker exec -it devenv bash
$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper localhost:2181/cluster2 --topic dummyTopic
Example 4: Run uReplicator with 2 workers
- Build Docker Compose
docker-compose -f docker-compose-example4.yml build
- Start Docker Compose
docker-compose -f docker-compose-example4.yml up
- Produce test data
docker exec -it ureplicator_devenv_1 bash;
./produce-data-to-kafka-topic-dummyTopic.sh
- Create topic in kafka2 with 4 partitions
docker exec -it ureplicator_devenv_1 bash;
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181/cluster2 --topic dummyTopic1 --partitions 4 --replication-factor 1
- Add topic to uReplicator Controller to start copying from kafka1 to kafka2:
curl -X POST -d '{"topic":"dummyTopic", "numPartitions":"4"}' http://localhost:9000/topics
- To check if the data is successfully copied to kafka2, open another console tab and execute this command
docker exec -it ureplicator_devenv_1 bash;
$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper localhost:2181/cluster2 --topic dummyTopic1
- Check topic partition allocation
curl localhost:9000/topics/dummyTopic | jq .
- Check online instances
curl localhost:9000/instances | jq .
Controller configuration (non-federated mode):

Name | Default / Example | Description
--- | --- | ---
helixClusterName | uReplicator | Helix cluster name; Helix will create a node under the path provided in '-zookeeper'
zookeeper | zk1,zk2/cluster1-cluster2 | ZooKeeper path for each deployment; we recommend putting src-dst in the ZooKeeper path to isolate each deployment
port | 8080 | Port for the controller REST endpoint
mode | customized | Rebalance mode, always customized
env | dc1.cluster1-cluster2 | env has two parts separated by '.': the first part is the data center and the second part is the deployment name (e.g., cluster1-cluster2); it is used in metrics
instanceId | | Controller instance name or ID
graphiteHost | localhost | Graphite host
graphitePort | 4756 | Graphite port
srcKafkaZkPath | | ZooKeeper path for the source Kafka cluster
consumerCommitZkPath | | Where to commit the consumed offsets; if blank, srcKafkaZkPath is used by default
destKafkaZkPath | | ZooKeeper path for the destination Kafka cluster
numOffsetThread | 10 | Number of threads to check committed offsets and high watermarks
blockingQueueSize | 30000 | Max pending tasks to check topic offset information
refreshIntervalInSec | 300 | Interval to run offset checking
groupId | ureplicator-cluster1-cluster2 | Consumer group name for uReplicator; we recommend putting the deployment name in groupId to avoid conflicts
enableAutoWhitelist | FALSE | Whether to auto-whitelist a topic when it exists in both source and destination clusters
enableAutoTopicExpansion | FALSE | Whether to auto-whitelist extra partitions when a topic is expanded in the source cluster (note: it will expand partitions in destination clusters)
maxWorkingInstances | 0 | Max number of workers to use in the deployment; if 0, all workers can be used (0 is recommended)
autoRebalanceDelayInSeconds | 120 | How long to wait before rebalancing after detecting a live-instance change; if a host restarts within this period, the restart won't trigger a rebalance
autoRebalancePeriodInSeconds | 0 | How often to run a rebalance check
autoRebalanceMinIntervalInSeconds | 600 | Min interval between rebalances
autoRebalanceMinLagTimeInSeconds | 900 | Min time to consider a partition lagging
autoRebalanceMinLagOffset | 100000 | Min offset lag to consider a partition lagging
autoRebalanceMaxOffsetInfoValidInSeconds | 1800 | Wait time before calculating lag after a restart
autoRebalanceWorkloadRatioThreshold | 1.2 | Workload ratio above which a worker is considered overloaded
workloadRefreshPeriodInSeconds | 600 | Interval to refresh workload info
maxDedicatedLaggingInstancesRatio | 0.5 | Max ratio of workers dedicated to lagging partitions
maxStuckPartitionMovements | 3 | Max number of moves for a stuck partition before it stops being moved
moveStuckPartitionAfterMinutes | 20 | Min wait time before moving a stuck partition
patternToExcludeTopics | __consumer_offsets | Pattern of topics to exclude from whitelisting
backUpToGit | FALSE | Back up controller metadata to git (true) or a local file (false)
localBackupFilePath | /var/log/kafka-mirror-maker-controller | Local backup file location
remoteBackupRepo | | Remote backup repo to store cluster state
localGitRepoClonePath | | Clone location of the remote git backup repo
enableSrcKafkaValidation | FALSE | Enable source Kafka validation, e.g., whether a topic exists in the source cluster
maxWorkloadPerWorkerByteWithinRegion | 8388608 | Max workload (bytes) per worker within a region
maxWorkloadPerWorkerByteCrossRegion | 8388608 | Max workload (bytes) per worker across regions
Worker Helix configuration (helix.properties, non-federated mode):

Name | Default / Example | Description
--- | --- | ---
zkServer | uReplicator | Helix cluster name, which should be the same as helixClusterName in the controller
instanceId | | Worker's hostname or ID
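A minimal sketch of config/helix.properties for non-federated mode (values are placeholders; it assumes zkServer takes the Helix ZooKeeper address, i.e., the controller's -zookeeper value, and that the Helix cluster name is also configured here and must match the controller's -helixClusterName — verify against the sample file shipped in the repo):
zkServer=localhost:2181
helixClusterName=testMirrorMaker
instanceId=ureplicatorWorker0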
Worker consumer configuration (consumer.properties, non-federated mode):

Name | Default / Example | Description
--- | --- | ---
commit.zookeeper.connect | | Same as consumerCommitZkPath in the controller
zookeeper.connection.timeout.ms | 30000 | ZooKeeper connection timeout
zookeeper.session.timeout.ms | 30000 | ZooKeeper session timeout
group.id | | Consumer group id, same as groupId in the controller
client.id | | Consumer client id
consumer.id | | Consumer id
num.consumer.fetchers | 3 | Number of fetcher threads per source broker
socket.receive.buffer.bytes | 1966080 | The SO_RCVBUF buffer of the socket server sockets. If the value is -1, the OS default will be used.
fetch.message.max.bytes | 31457280 | The maximum amount of data per partition the server will return
fetch.min.bytes | 131072 | The minimum amount of data the server should return for a fetch request
queued.max.message.chunks | 5 | Max message chunks buffered in memory when the producer is busy
auto.offset.reset | largest | Where to start consuming when the last committed offset is out of range
fetch.wait.max.ms | 500 | Max wait time for each fetch request issued
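For reference, a minimal config/consumer.properties sketch using the keys from the table above (values are placeholders, not recommendations; zookeeper.connect, the source-cluster ZooKeeper path, is an assumption since it is not listed in the table):
zookeeper.connect=localhost:2181/cluster1
commit.zookeeper.connect=localhost:2181/cluster1
zookeeper.connection.timeout.ms=30000
zookeeper.session.timeout.ms=30000
group.id=ureplicator-cluster1-cluster2
client.id=ureplicator-worker
consumer.id=ureplicator-worker-0
num.consumer.fetchers=3
fetch.message.max.bytes=31457280
auto.offset.reset=largest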
Worker producer configuration (producer.properties, non-federated mode):

Name | Default / Example | Description
--- | --- | ---
client.id | | Producer client id
producer.type | async | Async or sync produce (sync waits for an ack before sending the next message)
acks | | The number of acknowledgments the producer requires the leader to have received before considering a request complete
compression.type | | Producer compression type
key.serializer | org.apache.kafka.common.serialization.ByteArraySerializer | Serializer class for keys
value.serializer | org.apache.kafka.common.serialization.ByteArraySerializer | Serializer class for values
batch.size | 131072 | Producer batch size
linger.ms | 1000 | How long the producer waits to accumulate a batch
buffer.memory | 167772160 | The total bytes of memory the producer can use to buffer records waiting to be sent to the server
max.request.size | 31457280 | The maximum size of a request in bytes
send.buffer.bytes | 1310720 | The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.
max.in.flight.requests.per.connection | 5 | The maximum number of unacknowledged requests the client will send on a single connection before blocking
request.timeout.ms | 30000 | Producer request timeout
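Likewise, a minimal config/producer.properties sketch using the keys from the table above (values are placeholders; bootstrap.servers, the destination broker list, is an assumption since it is not listed in the table):
bootstrap.servers=localhost:9094
client.id=ureplicator-producer
producer.type=async
acks=1
compression.type=none
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=131072
linger.ms=1000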
Partition mapping configuration (non-federated mode):

Name | Default / Example | Description
--- | --- | ---
enable | FALSE | Whether to enable 1:1 partition mapping
Manager configuration (federated mode):

Name | Default / Example | Description
--- | --- | ---
config | | Clusters info file. It should contain every source/destination cluster's ZooKeeper/broker info in the format: kafka.cluster.zkStr.cluster1=zk1,zk2/cluster1 and kafka.cluster.servers.cluster1=broker1:9092,broker2:9092
srcClusters | | List of clusters that can be sources
destClusters | | List of clusters that can be destinations
enableRebalance | | Whether the manager finds a new worker for a route when a worker in that route has been down for some time
zookeeper | | Helix ZooKeeper path; should be the same as zookeeper in the controller
managerPort | | Port for the manager REST endpoint
deployment | | Deployment name, used in metrics
env | | env has two parts separated by '.': the first part is the data center and the second part is the deployment name (e.g., aggregation-dc1); it is used in metrics
instanceId | | Manager instance name or ID
controllerPort | | Port for the controller REST endpoint
graphiteHost | | Graphite host
graphitePort | | Graphite port
metricsPrefix | | Graphite metrics prefix
workloadRefreshPeriodInSeconds | 300 | Interval to refresh workload
initMaxNumPartitionsPerRoute | 1500 | Max number of partitions in a route when adding new topics
maxNumPartitionsPerRoute | 2000 | If topics in a route are expanded and the total number of partitions exceeds this number, the manager moves the topics with the largest partition counts to a smaller route
initMaxNumWorkersPerRoute | 10 | Number of workers when creating a route
maxNumWorkersPerRoute | 50 | Max number of workers in a route
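For illustration, a clusters config file in the format described above might look like this (cluster names, ZooKeeper paths, and broker addresses are placeholders):
kafka.cluster.zkStr.cluster1=zk1:2181/cluster1
kafka.cluster.servers.cluster1=broker1:9092,broker2:9092
kafka.cluster.zkStr.cluster3=zk3:2181/cluster3
kafka.cluster.servers.cluster3=broker3:9092,broker4:9092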
Controller configuration (federated mode):

Name | Default / Example | Description
--- | --- | ---
config | | Clusters info file. It should contain every source/destination cluster's ZooKeeper/broker info in the format: kafka.cluster.zkStr.cluster1=zk1,zk2/cluster1 and kafka.cluster.servers.cluster1=broker1:9092,broker2:9092; it should be the same as config in the manager config
enableFederated | TRUE | Whether to enable federation mode
srcClusters | | List of clusters that can be sources; should be the same as in the manager
destClusters | | List of clusters that can be destinations; should be the same as in the manager
deploymentName | | Deployment name; should be the same as deployment in the manager
zookeeper | zk1,zk2/cluster1-cluster2 | ZooKeeper path for each deployment; we recommend putting src-dst in the ZooKeeper path to isolate each deployment
port | 8080 | Port for the controller REST endpoint
mode | customized | Rebalance mode, always customized
env | dc1.cluster1-cluster2 | env has two parts separated by '.': the first part is the data center and the second part is the deployment name (e.g., cluster1-cluster2); it is used in metrics
instanceId | | Controller instance name or ID
graphiteHost | localhost | Graphite host
graphitePort | 4756 | Graphite port
numOffsetThread | 10 | Number of threads to check committed offsets and high watermarks
blockingQueueSize | 30000 | Max pending tasks to check topic offset information
refreshIntervalInSec | 300 | Interval to run offset checking
groupId | ureplicator-cluster1-cluster2 | Consumer group name for uReplicator; we recommend putting the deployment name in groupId to avoid conflicts
enableAutoWhitelist | FALSE | Whether to auto-whitelist a topic when it exists in both source and destination clusters
enableAutoTopicExpansion | FALSE | Whether to auto-whitelist extra partitions when a topic is expanded in the source cluster (note: it will expand partitions in destination clusters)
maxWorkingInstances | 0 | Max number of workers to use in the deployment; if 0, all workers can be used (0 is recommended)
autoRebalanceDelayInSeconds | 120 | How long to wait before rebalancing after detecting a live-instance change; if a host restarts within this period, the restart won't trigger a rebalance
autoRebalancePeriodInSeconds | 0 | How often to run a rebalance check
autoRebalanceMinIntervalInSeconds | 600 | Min interval between rebalances
autoRebalanceMinLagTimeInSeconds | 900 | Min time to consider a partition lagging
autoRebalanceMinLagOffset | 100000 | Min offset lag to consider a partition lagging
autoRebalanceMaxOffsetInfoValidInSeconds | 1800 | Wait time before calculating lag after a restart
autoRebalanceWorkloadRatioThreshold | 1.2 | Workload ratio above which a worker is considered overloaded
workloadRefreshPeriodInSeconds | 600 | Interval to refresh workload info
maxDedicatedLaggingInstancesRatio | 0.5 | Max ratio of workers dedicated to lagging partitions
maxStuckPartitionMovements | 3 | Max number of moves for a stuck partition before it stops being moved
moveStuckPartitionAfterMinutes | 20 | Min wait time before moving a stuck partition
patternToExcludeTopics | __consumer_offsets | Pattern of topics to exclude from whitelisting
backUpToGit | FALSE | Back up controller metadata to git (true) or a local file (false)
localBackupFilePath | /var/log/kafka-mirror-maker-controller | Local backup file location
remoteBackupRepo | | Remote backup repo to store cluster state
localGitRepoClonePath | | Clone location of the remote git backup repo
enableSrcKafkaValidation | FALSE | Enable source Kafka validation, e.g., whether a topic exists in the source cluster
maxWorkloadPerWorkerByteWithinRegion | 8388608 | Max workload (bytes) per worker within a region
maxWorkloadPerWorkerByteCrossRegion | 8388608 | Max workload (bytes) per worker across regions
Worker Helix configuration (federated mode):

Name | Default / Example | Description
--- | --- | ---
federated.enabled | | Whether to enable federation mode
federated.deployment.name | | Deployment name
zkServer | uReplicator | Helix cluster name, which should be the same as helixClusterName in the controller
instanceId | | Worker's hostname or ID
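A minimal sketch of the worker's Helix config in federated mode, using only the keys from the table above (values are placeholders; the zkServer value assumes it is the Helix ZooKeeper path used by the manager and controllers, e.g., the manager's -zookeeper value):
federated.enabled=true
federated.deployment.name=testing-dc1
zkServer=zk1:2181/ureplicator/testing-dc1
instanceId=ureplicator-worker-0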
Worker consumer configuration (federated mode):

Name | Default / Example | Description
--- | --- | ---
zookeeper.connection.timeout.ms | 30000 | ZooKeeper connection timeout
zookeeper.session.timeout.ms | 30000 | ZooKeeper session timeout
num.consumer.fetchers | 3 | Number of fetcher threads per source broker
socket.receive.buffer.bytes | 1966080 | The SO_RCVBUF buffer of the socket server sockets. If the value is -1, the OS default will be used.
fetch.message.max.bytes | 31457280 | The maximum amount of data per partition the server will return
fetch.min.bytes | 131072 | The minimum amount of data the server should return for a fetch request
queued.max.message.chunks | 5 | Max message chunks buffered in memory when the producer is busy
auto.offset.reset | largest | Where to start consuming when the last committed offset is out of range
fetch.wait.max.ms | 500 | Max wait time for each fetch request issued
Worker producer configuration (federated mode):

Name | Default / Example | Description
--- | --- | ---
producer.type | async | Async or sync produce (sync waits for an ack before sending the next message)
acks | | The number of acknowledgments the producer requires the leader to have received before considering a request complete
compression.type | | Producer compression type
key.serializer | org.apache.kafka.common.serialization.ByteArraySerializer | Serializer class for keys
value.serializer | org.apache.kafka.common.serialization.ByteArraySerializer | Serializer class for values
batch.size | 131072 | Producer batch size
linger.ms | 1000 | How long the producer waits to accumulate a batch
buffer.memory | 167772160 | The total bytes of memory the producer can use to buffer records waiting to be sent to the server
max.request.size | 31457280 | The maximum size of a request in bytes
send.buffer.bytes | 1310720 | The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.
max.in.flight.requests.per.connection | 5 | The maximum number of unacknowledged requests the client will send on a single connection before blocking
request.timeout.ms | 30000 | Producer request timeout
Partition mapping configuration (federated mode):

Name | Default / Example | Description
--- | --- | ---
enable | FALSE | Whether to enable 1:1 partition mapping
Whitelist a topic on the controller (non-federated mode)
curl -X POST "<controller_host>:<controller_port>/topics/<topic>"
Blacklist a topic on the controller (non-federated mode)
curl -X DELETE "<controller_host>:<controller_port>/topics/<topic>"
Whitelist a topic on the manager (federated mode)
curl -X POST "<manager_host>:<manager_port>/topics/<topic>?src=<src>&dst=<dst>"
Blacklist a topic on the manager (federated mode)
curl -X DELETE "<manager_host>:<manager_port>/topics/<topic>?src=<src>&dst=<dst>"
This is useful when a partition in the destination is unavailable (e.g., an offline partition) and you still want the rest of the partitions to be replicated. This operation is the same in both modes.
Get blacklisted topic partitions
curl -X GET "<controller_host>:<controller_port>/blacklist?topic=<topic>"
Blacklist a topic partition
curl -X POST "<controller_host>:<controller_port>/blacklist?topic=<topic>&partition=<partition>&opt=blacklist"
Remove a topic partition from the blacklist
curl -X POST "<controller_host>:<controller_port>/blacklist?topic=<topic>&partition=<partition>&opt=whitelist"
This is useful when you temporarily need extra workers in one route to handle spiking traffic.
Get current worker number override
curl -X GET "<manager_host>:<manager_port>/admin/worker_number_override"
Override number of workers
curl -X POST "<manager_host>:<manager_port>/admin/worker_number_override?route=@<src_cluster>@<dst_cluster>@<route_id>&workerNumber=<new_number>"
Delete number of worker override
curl -X DELETE "<manager_host>:<manager_port>/admin/worker_number_override?route=@<src_cluster>@<dst_cluster>@<route_id>"
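For example, assuming a deployment replicating cluster1 to cluster3, a route id of 0, and a manager listening on localhost:8080 (all hypothetical values), the override call would look like:
curl -X POST "localhost:8080/admin/worker_number_override?route=@cluster1@cluster3@0&workerNumber=12"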
If you are using static controller/worker IDs, i.e., the hostname/ID doesn't change after a restart or deploy, we recommend turning rebalance off by default so the manager will not rebalance routes during restarts. Otherwise, you need to turn rebalance on.
Enable route rebalance
curl -X POST "<manager_host>:<manager_port>/admin/enable_autobalancing"
Disable route rebalance
curl -X POST "<manager_host>:<manager_port>/admin/disable_autobalancing"
Check route rebalance status
curl -X GET "<manager_host>:<manager_port>/admin/autobalancing_status"
We recommend turning off auto-scaling during restarts/deployments.
Enable route auto-scaling
curl -X POST "<manager_host>:<manager_port>/admin/enable_autoscaling"
Disable route auto-scaling
curl -X POST "<manager_host>:<manager_port>/admin/disable_autoscaling"
Check route auto-scaling status
curl -X GET "<manager_host>:<manager_port>/admin/autoscaling_status"
Controller:
java -Dlog4j.configuration=file:config/tools-log4j.properties -Xms3g -Xmx3g -Xmn512m -XX:NewSize=512m -XX:MaxNewSize=512m -XX:+AlwaysPreTouch -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+DisableExplicitGC -XX:+PrintCommandLineFlags -XX:CMSInitiatingOccupancyFraction=80 -XX:SurvivorRatio=2 -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -Xloggc:/tmp/ureplicator-controller/gc-ureplicator-controller.log -server -cp uReplicator-Controller/target/uReplicator-Controller-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.uber.stream.kafka.mirrormaker.controller.ControllerStarter -enableFederated false -deploymentName <cluster1-cluster2> -helixClusterName uReplicator -mode customized -zookeeper zk1,zk2,zk3/ureplicator/<cluster1-cluster2> -port <port> -env <dc1>.<cluster1-cluster2> -instanceId 0 -hostname <hostname> -graphiteHost 127.0.0.1 -graphitePort 4756 -metricsPrefix ureplicator-controller -enableAutoWhitelist false -enableAutoTopicExpansion true -autoRebalanceDelayInSeconds 120 -autoRebalancePeriodInSeconds 120 -autoRebalanceMinIntervalInSeconds 600 -autoRebalanceMinLagTimeInSeconds 900 -autoRebalanceMinLagOffset 100000 -autoRebalanceMaxOffsetInfoValidInSeconds 1800 -autoRebalanceWorkloadRatioThreshold 1.5 -maxDedicatedLaggingInstancesRatio 0.2 -maxStuckPartitionMovements 3 -moveStuckPartitionAfterMinutes 20 -workloadRefreshPeriodInSeconds 300 -patternToExcludeTopics ^__.* -enableSrcKafkaValidation true -srcKafkaZkPath "src_zk/cluster1" -destKafkaZkPath "dst_zk/cluster2" -consumerCommitZkPath "" -maxWorkingInstances 0 -autoRebalanceDelayInSeconds 120 -refreshTimeInSeconds 600 -initWaitTimeInSeconds 120 -numOffsetThread 10 -blockingQueueSize 30000 -offsetRefreshIntervalInSec 300 -groupId <ureplicator-cluster1-cluster2> -backUpToGit false -localBackupFilePath /tmp/ureplicator-controller -localGitRepoClonePath /ureplicator-controller-bkp
Worker:
java -Dlog4j.configuration=file:config/test-log4j.properties -XX:MaxGCPauseMillis=100 -XX:InitiatingHeapOccupancyPercent=45 -verbose:gc -Xmx1g -Xms1g -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -Xloggc:gc-ureplicator-worker.log -server -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9089 -Dcom.sun.management.jmxremote.rmi.port=9089 -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -cp uReplicator-Worker/target/uReplicator-Worker-2.0.2-SNAPSHOT-jar-with-dependencies.jar com.uber.stream.ureplicator.worker.WorkerStarter -federated_enabled true -consumer_config config/consumer.properties -producer_config config/producer.properties -helix_config config/helix.properties -topic_partition_count_observer true -cluster_config config/clusters.properties
Manager:
java -Dlog4j.configuration=file:config/tools-log4j.properties -Xms3g -Xmx3g -Xmn512m -XX:NewSize=512m -XX:MaxNewSize=512m -XX:+AlwaysPreTouch -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+DisableExplicitGC -XX:+PrintCommandLineFlags -XX:CMSInitiatingOccupancyFraction=80 -XX:SurvivorRatio=2 -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -Xloggc:/tmp/ureplicator-manager/gc-ureplicator-manager.log -server -cp uReplicator-Manager/target/uReplicator-Manager-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.uber.stream.kafka.mirrormaker.manager.ManagerStarter -config config/clusters.properties -srcClusters cluster1,cluster2 -destClusters cluster3 -enableRebalance false -zookeeper zk1,zk2,zk3/ureplicator/testing-dc1 -managerPort <port> -deployment testing-dc1 -env dc1.testing-dc1 -instanceId <id> -controllerPort <port> -graphiteHost 127.0.0.1 -graphitePort 4756 -metricsPrefix ureplicator-manager -workloadRefreshPeriodInSeconds 300 -initMaxNumPartitionsPerRoute 1500 -maxNumPartitionsPerRoute 2000 -initMaxNumWorkersPerRoute 10 -maxNumWorkersPerRoute 80
Controller:
java -Dlog4j.configuration=file:config/tools-log4j.properties -Xms3g -Xmx3g -Xmn512m -XX:NewSize=512m -XX:MaxNewSize=512m -XX:+AlwaysPreTouch -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+DisableExplicitGC -XX:+PrintCommandLineFlags -XX:CMSInitiatingOccupancyFraction=80 -XX:SurvivorRatio=2 -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -Xloggc:/tmp/ureplicator-controller/gc-ureplicator-controller.log -server -cp uReplicator-Controller/target/uReplicator-Controller-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.uber.stream.kafka.mirrormaker.controller.ControllerStarter -config config/clusters.properties -srcClusters cluster1,cluster2 -destClusters cluster3 -enableFederated true -deploymentName testing-dc1 -mode customized -zookeeper zk1,zk2,zk3/ureplicator/testing-dc1 -port <port> -env dc1.testing-dc1 -instanceId <id> -hostname <hostname> -graphiteHost 127.0.0.1 -graphitePort 4756 -metricsPrefix ureplicator-controller -enableAutoWhitelist false -enableAutoTopicExpansion true -autoRebalanceDelayInSeconds 120 -autoRebalancePeriodInSeconds 120 -autoRebalanceMinIntervalInSeconds 600 -autoRebalanceMinLagTimeInSeconds 900 -autoRebalanceMinLagOffset 100000 -autoRebalanceMaxOffsetInfoValidInSeconds 1800 -autoRebalanceWorkloadRatioThreshold 1.5 -maxDedicatedLaggingInstancesRatio 0.2 -maxStuckPartitionMovements 3 -moveStuckPartitionAfterMinutes 20 -workloadRefreshPeriodInSeconds 300 -patternToExcludeTopics ^__.* -enableSrcKafkaValidation true -consumerCommitZkPath "" -maxWorkingInstances 0 -autoRebalanceDelayInSeconds 120 -refreshTimeInSeconds 600 -initWaitTimeInSeconds 120 -numOffsetThread 10 -blockingQueueSize 30000 -offsetRefreshIntervalInSec 300 -backUpToGit false -localBackupFilePath /tmp/ureplicator-controller -localGitRepoClonePath /ureplicator-controller-bkp
Worker:
java -Dlog4j.configuration=file:config/tools-log4j.properties -XX:MaxGCPauseMillis=100 -XX:InitiatingHeapOccupancyPercent=45 -verbose:gc -Xmx5g -Xms5g -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -server -javaagent:./bin/libs/jmxtrans-agent-1.2.4.jar=config/jmxtrans.xml -cp uReplicator-Worker/target/uReplicator-Worker-1.0.0-SNAPSHOT-jar-with-dependencies.jar kafka.mirrormaker.MirrorMakerWorker --cluster.config config/clusters.properties --consumer_config config/consumer.properties --producer_config config/producer.properties --helix_config config/helix.properties --dstzk_config config/dstzk.properties
Manager leader count (it can be used to find the leader among managers)
stats.$dc.counter.ureplicator-manager.$deployment.$manager_host.leader.counter.count
Available controllers
stats.$dc.counter.ureplicator-manager.$deployment.$manager_host.controller.available.counter.count
Assigned controllers
stats.$dc.counter.ureplicator-manager.$deployment.$manager_host.controller.assigned.counter.count
Available workers
stats.$dc.counter.ureplicator-manager.$deployment.$manager_host.worker.available.counter.count
Total topic numbers per route
stats.$dc.counter.ureplicator-manager.$deployment.$manager_host.$route.topic.totalNumber.count
Number of controllers per route
stats.$dc.counter.ureplicator-manager.$deployment.$manager_host.$route.controller.totalNumber.count
Number of workers per route
stats.$dc.counter.ureplicator-manager.$deployment.$manager_host.$route.worker.totalNumber.count
Mismatch topic/worker per route
stats.$dc.counter.ureplicator-manager.$deployment.$manager_host.validate.wrong.counter.count
More to add...
Note: remove $route for non-federation mode
Live worker in the route/cluster
stats.$dc.counter.ureplicator-controller.$deployment.$route.$controller_host.worker.liveInstances.count
Stuck partitions in the route/cluster (no message is replicated; this is the most important metric for telling whether the cluster is healthy)
stats.$dc.counter.ureplicator-controller.$deployment.$route.$controller_host.NumNoProgressPartitions
Heartbeat of offset monitor thread
stats.$dc.counter.ureplicator-controller.$deployment.$route.$controller_host.offsetMonitor.executed.count
Offset monitor thread failure count
stats.$dc.counter.ureplicator-controller.$deployment.$route.$controller_host.OffsetMonitorFailureCount
Number of topics
stats.$dc.counter.ureplicator-controller.$deployment.$route.$controller_host.topic.totalNumber.count
Number of partitions
stats.$dc.counter.ureplicator-controller.$deployment.$route.$controller_host.topic.partitions.totalNumber.count
Leader count
stats.$dc.counter.ureplicator-controller.$deployment.$route.$controller_host.leader.counter.count
Topic partition lag from the controller’s view
stats.$dc.counter.ureplicator-controller.$deployment.$route.$controller_host.OffsetMonitorLag.$topic.$partition
More to add...
You can find all worker metrics in https://github.com/uber/uReplicator/blob/master/config/jmxtrans.xml