Skip to content

Commit

Permalink
feat(deployment): implement health check for conf_server (apache#7378)
Browse files Browse the repository at this point in the history
Signed-off-by: spacewander <spacewanderlzx@gmail.com>
  • Loading branch information
spacewander authored Jul 6, 2022
1 parent 6157037 commit a8a692d
Show file tree
Hide file tree
Showing 3 changed files with 283 additions and 11 deletions.
1 change: 1 addition & 0 deletions apisix/cli/snippet.lua
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ function _M.generate_conf_server(env, conf)
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_set_header Host $upstream_host;
proxy_next_upstream error timeout non_idempotent http_500 http_502 http_503 http_504;
}
log_by_lua_block {
Expand Down
112 changes: 101 additions & 11 deletions apisix/conf_server.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ local balancer = require("ngx.balancer")
local error = error
local ipairs = ipairs
local ngx = ngx
local ngx_shared = ngx.shared
local ngx_var = ngx.var
local tonumber = tonumber


local _M = {}
Expand All @@ -30,6 +32,16 @@ local resolved_results = {}
local server_picker
local has_domain = false

local is_http = ngx.config.subsystem == "http"
local health_check_shm_name = "etcd-cluster-health-check"
if not is_http then
health_check_shm_name = health_check_shm_name .. "-stream"
end
-- an endpoint is unhealthy if it is failed for HEALTH_CHECK_MAX_FAILURE times in
-- HEALTH_CHECK_DURATION_SECOND
local HEALTH_CHECK_MAX_FAILURE = 3
local HEALTH_CHECK_DURATION_SECOND = 10


local function create_resolved_result(server)
local host, port = core.utils.parse_addr(server)
Expand All @@ -48,6 +60,10 @@ function _M.init()
end

local etcd = conf.deployment.etcd
if etcd.health_check_timeout then
HEALTH_CHECK_DURATION_SECOND = etcd.health_check_timeout
end

for i, s in ipairs(etcd.host) do
local _, to = core.string.find(s, "://")
if not to then
Expand Down Expand Up @@ -80,7 +96,13 @@ end


local function response_err(err)
ngx.log(ngx.ERR, "failure in conf server: ", err)
core.log.error("failure in conf server: ", err)

if ngx.get_phase() == "balancer" then
return
end

ngx.status = 503
ngx.say(core.json.encode({error = err}))
ngx.exit(0)
end
Expand Down Expand Up @@ -127,25 +149,87 @@ local function resolve_servers(ctx)
end


local function gen_unhealthy_key(addr)
return "conf_server:" .. addr
end


local function is_node_health(addr)
local key = gen_unhealthy_key(addr)
local count, err = ngx_shared[health_check_shm_name]:get(key)
if err then
core.log.warn("failed to get health check count, key: ", key, " err: ", err)
return true
end

if not count then
return true
end

return tonumber(count) < HEALTH_CHECK_MAX_FAILURE
end


local function report_failure(addr)
local key = gen_unhealthy_key(addr)
local count, err =
ngx_shared[health_check_shm_name]:incr(key, 1, 0, HEALTH_CHECK_DURATION_SECOND)
if not count then
core.log.error("failed to report failure, key: ", key, " err: ", err)
else
-- count might be larger than HEALTH_CHECK_MAX_FAILURE
core.log.warn("report failure, endpoint: ", addr, " count: ", count)
end
end


local function pick_node_by_server_picker(ctx)
local server, err = ctx.server_picker.get(ctx)
if not server then
err = err or "no valid upstream node"
return nil, "failed to find valid upstream server: " .. err
end

ctx.balancer_server = server

for _, r in ipairs(resolved_results) do
if r.server == server then
return r
end
end

return nil, "unknown server: " .. server
end


local function pick_node(ctx)
local res
if server_picker then
local server, err = server_picker.get(ctx)
if not server then
err = err or "no valid upstream node"
return nil, "failed to find valid upstream server, " .. err
if not ctx.server_picker then
ctx.server_picker = server_picker
end

ctx.server_picker = server_picker
ctx.balancer_server = server
local err
res, err = pick_node_by_server_picker(ctx)
if not res then
return nil, err
end

while not is_node_health(res.server) do
core.log.warn("endpoint ", res.server, " is unhealthy, skipped")

if server_picker.after_balance then
server_picker.after_balance(ctx, true)
end

for _, r in ipairs(resolved_results) do
if r.server == server then
res = r
break
res, err = pick_node_by_server_picker(ctx)
if not res then
return nil, err
end
end

else
-- we don't do health check if there is only one candidate
res = resolved_results[1]
end

Expand Down Expand Up @@ -185,6 +269,12 @@ function _M.balancer()
core.log.warn("could not set upstream retries: ", err)
end
else
if ctx.server_picker and ctx.server_picker.after_balance then
ctx.server_picker.after_balance(ctx, true)
end

report_failure(ctx.balancer_server)

local ok, err = pick_node(ctx)
if not ok then
return response_err(err)
Expand Down
181 changes: 181 additions & 0 deletions t/deployment/conf_server2.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
use t::APISIX;

my $nginx_binary = $ENV{'TEST_NGINX_BINARY'} || 'nginx';
my $version = eval { `$nginx_binary -V 2>&1` };

if ($version =~ m/\/1.17.8/) {
plan(skip_all => "require OpenResty 1.19+");
} else {
plan('no_plan');
}

add_block_preprocessor(sub {
my ($block) = @_;

if (!$block->request) {
$block->set_value("request", "GET /t");
}

if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
$block->set_value("no_error_log", "[error]");
}

});

