
Make Knative eventing sources more serverless and scalable #2153

Closed

aslom opened this issue Nov 6, 2019 · 27 comments

Labels: kind/feature-request, lifecycle/stale
Milestone: Backlog

Comments

aslom (Member) commented Nov 6, 2019

Problem
A short explanation of the problem, including relevant restrictions.

Sources keep running and consuming resources even when there are no events. And when a large number of events is received, a source does not automatically scale up.

Persona:
Which persona is this feature for?

System Integrator, Event consumer (developer)

Exit Criteria
A measurable (binary) test that would indicate that the problem has been resolved.

When event sources are not receiving events they scale down to zero (or almost zero), and they scale up when the number of events to send increases.

Time Estimate (optional):
How many developer-days do you think this may take to resolve?

weeks

Additional context (optional)
Add any other context about the feature request here.

Ref #2152

slinkydeveloper (Contributor) commented Nov 6, 2019

Same as #2152 (comment), plus this requires non-HTTP autoscaling (e.g. autoscaling based on Kafka messages, like https://github.com/kedacore/keda).

aslom (Member, Author) commented Nov 6, 2019

@slinkydeveloper Yes. Something like an "activator" component that waits for activity related to registered sources and starts the source? It could also work as an autoscaler, providing metrics to scale the source service/deployment up (and back down when there is no activity)?

zroubalik (Contributor) commented:

Keda is a pull-based autoscaler, so it supports scaling multiple event sources (0-1) OOTB today, just as @slinkydeveloper mentioned. In case you are interested, I could provide help with Keda on this.

aslom (Member, Author) commented Nov 7, 2019

@zroubalik @slinkydeveloper that would add KEDA as a dependency for those Knative sources that need pull-based scaling?

If I understand the KEDA design correctly, we could use a ScaledObject (example) to scale a Knative source deployment?

zroubalik (Contributor) commented Nov 7, 2019

@aslom Exactly, in the ScaledObject you can specify the settings for each Knative Source (i.e. how you want to scale it).

I can imagine a very simple controller that creates a ScaledObject with the preferred settings when an Event Source is created.
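
To make the idea concrete, here is a very rough sketch (not an actual implementation) of the ScaledObject such a controller could generate for the Deployment that backs an event source. The keda.k8s.io/v1alpha1 field names follow the ScaledObject example linked above; the function name, parameter defaults, and the YAML-printing main are purely illustrative.

package main

import (
	"fmt"

	"sigs.k8s.io/yaml"
)

// scaledObjectFor builds the ScaledObject a source controller could create
// for the Deployment that backs an event source. In a real controller this
// object would be applied to the cluster instead of printed.
func scaledObjectFor(deployment, consumerGroup, topic string, minReplicas, maxReplicas int) map[string]interface{} {
	return map[string]interface{}{
		"apiVersion": "keda.k8s.io/v1alpha1",
		"kind":       "ScaledObject",
		"metadata": map[string]interface{}{
			"name":   deployment + "-scaler",
			"labels": map[string]interface{}{"deploymentName": deployment},
		},
		"spec": map[string]interface{}{
			"scaleTargetRef":  map[string]interface{}{"deploymentName": deployment},
			"minReplicaCount": minReplicas,
			"maxReplicaCount": maxReplicas,
			"triggers": []interface{}{
				map[string]interface{}{
					"type": "kafka",
					"metadata": map[string]interface{}{
						"consumerGroup": consumerGroup,
						"topic":         topic,
					},
				},
			},
		},
	}
}

func main() {
	so := scaledObjectFor("my-source-deployment", "my-consumer-group", "my-topic", 0, 10)
	out, err := yaml.Marshal(so)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}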

aslom (Member, Author) commented Nov 8, 2019

I can see two ways to implement this:

  • modify the event source controller to add logic for pulling event statistics and scaling (no dependency on KEDA)
  • modify the event source controller to create a ScaledObject that is then used by KEDA to pull event statistics and do the scaling (dependency on KEDA)

Or do both, which I think may be best - if users have KEDA they can configure Knative to use KEDA; otherwise the built-in pulling and scaling in the controller can be used to scale sources.

@zroubalik is the picture below correct for KEDA, and what do you think about the options?

(two diagrams attached illustrating the two options; see the slides linked below)

Feel free to edit slides: https://docs.google.com/presentation/d/1JNV25z0ZkjA_vhZy0y3dag_xoku3dtunMYtoggyFu6Y/edit?usp=sharing

zroubalik (Contributor) commented:

@aslom Hi, I think the picture is correct and describes both options correctly.

As for the options, Keda provides all the necessary scaling capabilities for multiple sources OOTB today. So it is up for a decision where we want to put the effort: whether we want to reimplement the same functionality inside Knative, reuse Keda, or implement both options.

I am willing to help with the Keda and controller part.

I'd like to note that Keda is going to release v1.0 soon and there is a plan to move Keda under the CNCF. So there shouldn't be any concerns on this, w.r.t. adding it as a dependency for Knative Eventing.

aslom (Member, Author) commented Nov 11, 2019

@zroubalik that sounds good. As the very first step, we should be able to use Keda by creating the ScaledObject YAML?

That means that when the controller sees KEDA in the source configuration, it creates the ScaledObject YAML and assumes KEDA will do the scaling.

That should allow us to do quick testing - I will start with the Kafka source, which is already supported in KEDA, and see how the scaling works. (I already have experience with the Kafka source.)

Does that sound reasonable?

zroubalik (Contributor) commented:

Yeah, that scenario sounds good. I have experience with Keda, so I'll be happy to write the controller for managing it. @aslom or do you plan to write that?

aslom (Member, Author) commented Nov 13, 2019

@zroubalik yes, going to write a first version for quick review

aslom (Member, Author) commented Nov 26, 2019

@zroubalik do you know what the current state of scaling to zero in KEDA and HPA is? It seems that it is not possible, as defaultHPAMinReplicas is used if MinReplicaCount is set to zero?
https://github.com/kedacore/keda/search?q=defaultHPAMinReplicas&unscoped_q=defaultHPAMinReplicas

It seems to be related to HPA not supporting it in some versions of K8s? minReplicas in the HPA created by KEDA is set to 1 (as expected from the code above), and if I manually edit it to

  minReplicas: 0

then I am getting:

$ kubectl get hpa keda-hpa-kafkasource-kafka-source-2a42349b-0fdd-11ea-9cac-264e53a4499c -o yaml > tmp-hpa.yaml
# edited
$ kubectl apply -f tmp-hpa.yaml
Warning: kubectl apply should be used on resource created by either kubectl create --save-config or kubectl apply
The HorizontalPodAutoscaler "keda-hpa-kafkasource-kafka-source-2a42349b-0fdd-11ea-9cac-264e53a4499c" is invalid: spec.minReplicas: Invalid value: 0: must be greater than 0

Seems to be fixed in k8s 1.16+ (behind the HPAScaleToZero alpha feature gate, if I read the PR below correctly):
kubernetes/kubernetes#69687 (comment)
kubernetes/kubernetes#74526
https://github.com/kubernetes/kubernetes/pull/74526/files#diff-4364e3f754b12e39f60eea33105f6c7eL48

Going to test it with 1.16, but I wondered whether this is documented somewhere in KEDA? I could not find an issue about it.

BTW: the confusing part was that even when I manually scaled:

$ kubectl scale deployment kafkasource-kafka-source-2a42349b-0fdd-11ea-9cac-264e53a4499c --replicas=0

the kafkasource pods would not disappear - it took me some time to realize that the HPA was scaling the deployment back to 1 ...

zroubalik (Contributor) commented:

@aslom scaling to 0 is done by the KEDA controller (it sets the deployment to 0 replicas). It is not managed by the HPA at the moment; as you correctly mentioned, this feature is only fixed in k8s 1.16+, so it might get into KEDA later, as we support older k8s versions as well.

Scaling to 0 should work correctly; I wonder what caused the problem you are facing. You might be hitting the cooldown period?

aslom (Member, Author) commented Nov 26, 2019

@zroubalik are you sure that the HPA is not used by KEDA? I am using the latest version, which I deployed from GitHub 2 days ago. Where can I find out how ScaledObject and HPA work together in KEDA? When I create a ScaledObject I can see a corresponding HPA is created, and when I delete the ScaledObject the HPA is deleted. The created HPA has minReplicas set to 1, and that prevents the Kafka source from going to 0 replicas when I run kubectl scale deployment.

zroubalik (Contributor) commented Nov 26, 2019

@aslom HPAs are used by KEDA. But for the scaling itself: 0 <-> 1 is handled by KEDA, and 1 <-> N is handled by the HPA, which is created for each ScaledObject.

aslom (Member, Author) commented Nov 27, 2019

@zroubalik That makes sense, considering the HPA will only allow scaling to zero in 1.16+. When scaling to zero, what mechanism is used - does it disable the HPA so that it does not scale the replicas back up from zero to one?

zroubalik (Contributor) commented:

@aslom it scales the Deployment to 0 replicas.
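
To make that concrete, "scales the Deployment to 0 replicas" boils down to a client-go call along these lines (an illustrative sketch of the mechanism, not KEDA's actual code; the kubeconfig handling and resource names are assumptions):

package main

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// scaleToZero sets spec.replicas of a Deployment to 0 - effectively what the
// KEDA operator does to the scale target when its scalers report no pending work.
func scaleToZero(ctx context.Context, client kubernetes.Interface, namespace, name string) error {
	deploy, err := client.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	zero := int32(0)
	deploy.Spec.Replicas = &zero
	_, err = client.AppsV1().Deployments(namespace).Update(ctx, deploy, metav1.UpdateOptions{})
	return err
}

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	if err := scaleToZero(context.Background(), kubernetes.NewForConfigOrDie(cfg), "default", "my-source-deployment"); err != nil {
		panic(err)
	}
}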

aslom (Member, Author) commented Dec 3, 2019

@zroubalik thank you for your help - I have got it working now. TL;DR: I did not realize that the Knative Kafka source controller was scaling the deployment back from zero to one (and it was doing it very fast ...), and I ran into a problem with KEDA 1.0 and had to use the latest install from git master (details below).

Short screen recording: https://vimeo.com/376958832

Now that it is working with a manual setup, as the next step I am doing a PR to add KEDA support as an option in the Kafka source YAML.

Tested with K8s v1.16.3

$ k version
Client Version: version.Info{Major:"1", Minor:"16", GitVersion:"v1.16.3", GitCommit:"b3cbbae08ec52a7fc73d334838e18d17e8512749", GitTreeState:"clean", BuildDate:"2019-11-14T04:24:34Z", GoVersion:"go1.12.13", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"16", GitVersion:"v1.16.3+IKS", GitCommit:"6511e42c1f1568ac32d99ae33316985c43dbcca6", GitTreeState:"clean", BuildDate:"2019-11-13T22:36:46Z", GoVersion:"go1.12.12", Compiler:"gc", Platform:"linux/amd64"}

Installed KEDA 1.0 using YAML files

git clone https://github.com/kedacore/keda.git
cd keda
git checkout v1.0.0
kubectl create namespace keda
kubectl apply -f deploy/crds/keda.k8s.io_scaledobjects_crd.yaml
kubectl apply -f deploy/crds/keda.k8s.io_triggerauthentications_crd.yaml
kubectl apply -f deploy/

Deployed Kafka Source using Strimzi:
https://knative.dev/docs/eventing/samples/kafka/source/
and tested with the hello-display service in the default namespace (so the logs do not disappear, as they would if using the ksvc version):
https://knative.dev/docs/eventing/getting-started/

And created a test topic with ten partitions (so the KEDA scaler for Kafka can scale up to 10, as it scales to the number of partitions):

kubectl apply --filename - << END
  apiVersion: kafka.strimzi.io/v1beta1
  kind: KafkaTopic
  metadata:
    name: my-topic-10
    namespace: kafka
    labels:
      strimzi.io/cluster: my-cluster
  spec:
    partitions: 10
    replicas: 1
    config:
      retention.ms: 7200000
      segment.bytes: 1073741824
END

And created a Knative Kafka source that uses my-topic-10:

kubectl apply --filename - << END
apiVersion: sources.eventing.knative.dev/v1alpha1
kind: KafkaSource
metadata:
  name: kafka-src10
spec:
  binding: 
    name: kafka-binding
    namespace: default
  consumerGroup: kafka-source10-1
  bootstrapServers: my-cluster-kafka-bootstrap.kafka:9092 #note the .kafka in URL for namespace
  topics: my-topic-10
  sink:
    apiVersion: v1
    kind: Service
    name: hello-display
END

Then get the deployment for the Kafka source:

$ kubectl get deployments
NAME                READY   UP-TO-DATE   AVAILABLE   AGE
hello-display       1/1     1            1           4h49m
kafka-src10-f6kr5   1/1     1            1           18s

Create a KEDA ScaledObject that scales the Kafka source deployment:

$ cat keda-kafka10.yaml
apiVersion: keda.k8s.io/v1alpha1
kind: ScaledObject
metadata:
  name: kafka-scaledobject-a10a
  namespace: default
  labels:
    deploymentName: kafka-src10-f6kr5
spec:
  scaleTargetRef:
    deploymentName: kafka-src10-f6kr5
  pollingInterval: 1
  cooldownPeriod:  15
  minReplicaCount: 0
  maxReplicaCount: 10
  triggers:
  - type: kafka
    metadata:
      # Required
      brokerList: my-cluster-kafka-brokers.kafka:9092
      consumerGroup: kafka-source10-1       # Make sure that this consumer group name is the same one as the one that is consuming topics
      topic: my-topic-10
      lagThreshold: "50"

Then apply it:

kubectl apply -f keda-kafka10.yaml

But that fails with:
"error":"ScaledObject.keda.k8s.io \"kafka-scaledobject-a10a\" is invalid: [status.currentReplicas: Required value, status.desiredReplicas: Required value]"

"level":"info","ts":1575331878.5229948,"logger":"controller_scaledobject","msg":"Reconciling ScaledObject","Request.Namespace":"default","Request.Name":"kafka-scaledobject-a10a"}
{"level":"info","ts":1575331878.5230665,"logger":"controller_scaledobject","msg":"Detecting ScaleType from ScaledObject","Request.Namespace":"default","Request.Name":"kafka-scaledobject-a10a"}
{"level":"info","ts":1575331878.5230744,"logger":"controller_scaledobject","msg":"Detected ScaleType = Deployment","Request.Namespace":"default","Request.Name":"kafka-scaledobject-a10a"}
{"level":"info","ts":1575331878.5230997,"logger":"controller_scaledobject","msg":"Creating a new HPA","Request.Namespace":"default","Request.Name":"kafka-scaledobject-a10a","HPA.Namespace":"default","HPA.Name":"keda-hpa-kafka-src10-f6kr5"}
{"level":"error","ts":1575331878.5708196,"logger":"controller_scaledobject","msg":"Error updating scaledObject status with used externalMetricNames","Request.Namespace":"default","Request.Name":"kafka-scaledobject-a10a","error":"ScaledObject.keda.k8s.io \"kafka-scaledobject-a10a\" is invalid: [status.currentReplicas: Required value, status.desiredReplicas: Required value]","stacktrace":"github.com/go-logr/zapr.(*zapLogger).Error\n\t/go/pkg/mod/github.com/go-logr/zapr@v0.1.1/zapr.go:128\ngithub.com/kedacore/keda/pkg/controller/scaledobject.(*ReconcileScaledObject).getScaledObjectMetricSpecs\n\tkeda/pkg/controller/scaledobject/scaledobject_controller.go:350\ngithub.com/kedacore/keda/pkg/controller/scaledobject.(*ReconcileScaledObject).newHPAForScaledObject\n\tkeda/pkg/controller/scaledobject/scaledobject_controller.go:296\ngithub.com/kedacore/keda/pkg/controller/scaledobject.(*ReconcileScaledObject).reconcileDeploymentType\n\tkeda/pkg/controller/scaledobject/scaledobject_controller.go:202\ngithub.com/kedacore/keda/pkg/controller/scaledobject.(*ReconcileScaledObject).Reconcile\n\tkeda/pkg/controller/scaledobject/scaledobject_controller.go:146\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler\n\t/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.2.2/pkg/internal/controller/controller.go:216\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).processNextWorkItem\n\t/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.2.2/pkg/internal/controller/controller.go:192\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).worker\n\t/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.2.2/pkg/internal/controller/controller.go:171\nk8s.io/apimachinery/pkg/util/wait.JitterUntil.func1\n\t/go/pkg/mod/k8s.io/apimachinery@v0.0.0-20190404173353-6a84e37a896d/pkg/util/wait/wait.go:152\nk8s.io/apimachinery/pkg/util/wait.JitterUntil\n\t/go/pkg/mod/k8s.io/apimachinery@v0.0.0-20190404173353-6a84e37a896d/pkg/util/wait/wait.go:153\nk8s.io/apimachinery/pkg/util/wait.Until\n\t/go/pkg/mod/k8s.io/apimachinery@v0.0.0-20190404173353-6a84e37a896d/pkg/util/wait/wait.go:88"}
{"level":"error","ts":1575331878.5709054,"logger":"controller_scaledobject","msg":"Failed to create new HPA resource","Request.Namespace":"default","Request.Name":"kafka-scaledobject-a10a","HPA.Namespace":"default","HPA.Name":"keda-hpa-kafka-src10-f6kr5","error":"ScaledObject.keda.k8s.io \"kafka-scaledobject-a10a\" is invalid: [status.currentReplicas: Required value, status.desiredReplicas: Required value]","stacktrace":"github.com/go-logr/zapr.(*zapLogger).Error\n\t/go/pkg/mod/github.com/go-logr/zapr@v0.1.1/zapr.go:128\ngithub.com/kedacore/keda/pkg/controller/scaledobject.(*ReconcileScaledObject).reconcileDeploymentType\n\tkeda/pkg/controller/scaledobject/scaledobject_controller.go:204\ngithub.com/kedacore/keda/pkg/controller/scaledobject.(*ReconcileScaledObject).Reconcile\n\tkeda/pkg/controller/scaledobject/scaledobject_controller.go:146\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler\n\t/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.2.2/pkg/internal/controller/controller.go:216\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).processNextWorkItem\n\t/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.2.2/pkg/internal/controller/controller.go:192\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).worker\n\t/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.2.2/pkg/internal/controller/controller.go:171\nk8s.io/apimachinery/pkg/util/wait.JitterUntil.func1\n\t/go/pkg/mod/k8s.io/apimachinery@v0.0.0-20190404173353-6a84e37a896d/pkg/util/wait/wait.go:152\nk8s.io/apimachinery/pkg/util/wait.JitterUntil\n\t/go/pkg/mod/k8s.io/apimachinery@v0.0.0-20190404173353-6a84e37a896d/pkg/util/wait/wait.go:153\nk8s.io/apimachinery/pkg/util/wait.Until\n\t/go/pkg/mod/k8s.io/apimachinery@v0.0.0-20190404173353-6a84e37a896d/pkg/util/wait/wait.go:88"}
{"level":"error","ts":1575331878.571173,"logger":"controller-runtime.controller","msg":"Reconciler error","controller":"scaledobject-controller","request":"default/kafka-scaledobject-a10a","error":"ScaledObject.keda.k8s.io \"kafka-scaledobject-a10a\" is invalid: [status.currentReplicas: Required value, status.desiredReplicas: Required value]","stacktrace":"github.com/go-logr/zapr.(*zapLogger).Error\n\t/go/pkg/mod/github.com/go-logr/zapr@v0.1.1/zapr.go:128\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler\n\t/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.2.2/pkg/internal/controller/controller.go:218\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).processNextWorkItem\n\t/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.2.2/pkg/internal/controller/controller.go:192\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).worker\n\t/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.2.2/pkg/internal/controller/controller.go:171\nk8s.io/apimachinery/pkg/util/wait.JitterUntil.func1\n\t/go/pkg/mod/k8s.io/apimachinery@v0.0.0-20190404173353-6a84e37a896d/pkg/util/wait/wait.go:152\nk8s.io/apimachinery/pkg/util/wait.JitterUntil\n\t/go/pkg/mod/k8s.io/apimachinery@v0.0.0-20190404173353-6a84e37a896d/pkg/util/wait/wait.go:153\nk8s.io/apimachinery/pkg/util/wait.Until\n\t/go/pkg/mod/k8s.io/apimachinery@v0.0.0-20190404173353-6a84e37a896d/pkg/util/wait/wait.go:88"}

Then I found a related issue:
kedacore/keda#487

So I checked out master at the commit ID below (to help with reproducing):

cd keda
git pull
git checkout master
# git rev-parse HEAD
# 0099c102d538995c81610fb48d12bde1259678a6
git checkout 0099c102d538995c81610fb48d12bde1259678a6

and then redeployed the KEDA CRDs and applied the config.

Then re-created the ScaledObject:

kubectl apply -f keda-kafka10.yaml

Verify in the KEDA controller logs that all is good:

$ kubectl -n keda logs -f keda-operator-856d7cc78b-kvnl9  keda-operator
{"level":"info","ts":1575332466.617871,"logger":"cmd","msg":"Go Version: go1.13.3"}
{"level":"info","ts":1575332466.6179307,"logger":"cmd","msg":"Go OS/Arch: linux/amd64"}
{"level":"info","ts":1575332466.6179435,"logger":"cmd","msg":"Version of operator-sdk: v0.11.0"}
{"level":"info","ts":1575332466.6180365,"logger":"cmd","msg":"Keda Commit: 0099c102d538995c81610fb48d12bde1259678a6"}
{"level":"info","ts":1575332466.6184804,"logger":"leader","msg":"Trying to become the leader."}
{"level":"info","ts":1575332469.655066,"logger":"leader","msg":"No pre-existing lock was found."}
{"level":"info","ts":1575332469.681411,"logger":"leader","msg":"Became the leader."}
{"level":"info","ts":1575332472.6898823,"logger":"controller-runtime.metrics","msg":"metrics server is starting to listen","addr":"0.0.0.0:8383"}
{"level":"info","ts":1575332472.690308,"logger":"cmd","msg":"Registering Components."}
{"level":"info","ts":1575332472.6907687,"logger":"controller-runtime.controller","msg":"Starting EventSource","controller":"scaledobject-controller","source":"kind source: /, Kind="}
{"level":"info","ts":1575332477.5715046,"logger":"metrics","msg":"Metrics Service object created","Service.Name":"keda-operator-metrics","Service.Namespace":"keda"}
{"level":"info","ts":1575332478.342888,"logger":"cmd","msg":"Could not create ServiceMonitor object","error":"no ServiceMonitor registered with the API"}
{"level":"info","ts":1575332478.342933,"logger":"cmd","msg":"Install prometheus-operator in your cluster to create ServiceMonitor objects","error":"no ServiceMonitor registered with the API"}
{"level":"info","ts":1575332478.342941,"logger":"cmd","msg":"Starting the Cmd."}
{"level":"info","ts":1575332478.343351,"logger":"controller-runtime.manager","msg":"starting metrics server","path":"/metrics"}
{"level":"info","ts":1575332478.4436288,"logger":"controller-runtime.controller","msg":"Starting Controller","controller":"scaledobject-controller"}
{"level":"info","ts":1575332478.5438852,"logger":"controller-runtime.controller","msg":"Starting workers","controller":"scaledobject-controller","worker count":1}
{"level":"info","ts":1575332483.901544,"logger":"controller_scaledobject","msg":"Reconciling ScaledObject","Request.Namespace":"default","Request.Name":"kafka-scaledobject-a10a"}
{"level":"info","ts":1575332483.9015884,"logger":"controller_scaledobject","msg":"Adding Finalizer for the ScaledObject","Request.Namespace":"default","Request.Name":"kafka-scaledobject-a10a"}
{"level":"info","ts":1575332483.9227238,"logger":"controller_scaledobject","msg":"Detecting ScaleType from ScaledObject","Request.Namespace":"default","Request.Name":"kafka-scaledobject-a10a"}
{"level":"info","ts":1575332483.9227679,"logger":"controller_scaledobject","msg":"Detected ScaleType = Deployment","Request.Namespace":"default","Request.Name":"kafka-scaledobject-a10a"}
{"level":"info","ts":1575332484.0232098,"logger":"controller_scaledobject","msg":"Creating a new HPA","Request.Namespace":"default","Request.Name":"kafka-scaledobject-a10a","HPA.Namespace":"default","HPA.Name":"keda-hpa-kafka-src10-f6kr5"}

And I was able to see scaling in response to the load generator.

To generate messages I am using a simple script that sends a message every 10 ms (sleep 0.01):

$ cat generate-kafka-cloud-events.sh
DOCKER_NAME=${1:-kafka-producer}
TOPIC=${2:-my-topic}
COUNT=${3:-1}
CMD="d=\`date\`; for x in {1..${COUNT}}; do echo \"{\\\"msg\\\": \\\"hello \$x date \$d\\\"}\"; sleep 0.01; done  | bin/kafka-console-producer.sh --broker-list my-cluster-kafka-brokers.kafka:9092 --topic $TOPIC"
echo DOCKER_NAME=$DOCKER_NAME COUNT=$COUNT TOPIC=$TOPIC CMD="echo $CMD"
kubectl run $DOCKER_NAME -ti --image=strimzi/kafka:latest-kafka-2.3.0 --rm=true --restart=Never -- /bin/bash -c "$CMD"

and used it with:

./generate-kafka-cloud-events.sh kafka-producer1 my-topic-10 10000

To verify that Kafka messages are sent and received:

kubectl run kafka-consumer -ti --image=strimzi/kafka:0.13.0-kafka-2.3.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-brokers.kafka:9092  --topic my-topic-10

Then watch the pods to see the scaling:

watch -n 0.1 kubectl get pods

or

kubectl get pods --watch

To test scaling to zero, I deleted the Kafka source controller:

kubectl delete service/kafka-controller

Make sure the controller is not running:

$ kubectl -n knative-sources get pods
No resources found in knative-sources namespace.

and manually scaled the Kafka source deployment to zero:

kubectl scale deployment kafka-src10-f6kr5 --replicas=0

aslom (Member, Author) commented Dec 3, 2019

As a next step, going to create a PR to the eventing-contrib Kafka source to modify the controller to create a ScaledObject if there are scaling parameters in the YAML.

aslom (Member, Author) commented Feb 4, 2020

@zroubalik @matzew @n3wscott @lionelvillard
I have created an experimental version of scaling the Kafka Source by using KEDA. For details on how to run it, see the README in PR knative/eventing-contrib#886.

I will show a quick demo in the Knative sources WG; here is also a video https://youtu.be/-zv-6DcU794 and slides showing the current status and future plans:
https://docs.google.com/presentation/d/1c2U_qgwEyar0_OFRmjzuu57FOJZwT453JJInuRd2CEc/edit?usp=sharing

Looking forward to your feedback

aslom (Member, Author) commented Feb 4, 2020

I have a test failing because I made these fields optional:

	// MinReplicaCount is the minimum number of replicas to scale the source down to.
	// +optional
	MinReplicaCount *int32 `json:"minReplicaCount"`

	// MaxReplicaCount is the maximum number of replicas to scale the source up to.
	// +optional
	MaxReplicaCount *int32 `json:"maxReplicaCount"`

and they end up serialized as "null" instead of being omitted:

        kafka_source_test.go:55: Creating KafkaSource
        creation.go:34: Failed to create KafkaSource "test-kafka-source": KafkaSource.sources.eventing.knative.dev "test-kafka-source" is invalid: []: Invalid value: map[string]interface {}{"apiVersion":"sources.eventing.knative.dev/v1alpha1", "kind":"KafkaSource", "metadata":map[string]interface {}{"creationTimestamp":"2020-02-04T20:26:36Z", "generation":1, "name":"test-kafka-source", "namespace":"structured-gbvgf", "uid":"0ea7a562-5a97-42b6-80bb-aa0a7f5c54a5"}, "spec":map[string]interface {}{"bootstrapServers":"my-cluster-kafka-bootstrap.kafka.svc:9092", "consumerGroup":"test-consumer-group", "maxReplicaCount":interface {}(nil), "minReplicaCount":interface {}(nil), "net":map[string]interface {}{"sasl":map[string]interface {}{"password":map[string]interface {}{}, "user":map[string]interface {}{}}, "tls":map[string]interface {}{"caCert":map[string]interface {}{}, "cert":map[string]interface {}{}, "key":map[string]interface {}{}}}, "resources":map[string]interface {}{"limits":map[string]interface {}{}, "requests":map[string]interface {}{}}, "sink":map[string]interface {}{"ref":map[string]interface {}{"apiVersion":"v1", "kind":"Service", "name":"e2e-kafka-source-event-logger"}}, "topics":"1ee6f636-95ef-420d-8aae-9145c6e962d8"}}: validation failure list:
            spec.maxReplicaCount in body must be of type integer: "null"

https://prow.knative.dev/view/gcs/knative-prow/pr-logs/pull/knative_eventing-contrib/886/pull-knative-eventing-contrib-integration-tests/1224786419201871873
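
For what it's worth, the "null" comes from Go's encoding/json: a nil pointer field whose JSON tag has no omitempty is marshaled as null rather than dropped, and that null is what the CRD's integer validation rejects. A minimal, self-contained demonstration (not the actual source types):

package main

import (
	"encoding/json"
	"fmt"
)

// Without omitempty a nil *int32 is serialized as null.
type withoutOmitempty struct {
	MinReplicaCount *int32 `json:"minReplicaCount"`
}

// With omitempty a nil *int32 is simply left out.
type withOmitempty struct {
	MinReplicaCount *int32 `json:"minReplicaCount,omitempty"`
}

func main() {
	a, _ := json.Marshal(withoutOmitempty{})
	b, _ := json.Marshal(withOmitempty{})
	fmt.Println(string(a)) // {"minReplicaCount":null} -> fails "must be of type integer" validation
	fmt.Println(string(b)) // {} -> field is absent, optional as intended
}

So adding ",omitempty" to both JSON tags would be one way to keep the fields optional, although the annotation-based approach below avoids adding new spec fields altogether.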

It seems that the best solution may be to use the same approach that serving is using:
https://knative.dev/docs/serving/configuring-autoscaling/

spec:
 template:
  metadata:
   annotations:
    autoscaling.knative.dev/minScale: "2"
    autoscaling.knative.dev/maxScale: "10"

and with a custom annotation class:

spec:
 template:
  metadata:
   annotations:
    autoscaling.knative.dev/metric: cpu
    autoscaling.knative.dev/target: "70"
    autoscaling.knative.dev/class: hpa.autoscaling.knative.dev

but changing the class to keda:

Option 1:

spec:
 template:
  metadata:
   annotations:
    autoscaling.knative.dev/minScale: "0"
    autoscaling.knative.dev/maxScale: "10"
    autoscaling.knative.dev/pollingInterval: "1"
    autoscaling.knative.dev/cooldownPeriod: "15"
    autoscaling.knative.dev/class: keda.autoscaling.knative.dev

The other option would be to use new annotations, for example:

Option 2:

metadata:
  name: kafka-src10
  annotations:
    eventing.knative.dev/autoscaler: keda
    keda.knative.dev/minReplicaCount: "0"
    keda.knative.dev/maxReplicaCount: "2"
    keda.knative.dev/pollingInterval: "1"
    keda.knative.dev/cooldownPeriod: "15"

Using KEDA as a prefix may work better for KEDA parameters that are not supported in serving, such as pollingInterval or cooldownPeriod - for a description see https://keda.sh/concepts/scaling-deployments/

Both would translate to the same ScaledObject:

apiVersion: keda.k8s.io/v1alpha1
kind: ScaledObject
...
spec:
  pollingInterval: 1
  cooldownPeriod:  15
  minReplicaCount: 0
  maxReplicaCount: 10

@lberk @matzew @lionelvillard @zroubalik @n3wscott what do you think about it? Using shared annotations would make scaling configuration consistent across serving and eventing.

Here is what I propose: since cooldownPeriod and pollingInterval are generic concepts, not specific to KEDA, I think the first option works best?

zroubalik (Contributor) commented Feb 5, 2020

I don't have a strong opinion on the options, but I tend to like option 1 more.

IMHO pollingInterval and cooldownPeriod ARE specific to KEDA and aren't generic concepts, because you could eventually replace KEDA with another scaling tool that does not use these properties/concepts. Just my 2 cents.

what about:

metadata:
 annotations:
  autoscaling.knative.dev/minScale: "0"
  autoscaling.knative.dev/maxScale: "10"
  autoscaling.knative.dev/class: keda.autoscaling.knative.dev
  keda.knative.dev/pollingInterval: "1"
  keda.knative.dev/cooldownPeriod: "15"
  

aslom (Member, Author) commented Feb 14, 2020

@zroubalik I used your suggestion, with a minor modification: the class is used as the prefix in the annotation field names:

  annotations:
    autoscaling.knative.dev/minScale: "0"
    autoscaling.knative.dev/maxScale: "10"
    autoscaling.knative.dev/class: keda.autoscaling.knative.dev
    keda.autoscaling.knative.dev/pollingInterval: "2"
    keda.autoscaling.knative.dev/cooldownPeriod: "15"

Details in knative/eventing-contrib@aa0079a
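
For reference, a rough sketch of how a source controller might translate these annotations into ScaledObject spec values (the annotation keys are the ones above; the helper names are hypothetical, and the 30/300 fallbacks are what I believe KEDA uses as defaults for pollingInterval/cooldownPeriod):

package main

import (
	"fmt"
	"strconv"
)

// Annotation keys as agreed above.
const (
	classAnnotation    = "autoscaling.knative.dev/class"
	kedaClass          = "keda.autoscaling.knative.dev"
	minScaleAnnotation = "autoscaling.knative.dev/minScale"
	maxScaleAnnotation = "autoscaling.knative.dev/maxScale"
	pollingAnnotation  = "keda.autoscaling.knative.dev/pollingInterval"
	cooldownAnnotation = "keda.autoscaling.knative.dev/cooldownPeriod"
)

// kedaSettings holds the values that would be copied into the spec of the generated ScaledObject.
type kedaSettings struct {
	MinReplicaCount int32
	MaxReplicaCount int32
	PollingInterval int32
	CooldownPeriod  int32
}

// int32FromAnnotation parses an annotation value, falling back to def when the
// annotation is missing or not a valid integer.
func int32FromAnnotation(annotations map[string]string, key string, def int32) int32 {
	if v, ok := annotations[key]; ok {
		if n, err := strconv.ParseInt(v, 10, 32); err == nil {
			return int32(n)
		}
	}
	return def
}

// settingsFromAnnotations returns nil when the source has not opted in to KEDA scaling.
func settingsFromAnnotations(annotations map[string]string) *kedaSettings {
	if annotations[classAnnotation] != kedaClass {
		return nil
	}
	return &kedaSettings{
		MinReplicaCount: int32FromAnnotation(annotations, minScaleAnnotation, 0),
		MaxReplicaCount: int32FromAnnotation(annotations, maxScaleAnnotation, 1),
		PollingInterval: int32FromAnnotation(annotations, pollingAnnotation, 30),
		CooldownPeriod:  int32FromAnnotation(annotations, cooldownAnnotation, 300),
	}
}

func main() {
	fmt.Printf("%+v\n", settingsFromAnnotations(map[string]string{
		classAnnotation:    kedaClass,
		minScaleAnnotation: "0",
		maxScaleAnnotation: "10",
		pollingAnnotation:  "2",
		cooldownAnnotation: "15",
	}))
}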

aslom (Member, Author) commented Jul 27, 2020

Updated status and options for scaling pull-based event sources in #2901 (comment)

grantr added this to the Backlog milestone on Aug 24, 2020
github-actions (bot) commented:

This issue is stale because it has been open for 90 days with no
activity. It will automatically close after 30 more days of
inactivity. Reopen the issue with /reopen. Mark the issue as
fresh by adding the comment /remove-lifecycle stale.

github-actions bot added the lifecycle/stale label on Nov 25, 2020
aslom (Member, Author) commented Dec 6, 2020

/reopen

github-actions bot removed the lifecycle/stale label on Jan 23, 2021
slinkydeveloper (Contributor) commented:

Is this still relevant? Since I see you worked on this in more specific issues, can we close this more general issue?

github-actions (bot) commented:

This issue is stale because it has been open for 90 days with no
activity. It will automatically close after 30 more days of
inactivity. Reopen the issue with /reopen. Mark the issue as
fresh by adding the comment /remove-lifecycle stale.

github-actions bot added the lifecycle/stale label on May 13, 2021