diff --git a/CHANGELOG.md b/CHANGELOG.md
index ea46e279c..c0d2f33fb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,7 +10,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
 - OpenTracing support [PR #669](https://github.com/3scale/apicast/pull/669)
 - Generate new policy scaffold from the CLI [PR #682](https://github.com/3scale/apicast/pull/682)
-- 3scale batcher policy [PR #685](https://github.com/3scale/apicast/pull/685), [PR #710](https://github.com/3scale/apicast/pull/710), [PR #757](https://github.com/3scale/apicast/pull/757)
+- 3scale batcher policy [PR #685](https://github.com/3scale/apicast/pull/685), [PR #710](https://github.com/3scale/apicast/pull/710), [PR #757](https://github.com/3scale/apicast/pull/757), [PR #786](https://github.com/3scale/apicast/pull/786)
 - Liquid templating support in the headers policy configuration [PR #716](https://github.com/3scale/apicast/pull/716)
 - Ability to modify query parameters in the URL rewriting policy [PR #724](https://github.com/3scale/apicast/pull/724)
 - 3scale referrer policy [PR #728](https://github.com/3scale/apicast/pull/728), [PR #777](https://github.com/3scale/apicast/pull/777)
diff --git a/gateway/src/apicast/policy/3scale_batcher/3scale_batcher.lua b/gateway/src/apicast/policy/3scale_batcher/3scale_batcher.lua
index 348d47915..35ceeccbd 100644
--- a/gateway/src/apicast/policy/3scale_batcher/3scale_batcher.lua
+++ b/gateway/src/apicast/policy/3scale_batcher/3scale_batcher.lua
@@ -8,16 +8,27 @@ local reporter = require('reporter')
 local Transaction = require('transaction')
 local http_ng_resty = require('resty.http_ng.backend.resty')
 local semaphore = require('ngx.semaphore')
+local TimerTask = require('resty.concurrent.timer_task')
 
 local ipairs = ipairs
 
 local default_auths_ttl = 10
 local default_batch_reports_seconds = 10
 
-local _M = policy.new('Caching policy')
+local _M, mt = policy.new('Caching policy')
 
 local new = _M.new
 
+mt.__gc = function(self)
+  -- Instances of this policy are garbage-collected when the config is
+  -- reloaded. Before that happens, we need to ensure that the TimerTask
+  -- runs one last time so we do not leave any pending reports behind.
+
+  if self.timer_task then
+    self.timer_task:cancel(true)
+  end
+end
+
 function _M.new(config)
   local self = new(config)
 
@@ -30,9 +41,7 @@ function _M.new(config)
   self.batch_reports_seconds = config.batch_report_seconds or
                                default_batch_reports_seconds
 
-  self.report_timer_on = false
-
-  -- Semaphore used to ensure that only one timer is started per worker.
+  -- Semaphore used to ensure that only one TimerTask is started per worker.
   local semaphore_report_timer, err = semaphore.new(1)
   if not semaphore_report_timer then
     ngx.log(ngx.ERR, "Create semaphore failed: ", err)
@@ -70,7 +79,7 @@ local function set_flags_to_avoid_auths_in_apicast(context)
   context.skip_apicast_post_action = true
 end
 
-local function report(_, service_id, backend, reports_batcher)
+local function report(service_id, backend, reports_batcher)
   local reports = reports_batcher:get_all(service_id)
 
   if reports then
@@ -81,22 +90,33 @@ local function report(_, service_id, backend, reports_batcher)
   reporter.report(reports, service_id, backend, reports_batcher)
 end
 
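A note on the `__gc` hook above: stock LuaJIT only honors `__gc` metamethods on userdata, not on plain tables, so a finalizer on a policy table needs extra machinery. Below is a minimal sketch of the classic workaround, a userdata "canary" created with `newproxy` (a Lua 5.1/LuaJIT facility). Whether APIcast's policy module implements its `__gc` support exactly this way is not shown in this diff; the snippet is illustrative only.

```lua
-- Sketch: emulating __gc for a plain table on LuaJIT/Lua 5.1, where only
-- userdata has finalizers. Illustrative, not APIcast's actual internals.
local function attach_gc(t, finalizer)
  local canary = newproxy(true)  -- empty userdata with its own metatable
  getmetatable(canary).__gc = function() finalizer(t) end
  rawset(t, canary, true)        -- the canary lives exactly as long as t
  return t
end

local policy_like = attach_gc({ timer_task = nil }, function(self)
  -- Mirrors mt.__gc above: flush pending reports with one final run.
  if self.timer_task then self.timer_task:cancel(true) end
end)
```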
--- This starts a timer on each worker.
--- Starting a timer on each worker means that there will be more calls to
+local function timer_task(self, service_id, backend)
+  local task = report
+
+  local task_options = {
+    args = { service_id, backend, self.reports_batcher },
+    interval = self.batch_reports_seconds
+  }
+
+  return TimerTask.new(task, task_options)
+end
+
+-- This starts a TimerTask on each worker.
+-- Starting a TimerTask on each worker means that there will be more calls to
 -- 3scale backend, and the config param 'batch_report_seconds' becomes
 -- more confusing because the reporting frequency will be affected by the
 -- number of APIcast workers.
--- If we started a timer just on one of the workers, it could die, and then,
+-- If we started a TimerTask just on one of the workers, it could die, and then,
 -- there would not be any reporting.
-local function ensure_report_timer_on(self, service_id, backend)
-  local check_timer = self.semaphore_report_timer:wait(0)
+local function ensure_timer_task_created(self, service_id, backend)
+  local check_timer_task = self.semaphore_report_timer:wait(0)
+
+  if check_timer_task then
+    if not self.timer_task then
+      self.timer_task = timer_task(self, service_id, backend)
 
-  if check_timer then
-    if not self.report_timer_on then
-      ngx.timer.every(self.batch_reports_seconds, report,
-                      service_id, backend, self.reports_batcher)
+      self.timer_task:execute()
 
-      self.report_timer_on = true
       ngx.log(ngx.DEBUG, 'scheduled 3scale batcher report timer every ',
               self.batch_reports_seconds, ' seconds')
     end
@@ -166,7 +186,7 @@ function _M:access(context)
   local credentials = context.credentials
   local transaction = Transaction.new(service_id, credentials, usage)
 
-  ensure_report_timer_on(self, service_id, backend)
+  ensure_timer_task_created(self, service_id, backend)
 
   local cached_auth = self.auths_cache:get(transaction)
 
diff --git a/gateway/src/apicast/policy/3scale_batcher/reporter.lua b/gateway/src/apicast/policy/3scale_batcher/reporter.lua
index 272c24d37..6a5d0d416 100644
--- a/gateway/src/apicast/policy/3scale_batcher/reporter.lua
+++ b/gateway/src/apicast/policy/3scale_batcher/reporter.lua
@@ -1,5 +1,6 @@
 local ReportsBatch = require('apicast.policy.3scale_batcher.reports_batch')
 local Usage = require('apicast.usage')
+local Transaction = require('apicast.policy.3scale_batcher.transaction')
 
 local pairs = pairs
 
@@ -14,11 +15,13 @@ local function return_reports(service_id, batch, reports_batcher)
       usage:add(metric, value)
     end
 
-    reports_batcher:add(
+    local transaction = Transaction.new(
       service_id, { [credentials_type] = credential }, usage
     )
+
+    reports_batcher:add(transaction)
   end
 end
 
diff --git a/gateway/src/resty/concurrent/timer_task.lua b/gateway/src/resty/concurrent/timer_task.lua
index f1c126e7f..fd91a943b 100644
--- a/gateway/src/resty/concurrent/timer_task.lua
+++ b/gateway/src/resty/concurrent/timer_task.lua
@@ -9,6 +9,11 @@ local default_interval_seconds = 60
 
 _M.active_tasks = {}
 
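Putting the pieces above together, the policy's interaction with TimerTask boils down to the following usage (names and signatures taken from this diff; the toy task body and the service id are illustrative):

```lua
local TimerTask = require('resty.concurrent.timer_task')

-- Stand-in for report(service_id, backend, reports_batcher) above.
local function flush(service_id)
  ngx.log(ngx.INFO, 'flushing batched reports for service ', service_id)
end

local task = TimerTask.new(flush, { args = { '42' }, interval = 10 })

task:execute(true) -- run now, then reschedule itself every `interval` seconds
task:cancel(true)  -- stop rescheduling, but guarantee one final run (see __gc)
```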
+-- Whether a run should be the last one for a given ID.
+-- When a task is marked to run one last time, it will do so even if it is
+-- cancelled.
+_M.last_one = {}
+
 function _M.register_task(id)
   _M.active_tasks[id] = true
 end
@@ -58,13 +63,19 @@ end
 local run_periodic, schedule_next, timer_execute
 
 run_periodic = function(run_now, id, func, args, interval)
-  if not _M.task_is_active(id) then return end
+  if not _M.task_is_active(id) and not _M.last_one[id] then
+    return
+  end
 
   if run_now then
     func(unpack(args))
   end
 
-  schedule_next(interval, id, func, args, interval)
+  if not _M.last_one[id] then
+    schedule_next(id, func, args, interval)
+  else
+    _M.last_one[id] = nil
+  end
 end
 
 -- Note: ngx.timer.at always sends "premature" as the first param.
@@ -89,7 +100,17 @@ function _M:execute(run_now)
   run_periodic(run_now or false, self.id, self.task, self.args, self.interval)
 end
 
-function _M:cancel()
+--- Cancel a task
+-- @tparam[opt] boolean run_one_more True to ensure that the task will run one
+-- more time before it is cancelled. False to just cancel the task. (Defaults
+-- to false)
+function _M:cancel(run_one_more)
+  if run_one_more then
+    _M.last_one[self.id] = true
+  end
+
+  -- We can unregister the task in all cases because the flag to run one last
+  -- time takes precedence.
   _M.unregister_task(self.id)
 end
 
diff --git a/spec/policy/3scale_batcher/3scale_batcher_spec.lua b/spec/policy/3scale_batcher/3scale_batcher_spec.lua
index 31024cc74..95e258466 100644
--- a/spec/policy/3scale_batcher/3scale_batcher_spec.lua
+++ b/spec/policy/3scale_batcher/3scale_batcher_spec.lua
@@ -4,8 +4,13 @@ local Transaction = require('apicast.policy.3scale_batcher.transaction')
 local Usage = require('apicast.usage')
 local configuration = require('apicast.configuration')
 local lrucache = require('resty.lrucache')
+local TimerTask = require('resty.concurrent.timer_task')
 
 describe('3scale batcher policy', function()
+  before_each(function()
+    TimerTask.active_tasks = {}
+  end)
+
   describe('.new', function()
     it('allows to configure the batching period', function()
       local test_batching_period = 3
@@ -42,6 +47,10 @@ describe('3scale batcher policy', function()
       batcher_policy.auths_cache = AuthsCache.new(lrucache.new(10), 10)
       stub(batcher_policy.reports_batcher, 'add')
 
+      -- By default, stub the batcher so it returns no pending reports in
+      -- case a report job executes.
+      stub(batcher_policy.reports_batcher, 'get_all').returns({})
+
       stub(batcher_policy, 'backend_downtime_cache')
 
       context = {
diff --git a/spec/policy/3scale_batcher/reporter_spec.lua b/spec/policy/3scale_batcher/reporter_spec.lua
index 85e512a24..153c956d8 100644
--- a/spec/policy/3scale_batcher/reporter_spec.lua
+++ b/spec/policy/3scale_batcher/reporter_spec.lua
@@ -1,9 +1,33 @@
 local reporter = require('apicast.policy.3scale_batcher.reporter')
-local keys_helper = require('apicast.policy.3scale_batcher.keys_helper')
-local ipairs = ipairs
+local ReportsBatcher = require('apicast.policy.3scale_batcher.reports_batcher')
+local lrucache = require('resty.lrucache')
+local resty_lock = require 'resty.lock'
 local pairs = pairs
 local insert = table.insert
 
+-- ReportsBatcher uses a shdict. For the test we can use a lrucache instead
+-- but we need to define 2 missing methods (safe_add and get_keys).
+local function build_fake_shdict()
+  local fake_shdict = lrucache.new(100)
+
+  fake_shdict.safe_add = function(self, k, v)
+    local current = self:get(k) or 0
+    self:set(k, current + v)
+  end
+
+  fake_shdict.get_keys = function(self)
+    local res = {}
+
+    for k, _ in pairs(self.hasht) do
+      insert(res, k)
+    end
+
+    return res
+  end
+
+  return fake_shdict
+end
+
 describe('reporter', function()
   local test_service_id = 's1'
 
@@ -13,37 +37,14 @@ describe('reporter', function()
   before_each(function()
     test_backend_client = { report = function() return { ok = false } end }
     spy_report_backend_client = spy.on(test_backend_client, 'report')
+
+    -- Mock the lock so it can always be acquired and released without waiting.
+    stub(resty_lock, 'new').returns(
+      { lock = function() return 0 end, unlock = function() return 1 end }
+    )
   end)
 
-  -- Testing using the real ReportsBatcher is a bit difficult because it uses
-  -- shared dicts and locks. To simplify we define this table with the same
-  -- interface.
-  local reports_batcher = {
-    reports = {},
-
-    add = function(self, service_id, credentials, usage)
-      local deltas = usage.deltas
-      for _, metric in ipairs(usage.metrics) do
-        local key = keys_helper.key_for_batched_report(service_id, credentials, metric)
-        self.reports[key] = (self.reports[key] or 0) + deltas[metric]
-      end
-    end,
-
-    get_all = function(self, service_id)
-      local cached_reports = {}
-
-      for key, value in pairs(self.reports) do
-        local report = keys_helper.report_from_key_batched_report(key, value)
-
-        if value and value > 0 and report.service_id == service_id then
-          insert(cached_reports, report)
-          self.reports[key] = nil
-        end
-      end
-
-      return cached_reports
-    end
-  }
+  local reports_batcher = ReportsBatcher.new(build_fake_shdict())
 
   it('returns reports to the batcher when sending reports to backend fails', function()
     local test_reports = {
diff --git a/spec/resty/concurrent/timer_task_spec.lua b/spec/resty/concurrent/timer_task_spec.lua
index defdb750c..65f140ac3 100644
--- a/spec/resty/concurrent/timer_task_spec.lua
+++ b/spec/resty/concurrent/timer_task_spec.lua
@@ -6,6 +6,7 @@ describe('TimerTask', function()
 
   before_each(function()
     TimerTask.active_tasks = {}
+    TimerTask.last_one = {}
   end)
 
   after_each(function()
@@ -85,6 +86,14 @@
 
       assert.is_false(TimerTask.task_is_active(task.id))
     end)
+
+    it('marks the task to run for the last time when specified in the params', function()
+      local task = TimerTask.new(test_task)
+
+      task:cancel(true)
+
+      assert.is_true(TimerTask.last_one[task.id])
+    end)
   end)
 
   describe(':execute', function()
@@ -114,6 +123,12 @@
       timer_task:execute(true)
 
       assert.stub(ngx_timer_stub).was_called()
+
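To make the test double above concrete, here is roughly how it behaves (a sketch; the key name is arbitrary). Note that on a real ngx.shared dict, safe_add fails with 'exists' for a key that is already present rather than accumulating, but accumulate-or-create is all these specs need; get_keys here leans on resty.lrucache's internal hasht table, exactly as the double does.

```lua
local dict = build_fake_shdict()

dict:safe_add('report_key', 2)
dict:safe_add('report_key', 3) -- accumulates instead of returning 'exists'

assert(dict:get('report_key') == 5)
assert(#dict:get_keys() == 1 and dict:get_keys()[1] == 'report_key')
```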
+      -- Can't check all the arguments of ngx.timer.at because it calls a
+      -- private function, but at least we can check the interval (first arg)
+      -- and that the second argument is a function.
+      assert.equals(interval, ngx_timer_stub.calls[1].vals[1])
+      assert.is_function(ngx_timer_stub.calls[1].vals[2])
     end)
   end)
 
@@ -157,6 +172,27 @@
       assert.stub(ngx_timer_stub).was_called()
     end)
   end)
+
+  describe('when the task should run for the last time', function()
+    it('runs the task', function()
+      local timer_task = TimerTask.new(func, { args = args, interval = interval })
+      local func_spy = spy.on(timer_task, 'task')
+      timer_task:cancel(true)
+
+      timer_task:execute(true)
+
+      assert.spy(func_spy).was_called_with(unpack(args))
+    end)
+
+    it('does not schedule another task', function()
+      local timer_task = TimerTask.new(func, { args = args, interval = interval })
+      timer_task:cancel(true)
+
+      timer_task:execute(true)
+
+      assert.stub(ngx_timer_stub).was_not_called()
+    end)
+  end)
 end)
 
   it('cancels itself when it is garbage collected', function()
@@ -168,4 +204,14 @@
 
     assert.is_false(TimerTask.task_is_active(id))
   end)
+
+  it('does not ensure a last run when garbage collected', function()
+    local timer_task = TimerTask.new(test_task)
+    local id = timer_task.id
+
+    timer_task = nil
+    collectgarbage()
+
+    assert.is_falsy(TimerTask.last_one[id])
+  end)
 end)
diff --git a/t/apicast-policy-3scale-batcher.t b/t/apicast-policy-3scale-batcher.t
index db8b62234..32eb37708 100644
--- a/t/apicast-policy-3scale-batcher.t
+++ b/t/apicast-policy-3scale-batcher.t
@@ -132,15 +132,15 @@ init_by_lua_block {
 --- no_error_log
 [error]
 
-=== TEST 3: batched reports
-This test checks that reports are batched correctly. In order to do that, we
-make 100 requests using 5 different user keys (20 requests for each of them).
-We define 2 services and make each of them receive the same number of calls.
-This is to test that reports are correctly classified by service.
-At the end, we make another request that is redirected to a location we defined
-in this test that checks the counters of the batched reports.
-To make sure that the policy does not report the batched reports during the
-test, we define a high 'batch_report_seconds' in the policy config.
+=== TEST 3: reports hits correctly
+This test checks that reports are sent to backend correctly. Reports are sent
+periodically and also when instances of the policy are garbage collected. In
+order to capture those reports, we parse them in the backend endpoint that
+receives them (/transactions.xml) and aggregate them in a shared dictionary
+that we check at the end. At that point, we force a report to ensure that
+there are no pending reports, and then we call an endpoint defined
+specifically for this test (/check_reports) that checks that the values
+accumulated in that dictionary are correct.
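For reference, the form-encoded body that the /transactions.xml location below parses looks roughly like this once decoded by ngx.req.get_post_args() (shape inferred from the patterns in the parsing code; the concrete values are illustrative):

```lua
-- Decoded POST args for a batch of two transactions (illustrative values):
local post_args = {
  ['transactions[0][user_key]']    = '1',
  ['transactions[0][usage][hits]'] = '20',
  ['transactions[1][user_key]']    = '2',
  ['transactions[1][usage][hits]'] = '20',
}

-- The parsing loop below groups them by transaction index:
-- { ['0'] = { user_key = '1', metric = 'hits', value = '20' },
--   ['1'] = { user_key = '2', metric = 'hits', value = '20' } }
```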
--- http_config
include $TEST_NGINX_UPSTREAM_CONFIG;
lua_shared_dict cached_auths 1m;
lua_shared_dict batched_reports 1m;
lua_shared_dict batched_reports_locks 1m;
lua_package_path "$TEST_NGINX_LUA_PATH";
init_by_lua_block {
  require('apicast.configuration_loader').mock({
    services = {
      {
        id = 1,
        backend_version = 1,
        backend_authentication_type = 'service_token',
        backend_authentication_value = 'token-value',
        proxy = {
          backend = { endpoint = "http://127.0.0.1:$TEST_NGINX_SERVER_PORT" },
          api_backend = "http://127.0.0.1:$TEST_NGINX_SERVER_PORT/api-backend/",
          proxy_rules = {
            { pattern = '/', http_method = 'GET', metric_system_name = 'hits', delta = 2 }
          },
          policy_chain = {
            {
              name = 'apicast.policy.3scale_batcher',
-              configuration = { batch_report_seconds = 60 }
+              configuration = { batch_report_seconds = 1 }
            },
            { name = 'apicast.policy.apicast' }
          }
@@ -186,7 +186,7 @@ init_by_lua_block {
          policy_chain = {
            {
              name = 'apicast.policy.3scale_batcher',
-              configuration = { batch_report_seconds = 60 }
+              configuration = { batch_report_seconds = 1 }
            },
            { name = 'apicast.policy.apicast' }
          }
@@ -198,23 +198,6 @@
--- config
  include $TEST_NGINX_APICAST_CONFIG;
 
-    location /check_batched_reports {
-      content_by_lua_block {
-        local keys_helper = require('apicast.policy.3scale_batcher.keys_helper')
-        local luassert = require('luassert')
-
-        for service = 1,2 do
-          for user_key = 1,5 do
-            local key = keys_helper.key_for_batched_report(service, {user_key = user_key }, 'hits')
-            -- The mapping rule defines a delta of 2 for hits, and we made 10
-            -- requests for each {service, user_key}, so all the counters should
-            -- be 20.
-            luassert.equals(20, ngx.shared.batched_reports:get(key))
-          end
-        end
-      }
-    }
-
   location /transactions/authorize.xml {
     content_by_lua_block {
       ngx.exit(200)
@@ -225,76 +208,44 @@
     echo 'yay, api backend';
   }
 
---- request eval
-my $res = [];
-
-for(my $i = 0; $i < 20; $i = $i + 1 ) {
-  push $res, "GET /test?user_key=1";
-  push $res, "GET /test?user_key=2";
-  push $res, "GET /test?user_key=3";
-  push $res, "GET /test?user_key=4";
-  push $res, "GET /test?user_key=5";
-}
-
-push $res, "GET /check_batched_reports";
-
-$res
---- more_headers eval
-my $res = [];
+  location /transactions.xml {
+    content_by_lua_block {
+      ngx.req.read_body()
+      local post_args = ngx.req.get_post_args()
 
-for(my $i = 0; $i < 50; $i = $i + 1 ) {
-  push $res, "Host: one";
-}
+      local post_transactions = {}
+
+      -- Parse the reports.
+      -- The keys of the post arguments have this format:
+      -- 1) "transactions[0][user_key]"
+      -- 2) "transactions[0][usage][hits]"
+
+      for k, v in pairs(post_args) do
+        local index = string.match(k, "transactions%[(%d+)%]%[user_key%]")
+        if index then
+          post_transactions[index] = post_transactions[index] or {}
+          post_transactions[index].user_key = v
+        else
+          local index, metric = string.match(k, "transactions%[(%d+)%]%[usage%]%[(%w+)%]")
+          post_transactions[index] = post_transactions[index] or {}
+          post_transactions[index].metric = metric
+          post_transactions[index].value = v
+        end
+      end
 
-for(my $i = 0; $i < 50; $i = $i + 1 ) {
-  push $res, "Host:two";
-}
+      local service_id = ngx.req.get_uri_args()['service_id']
 
-push $res, "Host: one";
+      -- Accumulate the reports in ngx.shared.result. Note that this is a
+      -- plain Lua table stored on ngx.shared (no lua_shared_dict is declared
+      -- for it), which is enough for this test.
 
-$res
---- no_error_log
-[error]
-3scale batcher report timer got
+      ngx.shared.result = ngx.shared.result or {}
+      ngx.shared.result[service_id] = ngx.shared.result[service_id] or {}
 
-=== TEST 4: report batched reports to backend
-This test checks that reports are sent correctly to backend. To do that, it performs
-some requests, then it forces a report request to backend, and finally, checks that
-the POST body that backend receives is correct. 
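A quick note on the Lua patterns used in the /transactions.xml location above: `%[` and `%]` escape the literal brackets, and captures always come back as strings, which is why the aggregation table ends up keyed by '0', '1', and so on. A standalone check:

```lua
local k = 'transactions[0][usage][hits]'
local index, metric = k:match("transactions%[(%d+)%]%[usage%]%[(%w+)%]")
assert(index == '0' and metric == 'hits') -- captures are strings, not numbers
```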
---- http_config
-include $TEST_NGINX_UPSTREAM_CONFIG;
-lua_shared_dict cached_auths 1m;
-lua_shared_dict batched_reports 1m;
-lua_shared_dict batched_reports_locks 1m;
-lua_package_path "$TEST_NGINX_LUA_PATH";
-init_by_lua_block {
-  require('apicast.configuration_loader').mock({
-    services = {
-      {
-        id = 1,
-        backend_version = 1,
-        backend_authentication_type = 'service_token',
-        backend_authentication_value = 'token-value',
-        proxy = {
-          backend = { endpoint = "http://127.0.0.1:$TEST_NGINX_SERVER_PORT" },
-          api_backend = "http://127.0.0.1:$TEST_NGINX_SERVER_PORT/api-backend/",
-          proxy_rules = {
-            { pattern = '/', http_method = 'GET', metric_system_name = 'hits', delta = 2 }
-          },
-          policy_chain = {
-            {
-              name = 'apicast.policy.3scale_batcher',
-              configuration = { batch_report_seconds = 60 }
-            },
-            { name = 'apicast.policy.apicast' }
-          }
-        }
-      }
+      for _, t in pairs(post_transactions) do
+        ngx.shared.result[service_id][t.user_key] = ngx.shared.result[service_id][t.user_key] or {}
+        ngx.shared.result[service_id][t.user_key][t.metric] = (ngx.shared.result[service_id][t.user_key][t.metric] or 0) + t.value
+      end
     }
-  })
-}
---- config
-  include $TEST_NGINX_APICAST_CONFIG;
+  }
 
   location /force_report_to_backend {
     content_by_lua_block {
       local ReportsBatcher = require ('apicast.policy.3scale_batcher.reports_batcher')
       local reporter = require('apicast.policy.3scale_batcher.reporter')
       local http_ng_resty = require('resty.http_ng.backend.resty')
       local backend_client = require('apicast.backend_client')
 
-      local service_id = '1'
-
-      local reports_batcher = ReportsBatcher.new(
-        ngx.shared.batched_reports, 'batched_reports_locks')
+      for service = 1,2 do
+        local service_id = tostring(service)
 
-      local reports = reports_batcher:get_all(service_id)
+        local reports_batcher = ReportsBatcher.new(
+          ngx.shared.batched_reports, 'batched_reports_locks')
 
-      local backend = backend_client:new(
-        {
-          id = '1',
-          backend_authentication_type = 'service_token',
-          backend_authentication_value = 'token-value',
-          backend = { endpoint = "http://127.0.0.1:$TEST_NGINX_SERVER_PORT" }
-        }, http_ng_resty)
+        local reports = reports_batcher:get_all(service_id)
 
-      reporter.report(reports, service_id, backend, reports_batcher)
-    }
-  }
+        local backend = backend_client:new(
+          {
+            id = service_id,
+            backend_authentication_type = 'service_token',
+            backend_authentication_value = 'token-value',
+            backend = { endpoint = "http://127.0.0.1:$TEST_NGINX_SERVER_PORT" }
+          }, http_ng_resty)
 
-  location /transactions/authorize.xml {
-    content_by_lua_block {
-      ngx.exit(200)
+        reporter.report(reports, service_id, backend, reports_batcher)
+      end
     }
   }
 
-  location /transactions.xml {
+  location /check_reports {
     content_by_lua_block {
-      ngx.req.read_body()
-      local post_args = ngx.req.get_post_args()
-
-      -- Transactions can be received in any order, so we need to check both
-      -- possibilities.
-      -- We did 20 requests for each user key, and each request increases
-      -- hits by 2 according to the mapping rules defined. 
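The /force_report_to_backend location above drains the batcher for both services; conceptually it is the same flush that the TimerTask performs periodically. A condensed sketch of the drain for a single service (module names and calls as they appear in this diff; `backend` stands for a backend_client instance built as above):

```lua
local ReportsBatcher = require('apicast.policy.3scale_batcher.reports_batcher')
local reporter = require('apicast.policy.3scale_batcher.reporter')

local function drain(service_id, backend)
  local reports_batcher = ReportsBatcher.new(
    ngx.shared.batched_reports, 'batched_reports_locks')

  -- get_all empties the batch for the service and returns its reports;
  -- reporter.report sends them and puts them back on failure.
  local reports = reports_batcher:get_all(service_id)
  reporter.report(reports, service_id, backend, reports_batcher)
end
```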
- local order1 = - (post_args["transactions[0][user_key]"] == '1' and - post_args["transactions[0][usage][hits]"] == "40") and - (post_args["transactions[1][user_key]"] == '2' and - post_args["transactions[1][usage][hits]"] == "40") - - local order2 = - (post_args["transactions[1][user_key]"] == '1' and - post_args["transactions[1][usage][hits]"] == "40") and - (post_args["transactions[0][user_key]"] == '2' and - post_args["transactions[0][usage][hits]"] == "40") - local luassert = require('luassert') - luassert.equals('1', ngx.req.get_uri_args()['service_id']) - luassert.is_true(order1 or order2) - } - } - location /api-backend { - echo 'yay, api backend'; + for service = 1,2 do + for user_key = 1,5 do + -- The mapping rule defines a delta of 2 for hits, and we made 10 + -- requests for each {service, user_key}, so all the counters should + -- be 20. + local hits = ngx.shared.result[tostring(service)][tostring(user_key)].hits + luassert.equals(20, hits) + end + end + } } --- request eval @@ -365,15 +297,34 @@ my $res = []; for(my $i = 0; $i < 20; $i = $i + 1 ) { push $res, "GET /test?user_key=1"; push $res, "GET /test?user_key=2"; + push $res, "GET /test?user_key=3"; + push $res, "GET /test?user_key=4"; + push $res, "GET /test?user_key=5"; } push $res, "GET /force_report_to_backend"; +push $res, "GET /check_reports"; + +$res +--- more_headers eval +my $res = []; + +for(my $i = 0; $i < 50; $i = $i + 1 ) { + push $res, "Host: one"; +} + +for(my $i = 0; $i < 50; $i = $i + 1 ) { + push $res, "Host:two"; +} + +push $res, "Host: one"; +push $res, "Host: one"; $res --- no_error_log [error] -=== TEST 5: with caching policy (resilient mode) +=== TEST 4: with caching policy (resilient mode) The purpose of this test is to test that the 3scale batcher policy works correctly when combined with the caching one. In this case, the caching policy is configured as "resilient". We define a