Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Commit

Permalink
Clean up, and fix the trigger lagThreshold to be a string in the ScaledObject
Browse files Browse the repository at this point in the history
  • Loading branch information
aslom committed Feb 19, 2020
1 parent 5721b44 commit 080d6b9
Showing 1 changed file with 48 additions and 92 deletions.
140 changes: 48 additions & 92 deletions kafka/source/pkg/reconciler/kafkasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,11 @@ type Reconciler struct {
loggingConfig *pkgLogging.Config
metricsConfig *metrics.ExporterOptions

minReplicaCount *int32
maxReplicaCount *int32
cooldownPeriod *int32
pollingInterval *int32
minReplicaCount *int32
maxReplicaCount *int32
cooldownPeriod *int32
pollingInterval *int32
triggerLagThreshold *int32
}

// Check that our Reconciler implements controller.Reconciler
Expand Down Expand Up @@ -241,35 +242,10 @@ func checkResourcesStatus(src *v1alpha1.KafkaSource) error {
return nil
}

// //Uses KEDA API
// func (r *Reconciler) generateKedaScaledObjectWithKedaAPI(ctx context.Context, ra *v1.Deployment, src *v1alpha1.KafkaSource) {
// println("Got ra", ra.GetName())
// println("Got minReplicaCount", src.Spec.MinReplicaCount)
// if src.Spec.MinReplicaCount == nil && *src.Spec.MinReplicaCount == 1 { // default - TODO check if not set?
// return
// }
// scaledObject := &keda_v1alpha1.ScaledObject{
// Spec: keda_v1alpha1.ScaledObjectSpec{
// MinReplicaCount: src.Spec.MinReplicaCount,
// MaxReplicaCount: src.Spec.MaxReplicaCount,
// },
// }
// r.deployScaledObjectWithKedaJSON(scaledObject)
// }

// //Uses KEDA API
// func (r *Reconciler) deployScaledObjectWithKedaJSON(so *keda_v1alpha1.ScaledObject) {
// json, err := json.Marshal(so)
// if err != nil {
// log.Fatal(err)
// }
// println("TODO KEDA deploy", string(json))
// r.KubeClientSet.AppsV1()
// }

