Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

[WIP] Experimental KEDA support for Kafka Event Source #886

Closed
wants to merge 12 commits into from
102 changes: 102 additions & 0 deletions kafka/source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,105 @@ event sink.

A more detailed example of the `KafkaSource` can be found in the
[Knative documentation](https://knative.dev/docs/eventing/samples/).

## Experimental KEDA support in Kafka Event Source

Warning: this is _experimental_ and may be changed in future. Should not be used
in production. This is mainly for discussion and evolving scaling in Knative
eventing.

The code is using Unstructured and also imported KEDA API - this is for
discussion which version should be used (right now only Unstructured is fully
implemented). KEDA to provide client-go support discussion #494
<https://github.com/kedacore/keda/issues/494>

### Install KEDA

To install the version I used for the experiment (the latest version from master
should work too):

```bash
git clone https://github.com/kedacore/keda.git
cd keda
git checkout v1.3.0
```

Then follow [KEDA setup instructions](https://keda.sh/deploy/) to deploy using
YAML:

```bash
kubectl apply -f deploy/crds/keda.k8s.io_scaledobjects_crd.yaml
kubectl apply -f deploy/crds/keda.k8s.io_triggerauthentications_crd.yaml
kubectl apply -f deploy/
```

If worked expected to have in keda namespace:

```bash
kubectl get pods -n keda
NAME READY STATUS RESTARTS AGE
keda-metrics-apiserver-5bb8b6664c-vrhrj 1/1 Running 0 38s
keda-operator-865ccfb9d9-xd85w 1/1 Running 0 39s
```

### Run Kafka Source Controller

Install expermiental Kafka source controller with KEDA support from source code:

```bash
export KO_DOCKER_REPO=...
ko apply -f kafka/source/config/
```

#### Local testing

```bash
go run kafka/source/cmd/controller/main.go -kubeconfig $KUBECONFIG
```

### Create Kafka Source that uses KEDA

To use KEDA the YAML file must have one of autoscaling annotations (such as
minScale).

Warning: temporary limitation - do not give KafkaSource name longer than 4
characters as KEDA will add prefix to deployment name that already has UUID
appended HAP can not be created if its name exceeds 64 character name limit
(keda-hpa-kafkasource-SO_NAME-999036a8-af6e-4431-b671-d052842dddf1). For more
details see https://github.com/kedacore/keda/issues/704

Example:

```yaml
aslom marked this conversation as resolved.
Show resolved Hide resolved
apiVersion: sources.knative.dev/v1alpha1
kind: KafkaSource
metadata:
name: kn1
annotations:
autoscaling.knative.dev/minScale: "0"
autoscaling.knative.dev/maxScale: "10"
autoscaling.knative.dev/class: keda.autoscaling.knative.dev
keda.autoscaling.knative.dev/pollingInterval: "2"
keda.autoscaling.knative.dev/cooldownPeriod: "15"
spec:
bootstrapServers: my-cluster-kafka-bootstrap.kafka:9092 #note the .kafka in URL for namespace
consumerGroup: knative-demo-kafka-keda-src1
topics: knative-demo-topic
sink:
ref:
apiVersion: serving.knative.dev/v1alpha1
kind: Service
name: event-display
```

To verify that Kafka source is using KEDA retrieve the scaled object created by
Kafka source:

```bash
⇒ kubectl get scaledobjects.keda.k8s.io
NAME DEPLOYMENT TRIGGERS AGE
kn1 kafkasource-kn1-0e12266a-93c2-48ee-8d8d-3b6ffbe9d18f kafka 26m
⇒ kubectl get horizontalpodautoscalers.autoscaling
NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGE
keda-hpa-kafkasource-kn1-0e12266a-93c2-48ee-8d8d-3b6ffbe9d18f Deployment/kafkasource-kn1-0e12266a-93c2-48ee-8d8d-3b6ffbe9d18f <unknown>/10 (avg) 1 10 0 26m
```
1 change: 1 addition & 0 deletions kafka/source/cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
kafka "knative.dev/eventing-contrib/kafka/source/pkg/reconciler"
"knative.dev/pkg/injection/sharedmain"
)
Expand Down
5 changes: 5 additions & 0 deletions kafka/source/config/201-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ rules:
- leases
verbs: *everything

- apiGroups:
- keda.k8s.io
resources:
- scaledobjects
verbs: *everything
---
# The role is needed for the aggregated role source-observer in knative-eventing to provide readonly access to "Sources".
# See https://github.com/knative/eventing/blob/master/config/200-source-observer-clusterrole.yaml.
Expand Down
2 changes: 2 additions & 0 deletions kafka/source/pkg/reconciler/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"knative.dev/eventing/pkg/apis/sources/v1alpha1"
kubeclient "knative.dev/pkg/client/injection/kube/client"
deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"
"knative.dev/pkg/injection/clients/dynamicclient"

"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
Expand All @@ -53,6 +54,7 @@ func NewController(

c := &Reconciler{
KubeClientSet: kubeclient.Get(ctx),
DynamicClientSet: dynamicclient.Get(ctx),
kafkaClientSet: kafkaclient.Get(ctx),
kafkaLister: kafkaInformer.Lister(),
deploymentLister: deploymentInformer.Lister(),
Expand Down
6 changes: 5 additions & 1 deletion kafka/source/pkg/reconciler/kafkasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1"
"knative.dev/eventing-contrib/kafka/source/pkg/reconciler/resources"
"knative.dev/eventing-contrib/kafka/source/pkg/reconciler/scaling"

// NewController stuff
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
appsv1listers "k8s.io/client-go/listers/apps/v1"
"knative.dev/eventing-contrib/kafka/source/pkg/client/clientset/versioned"
Expand Down Expand Up @@ -84,7 +86,8 @@ func newDeploymentFailed(namespace, name string, err error) pkgreconciler.Event

type Reconciler struct {
// KubeClientSet allows us to talk to the k8s for core APIs
KubeClientSet kubernetes.Interface
KubeClientSet kubernetes.Interface
DynamicClientSet dynamic.Interface

receiveAdapterImage string

Expand Down Expand Up @@ -145,6 +148,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, src *v1alpha1.KafkaSourc
logging.FromContext(ctx).Error("Unable to create the receive adapter", zap.Error(err))
return err
}
scaling.ScaleKafkaSourceWithKeda(ctx, ra, src, r.DynamicClientSet)
src.Status.MarkDeployed(ra)
src.Status.CloudEventAttributes = r.createCloudEventAttributes(src)

Expand Down
Loading