From c3afa56ffac3f76840de362df0eefff12ecadd5f Mon Sep 17 00:00:00 2001 From: Thijs Schreijer Date: Wed, 14 Jun 2017 15:48:57 +0200 Subject: [PATCH] Async lookups (#15) * wip(cache) implemented LRU cache a number of tests marked as pending waiting for updating to the new async/stale query method * wip(client) implements async dns resolution. rewrites the core resolution code --- README.md | 7 +- spec/balancer_spec.lua | 29 +- spec/client_cache_spec.lua | 227 +++++++++++ spec/client_spec.lua | 502 ++++++++++++----------- src/resty/dns/balancer.lua | 2 +- src/resty/dns/client.lua | 801 +++++++++++++++++++++---------------- 6 files changed, 968 insertions(+), 600 deletions(-) create mode 100644 spec/client_cache_spec.lua diff --git a/README.md b/README.md index 5ff1182079e..5a0953780b3 100644 --- a/README.md +++ b/README.md @@ -35,8 +35,13 @@ use the `rbusted` script. History ======= -### 0.5.x (xx-xxx-2017) Bugfixes +### 0.6.x (xx-xxx-2017) Rewritten resolver core to resolve async +- Added: resolution will be done async whenever possible. For this to work a new + setting has been introduced `staleTtl` which determines for how long stale + records will returned while a query is in progress in the background. +- Change: BREAKING! several functions that previously returned and took a + resolver object no longer do so. 
- Fix: no longer lookup ip adresses as names if the query type is not A or AAAA - Fix: normalize names to lowercase after query - Fix: set last-success types for hosts-file entries and ip-addresses diff --git a/spec/balancer_spec.lua b/spec/balancer_spec.lua index be25a2f7866..e59b84eadc6 100644 --- a/spec/balancer_spec.lua +++ b/spec/balancer_spec.lua @@ -19,7 +19,7 @@ else end -- creates an SRV record in the cache -local dnsSRV = function(records) +local dnsSRV = function(records, staleTtl) -- if single table, then insert into a new list if not records[1] then records = { records } end @@ -43,14 +43,15 @@ local dnsSRV = function(records) records.expire = gettime() + records[1].ttl -- create key, and insert it - dnscache[records[1].type..":"..records[1].name] = records + local key = records[1].type..":"..records[1].name + dnscache:set(key, records, records[1].ttl + (staleTtl or 4)) -- insert last-succesful lookup type - dnscache[records[1].name] = records[1].type + dnscache:set(records[1].name, records[1].type) return records end -- creates an A record in the cache -local dnsA = function(records) +local dnsA = function(records, staleTtl) -- if single table, then insert into a new list if not records[1] then records = { records } end @@ -71,14 +72,15 @@ local dnsA = function(records) records.expire = gettime() + records[1].ttl -- create key, and insert it - dnscache[records[1].type..":"..records[1].name] = records + local key = records[1].type..":"..records[1].name + dnscache:set(key, records, records[1].ttl + (staleTtl or 4)) -- insert last-succesful lookup type - dnscache[records[1].name] = records[1].type + dnscache:set(records[1].name, records[1].type) return records end -- creates an AAAA record in the cache -local dnsAAAA = function(records) +local dnsAAAA = function(records, staleTtl) -- if single table, then insert into a new list if not records[1] then records = { records } end @@ -99,9 +101,10 @@ local dnsAAAA = function(records) records.expire = 
gettime() + records[1].ttl -- create key, and insert it - dnscache[records[1].type..":"..records[1].name] = records + local key = records[1].type..":"..records[1].name + dnscache:set(key, records, records[1].ttl + (staleTtl or 4)) -- insert last-succesful lookup type - dnscache[records[1].name] = records[1].type + dnscache:set(records[1].name, records[1].type) return records end @@ -1023,7 +1026,13 @@ describe("Loadbalancer", function() ["[::1]:80"] = 30, }, count) - record.expire = gettime() - 1 -- expire record now + -- expire the existing record + record.expire = gettime() - 1 + record.expired = true + -- do a lookup to trigger the async lookup + client.resolve("does.not.exist.mashape.com", {qtype = client.TYPE_A}) + sleep(0.5) -- provide time for async lookup to complete + for _ = 1, b.wheelSize do b:getPeer() end -- hit them all to force renewal count = count_slots(b) diff --git a/spec/client_cache_spec.lua b/spec/client_cache_spec.lua new file mode 100644 index 00000000000..1d3c589cfa4 --- /dev/null +++ b/spec/client_cache_spec.lua @@ -0,0 +1,227 @@ +local pretty = require("pl.pretty").write +local _ + +-- empty records and not found errors should be identical, hence we +-- define a constant for that error message +local NOT_FOUND_ERROR = "dns server error: 3 name error" + +local gettime, sleep +if ngx then + gettime = ngx.now + sleep = ngx.sleep +else + local socket = require("socket") + gettime = socket.gettime + sleep = socket.sleep +end + +-- simple debug function +local dump = function(...) + print(require("pl.pretty").write({...})) +end + +describe("DNS client cache", function() + + local client, resolver, query_func + + before_each(function() + _G._TEST = true + client = require("resty.dns.client") + resolver = require("resty.dns.resolver") + + -- you can replace this `query_func` upvalue to spy on resolver query calls. 
+ -- This default will just call the original resolver (hence is transparent) + query_func = function(self, original_query_func, name, options) + return original_query_func(self, name, options) + end + + -- patch the resolver lib, such that any new resolver created will query + -- using the `query_func` upvalue defined above + local old_new = resolver.new + resolver.new = function(...) + local r = old_new(...) + local original_query_func = r.query + r.query = function(self, ...) + if not query_func then + print(debug.traceback("WARNING: query_func is not set")) + dump(self, ...) + return + end + return query_func(self, original_query_func, ...) + end + return r + end + end) + + after_each(function() + package.loaded["resty.dns.client"] = nil + package.loaded["resty.dns.resolver"] = nil + client = nil + resolver = nil + query_func = nil + _G._TEST = nil + end) + + describe("shortnames", function() + + local lrucache, mock_records, config + before_each(function() + config = { + nameservers = { "8.8.8.8" }, + ndots = 1, + search = { "domain.com" }, + hosts = {}, + resolvConf = {}, + order = { "LAST", "SRV", "A", "AAAA", "CNAME" }, + badTtl = 0.5, + staleTtl = 0.5, + enable_ipv6 = false, + } + assert(client.init(config)) + lrucache = client.getcache() + + query_func = function(self, original_query_func, qname, opts) + return mock_records[qname..":"..opts.qtype] or { errcode = 3, errstr = "name error" } + end + end) + + it("are stored in cache without type", function() + mock_records = { + ["myhost1.domain.com:"..client.TYPE_A] = {{ + type = client.TYPE_A, + address = "1.2.3.4", + class = 1, + name = "myhost1.domain.com", + ttl = 30, + }} + } + + local result = client.resolve("myhost1") + assert.equal(result, lrucache:get("none:short:myhost1")) + end) + + it("are stored in cache with type", function() + mock_records = { + ["myhost2.domain.com:"..client.TYPE_A] = {{ + type = client.TYPE_A, + address = "1.2.3.4", + class = 1, + name = "myhost2.domain.com", + ttl = 30, + 
}} + } + + local result = client.resolve("myhost2", { qtype = client.TYPE_A }) + assert.equal(result, lrucache:get(client.TYPE_A..":short:myhost2")) + end) + + it("are resolved from cache without type", function() + mock_records = {} + lrucache:set("none:short:myhost3", {{ + type = client.TYPE_A, + address = "1.2.3.4", + class = 1, + name = "myhost3.domain.com", + ttl = 30, + }, + ttl = 30, + expire = gettime() + 30, + }, 30+4) + + local result = client.resolve("myhost3") + assert.equal(result, lrucache:get("none:short:myhost3")) + end) + + it("are resolved from cache with type", function() + mock_records = {} + lrucache:set(client.TYPE_A..":short:myhost4", {{ + type = client.TYPE_A, + address = "1.2.3.4", + class = 1, + name = "myhost4.domain.com", + ttl = 30, + }, + ttl = 30, + expire = gettime() + 30, + }, 30+4) + + local result = client.resolve("myhost4", { qtype = client.TYPE_A }) + assert.equal(result, lrucache:get(client.TYPE_A..":short:myhost4")) + end) + + it("of dereferenced CNAME are stored in cache", function() + mock_records = { + ["myhost5.domain.com:"..client.TYPE_CNAME] = {{ + type = client.TYPE_CNAME, + class = 1, + name = "myhost5.domain.com", + cname = "mytarget.domain.com", + ttl = 30, + }}, + ["mytarget.domain.com:"..client.TYPE_A] = {{ + type = client.TYPE_A, + address = "1.2.3.4", + class = 1, + name = "mytarget.domain.com", + ttl = 30, + }} + } + local result = client.resolve("myhost5") + + assert.same(mock_records["mytarget.domain.com:"..client.TYPE_A], result) -- not the test, intermediate validation + + -- the type un-specificc query was the CNAME, so that should be in the + -- shorname cache + assert.same(mock_records["myhost5.domain.com:"..client.TYPE_CNAME], + lrucache:get("none:short:myhost5")) + end) + + it("ttl in cache is honored for short name entries", function() + -- in the short name case the same record is inserted again in the cache + -- and the lru-ttl has to be calculated, make sure it is correct + mock_records = { + 
["myhost6.domain.com:"..client.TYPE_A] = {{ + type = client.TYPE_A, + address = "1.2.3.4", + class = 1, + name = "myhost6.domain.com", + ttl = 0.1, + }} + } + local mock_copy = require("pl.tablex").deepcopy(mock_records) + + -- resolve and check whether we got the mocked record + local result = client.resolve("myhost6") + assert.equal(result, mock_records["myhost6.domain.com:"..client.TYPE_A]) + + -- replace our mocked list with the copy made (new table, so no equality) + mock_records = mock_copy + + -- wait for expiring + sleep(0.1 + config.staleTtl / 2) + + -- resolve again, now getting same record, but stale, this will trigger + -- background refresh query + local result2 = client.resolve("myhost6") + assert.equal(result2, result) + assert.is_true(result2.expired) -- stale; marked as expired + + -- wait for refresh to complete + sleep(0.1) + + -- resolve and check whether we got the new record from the mock copy + local result3 = client.resolve("myhost6") + assert.not_equal(result, result3) -- must be a different record now + assert.equal(result3, mock_records["myhost6.domain.com:"..client.TYPE_A]) + + -- the 'result3' resolve call above will also trigger a new background query + -- (because the sleep of 0.1 equals the records ttl of 0.1) + -- so let's yield to activate that background thread now. If not done so, + -- the `after_each` will clear `query_func` and an error will appear on the + -- next test after this one that will yield. + sleep(0.1) + end) + + end) + +end) diff --git a/spec/client_spec.lua b/spec/client_spec.lua index 9006e62481b..d19398cea78 100644 --- a/spec/client_spec.lua +++ b/spec/client_spec.lua @@ -24,17 +24,39 @@ end describe("DNS client", function() - local client + local client, resolver, query_func before_each(function() _G._TEST = true client = require("resty.dns.client") + resolver = require("resty.dns.resolver") + + -- you can replace this `query_func` upvalue to spy on resolver query calls. 
+ -- This default will just call the original resolver (hence is transparent) + query_func = function(self, original_query_func, name, options) + return original_query_func(self, name, options) + end + + -- patch the resolver lib, such that any new resolver created will query + -- using the `query_func` upvalue defined above + local old_new = resolver.new + resolver.new = function(...) + local r = old_new(...) + local original_query_func = r.query + r.query = function(self, ...) + return query_func(self, original_query_func, ...) + end + return r + end + end) after_each(function() package.loaded["resty.dns.client"] = nil package.loaded["resty.dns.resolver"] = nil client = nil + resolver = nil + query_func = nil _G._TEST = nil end) @@ -145,10 +167,12 @@ describe("DNS client", function() "options ndots:1", } })) + local lrucache = client.getcache() -- insert a last successful type - client.getcache()["host"] = client.TYPE_CNAME + local hostname = "host" + lrucache:set(hostname, client.TYPE_CNAME) local list = {} - for qname, qtype in client._search_iter("host", nil) do + for qname, qtype in client._search_iter(hostname, nil) do table.insert(list, tostring(qname)..":"..tostring(qtype)) end assert.same({ @@ -304,7 +328,8 @@ describe("DNS client", function() local host = "txttest.thijsschreijer.nl" local typ = client.TYPE_TXT - local answers = assert(client.resolve(host, { qtype = typ })) + local answers, err, try_list = client.resolve(host, { qtype = typ }) + assert(answers, (err or "") .. 
tostring(try_list)) assert.are.equal(host, answers[1].name) assert.are.equal(typ, answers[1].type) assert.are.equal(#answers, 1) @@ -328,7 +353,7 @@ describe("DNS client", function() local host = "txttest.thijsschreijer.nl" local typ = client.TYPE_TXT - local answers = assert(client.resolve(host, { qtype = typ })) + local answers, err, history = assert(client.resolve(host, { qtype = typ })) local now = gettime() local touch_diff = math.abs(now - answers.touch) @@ -360,9 +385,7 @@ describe("DNS client", function() it("fetching names case insensitive", function() assert(client.init()) - -- do a query so we get a resolver object to spy on - local _, _, r, history = client.toip("google.com", 123, false) - r.query = function(self, ...) + query_func = function(self, original_query_func, name, options) return { { name = "some.UPPER.case", @@ -372,10 +395,10 @@ describe("DNS client", function() } end - local res, err, r, history = client.resolve( + local res, err, history = client.resolve( "some.upper.CASE", { qtype = client.TYPE_A }, - false, r) + false) assert.equal(1, #res) assert.equal("some.upper.case", res[1].name) end) @@ -396,6 +419,7 @@ describe("DNS client", function() it("fetching A record redirected through 2 CNAME records (un-typed)", function() assert(client.init()) + local lrucache = client.getcache() --[[ This test might fail. Recurse flag is on by default. 
This means that the first return @@ -413,17 +437,17 @@ describe("DNS client", function() local host = "smtp.thijsschreijer.nl" local typ = client.TYPE_A - local answers = assert(client.resolve(host)) + local answers, err, history = assert(client.resolve(host)) -- check first CNAME local key1 = client.TYPE_CNAME..":"..host - local entry1 = client.getcache()[key1] + local entry1 = lrucache:get(key1) assert.are.equal(host, entry1[1].name) -- the 1st record is the original 'smtp.thijsschreijer.nl' assert.are.equal(client.TYPE_CNAME, entry1[1].type) -- and that is a CNAME -- check second CNAME local key2 = client.TYPE_CNAME..":"..entry1[1].cname - local entry2 = client.getcache()[key2] + local entry2 = lrucache:get(key2) assert.are.equal(entry1[1].cname, entry2[1].name) -- the 2nd is the middle 'thuis.thijsschreijer.nl' assert.are.equal(client.TYPE_CNAME, entry2[1].type) -- and that is also a CNAME @@ -434,9 +458,9 @@ describe("DNS client", function() assert.are.equal(#answers, 1) -- check last successful lookup references - local lastsuccess3 = client.getcache()[answers[1].name] - local lastsuccess2 = client.getcache()[entry2[1].name] - local lastsuccess1 = client.getcache()[entry1[1].name] + local lastsuccess3 = lrucache:get(answers[1].name) + local lastsuccess2 = lrucache:get(entry2[1].name) + local lastsuccess1 = lrucache:get(entry1[1].name) assert.are.equal(client.TYPE_A, lastsuccess3) assert.are.equal(client.TYPE_CNAME, lastsuccess2) assert.are.equal(client.TYPE_CNAME, lastsuccess1) @@ -462,7 +486,8 @@ describe("DNS client", function() it("fetching multiple SRV records through CNAME (un-typed)", function() assert(client.init()) - + local lrucache = client.getcache() + local host = "cname2srv.thijsschreijer.nl" local typ = client.TYPE_SRV @@ -471,7 +496,7 @@ describe("DNS client", function() -- first check CNAME local key = client.TYPE_CNAME..":"..host - local entry = client.getcache()[key] + local entry = lrucache:get(key) assert.are.equal(host, entry[1].name) 
assert.are.equal(client.TYPE_CNAME, entry[1].type) @@ -518,6 +543,7 @@ describe("DNS client", function() it("fetching IPv4 address as A type", function() assert(client.init()) + local lrucache = client.getcache() local host = "1.2.3.4" @@ -526,8 +552,7 @@ describe("DNS client", function() assert.are.equal(client.TYPE_A, answers[1].type) assert.are.equal(10*365*24*60*60, answers[1].ttl) -- 10 year ttl - local cache = client.getcache() - assert.equal(client.TYPE_A, cache[host]) + assert.equal(client.TYPE_A, lrucache:get(host)) end) it("fetching IPv4 address as SRV type", function() @@ -561,8 +586,8 @@ describe("DNS client", function() assert.are.equal(client.TYPE_AAAA, answers[1].type) assert.are.equal(10*365*24*60*60, answers[1].ttl) -- 10 year ttl - local cache = client.getcache() - assert.equal(client.TYPE_AAAA, cache[host]) + local lrucache = client.getcache() + assert.equal(client.TYPE_AAAA, lrucache:get(host)) end) it("fetching IPv6 address as SRV type", function() @@ -596,44 +621,75 @@ describe("DNS client", function() local host = "1::2:3::4" -- 2x double colons - local answers, err, r, history = client.resolve(host) + local answers, err, history = client.resolve(host) assert.is_nil(answers) assert.equal(NOT_FOUND_ERROR, err) assert(tostring(history):find("bad IPv6", nil, true)) end) - it("fetching records from cache only, expired and ttl = 0",function() - assert(client.init()) - local expired_entry = { + it("recursive lookups failure - single resolve", function() + assert(client.init({ + resolvConf = { + -- resolv.conf without `search` and `domain` options + "nameserver 8.8.8.8", + }, + })) + query_func = function(self, original_query_func, name, opts) + if name ~= "hello.world" and (opts or {}).qtype ~= client.TYPE_CNAME then + return original_query_func(self, name, opts) + end + return { + { + type = client.TYPE_CNAME, + cname = "hello.world", + class = 1, + name = "hello.world", + ttl = 30, + }, + } + end + + local result, err, history = 
client.resolve("hello.world") + assert.is_nil(result) + assert.are.equal("recursion detected", err) + end) + + it("recursive lookups failure - single", function() + assert(client.init({ + resolvConf = { + -- resolv.conf without `search` and `domain` options + "nameserver 8.8.8.8", + }, + })) + local lrucache = client.getcache() + local entry1 = { { - type = client.TYPE_A, - address = "1.2.3.4", + type = client.TYPE_CNAME, + cname = "hello.world", class = 1, - name = "1.2.3.4", + name = "hello.world", ttl = 0, }, touch = 0, - expire = 0, -- definitely expired + expire = 0, } -- insert in the cache - client.getcache()[expired_entry[1].type..":"..expired_entry[1].name] = expired_entry - local cache_count = #client.getcache() + lrucache:set(entry1[1].type..":"..entry1[1].name, entry1) - -- resolve this, cache only - local result = client.resolve("1.2.3.4", {qtype = expired_entry[1].type}, true) - - assert.are.equal(expired_entry, result) - assert.are.equal(cache_count, #client.getcache()) -- should not be deleted - assert.are.equal(expired_entry, client.getcache()[expired_entry[1].type..":"..expired_entry[1].name]) + -- Note: the bad case would be that the below lookup would hang due to round-robin on an empty table + local result, err, history = client.resolve("hello.world", nil, true) + assert.is_nil(result) + assert.are.equal("recursion detected", err) end) - it("recursive lookups failure", function() + it("recursive lookups failure - multi", function() assert(client.init({ resolvConf = { -- resolv.conf without `search` and `domain` options "nameserver 8.8.8.8", }, })) + local lrucache = client.getcache() local entry1 = { { type = client.TYPE_CNAME, @@ -657,11 +713,11 @@ describe("DNS client", function() expire = 0, } -- insert in the cache - client.getcache()[entry1[1].type..":"..entry1[1].name] = entry1 - client.getcache()[entry2[1].type..":"..entry2[1].name] = entry2 + lrucache:set(entry1[1].type..":"..entry1[1].name, entry1) + 
lrucache:set(entry2[1].type..":"..entry2[1].name, entry2) -- Note: the bad case would be that the below lookup would hang due to round-robin on an empty table - local result, err, r, history = client.resolve("hello.world", nil, true) + local result, err, history = client.resolve("hello.world", nil, true) assert.is_nil(result) assert.are.equal("recursion detected", err) end) @@ -678,8 +734,8 @@ describe("DNS client", function() order = {"SRV", "CNAME", "A", "AAAA"}, })) - local cache = client.getcache() - assert.equal(client.TYPE_A, cache.localhost) -- success set to A as it is the preferred option + local lrucache = client.getcache() + assert.equal(client.TYPE_A, lrucache:get("localhost")) -- success set to A as it is the preferred option assert(client.init( { @@ -687,8 +743,8 @@ describe("DNS client", function() order = {"SRV", "CNAME", "AAAA", "A"}, })) - local cache = client.getcache() - assert.equal(client.TYPE_AAAA, cache.localhost) -- success set to AAAA as it is the preferred option + local lrucache = client.getcache() + assert.equal(client.TYPE_AAAA, lrucache:get("localhost")) -- success set to AAAA as it is the preferred option end) @@ -780,6 +836,7 @@ describe("DNS client", function() end) it("SRV-record with 1 entry, round-robin",function() assert(client.init()) + local lrucache = client.getcache() local host = "hello.world" local entry = { { @@ -796,7 +853,7 @@ describe("DNS client", function() expire = gettime()+10, } -- insert in the cache - client.getcache()[entry[1].type..":"..entry[1].name] = entry + lrucache:set(entry[1].type..":"..entry[1].name, entry) -- repeated lookups, as the first will simply serve the first entry -- and the only second will setup the round-robin scheme, this is @@ -843,19 +900,19 @@ describe("DNS client", function() assert.is_string(ip) assert.is_nil(port) end) - it("#only recursive SRV pointing to itself",function() + it("recursive SRV pointing to itself",function() assert(client.init({ resolvConf = { -- resolv.conf 
without `search` and `domain` options "nameserver 8.8.8.8", }, })) - local ip, record, port, host, r, history + local ip, record, port, host, history host = "srvrecurse.thijsschreijer.nl" -- resolve SRV specific should return the record including its -- recursive entry - record, err, r, history = client.resolve(host, { qtype = client.TYPE_SRV }) + record, err, history = client.resolve(host, { qtype = client.TYPE_SRV }) assert.is_table(record) assert.equal(1, #record) assert.equal(host, record[1].target) @@ -864,7 +921,7 @@ describe("DNS client", function() -- default order, SRV, A; the recursive SRV record fails, and it falls -- back to the IP4 address - ip, port, r, history = client.toip(host) + ip, port, history = client.toip(host) assert.is_string(ip) assert.is_equal("10.0.0.44", ip) assert.is_nil(port) @@ -895,9 +952,9 @@ describe("DNS client", function() expire = gettime()+10, -- active } -- insert in the cache - local cache = client.getcache() - cache[A_entry[1].type..":"..A_entry[1].name] = A_entry - cache[AAAA_entry[1].type..":"..AAAA_entry[1].name] = AAAA_entry + local lrucache = client.getcache() + lrucache:set(A_entry[1].type..":"..A_entry[1].name, A_entry) + lrucache:set(AAAA_entry[1].type..":"..AAAA_entry[1].name, AAAA_entry) end assert(client.init({order = {"AAAA", "A"}})) config() @@ -908,31 +965,6 @@ describe("DNS client", function() ip = client.toip("hello.world") assert.equals(ip, "5.6.7.8") end) - it("resolving from cache only, expired and ttl = 0",function() - assert(client.init()) - local expired_entry = { - { - type = client.TYPE_A, - address = "5.6.7.8", - class = 1, - name = "hello.world", - ttl = 0, - }, - touch = 0, - expire = 0, -- definitely expired - } - -- insert in the cache - client.getcache()[expired_entry[1].type..":"..expired_entry[1].name] = expired_entry - local cache_count = #client.getcache() - - -- resolve this, cache only - local result, port = assert(client.toip("hello.world", 9876, true)) - - 
assert.are.equal(expired_entry[1].address, result) - assert.are.equal(9876, port) - assert.are.equal(cache_count, #client.getcache()) -- should not be deleted - assert.are.equal(expired_entry, client.getcache()[expired_entry[1].type..":"..expired_entry[1].name]) - end) it("handling of empty responses", function() assert(client.init()) local empty_entry = { @@ -954,6 +986,7 @@ describe("DNS client", function() "nameserver 8.8.8.8", }, })) + local lrucache = client.getcache() local entry1 = { { type = client.TYPE_CNAME, @@ -977,8 +1010,8 @@ describe("DNS client", function() expire = 0, } -- insert in the cache - client.getcache()[entry1[1].type..":"..entry1[1].name] = entry1 - client.getcache()[entry2[1].type..":"..entry2[1].name] = entry2 + lrucache:set(entry1[1].type..":"..entry1[1].name, entry1) + lrucache:set(entry2[1].type..":"..entry2[1].name, entry2) -- Note: the bad case would be that the below lookup would hang due to round-robin on an empty table local ip, port, r, history = client.toip("hello.world", 123, true) @@ -1014,7 +1047,8 @@ describe("DNS client", function() end) end) - describe("matrix;", function() + pending("matrix;", function() +--revisit this: ttl=0 is no longer special... 
local ip = "1.4.2.3" local name = "thijsschreijer.nl" local prep = function(ttl, expired) @@ -1041,7 +1075,8 @@ describe("DNS client", function() expire = gettime() + ttl + expired, } -- insert in the cache - client.getcache()[entry[1].type..":"..entry[1].name] = entry + local lrucache = client.getcache() + lrucache:set(entry[1].type..":"..entry[1].name, entry) return entry end @@ -1126,85 +1161,150 @@ describe("DNS client", function() it("verifies ttl and caching of empty responses and name errors", function() --empty/error responses should be cached for a configurable time local emptyTtl = 0.1 + local staleTtl = 0.1 + local qname = "really.really.really.does.not.exist.mashape.com" assert(client.init({ emptyTtl = emptyTtl, + staleTtl = staleTtl, resolvConf = { -- resolv.conf without `search` and `domain` options "nameserver 8.8.8.8", }, })) - - -- do a query so we get a resolver object to spy on - local _, _, r, history = client.toip("google.com", 123, false) - spy.on(r, "query") + -- mock query function to count calls + local call_count = 0 + query_func = function(self, original_query_func, name, options) + call_count = call_count + 1 + return original_query_func(self, name, options) + end + + -- make a first request, populating the cache local res1, res2, err1, err2 - res1, err1, r, history = client.resolve( - "really.reall.really.does.not.exist.mashape.com", - { qtype = client.TYPE_A }, - false, r) - assert.spy(r.query).was.called(1) - assert.equal(NOT_FOUND_ERROR, err1) - - -- make a second request, result from cache, spy still called only once - res2, err2, r, history = client.resolve( - "really.reall.really.does.not.exist.mashape.com", - { qtype = client.TYPE_A }, - false, r) - assert.are.equal(res1, res2) - assert.spy(r.query).was.called(1) - - -- wait for expiry of ttl and retry, spy should be called twice now - sleep(emptyTtl+0.1) - res2, err2, r = client.resolve( - "really.reall.really.does.not.exist.mashape.com", - { qtype = client.TYPE_A }, - false, r) 
- assert.spy(r.query).was.called(2) + res1, err1, history = client.resolve( + qname, + { qtype = client.TYPE_A } + ) + assert.is_nil(res1) + assert.are.equal(1, call_count) + assert.are.equal(NOT_FOUND_ERROR, err1) + res1 = assert(client.getcache():get(client.TYPE_A..":"..qname)) + + + -- make a second request, result from cache, still called only once + res2, err2, history = client.resolve( + qname, + { qtype = client.TYPE_A } + ) + assert.is_nil(res2) + assert.are.equal(1, call_count) + assert.are.equal(NOT_FOUND_ERROR, err2) + res2 = assert(client.getcache():get(client.TYPE_A..":"..qname)) + assert.equal(res1, res2) + assert.falsy(res2.expired) + + + -- wait for expiry of Ttl and retry, still called only once + sleep(emptyTtl+0.5 * staleTtl) + res2, err2 = client.resolve( + qname, + { qtype = client.TYPE_A } + ) + assert.is_nil(res2) + assert.are.equal(1, call_count) + assert.are.equal(NOT_FOUND_ERROR, err2) + res2 = assert(client.getcache():get(client.TYPE_A..":"..qname)) + assert.equal(res1, res2) + assert.is_true(res2.expired) -- by now, record is marked as expired + + + -- wait for expiry of staleTtl and retry, should be called twice now + sleep(0.75 * staleTtl) + res2, err2 = client.resolve( + qname, + { qtype = client.TYPE_A } + ) + assert.is_nil(res2) + assert.are.equal(2, call_count) + assert.are.equal(NOT_FOUND_ERROR, err2) + res2 = assert(client.getcache():get(client.TYPE_A..":"..qname)) + assert.not_equal(res1, res2) + assert.falsy(res2.expired) -- new record, not expired end) it("verifies ttl and caching of (other) dns errors", function() --empty responses should be cached for a configurable time local badTtl = 0.1 + local staleTtl = 0.1 + local qname = "realname.com" assert(client.init({ badTtl = badTtl, + staleTtl = staleTtl, resolvConf = { -- resolv.conf without `search` and `domain` options "nameserver 8.8.8.8", }, })) - -- do a query so we get a resolver object to spy on - local _, _, r, history = client.toip("google.com", 123, false) - r.query 
= function() return { errcode = 5, errstr = "refused" } end - spy.on(r, "query") - + -- mock query function to count calls, and return errors + local call_count = 0 + query_func = function(self, original_query_func, name, options) + call_count = call_count + 1 + return { errcode = 5, errstr = "refused" } + end + + -- initial request to populate the cache - local res1, res2, err1, err2 - res1, err1, r = client.resolve( - "realname.com", - { qtype = client.TYPE_A }, - false, r) - assert.spy(r.query).was.called(1) - assert.equal("dns server error: 5 refused", err1) - - -- try again, from cache, spu should still be called only once - res2, err2, r = client.resolve( - "realname.com", - { qtype = client.TYPE_A }, - false, r) + local res1, res2, err1, err2, history + res1, err1, history = client.resolve( + qname, + { qtype = client.TYPE_A } + ) + assert.is_nil(res1) + assert.are.equal(1, call_count) + assert.are.equal("dns server error: 5 refused", err1) + res1 = assert(client.getcache():get(client.TYPE_A..":"..qname)) + + + -- try again, from cache, should still be called only once + res2, err2, history = client.resolve( + qname, + { qtype = client.TYPE_A } + ) + assert.is_nil(res2) + assert.are.equal(call_count, 1) assert.are.equal(err1, err2) - assert.spy(r.query).was.called(1) - - -- wait for expiry of ttl and retry, spy should be called twice now - sleep(badTtl+0.1) - res2, err2, r = client.resolve( - "realname.com", - { qtype = client.TYPE_A }, - false, r) + res2 = assert(client.getcache():get(client.TYPE_A..":"..qname)) + assert.are.equal(res1, res2) + assert.falsy(res1.expired) + + + -- wait for expiry of ttl and retry, still 1 call, but now stale result + sleep(badTtl + 0.5 * staleTtl) + res2, err2, history = client.resolve( + qname, + { qtype = client.TYPE_A } + ) + assert.is_nil(res2) + assert.are.equal(call_count, 1) + assert.are.equal(err1, err2) + res2 = assert(client.getcache():get(client.TYPE_A..":"..qname)) + assert.are.equal(res1, res2) + 
assert.is_true(res2.expired) + + -- wait for expiry of staleTtl and retry, 2 calls, new result + sleep(0.75 * staleTtl) + res2, err2, history = client.resolve( + qname, + { qtype = client.TYPE_A } + ) + assert.is_nil(res2) + assert.are.equal(call_count, 2) -- 2 calls now assert.are.equal(err1, err2) - assert.spy(r.query).was.called(2) + res2 = assert(client.getcache():get(client.TYPE_A..":"..qname)) + assert.are_not.equal(res1, res2) -- a new record + assert.falsy(res2.expired) end) describe("verifies the polling of dns queries, retries, and wait times", function() @@ -1213,7 +1313,15 @@ describe("DNS client", function() assert(client.init()) local coros = {} local results = {} - + + local call_count = 0 + query_func = function(self, original_query_func, name, options) + call_count = call_count + 1 + sleep(0.5) -- make sure we take enough time so the other threads + -- will be waiting behind this one + return original_query_func(self, name, options) + end + -- we're going to schedule a whole bunch of queries, all of this -- function, which does the same lookup and stores the result local x = function() @@ -1221,7 +1329,10 @@ describe("DNS client", function() -- so the scheduler loop can first schedule them all before actually -- starting resolving coroutine.yield(coroutine.running()) - local result = client.resolve("thijsschreijer.nl") + local result, err, history = client.resolve( + "thijsschreijer.nl", + { qtype = client.TYPE_A } + ) table.insert(results, result) end @@ -1251,67 +1362,6 @@ describe("DNS client", function() assert.equal(1,count) end) - it("simultaneous lookups with ttl=0 are not synchronized to 1 lookup", function() - assert(client.init()) - - -- insert a ttl=0 record, so the resolver expects 0 and does not - -- synchronize the lookups - local ip = "1.4.2.3" - local name = "thijsschreijer.nl" - local entry = { - { - type = client.TYPE_A, - address = ip, - class = 1, - name = name, - ttl = 0, - }, - touch = 0, - expire = gettime() - 1, - } - -- 
insert in the cache - client.getcache()[entry[1].type..":"..entry[1].name] = entry - - local coros = {} - local results = {} - - -- we're going to schedule a whole bunch of queries, all of this - -- function, which does the same lookup and stores the result - local x = function() - -- the function is ran when started. So we must immediately yield - -- so the scheduler loop can first schedule them all before actually - -- starting resolving - coroutine.yield(coroutine.running()) - local result = client.resolve("thijsschreijer.nl", {qtype = client.TYPE_A}) - table.insert(results, result) - end - - -- schedule a bunch of the same lookups - for _ = 1, 10 do - local co = ngx.thread.spawn(x) - table.insert(coros, co) - end - - -- all scheduled and waiting to start due to the yielding done. - -- now start them all - for i = 1, #coros do - ngx.thread.wait(coros[i]) -- this wait will resume the scheduled ones - end - - -- now count the unique responses we got - local counters = {} - for _, r in ipairs(results) do - r = tostring(r) - counters[r] = (counters[r] or 0) + 1 - end - local count = 0 - for _ in pairs(counters) do count = count + 1 end - - -- we should have a 10 individual result tables, as all threads are - -- supposed to do their own lookup. - assert.equal(10,count) - end) - it("timeout while waiting", function() -- basically the local function _synchronized_query assert(client.init({ @@ -1325,29 +1375,23 @@ describe("DNS client", function() -- insert a stub thats waits and returns a fixed record local name = "thijsschreijer.nl" - local resty = require("resty.dns.resolver") - resty.new = function(...) 
- return { - query = function() - local ip = "1.4.2.3" - local entry = { - { - type = client.TYPE_A, - address = ip, - class = 1, - name = name, - ttl = 10, - }, - touch = 0, - expire = gettime() + 10, - } - sleep(2) -- wait before we return the results - return entry - end + query_func = function() + local ip = "1.4.2.3" + local entry = { + { + type = client.TYPE_A, + address = ip, + class = 1, + name = name, + ttl = 10, + }, + touch = 0, + expire = gettime() + 10, } + sleep(2) -- wait before we return the results + return entry end - local coros = {} local results = {} @@ -1358,8 +1402,8 @@ describe("DNS client", function() -- so the scheduler loop can first schedule them all before actually -- starting resolving coroutine.yield(coroutine.running()) - local result, err = client.resolve("thijsschreijer.nl", {qtype = client.TYPE_A}) - table.insert(results, result or err) + local result, err, history = client.resolve("thijsschreijer.nl", {qtype = client.TYPE_A}) + table.insert(results, (result or err)) end -- schedule a bunch of the same lookups @@ -1374,20 +1418,8 @@ describe("DNS client", function() ngx.thread.wait(coros[i]) -- this wait will resume the scheduled ones end - -- the result should be 3 entries - -- 1: a table (first attempt) - -- 2: a second table (the 1 retry, as hardcoded in `pool_max_retry` variable) - -- 3-10: error message (returned by thread 3 to 10) - assert.is.table(results[1]) - assert.is.table(results[1][1]) - assert.is.equal(results[1][1].name, name) - results[1].touch = nil - results[2].touch = nil - results[1].expire = nil - results[2].expire = nil - assert.Not.equal(results[1], results[2]) - assert.same(results[1], results[2]) - for i = 3, 10 do + -- all results are equal, as they all will wait for the first response + for i = 1, 10 do assert.equal("dns lookup pool exceeded retries (1): timeout", results[i]) end end) diff --git a/src/resty/dns/balancer.lua b/src/resty/dns/balancer.lua index a04be921475..b6f3c95d4ee 100644 --- 
a/src/resty/dns/balancer.lua +++ b/src/resty/dns/balancer.lua @@ -412,7 +412,7 @@ function objHost:queryDns(cacheOnly) dirty = true end end - + self.lastQuery = newQuery self.lastSorted = newSorted diff --git a/src/resty/dns/client.lua b/src/resty/dns/client.lua index ad272feb068..a12b4e7ac59 100644 --- a/src/resty/dns/client.lua +++ b/src/resty/dns/client.lua @@ -22,12 +22,14 @@ local utils = require("resty.dns.utils") local fileexists = require("pl.path").exists local semaphore = require("ngx.semaphore").new - +local lrucache = require("resty.lrucache") local resolver = require("resty.dns.resolver") +local deepcopy = require("pl.tablex").deepcopy local time = ngx.now local ngx_log = ngx.log local log_WARN = ngx.WARN local log_DEBUG = ngx.DEBUG +local timer_at = ngx.timer.at local math_min = math.min local math_max = math.max @@ -48,6 +50,8 @@ local config local defined_hosts -- hash table to lookup names originating from the hosts file local emptyTtl -- ttl (in seconds) for empty and 'name error' (3) errors local badTtl -- ttl (in seconds) for a other dns error results +local staleTtl -- ttl (in seconds) to serve stale data (while new lookup is in progress) +local cacheSize -- size of the lru cache local orderValids = {"LAST", "SRV", "A", "AAAA", "CNAME"} -- default order to query for _,v in ipairs(orderValids) do orderValids[v:upper()] = v end @@ -73,90 +77,113 @@ end --- Caching. -- The cache will not update the `ttl` field. So every time the same record -- is served, the ttl will be the same. But the cache will insert extra fields --- on the top-level; `touch` (timestamp of last access) and `expire` (expiry time --- based on `ttl`) +-- on the top-level; `touch` (timestamp of last access), `expire` (expiry time +-- based on `ttl`), and `expired` (boolean indicating it expired/is stale) -- @section caching --- hostname cache indexed by "recordtype:hostname" returning address list. +-- hostname lru-cache indexed by "recordtype:hostname" returning address list. 
+-- short names are indexed by "recordtype:short:hostname" -- Result is a list with entries. -- Keys only by "hostname" only contain the last succesfull lookup type -- for this name, see `resolve` function. -local cache = {} +local dnscache --- lookup a single entry in the cache. Invalidates the entry if its beyond its ttl. --- Even if the record is expired and `nil` is returned, the second return value --- can be `true`. +-- lookup a single entry in the cache. -- @param qname name to lookup -- @param qtype type number, any of the TYPE_xxx constants --- @param peek just consult the cache, do not check ttl nor expire, just touch it --- @return 1st; cached record or nil, 2nd; expect_ttl_0, true if the last one was ttl 0 -local cachelookup = function(qname, qtype, peek) +-- @return cached record or nil +local cachelookup = function(qname, qtype) local now = time() local key = qtype..":"..qname - local cached = cache[key] - local expect_ttl_0 + local cached = dnscache:get(key) if cached then - expect_ttl_0 = ((cached[1] or empty).ttl == 0) - if peek then - -- cannot update, just update touch time - cached.touch = now - elseif expect_ttl_0 then - -- ttl = 0 so we should not remove the cache entry, but we should also - -- not return it - cached.touch = now - cached = nil - elseif (cached.expire < now) then - -- the cached entry expired, and we're allowed to mark it as such - cache[key] = nil - cached = nil - else - -- still valid, so nothing to do - cached.touch = now + cached.touch = now + if (cached.expire < now) then + cached.expired = true end end - - return cached, expect_ttl_0 + + return cached end --- inserts an entry in the cache --- Note: if the ttl=0, then it is also stored to enable 'cache-only' lookups --- params qname, qtype are IGNORED unless `entry` has an empty list part. +-- inserts an entry in the cache. 
+-- @param entry the dns record list to store (may also be an error entry) +-- @param qname the name under which to store the record (optional for records, not for errors) +-- @param qtype the query type for which to store the record (optional for records, not for errors) +-- @return nothing local cacheinsert = function(entry, qname, qtype) - - local ttl, key + local ttl, key, lru_ttl + local now = time() local e1 = entry[1] - if e1 then - key = e1.type..":"..e1.name - - -- determine minimum ttl of all answer records - ttl = e1.ttl - for i = 2, #entry do - ttl = math_min(ttl, entry[i].ttl) + + if not entry.expire then + -- new record not seen before + if e1 then + -- an actual record + key = (qtype or e1.type) .. ":" .. (qname or e1.name) + + -- determine minimum ttl of all answer records + ttl = e1.ttl + for i = 2, #entry do + ttl = math_min(ttl, entry[i].ttl) + end + + elseif entry.errcode and entry.errcode ~= 3 then + -- an error, but no 'name error' (3) + ttl = badTtl + key = qtype..":"..qname + + else + -- empty or a 'name error' (3) + ttl = emptyTtl + key = qtype..":"..qname end - elseif entry.errcode and entry.errcode ~= 3 then - -- an error, but no 'name error' (3) - ttl = badTtl - key = qtype..":"..qname + + -- set expire time + entry.touch = now + entry.ttl = ttl + entry.expire = now + ttl + entry.expired = false + lru_ttl = ttl + staleTtl + else - -- empty or a 'name error' (3) - ttl = emptyTtl - key = qtype..":"..qname + -- an existing record reinserted (under a shortname for example) + -- must calculate remaining ttl, cannot get it from lrucache + ttl = entry.ttl + key = (qtype or e1.type) .. ":" .. (qname or e1.name) + lru_ttl = entry.expire - now + staleTtl + + if lru_ttl < 0 then + return -- item is already expired, so we do not add it + end end - - -- set expire time - local now = time() - entry.touch = now - entry.expire = now + ttl - cache[key] = entry + + dnscache:set(key, entry, lru_ttl) +end + +-- Lookup a shortname in the cache. 
+-- @param qname the name to lookup +-- @param qtype (optional) if not given a non-type specific query is done +-- @return same as cachelookup +local function cacheShortLookup(qname, qtype) + return cachelookup("short:" .. qname, qtype or "none") +end + +-- Inserts a shortname in the cache. +-- @param qname the name to lookup +-- @param qtype (optional) if not given a non-type specific insertion is done +-- @return nothing +local function cacheShortInsert(entry, qname, qtype) + return cacheinsert(entry, "short:" .. qname, qtype or "none") end -- Lookup the last succesful query type. -- @param qname name to resolve -- @return query/record type constant, or ˋnilˋ if not found local function cachegetsuccess(qname) - return cache[qname] + return dnscache:get(qname) end -- Sets the last succesful query type. @@ -164,76 +191,87 @@ end -- @qtype query/record type to set, or ˋnilˋ to clear -- @return ˋtrueˋ local function cachesetsuccess(qname, qtype) - cache[qname] = qtype + dnscache:set(qname, qtype) return true end ---- Cleanup the DNS client cache. Items will be checked on TTL only upon --- retrieval from the cache. So items inserted, but never used again will --- never be removed from the cache automatically. So unless you have a very --- restricted fixed set of hostnames you're resolving, you should occasionally --- purge the cache. --- @param touched in seconds. Cleanup everything (also non-expired items) not --- touched in `touched` seconds. If omitted, only expired items (based on ttl) --- will be removed. 
--- @return number of entries deleted -_M.purgeCache = function(touched) - local f - if type(touched == nil) then - f = function(entry, now, count) -- check ttl only - if entry.expire < now then - return nil, count + 1, true - else - return entry, count, false - end - end - elseif type(touched) == "number" then - f = function(entry, now, count) -- check ttl and touch - if (entry.expire < now) or (entry.touch + touched <= now) then - return nil, count + 1, true + +-- ===================================================== +-- Try/status list for recursion checks and logging +-- ===================================================== + +local msg_mt = { + __tostring = function(self) + return table_concat(self, "/") + end +} + +local try_list_mt = { + __tostring = function(self) + local l, i = {}, 0 + for _, entry in ipairs(self) do + l[i+1] = entry.qname + l[i+2] = ":" + l[i+3] = entry.qtype or "(na)" + local m = tostring(entry.msg) + if m == "" then + i = i + 4 else - return entry, count, false + l[i+4] = " - " + l[i+5] = m + i = i + 6 end + l[i]="\n" end - else - error("expected nil or number, got " ..type(touched), 2) - end - local now = time() - local count = 0 - local deleted - for key, entry in pairs(cache) do - if type(entry) == "table" then - cache[key], count, deleted = f(entry, now, count) - else - -- TODO: entry for record type, how to purge this??? - end + return table_concat(l) end - return count +} + +-- adds a try to a list of tries. +-- The list keeps track of all queries tried so far. The array part lists the +-- order of attempts, whilst the `:` key contains the index of that try. +-- @param self (optional) the list to add to, if omitted a new one will be created and returned +-- @param qname name being looked up +-- @param qtype query type being done +-- @param status (optional) message to be recorded +-- @return the list +local function try_add(self, qname, qtype, status) + self = self or setmetatable({}, try_list_mt) + local key = tostring(qname) .. 
":" .. tostring(qtype) + local idx = #self + 1 + self[idx] = { + qname = qname, + qtype = qtype, + msg = setmetatable({ status }, msg_mt), + } + self[key] = idx + return self +end + +-- adds a status to the last try. +-- @param self the try_list to add to +-- @param status string with current status, added to the list for the current try +-- @return the try_list +local function try_status(self, status) + local status_list = self[#self].msg + status_list[#status_list + 1] = status + return self end + -- ============================================== -- Main DNS functions for lookup -- ============================================== --- Resolving. -- When resolving names, queries will be synchronized, such that only a single --- query will be sent. Any other requests coming in while waiting for a --- response from the name server will be queued, and receive the same result --- as the first request, once that returns. --- The exception is when a `ttl=0` is expected (expectation --- is based on a previous query returning `ttl=0`), in that case every request --- will get its own name server query. --- --- Because OpenResty will close sockets on boundaries of contexts, the --- resolver objects can only be reused in limited situations. To reuse --- them see the `r` parameters of the `resolve` and `toip` functions. Applicable --- for multiple consecutive calls in the same context. +-- query will be sent. If stale data is available, the request will return +-- stale data immediately, whilst continuing to resolve the name in the +-- background. -- -- The `dnsCacheOnly` parameter found with `resolve` and `toip` can be used in -- contexts where the co-socket api is unavailable. When the flag is set --- only cached data is returned, which is possibly stale, but it will never --- use blocking io. Also; the stale data will not --- be invalidated from the cache when `dnsCacheOnly` is set. +-- only cached data is returned, but it will never use blocking io. 
-- -- __Housekeeping__; when using `toip` it has to do some housekeeping to apply -- the (weighted) round-robin scheme. Those values will be stored in the @@ -266,11 +304,15 @@ local poolMaxRetry -- -- 'last'; will try the last previously successful type for a hostname. -- local order = { "last", "SRV", "A", "AAAA", "CNAME" } -- +-- -- Stale ttl for how long a stale record will be served from the cache +-- -- while a background lookup is in progress. +-- local staleTtl = 4.0 -- in seconds (can have fractions) +-- -- -- Cache ttl for empty and 'name error' (3) responses -- local emptyTtl = 30.0 -- in seconds (can have fractions) -- -- -- Cache ttl for other error responses --- local badTtl = 1.0 -- in seconds (can have fractions) +-- local badTtl = 1.0 -- in seconds (can have fractions) -- -- -- `ndots`, same as the `resolv.conf` option, if not given it is taken from -- -- `resolv.conf` or otherwise set to 1 @@ -290,6 +332,8 @@ local poolMaxRetry -- search = search, -- order = order, -- badTtl = badTtl, +-- emptyTtl = emptTtl, +-- staleTtl = staleTtl, -- enable_ipv6 = enable_ipv6, -- }) -- ) @@ -298,7 +342,10 @@ _M.init = function(options) log(log_DEBUG, "(re)configuring dns client") local resolv, hosts, err options = options or {} - cache = {} -- clear cache on re-initialization + staleTtl = options.staleTtl or 4 + cacheSize = options.cacheSize or 10000 -- default set here to be able to reset the cache + + dnscache = lrucache.new(cacheSize) -- clear cache on (re)initialization defined_hosts = {} -- reset hosts hash table local order = options.order or orderValids @@ -436,136 +483,209 @@ _M.init = function(options) return true end -local queue = setmetatable({}, {__mode = "v"}) --- Performs a query, but only one at a time. While the query is waiting for a response, all --- other queries for the same name+type combo will be yielded until the first one --- returns. All calls will then return the same response. 
--- Reason; prevent a dog-pile effect when a dns record expires. Especially under load many dns --- queries would be fired at the dns server if we wouldn't do this. --- The `poolMaxWait` is how long a thread waits for another to complete the query, after the timeout it will --- clear the token in the cache and retry (all others will become in line after this new one) --- The `poolMaxRetry` is how often we wait for another query to complete, after this number it will return --- an error. A retry will be performed when a) waiting for the other thread times out, or b) when the --- query by the other thread returns an error. --- The maximum delay would be `poolMaxWait * poolMaxRetry`. --- @return query result + nil + r, or nil + error + r -local function synchronizedQuery(qname, r_opts, r, expect_ttl_0, count) - local key = qname..":"..r_opts.qtype - local item = queue[key] - if not item then - -- no lookup being done so far - if not r then - local err - r, err = resolver:new(config) - if not r then - return r, err, nil - end - end - if expect_ttl_0 then - -- we're not limiting the dns queries, but query on EVERY request - local result, err = r:query(qname, r_opts) - return result, err, r - else - -- we're limiting to one request at a time - item = { - semaphore = semaphore(), - } - queue[key] = item -- insertion in queue; this is where the synchronization starts - item.result, item.err = r:query(qname, r_opts) - -- query done, but by now many others might be waiting for our result. - -- 1) stop new ones from adding to our lock/semaphore - queue[key] = nil - -- 2) release all waiting threads - item.semaphore:post(math_max(item.semaphore:count() * -1, 1)) - return item.result, item.err, r +-- removes non-requested results, updates the cache +-- @return `true` +local function parseQuery(qname, qtype, answers, try_list) + + -- check the answers and store them in the cache + -- eg. 
A, AAAA, SRV records may be accompanied by CNAME records + -- store them all, leaving only the requested type in so we can return that set + local others = {} + for i = #answers, 1, -1 do -- we're deleting entries, so reverse the traversal + local answer = answers[i] + + -- normalize casing + answer.name = string_lower(answer.name) + + if (answer.type ~= qtype) or (answer.name ~= qname) then + local key = answer.type..":"..answer.name + try_status(try_list, key .. " removed") + local lst = others[key] + if not lst then + lst = {} + others[key] = lst + end + table_insert(lst, 1, answer) -- pos 1: preserve order + table_remove(answers, i) end - else - -- lookup is on its way, wait for it - local ok, err = item.semaphore:wait(poolMaxWait) - if ok and item.result then - -- we were released, and have a query result from the - -- other thread, so all is well, return it - return item.result, item.err, r - else - -- there was an error, either a semaphore timeout, or - -- a lookup error, so retry (retry actually means; do - -- our own lookup instead of waiting for another lookup). 
- count = count or 1 - if count > poolMaxRetry then - return nil, "dns lookup pool exceeded retries ("..tostring(poolMaxRetry).."): "..(item.error or err or "unknown"), r + end + if next(others) then + for _, lst in pairs(others) do + -- only store if not already cached (this is only a 'by-product') + -- or replace it if the cache contains an error + if #(cachelookup(lst[1].name, lst[1].type) or empty) == 0 then + cacheinsert(lst) + end + -- set success-type, only if not set (this is only a 'by-product') + if not cachegetsuccess(lst[1].name) then + cachesetsuccess(lst[1].name, lst[1].type) end - if queue[key] == item then queue[key] = nil end -- don't block on the same thread again - return synchronizedQuery(qname, r_opts, r, expect_ttl_0, count + 1) end end + + -- now insert actual target record in cache + cacheinsert(answers, qname, qtype) + return true end -local msg_mt = { - __tostring = function(self) - return table_concat(self, "/") - end -} -local try_list_mt = { - __tostring = function(self) - local l, i = {}, 0 - for _, entry in ipairs(self) do - l[i+1] = entry.qname - l[i+2] = ":" - l[i+3] = entry.qtype - local m = tostring(entry.msg) - if m == "" then - i = i + 4 - else - l[i+4] = " - " - l[i+5] = m - i = i + 6 - end - l[i]="\n" +local queue = setmetatable({}, {__mode = "v"}) +-- to be called as a timer-callback, performs a query and returns the results +-- in the `item` table. +local function executeQuery(premature, item) + if premature then return end + + local r, err = resolver:new(config) + if not r then + item.result, item.err = r, "failed to create a resolver: " .. err + else + try_status(item.try_list, "querying") + item.result, item.err = r:query(item.qname, item.r_opts) + if item.result then + parseQuery(item.qname, item.r_opts.qtype, item.result, item.try_list) end - return table_concat(l) end -} --- adds a try to a list of tries. --- The list keeps track of all queries tried so far. 
The array part lists the --- order of attempts, whilst the `:` key contains the index of that try. --- @param self (optional) the list to add to, if omitted a new one will be created and returned --- @param qname name being looked up --- @param qtype query type being done --- @param status (optional) message to be recorded --- @return the list -local function try_add(self, qname, qtype, status) - self = self or setmetatable({}, try_list_mt) - local k = tostring(qname) .. ":" .. tostring(qtype) - local i = #self + 1 - self[i] = { + -- query done, but by now many others might be waiting for our result. + -- 1) stop new ones from adding to our lock/semaphore + queue[item.key] = nil + -- 2) release all waiting threads + item.semaphore:post(math_max(item.semaphore:count() * -1, 1)) + item.semaphore = nil +end + +-- schedules an async query. +-- @param qname the name to query for +-- @param r_opts a table with the query options +-- @param try_list the try_list object to add to +-- @return `item` table which will receive the `result` and/or `err` fields, and a +-- `semaphore` field that can be used to wait for completion (once complete +-- the `semaphore` field will be removed). Upon error it returns `nil+error`. +local function asyncQuery(qname, r_opts, try_list) + local key = qname..":"..r_opts.qtype + local item = queue[key] + if item then + try_status(try_list, "in progress (async)") + return item -- already in progress, return existing query + end + + item = { + key = key, + semaphore = semaphore(), qname = qname, - qtype = qtype, - msg = setmetatable({ status }, msg_mt), + r_opts = deepcopy(r_opts), + try_list = try_list, } - self[k] = i - return self + queue[key] = item + + local ok, err = timer_at(0, executeQuery, item) + if not ok then + queue[key] = nil + return nil, "asyncQuery failed to create timer: "..err + end + try_status(try_list, "scheduled") + + return item end --- adds a status to the last entry in the `msg` table. 
-local function try_status(self, status) - local entry = self[#self] - local msg = entry.msg - msg[#msg + 1] = status - return self +-- schedules a sync query. +-- The `poolMaxWait` is how long a thread waits for another to complete the query. +-- The `poolMaxRetry` is how often we wait for another query to complete. +-- The maximum delay would be `poolMaxWait * poolMaxRetry`. +-- @param qname the name to query for +-- @param r_opts a table with the query options +-- @param try_list the try_list object to add to +-- @return `result + nil + try_list`, or `nil + err + try_list` in case of errors +local function syncQuery(qname, r_opts, try_list, count) + local key = qname..":"..r_opts.qtype + local item = queue[key] + + -- if nothing is in progress, we start a new async query + if not item then + local err + item, err = asyncQuery(qname, r_opts, try_list) + if not item then + return item, err, try_list + end + else + try_status(try_list, "in progress (sync)") + end + + -- block and wait for the async query to complete + local ok, err = item.semaphore:wait(poolMaxWait) + if ok and item.result then + -- we were released, and have a query result from the + -- other thread, so all is well, return it + return item.result, item.err, try_list + end + + -- there was an error, either a semaphore timeout, or a lookup error + -- go retry + count = count or 1 + try_status(try_list, "try "..count.." error: "..(item.err or err or "unknown")) + if count > poolMaxRetry then + return nil, "dns lookup pool exceeded retries (" .. + tostring(poolMaxRetry) .. "): "..tostring(item.err or err), + try_list + end + + -- don't block on the same thread again, so remove it from the queue + if queue[key] == item then queue[key] = nil end + + return syncQuery(qname, r_opts, try_list, count + 1) +end + +-- will lookup a name in the cache, or alternatively query the nameservers. +-- If nothing is in the cache, a synchronous query is performewd. 
If the cache +-- contains stale data, that stale data is returned while an asynchronous +-- lookup is started in the background. +-- @param qname the name to look for +-- @param r_opts a table with the query options +-- @param dnsCacheOnly if true, no active lookup is done when there is no (stale) +-- data. In that case an error is returned (as a dns server failure table). +-- @param try_list the try_list object to add to +-- @return `entry + nil + try_list`, or `nil + err + try_list` +local function lookup(qname, r_opts, dnsCacheOnly, try_list) + local entry = cachelookup(qname, r_opts.qtype) + if not entry then + --not found in cache + if dnsCacheOnly then + -- we can't do a lookup, so return an error + try_list = try_add(try_list, qname, r_opts.qtype, "cache only lookup failed") + return { + errcode = 4, -- standard is "server failure" + errstr = "server failure, cache only lookup failed", -- extended description + }, nil, try_list + end + -- perform a sync lookup, as we have no stale data to fall back to + try_list = try_add(try_list, qname, r_opts.qtype, "cache-miss") + return syncQuery(qname, r_opts, try_list) + end + + try_list = try_add(try_list, qname, r_opts.qtype, "cache-hit") + if entry.expired then + -- the cached record is stale but usable, so we do a refresh query in the background + try_status(try_list, "stale") + asyncQuery(qname, r_opts, try_list) + end + + return entry, nil, try_list end -local function check_ipv6(qname, qtype, r, try_list) +-- checks the query to be a valid IPv6. 
Inserts it in the cache or inserts +-- an error if it is invalid +-- @param qname the IPv6 address to check +-- @param qtype query type performed, any of the `TYPE_xx` constants +-- @param try_list the try_list object to add to +-- @return record as cached, nil, try_list +local function check_ipv6(qname, qtype, try_list) try_list = try_add(try_list, qname, qtype, "IPv6") - -- check cache and always use "cacheonly" to not alter it as IP addresses are - -- long lived in the cache anyway - local record = cachelookup(qname, qtype, true) + local record = cachelookup(qname, qtype) if record then try_status(try_list, "cached") - return record, nil, r, try_list + return record, nil, try_list end local check = qname @@ -598,18 +718,22 @@ local function check_ipv6(qname, qtype, r, try_list) } end cacheinsert(record, qname, qtype) - return record, nil, r, try_list + return record, nil, try_list end -local function check_ipv4(qname, qtype, r, try_list) +-- checks the query to be a valid IPv4. Inserts it in the cache or inserts +-- an error if it is invalid +-- @param qname the IPv4 address to check +-- @param qtype query type performed, any of the `TYPE_xx` constants +-- @param try_list the try_list object to add to +-- @return record as cached, nil, try_list +local function check_ipv4(qname, qtype, try_list) try_list = try_add(try_list, qname, qtype, "IPv4") - -- check cache and always use "cacheonly" to not alter it as IP addresses are - -- long lived in the cache anyway - local record = cachelookup(qname, qtype, true) + local record = cachelookup(qname, qtype) if record then try_status(try_list, "cached") - return record, nil, r, try_list + return record, nil, try_list end if qtype == _M.TYPE_A then @@ -632,90 +756,9 @@ local function check_ipv4(qname, qtype, r, try_list) } end cacheinsert(record, qname, qtype) - return record, nil, r, try_list + return record, nil, try_list end --- will lookup in the cache, or alternatively query dns servers and populate the cache. 
--- only looks up the requested type. --- It will always add an entry for the requested name in the `try_list`. --- @return query result + nil + r + try_list, or nil + error + r + try_list -local function lookup(qname, r_opts, dnsCacheOnly, r, try_list) - local qtype = r_opts.qtype - - local record, expect_ttl_0 = cachelookup(qname, qtype, dnsCacheOnly) - if record then -- cache hit - try_list = try_add(try_list, qname, qtype, "cache hit") - return record, nil, r, try_list - end - try_list = try_add(try_list, qname, qtype) - if dnsCacheOnly then - -- no active lookups allowed, so return error - -- NOTE: this error response should never be cached, because it is caused - -- by the limited nginx context where we can't use sockets to do the lookup - try_status(try_list, "cache only lookup failed") - return { - errcode = 4, -- standard is "server failure" - errstr = "server failure, cache only lookup failed", -- extended description - }, nil, r, try_list - end - - -- not found in our cache, so perform query on dns servers - local answers, err - answers, err, r = synchronizedQuery(qname, r_opts, r, expect_ttl_0) ---print("============================================================") ---print("Lookup: ",qname,":",r_opts.qtype) ---print("Error : ", tostring(err)) ---print(require("pl.pretty").write(answers or {})) - if not answers then - try_status(try_list, tostring(err)) - return answers, err, r, try_list - end - - -- check our answers and store them in the cache - -- eg. A, AAAA, SRV records may be accompanied by CNAME records - -- store them all, leaving only the requested type in so we can return that set - local others = {} - for i = #answers, 1, -1 do -- we're deleting entries, so reverse the traversal - local answer = answers[i] - - -- normalize casing - answer.name = string_lower(answer.name) - - if (answer.type ~= qtype) or (answer.name ~= qname) then - local key = answer.type..":"..answer.name - try_status(try_list, key .. 
" removed") ---print("removing: ", key, " (name or type mismatch)") - local lst = others[key] - if not lst then - lst = {} - others[key] = lst - end - table_insert(lst, 1, answer) -- pos 1: preserve order - table_remove(answers, i) - end - end - if next(others) then - for _, lst in pairs(others) do - -- only store if not already cached (this is only a 'by-product') - -- or replace it if the cache contains an error - if #(cachelookup(lst[1].name, lst[1].type) or empty) == 0 then - cacheinsert(lst) - end - -- set success-type, only if not set (this is only a 'by-product') - if not cachegetsuccess(lst[1].name) then - cachesetsuccess(lst[1].name, lst[1].type) - end - end - end - - -- now insert actual target record in cache - try_status(try_list, "queried, result "..#answers) - cacheinsert(answers, qname, qtype) ---print("------------------------------------------------------------") ---print(require("pl.pretty").write(answers or {})) ---print("============================================================") - return answers, nil, r, try_list -end -- iterator that iterates over all names and types to look up based on the -- provided name, the `typeOrder`, `hosts`, `ndots` and `search` settings @@ -807,96 +850,128 @@ end -- - then A, -- - then CNAME. -- +-- The outer loop will be based on the `search` and `ndots` options. Within each of +-- those, the inner loop will be the query/record type. -- @function resolve -- @param qname Name to resolve -- @param r_opts Options table, see remark about the `qtype` field above and -- [OpenResty docs](https://github.com/openresty/lua-resty-dns) for more options. -- @param dnsCacheOnly Only check the cache, won't do server lookups --- (will not invalidate any ttl expired data and will hence possibly return --- expired data) --- @param r (optional) dns resolver object to use, it will also be returned. --- In case of multiple calls, this allows to reuse the resolver object --- instead of recreating a new one on each call. 
-- @param try_list (optional) list of tries to add to --- @return `list of records + nil + r + try_list`, or `nil + err + r + try_list`. -local function resolve(qname, r_opts, dnsCacheOnly, r, try_list) - +-- @return `list of records + nil + try_list`, or `nil + err + try_list`. +local function resolve(qname, r_opts, dnsCacheOnly, try_list) qname = string_lower(qname) local qtype = (r_opts or empty).qtype local err, records + local opts = {} if r_opts then for k,v in pairs(r_opts) do opts[k] = v end -- copy the options table else + -- if no options table provided, set the ADDITIONAL SECTION to TRUE opts.additional_section = true end - + + -- first check for shortname in the cache + -- we do this only to prevent iterating over the SEARCH directive and + -- potentially requerying failed lookups in that process as the ttl for + -- errors is relatively short (1 second default) + records = cacheShortLookup(qname, qtype) + if records then + if try_list then + -- check for recursion + if try_list["(short)"..qname..":"..tostring(qtype)] then + records = nil + err = "recursion detected" + try_status(try_list, "recursion detected") + return nil, err, try_list + end + end + + try_list = try_add(try_list, "(short)"..qname, qtype, "cache-hit") + if records.expired then + -- if the record is already stale/expired we have to traverse the + -- iterator as that is required to start the async refresh queries + records = nil + try_list = try_status(try_list, "stale") + + else + -- a valid non-stale record + -- check for CNAME records, and dereferencing the CNAME + if (records[1] or empty).type == _M.TYPE_CNAME and qtype ~= _M.TYPE_CNAME then + opts.qtype = nil + try_status(try_list, "dereferencing") + return resolve(records[1].cname, opts, dnsCacheOnly, try_list) + end + + -- return the shortname cache hit + return records, nil, try_list + end + else + try_list = try_add(try_list, "(short)"..qname, qtype, "cache-miss") + end + -- check for qname being an ip address local name_type = 
utils.hostnameType(qname) if name_type ~= "name" then if name_type == "ipv4" then - -- if no qtype is given, we're supposed to search, and hence we add TYPE_A as type - records, err, r, try_list = check_ipv4(qname, qtype or _M.TYPE_A, r, try_list) + -- if no qtype is given, we're supposed to search, so forcing TYPE_A is safe + records, err, try_list = check_ipv4(qname, qtype or _M.TYPE_A, try_list) else - -- must be 'ipv6' - -- if no qtype is given, we're supposed to search, and hence we add TYPE_AAAA as type - records, err, r, try_list = check_ipv6(qname, qtype or _M.TYPE_AAAA, r, try_list) + + -- it is 'ipv6' + -- if no qtype is given, we're supposed to search, so forcing TYPE_AAAA is safe + records, err, try_list = check_ipv6(qname, qtype or _M.TYPE_AAAA, try_list) end + if records.errcode then -- the query type didn't match the ip address, or a bad ip address return nil, ("dns server error: %s %s"):format(records.errcode, records.errstr), - r, try_list + try_list end - -- valid ip - return records, nil, r, try_list + -- valid ipv4 or ipv6 + return records, nil, try_list end -- go try a sequence of record types for try_name, try_type in search_iter(qname, qtype) do - if try_list and try_list[try_name..":"..try_type] then -- recursion, been here before records = nil err = "recursion detected" - -- insert an entry, error will be appended at the end of the search loop - try_add(try_list, try_name, try_type) + else -- go look it up opts.qtype = try_type - records, err, r, try_list = lookup(try_name, opts, dnsCacheOnly, r, try_list) + records, err, try_list = lookup(try_name, opts, dnsCacheOnly, try_list) end - + if not records then -- nothing to do, an error -- fall through to the next entry in our search sequence + elseif records.errcode then -- dns error: fall through to the next entry in our search sequence err = ("dns server error: %s %s"):format(records.errcode, records.errstr) records = nil + elseif #records == 0 then -- empty: fall through to the next entry 
in our search sequence err = "dns server error: 3 name error" -- ensure same error message as "not found" records = nil + else - -- we got some records - if not dnsCacheOnly and not qtype then - -- only set the last succes, if we're not searching for a specific type - -- and we're not limited by a cache-only request - cachesetsuccess(qname, try_type) -- set last succesful type resolved - cachesetsuccess(try_name, try_type) -- set last succesful type resolved - end - if try_type == _M.TYPE_CNAME then - if try_type == qtype then - -- a CNAME was explicitly requested, so no dereferencing - return records, nil, r, try_list + -- we got some records, update the cache + if not dnsCacheOnly then + if not qtype then + -- only set the last success, if we're not searching for a specific type + -- and we're not limited by a cache-only request + cachesetsuccess(try_name, try_type) -- set last successful type resolved end - -- dereference CNAME - opts.qtype = nil - try_status(try_list, "dereferencing") - return resolve(records[1].cname, opts, dnsCacheOnly, r, try_list) end + if qtype ~= _M.TYPE_SRV and try_type == _M.TYPE_SRV then -- check for recursive records, but NOT when requesting SRV explicitly local cnt = 0 @@ -906,6 +981,7 @@ local function resolve(qname, r_opts, dnsCacheOnly, r, try_list) cnt = cnt + 1 end end + if cnt == #records then -- fully recursive SRV record, specific Kubernetes problem -- which generates a SRV record for each host, pointing to @@ -916,18 +992,36 @@ local function resolve(qname, r_opts, dnsCacheOnly, r, try_list) err = "recursion detected" end end + if records then - return records, nil, r, try_list + -- we have a result + + -- cache it under its shortname + if not dnsCacheOnly then + cacheShortInsert(records, qname, qtype) + end + + -- check if we need to dereference a CNAME + if records[1].type == _M.TYPE_CNAME and qtype ~= _M.TYPE_CNAME then + -- dereference CNAME + opts.qtype = nil + try_status(try_list, "dereferencing") + return 
resolve(records[1].cname, opts, dnsCacheOnly, try_list) + end + + return records, nil, try_list end end + -- we had some error, record it in the status list try_status(try_list, err) end + -- we failed, clear cache and return last error if not dnsCacheOnly then cachesetsuccess(qname, nil) end - return nil, err, r, try_list + return nil, err, try_list end -- returns the index of the record next up in the round-robin scheme. @@ -1103,22 +1197,23 @@ end -- of recreating a new one on each call. -- @param try_list (optional) list of tries to add to -- @return `ip address + port + r + try_list`, or in case of an error `nil + error + r + try_list` -local function toip(qname, port, dnsCacheOnly, r, try_list) +local function toip(qname, port, dnsCacheOnly, try_list) local rec, err - rec, err, r, try_list = resolve(qname, nil, dnsCacheOnly, r, try_list) + rec, err, try_list = resolve(qname, nil, dnsCacheOnly, try_list) if err then - return nil, err, r, try_list + return nil, err, try_list end +--print(tostring(try_list)) if rec[1].type == _M.TYPE_SRV then local entry = rec[roundRobinW(rec)] -- our SRV entry might still contain a hostname, so recurse, with found port number local srvport = (entry.port ~= 0 and entry.port) or port -- discard port if it is 0 try_status(try_list, "dereferencing SRV") - return toip(entry.target, srvport, dnsCacheOnly, r, try_list) + return toip(entry.target, srvport, dnsCacheOnly, try_list) else -- must be A or AAAA - return rec[roundRobin(rec)].address, port, r, try_list + return rec[roundRobin(rec)].address, port, try_list end end @@ -1182,7 +1277,7 @@ _M.setpeername = setpeername -- export the locals in case we're testing if _TEST then - _M.getcache = function() return cache end + _M.getcache = function() return dnscache end _M._search_iter = search_iter -- export as different name! end