From 01e90b169b98474905fc5ea7eb7b274f49576231 Mon Sep 17 00:00:00 2001 From: Ajanth Date: Wed, 29 Sep 2021 01:11:17 +0530 Subject: [PATCH 1/2] Fixing issue #272 Signed-off-by: Ajanth --- scaler/config.go | 2 ++ scaler/handlers.go | 45 +++++++++++++++++++++++++++------------------ scaler/main.go | 4 ++++ 3 files changed, 33 insertions(+), 18 deletions(-) diff --git a/scaler/config.go b/scaler/config.go index 8a50aace8..5859a097b 100644 --- a/scaler/config.go +++ b/scaler/config.go @@ -29,6 +29,8 @@ type config struct { // UpdateRoutingTableDur is the duration between manual // updates to the routing table. UpdateRoutingTableDur time.Duration `envconfig:"KEDA_HTTP_SCALER_ROUTING_TABLE_UPDATE_DUR" default:"100ms"` + // This will be the 'Target Pending Requests' for the interceptor + TargetPendingRequestsInterceptor int `envconfig:"KEDA_HTTP_SCALER_TARGET_PENDING_REQUESTS_INTERCEPTOR" default:"100"` } func mustParseConfig() *config { diff --git a/scaler/handlers.go b/scaler/handlers.go index 2211265a0..393626bbb 100644 --- a/scaler/handlers.go +++ b/scaler/handlers.go @@ -21,10 +21,11 @@ func init() { } type impl struct { - lggr logr.Logger - pinger *queuePinger - routingTable routing.TableReader - targetMetric int64 + lggr logr.Logger + pinger *queuePinger + routingTable routing.TableReader + targetMetric int64 + targetMetricInterceptor int64 externalscaler.UnimplementedExternalScalerServer } @@ -33,12 +34,14 @@ func newImpl( pinger *queuePinger, routingTable routing.TableReader, defaultTargetMetric int64, + defaultTargetMetricInterceptor int64, ) *impl { return &impl{ - lggr: lggr, - pinger: pinger, - routingTable: routingTable, - targetMetric: defaultTargetMetric, + lggr: lggr, + pinger: pinger, + routingTable: routingTable, + targetMetric: defaultTargetMetric, + targetMetricInterceptor: defaultTargetMetricInterceptor, } } @@ -115,20 +118,26 @@ func (e *impl) GetMetricSpec( lggr.Error(err, "no 'host' found in ScaledObject metadata") return nil, err } - target, err := e.routingTable.Lookup(host) - if err != nil { - lggr.Error( - err, - "error getting target for host", - "host", - host, - ) - return nil, err + var targetPendingRequests int64 + if host == "interceptor" { + targetPendingRequests = e.targetMetricInterceptor + } else { + target, err := e.routingTable.Lookup(host) + if err != nil { + lggr.Error( + err, + "error getting target for host", + "host", + host, + ) + return nil, err + } + targetPendingRequests = int64(target.TargetPendingRequests) } metricSpecs := []*externalscaler.MetricSpec{ { MetricName: host, - TargetSize: int64(target.TargetPendingRequests), + TargetSize: targetPendingRequests, }, } diff --git a/scaler/main.go b/scaler/main.go index 3e3cf8c01..84b10f707 100644 --- a/scaler/main.go +++ b/scaler/main.go @@ -41,6 +41,7 @@ func main() { svcName := cfg.TargetService targetPortStr := fmt.Sprintf("%d", cfg.TargetPort) targetPendingRequests := cfg.TargetPendingRequests + targetPendingRequestsInterceptor := cfg.TargetPendingRequestsInterceptor k8sCl, _, err := k8s.NewClientset() if err != nil { @@ -69,6 +70,7 @@ func main() { pinger, table, int64(targetPendingRequests), + int64(targetPendingRequestsInterceptor), ) }) @@ -106,6 +108,7 @@ func startGrpcServer( pinger *queuePinger, routingTable *routing.Table, targetPendingRequests int64, + targetPendingRequestsInterceptor int64, ) error { addr := fmt.Sprintf("0.0.0.0:%d", port) @@ -123,6 +126,7 @@ func startGrpcServer( pinger, routingTable, targetPendingRequests, + targetPendingRequestsInterceptor, ), ) reflection.Register(grpcServer) From 248897691dd0abcc545bdc1a6564cc8a3eed96ba Mon Sep 17 00:00:00 2001 From: Ajanth Date: Fri, 8 Oct 2021 21:09:43 +0530 Subject: [PATCH 2/2] Fixing scaler test Signed-off-by: Ajanth --- scaler/handlers_test.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/scaler/handlers_test.go b/scaler/handlers_test.go index 8d824ad65..4fcd8a180 100644 --- a/scaler/handlers_test.go +++ b/scaler/handlers_test.go @@ -31,6 +31,7 @@ func TestIsActive(t *testing.T) { pinger, table, 123, + 200, ) res, err := hdl.IsActive( ctx, @@ -81,7 +82,7 @@ func TestGetMetricSpec(t *testing.T) { )) ticker, pinger := newFakeQueuePinger(ctx, lggr) defer ticker.Stop() - hdl := newImpl(lggr, pinger, table, 123) + hdl := newImpl(lggr, pinger, table, 123, 200) meta := map[string]string{ "host": host, "targetPendingRequests": strconv.Itoa(int(target)), @@ -110,7 +111,7 @@ func TestGetMetricsMissingHostInMetadata(t *testing.T) { table := routing.NewTable() ticker, pinger := newFakeQueuePinger(ctx, lggr) defer ticker.Stop() - hdl := newImpl(lggr, pinger, table, 123) + hdl := newImpl(lggr, pinger, table, 123, 200) // no 'host' in the ScalerObjectRef's metadata field res, err := hdl.GetMetrics(ctx, req) @@ -137,7 +138,7 @@ func TestGetMetricsMissingHostInQueue(t *testing.T) { table := routing.NewTable() ticker, pinger := newFakeQueuePinger(ctx, lggr) defer ticker.Stop() - hdl := newImpl(lggr, pinger, table, 123) + hdl := newImpl(lggr, pinger, table, 123, 200) req := &externalscaler.GetMetricsRequest{ ScaledObjectRef: &externalscaler.ScaledObjectRef{}, @@ -212,7 +213,7 @@ func TestGetMetricsHostFoundInQueueCounts(t *testing.T) { // first tick time.Sleep(50 * time.Millisecond) - hdl := newImpl(lggr, pinger, table, 123) + hdl := newImpl(lggr, pinger, table, 123, 200) res, err := hdl.GetMetrics(ctx, req) r.NoError(err) r.NotNil(res) @@ -284,7 +285,7 @@ func TestGetMetricsInterceptorReturnsAggregate(t *testing.T) { // first tick time.Sleep(tickDur * 5) - hdl := newImpl(lggr, pinger, table, 123) + hdl := newImpl(lggr, pinger, table, 123, 200) res, err := hdl.GetMetrics(ctx, req) r.NoError(err) r.NotNil(res)