diff --git a/CHANGELOG.md b/CHANGELOG.md
index 39bf55aaae02..abfa8b8258ec 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -57,6 +57,8 @@
   [#10896](https://github.com/Kong/kong/pull/10896)
 - Fix a bug when worker consuming dynamic log level setting event and using a wrong reference for notice logging
   [#10897](https://github.com/Kong/kong/pull/10897)
+- **worker-events**: Fix a bug where event payloads larger than 65K could not be delivered.
+  [#11117](https://github.com/Kong/kong/pull/11117)
 
 #### Admin API
 
diff --git a/kong/global.lua b/kong/global.lua
index 90fa35e711ec..bc0ea2ee3fe6 100644
--- a/kong/global.lua
+++ b/kong/global.lua
@@ -6,6 +6,10 @@ local kong_cache = require "kong.cache"
 local kong_cluster_events = require "kong.cluster_events"
 local private_node = require "kong.pdk.private.node"
 
+local cjson = require "cjson"
+local string_buffer = require "string.buffer"
+local uuid = require("kong.tools.utils").uuid
+
 local ngx = ngx
 local type = type
 local error = error
@@ -201,6 +205,145 @@ function _GLOBAL.init_worker_events()
     return nil, err
   end
 
+  -- Chunk size used when splitting oversized payloads, and the error string
+  -- returned by the native `post` that triggers the chunked fallback.
+  local PAYLOAD_MAX_LEN, PAYLOAD_TOO_BIG_ERR = 65000, "failed to publish event: payload too big"
+
+  -- There is a limit for the payload size that events lib allows to send,
+  -- we overwrite `post` and `register` method to support sending large payloads
+  local native_post = worker_events.post
+
+  -- Publish `data` as a series of packets of at most PAYLOAD_MAX_LEN bytes.
+  -- Tables are JSON-encoded first. Every packet carries the same `uuid`; the
+  -- last one also carries `length` (total size) and `serialized` (1 when the
+  -- payload was JSON-encoded) so the receiver can reassemble and decode it.
+  -- Returns true, or the native `post` failure after emitting a reset packet.
+  local function large_payload_post(source, event, data, unique)
+    local serialized = 0
+    if type(data) == "table" then
+      data = cjson.encode(data)
+      serialized = 1
+    end
+
+    local len_data = #data
+    local len_sent = len_data
+    local pt = 1
+    local eid = uuid()
+    while len_sent > 0 do
+      local partial
+      local packet = { uuid = eid, partial = 1 }
+      if len_sent <= PAYLOAD_MAX_LEN then
+        -- final chunk: attach what the receiver needs to finish reassembly
+        partial = data:sub(pt, pt + len_sent - 1)
+        packet.length = len_data
+        packet.serialized = serialized
+      else
+        partial = data:sub(pt, pt + PAYLOAD_MAX_LEN - 1)
+      end
+
+      packet.data = partial
+
+      ngx.log(ngx.DEBUG, "gonna send partial")
+      local ok, err = native_post(source, event, packet, unique)
+      if not ok then
+        ngx.log(ngx.ERR, "error to send via native_post: ", err)
+        -- explicitly reset the event buffer; `partial` marks it as a chunk packet
+        native_post(source, event, { uuid = eid, partial = 1, reset = 1 }, unique)
+        return ok, err
+      end
+
+      pt = pt + PAYLOAD_MAX_LEN
+      len_sent = len_sent - PAYLOAD_MAX_LEN
+    end
+
+    return true
+  end
+
+  -- Try the native `post` first and fall back to chunked delivery only when
+  -- it rejects the payload for being too big.
+  worker_events.post = function(source, event, data, unique)
+    local ok, err = native_post(source, event, data, unique)
+    if err == PAYLOAD_TOO_BIG_ERR then
+      return large_payload_post(source, event, data, unique)
+    end
+
+    return ok, err
+  end
+
+  local TTL = 10 * 60 -- 10 minutes
+  local native_register = worker_events.register
+
+  -- Wrap `callback` so that chunked payloads are reassembled before it runs;
+  -- events that are not chunk packets are passed through untouched.
+  worker_events.register = function(callback, source, event, ...)
+    -- uuid -> { buffer = <string.buffer>, ts = <ngx.now() of the last chunk> }
+    local buffer_dict = {}
+
+    -- Drop partially received payloads that saw no chunk for TTL seconds.
+    -- NOTE(review): every `register` call starts its own recurring timer.
+    local function recycle_buffer()
+      ngx.log(ngx.DEBUG, "recycle buffer")
+      if ngx.worker.exiting() then
+        return
+      end
+
+      for eid, buffer in pairs(buffer_dict) do
+        if ngx.now() - buffer.ts > TTL then
+          buffer.buffer:reset()
+          buffer_dict[eid] = nil
+          ngx.log(ngx.DEBUG, eid, " in event buffer is recycled")
+        end
+      end
+
+      ngx.timer.at(TTL, recycle_buffer)
+    end
+
+    ngx.timer.at(TTL, recycle_buffer)
+
+    local function cb(data, ...)
+      -- non-table payloads cannot be chunk packets (and may not be indexable)
+      if type(data) ~= "table" or data.partial == nil then
+        return callback(data, ...)
+      end
+
+      local buffer = buffer_dict[data.uuid]
+      if data.reset then
+        if buffer ~= nil then
+          buffer.buffer:reset()
+          buffer_dict[data.uuid] = nil
+        end
+
+        ngx.log(ngx.DEBUG, "buffer reset")
+        return
+      end
+
+      if buffer == nil then
+        -- first chunk for this uuid: keep a reference to the new entry so
+        -- its timestamp can be set below
+        buffer = { buffer = string_buffer.new(PAYLOAD_MAX_LEN * 2), ts = nil }
+        buffer_dict[data.uuid] = buffer
+      end
+
+      local buf = buffer.buffer
+      buf:put(data.data)
+      buffer.ts = ngx.now()
+      if data.length ~= nil then
+        assert(#buf == data.length, "failed to decode event payload: length mismatch")
+        local d = buf:get(data.length)
+        if data.serialized == 1 then
+          d = cjson.decode(d)
+        end
+
+        buf:reset()
+        buffer_dict[data.uuid] = nil
+        assert(d, "failed to decode event payload")
+        return callback(d, ...)
+      end
+    end
+
+    return native_register(cb, source, event, ...)
+  end
+
   return worker_events
 end
 
diff --git a/spec/02-integration/07-sdk/06-worker_events_spec.lua b/spec/02-integration/07-sdk/06-worker_events_spec.lua
new file mode 100644
index 000000000000..e9e0df7a2473
--- /dev/null
+++ b/spec/02-integration/07-sdk/06-worker_events_spec.lua
@@ -0,0 +1,140 @@
+local helpers = require "spec.helpers"
+
+-- Each location posts an event whose payload exceeds the events library size
+-- limit and answers "ok" once the registered handler received it unchanged.
+describe("worker_events", function()
+  local strategy = "off"
+  local test_cases = {"string", "table", }
+  local business_port
+
+  lazy_setup(function()
+    business_port = helpers.get_available_port()
+    local fixtures = {
+      http_mock = {
+        worker_events = [[
+          server {
+            server_name example.com;
+            listen %s;
+
+            location = /test_too_big_string {
+              content_by_lua_block {
+                local SOURCE, EVENT = "foo", "string"
+                local worker_events = kong.worker_events
+                local cjson = require "cjson.safe"
+                local payload_received
+
+                local function generate_data()
+                  return string.rep("X", 70000)
+                end
+
+                local function wait_until(validator, timeout)
+                  local deadline = ngx.now() + (timeout or 5)
+                  local res
+                  repeat
+                    worker_events.poll()
+                    res = validator()
+                  until res or ngx.now() >= deadline
+                  return res
+                end
+
+                -- subscribe
+                worker_events.register(function(data)
+                  payload_received = data
+                end, SOURCE, EVENT)
+
+                -- when payload is a string
+                local PAYLOAD = generate_data()
+                local ok, err = worker_events.post(SOURCE, EVENT, PAYLOAD)
+                if not ok then
+                  ngx.status = ngx.HTTP_INTERNAL_SERVER_ERROR
+                  ngx.say("post string failed, err: " .. cjson.encode(err))
+                  ngx.exit(ngx.OK)
+                end
+
+                assert(wait_until(function()
+                  return PAYLOAD == payload_received
+                end, 1))
+
+                ngx.status = ngx.HTTP_OK
+                ngx.say("ok")
+                ngx.exit(200)
+              }
+            }
+
+            location = /test_too_big_table {
+              content_by_lua_block {
+                local SOURCE, EVENT = "foo", "table"
+                local worker_events = kong.worker_events
+                local cjson = require "cjson.safe"
+                local deepcompare = require("pl.tablex").deepcompare
+                local payload_received
+
+                local function generate_data()
+                  return string.rep("X", 70000)
+                end
+
+                local function wait_until(validator, timeout)
+                  local deadline = ngx.now() + (timeout or 5)
+                  local res
+                  repeat
+                    worker_events.poll()
+                    res = validator()
+                  until res or ngx.now() >= deadline
+                  return res
+                end
+
+                -- subscribe
+                worker_events.register(function(data)
+                  payload_received = data
+                end, SOURCE, EVENT)
+
+                -- when payload is a table
+                local PAYLOAD = {
+                  foo = 'bar',
+                  data = generate_data()
+                }
+                local ok, err = worker_events.post(SOURCE, EVENT, PAYLOAD)
+                if not ok then
+                  ngx.status = ngx.HTTP_INTERNAL_SERVER_ERROR
+                  ngx.say("post table failed, err: " .. cjson.encode(err))
+                  ngx.exit(ngx.OK)
+                end
+
+                assert(wait_until(function()
+                  return deepcompare(PAYLOAD, payload_received)
+                end, 1))
+
+                ngx.status = ngx.HTTP_OK
+                ngx.say("ok")
+                ngx.exit(200)
+              }
+            }
+          }
+        ]],
+      }
+    }
+
+    fixtures.http_mock.worker_events = string.format(fixtures.http_mock.worker_events, business_port)
+
+    assert(helpers.start_kong({
+      database = strategy,
+      nginx_conf = "spec/fixtures/custom_nginx.template",
+    }, nil, nil, fixtures))
+  end)
+
+  lazy_teardown(function ()
+    assert(helpers.stop_kong())
+  end)
+
+  for _, payload_type in ipairs(test_cases) do
+    it("too big `" .. payload_type .. "` payload", function()
+      local res = helpers.proxy_client(nil, business_port):get("/test_too_big_" .. payload_type, {
+        headers = {
+          host = "example.com",
+        }
+      })
+      local body = assert.res_status(200, res)
+      assert.equal("ok", body)
+    end)
+  end
+end)