Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

using dedicated HTTP clients #1251

Merged
merged 11 commits into from
Dec 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

### New

- KEDA uses a dedicated [HTTP client](https://pkg.go.dev/net/http#Client), connection pool, and (optional) TLS certificate for each configured scaler
- KEDA scales any CustomResource that implements Scale subresource ([#703](https://github.com/kedacore/keda/issues/703))
- Provide KEDA go-client ([#494](https://github.com/kedacore/keda/issues/494))
- Define KEDA readiness and liveness probes ([#788](https://github.com/kedacore/keda/issues/788))
Expand Down
36 changes: 27 additions & 9 deletions adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"os"
"runtime"
"strconv"
"time"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -39,44 +41,44 @@ var (
prometheusMetricsPath string
)

func (a *Adapter) makeProviderOrDie() provider.MetricsProvider {
func (a *Adapter) makeProvider(globalHTTPTimeout time.Duration) (provider.MetricsProvider, error) {
// Get a config to talk to the apiserver
cfg, err := config.GetConfig()
if err != nil {
logger.Error(err, "failed to get the config")
os.Exit(1)
return nil, fmt.Errorf("failed to get the config (%s)", err)
}

scheme := scheme.Scheme
if err := appsv1.SchemeBuilder.AddToScheme(scheme); err != nil {
logger.Error(err, "failed to add apps/v1 scheme to runtime scheme")
os.Exit(1)
return nil, fmt.Errorf("failed to add apps/v1 scheme to runtime scheme (%s)", err)
}
if err := kedav1alpha1.SchemeBuilder.AddToScheme(scheme); err != nil {
logger.Error(err, "failed to add keda scheme to runtime scheme")
os.Exit(1)
return nil, fmt.Errorf("failed to add keda scheme to runtime scheme (%s)", err)
}

kubeclient, err := client.New(cfg, client.Options{
Scheme: scheme,
})
if err != nil {
logger.Error(err, "unable to construct new client")
os.Exit(1)
return nil, fmt.Errorf("unable to construct new client (%s)", err)
}

handler := scaling.NewScaleHandler(kubeclient, nil, scheme)
handler := scaling.NewScaleHandler(kubeclient, nil, scheme, globalHTTPTimeout)

namespace, err := getWatchNamespace()
if err != nil {
logger.Error(err, "failed to get watch namespace")
os.Exit(1)
return nil, fmt.Errorf("failed to get watch namespace (%s)", err)
}

prometheusServer := &prommetrics.PrometheusMetricServer{}
go func() { prometheusServer.NewServer(fmt.Sprintf(":%v", prometheusMetricsPort), prometheusMetricsPath) }()

return kedaprovider.NewProvider(logger, handler, kubeclient, namespace)
return kedaprovider.NewProvider(logger, handler, kubeclient, namespace), nil
}

func printVersion() {
Expand Down Expand Up @@ -117,7 +119,23 @@ func main() {
return
}

kedaProvider := cmd.makeProviderOrDie()
globalHTTPTimeoutStr := os.Getenv("KEDA_HTTP_DEFAULT_TIMEOUT")
if globalHTTPTimeoutStr == "" {
// default to 3 seconds if they don't pass the env var
globalHTTPTimeoutStr = "3000"
}

globalHTTPTimeoutMS, err := strconv.Atoi(globalHTTPTimeoutStr)
if err != nil {
logger.Error(err, "Invalid KEDA_HTTP_DEFAULT_TIMEOUT")
return
}

kedaProvider, err := cmd.makeProvider(time.Duration(globalHTTPTimeoutMS) * time.Millisecond)
if err != nil {
logger.Error(err, "making provider")
return
}
cmd.WithExternalMetrics(kedaProvider)

logger.Info(cmd.Message)
Expand Down
10 changes: 6 additions & 4 deletions controllers/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controllers
import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
batchv1 "k8s.io/api/batch/v1"
Expand All @@ -25,14 +26,15 @@ import (
// ScaledJobReconciler reconciles a ScaledJob object
type ScaledJobReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
scaleHandler scaling.ScaleHandler
Log logr.Logger
Scheme *runtime.Scheme
scaleHandler scaling.ScaleHandler
globalHTTPTimeout time.Duration
}

// SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme())
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.globalHTTPTimeout)

return ctrl.NewControllerManagedBy(mgr).
// Ignore updates to ScaledJob Status (in this case metadata.Generation does not change)
Expand Down
5 changes: 4 additions & 1 deletion controllers/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/go-logr/logr"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
Expand Down Expand Up @@ -48,6 +49,8 @@ type ScaledObjectReconciler struct {
scaledObjectsGenerations *sync.Map
scaleHandler scaling.ScaleHandler
kubeVersion kedautil.K8sVersion

globalHTTPTimeout time.Duration
}

// SetupWithManager initializes the ScaledObjectReconciler instance and starts a new controller managed by the passed Manager instance.
Expand Down Expand Up @@ -75,7 +78,7 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Init the rest of ScaledObjectReconciler
r.restMapper = mgr.GetRESTMapper()
r.scaledObjectsGenerations = &sync.Map{}
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), r.scaleClient, mgr.GetScheme())
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), r.scaleClient, mgr.GetScheme(), r.globalHTTPTimeout)

// Start controller
return ctrl.NewControllerManagedBy(mgr).
Expand Down
Loading