Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add X-RateLimit-* response headers as an opt-in feature #77

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

145 changes: 76 additions & 69 deletions proto/ratelimit/ratelimit.pb.go

Large diffs are not rendered by default.

51 changes: 51 additions & 0 deletions src/service/header_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package ratelimit

import (
"strconv"

"github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v2"
)

// limitHeader builds the "X-RateLimit-Limit" response header for the given
// descriptor status. The descriptor's CurrentLimit must be non-nil; callers
// are expected to have checked this.
func limitHeader(descriptor *pb.RateLimitResponse_DescriptorStatus) *core.HeaderValue {
	limit := uint64(descriptor.CurrentLimit.RequestsPerUnit)
	return &core.HeaderValue{
		Key:   "X-RateLimit-Limit",
		Value: strconv.FormatUint(limit, 10),
	}
}

// remainingHeader builds the "X-RateLimit-Remaining" response header from the
// descriptor status's LimitRemaining counter.
func remainingHeader(descriptor *pb.RateLimitResponse_DescriptorStatus) *core.HeaderValue {
	remaining := uint64(descriptor.LimitRemaining)
	return &core.HeaderValue{
		Key:   "X-RateLimit-Remaining",
		Value: strconv.FormatUint(remaining, 10),
	}
}

// resetHeader builds the "X-RateLimit-Reset" response header: the number of
// seconds remaining, as of now (Unix seconds), until the descriptor's limit
// window rolls over.
func resetHeader(
	descriptor *pb.RateLimitResponse_DescriptorStatus, now int64) *core.HeaderValue {

	secondsUntilReset := calculateReset(descriptor, now)
	return &core.HeaderValue{
		Key:   "X-RateLimit-Reset",
		Value: strconv.FormatInt(secondsUntilReset, 10),
	}
}

// calculateReset returns how many seconds remain, as of now (Unix seconds),
// before the descriptor's rate limit window rolls over. Windows are aligned
// to the Unix epoch, so the remainder of now within the window length is the
// time already elapsed in the current window.
func calculateReset(descriptor *pb.RateLimitResponse_DescriptorStatus, now int64) int64 {
	windowSize := unitInSeconds(descriptor.CurrentLimit.Unit)
	elapsed := now % windowSize
	return windowSize - elapsed
}

