
uReplicator User Guide


1. GETTING STARTED

1.1 Introduction

As introduced in the original uReplicator design, uReplicator aims to provide a Kafka replication solution with better scalability and stability than existing solutions. The first generation of uReplicator was open sourced in 2016 and used to replicate Uber's Kafka traffic in production. As the scale of the Kafka infrastructure kept growing, we developed a second generation that further improved scalability and reduced operational complexity. The second generation (Federated uReplicator) was released in 2018 and replicated trillions of messages (roughly petabytes of data) per day across hundreds of replication paths at Uber.

1.2 Highlights

uReplicator is good at:

  • High throughput
    • The uReplicator controller assigns partitions to workers based on throughput in the source cluster so that each worker can achieve maximum throughput. (Currently this depends on Chaperone; in the future it will read workload from JMX.)
    • uReplicator checks the lag on each worker and moves heavy traffic away from lagging workers.
  • High availability
    • uReplicator uses a smart rebalance algorithm instead of the high-level consumer rebalance, which guarantees smooth replication.
  • High scalability
    • When the scale of the Kafka infrastructure increases, simply add more hosts to uReplicator and it will scale up automatically.
  • Smart operation (Federated uReplicator)
    • Federated uReplicator can set up replication routes automatically.
    • When a route has higher traffic or lag, Federated uReplicator can add workers automatically and release them afterwards.
    • uReplicator can detect new partitions in the source cluster and start replicating them automatically.

1.3 uReplicator Design

1.3.1 uReplicator (First generation)

1.3.1.1 uReplicator Controller

The uReplicator controller runs a customized rebalance algorithm to assign partitions to each worker through Apache Helix. It distributes topic partitions to worker processes based on the number of partitions and traffic/lag information. If a worker process goes down, the controller automatically redistributes its partitions to other workers.

1.3.1.2 uReplicator Worker

The uReplicator worker contains multiple Kafka consumers and producers to replicate data from the source cluster to the destination cluster. It provides features such as partition mapping to reduce contention in the destination cluster.

1.3.1.3 uReplicator Route

A uReplicator route is a virtual group of controllers and workers. Each route normally contains three controllers and N workers (N depends on your traffic) and is responsible for replication from one source cluster to one destination cluster. If you want to replicate between multiple clusters, you need to either set up multiple routes or use Federated uReplicator.

1.3.2 uReplicator (Second generation, a.k.a Federated uReplicator)

The original uReplicator ran into scalability and operational limits as the numbers of partitions and deployments kept increasing. Federated uReplicator addresses these issues and provides an extensible architecture for further scaling.

1.3.2.1 uReplicator Deployment

To limit the number of deployments, a uReplicator Deployment groups uReplicator Routes with the same use case together; e.g., all aggregation routes are under aggregation deployments. Each deployment has one or more managers to set up routes automatically.

1.3.2.2 uReplicator Manager

The uReplicator Manager within each deployment handles the setup of uReplicator Routes, adds topics to the correct routes, and handles the mirroring of topics assigned from uReplicator Front. A deployment might contain multiple source and destination Kafka clusters. The manager selects one controller from the controller pool and multiple workers from the worker pool to form one uReplicator Route (see 1.3.1.3). The manager adds a topic to an existing route or starts a new route based on predefined rules, e.g., the number of partitions and total traffic. The manager only handles topic-level assignment (to uReplicator Routes) and worker allocation; partition-level assignment (to uReplicator Workers) is handled by the uReplicator Controller. Thus, uReplicator Manager is the federation-level counterpart of uReplicator Controller.

1.3.3 Which uReplicator to use?

Federated uReplicator is backward compatible with the original uReplicator. If you want to use federation mode, just set a flag in the controller and worker configurations (see the sketch at the end of this section); otherwise, simply deploy controllers and workers without a manager.

1.3.3.1 When to use Federated uReplicator?

Federated uReplicator shines when you have many replication paths. For example, with cluster1->cluster4, cluster1->cluster5, cluster2->cluster4, ..., cluster3->cluster5, you would need to set up six separate deployments in non-federated mode; in federated mode, you need just one deployment (cluster1|2|3->cluster4|5). If you expect to have more clusters in the future, we recommend using federated mode to reduce future operational cost.
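
As a concrete sketch, federation mode is switched on via the controller's enableFederated flag and the worker's federated.enabled Helix property (both documented in the configuration tables in section 3.2; the values below are placeholders):

# controller: add these flags to the usual start command
-enableFederated true -deploymentName testing-dc1

# worker: set these in the helix configuration file
federated.enabled=true
federated.deployment.name=testing-dc1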

2. QUICK START

Note: The quick start is for non-federated mode.

2.1 Get the Code

Check out the uReplicator project:

git clone git@github.com:uber/uReplicator.git
cd uReplicator

This project contains everything (controller/worker) you’ll need to run uReplicator.

2.2 Build uReplicator

Before you can run uReplicator, you need to build a package for it. This package is what your deployment tool uses to deploy uReplicator.

mvn clean package

Or use the command below to skip tests (the previous one takes a long time to run):

mvn clean package -DskipTests

2.3 Set Up Local Test Environment

To test uReplicator locally, you need two systems: Kafka and ZooKeeper. The "grid" script helps you set them up.

  • Modify permission for the scripts generated by Maven:
chmod u+x bin/pkg/*.sh
  • The command below downloads, installs, and starts ZooKeeper and Kafka (it starts two Kafka clusters: kafka1, which we use as the source cluster, and kafka2, which we use as the destination cluster):
bin/grid bootstrap
  • Create a dummyTopic in kafka1 and produce some dummy data:
./bin/produce-data-to-kafka-topic-dummyTopic.sh
  • Check if the data is successfully produced to kafka1 by opening another console tab and executing the command below:
./deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181/cluster1 --topic dummyTopic
  • You should get this data:
Kafka topic dummy topic data 1
Kafka topic dummy topic data 2
Kafka topic dummy topic data 3
Kafka topic dummy topic data 4
…

2.4 Start uReplicator

Example 1: Copy data from source cluster to destination cluster

  • Start uReplicator Controller (you should keep it running):
java -Dlog4j.configuration=file:config/tools-log4j.properties -Xms3g -Xmx3g -Xmn512m -XX:NewSize=512m -XX:MaxNewSize=512m -XX:+AlwaysPreTouch -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+DisableExplicitGC -XX:+PrintCommandLineFlags -XX:CMSInitiatingOccupancyFraction=80 -XX:SurvivorRatio=2 -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -Xloggc:/tmp/ureplicator-controller/gc-ureplicator-controller.log -server -cp uReplicator-Controller/target/uReplicator-Controller-2.0.2-SNAPSHOT-jar-with-dependencies.jar com.uber.stream.kafka.mirrormaker.controller.ControllerStarter -enableFederated false -mode auto -backUpToGit false -enableAutoTopicExpansion true -port 9000 -refreshTimeInSeconds 10 -srcKafkaZkPath localhost:2181/cluster1 -zookeeper localhost:2181 -destKafkaZkPath localhost:2181/cluster2 -helixClusterName testMirrorMaker
  • Start uReplicator Worker (you should keep it running, and it’s normal if you see kafka.consumer.ConsumerTimeoutException at this moment, since no topic has been added for copying):
java -Dlog4j.configuration=file:config/test-log4j.properties -XX:MaxGCPauseMillis=100 -XX:InitiatingHeapOccupancyPercent=45 -verbose:gc -Xmx1g -Xms1g -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -Xloggc:gc-ureplicator-worker.log -server -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9089 -Dcom.sun.management.jmxremote.rmi.port=9089 -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -cp uReplicator-Worker/target/uReplicator-Worker-2.0.2-SNAPSHOT-jar-with-dependencies.jar com.uber.stream.ureplicator.worker.WorkerStarter -federated_enabled false -consumer_config config/consumer.properties -producer_config config/producer.properties -helix_config config/helix.properties -topic_mappings config/topicmapping.properties
  • Add topic to uReplicator Controller to start copying from kafka1 to kafka2:
curl -X POST -d '{"topic":"dummyTopic", "numPartitions":"1"}' http://localhost:9000/topics
  • To check if the data is successfully copied to kafka2, open another console tab and execute the command below (the destination topic is dummyTopic1 because of the topic mapping config; see the note after this example):
./deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181/cluster2 --topic dummyTopic1
  • And you will see the same messages produced in kafka1:
Kafka topic dummy topic data 1
Kafka topic dummy topic data 2
Kafka topic dummy topic data 3
Kafka topic dummy topic data 4
…
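
A note on the destination topic name: the worker in this example is started with -topic_mappings config/topicmapping.properties, which renames topics between source and destination. Assuming the bundled mapping file uses a whitespace-separated source/destination pair per line, it contains a line like:

dummyTopic dummyTopic1

which is why the console consumer above reads dummyTopic1 in kafka2 even though the source topic is dummyTopic.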

Example 2: Copy data from source Kafka to destination Kafka cluster without explicitly whitelisting topics

  • Start uReplicator Controller (you should keep it running):
java -Dlog4j.configuration=file:config/tools-log4j.properties -Xms3g -Xmx3g -Xmn512m -XX:NewSize=512m -XX:MaxNewSize=512m -XX:+AlwaysPreTouch -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+DisableExplicitGC -XX:+PrintCommandLineFlags -XX:CMSInitiatingOccupancyFraction=80 -XX:SurvivorRatio=2 -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -Xloggc:/tmp/ureplicator-controller/gc-ureplicator-controller.log -server -cp uReplicator-Controller/target/uReplicator-Controller-2.0.2-SNAPSHOT-jar-with-dependencies.jar com.uber.stream.kafka.mirrormaker.controller.ControllerStarter -enableFederated false -mode auto -backUpToGit false -enableAutoTopicExpansion true -port 9000 -refreshTimeInSeconds 10 -srcKafkaZkPath localhost:2181/cluster1 -zookeeper localhost:2181 -destKafkaZkPath localhost:2181/cluster2 -helixClusterName testMirrorMaker -enableAutoWhitelist true
  • Start uReplicator Worker (you should keep it running, and it’s normal if you see kafka.consumer.ConsumerTimeoutException at this moment since no topic has been added for copying):
java -Dlog4j.configuration=file:config/test-log4j.properties -XX:MaxGCPauseMillis=100 -XX:InitiatingHeapOccupancyPercent=45 -verbose:gc -Xmx1g -Xms1g -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -Xloggc:gc-ureplicator-worker.log -server -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9089 -Dcom.sun.management.jmxremote.rmi.port=9089 -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -cp uReplicator-Worker/target/uReplicator-Worker-2.0.2-SNAPSHOT-jar-with-dependencies.jar com.uber.stream.ureplicator.worker.WorkerStarter -federated_enabled false -consumer_config config/consumer.properties -producer_config config/producer.properties -helix_config config/helix.properties
  • Create the topic in kafka2. Example 2 enables topic auto-whitelisting, so you don't need to whitelist topics manually. If a topic exists in both the source and destination Kafka clusters, the controller auto-whitelists it and starts copying data.
./deploy/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181/cluster2 --topic dummyTopic --partitions 1 --replication-factor 1
  • To check if the data is successfully copied to kafka2, open another console tab and execute this command (you might need to wait about 20 seconds for the controller to refresh):
./deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181/cluster2 --topic dummyTopic
  • And you should see the same messages produced in kafka1:
Kafka topic dummy topic data 1
Kafka topic dummy topic data 2
Kafka topic dummy topic data 3
Kafka topic dummy topic data 4
…

Shutdown

When you're done, you can clean everything up with the stop-all script:

./bin/pkg/stop-all.sh

Congratulations! You’ve now set up a local grid that includes Kafka and ZooKeeper, and you've run a uReplicator worker on it.

Start uReplicator on Docker

Example 3: Run uReplicator on Docker

  • Build the uReplicator devenv image
docker build -t devenv devenv/.
  • Start the devenv container
docker run -d -p 2181:2181 -p 9093:9093 -p 9094:9094 --add-host devenv:127.0.0.1 --name devenv devenv
  • Build the uReplicator image
docker build -t ureplicator .
  • Start uReplicator Controller
docker run -d --link devenv:devenv -p 9000:9000 --name controller --expose=9000 ureplicator "controller" -mode customized \
-enableAutoWhitelist true \
-port 9000 \
-refreshTimeInSeconds 10 \
-srcKafkaZkPath devenv:2181/cluster1 \
-zookeeper devenv:2181 \
-destKafkaZkPath devenv:2181/cluster2 \
-helixClusterName testMirrorMaker
  • Start uReplicator Worker
docker run -d --link devenv:devenv --name worker ureplicator "worker" \
--consumer_config example/example-consumer.properties \
--producer_config example/example-producer.properties \
--helix_config example/example-helix.properties
  • Create the topic in kafka2. Example 3 enables topic auto-whitelisting, so you don't need to whitelist topics manually. If a topic exists in both the source and destination Kafka clusters, the controller auto-whitelists it and starts copying data.
docker exec -it devenv bash
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181/cluster2 --topic dummyTopic --partitions 4 --replication-factor 1
  • Produce test data
docker exec -it devenv bash
./produce-data-to-kafka-topic-dummyTopic.sh 

  • To check if the data is successfully copied to kafka2, open another console tab and execute this command
docker exec -it devenv bash 
$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper localhost:2181/cluster2 --topic dummyTopic

Example 4: Run uReplicator with 2 workers

  • Build Docker Compose
docker-compose -f docker-compose-example4.yml build
  • Start Docker Compose
docker-compose -f docker-compose-example4.yml up
  • Produce test data
docker exec -it ureplicator_devenv_1 bash
./produce-data-to-kafka-topic-dummyTopic.sh 

  • Create topic in kafka2 with 4 partitions
docker exec -it ureplicator_devenv_1 bash
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181/cluster2 --topic dummyTopic1 --partitions 4 --replication-factor 1
  • Add topic to uReplicator Controller to start copying from kafka1 to kafka2:
curl -X POST -d '{"topic":"dummyTopic", "numPartitions":"4"}' http://localhost:9000/topics
  • To check if the data is successfully copied to kafka2, open another console tab and execute this command
docker exec -it ureplicator_devenv_1 bash
$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper localhost:2181/cluster2 --topic dummyTopic1
  • Check topic partition allocation
curl localhost:9000/topics/dummyTopic | jq .

  • Check online instances
curl localhost:9000/instances | jq .

3. CONFIGURATIONS

3.1 Non-federation Mode

3.1.1 Controller Configurations

Name Default Example Description
helixClusterName uReplicator Helix cluster name; Helix will create a node under the path provided in '-zookeeper'
zookeeper zk1, zk2/cluster1-cluster2 ZooKeeper path for each deployment; we recommend putting src-dst in the ZooKeeper path to isolate each deployment.
port 8080 Port for controller rest endpoint
mode customized Rebalance mode, always customized
env dc1.cluster1-cluster2 env has two parts separated by '.': the first part is the data center and the second part is the deployment name (i.e., cluster1-cluster2); it is used in metrics.
instanceId Controller instance name or ID
graphiteHost localhost Graphite host
graphitePort 4756 Graphite port
srcKafkaZkPath Zookeeper path for source Kafka cluster
consumerCommitZkPath Where you want to commit the consumed offset. If blank, it will use srcKafkaZkPath by default
destKafkaZkPath Zookeeper path for destination Kafka cluster
numOffsetThread 10 Number of threads to check committed offset and high watermark
blockingQueueSize 30000 Max pending tasks to check topic offset information
refreshIntervalInSec 300 Interval to run offset checking
groupId ureplicator-cluster1-cluster2 Consumer group name for uReplicator. We recommend putting the deployment name in groupId to avoid conflicts.
enableAutoWhitelist FALSE Whether to auto-whitelist a topic when it exists in both source and destination clusters
enableAutoTopicExpansion FALSE Whether to auto-whitelist extra partitions when a topic is expanded in the source cluster. (Note: it will also expand partitions in the destination cluster)
maxWorkingInstances 0 Max number of workers to use in the deployment. If 0, all workers can be used. We recommend 0.
autoRebalanceDelayInSeconds 120 After detecting a live instance change, how long to wait before rebalancing. If a host restarts within this period, the restart won't trigger a rebalance.
autoRebalancePeriodInSeconds 0 How often to run a rebalance check
autoRebalanceMinIntervalInSeconds 600 Min interval between each rebalance
autoRebalanceMinLagTimeInSeconds 900 Min lag time for a partition to be considered lagging
autoRebalanceMinLagOffset 100000 Min lag offset for a partition to be considered lagging
autoRebalanceMaxOffsetInfoValidInSeconds 1800 Wait time before calculating lag after a restart
autoRebalanceWorkloadRatioThreshold 1.2 Workload ratio above which a worker is considered overloaded
workloadRefreshPeriodInSeconds 600 Interval to refresh workload info
maxDedicatedLaggingInstancesRatio 0.5 Max ratio of workers dedicated to lagging partitions
maxStuckPartitionMovements 3 Max number of moves for a stuck partition before it stops being moved
moveStuckPartitionAfterMinutes 20 Min wait time before moving a stuck partition
patternToExcludeTopics __consumer_offsets Pattern of topics to exclude from whitelisting
backUpToGit FALSE Backup controller metadata to git (true) or local file (false)
localBackupFilePath /var/log/kafka-mirror-maker-controller Local backup file location
remoteBackupRepo Remote Backup Repo to store cluster state
localGitRepoClonePath Clone location of the remote git backup repo
enableSrcKafkaValidation FALSE Enable Source Kafka Validation, e.g. if topic exists in source cluster
maxWorkloadPerWorkerByteWithinRegion 8388608 The max workload per worker within a region
maxWorkloadPerWorkerByteCrossRegion 8388608 The max workload per worker across regions

3.1.2 Worker Configurations

3.1.2.1 Helix Configurations

Name Default Example Description
zkServer uReplicator ZooKeeper server for the Helix cluster, which should be the same Helix cluster as helixClusterName in the controller
instanceId Worker's hostname or ID
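
A minimal helix.properties sketch matching the quick start values (instanceId is a placeholder; depending on the version, the Helix cluster name may also need to be set in this file and must match the controller's helixClusterName):

# config/helix.properties
zkServer=localhost:2181
instanceId=testWorker0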

3.1.2.2 Consumer Configurations

Name Default Example Description
commit.zookeeper.connect Same as consumerCommitZkPath in controller
zookeeper.connection.timeout.ms 30000 Zookeeper connection timeout
zookeeper.session.timeout.ms 30000 Zookeeper session timeout
group.id Consumer group id, same as groupId in controller
client.id Consumer client id
consumer.id Consumer id
num.consumer.fetchers 3 Number of fetcher threads per source broker
socket.receive.buffer.bytes 1966080 The SO_RCVBUF buffer of the socket server sockets. If the value is -1, the OS default will be used.
fetch.message.max.bytes 31457280 The maximum amount of data per-partition the server will return.
fetch.min.bytes 131072 The minimum amount of data the server should return for a fetch request.
queued.max.message.chunks 5 Max message chunks buffered in memory when the producer is busy
auto.offset.reset largest Where to start consuming when the last committed offset is out of range
fetch.wait.max.ms 500 Max wait time for each fetcher request issued
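
A minimal consumer.properties sketch for the quick start's source cluster. zookeeper.connect (the source cluster's ZooKeeper path) is an assumption here: it is a standard property of the old Kafka consumer rather than one listed in the table above.

# config/consumer.properties
zookeeper.connect=localhost:2181/cluster1
zookeeper.connection.timeout.ms=30000
zookeeper.session.timeout.ms=30000
group.id=ureplicator-cluster1-cluster2
auto.offset.reset=largest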

3.1.2.3 Producer Configurations

Name Default Example Description
client.id Producer client id
producer.type async Async or sync producing (sync waits for an ack before sending the next message)
acks The number of acknowledgments the producer requires the leader to have received before considering a request complete.
compression.type Producer compression type
key.serializer org.apache.kafka.common.serialization.ByteArraySerializer Serializer class for key
value.serializer org.apache.kafka.common.serialization.ByteArraySerializer Serializer class for value
batch.size 131072 Producer batch size
linger.ms 1000 Producer waiting time to accumulate a batch
buffer.memory 167772160 The total bytes of memory the producer can use to buffer records waiting to be sent to the server.
max.request.size 31457280 The maximum size of a request in bytes.
send.buffer.bytes 1310720 The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.
max.in.flight.requests.per.connection 5 The maximum number of unacknowledged requests the client will send on a single connection before blocking.
request.timeout.ms 30000 Producer request timeout
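
A minimal producer.properties sketch pointing at the destination cluster's broker. bootstrap.servers is an assumption (a standard Kafka producer property not listed above), and the broker address is a placeholder.

# config/producer.properties
bootstrap.servers=localhost:9094
client.id=ureplicator-worker-producer
producer.type=async
batch.size=131072
linger.ms=1000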

3.1.2.4 dstZK Configurations

Name Default Example Description
enable FALSE Whether to enable 1:1 partition mapping
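
A sketch of the corresponding dstzk configuration file, which is passed to the worker via --dstzk_config (only the key documented above is shown):

# config/dstzk.properties
enable=false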

3.2 Federation Mode

3.2.1 Manager Configurations

Name Default Example Description
config Clusters info file. Should contain every source/destination cluster's ZooKeeper and broker info, in the format: kafka.cluster.zkStr.cluster1=zk1,zk2/cluster1 and kafka.cluster.servers.cluster1=broker1:9092,broker2:9092 (see the sketch after this table)
srcClusters List of clusters that can be source
destClusters List of clusters that can be destination
enableRebalance Whether the manager finds a new worker for a route when a worker in the route has been down for some time
zookeeper Helix zookeeper path. Should be same as zookeeper in controller
managerPort Port for manager rest endpoint
deployment Deployment name; used in metrics
env env has two parts separated by '.': the first part is the data center and the second part is the deployment name (i.e., aggregation-dc1); it is used in metrics.
instanceId Manager instance name or ID
controllerPort Port for controller rest endpoint
graphiteHost Graphite host
graphitePort Graphite port
metricsPrefix Graphite metrics prefix
workloadRefreshPeriodInSeconds 300 Interval to refresh workload
initMaxNumPartitionsPerRoute 1500 Max number of partitions in a route when adding new topics
maxNumPartitionsPerRoute 2000 If topics in a route are expanded and the total number of partitions exceeds this number, the manager moves the topics with the largest partition counts to a smaller route
initMaxNumWorkersPerRoute 10 Number of workers when creating a route
maxNumWorkersPerRoute 50 Max number of workers in a route
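
A sketch of the clusters info file referenced by the config option above, assuming two source clusters and one destination cluster with placeholder hosts:

# config/clusters.properties
kafka.cluster.zkStr.cluster1=zk1:2181,zk2:2181/cluster1
kafka.cluster.servers.cluster1=broker1:9092,broker2:9092
kafka.cluster.zkStr.cluster2=zk1:2181,zk2:2181/cluster2
kafka.cluster.servers.cluster2=broker3:9092,broker4:9092
kafka.cluster.zkStr.cluster3=zk1:2181,zk2:2181/cluster3
kafka.cluster.servers.cluster3=broker5:9092,broker6:9092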

3.2.2 Controller Configurations

Name Default Example Description
config Clusters info file in the same format and with the same content as the config option in the manager configuration
enableFederated TRUE Whether to enable federation mode
srcClusters List of clusters that can be source, should be the same as in manager
destClusters List of clusters that can be destination, should be the same as in manager
deploymentName Deployment name, should be the same as deployment in manager
zookeeper zk1, zk2/cluster1-cluster2 ZooKeeper path for each deployment; we recommend putting src-dst in the ZooKeeper path to isolate each deployment.
port 8080 Port for controller rest endpoint
mode customized Rebalance mode, always customized
env dc1.cluster1-cluster2 env has two parts separated by '.': the first part is the data center and the second part is the deployment name (i.e., cluster1-cluster2); it is used in metrics.
instanceId Controller instance name or ID
graphiteHost localhost Graphite host
graphitePort 4756 Graphite port
numOffsetThread 10 Number of threads to check committed offset and high watermark
blockingQueueSize 30000 Max pending tasks to check topic offset information
refreshIntervalInSec 300 Interval to run offset checking
groupId ureplicator-cluster1-cluster2 Consumer group name for uReplicator. We recommend putting the deployment name in groupId to avoid conflicts.
enableAutoWhitelist FALSE Whether to auto-whitelist a topic when it exists in both source and destination clusters
enableAutoTopicExpansion FALSE Whether to auto-whitelist extra partitions when a topic is expanded in the source cluster. (Note: it will also expand partitions in the destination cluster)
maxWorkingInstances 0 Max number of workers to use in the deployment. If 0, all workers can be used. We recommend 0.
autoRebalanceDelayInSeconds 120 After detecting a live instance change, how long to wait before rebalancing. If a host restarts within this period, the restart won't trigger a rebalance.
autoRebalancePeriodInSeconds 0 How often to run a rebalance check
autoRebalanceMinIntervalInSeconds 600 Min interval between each rebalance
autoRebalanceMinLagTimeInSeconds 900 Min lag time for a partition to be considered lagging
autoRebalanceMinLagOffset 100000 Min lag offset for a partition to be considered lagging
autoRebalanceMaxOffsetInfoValidInSeconds 1800 Wait time before calculating lag after a restart
autoRebalanceWorkloadRatioThreshold 1.2 Workload ratio above which a worker is considered overloaded
workloadRefreshPeriodInSeconds 600 Interval to refresh workload info
maxDedicatedLaggingInstancesRatio 0.5 Max ratio of workers dedicated to lagging partitions
maxStuckPartitionMovements 3 Max number of moves for a stuck partition before it stops being moved
moveStuckPartitionAfterMinutes 20 Min wait time before moving a stuck partition
patternToExcludeTopics __consumer_offsets Pattern of topics to exclude from whitelisting
backUpToGit FALSE Backup controller metadata to git (true) or local file (false)
localBackupFilePath /var/log/kafka-mirror-maker-controller Local backup file location
remoteBackupRepo Remote Backup Repo to store cluster state
localGitRepoClonePath Clone location of the remote git backup repo
enableSrcKafkaValidation FALSE Enable Source Kafka Validation, e.g. if topic exists in source cluster
maxWorkloadPerWorkerByteWithinRegion 8388608 The max workload per worker within a region
maxWorkloadPerWorkerByteCrossRegion 8388608 The max workload per worker across regions

3.2.3 Worker Configurations

3.2.3.1 Helix Configurations

Name Default Example Description
federated.enabled Whether to enable federation mode
federated.deployment.name Deployment name
zkServer uReplicator ZooKeeper server for the Helix cluster, which should be the same Helix cluster as helixClusterName in the controller
instanceId Worker's hostname or ID

3.2.3.2 Consumer Configurations

Name Default Example Description
zookeeper.connection.timeout.ms 30000 Zookeeper connection timeout
zookeeper.session.timeout.ms 30000 Zookeeper session timeout
num.consumer.fetchers 3 Number of fetcher threads per source broker
socket.receive.buffer.bytes 1966080 The SO_RCVBUF buffer of the socket server sockets. If the value is -1, the OS default will be used.
fetch.message.max.bytes 31457280 The maximum amount of data per-partition the server will return.
fetch.min.bytes 131072 The minimum amount of data the server should return for a fetch request.
queued.max.message.chunks 5 Max message chunks buffered in memory when the producer is busy
auto.offset.reset largest Where to start consuming when the last committed offset is out of range
fetch.wait.max.ms 500 Max wait time for each fetcher request issued

3.2.3.3 Producer Configurations

Name Default Example Description
producer.type async Async or sync producing (sync waits for an ack before sending the next message)
acks The number of acknowledgments the producer requires the leader to have received before considering a request complete.
compression.type Producer compression type
key.serializer org.apache.kafka.common.serialization.ByteArraySerializer Serializer class for key
value.serializer org.apache.kafka.common.serialization.ByteArraySerializer Serializer class for value
batch.size 131072 Producer batch size
linger.ms 1000 Producer waiting time to accumulate a batch
buffer.memory 167772160 The total bytes of memory the producer can use to buffer records waiting to be sent to the server.
max.request.size 31457280 The maximum size of a request in bytes.
send.buffer.bytes 1310720 The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.
max.in.flight.requests.per.connection 5 The maximum number of unacknowledged requests the client will send on a single connection before blocking.
request.timeout.ms 30000 Producer request timeout

3.2.3.4 dstZK Configurations

Name Default Example Description
enable FALSE If enable 1:1 partition mapping

4. OPERATIONS

4.1 Topic Operations

4.1.1 Non-federation Mode

Whitelist a topic

curl -X POST "<controller_host>:<controller_port>/topics/<topic>"

Blacklist a topic

curl -X DELETE "<controller_host>:<controller_port>/topics/<topic>"

4.1.2 Federation Mode

Whitelist a topic

curl -X POST "<manager_host>:<manager_port>/topics/<topic>?src=<src>&dst=<dst>"

Blacklist a topic

curl -X DELETE "<manager_host>:<manager_port>/topics/<topic>?src=<src>&dst=<dst>"

4.2 Partition Operations

This is useful when a partition in the destination is unavailable (e.g., an offline partition) and you still want the rest of the partitions to be replicated. The operation is the same for both modes.

Get blacklisted topic partitions

curl -X GET "<controller_host>:<controller_port>/blacklist?topic=<topic>"

Blacklist topic partition

curl -X POST "<controller_host>:<controller_port>/blacklist?topic=<topic>&partition=<partition>&opt=blacklist"

Remove topic partitions from blacklist

curl -X POST "<controller_host>:<controller_port>/blacklist?topic=<topic>&partition=<partition>&opt=whitelist"
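
For example, against the quick start controller, blacklisting partition 0 of dummyTopic and then whitelisting it back would look like:

curl -X POST "localhost:9000/blacklist?topic=dummyTopic&partition=0&opt=blacklist"
curl -X POST "localhost:9000/blacklist?topic=dummyTopic&partition=0&opt=whitelist"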

4.3 Number of Worker Override (Federation mode only)

This is useful when you temporarily need extra workers in a route for a traffic spike.

Get current worker number override

curl -X GET "<manager_host>:<manager_port>/admin/worker_number_override"

Override number of workers

curl -X POST "<manager_host>:<manager_port>/admin/worker_number_override?route=@<src_cluster>@<dst_cluster>@<route_id>&workerNumber=<new_number>"

Delete number of worker override

curl -X DELETE "<manager_host>:<manager_port>/admin/worker_number_override?route=@<src_cluster>@<dst_cluster>@<route_id>"
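
For example, assuming a manager at localhost:9100 and route 0 from cluster1 to cluster3, temporarily raising the route to 12 workers and later removing the override would look like:

curl -X POST "localhost:9100/admin/worker_number_override?route=@cluster1@cluster3@0&workerNumber=12"
curl -X DELETE "localhost:9100/admin/worker_number_override?route=@cluster1@cluster3@0"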

4.4 Rebalance Operations (Federation mode only)

If you are using static controller/worker IDs, i.e., the host name/ID does not change after a restart or deploy, we recommend turning rebalance off by default so the manager will not rebalance routes during restarts. Otherwise, you need to turn rebalance on.

Enable route rebalance

curl -X POST "<manager_host>:<manager_port>/admin/enable_autobalancing"

Disable route rebalance

curl -X POST "<manager_host>:<manager_port>/admin/disable_autobalancing"

Check route rebalance status

curl -X GET "<manager_host>:<manager_port>/admin/autobalancing_status"

4.5 Auto-scaling Operations (Federation mode only)

We recommend turning off auto-scaling during restarts/deployments.

Enable route auto-scaling

curl -X POST "<manager_host>:<manager_port>/admin/enable_autoscaling"

Disable route auto-scaling

curl -X POST "<manager_host>:<manager_port>/admin/disable_autoscaling"

Check route auto-scaling status

curl -X GET "<manager_host>:<manager_port>/admin/autoscaling_status"

4.6 Start Commands

4.6.1 Non-federation Mode

Controller:

java -Dlog4j.configuration=file:config/tools-log4j.properties -Xms3g -Xmx3g -Xmn512m -XX:NewSize=512m -XX:MaxNewSize=512m -XX:+AlwaysPreTouch -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+DisableExplicitGC -XX:+PrintCommandLineFlags -XX:CMSInitiatingOccupancyFraction=80 -XX:SurvivorRatio=2 -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -Xloggc:/tmp/ureplicator-controller/gc-ureplicator-controller.log -server -cp uReplicator-Controller/target/uReplicator-Controller-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.uber.stream.kafka.mirrormaker.controller.ControllerStarter -enableFederated false -deploymentName <cluster1-cluster2> -helixClusterName uReplicator -mode customized -zookeeper zk1,zk2,zk3/ureplicator/<cluster1-cluster2> -port <port> -env <dc1>.<cluster1-cluster2> -instanceId 0 -hostname <hostname> -graphiteHost 127.0.0.1 -graphitePort 4756 -metricsPrefix ureplicator-controller -enableAutoWhitelist false -enableAutoTopicExpansion true -autoRebalanceDelayInSeconds 120 -autoRebalancePeriodInSeconds 120 -autoRebalanceMinIntervalInSeconds 600 -autoRebalanceMinLagTimeInSeconds 900 -autoRebalanceMinLagOffset 100000 -autoRebalanceMaxOffsetInfoValidInSeconds 1800 -autoRebalanceWorkloadRatioThreshold 1.5 -maxDedicatedLaggingInstancesRatio 0.2 -maxStuckPartitionMovements 3 -moveStuckPartitionAfterMinutes 20 -workloadRefreshPeriodInSeconds 300 -patternToExcludeTopics ^__.* -enableSrcKafkaValidation true -srcKafkaZkPath "src_zk/cluster1" -destKafkaZkPath "dst_zk/cluster2" -consumerCommitZkPath "" -maxWorkingInstances 0 -refreshTimeInSeconds 600 -initWaitTimeInSeconds 120 -numOffsetThread 10 -blockingQueueSize 30000 -offsetRefreshIntervalInSec 300 -groupId <ureplicator-cluster1-cluster2> -backUpToGit false -localBackupFilePath /tmp/ureplicator-controller -localGitRepoClonePath /ureplicator-controller-bkp

Worker:

java -Dlog4j.configuration=file:config/test-log4j.properties -XX:MaxGCPauseMillis=100 -XX:InitiatingHeapOccupancyPercent=45 -verbose:gc -Xmx1g -Xms1g -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -Xloggc:gc-ureplicator-worker.log -server -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9089 -Dcom.sun.management.jmxremote.rmi.port=9089 -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -cp uReplicator-Worker/target/uReplicator-Worker-2.0.2-SNAPSHOT-jar-with-dependencies.jar com.uber.stream.ureplicator.worker.WorkerStarter -federated_enabled false -consumer_config config/consumer.properties -producer_config config/producer.properties -helix_config config/helix.properties -topic_partition_count_observer true -cluster_config config/clusters.properties

4.6.2 Federation Mode

Manager:

java -Dlog4j.configuration=file:config/tools-log4j.properties -Xms3g -Xmx3g -Xmn512m -XX:NewSize=512m -XX:MaxNewSize=512m -XX:+AlwaysPreTouch -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+DisableExplicitGC -XX:+PrintCommandLineFlags -XX:CMSInitiatingOccupancyFraction=80 -XX:SurvivorRatio=2 -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -Xloggc:/tmp/ureplicator-manager/gc-ureplicator-manager.log -server -cp uReplicator-Manager/target/uReplicator-Manager-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.uber.stream.kafka.mirrormaker.manager.ManagerStarter -config config/clusters.properties -srcClusters cluster1,cluster2 -destClusters cluster3 -enableRebalance false -zookeeper zk1,zk2,zk3/ureplicator/testing-dc1 -managerPort <port> -deployment testing-dc1 -env dc1.testing-dc1 -instanceId <id> -controllerPort <port> -graphiteHost 127.0.0.1 -graphitePort 4756 -metricsPrefix ureplicator-manager -workloadRefreshPeriodInSeconds 300 -initMaxNumPartitionsPerRoute 1500 -maxNumPartitionsPerRoute 2000 -initMaxNumWorkersPerRoute 10 -maxNumWorkersPerRoute 80

Controller:

java -Dlog4j.configuration=file:config/tools-log4j.properties -Xms3g -Xmx3g -Xmn512m -XX:NewSize=512m -XX:MaxNewSize=512m -XX:+AlwaysPreTouch -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+DisableExplicitGC -XX:+PrintCommandLineFlags -XX:CMSInitiatingOccupancyFraction=80 -XX:SurvivorRatio=2 -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -Xloggc:/tmp/ureplicator-controller/gc-ureplicator-controller.log -server -cp uReplicator-Controller/target/uReplicator-Controller-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.uber.stream.kafka.mirrormaker.controller.ControllerStarter -config config/clusters.properties -srcClusters cluster1,cluster2 -destClusters cluster3 -enableFederated true -deploymentName testing-dc1 -mode customized -zookeeper zk1,zk2,zk3/ureplicator/testing-dc1 -port <port> -env dc1.testing-dc1 -instanceId <id> -hostname <hostname> -graphiteHost 127.0.0.1 -graphitePort 4756 -metricsPrefix ureplicator-controller -enableAutoWhitelist false -enableAutoTopicExpansion true -autoRebalanceDelayInSeconds 120 -autoRebalancePeriodInSeconds 120 -autoRebalanceMinIntervalInSeconds 600 -autoRebalanceMinLagTimeInSeconds 900 -autoRebalanceMinLagOffset 100000 -autoRebalanceMaxOffsetInfoValidInSeconds 1800 -autoRebalanceWorkloadRatioThreshold 1.5 -maxDedicatedLaggingInstancesRatio 0.2 -maxStuckPartitionMovements 3 -moveStuckPartitionAfterMinutes 20 -workloadRefreshPeriodInSeconds 300 -patternToExcludeTopics ^__.* -enableSrcKafkaValidation true -consumerCommitZkPath "" -maxWorkingInstances 0 -refreshTimeInSeconds 600 -initWaitTimeInSeconds 120 -numOffsetThread 10 -blockingQueueSize 30000 -offsetRefreshIntervalInSec 300 -backUpToGit false -localBackupFilePath /tmp/ureplicator-controller -localGitRepoClonePath /ureplicator-controller-bkp

Worker:

java -Dlog4j.configuration=file:config/tools-log4j.properties -XX:MaxGCPauseMillis=100 -XX:InitiatingHeapOccupancyPercent=45 -verbose:gc -Xmx5g -Xms5g -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -server -javaagent:./bin/libs/jmxtrans-agent-1.2.4.jar=config/jmxtrans.xml -cp uReplicator-Worker/target/uReplicator-Worker-1.0.0-SNAPSHOT-jar-with-dependencies.jar kafka.mirrormaker.MirrorMakerWorker --cluster.config config/clusters.properties --consumer_config config/consumer.properties --producer_config config/producer.properties --helix_config config/helix.properties --dstzk_config config/dstzk.properties

5. MONITORING

5.1 Manager Metrics (Federation mode only)

Manager leader count (can be used to find the leader among the managers)

stats.$dc.counter.ureplicator-manager.$deployment.$manager_host.leader.counter.count

Available controllers

stats.$dc.counter.ureplicator-manager.$deployment.$manager_host.controller.available.counter.count

Assigned controllers

stats.$dc.counter.ureplicator-manager.$deployment.$manager_host.controller.assigned.counter.count

Available workers

stats.$dc.counter.ureplicator-manager.$deployment.$manager_host.worker.available.counter.count

Total number of topics per route

stats.$dc.counter.ureplicator-manager.$deployment.$manager_host.$route.topic.totalNumber.count

Number of controllers per route

stats.$dc.counter.ureplicator-manager.$deployment.$manager_host.$route.controller.totalNumber.count

Number of workers per route

stats.$dc.counter.ureplicator-manager.$deployment.$manager_host.$route.worker.totalNumber.count

Mismatched topic/worker count per route

stats.$dc.counter.ureplicator-manager.$deployment.$manager_host.validate.wrong.counter.count

More to add...

5.2 Controller Metrics

Note: remove $route for non-federation mode

Live workers in the route/cluster

stats.$dc.counter.ureplicator-controller.$deployment.$route.$controller_host.worker.liveInstances.count

Stuck partitions in the route/cluster (no messages being replicated; this is the most important metric for judging whether the cluster is healthy)

stats.$dc.counter.ureplicator-controller.$deployment.$route.$controller_host.NumNoProgressPartitions

Heartbeat of offset monitor thread

stats.$dc.counter.ureplicator-controller.$deployment.$route.$controller_host.offsetMonitor.executed.count

Offset monitor thread failure count

stats.$dc.counter.ureplicator-controller.$deployment.$route.$controller_host.OffsetMonitorFailureCount

Number of topics

stats.$dc.counter.ureplicator-controller.$deployment.$route.$controller_host.topic.totalNumber.count

Number of partitions

stats.$dc.counter.ureplicator-controller.$deployment.$route.$controller_host.topic.partitions.totalNumber.count

Leader count

stats.$dc.counter.ureplicator-controller.$deployment.$route.$controller_host.leader.counter.count

Topic partition lag from the controller's view

stats.$dc.counter.ureplicator-controller.$deployment.$route.$controller_host.OffsetMonitorLag.$topic.$partition

More to add...

5.3 Worker Metrics

You can find all worker metrics in https://github.com/uber/uReplicator/blob/master/config/jmxtrans.xml
