uReplicator Design

Background

The current Kafka MirrorMaker design uses the Kafka high-level consumer to consume data from multiple regional Kafka clusters and produce to an aggregation Kafka cluster. This means that whenever a regional Kafka broker has an issue, it triggers rebalancing across all topics currently being served. The undesirable consequences affect the entire cluster and all existing topics. Specifically, the current problems are:

  • Rebalancing (high-level consumer):
    • Happens sporadically without an obvious trigger
    • Affects the entire cluster even when just one topic/partition has an issue (e.g., leader change, a GC pause killing the ZooKeeper connection, a consumer unable to catch up)
    • Takes time to recover, and the whole cluster dies after 32 rebalance retry attempts
    • Bugs: a consumer can fall off the grid (owning no partitions while no other consumer picks them up)
    • Metadata problems can bring down the entire cluster (e.g., a single topic that does not exist)
  • A restart is required whenever a new topic is whitelisted

Kafka MirrorMaker's Design

The original MirrorMaker tool shipped with Kafka (as of 0.8.2) uses the high-level consumer to read messages. It relies on the ZookeeperConsumerConnector to watch partitions for events such as leader changes and to signal the consumer group to rebalance accordingly.

The gray part of the original architecture diagram is what we want to replace with uReplicator.

The ZookeeperConsumerConnector handles the consumers' interaction with ZooKeeper, including rebalancing, leader changes, etc.

Goals of uReplicator

  1. Stable mirroring: Rebalancing only during startup (when a node is added/deleted)
  2. Simple operations: Easy to scale up cluster, no server restart for whitelisting topics
  3. High throughput: Max offset lag is consistently 0
  4. Time SLA: ~5min

Good to have: auditing capability (possibly via Chaperone integration)

Proposed uReplicator Design

uReplicator Controller

The uReplicator controller internally wraps a Helix controller and handles all of the cluster management workload.
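For illustration, a minimal sketch of starting an embedded Helix controller with the standard Helix API is shown below. The ZooKeeper address, cluster name, and instance name are placeholders, and the real uReplicator controller adds its own assignment logic and REST layer on top.

```java
import org.apache.helix.HelixManager;
import org.apache.helix.controller.HelixControllerMain;

public class ControllerStartupSketch {
  public static void main(String[] args) {
    // Hypothetical ZooKeeper address, cluster name, and controller instance name.
    // In STANDALONE mode, several controller instances connected to the same
    // cluster elect one leader among themselves.
    HelixManager controller = HelixControllerMain.startHelixController(
        "localhost:2181", "uReplicatorCluster", "controller-0",
        HelixControllerMain.STANDALONE);
    // The uReplicator controller wraps this Helix controller and layers its own
    // REST endpoints and topic-assignment logic on top of it.
  }
}
```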

Helix Controller

Helix is a generic cluster management framework used for the automatic management of partitioned, replicated, and distributed resources hosted on a cluster of nodes. Helix automates reassignment of resources in the face of node failure and recovery, cluster expansion, and reconfiguration.

In this design, we use Helix to manage all the nodes in the cluster and to assign topic partitions. We define each Kafka topic as a Helix resource and each Kafka topic partition as a Helix partition. The Helix controller registers the OnlineOffline state model and customized rebalance logic to manage worker nodes and topic-partition assignment. The IdealState defined in Helix is the actual topic-partition mapping for the uReplicator workers, which consume from the source Kafka cluster and produce to the destination Kafka cluster.
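As a sketch of this topic-to-resource mapping, the snippet below registers a Kafka topic as a Helix resource with the OnlineOffline state model and switches it to CUSTOMIZED rebalance mode, so the controller's own logic writes the partition-to-worker mapping into the IdealState. The ZooKeeper address, cluster name, and topic are placeholders, not the actual uReplicator code.

```java
import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;

public class TopicRegistrationSketch {
  public static void main(String[] args) {
    HelixAdmin admin = new ZKHelixAdmin("localhost:2181"); // hypothetical ZooKeeper address
    String cluster = "uReplicatorCluster";                 // hypothetical cluster name
    String topic = "testTopic";                            // Kafka topic == Helix resource
    int numPartitions = 8;                                 // Kafka partitions == Helix partitions

    // Register the topic as a Helix resource that uses the OnlineOffline state model.
    admin.addResource(cluster, topic, numPartitions, "OnlineOffline");

    // CUSTOMIZED rebalance mode: the controller's own assignment logic, not Helix's
    // default rebalancer, decides which worker serves which partition.
    IdealState idealState = admin.getResourceIdealState(cluster, topic);
    idealState.setRebalanceMode(RebalanceMode.CUSTOMIZED);
    admin.setResourceIdealState(cluster, topic, idealState);
  }
}
```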

As the cluster manager, the Helix controller also exposes REST endpoints to accept requests to add/update/delete a topic.

Helix Terminology Map

Kafka World              Helix World
Kafka Topic              Helix Resource
Kafka Topic Partition    Helix Partition
MirrorMaker Controller   Helix Controller
MirrorMaker Worker       Helix Participant

uReplicator Worker

A uReplicator worker registers itself as a Helix participant. The worker accepts requests from the controller to start/stop consuming for a particular topic partition.
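A minimal sketch of how a worker might join the cluster as a Helix participant is shown below; the cluster name, worker instance name, and ZooKeeper address are placeholders, and the state model factory it registers is sketched under Topic Partition Assignment Management below.

```java
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.participant.StateMachineEngine;

public class WorkerStartupSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical cluster name, worker instance name, and ZooKeeper address.
    HelixManager manager = HelixManagerFactory.getZKHelixManager(
        "uReplicatorCluster", "worker-0", InstanceType.PARTICIPANT, "localhost:2181");

    // Register the factory that builds an OnlineOffline state model per topic partition
    // (see the TopicPartitionStateModelFactory sketch later in this page).
    StateMachineEngine stateMachineEngine = manager.getStateMachineEngine();
    stateMachineEngine.registerStateModelFactory(
        "OnlineOffline", new TopicPartitionStateModelFactory());

    // After connecting, the Helix controller drives this worker via state transitions.
    manager.connect();
  }
}
```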

SimpleConsumer in uReplicator Worker

To fully address the rebalancing issues caused by the high-level consumer, we use the SimpleConsumer. It supports adding new topics automatically and does not affect existing topics when a partition leader changes or a new partition is added.
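For reference, a bare-bones fetch with the Kafka 0.8.x SimpleConsumer looks roughly like the sketch below. The broker host, topic, and partition are placeholders; a real worker resolves partition leaders from metadata and handles leader changes and offset management itself.

```java
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SimpleConsumerFetchSketch {
  public static void main(String[] args) {
    // Hypothetical leader broker for partition 0 of "testTopic".
    SimpleConsumer consumer =
        new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "uReplicator-worker-0");

    long offset = 0L;
    FetchRequest request = new FetchRequestBuilder()
        .clientId("uReplicator-worker-0")
        .addFetch("testTopic", 0, offset, 1024 * 1024) // topic, partition, offset, maxBytes
        .build();

    FetchResponse response = consumer.fetch(request);
    for (MessageAndOffset messageAndOffset : response.messageSet("testTopic", 0)) {
      // In uReplicator, each fetched message is handed to the producer that
      // writes to the aggregation (destination) cluster.
      offset = messageAndOffset.nextOffset();
    }
    consumer.close();
  }
}
```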

Topic Partition Assignment Management

The controller has a global view of the number of uReplicator workers and of the topics to be copied. It assigns topic partitions evenly across all available uReplicator workers and, internally, uses Helix to store the topic-partition assignment in ZooKeeper. The assignment is distributed to each uReplicator worker as state transitions through Helix: for each partition assigned to a worker, that worker receives an offline-to-online state transition notification and adds the corresponding topic partition to its simple consumer; for each partition removed from the assignment, the worker receives an online-to-offline notification and removes that partition from its consumer. Once a partition is online, the worker consumes its data from the source cluster and produces it to the aggregation cluster.
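A sketch of what the worker-side OnlineOffline state model could look like with the Helix participant API is below. Class names and method bodies are placeholders, and depending on the Helix version, createNewStateModel takes either the partition name alone or the resource name plus partition name.

```java
import org.apache.helix.NotificationContext;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;

// Builds one state model per Helix partition, i.e. per Kafka topic partition.
public class TopicPartitionStateModelFactory extends StateModelFactory<StateModel> {

  // Note: older Helix versions use createNewStateModel(String partitionName) instead.
  @Override
  public StateModel createNewStateModel(String resourceName, String partitionName) {
    return new TopicPartitionStateModel();
  }

  @StateModelInfo(states = "{'OFFLINE','ONLINE'}", initialState = "OFFLINE")
  public static class TopicPartitionStateModel extends StateModel {

    @Transition(from = "OFFLINE", to = "ONLINE")
    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
      // message.getResourceName() is the Kafka topic and message.getPartitionName()
      // identifies the partition; the worker adds it to its simple consumer here.
    }

    @Transition(from = "ONLINE", to = "OFFLINE")
    public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
      // The worker stops consuming and removes the topic partition from its consumer.
    }
  }
}
```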

Common Cluster Behavior

Add new topic

When you add a new topic to the controller, it assigns each of the topic's partitions to the worker currently serving the fewest partitions. Note: If auto topic whitelisting is enabled, the controller automatically whitelists topics that exist in both the source and destination Kafka clusters.
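The assignment rule can be sketched as follows (plain Java, not the actual controller code): each new partition goes to whichever worker currently serves the fewest partitions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MinLoadAssignmentSketch {

  // workerLoad maps worker instance name -> number of partitions it currently serves;
  // returns worker instance name -> partitions of the new topic assigned to it.
  public static Map<String, List<Integer>> assign(Map<String, Integer> workerLoad,
                                                  int numPartitions) {
    Map<String, List<Integer>> assignment = new HashMap<>();
    for (int partition = 0; partition < numPartitions; partition++) {
      // Pick the worker with the minimum current load.
      String lightest = null;
      for (Map.Entry<String, Integer> entry : workerLoad.entrySet()) {
        if (lightest == null || entry.getValue() < workerLoad.get(lightest)) {
          lightest = entry.getKey();
        }
      }
      assignment.computeIfAbsent(lightest, k -> new ArrayList<>()).add(partition);
      workerLoad.put(lightest, workerLoad.get(lightest) + 1);
    }
    return assignment;
  }
}
```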

Delete existing topic

When you delete an existing topic from the controller, it removes the topic from the IdealState. The corresponding uReplicator workers then receive online-to-offline transition messages and drop those topic partitions.
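In Helix admin terms, removing the resource is roughly the following (a sketch with placeholder names; the actual controller may manipulate the IdealState differently):

```java
import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;

public class TopicDeletionSketch {
  public static void main(String[] args) {
    HelixAdmin admin = new ZKHelixAdmin("localhost:2181"); // hypothetical ZooKeeper address
    // Dropping the resource deletes its IdealState; every worker serving a partition of
    // this topic then receives an online-to-offline transition and stops consuming it.
    admin.dropResource("uReplicatorCluster", "testTopic");
  }
}
```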

Expand partitions for existing topic

When you expand a topic in the controller, it assigns the new partitions using the same mechanism as adding a new topic. Note: If auto topic whitelisting is enabled, topic expansion in uReplicator happens automatically.

Add/remove new uReplicator worker

The uReplicator controller registers a listener on live uReplicator instance changes. Whenever uReplicator workers are added or removed, this listener is notified and schedules a rebalance job with a configurable delay (default: 2 minutes). When the delay expires, the rebalance job starts and checks the current cluster status to determine whether a rebalance is needed. This fixed delay prevents the controller from rebalancing on transient ZooKeeper disconnection/reconnection issues or during rolling restarts of uReplicator workers. Rebalancing happens only when a machine is added or in a failure scenario.
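Conceptually, the delayed rebalance can be sketched like this (plain Java with placeholder names, not the actual controller code):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelayedRebalanceSketch {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  private final long delayMinutes;

  public DelayedRebalanceSketch(long delayMinutes) {
    this.delayMinutes = delayMinutes; // default delay in uReplicator: 2 minutes
  }

  // Called from the Helix live-instance change listener.
  public void onLiveInstanceChange() {
    scheduler.schedule(this::maybeRebalance, delayMinutes, TimeUnit.MINUTES);
  }

  private void maybeRebalance() {
    // Re-check the current set of live workers; only rewrite the IdealState if workers
    // were genuinely added or removed, not just briefly disconnected from ZooKeeper.
  }
}
```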

Cluster Expansion/Shrinking

Usually in production, we deploy three controllers, one of which is the leader. All uReplicator workers join the cluster as Helix participants and wait for the topic partitions assignment. The controller detects any new uReplicator workers added or existing workers removed and rebalances the cluster accordingly.

Cluster status monitoring/validation

The controller periodically checks that the current idealState and externalView are aligned and reports mismatched topics and partitions to metrics. The controller can optionally create a Kafka observer, which periodically fetches topic information from Kafka. In that case, the controller also compares the topic information from the Kafka brokers with the controller's assignment; if the number of partitions assigned differs from what a broker reports, the controller emits mismatched-topic and mismatched-partition metrics.
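A simplified validation pass with the Helix admin API might look like the sketch below (placeholder ZooKeeper address and cluster name; it only compares partition counts, whereas the real check also covers per-partition assignments):

```java
import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;

public class ClusterValidationSketch {
  public static void main(String[] args) {
    HelixAdmin admin = new ZKHelixAdmin("localhost:2181"); // hypothetical ZooKeeper address
    String cluster = "uReplicatorCluster";                 // hypothetical cluster name

    for (String topic : admin.getResourcesInCluster(cluster)) {
      IdealState idealState = admin.getResourceIdealState(cluster, topic);
      ExternalView externalView = admin.getResourceExternalView(cluster, topic);

      int idealPartitions = idealState.getPartitionSet().size();
      int actualPartitions = (externalView == null) ? 0 : externalView.getPartitionSet().size();
      if (idealPartitions != actualPartitions) {
        // In the controller this would be reported as a mismatch metric.
        System.out.printf("Mismatch for %s: ideal=%d, actual=%d%n",
            topic, idealPartitions, actualPartitions);
      }
    }
  }
}
```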