To start with it is worth familiarising yourself with cruise-control (CC) - I recommend this video as I found it the most useful. I have also left example YAML/Properties files in this repository.
First things first, it seems as though the CC releases are not very stable. In the time I have been looking at it there has been an issue with (almost) every release. I have applied some workarounds to get a version running for Kafka 2.0.1, and built these into images.
Important: It should be noted that the workaround prevents the default sample stores (__KafkaCruiseControlPartitionMetricSamples, __KafkaCruiseControlModelTrainingSamples) from having their replication factor updated, no matter the value of sample.store.topic.replication.factor in the configuration file.
We need to rebuild the Strimzi Kafka image with the CC metrics reporter so that we can retrieve data from the brokers.
This is as simple as adding the metrics reporter JAR (built with a version of Java compatible with the Strimzi Kafka image) to the /opt/kafka/libs/ folder.
./gradlew jar
FROM strimzi/kafka:0.11.4-kafka-2.0.1
COPY cruise-control-metrics-reporter-2.0.46.jar /opt/kafka/libs/
docker build -t <my-user>/kafka:latest .
docker push <my-user>/kafka:latest
Next we need to build an image to deploy CC as an application in our k8s cluster.
I have created a hacky workaround for the bug in v2.0.46 (with the pitfall outlined above) in my own branch adamcattermole/cruise-control:fix/2.0.46-strimzi, although it might be worth testing with the latest release.
It may be that this is fixed at some point soon, but for the purposes of my experimentation I stopped attempting to fix it myself.
Note: At the time of writing, reconnect.backoff.ms must be defined in the config file, otherwise this results in an NPE due to changes from PR#707. This should be fixed in (the currently unreleased) v2.0.49 by PR#732 and PR#735.
We also deploy the latest version (v0.1.0 at the time of writing) of linkedin/cruise-control-ui, which provides a frontend and visualisations for the RESTful operations available in CC. Deploying it is as simple as extracting the files into the root CC directory.
Note: cruise-control-ui does not currently support all of the possible REST operations/parameters, so it may be necessary to perform some requests manually.
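For example, assuming CC is reachable on its default port 9090, the REST API can be queried directly (the host placeholder below is just an example):

# Query the overall Cruise Control state (monitor, executor, analyzer)
curl "http://<cc-host>:9090/kafkacruisecontrol/state"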
FROM centos:7
RUN yum -y install git java-1.8.0-openjdk-devel && \
yum clean all -y
ENV JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk
RUN git clone --branch fix/2.0.46-strimzi https://github.com/adam-cattermole/cruise-control.git
WORKDIR cruise-control
RUN ./gradlew jar copyDependantLibs
COPY cruisecontrol-2.0.46.properties config/cruisecontrol.properties
RUN curl -L https://github.com/linkedin/cruise-control-ui/releases/download/v0.1.0/cruise-control-ui.tar.gz \
-o /tmp/cruise-control-ui.tar.gz \
&& tar zxvf /tmp/cruise-control-ui.tar.gz
ENTRYPOINT ["/bin/bash", "-c", "./kafka-cruise-control-start.sh config/cruisecontrol.properties"]
# ENTRYPOINT ["/bin/bash", "-c", "sleep 30000000"]
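Build and push this image in the same way as the Kafka image earlier (the image name here is just an example):

docker build -t <my-user>/cruise-control:latest .
docker push <my-user>/cruise-control:latest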
Spin up a new cluster operator and Kafka cluster (making sure to specify the Kafka image built using the metrics lib), or use mine, adamcattermole/kafka:2.0.46. metric.reporters should also be defined in the Kafka config.
apiVersion: kafka.strimzi.io/v1alpha1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    image: adamcattermole/kafka:2.0.46
    version: 2.0.1
    replicas: 2
    listeners:
      plain: {}
      tls: {}
    config:
      metric.reporters: "com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter"
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 2
      log.message.format.version: "2.0.1"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 2
    storage:
      type: ephemeral
  entityOperator:
    userOperator: {}
    topicOperator: {}
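Assuming the resource above is saved as kafka-cc.yaml (the filename is arbitrary), it can be applied and the broker pods watched until they are running:

kubectl apply -f kafka-cc.yaml
kubectl get pods -l strimzi.io/cluster=my-cluster -w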
CC needs direct access to Zookeeper, which is not exposed due to the security model. You can use Jakub Scholz' gist to expose it (insecurely).
Important: If running on OpenShift, we must provide the CC container with root permissions, and so this should be provided in the configuration. On native Kubernetes it already has these permissions and so no change is required. This will enable writing to log directories, running the REST endpoint, and other functionality that may have been disabled.
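One way to do this on OpenShift (a sketch only - it assumes the CC deployment runs under the default service account in the current project) is to grant the anyuid SCC so the container can run as root:

oc adm policy add-scc-to-user anyuid -z default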
Finally run a deployment of the CC container you created before, or use my image adamcattermole/cruise-control:2.0.46-strimzi.
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: my-cruise-control
  labels:
    app: cruise-control
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: cruise-control
        name: my-cruise-control
    spec:
      containers:
      - name: my-cruise-control
        image: adamcattermole/cruise-control:2.0.46-strimzi
        imagePullPolicy: 'Always'
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: cruise-control
    name: my-cruise-control
  name: my-cruise-control
spec:
  ports:
  - name: http-9090
    port: 9090
    protocol: TCP
    targetPort: 9090
  type: NodePort
  selector:
    name: my-cruise-control
If using Minikube we can list the services using minikube service list, and interact with CC through the provided IP:port address.
At this point it is possible to use CC to rebalance the cluster.
You can start with a cluster of N brokers, deploy some topics, scale up, and call the rebalance command to optimize the cluster, distributing the existing topics over the new brokers.
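For example, using the NodePort service defined above on Minikube, a rebalance can be triggered through the REST API (dry run first to inspect the proposal before executing it):

CC_URL=$(minikube service my-cruise-control --url)
# Generate a proposal without moving any partitions
curl -X POST "$CC_URL/kafkacruisecontrol/rebalance?dryrun=true"
# Execute the rebalance for real
curl -X POST "$CC_URL/kafkacruisecontrol/rebalance?dryrun=false"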
This should be simple to implement in the cluster operator - on reconciliation, when the number of brokers changes and the scale is performed, check if CC is running and call the rebalance command once the status is Ready (although I realise there is a lot more setup/configuration than that).
I think this is what the recently released Banzai Cloud operator does, using a simple implementation to interact with CC’s REST API.
There are several benefits to running CC in a Kubernetes environment over bare-metal. One of them which I think is particularly key is the ability to dynamically scale the number of brokers based on the supported goals in real-time.
This does, however, add an additional concern to the thinking behind CC: it assumes that the number of brokers is fixed and does not change. Our optimization problem is already non-trivial, and by allowing the number of brokers to vary we significantly increase the complexity of finding the optimal solution.
There are also further less obvious issues that arise when using CC on Kubernetes. The default metrics are useful, but require changing and updating for this environment. Here is a table of the goals that are included in CC by default. I have listed some considerations / problems next to those that could have them, as well as thinking about whether scaling the number of brokers impacts the goal:
Goal | Comments | Does scaling #Brokers impact? |
---|---|---|
Rack Awareness | Can we retrieve rack information in a Kubernetes/OpenShift environment easily? Do we need to worry about this in a cloud environment or leave it to the supplier? On cloud would a Region Awareness ( / Datacentre Awareness) goal be preferable (at the cost of added inter-broker latency)? | N |
Replica Capacity | | Y |
Disk Capacity | Is the "disk" (or volume) really reaching capacity, or can we just increase the volume claim? Should this be the Disk capacity of the entire Kubernetes cluster instead? | N? |
Network In/Out Capacity | Do we know (or can we inspect) this value when running in a cloud environment? | Y |
CPU Capacity | How is the utilization measured? Is this the utilization of the container based on quotas / limits? Can we easily increase the pod quotas? Should this be the CPU capacity of the entire cluster instead? | Y |
Replica Distribution | | N |
Potential NW In/Out | | N |
Disk Usage Distribution | | N |
Network In/Out Distribution | | N |
CPU Usage Distribution | | N |
Topic Distribution | | N |
Leader Replica Distribution | | N |
Leader Bytes In Distribution | | N |
There are a good few goals that should theoretically work as intended, but others have some ambiguity.
CC looks to balance these goals by looping over them one by one and ensuring that no change for a new goal negatively impacts the previous ones. If the algorithm cannot prevent a hard goal from being violated, the optimization is cancelled with a failure. Once optimized, it performs a diff between the new proposal and the previous one, to ensure that the new one is actually better than the old.
I decided that the best first step would be to look at a goal that does work, and that will benefit from scaling the brokers, to see if we could improve our rebalancing proposals. The obvious candidate is the hard-goal on Replica Capacity.
The Replica Capacity goal is by default a hard goal, and the configuration is provided through the properties file.
It is as simple as: if max.replicas.per.broker = m, and the current number of replicas = R, then the number of brokers N must be >= ceiling(R/m) for the goal to be satisfied.
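As a worked example (the numbers are purely illustrative): if max.replicas.per.broker were set to 100 and the cluster held 250 replicas, the goal would require N >= ceiling(250/100) = 3 brokers.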
At present, if the optimization fails the rebalance is cancelled, even though in our scenario the solution is to scale up the Kafka cluster.
I have started to implement some adaptations to the CC program flow to allow us to try rebalancing with more brokers.
The code can be found in adamcattermole/cruise-control:2.0.46/strimzi-scale-brokers.
To do this I have added a function to the Goal interface, canAddBrokerHelp(), where AbstractGoal sets the default to false and the ReplicaCapacityGoal overrides it to true.
I have also modified the OptimizationFailureException to store this boolean value so that we know whether to try again or throw an exception.
This is quite a hacky solution, and does not follow the existing operation structure in CC.
Scaling to additional brokers is quite an aggressive operation, and so for the time being my decision has been to recursively increase the number of brokers in the cluster model one at a time, up to a num.brokers.maximum configuration value.
The configuration is provided through the properties file, and I have set the default to 100.
This produces a proposal for optimizations to the cluster with an increased number of brokers, assuming that the violated ReplicaCapacityGoal can be satisfied without breaching the num.brokers.maximum limit.
It will throw an exception if we reach num.brokers.maximum and the ReplicaCapacityGoal is still not satisfied.
It should also be possible to set canAddBrokerHelp() to true for other goals, and ensure that the OptimizationFailureException is thrown with this value, so that those goals can be optimized in the same way.
An obvious problem with the changes so far is that we do not have the ability to scale down.
Ideally we would also have another function, canRemoveBrokersHelp(), and if both canAdd.. and canRemove.. are true we would need to try to diff the proposals to decide whether we are converging on a more optimal solution (although I think this is an incredibly hard problem..).
Realistically this does not matter while we are only looking at hard-goal violations - they all seem to relate to capacity, and so I cannot imagine scaling down could fix them.
Currently the proposal cache does not take into account the changes to scale the brokers.
This is due to some asynchronous operations updating the proposals, so a manual override with &ignore_proposal_cache=true is required in the rebalance POST request to generate proposals with the new logic.
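In practice that means a request along these lines (reusing the CC_URL from the Minikube example above):

# Bypass the cached proposals so the broker-scaling logic is exercised
curl -X POST "$CC_URL/kafkacruisecontrol/rebalance?dryrun=true&ignore_proposal_cache=true"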
It may be worth making deeper changes by adding additional configuration to the BalancingAction and BalancingConstraint classes, as well as the GoalOptimizer, to provide state for scaling the Kafka cluster.
Even though we generate a new proposal that suggests we scale to N brokers, we do not actually perform this scaling within the executor - running with &dryrun=true is required.
To get this part working it will likely require a new ExecutionProposal to follow the existing design, along with updates to the Executor class to include _state for scaling up and down. It may also be worth updating the default addBrokers() function, which can currently add Kafka brokers that already exist to the cluster, but not add new ones.
There would also need to be some code so that CC can update the Kafka cluster CRD replica count, causing the operator to scale, and then wait for the Ready status before proceeding, or roll back on failure.
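As a purely illustrative sketch of the kind of update involved (outside of CC itself), scaling is just a bump of the broker replica count in the Kafka CR, after which the operator performs the scale:

# Hypothetical example: scale the my-cluster Kafka CR to 3 brokers
kubectl patch kafka my-cluster --type merge -p '{"spec":{"kafka":{"replicas":3}}}'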