Cruise Control

To start with, it is worth familiarising yourself with Cruise Control (CC). I recommend this video, as I found it the most useful. I have also left example YAML/properties files in this repository.

Running Cruise Control

First things first: the CC releases do not seem to be very stable. In the time I have been looking at the project there has been an issue with (almost) every release. I have applied some workarounds to get a version running against Kafka 2.0.1, and built these into images.

Important
Note that the workaround prevents the default sample store topics (__KafkaCruiseControlPartitionMetricSamples, __KafkaCruiseControlModelTrainingSamples) from having their replication factor updated, regardless of the value of sample.store.topic.replication.factor in the configuration file.

Kafka Image

We need to rebuild the Strimzi Kafka image with the CC metrics reporter so that CC can retrieve data from the brokers. This is as simple as adding the metrics reporter JAR (built with a version of Java compatible with the Strimzi Kafka image) to the /opt/kafka/libs/ folder.

Build jar
./gradlew jar
Build the image
FROM strimzi/kafka:0.11.4-kafka-2.0.1

COPY cruise-control-metrics-reporter-2.0.46.jar /opt/kafka/libs/
docker build -t <my-user>/kafka:latest .
docker push <my-user>/kafka:latest

Build Cruise Control Image

Next we need to build an image to deploy CC as an application in our k8s cluster. I have created a hacky workaround for the bug in v2.0.46 (with the pitfall outlined above) in my own branch adamcattermole/cruise-control:fix/2.0.46-strimzi, although it might be worth testing with the latest release. It may be that this is fixed at some point soon, but for the purposes of my experimentation I stopped attempting to fix it myself.

Note
At the time of writing, reconnect.backoff.ms must be defined in the config file; otherwise CC fails with an NPE due to changes from PR#707. This should be fixed in (the currently unreleased) v2.0.49 by PR#732 and PR#735.

We also deploy the latest version (v0.1.0 at the time of writing) of linkedin/cruise-control-ui, which provides a frontend and visualisations for the RESTful operations available for CC. The deployment is as simple as extracting the files into the root CC directory.

Note
cruise-control-ui does not currently support all of the possible REST operations/parameters, so it may be necessary to perform some requests manually.

Example Dockerfile used to build the image. For testing I swap the ENTRYPOINT to run CC manually.
FROM centos:7

RUN yum -y install git java-1.8.0-openjdk-devel && \
    yum clean all -y

ENV JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk

RUN git clone --branch fix/2.0.46-strimzi https://github.com/adam-cattermole/cruise-control.git

WORKDIR cruise-control

RUN ./gradlew jar copyDependantLibs

COPY cruisecontrol-2.0.46.properties config/cruisecontrol.properties

RUN curl -L https://github.com/linkedin/cruise-control-ui/releases/download/v0.1.0/cruise-control-ui.tar.gz \
    -o /tmp/cruise-control-ui.tar.gz \
    && tar zxvf /tmp/cruise-control-ui.tar.gz

ENTRYPOINT ["/bin/bash", "-c", "./kafka-cruise-control-start.sh config/cruisecontrol.properties"]
# ENTRYPOINT ["/bin/bash", "-c", "sleep 30000000"]

Create Deployment

Spin up a new cluster operator and Kafka cluster, making sure to specify the Kafka image built with the metrics reporter library (or use mine, adamcattermole/kafka:2.0.46). metric.reporters must also be defined in the Kafka configuration so that the brokers load the reporter.

Resource definition for Kafka cluster
apiVersion: kafka.strimzi.io/v1alpha1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    image: adamcattermole/kafka:2.0.46
    version: 2.0.1
    replicas: 2
    listeners:
      plain: {}
      tls: {}
    config:
      metric.reporters: "com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter"
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      # must not exceed transaction.state.log.replication.factor (1 above)
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.0.1"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 2
    storage:
      type: ephemeral
  entityOperator:
    userOperator: {}
    topicOperator: {}

CC needs direct access to Zookeeper, which is not exposed due to the security model. You can use Jakub Scholz' gist to expose it (insecurely).

Important
If running on OpenShift, the CC container must be granted root permissions, and this has to be set in the deployment configuration. On native Kubernetes the container already has these permissions, so no change is required. Root permissions enable writing to the log directories, running the REST endpoint, and other functionality that may otherwise be disabled.

Finally, run a deployment of the CC container you created before, or use my image adamcattermole/cruise-control:2.0.46-strimzi.

Example YAML deployment for native k8s
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: my-cruise-control
  labels:
    app: cruise-control
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: cruise-control
        name: my-cruise-control
    spec:
      containers:
      - name: my-cruise-control
        image: adamcattermole/cruise-control:2.0.46-strimzi
        imagePullPolicy: 'Always'
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: cruise-control
    name: my-cruise-control
  name: my-cruise-control
spec:
  ports:
    - name: http-9090
      port: 9090
      protocol: TCP
      targetPort: 9090
  type: NodePort
  selector:
    name: my-cruise-control

If using Minikube, we can list the services using minikube service list, and interact with CC through the provided IP:port address. At this point it is possible to use CC to rebalance the cluster. You can start with a cluster of N brokers, deploy some topics, scale up, and call the rebalance command to optimize the cluster, distributing the existing topics over the new brokers.

This should be simple to implement in the cluster operator: on reconciliation, when the number of brokers changes and the operator performs the scale, check whether CC is running and call the rebalance command once the cluster status is Ready (although I realise there is a lot more setup/configuration than that). I think this is what the recently released Banzai operator does, using a simple implementation to interact with CC's REST API.
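
The operator-side flow described above could be sketched roughly as follows. Every name here (ScaleRebalanceHook, ClusterStatus, RebalanceClient) is hypothetical and is not part of Strimzi or CC; the sketch only illustrates the order of the checks, and it deliberately ignores scale-down, where replicas would have to be moved off a broker before it is removed.

```java
/** Hypothetical sketch: request a CC rebalance after the operator has scaled up the brokers. */
final class ScaleRebalanceHook {

    /** Assumed view of the state the operator already tracks (not a Strimzi API). */
    interface ClusterStatus {
        boolean isKafkaReady();
        boolean isCruiseControlRunning();
    }

    /** Assumed thin wrapper around CC's REST rebalance endpoint. */
    interface RebalanceClient {
        void rebalance();
    }

    /**
     * Intended to be called at the end of a reconciliation.
     * Returns true if a rebalance was requested.
     */
    static boolean afterReconcile(int previousBrokers, int currentBrokers,
                                  ClusterStatus status, RebalanceClient client) {
        if (currentBrokers <= previousBrokers) {
            return false; // nothing was added, so there are no empty brokers to fill
        }
        if (!status.isKafkaReady() || !status.isCruiseControlRunning()) {
            return false; // not safe yet; a later reconciliation can try again
        }
        client.rebalance();
        return true;
    }
}
```

Returning false instead of blocking keeps the reconciliation loop short; the caller would need to remember that a rebalance is still pending.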

Some Additional Thoughts

There are several benefits to running CC in a Kubernetes environment over bare metal. One that I think is particularly important is the ability to scale the number of brokers dynamically, in real time, based on the supported goals.

This does, however, add a further concern to the thinking behind CC, which assumes that the number of brokers is fixed. The optimization problem is already non-trivial, and by allowing additional brokers we significantly increase the complexity of finding the optimal solution.

There are also further less obvious issues that arise when using CC on Kubernetes. The default metrics are useful, but require changing and updating for this environment. Here is a table of the goals that are included in CC by default. I have listed some considerations / problems next to those that could have them, as well as thinking about whether scaling the number of brokers impacts the goal:

[cols="2,5,1",options="header"]
|===
|Goal |Comments |Does scaling #Brokers impact?

|Rack Awareness
|Can we retrieve rack information in a Kubernetes/OpenShift environment easily? Do we need to worry about this in a cloud environment or leave it to the supplier? On cloud would a Region Awareness ( / Datacentre Awareness) goal be preferable (at the cost of added inter-broker latency)?
|N

|Replica Capacity
|
|Y

|Disk Capacity
|Is the "disk" (or volume) really reaching capacity, or can we just increase the volume claim? Should this be the Disk capacity of the entire Kubernetes cluster instead?
|N?

|Network In/Out Capacity
|Do we know (or can we inspect) this value when running in a cloud environment?
|Y

|CPU Capacity
|How is the utilization measured? Is this the utilization of the container based on quotas / limits? Can we easily increase the pod quotas? Should this be the CPU capacity of the entire cluster instead?
|Y

|Replica Distribution
|
|N

|Potential NW In/Out
|
|N

|Disk Usage Distribution
|
|N

|Network In/Out Distribution
|
|N

|CPU Usage Distribution
|
|N

|Topic Distribution
|
|N

|Leader Replica Distribution
|
|N

|Leader Bytes In Distribution
|
|N
|===

There are a good few goals that should theoretically work as intended, but others have some ambiguity.

CC looks to balance these goals by looping over them one by one and ensuring that no change made for a new goal negatively impacts the goals optimized before it. If the algorithm cannot prevent a hard goal from being violated, the optimization is cancelled with a failure. Once the goals have been optimized, CC performs a diff between the new proposal and the previous one, to ensure that the new one is actually better than the old.
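
A simplified sketch of that loop is shown below. It is not CC's actual code: the Goal interface, its methods and the fixedGoal helper are stand-ins invented for illustration, and the treatment of soft goals (record the violation and carry on) is an assumption based on the hard/soft distinction described above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Simplified sketch of one-by-one goal optimization; these are not CC's real classes. */
final class GoalLoopSketch {

    /** Hypothetical stand-in for a CC goal. */
    interface Goal {
        String name();
        boolean isHardGoal();
        /**
         * Tries to satisfy this goal without making a change that one of the
         * already optimized goals would reject. Returns false if the goal is
         * still violated afterwards.
         */
        boolean optimize(List<Goal> alreadyOptimized);
    }

    /** Optimizes the goals in the given order and returns the soft goals left violated. */
    static List<String> optimizeAll(List<Goal> goals) {
        List<Goal> optimized = new ArrayList<>();
        List<String> violatedSoftGoals = new ArrayList<>();
        for (Goal goal : goals) {
            boolean satisfied = goal.optimize(Collections.unmodifiableList(optimized));
            if (!satisfied) {
                if (goal.isHardGoal()) {
                    // A violated hard goal cancels the whole optimization.
                    throw new IllegalStateException("Hard goal violated: " + goal.name());
                }
                violatedSoftGoals.add(goal.name());
            }
            optimized.add(goal);
        }
        return violatedSoftGoals;
    }

    /** Illustration helper: a goal whose outcome is fixed up front. */
    static Goal fixedGoal(String name, boolean hard, boolean satisfiable) {
        return new Goal() {
            @Override public String name() { return name; }
            @Override public boolean isHardGoal() { return hard; }
            @Override public boolean optimize(List<Goal> alreadyOptimized) { return satisfiable; }
        };
    }
}
```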

I decided that the best first step would be to look at a goal that does work, and that would benefit from scaling the brokers, to see if we could improve our rebalancing proposals. The obvious candidate is the hard goal on Replica Capacity.

Replica Capacity Scaling

The Replica Capacity goal is a hard goal by default, and it is configured through the properties file. The rule is simple: if max.replicas.per.broker = m and the current number of replicas is R, then the number of brokers N must be >= ceiling(R/m) for the goal to be satisfiable. At present, if the optimization fails the rebalance is cancelled, even though in our scenario the solution is to scale up the Kafka cluster.
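
As a worked example of the bound above, the following sketch computes the smallest broker count for which the goal can be satisfied. The class and method names are made up for illustration and do not exist in CC. Note that the bound is a necessary condition: the replicas still have to be distributed so that no single broker exceeds m.

```java
/** Illustrative helper for the N >= ceiling(R/m) bound; not part of CC. */
final class ReplicaCapacityBound {

    /** Smallest number of brokers that can hold totalReplicas with at most maxReplicasPerBroker each. */
    static long minBrokers(long totalReplicas, long maxReplicasPerBroker) {
        if (totalReplicas < 0 || maxReplicasPerBroker <= 0) {
            throw new IllegalArgumentException("expected R >= 0 and m > 0");
        }
        // Integer ceiling division: ceiling(R / m) without floating point.
        return (totalReplicas + maxReplicasPerBroker - 1) / maxReplicasPerBroker;
    }

    /** True if the given broker count is large enough for the goal to be satisfiable. */
    static boolean canSatisfy(long brokers, long totalReplicas, long maxReplicasPerBroker) {
        return brokers >= minBrokers(totalReplicas, maxReplicasPerBroker);
    }
}
```

For instance, with m = 1000 and R = 2500 the bound is 3 brokers, so a 2-broker cluster cannot satisfy the goal however the replicas are moved.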

I have started to implement some adaptations to the CC program flow to allow us to try the rebalance with more brokers. The code can be found in adamcattermole/cruise-control:2.0.46/strimzi-scale-brokers. To do this I have added a function canAddBrokerHelp() to the Goal interface; AbstractGoal sets the default to false, and ReplicaCapacityGoal overrides it to return true. I have also modified the OptimizationFailureException to store this boolean value, so that we know whether to try again or to throw the exception. This is quite a hacky solution, and does not follow the existing operation structure in CC.

Scaling to additional brokers is quite an aggressive operation, so for the time being my decision has been to recursively increase the number of brokers in the cluster model by one at a time, up to a num.brokers.maximum configuration value. This value is provided through the properties file, and I have set the default to 100.

This produces a proposal for the cluster with an increased number of brokers, provided that the violated ReplicaCapacityGoal can be satisfied without exceeding the num.brokers.maximum limit. If num.brokers.maximum is reached and the ReplicaCapacityGoal is still not satisfied, an exception is thrown. It should be possible to optimize other goals in the same way by making canAddBrokerHelp() return true for them and ensuring that the OptimizationFailureException is thrown with this value.
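
The retry logic can be sketched as below. This is not the code on the branch: it uses a loop rather than recursion, and Optimizer and OptimizationFailure are stand-ins invented for the sketch (the latter playing the role of the modified OptimizationFailureException).

```java
/** Sketch of retrying an optimization with one more broker at a time; names are hypothetical. */
final class BrokerScalingSketch {

    /** Stand-in for the exception that records whether adding brokers could help. */
    static final class OptimizationFailure extends RuntimeException {
        final boolean canAddBrokerHelp;

        OptimizationFailure(String message, boolean canAddBrokerHelp) {
            super(message);
            this.canAddBrokerHelp = canAddBrokerHelp;
        }
    }

    /** Stand-in for running the goal optimization against a model with numBrokers brokers. */
    interface Optimizer {
        void optimize(int numBrokers);
    }

    /**
     * Returns the first broker count, starting at currentBrokers, for which the
     * optimization succeeds. maxBrokers plays the role of num.brokers.maximum.
     */
    static int findBrokerCount(int currentBrokers, int maxBrokers, Optimizer optimizer) {
        for (int brokers = currentBrokers; brokers <= maxBrokers; brokers++) {
            try {
                optimizer.optimize(brokers);
                return brokers;
            } catch (OptimizationFailure failure) {
                if (!failure.canAddBrokerHelp) {
                    throw failure; // more brokers would not fix this goal
                }
                // Otherwise fall through and retry with one more broker.
            }
        }
        throw new OptimizationFailure(
                "Goal still violated with " + maxBrokers + " brokers", false);
    }
}
```

Stepping by one broker keeps the proposed cluster as small as possible, at the cost of re-running the optimization once per added broker.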

Further work

An obvious problem with the changes so far is that we do not have the ability to scale down. Ideally we would also have another function canRemoveBrokersHelp(), and if both canAddBrokerHelp() and canRemoveBrokersHelp() are true we would need to diff the proposals to decide whether we are converging on a more optimal solution (although I think this is an incredibly hard problem). Realistically this does not matter while we only look at hard goal violations: they all seem to relate to capacity, so I cannot imagine that scaling down would fix them.

Currently the proposal cache is not taking into account the changes to scale the brokers. This is due to some asynchronous operations updating the proposals, so a manual override with &ignore_proposal_cache=true is required in the rebalance POST request to generate proposals with the new logic. It may be worth performing deeper changes by adding additional configuration to the BalancingAction, BalancingConstraint as well as the GoalOptimizer classes, to provide state for scaling the Kafka cluster.

Even though we generate a new proposal that suggests scaling to N brokers, we do not actually perform this scaling within the executor, so running with &dryrun=true is required. Getting this part working will likely require a new ExecutionProposal that follows the existing design, along with updates to the Executor class to include _state for scaling up and down. It may also be worth updating the default addBrokers() function, which can currently add Kafka brokers that already exist to the cluster, but not new ones. CC would also need code to update the replica count in the Kafka custom resource, causing the operator to scale the cluster, and then to wait for the Ready status before proceeding, or to roll back on failure.
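
For reference, a rebalance request carrying the two query parameters mentioned above could be built and sent as in the sketch below. It assumes CC's default /kafkacruisecontrol URL prefix, which may differ in your properties file; the class name and the host/port used in the usage note are placeholders.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

/** Sketch of calling the CC rebalance endpoint; assumes the default URL prefix. */
final class RebalanceRequestSketch {

    /** Builds the rebalance URL, e.g. for a host:port reported by minikube service list. */
    static String rebalanceUrl(String hostAndPort, boolean dryRun, boolean ignoreProposalCache) {
        return "http://" + hostAndPort + "/kafkacruisecontrol/rebalance"
                + "?dryrun=" + dryRun
                + "&ignore_proposal_cache=" + ignoreProposalCache;
    }

    /** Sends an empty POST to the given URL and returns the HTTP status code. */
    static int post(String url) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        try {
            connection.setRequestMethod("POST");
            return connection.getResponseCode();
        } finally {
            connection.disconnect();
        }
    }
}
```

A call such as RebalanceRequestSketch.post(RebalanceRequestSketch.rebalanceUrl("<ip>:<port>", true, true)) would then request a dry-run proposal that bypasses the proposal cache.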