func (r *Reconciler) scaleKafkaSource(ctx context.Context, ra *v1.Deployment, src *v1alpha1.KafkaSource) {
r.readScalingAnnotations(ctx, src)
if r.minReplicaCount == nil && r.maxReplicaCount == nil { // no scaling annotatins so no scaling
// no scaling annotatins so no scaling work needed
if r.minReplicaCount == nil && r.maxReplicaCount == nil {
return
}
_, error := r.deployKedaScaledObject(ctx, ra, src)
Expand All @@ -283,12 +259,7 @@ func (r *Reconciler) deployKedaScaledObject(ctx context.Context, ra *v1.Deployme
deploymentName := ra.GetName()
logger.Info("Got ra", zap.Any("receiveAdapter", ra))
logger.Info("Got ra name "+deploymentName, zap.Any("deploymentName", deploymentName))
//logger.Info("Got minReplicaCount "+string(*src.Spec.MinReplicaCount), zap.Any("src.Spec.MinReplicaCount", src.Spec.MinReplicaCount))
//if src.Spec.MinReplicaCount == nil && *src.Spec.MinReplicaCount == 1 { // default - TODO check if not set?
//TODO: delete ScaledObject if exists
// return nil, fmt.Errorf("as minReplicaCount is 1 so skipping creaion of ScaledObject")
//}
namespace := src.Namespace //"default"
namespace := src.Namespace
name := src.Name
gvk := schema.GroupVersionKind{
Group: "keda.k8s.io",
Expand All @@ -300,30 +271,29 @@ func (r *Reconciler) deployKedaScaledObject(ctx context.Context, ra *v1.Deployme
if scaledObjectResourceInterface == nil {
return nil, fmt.Errorf("unable to create dynamic client for ScaledObject")
}
scaledObjectUnstr := r.generateKedaScaledObjectUnstructured(ctx, ra, src)
scaledObjectUnstr, err := r.generateKedaScaledObjectUnstructured(ctx, ra, src)
if err != nil {
return nil, err
}
created, err := scaledObjectResourceInterface.Create(scaledObjectUnstr, metav1.CreateOptions{})
if err != nil {
//println("Failed to create ScaledObject", err)
logger.Error("Failed to create ScaledObject so going to do update", zap.Error(err))
//fmt.Printf("Doing update as failed to create ScaledObject: %s \n", err)
// will replace - https://github.com/kubernetes/client-go/blob/master/examples/dynamic-create-update-delete-deployment/main.go
// doing kubectl "replace" https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency
// first get resourceVersion
existing, err := scaledObjectResourceInterface.Get(name, metav1.GetOptions{})
if err != nil {
//fmt.Printf("Get failed to create ScaledObject: %s \n", err)
logger.Error("Get ScaledObject failed", zap.Error(err))
logger.Error("Failed to create ScaledObject:", zap.Error(err))
return nil, err
}
resourceVersion := existing.GetResourceVersion()
scaledObjectUnstr.SetResourceVersion(resourceVersion)
updated, err := scaledObjectResourceInterface.Update(scaledObjectUnstr, metav1.UpdateOptions{})
if err != nil {
//fmt.Printf("Update failed to create ScaledObject: %s \n", err)
logger.Error("Update failed to create ScaledObject", zap.Error(err))
return nil, err
} else {
//fmt.Printf("Update success %v \n", updated)
logger.Info("Update success", zap.Any("updated", updated))
return updated, nil
}
Expand All @@ -332,12 +302,12 @@ func (r *Reconciler) deployKedaScaledObject(ctx context.Context, ra *v1.Deployme
}

const (
kedaAutoscalingAnnotationClass = "keda.autoscaling.knative.dev"
kedaCooldownPeriodAnnodationKey = kedaAutoscalingAnnotationClass + "/cooldownPeriod"
kedaPollingIntervalAnnodationKey = kedaAutoscalingAnnotationClass + "/pollingInterval"
kedaAutoscalingAnnotationClass = "keda.autoscaling.knative.dev"
kedaCooldownPeriodAnnodationKey = kedaAutoscalingAnnotationClass + "/cooldownPeriod"
kedaPollingIntervalAnnodationKey = kedaAutoscalingAnnotationClass + "/pollingInterval"
kedaTriggerLagThresholdAnnodationKey = kedaAutoscalingAnnotationClass + "/trigger.lagThreshold"
)

//add as map object method?
func convertMapKeyToInt32(dict map[string]string, key string, logger *zap.Logger) *int32 {
val, ok := dict[key]
if !ok {
Expand All @@ -357,73 +327,60 @@ func (r *Reconciler) readScalingAnnotations(ctx context.Context, src *v1alpha1.K
meta := src.GetObjectMeta()
annotations := meta.GetAnnotations()
if annotations != nil {
for k, v := range annotations {
fmt.Printf("key[%s] value[%s]\n", k, v)
}
scalingClass := annotations[autoscaling.ClassAnnotationKey]
r.minReplicaCount = convertMapKeyToInt32(annotations, autoscaling.MinScaleAnnotationKey, logger)
r.maxReplicaCount = convertMapKeyToInt32(annotations, autoscaling.MaxScaleAnnotationKey, logger)
if scalingClass == kedaAutoscalingAnnotationClass {
r.cooldownPeriod = convertMapKeyToInt32(annotations, kedaCooldownPeriodAnnodationKey, logger)
r.pollingInterval = convertMapKeyToInt32(annotations, kedaPollingIntervalAnnodationKey, logger)
r.pollingInterval = convertMapKeyToInt32(annotations, kedaPollingIntervalAnnodationKey, logger)
r.triggerLagThreshold = convertMapKeyToInt32(annotations, kedaTriggerLagThresholdAnnodationKey, logger)

}
}

// println("Got scalingClass", scalingClass)
// println("Got cooldownPeriod", *cooldownPeriod)
// println("Got pollingInterval", *pollingInterval)

// logger.Info("Got minReplicaCount", zap.Any("minReplicaCount", minReplicaCount))
// logger.Info("Got maxReplicaCount", zap.Any("maxReplicaCount", maxReplicaCount))
// var actualMinReplicaCount int32 = 0
// if minReplicaCount != nil {
// actualMinReplicaCount = *minReplicaCount
// }
// var actualMaxReplicaCount int32 = 1
// if maxReplicaCount != nil {
// actualMaxReplicaCount = *maxReplicaCount
// }
// //println("Got actualMinReplicaCount", actualMinReplicaCount)
// //println("Got actualMaxReplicaCount", actualMaxReplicaCount)
// logger.Info("Got actualMinReplicaCount "+string(actualMinReplicaCount), zap.Any("actualMinReplicaCount", actualMinReplicaCount))
// logger.Info("Got actualMaxReplicaCount "+string(actualMaxReplicaCount), zap.Any("actualMaxReplicaCount", actualMaxReplicaCount))
}

func (r *Reconciler) generateKedaScaledObjectUnstructured(ctx context.Context, ra *v1.Deployment, src *v1alpha1.KafkaSource) *unstructured.Unstructured {
func (r *Reconciler) generateKedaScaledObjectUnstructured(ctx context.Context, ra *v1.Deployment, src *v1alpha1.KafkaSource) (*unstructured.Unstructured, error) {
logger := logging.FromContext(ctx).Desugar()
deploymentName := ra.GetName()

namespace := src.Namespace //"default"
namespace := src.Namespace
name := src.Name
//println("Unstructured SO name", name)
logger.Info("Unstructured SO name "+name, zap.Any("name", name))
logger.Info("Unstructured ScaledObject name "+name, zap.Any("name", name))
srcName := src.GetName()
srcUID := src.GetUID()
srcResVersion := src.GetResourceVersion()
logger.Info("Got srcResVersion="+srcResVersion, zap.Any("srcResVersion", srcResVersion))
srcKind := src.GetGroupVersionKind().Kind ///GetObjectKind().GroupVersionKind().Kind
srcKind := src.GetGroupVersionKind().Kind
logger.Info("Got srcKind srcName srcUID", zap.Any("srcKind", srcKind), zap.Any("srcName", srcName), zap.Any("srcUID", srcUID))
srcBrokerList := src.Spec.BootstrapServers
srcConcumerGroup := src.Spec.ConsumerGroup
srcTopic := src.Spec.Topics //TODO multiple topics --> multiple triggers
triggerMetadata := map[string]interface{}{
"brokerList": srcBrokerList,
"consumerGroup": srcConcumerGroup,
"topic": srcTopic,
"lagThreshold": "50",
triggers := make([]map[string]interface{}, 0, 1)
topics := strings.Split(src.Spec.Topics, ",")
if len(topics) == 0 {
return nil, fmt.Errorf("Comma-separated list of topics can not be empty")
}
for _, topic := range topics {
triggerMetadata := map[string]interface{}{
"brokerList": srcBrokerList,
"consumerGroup": srcConcumerGroup,
"topic": topic,
}
if r.triggerLagThreshold != nil {
logger.Info("Got triggerLagThreshold", zap.Any("triggerLagThreshold", r.triggerLagThreshold))
triggerMetadata["lagThreshold"] = strconv.Itoa(int(*r.triggerLagThreshold))
}
trigger := map[string]interface{}{
"type": "kafka",
"metadata": triggerMetadata,
}
triggers = append(triggers, trigger)
}
spec := map[string]interface{}{
"scaleTargetRef": map[string]interface{}{
"deploymentName": deploymentName,
},
"triggers": []map[string]interface{}{{
"type": "kafka",
"metadata": triggerMetadata,
}},
"triggers": triggers,
}

// "minReplicaCount": actualMinReplicaCount,
// "maxReplicaCount": actualMaxReplicaCount,
if r.minReplicaCount != nil {
logger.Info("Got minReplicaCount", zap.Any("minReplicaCount", r.minReplicaCount))
spec["minReplicaCount"] = *r.minReplicaCount
Expand Down Expand Up @@ -460,12 +417,11 @@ func (r *Reconciler) generateKedaScaledObjectUnstructured(ctx context.Context, r
"controller": true,
}},
},
"spec": spec, // *spec?
"spec": spec,
},
}
fmt.Printf("Unstructured SO %s \n", soUnstr)
//logger.Info("Unstructured SO name "+name, zap.Any("name", name))
return soUnstr
logger.Info("Unstructured SO name "+name, zap.Any("name", name), zap.Any("soUnstr", soUnstr))
return soUnstr, nil
}

func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.KafkaSource, sinkURI string) (*v1.Deployment, error) {
Expand Down

0 comments on commit 080d6b9

Please sign in to comment.