Test::Nginx::Socket::set_http_config_filter(sub {
my $config = shift;
my $snippet = `./t/bin/gen_snippet.lua conf_server`;
$config .= $snippet;
return $config;
});

run_tests();

__DATA__
=== TEST 1: health check, ensure unhealthy endpoint is skipped
--- http_config
server {
listen 12345;
location / {
access_by_lua_block {
if package.loaded.start_to_fail then
ngx.exit(502)
end
}
proxy_pass http://127.0.0.1:2379;
}
}
--- extra_yaml_config
deployment:
role: traditional
role_traditional:
config_provider: etcd
etcd:
prefix: "/apisix"
host:
- http://127.0.0.1:2379
- http://localhost:12345
--- config
location /t {
content_by_lua_block {
local etcd = require("apisix.core.etcd")
package.loaded.start_to_fail = true
for i = 1, 7 do
assert(etcd.set("/apisix/test", "foo"))
end
package.loaded.start_to_fail = nil
ngx.say('OK')
}
}
--- response_body
OK
--- error_log
report failure, endpoint: localhost:12345
endpoint localhost:12345 is unhealthy, skipped
=== TEST 2: health check, all endpoints are unhealthy
--- http_config
server {
listen 12345;
location / {
access_by_lua_block {
if package.loaded.start_to_fail then
ngx.exit(502)
end
}
proxy_pass http://127.0.0.1:2379;
}
}
--- extra_yaml_config
deployment:
role: traditional
role_traditional:
config_provider: etcd
etcd:
prefix: "/apisix"
host:
- http://localhost:12345
- http://127.0.0.1:12345
--- config
location /t {
content_by_lua_block {
local etcd = require("apisix.core.etcd")
package.loaded.start_to_fail = true
for i = 1, 6 do
etcd.set("/apisix/test", "foo")
end
package.loaded.start_to_fail = nil
local _, err = etcd.set("/apisix/test", "foo")
ngx.say(err)
}
}
--- response_body
invalid response code: 503
--- error_log
endpoint localhost:12345 is unhealthy, skipped
endpoint 127.0.0.1:12345 is unhealthy, skipped
=== TEST 3: health check, all endpoints recover from unhealthy
--- http_config
server {
listen 12345;
location / {
access_by_lua_block {
if package.loaded.start_to_fail then
ngx.exit(502)
end
}
proxy_pass http://127.0.0.1:2379;
}
}
--- extra_yaml_config
deployment:
role: traditional
role_traditional:
config_provider: etcd
etcd:
health_check_timeout: 1
prefix: "/apisix"
host:
- http://localhost:12345
- http://127.0.0.1:12345
--- config
location /t {
content_by_lua_block {
local etcd = require("apisix.core.etcd")
package.loaded.start_to_fail = true
for i = 1, 6 do
etcd.set("/apisix/test", "foo")
end
package.loaded.start_to_fail = nil
ngx.sleep(1.2)
local res, err = etcd.set("/apisix/test", "foo")
ngx.say(err or res.body.node.value)
}
}
--- response_body
foo
--- error_log
endpoint localhost:12345 is unhealthy, skipped
endpoint 127.0.0.1:12345 is unhealthy, skipped

0 comments on commit a8a692d

Please sign in to comment.