Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: let etcd v3 read_watch handle the case where a chunk contains partial event or multiple events #154

Merged
merged 14 commits into from
Mar 3, 2022
64 changes: 43 additions & 21 deletions lib/resty/etcd/v3.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
-- https://github.com/ledgetech/lua-resty-http
local split = require("ngx.re").split
local typeof = require("typeof")
local cjson = require("cjson.safe")
local setmetatable = setmetatable
Expand Down Expand Up @@ -688,43 +689,64 @@ local function request_chunk(self, method, path, opts, timeout)


local function read_watch()
body = ""

while(1) do
body, err = res.body_reader()
if not body then
local chunk, err = res.body_reader()
if not chunk then
return nil, err
end
if not utils.is_empty_str(body) then

body = body .. chunk

if not utils.is_empty_str(chunk) and sub_str(chunk, -1) == "\n" then
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use string.byte would be better

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

break
end

end

body, err = decode_json(body)
if not body then
return nil, "failed to decode json body: " .. (err or " unkwon")
elseif body.error and body.error.http_code >= 500 then
-- health_check retry should do nothing here
-- and let connection closed to create a new one
health_check.report_failure(endpoint.http_host)
return nil, endpoint.http_host .. ": " .. body.error.http_status

local chunks, err = split(body, [[\n]], "jo")
if err then
return nil, "failed to split chunks: " .. err
end

if body.result and body.result.events then
for _, event in ipairs(body.result.events) do
if event.kv.value then -- DELETE not have value
event.kv.value = decode_base64(event.kv.value or "")
event.kv.value = self.serializer.deserialize(event.kv.value)
local all_events = {}
for _, chunk in ipairs(chunks) do
body, err = decode_json(chunk)
if not body then
return nil, "failed to decode json body: " .. (err or " unkwon")
nic-6443 marked this conversation as resolved.
Show resolved Hide resolved
elseif body.error and body.error.http_code >= 500 then
-- health_check retry should do nothing here
-- and let connection closed to create a new one
health_check.report_failure(endpoint.http_host)
return nil, endpoint.http_host .. ": " .. body.error.http_status
end

if body.result and body.result.events then
for _, event in ipairs(body.result.events) do
if event.kv.value then -- DELETE not have value
event.kv.value = decode_base64(event.kv.value or "")
event.kv.value = self.serializer.deserialize(event.kv.value)
end
event.kv.key = decode_base64(event.kv.key)
if event.prev_kv then
event.prev_kv.value = decode_base64(event.prev_kv.value or "")
event.prev_kv.value = self.serializer.deserialize(event.prev_kv.value)
event.prev_kv.key = decode_base64(event.prev_kv.key)
end
tab_insert(all_events, event)
end
event.kv.key = decode_base64(event.kv.key)
if event.prev_kv then
event.prev_kv.value = decode_base64(event.prev_kv.value or "")
event.prev_kv.value = self.serializer.deserialize(event.prev_kv.value)
event.prev_kv.key = decode_base64(event.prev_kv.key)
else
if #chunks == 1 then
return body
end
end
end

if #all_events > 1 then
body.result.events = all_events
end
return body
end

Expand Down