fix: let etcd v3 read_watch handle the case where a chunk contains partial event or multiple events #154

Merged
merged 14 commits into from
Mar 3, 2022
6 changes: 0 additions & 6 deletions .github/workflows/ci.yml
@@ -15,12 +15,6 @@ jobs:
include:
- version: 2.2.5
conf: Procfile-single
- version: 3.1.0
conf: Procfile-single
- version: 3.2.0
conf: Procfile-single
- version: 3.3.0
conf: Procfile-single-enable-v2
- version: 3.4.0
conf: Procfile-single-enable-v2
- version: 3.5.0
1 change: 1 addition & 0 deletions README.md
@@ -11,6 +11,7 @@ Table of Contents
* [Install](#install)
* [API v2](api_v2.md)
* [API v3](api_v3.md)
* **NOTE**: Requires ETCD version >= v3.4.0

## Install

73 changes: 53 additions & 20 deletions lib/resty/etcd/v3.lua
@@ -1,4 +1,5 @@
-- https://github.com/ledgetech/lua-resty-http
local split = require("ngx.re").split
local typeof = require("typeof")
local cjson = require("cjson.safe")
local setmetatable = setmetatable
@@ -16,6 +17,7 @@ local pairs = pairs
local re_match = ngx.re.match
local type = type
local tab_insert = table.insert
local tab_concat = table.concat
local str_lower = string.lower
local tab_clone = require("table.clone")
local decode_json = cjson.decode
@@ -688,43 +690,74 @@ local function request_chunk(self, method, path, opts, timeout)


local function read_watch()
body = nil

while(1) do
body, err = res.body_reader()
local chunk, error = res.body_reader()
if error then
return nil, error
end
if not chunk then
break
end

if not body then
Review comment (Contributor):
Looks like this branch is never executed, since body has the default value ""?

Review comment (Contributor Author, @nic-6443, Mar 2, 2022):
Yeah, my fault; body should be initialized with nil.

return nil, err
body = chunk
else
body = tab_concat({body, chunk})
Review comment:
I think we can call tab_concat outside the while loop.

Review comment (Contributor Author):
Not good; the loop may execute more than twice.

Review comment (Contributor, @membphis, Mar 1, 2022):
This way is worse than body = body .. chunk.

Review comment:
"Not good, loop may execute more than twice."

I mean calling tab_concat just once, to concatenate all the chunks.

Review comment (Contributor Author, @nic-6443, Mar 1, 2022):
Got it, I misunderstood the advice from spacewander.

Review comment (Contributor Author):
I thought about it carefully. Since this concat branch is only executed in rare cases (when one chunk is larger than nginx proxy_buffer_size), I think it is more appropriate to use a string directly, which avoids creating a table with only one element in most cases.
@membphis @spacewander @starsz Do you agree with this conclusion?

Review comment (Contributor):
I think just using .. is enough.

Review comment (Contributor):
We can use .. but add a comment to describe why?

Review comment (Contributor Author):
ok
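
The two options weighed in this thread can be sketched side by side in plain Lua. This is only an illustration under stated assumptions: `join_with_dots` and `join_with_table` are hypothetical names, not functions from the patch. Both give the same result; the first avoids allocating a table when only one chunk arrives, while the second performs a single concatenation however many chunks there are.

```lua
-- Hypothetical helpers, for illustration only.

-- Option discussed as "just use ..": no table is created, and when a single
-- chunk arrives (the common case per the thread) no concatenation happens.
local function join_with_dots(chunks)
    local body
    for _, chunk in ipairs(chunks) do
        if not body then
            body = chunk
        else
            body = body .. chunk
        end
    end
    return body
end

-- Option discussed as "tab_concat once": collect the parts in the loop and
-- concatenate a single time after it.
local function join_with_table(chunks)
    local parts = {}
    for _, chunk in ipairs(chunks) do
        parts[#parts + 1] = chunk
    end
    return table.concat(parts)
end

local chunks = { '{"result":', '{"created":', 'true}}\n' }
print(join_with_dots(chunks) == join_with_table(chunks))
```

Lua strings are immutable, so each `..` builds a new string; that cost only matters when many chunks are joined, which the thread argues is rare here.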

end
if not utils.is_empty_str(body) then

if not utils.is_empty_str(chunk) and str_byte(chunk, -1) == str_byte("\n") then
Review comment (Contributor):
So if the response chunk doesn't contain \n, we continue reading the next chunk? But if the connection is aborted or times out at that point, is the last chunk lost?

Review comment (Contributor Author):
In this case, the caller can rewatch based on the last revision it received, so it can start consuming from the correct revision again without missing any events.
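
The rewatch idea in the reply above can be sketched in plain Lua as follows. Everything here is hypothetical scaffolding: `watch_with_resume` and `fake_open_watch` are invented names, and `open_watch(start_revision)` stands in for a client call that opens a watch from a given revision (it is an assumption that the real client exposes such a start-revision option). The only point is that tracking the last `mod_revision` seen lets a new watch continue without gaps after an aborted read.

```lua
-- Hypothetical sketch: reopen the watch from (last seen revision + 1).
local function watch_with_resume(open_watch, start_revision, max_attempts, on_event)
    local next_revision = start_revision
    for _ = 1, max_attempts do
        local reader = open_watch(next_revision)
        while true do
            local res = reader()
            if not res then
                break -- timeout or aborted connection: reopen from next_revision
            end
            for _, event in ipairs(res.events) do
                on_event(event)
                -- revisions arrive as strings in the JSON payload
                next_revision = tonumber(event.kv.mod_revision) + 1
            end
        end
    end
    return next_revision
end

-- Fake event log and fake watch: each connection delivers at most two
-- events at or after start_revision, then fails.
local log = {
    { kv = { key = "/a", mod_revision = "5" } },
    { kv = { key = "/b", mod_revision = "6" } },
    { kv = { key = "/c", mod_revision = "7" } },
}

local function fake_open_watch(start_revision)
    local idx, sent = 0, 0
    return function()
        while true do
            idx = idx + 1
            local event = log[idx]
            if not event then
                return nil, "timeout"
            end
            if tonumber(event.kv.mod_revision) >= start_revision then
                if sent == 2 then
                    return nil, "aborted"
                end
                sent = sent + 1
                return { events = { event } }
            end
        end
    end
end

local seen = {}
local next_revision = watch_with_resume(fake_open_watch, 5, 2, function(event)
    seen[#seen + 1] = event.kv.key
end)
print(table.concat(seen, ","), next_revision)
```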

break
end

end

body, err = decode_json(body)
if not body then
return nil, "failed to decode json body: " .. (err or " unkwon")
elseif body.error and body.error.http_code >= 500 then
-- health_check retry should do nothing here
-- and let connection closed to create a new one
health_check.report_failure(endpoint.http_host)
return nil, endpoint.http_host .. ": " .. body.error.http_status
return nil, nil
end

if body.result and body.result.events then
for _, event in ipairs(body.result.events) do
if event.kv.value then -- DELETE not have value
event.kv.value = decode_base64(event.kv.value or "")
event.kv.value = self.serializer.deserialize(event.kv.value)
local chunks, error = split(body, [[\n]], "jo")
Review comment (Contributor):
Would err be better? error is a built-in function in Lua.

Review comment (Contributor):
nice catch

Review comment (Contributor Author):
OK, but since err is already defined outside of read_watch, our lint would complain about a duplicate variable name if I used err, so I replaced error with split_err.
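
Why the name matters can be shown with a small plain-Lua sketch. The function names `shadowed` and `renamed` are hypothetical and exist only for this illustration: a local named `error` hides the built-in `error` function for the rest of its scope, while a distinct name such as `split_err` leaves the built-in usable.

```lua
-- Hypothetical illustration of shadowing the built-in error().

local function shadowed()
    local error = "split failed" -- hides the built-in function in this scope
    -- The call below now tries to call a string, which itself raises.
    return pcall(function() error("boom", 0) end)
end

local function renamed()
    local split_err = "split failed"
    -- The built-in error() is still reachable and raises split_err.
    return pcall(function() error(split_err, 0) end)
end

local ok1, msg1 = shadowed()
local ok2, msg2 = renamed()
print(ok1, msg1)
print(ok2, msg2)
```

In the first case the failure message describes an attempt to call a string rather than the intended message, which is the kind of confusion the rename avoids.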

if error then
return nil, "failed to split chunks: " .. error
end

local all_events = {}
for _, chunk in ipairs(chunks) do
body, err = decode_json(chunk)
if not body then
return nil, "failed to decode json body: " .. (err or " unkwon")
nic-6443 marked this conversation as resolved.
elseif body.error and body.error.http_code >= 500 then
-- health_check retry should do nothing here
-- and let connection closed to create a new one
health_check.report_failure(endpoint.http_host)
return nil, endpoint.http_host .. ": " .. body.error.http_status
end

if body.result and body.result.events then
for _, event in ipairs(body.result.events) do
if event.kv.value then -- DELETE not have value
event.kv.value = decode_base64(event.kv.value or "")
event.kv.value = self.serializer.deserialize(event.kv.value)
end
event.kv.key = decode_base64(event.kv.key)
if event.prev_kv then
event.prev_kv.value = decode_base64(event.prev_kv.value or "")
event.prev_kv.value = self.serializer.deserialize(event.prev_kv.value)
event.prev_kv.key = decode_base64(event.prev_kv.key)
end
tab_insert(all_events, event)
end
event.kv.key = decode_base64(event.kv.key)
if event.prev_kv then
event.prev_kv.value = decode_base64(event.prev_kv.value or "")
event.prev_kv.value = self.serializer.deserialize(event.prev_kv.value)
event.prev_kv.key = decode_base64(event.prev_kv.key)
else
if #chunks == 1 then
return body
end
end
end

if #all_events > 1 then
body.result.events = all_events
end
return body
end
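
The buffering approach in this hunk can be sketched in plain Lua, outside OpenResty. This is a minimal illustration under stated assumptions, not the library's code: `read_lines` and `make_reader` are hypothetical names, the stub reader stands in for `res.body_reader`, and JSON decoding is left out so the block runs with a stock Lua interpreter. It shows the two cases from the PR title: an event split across chunks, and several newline-separated events in one chunk.

```lua
-- Hypothetical helper: buffer chunks until one ends with "\n", then split
-- the buffer into complete lines (one JSON message per line).
local function read_lines(reader)
    local buf = ""
    while true do
        local chunk, err = reader()
        if err then
            return nil, err
        end
        if not chunk then
            break -- stream ended
        end
        buf = buf .. chunk
        -- 10 is the byte value of "\n"
        if chunk ~= "" and chunk:byte(-1) == 10 then
            break
        end
    end

    local lines = {}
    for line in buf:gmatch("[^\n]+") do
        lines[#lines + 1] = line
    end
    return lines
end

-- Stub reader: returns the given chunks one by one, then nil.
local function make_reader(chunks)
    local i = 0
    return function()
        i = i + 1
        return chunks[i]
    end
end

-- Case 1: one event split across two chunks.
local partial = read_lines(make_reader({ '{"result":{"events"', ':[]}}\n' }))
-- Case 2: two events delivered in a single chunk.
local multi = read_lines(make_reader({ '{"result":1}\n{"result":2}\n' }))
print(#partial, #multi)
```

Each returned line would then be passed to the JSON decoder separately, as the loop over `chunks` does in the patch.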

148 changes: 148 additions & 0 deletions t/v3/key.t
@@ -512,3 +512,151 @@ passed
request chunk headers: {"foo":"bar"}
--- no_error_log
[error]



=== TEST 13: watch response which http chunk contains partial etcd event response
--- http_config eval: $::HttpConfig
--- config
location /version {
content_by_lua_block {
ngx.say('{"etcdserver":"3.4.0","etcdcluster":"3.4.0"}')
}
}

location /v3/watch {
content_by_lua_block {
-- payload get from tcpdump while running TEST 3 and split the event response into two chunks

ngx.say('{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437","revision":"271","raft_term":"7"},"created":true}}')
ngx.flush()
ngx.sleep(0.1)

-- partial event without trailing new line
ngx.print('{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437",')
ngx.flush()
ngx.print('"revision":"272","raft_term":"7"},"events"')
ngx.flush()

-- key = /test, value = bcd3
ngx.say(':[{"kv":{"key":"L3Rlc3Q=","create_revision":"156","mod_revision":"272","version":"44","value":"ImJjZDMi"}}]}}')
ngx.flush()

-- ensure client timeout
ngx.sleep(1)
}
}

location /t {
content_by_lua_block {
local etcd, err = require("resty.etcd").new({
protocol = "v3",
http_host = {
"http://127.0.0.1:" .. ngx.var.server_port,
},
})
check_res(etcd, err)

local cur_time = ngx.now()
local body_chunk_fun, err = etcd:watch("/test", {timeout = 0.5})
if not body_chunk_fun then
ngx.say("failed to watch: ", err)
end

local idx = 0
while true do
local chunk, err = body_chunk_fun()

if not chunk then
if err then
ngx.say(err)
end
break
end

idx = idx + 1
ngx.say(idx, ": ", require("cjson").encode(chunk.result))
end
}
}
--- request
GET /t
--- no_error_log
[error]
--- response_body_like eval
qr/1:.*"created":true.*
2:.*"value":"bcd3".*
timeout/
--- timeout: 5



=== TEST 14: watch response which one http chunk contains multiple events chunk
--- http_config eval: $::HttpConfig
--- config
location /version {
content_by_lua_block {
ngx.say('{"etcdserver":"3.4.0","etcdcluster":"3.4.0"}')
}
}

location /v3/watch {
content_by_lua_block {
-- payload get from tcpdump while running TEST 5 and merge two event response into one http chunk

ngx.say('{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437","revision":"290","raft_term":"8"},"created":true}}')
ngx.flush()
ngx.sleep(0.1)

-- one http chunk contains multiple event response, note the new line at the end of first event response
-- key1 = /wdir/, value1 = bcd4
-- key2 = /wdir/a, value2 = bcd4a
ngx.say('{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437","revision":"292","raft_term":"8"},"events":[{"kv":{"key":"L3dkaXIv","create_revision":"31","mod_revision":"292","version":"22","value":"ImJjZDQi"}}]}}\n{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437","revision":"293","raft_term":"8"},"events":[{"kv":{"key":"L3dkaXIvYQ==","create_revision":"293","mod_revision":"293","version":"1","value":"ImJjZDRhIg=="}}]}}')
ngx.flush()
tokers marked this conversation as resolved.

-- ensure client timeout
ngx.sleep(1)
}
}

location /t {
content_by_lua_block {
local etcd, err = require("resty.etcd").new({
protocol = "v3",
http_host = {
"http://127.0.0.1:" .. ngx.var.server_port,
},
})
check_res(etcd, err)

local cur_time = ngx.now()
local body_chunk_fun, err = etcd:watch("/", {timeout = 0.5})
if not body_chunk_fun then
ngx.say("failed to watch: ", err)
end

local idx = 0
while true do
local chunk, err = body_chunk_fun()

if not chunk then
if err then
ngx.say(err)
end
break
end

idx = idx + 1
ngx.say(idx, ": ", require("cjson").encode(chunk.result))
end
}
}
--- request
GET /t
--- no_error_log
[error]
--- response_body_like eval
qr/1:.*"created":true.*
2:.*"value":"bcd4".*"value":"bcd4a".*
timeout/
--- timeout: 5