-
Notifications
You must be signed in to change notification settings - Fork 170
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #648 from Hitachi/rate_limiting_to_service_policy
Add rate limiting to the entire service policy.
- Loading branch information
Showing
8 changed files
with
1,412 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
local policy_chain = require('apicast.policy_chain').default() | ||
|
||
local rate_limit_policy = require('apicast.policy.rate_limit').new({ | ||
limiters = { | ||
{ | ||
name = "connections", | ||
key = "limit1", | ||
conn = 20, | ||
burst = 10, | ||
delay = 0.5 | ||
}, | ||
{ | ||
name = "leaky_bucket", | ||
key = "limit2", | ||
rate = 18, | ||
burst = 9 | ||
}, | ||
{ | ||
name = "fixed_window", | ||
key = "limit3", | ||
count = 10, | ||
window = 10 | ||
}}, | ||
redis_url = "redis://localhost:6379/1" | ||
}) | ||
|
||
policy_chain:insert(rate_limit_policy, 1) | ||
|
||
return { | ||
policy_chain = policy_chain | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
{ | ||
"$schema": "http://apicast.io/policy-v1/schema#manifest#", | ||
"name": "rate limit policy", | ||
"summary": "Adds rate limit.", | ||
"description": ["This policy adds rate limit."], | ||
"version": "builtin", | ||
"configuration": { | ||
"type": "object", | ||
"properties": { | ||
"limiters": { | ||
"description": "List of limiters to be applied", | ||
"type": "array", | ||
"items": { | ||
"oneOf": [{ | ||
"type": "object", | ||
"properties": { | ||
"name": { | ||
"type": "string", | ||
"enum": ["connections"], | ||
"description": "limiting request concurrency (or concurrent connections)" | ||
}, | ||
"key": { | ||
"description": "Key of limiter", | ||
"type": "string" | ||
}, | ||
"conn": { | ||
"type": "integer", | ||
"description": "the maximum number of concurrent requests allowed" | ||
}, | ||
"burst": { | ||
"type": "integer", | ||
"description": "the number of excessive concurrent requests (or connections) allowed to be delayed" | ||
}, | ||
"delay": { | ||
"type": "number", | ||
"description": "the default processing latency of a typical connection (or request)" | ||
} | ||
} | ||
}, { | ||
"type": "object", | ||
"properties": { | ||
"name": { | ||
"type": "string", | ||
"enum": ["leaky_bucket"], | ||
"description": "limiting request rate" | ||
}, | ||
"key": { | ||
"description": "Key of limiter", | ||
"type": "string" | ||
}, | ||
"rate": { | ||
"type": "integer", | ||
"description": "the specified request rate (number per second) threshold" | ||
}, | ||
"burst": { | ||
"type": "integer", | ||
"description": "the number of excessive requests per second allowed to be delayed" | ||
} | ||
} | ||
}, { | ||
"type": "object", | ||
"properties": { | ||
"name": { | ||
"type": "string", | ||
"enum": ["fixed_window"], | ||
"description": "limiting request counts" | ||
}, | ||
"key": { | ||
"description": "Key of limiter", | ||
"type": "string" | ||
}, | ||
"count": { | ||
"type": "integer", | ||
"description": "the specified number of requests threshold" | ||
}, | ||
"window": { | ||
"type": "integer", | ||
"description": "the time window in seconds before the request count is reset" | ||
} | ||
} | ||
}] | ||
} | ||
}, | ||
"redis_url": { | ||
"description": "URL of Redis", | ||
"type": "string" | ||
}, | ||
"status_code_rejected": { | ||
"type": "integer", | ||
"description": "the status code when requests over the limit, default 429" | ||
}, | ||
"logging_only": { | ||
"type": "boolean", | ||
"description": "if true, the request goes through when there is some issue with rate limiting, default false" | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
return require('rate_limit') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,205 @@ | ||
local policy = require('apicast.policy') | ||
local _M = policy.new('Rate Limit Policy') | ||
|
||
local resty_limit_conn = require('resty.limit.conn') | ||
local resty_limit_req = require('resty.limit.req') | ||
local resty_limit_count = require('resty.limit.count') | ||
|
||
local ngx_semaphore = require "ngx.semaphore" | ||
local limit_traffic = require "resty.limit.traffic" | ||
local ts = require ('apicast.threescale_utils') | ||
local tonumber = tonumber | ||
local next = next | ||
local shdict_key = 'limiter' | ||
|
||
local new = _M.new | ||
|
||
local traffic_limiters = { | ||
connections = function(config) | ||
return resty_limit_conn.new(shdict_key, config.conn, config.burst, config.delay) | ||
end, | ||
leaky_bucket = function(config) | ||
return resty_limit_req.new(shdict_key, config.rate, config.burst) | ||
end, | ||
fixed_window = function(config) | ||
return resty_limit_count.new(shdict_key, config.count, config.window) | ||
end | ||
} | ||
|
||
local function try(f, catch_f) | ||
local status, exception = pcall(f) | ||
if not status then | ||
catch_f(exception) | ||
end | ||
end | ||
|
||
local function init_limiter(config) | ||
local lim, limerr | ||
try( | ||
function() | ||
lim, limerr = traffic_limiters[config.name](config) | ||
end, | ||
function(e) | ||
return nil, e | ||
end | ||
) | ||
|
||
return lim, limerr | ||
end | ||
|
||
local function redis_shdict(url) | ||
local options = { url = url } | ||
local redis, err = ts.connect_redis(options) | ||
if not redis then | ||
return nil, err | ||
end | ||
|
||
return { | ||
incr = function(_, key, value, init) | ||
if not init then | ||
return redis:incrby(key, value), nil | ||
end | ||
redis:setnx(key, init) | ||
return redis:incrby(key, value), nil | ||
end, | ||
set = function(_, key, value) | ||
return redis:set(key, value) | ||
end, | ||
expire = function(_, key, exptime) | ||
local ret = redis:expire(key, exptime) | ||
if ret == 0 then | ||
return nil, "not found" | ||
end | ||
return true, nil | ||
end, | ||
get = function(_, key) | ||
local val = redis:get(key) | ||
if type(val) == "userdata" then | ||
return nil | ||
end | ||
return val | ||
end | ||
} | ||
end | ||
|
||
local function error(logging_only, status_code) | ||
if not logging_only then | ||
return ngx.exit(status_code) | ||
end | ||
end | ||
|
||
function _M.new(config) | ||
local self = new() | ||
self.config = config or {} | ||
self.limiters = config.limiters | ||
self.redis_url = config.redis_url | ||
self.status_code_rejected = config.status_code_rejected or 429 | ||
self.logging_only = config.logging_only or false | ||
|
||
return self | ||
end | ||
|
||
function _M:access() | ||
local limiters = {} | ||
local keys = {} | ||
|
||
for _, limiter in ipairs(self.limiters) do | ||
local lim, initerr = init_limiter(limiter) | ||
if not lim then | ||
ngx.log(ngx.ERR, "unknown limiter: ", limiter.name, ", err: ", initerr) | ||
error(self.logging_only, 500) | ||
return | ||
end | ||
|
||
if self.redis_url then | ||
local rediserr | ||
lim.dict, rediserr = redis_shdict(self.redis_url) | ||
if not lim.dict then | ||
ngx.log(ngx.ERR, "failed to connect Redis: ", rediserr) | ||
error(self.logging_only, 500) | ||
return | ||
end | ||
end | ||
|
||
limiters[#limiters + 1] = lim | ||
keys[#keys + 1] = limiter.key | ||
|
||
end | ||
|
||
|
||
local states = {} | ||
local connections_committed = {} | ||
local keys_committed = {} | ||
|
||
local delay, comerr = limit_traffic.combine(limiters, keys, states) | ||
if not delay then | ||
if comerr == "rejected" then | ||
ngx.log(ngx.WARN, "Requests over the limit.") | ||
error(self.logging_only, self.status_code_rejected) | ||
return | ||
end | ||
ngx.log(ngx.ERR, "failed to limit traffic: ", comerr) | ||
error(self.logging_only, 500) | ||
return | ||
end | ||
|
||
for i, lim in ipairs(limiters) do | ||
if lim.is_committed and lim:is_committed() then | ||
connections_committed[#connections_committed + 1] = lim | ||
keys_committed[#keys_committed + 1] = keys[i] | ||
end | ||
end | ||
|
||
if next(connections_committed) ~= nil then | ||
local ctx = ngx.ctx | ||
ctx.limiters = connections_committed | ||
ctx.keys = keys_committed | ||
end | ||
|
||
if delay >= 0.001 then | ||
ngx.log(ngx.WARN, 'need to delay by: ', delay, 's, states: ', table.concat(states, ", ")) | ||
ngx.sleep(delay) | ||
end | ||
|
||
end | ||
|
||
local function checkin(_, ctx, time, semaphore, redis_url, logging_only) | ||
local limiters = ctx.limiters | ||
local keys = ctx.keys | ||
local latency = tonumber(time) | ||
|
||
for i, lim in ipairs(limiters) do | ||
if redis_url then | ||
local rediserr | ||
lim.dict, rediserr = redis_shdict(redis_url) | ||
if not lim.dict then | ||
ngx.log(ngx.ERR, "failed to connect Redis: ", rediserr) | ||
error(logging_only, 500) | ||
return | ||
end | ||
end | ||
local conn, err = lim:leaving(keys[i], latency) | ||
if not conn then | ||
ngx.log(ngx.ERR, "failed to record the connection leaving request: ", err) | ||
error(logging_only, 500) | ||
return | ||
end | ||
end | ||
|
||
if semaphore then | ||
semaphore:post(1) | ||
end | ||
|
||
end | ||
|
||
function _M:log() | ||
local ctx = ngx.ctx | ||
local limiters = ctx.limiters | ||
if limiters and next(limiters) ~= nil then | ||
local semaphore = ngx_semaphore.new() | ||
ngx.timer.at(0, checkin, ngx.ctx, ngx.var.request_time, semaphore, self.redis_url, self.logging_only) | ||
semaphore:wait(10) | ||
end | ||
end | ||
|
||
return _M |
Oops, something went wrong.