// unitInSeconds converts a rate limit unit to its duration in seconds.
// It panics on an unrecognized unit, which indicates a programming error.
func unitInSeconds(unit pb.RateLimitResponse_RateLimit_Unit) int64 {
	const (
		minute = 60
		hour   = 60 * minute
		day    = 24 * hour
	)

	switch unit {
	case pb.RateLimitResponse_RateLimit_SECOND:
		return 1
	case pb.RateLimitResponse_RateLimit_MINUTE:
		return minute
	case pb.RateLimitResponse_RateLimit_HOUR:
		return hour
	case pb.RateLimitResponse_RateLimit_DAY:
		return day
	default:
		panic("unknown rate limit unit")
	}
}
85 changes: 67 additions & 18 deletions src/service/ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package ratelimit
import (
"strings"
"sync"
"time"

"github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v2"
"github.com/lyft/goruntime/loader"
"github.com/lyft/gostats"
"github.com/lyft/ratelimit/src/assert"
"github.com/lyft/ratelimit/src/config"
"github.com/lyft/ratelimit/src/redis"
"github.com/lyft/ratelimit/src/settings"
logger "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
Expand All @@ -19,6 +22,16 @@ type shouldRateLimitStats struct {
serviceError stats.Counter
}

// Clock represents a wall clock for time based operations. It exists so that
// tests can substitute a deterministic time source for the system clock.
type Clock interface {
	// Now reports the current time.
	Now() time.Time
}

// StdClock is a Clock backed by the real system wall clock.
type StdClock struct{}

// Now returns the current system time via time.Now.
func (c StdClock) Now() time.Time { return time.Now() }

func newShouldRateLimitStats(scope stats.Scope) shouldRateLimitStats {
ret := shouldRateLimitStats{}
ret.redisError = scope.NewCounter("redis_error")
Expand Down Expand Up @@ -47,15 +60,17 @@ type RateLimitServiceServer interface {
}

type service struct {
runtime loader.IFace
configLock sync.RWMutex
configLoader config.RateLimitConfigLoader
config config.RateLimitConfig
runtimeUpdateEvent chan int
cache redis.RateLimitCache
stats serviceStats
rlStatsScope stats.Scope
legacy *legacyService
runtime loader.IFace
configLock sync.RWMutex
configLoader config.RateLimitConfigLoader
config config.RateLimitConfig
runtimeUpdateEvent chan int
cache redis.RateLimitCache
stats serviceStats
rlStatsScope stats.Scope
legacy *legacyService
responseHeadersEnabled bool
clock Clock
}

func (this *service) reloadConfig() {
Expand Down Expand Up @@ -126,6 +141,37 @@ func (this *service) shouldRateLimitWorker(
finalCode = descriptorStatus.Code
}
}
if this.responseHeadersEnabled {
now := this.clock.Now().Unix()
var limitingDescriptor *pb.RateLimitResponse_DescriptorStatus
limitCount := 0
for _, descriptor := range responseDescriptorStatuses {
if descriptor.CurrentLimit == nil {
continue
}
limitCount++
if limitingDescriptor == nil ||
descriptor.LimitRemaining < limitingDescriptor.LimitRemaining ||
descriptor.LimitRemaining == limitingDescriptor.LimitRemaining &&
calculateReset(descriptor, now) > calculateReset(limitingDescriptor, now) {
limitingDescriptor = descriptor
}
}
if limitCount == 1 {
response.Headers = []*core.HeaderValue{
limitHeader(limitingDescriptor),
remainingHeader(limitingDescriptor),
resetHeader(limitingDescriptor, now),
}
} else if limitCount > 1 {
// If there is more than one limit, then picking one of them for the "X-RateLimit-Limit"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we pick min for remaining, shouldn't we pick the accompanying Limit to fill the limitHeader?

// header value would be arbitrary, so we omit it completely.
response.Headers = []*core.HeaderValue{
remainingHeader(limitingDescriptor),
resetHeader(limitingDescriptor, now),
}
}
}

response.OverallCode = finalCode
return response
Expand Down Expand Up @@ -175,17 +221,20 @@ func (this *service) GetCurrentConfig() config.RateLimitConfig {
}

func NewService(runtime loader.IFace, cache redis.RateLimitCache,
configLoader config.RateLimitConfigLoader, stats stats.Scope) RateLimitServiceServer {
configLoader config.RateLimitConfigLoader, stats stats.Scope,
clock Clock, settings settings.Settings) RateLimitServiceServer {

newService := &service{
runtime: runtime,
configLock: sync.RWMutex{},
configLoader: configLoader,
config: nil,
runtimeUpdateEvent: make(chan int),
cache: cache,
stats: newServiceStats(stats),
rlStatsScope: stats.Scope("rate_limit"),
runtime: runtime,
configLock: sync.RWMutex{},
configLoader: configLoader,
config: nil,
runtimeUpdateEvent: make(chan int),
cache: cache,
stats: newServiceStats(stats),
rlStatsScope: stats.Scope("rate_limit"),
responseHeadersEnabled: settings.ResponseHeadersEnabled,
clock: clock,
}
newService.legacy = &legacyService{
s: newService,
Expand Down
4 changes: 3 additions & 1 deletion src/service/ratelimit_legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/lyft/gostats"
pb_legacy "github.com/lyft/ratelimit/proto/ratelimit"
"golang.org/x/net/context"
"strings"
)

type RateLimitLegacyServiceServer interface {
Expand Down Expand Up @@ -90,7 +91,8 @@ func ConvertResponse(response *pb.RateLimitResponse) (*pb_legacy.RateLimitRespon
}

resp := &pb_legacy.RateLimitResponse{}
err = jsonpb.UnmarshalString(s, resp)
u := jsonpb.Unmarshaler{AllowUnknownFields: true}
err = u.Unmarshal(strings.NewReader(s), resp)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions src/service_cmd/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ func Run() {
rand.New(redis.NewLockedSource(time.Now().Unix())),
s.ExpirationJitterMaxSeconds),
config.NewRateLimitConfigLoaderImpl(),
srv.Scope().Scope("service"))

srv.Scope().Scope("service"),
ratelimit.StdClock{},
s,
)
srv.AddDebugHttpEndpoint(
"/rlconfig",
"print out the currently loaded configuration for debugging",
Expand Down
1 change: 1 addition & 0 deletions src/settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Settings struct {
RedisPerSecondUrl string `envconfig:"REDIS_PERSECOND_URL" default:"/var/run/nutcracker/ratelimitpersecond.sock"`
RedisPerSecondPoolSize int `envconfig:"REDIS_PERSECOND_POOL_SIZE" default:"10"`
ExpirationJitterMaxSeconds int64 `envconfig:"EXPIRATION_JITTER_MAX_SECONDS" default:"300"`
ResponseHeadersEnabled bool `envconfig:"RESPONSE_HEADERS_ENABLED" default:"false"`
}

type Option func(*Settings)
Expand Down
43 changes: 43 additions & 0 deletions test/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"
"time"

"github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v2"
pb_legacy "github.com/lyft/ratelimit/proto/ratelimit"
"github.com/lyft/ratelimit/src/service_cmd/runner"
Expand Down Expand Up @@ -244,3 +245,45 @@ func TestBasicConfigLegacy(t *testing.T) {
assert.NoError(err)
}
}

// TestBasicConfigWithHeaders verifies that, with RESPONSE_HEADERS_ENABLED set,
// a successful ShouldRateLimit response carries the X-RateLimit-* headers for
// the single matched limit (50/second, one request consumed).
func TestBasicConfigWithHeaders(t *testing.T) {
	os.Setenv("RESPONSE_HEADERS_ENABLED", "true")
	// The opt-in flag must not leak into other tests in this package that
	// share the process environment.
	defer os.Unsetenv("RESPONSE_HEADERS_ENABLED")
	os.Setenv("REDIS_PERSECOND", "false")
	os.Setenv("PORT", "8082")
	os.Setenv("GRPC_PORT", "8086")
	os.Setenv("DEBUG_PORT", "8084")
	os.Setenv("RUNTIME_ROOT", "runtime/current")
	os.Setenv("RUNTIME_SUBDIRECTORY", "ratelimit")
	os.Setenv("REDIS_PERSECOND_SOCKET_TYPE", "tcp")
	os.Setenv("REDIS_SOCKET_TYPE", "tcp")
	os.Setenv("REDIS_URL", "localhost:6379")

	go func() {
		runner.Run()
	}()

	// HACK: Wait for the server to come up. Make a hook that we can wait on.
	time.Sleep(100 * time.Millisecond)

	assert := assert.New(t)
	conn, err := grpc.Dial("localhost:8086", grpc.WithInsecure())
	assert.NoError(err)
	defer conn.Close()
	c := pb.NewRateLimitServiceClient(conn)

	response, err := c.ShouldRateLimit(
		context.Background(),
		common.NewRateLimitRequest("basic_headers", [][][2]string{{{"key1", "foo"}}}, 1))
	// Check the RPC error before inspecting the response it produced.
	assert.NoError(err)
	assert.Equal(
		&pb.RateLimitResponse{
			OverallCode: pb.RateLimitResponse_OK,
			Statuses: []*pb.RateLimitResponse_DescriptorStatus{newDescriptorStatus(
				pb.RateLimitResponse_OK, 50, pb.RateLimitResponse_RateLimit_SECOND, 49)},
			Headers: []*core.HeaderValue{
				{Key: "X-RateLimit-Limit", Value: "50"},
				{Key: "X-RateLimit-Remaining", Value: "49"},
				{Key: "X-RateLimit-Reset", Value: "1"},
			}},
		response)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Rate limit configuration for the integration test TestBasicConfigWithHeaders:
# a single descriptor keyed on "key1", limited to 50 requests per second.
domain: basic_headers
descriptors:
  - key: key1
    rate_limit:
      unit: second
      requests_per_unit: 50
35 changes: 34 additions & 1 deletion test/service/ratelimit_legacy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ratelimit_test
import (
"testing"

"github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
pb_struct "github.com/envoyproxy/go-control-plane/envoy/api/v2/ratelimit"
pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v2"
"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -214,6 +215,10 @@ func TestInitialLoadErrorLegacy(test *testing.T) {
t := commonSetup(test)
defer t.controller.Finish()

t.settings.ResponseHeadersEnabled = false
var currentTime int64 = 0
var c ratelimit.Clock = stubClock{now: &currentTime}

t.runtime.EXPECT().AddUpdateCallback(gomock.Any()).Do(
func(callback chan<- int) { t.runtimeUpdateCallback = callback })
t.runtime.EXPECT().Snapshot().Return(t.snapshot).MinTimes(1)
Expand All @@ -224,7 +229,7 @@ func TestInitialLoadErrorLegacy(test *testing.T) {
func([]config.RateLimitConfigToLoad, stats.Scope) {
panic(config.RateLimitConfigError("load error"))
})
service := ratelimit.NewService(t.runtime, t.cache, t.configLoader, t.statStore)
service := ratelimit.NewService(t.runtime, t.cache, t.configLoader, t.statStore, c, t.settings)

request := common.NewRateLimitRequestLegacy("test-domain", [][][2]string{{{"hello", "world"}}}, 1)
response, err := service.GetLegacyService().ShouldRateLimit(nil, request)
Expand Down Expand Up @@ -406,3 +411,31 @@ func TestConvertResponse(test *testing.T) {

assert.Equal(test, expectedResponse, resp)
}

// TestConvertResponseWithHeaders checks that converting a v2 response that
// carries X-RateLimit-* headers to the legacy proto succeeds and silently
// drops the headers, since the legacy message has no corresponding field.
func TestConvertResponseWithHeaders(t *testing.T) {
	headers := []*core.HeaderValue{
		{Key: "X-RateLimit-Limit", Value: "5"},
		{Key: "X-RateLimit-Remaining", Value: "4"},
		{Key: "X-RateLimit-Reset", Value: "38"},
	}
	response := &pb.RateLimitResponse{
		OverallCode: pb.RateLimitResponse_OVER_LIMIT,
		Statuses: []*pb.RateLimitResponse_DescriptorStatus{{
			Code:           pb.RateLimitResponse_OK,
			CurrentLimit:   nil,
			LimitRemaining: 9,
		}},
		Headers: headers,
	}

	legacyResponse, err := ratelimit.ConvertResponse(response)
	if err != nil {
		assert.FailNow(t, err.Error())
	}

	expected := &pb_legacy.RateLimitResponse{
		OverallCode: pb_legacy.RateLimitResponse_OVER_LIMIT,
		Statuses: []*pb_legacy.RateLimitResponse_DescriptorStatus{{
			Code:           pb_legacy.RateLimitResponse_OK,
			CurrentLimit:   nil,
			LimitRemaining: 9,
		}},
	}
	assert.Equal(t, expected, legacyResponse)
}
Loading