Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(worker_event): send large payload #11117

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
[#10896](https://github.com/Kong/kong/pull/10896)
- Fix a bug when worker consuming dynamic log level setting event and using a wrong reference for notice logging
[#10897](https://github.com/Kong/kong/pull/10897)
- **worker-events**: Fix the bug that event payload which are over 65K can't be delivered.
[#11117](https://github.com/Kong/kong/pull/11117)

#### Admin API

Expand Down
125 changes: 125 additions & 0 deletions kong/global.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ local kong_cache = require "kong.cache"
local kong_cluster_events = require "kong.cluster_events"
local private_node = require "kong.pdk.private.node"

local cjson = require "cjson"
local string_buffer = require "string.buffer"
local uuid = require("kong.tools.utils").uuid

local ngx = ngx
local type = type
local error = error
Expand Down Expand Up @@ -201,6 +205,127 @@ function _GLOBAL.init_worker_events()
return nil, err
end

local PAYLOAD_MAX_LEN, PAYLOAD_TOO_BIG_ERR = 65000, "failed to publish event: payload too big"

-- There is a limit for the payload size that events lib allows to send,
-- we overwrite `post` and `register` method to support sending large payloads
local native_post = worker_events.post
local function large_payload_post(source, event, data, unique)
local serialized = 0
if type(data) == "table" then
data = cjson.encode(data)
serialized = 1
end

local len_data = #data
local len_sent = len_data
local pt = 1
local eid = uuid()
while len_sent > 0 do
local partial
local packet = {}
if len_sent <= PAYLOAD_MAX_LEN then
partial = data:sub(pt, pt + len_sent - 1)
packet["uuid"] = eid
packet["length"] = len_data
packet["serialized"] = serialized
else
partial = data:sub(pt, pt + PAYLOAD_MAX_LEN - 1)
end

packet["uuid"] = eid
packet["data"] = partial
packet["partial"] = 1
ngx.log(ngx.ERR, "gonna send partial")
local ok, err = native_post(source, event, packet, unique)
if not ok then
ngx.log(ngx.ERR, "error to send via native_post: ", err)
-- explicitly reset the event buffer
native_post(source, event, { uuid = eid, reset = 1 }, unique)
return ok, err
end

pt = pt + PAYLOAD_MAX_LEN
len_sent = len_sent - PAYLOAD_MAX_LEN
end

return true
end

worker_events.post = function(source, event, data, unique)
local ok, err = native_post(source, event, data, unique)
if err == PAYLOAD_TOO_BIG_ERR then
return large_payload_post(source, event, data, unique)
end

return ok, err
end

local TTL = 10 * 60 -- 10 minutes
local native_register = worker_events.register
worker_events.register = function(callback, source, event, ...)
local buffer_dict = {}

local function recycle_buffer()
ngx.log(ngx.DEBUG, "recycle buffer")
if ngx.worker.exiting() then
return
end

for eid, buffer in pairs(buffer_dict) do
if ngx.now() - buffer.ts > TTL then
buffer.buffer:reset()
buffer_dict[eid] = nil
ngx.log(ngx.DEBUG, eid, " in event buffer is recycled")
end
end

ngx.timer.at(TTL, recycle_buffer)
end

ngx.timer.at(TTL, recycle_buffer)

local function cb(data, ...)
if data.partial == nil then
return callback(data, ...)
end

local buffer = buffer_dict[data.uuid]
local buf = buffer ~= nil and buffer.buffer
if data.reset then
if buffer ~= nil then
buffer.buffer:reset()
buffer_dict[data.uuid] = nil
end

ngx.log(ngx.DEBUG, "buffer reset")
return
end

if buffer == nil then
buf = string_buffer.new(PAYLOAD_MAX_LEN * 2)
buffer_dict[data.uuid] = { buffer = buf, ts = nil }
end

buf:put(data.data)
buffer.ts = ngx.now()
if data.length ~= nil then
assert(#buf == data.length, "failed to decode event payload: length mismatch")
local d = buf:get(data.length)
if data.serialized == 1 then
d = cjson.decode(d)
end

buf:reset()
buffer_dict[data.uuid] = nil
assert(d, "failed to decode event payload")
return callback(d, ...)
end
end

return native_register(cb, source, event, ...)
end

return worker_events
end

Expand Down
141 changes: 141 additions & 0 deletions spec/02-integration/07-sdk/06-worker_events_spec.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
local helpers = require "spec.helpers"

describe("worker_events", function()
local strategy = "off"
local test_cases = {"string", "table", }
local business_port

lazy_setup(function()
business_port = helpers.get_available_port()
local fixtures = {
http_mock = {
worker_events = [[
server {
server_name example.com;
listen %s;

location = /test_too_big_string {
content_by_lua_block {
local PAYLOAD_TOO_BIG_ERR = "failed to publish event: payload too big"
local SOURCE, EVENT = "foo", "string"
local worker_events = kong.worker_events
local cjson = require "cjson.safe"
local payload_received

local function generate_data()
return string.rep("X", 70000)
end

local function wait_until(validator, timeout)
local deadline = ngx.now() + (timeout or 5)
local res
repeat
worker_events.poll()
res = validator()
until res or ngx.now() >= deadline
return res
end

-- subscribe
local ok, err = worker_events.register(function(data)
payload_received = data
end, SOURCE, EVENT)

-- when payload is a string
local PAYLOAD = generate_data()
local ok, err = worker_events.post(SOURCE, EVENT, PAYLOAD)
if not ok then
ngx.status = ngx.HTTP_INTERNAL_SERVER_ERROR
ngx.say("post string failed, err: " .. cjson.encode(err))
ngx.exit(ngx.OK)
end

assert(wait_until(function()
return PAYLOAD == payload_received
end, 1))

ngx.status = ngx.HTTP_OK
ngx.say("ok")
ngx.exit(200)
}
}

location = /test_too_big_table {
content_by_lua_block {
local PAYLOAD_TOO_BIG_ERR = "failed to publish event: payload too big"
local DEFAULT_TRUNCATED_PAYLOAD = ", truncated payload: not a serialized object"
local SOURCE, EVENT = "foo", "table"
local worker_events = kong.worker_events
local cjson = require "cjson.safe"
local deepcompare = require("pl.tablex").deepcompare
local payload_received

local function generate_data()
return string.rep("X", 70000)
end

local function wait_until(validator, timeout)
local deadline = ngx.now() + (timeout or 5)
local res
repeat
worker_events.poll()
res = validator()
until res or ngx.now() >= deadline
return res
end

-- subscribe
local ok, err = worker_events.register(function(data)
payload_received = data
end, SOURCE, EVENT)

-- when payload is a table
PAYLOAD = {
foo = 'bar',
data = generate_data()
}
local ok, err = worker_events.post(SOURCE, EVENT, PAYLOAD)
if not ok then
ngx.status = ngx.HTTP_INTERNAL_SERVER_ERROR
ngx.say("post table failed, err: " .. cjson.encode(err))
ngx.exit(ngx.OK)
end

assert(wait_until(function()
return deepcompare(PAYLOAD, payload_received)
end, 1))

ngx.status = ngx.HTTP_OK
ngx.say("ok")
ngx.exit(200)
}
}
}
]],
}
}

fixtures.http_mock.worker_events = string.format(fixtures.http_mock.worker_events, business_port)

assert(helpers.start_kong({
database = strategy,
nginx_conf = "spec/fixtures/custom_nginx.template",
}, nil, nil, fixtures))
end)

lazy_teardown(function ()
assert(helpers.stop_kong())
end)

for _, payload_type in ipairs(test_cases) do
it("too big `" .. payload_type .. "` payload", function()
local res = helpers.proxy_client(nil, business_port):get("/test_too_big_" .. payload_type, {
headers = {
host = "example.com",
}
})
local body = assert.res_status(200, res)
assert.equal(body, "ok")
end)
end
end)
Loading