-
Notifications
You must be signed in to change notification settings - Fork 170
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add rate limiting to the entire service policy. #648
Changes from 14 commits
9985f90
9c102ed
0554bd5
cb4a273
66d8086
e522691
10c163d
1645bf3
609d54a
773df6a
0306fe2
683ecc6
397df3a
ee851a9
5548d0f
1e8b5c3
04fcd16
947ee62
b709391
10f79fc
1474f3c
6d76fe0
48c64c8
ef334ac
aa8e0fd
d7e2f73
ba7dc9e
ae7e658
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
local policy_chain = require('apicast.policy_chain').default() | ||
|
||
local rate_limit_policy = require('apicast.policy.rate_limit').new({ | ||
limiters = { | ||
{ | ||
name = "connections", | ||
key = "limit1", | ||
conn = 20, | ||
burst = 10, | ||
delay = 0.5 | ||
}, | ||
{ | ||
name = "leaky_bucket", | ||
key = "limit2", | ||
rate = 18, | ||
burst = 9 | ||
}, | ||
{ | ||
name = "fixed_window", | ||
key = "limit3", | ||
count = 10, | ||
window = 10 | ||
}}, | ||
redis_url = "redis://localhost:6379/1" | ||
}) | ||
|
||
policy_chain:insert(rate_limit_policy, 1) | ||
|
||
return { | ||
policy_chain = policy_chain | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
{ | ||
"$schema": "http://apicast.io/policy-v1/schema#manifest#", | ||
"name": "rate limit policy", | ||
"description": ["This policy adds rate limit.", | ||
"" | ||
], | ||
"version": "builtin", | ||
"configuration": { | ||
"type": "object", | ||
"properties": { | ||
"limiters": { | ||
"description": "List of limiters to be applied", | ||
"type": "array", | ||
"items": { | ||
"type": "object", | ||
"properties": { | ||
"oneOf": [{ | ||
"name": { | ||
"type": "string", | ||
"enum": ["connections"], | ||
"description": "limiting request concurrency (or concurrent connections)" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would be good to start with capital letter. This applies to other comments too. |
||
}, | ||
"key": { | ||
"description": "Key of limiter", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not very useful description. |
||
"type": "string" | ||
}, | ||
"conn": { | ||
"type": "integer", | ||
"description": "the maximum number of concurrent requests allowed" | ||
}, | ||
"burst": { | ||
"type": "integer", | ||
"description": "the number of excessive concurrent requests (or connections) allowed to be delayed" | ||
}, | ||
"delay": { | ||
"type": "integer", | ||
"description": "the default processing latency of a typical connection (or request)" | ||
} | ||
}, { | ||
"name": { | ||
"type": "string", | ||
"enum": ["leaky_bucket"], | ||
"description": "limiting request rate" | ||
}, | ||
"key": { | ||
"description": "Key of limiter", | ||
"type": "string" | ||
}, | ||
"rate": { | ||
"type": "integer", | ||
"description": "the specified request rate (number per second) threshold" | ||
}, | ||
"burst": { | ||
"type": "integer", | ||
"description": "the number of excessive requests per second allowed to be delayed" | ||
} | ||
}, { | ||
"name": { | ||
"type": "string", | ||
"enum": ["fixed_window"], | ||
"description": "limiting request counts" | ||
}, | ||
"key": { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When we allow this to be a "template" in the future we will somehow have to distinguish between plain strings and templated strings. We probably should change this to be more extensible in the future before the final release. Also right now those keys have global scopes. That does not really work in our Cloud Hosted environment. We probably should offer a way to have "scope" of the key that could be either "global" or "service" and the key would be automatically namespaced. |
||
"description": "Key of limiter", | ||
"type": "string" | ||
}, | ||
"count": { | ||
"type": "integer", | ||
"description": "the specified number of requests threshold" | ||
}, | ||
"window": { | ||
"type": "integer", | ||
"description": "the time window in seconds before the request count is reset" | ||
} | ||
}] | ||
} | ||
} | ||
}, | ||
"redis_url": { | ||
"description": "URL of Redis", | ||
"type": "string" | ||
}, | ||
"status_code_rejected": { | ||
"type": "integer", | ||
"description": "the status code when requests over the limit, default 429" | ||
}, | ||
"logging_only": { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should not be boolean but rather enum like |
||
"type": "boolean", | ||
"description": "if true, the request goes through when there is some issue with rate limiting, default false" | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
return require('rate_limit') |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,222 @@ | ||
local policy = require('apicast.policy') | ||
local _M = policy.new('Rate Limit Policy') | ||
|
||
local resty_limit_conn = require('resty.limit.conn') | ||
local resty_limit_req = require('resty.limit.req') | ||
local resty_limit_count = require('resty.limit.count') | ||
|
||
local ngx_semaphore = require "ngx.semaphore" | ||
local limit_traffic = require "resty.limit.traffic" | ||
local ts = require ('apicast.threescale_utils') | ||
local tonumber = tonumber | ||
local next = next | ||
local shdict_key = 'limiter' | ||
|
||
local new = _M.new | ||
|
||
local traffic_limiters = { | ||
connections = function(config) | ||
return resty_limit_conn.new(shdict_key, config.conn, config.burst, config.delay) | ||
end, | ||
leaky_bucket = function(config) | ||
return resty_limit_req.new(shdict_key, config.rate, config.burst) | ||
end, | ||
fixed_window = function(config) | ||
return resty_limit_count.new(shdict_key, config.count, config.window) | ||
end | ||
} | ||
|
||
local function try(f, catch_f) | ||
local status, exception = pcall(f) | ||
if not status then | ||
catch_f(exception) | ||
end | ||
end | ||
|
||
local function init_limiter(config) | ||
local lim, limerr | ||
try( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not really idiomatic Lua and we really should not be expecting exceptions. All resty code will return an error as a second return value, so we can just use that. |
||
function() | ||
lim, limerr = traffic_limiters[config.name](config) | ||
end, | ||
function(e) | ||
return nil, e | ||
end | ||
) | ||
if not lim then | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't we return |
||
return nil, limerr | ||
end | ||
|
||
return lim, nil | ||
end | ||
|
||
local function redis_shdict(url) | ||
local options = { url = url } | ||
local redis, err = ts.connect_redis(options) | ||
if not redis then | ||
return nil, err | ||
end | ||
|
||
return { | ||
incr = function(_, key, value, init) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These functions can be defined just once in the top closure and then get the |
||
if not init then | ||
return redis:incrby(key, value), nil | ||
end | ||
redis:setnx(key, init) | ||
return redis:incrby(key, value), nil | ||
end, | ||
set = function(_, key, value) | ||
return redis:set(key, value) | ||
end, | ||
expire = function(_, key, exptime) | ||
local ret = redis:expire(key, exptime) | ||
if ret == 0 then | ||
return nil, "not found" | ||
end | ||
return true, nil | ||
end, | ||
get = function(_, key) | ||
local val = redis:get(key) | ||
if type(val) == "userdata" then | ||
return nil | ||
end | ||
return val | ||
end | ||
} | ||
end | ||
|
||
local function error(logging_only, status_code) | ||
if not logging_only then | ||
return ngx.exit(status_code) | ||
end | ||
end | ||
|
||
function _M.new(config) | ||
local self = new() | ||
self.config = config or {} | ||
self.limiters = config.limiters | ||
self.redis_url = config.redis_url | ||
self.status_code_rejected = config.status_code_rejected or 429 | ||
self.logging_only = config.logging_only or false | ||
|
||
return self | ||
end | ||
|
||
function _M:access() | ||
local limiters = {} | ||
local keys = {} | ||
|
||
if not self.redis_url then | ||
-- Only one (the first) limiter is enable. | ||
-- Key will be shdict_key ('limiter'). | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
@mikz No. Although it is a tentative implementation, we can use only one limiter with one shared dictionary. When user defines 3 limiters ("limiter1","limiter2","limiter3"), only "limiter1" is enabled. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, I don't understand this either. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @davidor If we allow to use several limiters with shdict, we have to define the size of the shdict by using lua_shared_dict. If a user use 3 limiters, we have to define 3 lua_shared_dict, and if a user use 10 limiters, we have to define 10 lua_shared_dict. I don't want to limit the number of limiters, but there is no good way to define lua_shared_dict automatically as many as a user defines. So I defined only one lua_shared_dict tentatively. |
||
local lim, initerr = init_limiter(self.limiters[1]) | ||
if not lim then | ||
ngx.log(ngx.ERR, "unknown limiter: ", initerr) | ||
error(self.logging_only, 500) | ||
goto done | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could call |
||
end | ||
|
||
limiters[1] = lim | ||
keys[1] = shdict_key | ||
|
||
else | ||
for _, limiter in ipairs(self.limiters) do | ||
local lim, initerr = init_limiter(limiter) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
if not lim then | ||
ngx.log(ngx.ERR, "unknown limiter: ", limiter.name, ", err: ", initerr) | ||
error(self.logging_only, 500) | ||
goto done | ||
end | ||
|
||
local rediserr | ||
lim.dict, rediserr = redis_shdict(self.redis_url) | ||
if not lim.dict then | ||
ngx.log(ngx.ERR, "failed to connect Redis: ", rediserr) | ||
error(self.logging_only, 500) | ||
goto done | ||
end | ||
|
||
limiters[#limiters + 1] = lim | ||
keys[#keys + 1] = limiter.key | ||
|
||
end | ||
end | ||
|
||
local states = {} | ||
local connections_committed = {} | ||
local keys_committed = {} | ||
|
||
local delay, comerr = limit_traffic.combine(limiters, keys, states) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When using Redis and several limiters, the call to For efficiency, we might want to find a way to send all those commands in a single network trip using pipelines: https://github.com/openresty/lua-resty-redis#init_pipeline This is an optimization we can leave for a future PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @davidor I'd like to leave this for a future PR because it's a little bit difficult. |
||
if not delay then | ||
if comerr == "rejected" then | ||
ngx.log(ngx.WARN, "Requests over the limit.") | ||
error(self.logging_only, self.status_code_rejected) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if we want to let the requests through when "logging only". That would effectively disable the policy. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mikz Do you mean There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not really. I mean the "logging only" does not terminate the request. So it does not limit requests over limit. This policy does not rate limit requests when set to "logging only". I think there is a difference between throwing errors when initializing misconfigured policy or redis is inavailable - but when the call is actually rejected - then the policy should reject the request - or have different config option for it. |
||
goto done | ||
end | ||
ngx.log(ngx.ERR, "failed to limit traffic: ", comerr) | ||
error(self.logging_only, 500) | ||
goto done | ||
end | ||
|
||
for i, lim in ipairs(limiters) do | ||
if lim.is_committed and lim:is_committed() then | ||
connections_committed[#connections_committed + 1] = lim | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is getting number of elements in the table in each iteration, better to use |
||
keys_committed[#keys_committed + 1] = keys[i] | ||
end | ||
end | ||
|
||
if next(connections_committed) ~= nil then | ||
local ctx = ngx.ctx | ||
ctx.limiters = connections_committed | ||
ctx.keys = keys_committed | ||
end | ||
|
||
if delay >= 0.001 then | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if |
||
ngx.log(ngx.WARN, 'need to delay by: ', delay, 's, states: ', table.concat(states, ", ")) | ||
ngx.sleep(delay) | ||
end | ||
|
||
::done:: | ||
end | ||
|
||
local function checkin(_, ctx, time, semaphore, redis_url, logging_only) | ||
local limiters = ctx.limiters | ||
local keys = ctx.keys | ||
local latency = tonumber(time) | ||
|
||
for i, lim in ipairs(limiters) do | ||
if redis_url then | ||
local rediserr | ||
lim.dict, rediserr = redis_shdict(redis_url) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is needed because the connection has been already closed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This initializes a Redis client instance on every request. It might be better to have a connection pool and fetch and return connections as needed, but maybe we can leave this for a future PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @davidor I'd like to leave this for a future PR because it's a little bit difficult. |
||
if not lim.dict then | ||
ngx.log(ngx.ERR, "failed to connect Redis: ", rediserr) | ||
error(logging_only, 500) | ||
goto done | ||
end | ||
end | ||
local conn, err = lim:leaving(keys[i], latency) | ||
if not conn then | ||
ngx.log(ngx.ERR, "failed to record the connection leaving request: ", err) | ||
error(logging_only, 500) | ||
goto done | ||
end | ||
end | ||
|
||
if semaphore then | ||
semaphore:post(1) | ||
end | ||
|
||
::done:: | ||
end | ||
|
||
function _M:log() | ||
local ctx = ngx.ctx | ||
local limiters = ctx.limiters | ||
if limiters and next(limiters) ~= nil then | ||
local semaphore = ngx_semaphore.new() | ||
ngx.timer.at(0, checkin, ngx.ctx, ngx.var.request_time, semaphore, self.redis_url, self.logging_only) | ||
semaphore:wait(10) | ||
end | ||
end | ||
|
||
return _M |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the
""
needed ?