diff --git a/scaler/config.go b/scaler/config.go index 8a50aace8..5859a097b 100644 --- a/scaler/config.go +++ b/scaler/config.go @@ -29,6 +29,8 @@ type config struct { // UpdateRoutingTableDur is the duration between manual // updates to the routing table. UpdateRoutingTableDur time.Duration `envconfig:"KEDA_HTTP_SCALER_ROUTING_TABLE_UPDATE_DUR" default:"100ms"` + // This will be the 'Target Pending Requests' for the interceptor + TargetPendingRequestsInterceptor int `envconfig:"KEDA_HTTP_SCALER_TARGET_PENDING_REQUESTS_INTERCEPTOR" default:"100"` } func mustParseConfig() *config { diff --git a/scaler/handlers.go b/scaler/handlers.go index 2211265a0..393626bbb 100644 --- a/scaler/handlers.go +++ b/scaler/handlers.go @@ -21,10 +21,11 @@ func init() { } type impl struct { - lggr logr.Logger - pinger *queuePinger - routingTable routing.TableReader - targetMetric int64 + lggr logr.Logger + pinger *queuePinger + routingTable routing.TableReader + targetMetric int64 + targetMetricInterceptor int64 externalscaler.UnimplementedExternalScalerServer } @@ -33,12 +34,14 @@ func newImpl( pinger *queuePinger, routingTable routing.TableReader, defaultTargetMetric int64, + defaultTargetMetricInterceptor int64, ) *impl { return &impl{ - lggr: lggr, - pinger: pinger, - routingTable: routingTable, - targetMetric: defaultTargetMetric, + lggr: lggr, + pinger: pinger, + routingTable: routingTable, + targetMetric: defaultTargetMetric, + targetMetricInterceptor: defaultTargetMetricInterceptor, } } @@ -115,20 +118,26 @@ func (e *impl) GetMetricSpec( lggr.Error(err, "no 'host' found in ScaledObject metadata") return nil, err } - target, err := e.routingTable.Lookup(host) - if err != nil { - lggr.Error( - err, - "error getting target for host", - "host", - host, - ) - return nil, err + var targetPendingRequests int64 + if host == "interceptor" { + targetPendingRequests = e.targetMetricInterceptor + } else { + target, err := e.routingTable.Lookup(host) + if err != nil { + lggr.Error( + err, + "error getting target for host", + "host", + host, + ) + return nil, err + } + targetPendingRequests = int64(target.TargetPendingRequests) } metricSpecs := []*externalscaler.MetricSpec{ { MetricName: host, - TargetSize: int64(target.TargetPendingRequests), + TargetSize: targetPendingRequests, }, } diff --git a/scaler/handlers_test.go b/scaler/handlers_test.go index 8d824ad65..4fcd8a180 100644 --- a/scaler/handlers_test.go +++ b/scaler/handlers_test.go @@ -31,6 +31,7 @@ func TestIsActive(t *testing.T) { pinger, table, 123, + 200, ) res, err := hdl.IsActive( ctx, @@ -81,7 +82,7 @@ func TestGetMetricSpec(t *testing.T) { )) ticker, pinger := newFakeQueuePinger(ctx, lggr) defer ticker.Stop() - hdl := newImpl(lggr, pinger, table, 123) + hdl := newImpl(lggr, pinger, table, 123, 200) meta := map[string]string{ "host": host, "targetPendingRequests": strconv.Itoa(int(target)), @@ -110,7 +111,7 @@ func TestGetMetricsMissingHostInMetadata(t *testing.T) { table := routing.NewTable() ticker, pinger := newFakeQueuePinger(ctx, lggr) defer ticker.Stop() - hdl := newImpl(lggr, pinger, table, 123) + hdl := newImpl(lggr, pinger, table, 123, 200) // no 'host' in the ScalerObjectRef's metadata field res, err := hdl.GetMetrics(ctx, req) @@ -137,7 +138,7 @@ func TestGetMetricsMissingHostInQueue(t *testing.T) { table := routing.NewTable() ticker, pinger := newFakeQueuePinger(ctx, lggr) defer ticker.Stop() - hdl := newImpl(lggr, pinger, table, 123) + hdl := newImpl(lggr, pinger, table, 123, 200) req := &externalscaler.GetMetricsRequest{ ScaledObjectRef: &externalscaler.ScaledObjectRef{}, @@ -212,7 +213,7 @@ func TestGetMetricsHostFoundInQueueCounts(t *testing.T) { // first tick time.Sleep(50 * time.Millisecond) - hdl := newImpl(lggr, pinger, table, 123) + hdl := newImpl(lggr, pinger, table, 123, 200) res, err := hdl.GetMetrics(ctx, req) r.NoError(err) r.NotNil(res) @@ -284,7 +285,7 @@ func TestGetMetricsInterceptorReturnsAggregate(t *testing.T) { // first tick time.Sleep(tickDur * 5) - hdl := newImpl(lggr, pinger, table, 123) + hdl := newImpl(lggr, pinger, table, 123, 200) res, err := hdl.GetMetrics(ctx, req) r.NoError(err) r.NotNil(res) diff --git a/scaler/main.go b/scaler/main.go index 3e3cf8c01..84b10f707 100644 --- a/scaler/main.go +++ b/scaler/main.go @@ -41,6 +41,7 @@ func main() { svcName := cfg.TargetService targetPortStr := fmt.Sprintf("%d", cfg.TargetPort) targetPendingRequests := cfg.TargetPendingRequests + targetPendingRequestsInterceptor := cfg.TargetPendingRequestsInterceptor k8sCl, _, err := k8s.NewClientset() if err != nil { @@ -69,6 +70,7 @@ func main() { pinger, table, int64(targetPendingRequests), + int64(targetPendingRequestsInterceptor), ) }) @@ -106,6 +108,7 @@ func startGrpcServer( pinger *queuePinger, routingTable *routing.Table, targetPendingRequests int64, + targetPendingRequestsInterceptor int64, ) error { addr := fmt.Sprintf("0.0.0.0:%d", port) @@ -123,6 +126,7 @@ func startGrpcServer( pinger, routingTable, targetPendingRequests, + targetPendingRequestsInterceptor, ), ) reflection.Register(grpcServer)