uReplicator Design
The current Kafka MirrorMaker design uses the Kafka high-level consumer to consume data from multiple regional Kafka clusters and produce to an aggregation Kafka cluster. This means that whenever a regional Kafka broker has an issue, rebalancing is triggered across all currently served topics, with undesired consequences for the entire cluster and its existing topics. Specifically, the current problems are:
- Rebalancing (high-level consumer):
  - Happens sporadically without an obvious cause
  - Affects the entire cluster even when only one topic/partition has an issue (e.g., leader change, GC pause or lost Zookeeper connection, consumer falling behind)
  - Takes time to recover, and the whole cluster stops after 32 rebalance retry attempts
  - Bugs: a consumer can fall off the grid (owning no partitions while no other consumer takes them over)
- Metadata problems can bring down the entire cluster (e.g., a topic that does not exist)
- A restart is required when a new topic is whitelisted
The original MirrorMaker tool shipped with Kafka (as of 0.8.2) uses the high-level consumer to handle message reading. It relies on the ZookeeperConsumerConnector to watch partitions for events such as leader changes and to signal the consumer group to rebalance accordingly.
The gray part is what we want to replace with uReplicator.
ZookeeperConsumerConnector handles the consumers' interaction with Zookeeper, including rebalancing, leader changes, etc.
uReplicator is designed to meet the following requirements:
- Stable mirroring: Rebalancing only during startup (when a node is added/deleted)
- Simple operations: Easy to scale up the cluster, no server restart to whitelist topics
- High throughput: Max offset lag is consistently 0
- Time SLA: ~5 min
- Good to have: Auditing capability (possibly via Chaperone integration)
The uReplicator controller internally wraps a Helix controller and handles all of the cluster management workload.
Helix is a generic cluster management framework used for the automatic management of partitioned, replicated, and distributed resources hosted on a cluster of nodes. Helix automates reassignment of resources in the face of node failure and recovery, cluster expansion, and reconfiguration.
In this design, we use Helix to manage all the nodes in the cluster and to assign topic partitions. We define each Kafka topic as a Helix resource and each Kafka topic partition as a Helix partition. The Helix controller registers the OnlineOffline StateModel and custom balancing logic to manage worker nodes and the topic partition assignment. The IdealState defined in Helix is the actual topic-partition mapping for the uReplicator workers, which consume from the source Kafka cluster and produce to the destination Kafka cluster.
As the cluster manager, the Helix controller also exposes REST endpoints to accept requests to add/update/delete a topic.
| Kafka World | Helix World |
|---|---|
| Kafka Topic | Helix Resource |
| Kafka Topic Partition | Helix Partition |
| MirrorMaker Controller | Helix Controller |
| MirrorMaker Worker | Helix Participant |
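To make the mapping concrete, here is a minimal sketch of how a whitelisted topic could be registered as a Helix resource with the OnlineOffline state model and a controller-computed (CUSTOMIZED) assignment. It assumes the Helix cluster and the OnlineOffline state model definition already exist; the Zookeeper address, cluster name, topic name, and partition count are illustrative placeholders, not uReplicator's actual configuration:

```java
import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.IdealState;

// Sketch of the topic <-> resource mapping, assuming the Helix cluster and the
// OnlineOffline state model definition are already set up.
public class TopicRegistrationSketch {
  public static void main(String[] args) {
    HelixAdmin admin = new ZKHelixAdmin("localhost:2181");   // assumption: local Zookeeper
    String cluster = "uReplicatorSketchCluster";             // assumption: cluster name

    // One Helix resource per Kafka topic, one Helix partition per Kafka topic partition.
    // CUSTOMIZED rebalance mode lets the controller compute the assignment itself.
    admin.addResource(cluster, "testTopic", 8, "OnlineOffline",
        IdealState.RebalanceMode.CUSTOMIZED.toString());

    // Deleting a topic from the controller maps to dropping the corresponding resource:
    // admin.dropResource(cluster, "testTopic");

    admin.close();
  }
}
```

In uReplicator itself this registration is driven by the controller's REST endpoints rather than a standalone program.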
A uReplicator worker registers itself as a Helix participant. The worker accepts requests from the controller to start/stop consuming for a particular topic partition.
To fully address the rebalancing issues caused by the high-level consumer, we use the SimpleConsumer instead. It supports adding new topics automatically and doesn't affect existing topics when a partition leader changes or a new partition is added.
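For reference, here is a minimal sketch of fetching a single topic partition with the Kafka 0.8 SimpleConsumer API. The broker host/port, topic, offsets, and buffer sizes are placeholder values; the real worker resumes from committed offsets and hands the payloads to its producer:

```java
import java.nio.ByteBuffer;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

// Sketch: fetch one topic partition with the low-level SimpleConsumer, so problems
// on one partition stay isolated from all others.
public class SimpleConsumerSketch {
  public static void main(String[] args) {
    SimpleConsumer consumer =
        new SimpleConsumer("broker1", 9092, 30000, 64 * 1024, "uReplicatorSketch");

    long offset = 0L;  // in practice, resume from a committed offset
    FetchRequest request = new FetchRequestBuilder()
        .clientId("uReplicatorSketch")
        .addFetch("testTopic", 0, offset, 1024 * 1024)  // topic, partition, offset, maxBytes
        .build();

    FetchResponse response = consumer.fetch(request);
    if (!response.hasError()) {
      for (MessageAndOffset messageAndOffset : response.messageSet("testTopic", 0)) {
        ByteBuffer payload = messageAndOffset.message().payload();
        offset = messageAndOffset.nextOffset();  // hand payload to the producer, advance offset
      }
    }
    consumer.close();
  }
}
```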
The controller has a global view of the number of uReplicator workers and of the topics to be copied. It assigns topic partitions evenly across all available uReplicator workers and internally uses Helix to store the assignment in Zookeeper. The assignment is distributed to each uReplicator worker as state transitions through Helix: for each partition assigned to a worker, the worker receives an offline-to-online state transition notification and adds the corresponding topic partition to its own simple consumer; for each partition removed from the assignment, the worker receives an online-to-offline state transition notification and removes that partition from its consumer. Once online, the worker consumes data for its assigned partitions and produces to the aggregation cluster.
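A minimal sketch of the worker-side transition handling follows, assuming a hypothetical WorkerConsumer wrapper around the simple consumer (the class and method names are illustrative, not uReplicator's actual ones):

```java
import org.apache.helix.NotificationContext;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;

// Sketch: Helix delivers offline-to-online and online-to-offline transitions per
// topic partition; the worker starts or stops consuming that partition accordingly.
@StateModelInfo(states = "{'OFFLINE','ONLINE'}", initialState = "OFFLINE")
public class TopicPartitionStateModel extends StateModel {

  /** Hypothetical wrapper that manages the worker's simple-consumer fetchers. */
  public interface WorkerConsumer {
    void addTopicPartition(String topic, String partition);
    void removeTopicPartition(String topic, String partition);
  }

  private final WorkerConsumer consumer;

  public TopicPartitionStateModel(WorkerConsumer consumer) {
    this.consumer = consumer;
  }

  @Transition(from = "OFFLINE", to = "ONLINE")
  public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
    // The resource name is the Kafka topic; the partition name encodes the Kafka partition.
    consumer.addTopicPartition(message.getResourceName(), message.getPartitionName());
  }

  @Transition(from = "ONLINE", to = "OFFLINE")
  public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
    consumer.removeTopicPartition(message.getResourceName(), message.getPartitionName());
  }
}
```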
Add new topic
When you add a new topic to the controller, it assigns each partition to the worker currently serving the fewest partitions. Note: If auto topic whitelisting is enabled, the controller automatically whitelists topics that exist in both the source and destination Kafka clusters.
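A minimal sketch of this least-loaded assignment, assuming an in-memory map from worker ID to the number of partitions it currently serves (all names are illustrative):

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Sketch: assign each new partition to the worker currently serving the fewest partitions.
public class LeastLoadedAssignmentSketch {
  public static Map<Integer, String> assign(String topic, int numPartitions,
                                            Map<String, Integer> partitionsPerWorker) {
    if (partitionsPerWorker.isEmpty()) {
      throw new IllegalStateException("no live workers to assign " + topic);
    }
    // Min-heap ordered by how many partitions each worker already serves.
    PriorityQueue<String> workers = new PriorityQueue<>(
        Comparator.comparingInt(partitionsPerWorker::get));
    workers.addAll(partitionsPerWorker.keySet());

    Map<Integer, String> assignment = new HashMap<>();
    for (int partition = 0; partition < numPartitions; partition++) {
      String worker = workers.poll();                      // least-loaded worker
      assignment.put(partition, worker);
      partitionsPerWorker.merge(worker, 1, Integer::sum);  // update its load
      workers.offer(worker);                               // re-insert with the new load
    }
    return assignment;
  }
}
```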
Delete existing topic
When you delete an existing topic from the controller, it removes the topic from the IdealState. The corresponding uReplicator workers then get online-to-offline transition messages and drop those topic partitions.
Expand partitions for existing topic
When you expand a topic in the controller, it assigns the newly added partitions using the same mechanism as adding a new topic. Note: If auto topic whitelisting is enabled, topic expansion in uReplicator happens automatically.
Add/remove uReplicator workers
The uReplicator controller registers a listener on live uReplicator instance changes. Whenever uReplicator workers are added or removed, this listener is notified and schedules a rebalance job with a configurable delay (default: 2 minutes). When the delay expires, the rebalance job checks the current cluster status to determine whether a rebalance is actually needed. This fixed delay prevents the controller from rebalancing during transient Zookeeper disconnection/reconnection issues or rolling restarts of uReplicator workers. Rebalancing only happens when a machine is added or during a failure scenario.
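A minimal sketch of this delayed-rebalance idea, using Helix's LiveInstanceChangeListener and a scheduled task; the rebalance check itself is left abstract, and the delay value and class name are illustrative:

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.helix.LiveInstanceChangeListener;
import org.apache.helix.NotificationContext;
import org.apache.helix.model.LiveInstance;

// Sketch: react to live-instance changes, but wait a configurable delay before
// rebalancing, so transient Zookeeper flaps or rolling restarts don't cause churn.
public class DelayedRebalanceListener implements LiveInstanceChangeListener {
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final long delayMs;
  private ScheduledFuture<?> pending;

  public DelayedRebalanceListener(long delayMs) {  // e.g. 2 * 60 * 1000
    this.delayMs = delayMs;
  }

  @Override
  public synchronized void onLiveInstanceChange(List<LiveInstance> liveInstances,
                                                NotificationContext changeContext) {
    if (pending != null) {
      pending.cancel(false);  // collapse bursts of changes into a single rebalance job
    }
    pending = scheduler.schedule(this::rebalanceIfNeeded, delayMs, TimeUnit.MILLISECONDS);
  }

  private void rebalanceIfNeeded() {
    // Re-check the cluster state here and recompute the partition assignment only if
    // workers really joined or left; omitted in this sketch.
  }
}
```

Such a listener would be registered on the controller's HelixManager via addLiveInstanceChangeListener.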
Usually in production, we deploy three controllers, one of which is the leader. All uReplicator workers join the cluster as Helix participants and wait for the topic partition assignment. The controller detects when new uReplicator workers are added or existing workers are removed and rebalances the cluster accordingly.
The controller periodically checks that the current IdealState and ExternalView are aligned and reports mismatched topics and partitions as metrics. The controller can optionally create a Kafka observer, which periodically fetches topic information from Kafka; in that case, the controller also compares the topic information from the Kafka brokers with its own assignment. If the number of partitions assigned differs from what a broker reports, the controller emits mismatched topic and partition metrics.
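A minimal sketch of such a validation pass using the Helix admin API, comparing each resource's IdealState partitions against its ExternalView; the cluster name and Zookeeper address are placeholders, and the real controller reports the result through its metrics system:

```java
import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;

// Sketch: for each topic (Helix resource), compare the partitions the controller wants
// (IdealState) against what workers actually serve (ExternalView) and count mismatches.
public class AssignmentValidatorSketch {
  public static int countMismatchedPartitions(HelixAdmin admin, String cluster) {
    int mismatches = 0;
    for (String topic : admin.getResourcesInCluster(cluster)) {
      IdealState idealState = admin.getResourceIdealState(cluster, topic);
      ExternalView externalView = admin.getResourceExternalView(cluster, topic);
      for (String partition : idealState.getPartitionSet()) {
        // A partition is mismatched if no worker reports it ONLINE in the external view.
        if (externalView == null
            || externalView.getStateMap(partition) == null
            || !externalView.getStateMap(partition).containsValue("ONLINE")) {
          mismatches++;
        }
      }
    }
    return mismatches;  // report this value as a metric
  }

  public static void main(String[] args) {
    HelixAdmin admin = new ZKHelixAdmin("localhost:2181");  // assumption: local Zookeeper
    System.out.println(countMismatchedPartitions(admin, "uReplicatorSketchCluster"));
    admin.close();
  }
}
```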