From eb217668fe8ddba0132b7c16de33c6c33724957f Mon Sep 17 00:00:00 2001 From: qianyong Date: Fri, 18 Feb 2022 16:50:22 +0800 Subject: [PATCH 01/13] fix: let etcd v3 `read_watch` handle the case where a chunk contains partial event or multiple events --- lib/resty/etcd/v3.lua | 64 +++++++++++++++++++++++++++++-------------- 1 file changed, 43 insertions(+), 21 deletions(-) diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index d9fa91b4..617323ac 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -1,4 +1,5 @@ -- https://github.com/ledgetech/lua-resty-http +local split = require("ngx.re").split local typeof = require("typeof") local cjson = require("cjson.safe") local setmetatable = setmetatable @@ -688,43 +689,64 @@ local function request_chunk(self, method, path, opts, timeout) local function read_watch() + body = "" while(1) do - body, err = res.body_reader() - if not body then + local chunk, err = res.body_reader() + if not chunk then return nil, err end - if not utils.is_empty_str(body) then + + body = body .. chunk + + if not utils.is_empty_str(chunk) and sub_str(chunk, -1) == "\n" then break end end - body, err = decode_json(body) - if not body then - return nil, "failed to decode json body: " .. (err or " unkwon") - elseif body.error and body.error.http_code >= 500 then - -- health_check retry should do nothing here - -- and let connection closed to create a new one - health_check.report_failure(endpoint.http_host) - return nil, endpoint.http_host .. ": " .. body.error.http_status + + local chunks, err = split(body, [[\n]], "jo") + if err then + return nil, "failed to split chunks: " .. err end - if body.result and body.result.events then - for _, event in ipairs(body.result.events) do - if event.kv.value then -- DELETE not have value - event.kv.value = decode_base64(event.kv.value or "") - event.kv.value = self.serializer.deserialize(event.kv.value) + local all_events = {} + for _, chunk in ipairs(chunks) do + body, err = decode_json(chunk) + if not body then + return nil, "failed to decode json body: " .. (err or " unkwon") + elseif body.error and body.error.http_code >= 500 then + -- health_check retry should do nothing here + -- and let connection closed to create a new one + health_check.report_failure(endpoint.http_host) + return nil, endpoint.http_host .. ": " .. body.error.http_status + end + + if body.result and body.result.events then + for _, event in ipairs(body.result.events) do + if event.kv.value then -- DELETE not have value + event.kv.value = decode_base64(event.kv.value or "") + event.kv.value = self.serializer.deserialize(event.kv.value) + end + event.kv.key = decode_base64(event.kv.key) + if event.prev_kv then + event.prev_kv.value = decode_base64(event.prev_kv.value or "") + event.prev_kv.value = self.serializer.deserialize(event.prev_kv.value) + event.prev_kv.key = decode_base64(event.prev_kv.key) + end + tab_insert(all_events, event) end - event.kv.key = decode_base64(event.kv.key) - if event.prev_kv then - event.prev_kv.value = decode_base64(event.prev_kv.value or "") - event.prev_kv.value = self.serializer.deserialize(event.prev_kv.value) - event.prev_kv.key = decode_base64(event.prev_kv.key) + else + if #chunks == 1 then + return body end end end + if #all_events > 1 then + body.result.events = all_events + end return body end From de344ced2cfb69a130beaf3162556a64fd7ed0c7 Mon Sep 17 00:00:00 2001 From: Yong Qian Date: Sat, 26 Feb 2022 23:56:33 +0800 Subject: [PATCH 02/13] add test case --- lib/resty/etcd/v3.lua | 25 +++++-- t/v3/key.t | 149 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 166 insertions(+), 8 deletions(-) diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index 617323ac..24a6e1a1 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -689,26 +689,37 @@ local function request_chunk(self, method, path, opts, timeout) local function read_watch() - body = "" + body = nil while(1) do - local chunk, err = res.body_reader() + local chunk, error = res.body_reader() + if error then + return nil, error + end if not chunk then - return nil, err + break end - body = body .. chunk + if not body then + body = chunk + else + body = body .. chunk + end if not utils.is_empty_str(chunk) and sub_str(chunk, -1) == "\n" then break end end +--- ONLY + if not body then + return nil, nil + end - local chunks, err = split(body, [[\n]], "jo") - if err then - return nil, "failed to split chunks: " .. err + local chunks, error = split(body, [[\n]], "jo") + if error then + return nil, "failed to split chunks: " .. error end local all_events = {} diff --git a/t/v3/key.t b/t/v3/key.t index 733d1a74..ec0267f0 100644 --- a/t/v3/key.t +++ b/t/v3/key.t @@ -284,7 +284,6 @@ timeout/ --- timeout: 5 - === TEST 6: watchdir(key=="") --- http_config eval: $::HttpConfig --- config @@ -512,3 +511,151 @@ passed request chunk headers: {"foo":"bar"} --- no_error_log [error] + + + +=== TEST 13: watch response which http chunk contains partial etcd event response +--- http_config eval: $::HttpConfig +--- config + location /version { + content_by_lua_block { + ngx.say('{"etcdserver":"3.4.0","etcdcluster":"3.4.0"}') + } + } + + location /v3/watch { + content_by_lua_block { + -- payload get from tcpdump while running TEST 3 and split the event response into two chunks + + ngx.say('{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437","revision":"271","raft_term":"7"},"created":true}}') + ngx.flush() + ngx.sleep(0.1) + + -- partial event without trailing new line + ngx.print('{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437",') + ngx.flush() + ngx.print('"revision":"272","raft_term":"7"},"events"') + ngx.flush() + + -- key = /test, value = bcd3 + ngx.say(':[{"kv":{"key":"L3Rlc3Q=","create_revision":"156","mod_revision":"272","version":"44","value":"ImJjZDMi"}}]}}') + ngx.flush() + + -- ensure client timeout + ngx.sleep(1) + } + } + + location /t { + content_by_lua_block { + local etcd, err = require("resty.etcd").new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:" .. ngx.var.server_port, + }, + }) + check_res(etcd, err) + + local cur_time = ngx.now() + local body_chunk_fun, err = etcd:watch("/test", {timeout = 0.5}) + if not body_chunk_fun then + ngx.say("failed to watch: ", err) + end + + local idx = 0 + while true do + local chunk, err = body_chunk_fun() + + if not chunk then + if err then + ngx.say(err) + end + break + end + + idx = idx + 1 + ngx.say(idx, ": ", require("cjson").encode(chunk.result)) + end + } + } +--- request +GET /t +--- no_error_log +[error] +--- response_body_like eval +qr/1:.*"created":true.* +2:.*"value":"bcd3".* +timeout/ +--- timeout: 5 + + + +=== TEST 14: watch response which one http chunk contains multiple events chunk +--- http_config eval: $::HttpConfig +--- config + location /version { + content_by_lua_block { + ngx.say('{"etcdserver":"3.4.0","etcdcluster":"3.4.0"}') + } + } + + location /v3/watch { + content_by_lua_block { + -- payload get from tcpdump while running TEST 5 and merge two event response into one http chunk + + ngx.say('{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437","revision":"290","raft_term":"8"},"created":true}}') + ngx.flush() + ngx.sleep(0.1) + + -- one http chunk contains multiple event response, note the new line at the end of first event response + -- key1 = /wdir/, value1 = bcd4 + -- key2 = /wdir/a, value2 = bcd4a + ngx.say('{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437","revision":"292","raft_term":"8"},"events":[{"kv":{"key":"L3dkaXIv","create_revision":"31","mod_revision":"292","version":"22","value":"ImJjZDQi"}}]}}\n{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437","revision":"293","raft_term":"8"},"events":[{"kv":{"key":"L3dkaXIvYQ==","create_revision":"293","mod_revision":"293","version":"1","value":"ImJjZDRhIg=="}}]}}') + ngx.flush() + + -- ensure client timeout + ngx.sleep(1) + } + } + + location /t { + content_by_lua_block { + local etcd, err = require("resty.etcd").new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:" .. ngx.var.server_port, + }, + }) + check_res(etcd, err) + + local cur_time = ngx.now() + local body_chunk_fun, err = etcd:watch("/", {timeout = 0.5}) + if not body_chunk_fun then + ngx.say("failed to watch: ", err) + end + + local idx = 0 + while true do + local chunk, err = body_chunk_fun() + + if not chunk then + if err then + ngx.say(err) + end + break + end + + idx = idx + 1 + ngx.say(idx, ": ", require("cjson").encode(chunk.result)) + end + } + } +--- request +GET /t +--- no_error_log +[error] +--- response_body_like eval +qr/1:.*"created":true.* +2:.*"value":"bcd4".*"value":"bcd4a".* +timeout/ +--- timeout: 5 From 64ca04bbc66b7e28072f211302bc878a58f08146 Mon Sep 17 00:00:00 2001 From: Yong Qian Date: Sun, 27 Feb 2022 00:10:07 +0800 Subject: [PATCH 03/13] fix code format --- lib/resty/etcd/v3.lua | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index 24a6e1a1..5fc0c790 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -711,11 +711,10 @@ local function request_chunk(self, method, path, opts, timeout) end end ---- ONLY - if not body then - return nil, nil - end + if not body then + return nil, nil + end local chunks, error = split(body, [[\n]], "jo") if error then From 77e8ef77cf8763a7a5ced8b7c4c2785b7a206b2f Mon Sep 17 00:00:00 2001 From: Yong Qian Date: Sun, 27 Feb 2022 17:51:35 +0800 Subject: [PATCH 04/13] undo useless change --- t/v3/key.t | 1 + 1 file changed, 1 insertion(+) diff --git a/t/v3/key.t b/t/v3/key.t index ec0267f0..ec829311 100644 --- a/t/v3/key.t +++ b/t/v3/key.t @@ -284,6 +284,7 @@ timeout/ --- timeout: 5 + === TEST 6: watchdir(key=="") --- http_config eval: $::HttpConfig --- config From 7b456db2adf988f5eeed92fde26db09b6357510b Mon Sep 17 00:00:00 2001 From: Yong Qian Date: Sun, 27 Feb 2022 19:44:56 +0800 Subject: [PATCH 05/13] drop support for etcd vesion < v3.4.0 --- .github/workflows/ci.yml | 6 ------ README.md | 1 + 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index be3e91d8..7b5c4c84 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,12 +15,6 @@ jobs: include: - version: 2.2.5 conf: Procfile-single - - version: 3.1.0 - conf: Procfile-single - - version: 3.2.0 - conf: Procfile-single - - version: 3.3.0 - conf: Procfile-single-enable-v2 - version: 3.4.0 conf: Procfile-single-enable-v2 - version: 3.5.0 diff --git a/README.md b/README.md index 1943fabd..24614f42 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ Table of Contents * [Install](#install) * [API v2](api_v2.md) * [API v3](api_v3.md) + * **NOTE**: Requires ETCD version >= v3.4.0 ## Install From 0faf78ce2199f055b8c25089e79f1f165214b1e1 Mon Sep 17 00:00:00 2001 From: Yong Qian Date: Sun, 27 Feb 2022 21:48:21 +0800 Subject: [PATCH 06/13] resolve comments --- lib/resty/etcd/v3.lua | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index 5fc0c790..479aad56 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -17,6 +17,7 @@ local pairs = pairs local re_match = ngx.re.match local type = type local tab_insert = table.insert +local tab_concat = table.concat local str_lower = string.lower local tab_clone = require("table.clone") local decode_json = cjson.decode @@ -703,10 +704,10 @@ local function request_chunk(self, method, path, opts, timeout) if not body then body = chunk else - body = body .. chunk + body = tab_concat({body, chunk}) end - if not utils.is_empty_str(chunk) and sub_str(chunk, -1) == "\n" then + if not utils.is_empty_str(chunk) and str_byte(chunk, -1) == str_byte("\n") then break end From a85dab809565d6ae0de0ae3d7a0695d8ed2734c6 Mon Sep 17 00:00:00 2001 From: Nic Date: Mon, 28 Feb 2022 12:19:09 +0800 Subject: [PATCH 07/13] Update lib/resty/etcd/v3.lua Co-authored-by: Alex Zhang --- lib/resty/etcd/v3.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index 479aad56..f26fcdf9 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -726,7 +726,7 @@ local function request_chunk(self, method, path, opts, timeout) for _, chunk in ipairs(chunks) do body, err = decode_json(chunk) if not body then - return nil, "failed to decode json body: " .. (err or " unkwon") + return nil, "failed to decode json body: " .. (err or " unknown") elseif body.error and body.error.http_code >= 500 then -- health_check retry should do nothing here -- and let connection closed to create a new one From d60dea168a3ea72d4d2e1af860048b7b34289b00 Mon Sep 17 00:00:00 2001 From: Yong Qian Date: Tue, 1 Mar 2022 20:41:18 +0800 Subject: [PATCH 08/13] revert useless changes --- lib/resty/etcd/v3.lua | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index 479aad56..3a3ff8c6 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -690,7 +690,7 @@ local function request_chunk(self, method, path, opts, timeout) local function read_watch() - body = nil + body = "" while(1) do local chunk, error = res.body_reader() @@ -704,7 +704,7 @@ local function request_chunk(self, method, path, opts, timeout) if not body then body = chunk else - body = tab_concat({body, chunk}) + body = body .. chunk end if not utils.is_empty_str(chunk) and str_byte(chunk, -1) == str_byte("\n") then From 3a58c085a6fdeae03d148a148b2a1ad99eac3342 Mon Sep 17 00:00:00 2001 From: Yong Qian Date: Wed, 2 Mar 2022 08:21:09 +0800 Subject: [PATCH 09/13] fix --- lib/resty/etcd/v3.lua | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index 3827ba39..2e3e3ff4 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -17,7 +17,6 @@ local pairs = pairs local re_match = ngx.re.match local type = type local tab_insert = table.insert -local tab_concat = table.concat local str_lower = string.lower local tab_clone = require("table.clone") local decode_json = cjson.decode From 46b56ddc110a1266f827c99cbe0b1e8d6370852f Mon Sep 17 00:00:00 2001 From: Yong Qian Date: Wed, 2 Mar 2022 09:31:43 +0800 Subject: [PATCH 10/13] add more comments --- lib/resty/etcd/v3.lua | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index 2e3e3ff4..64867583 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -703,6 +703,9 @@ local function request_chunk(self, method, path, opts, timeout) if not body then body = chunk else + -- this branch will only be executed in rare cases, + -- for example, a single event json is larger than the proxy_buffer_size of nginx which proxies etcd, + -- so It would be ok to use a string concat directly without worry about the performance. body = body .. chunk end From f428a265cb9c199377d77585e0c686a85b79df3f Mon Sep 17 00:00:00 2001 From: Yong Qian Date: Wed, 2 Mar 2022 09:40:24 +0800 Subject: [PATCH 11/13] reformat --- lib/resty/etcd/v3.lua | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index 64867583..afbe7d3c 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -703,9 +703,9 @@ local function request_chunk(self, method, path, opts, timeout) if not body then body = chunk else - -- this branch will only be executed in rare cases, - -- for example, a single event json is larger than the proxy_buffer_size of nginx which proxies etcd, - -- so It would be ok to use a string concat directly without worry about the performance. + -- this branch will only be executed in rare cases, for example, a single event json + -- is larger than the proxy_buffer_size of nginx which proxies etcd, so it would be + -- ok to use a string concat directly without worry about the performance. body = body .. chunk end From aca8a02ef670443dee5568bbdbcf6100003e0dcb Mon Sep 17 00:00:00 2001 From: Yong Qian Date: Wed, 2 Mar 2022 11:15:10 +0800 Subject: [PATCH 12/13] fix body initial value --- lib/resty/etcd/v3.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index afbe7d3c..f8e4510e 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -689,7 +689,7 @@ local function request_chunk(self, method, path, opts, timeout) local function read_watch() - body = "" + body = nil while(1) do local chunk, error = res.body_reader() From bb205950578cc3e2e1f4edfa1701020aa71de298 Mon Sep 17 00:00:00 2001 From: Yong Qian Date: Wed, 2 Mar 2022 20:05:37 +0800 Subject: [PATCH 13/13] rename variable --- lib/resty/etcd/v3.lua | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index f8e4510e..5a555e32 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -692,9 +692,9 @@ local function request_chunk(self, method, path, opts, timeout) body = nil while(1) do - local chunk, error = res.body_reader() - if error then - return nil, error + local chunk, read_err = res.body_reader() + if read_err then + return nil, read_err end if not chunk then break @@ -719,9 +719,9 @@ local function request_chunk(self, method, path, opts, timeout) return nil, nil end - local chunks, error = split(body, [[\n]], "jo") - if error then - return nil, "failed to split chunks: " .. error + local chunks, split_err = split(body, [[\n]], "jo") + if split_err then + return nil, "failed to split chunks: " .. split_err end local all_events = {}