Skip to content

Commit

Permalink
[BUGFIX] Avoid returning empty data on startup of a non-leader server (
Browse files Browse the repository at this point in the history
…#4554)

Ensure that DB is properly initialized when performing stale queries

Addresses:
- hashicorp/consul-replicate#82
- #3975
- hashicorp/consul-template#1131
  • Loading branch information
pierresouchay authored and freddygv committed Aug 23, 2018
1 parent 4d658f3 commit b898131
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 13 deletions.
64 changes: 53 additions & 11 deletions agent/consul/catalog_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1467,39 +1467,81 @@ func TestCatalog_ListServices_Timeout(t *testing.T) {

func TestCatalog_ListServices_Stale(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()

dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1" // Enable ACLs!
c.Bootstrap = false // Disable bootstrap
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()

args := structs.DCSpecificRequest{
Datacenter: "dc1",
}
args.AllowStale = true
var out structs.IndexedServices

// Inject a fake service
if err := s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
// Inject a node
if err := s1.fsm.State().EnsureNode(3, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil {

codec := rpcClient(t, s2)
defer codec.Close()

// Run the query, do not wait for leader, never any contact with leader, should fail
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err == nil || err.Error() != structs.ErrNoLeader.Error() {
t.Fatalf("expected %v but got err: %v and %v", structs.ErrNoLeader, err, out)
}

// Try to join
joinLAN(t, s2, s1)
retry.Run(t, func(r *retry.R) { r.Check(wantRaft([]*Server{s1, s2})) })
waitForLeader(s1, s2)

testrpc.WaitForLeader(t, s2.RPC, "dc1")
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}

// Run the query, do not wait for leader!
// Should find the services
if len(out.Services) != 1 {
t.Fatalf("bad: %#v", out.Services)
}

if !out.KnownLeader {
t.Fatalf("should have a leader: %v", out)
}

s1.Leave()
s1.Shutdown()

testrpc.WaitUntilNoLeader(t, s2.RPC, "dc1")

args.AllowStale = false
// Since the leader is now down, non-stale query should fail now
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err == nil || err.Error() != structs.ErrNoLeader.Error() {
t.Fatalf("expected %v but got err: %v and %v", structs.ErrNoLeader, err, out)
}

// With stale, request should still work
args.AllowStale = true
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}

// Should find the service
// Should find old service
if len(out.Services) != 1 {
t.Fatalf("bad: %v", out)
t.Fatalf("bad: %#v", out)
}

// Should not have a leader! Stale read
if out.KnownLeader {
t.Fatalf("bad: %v", out)
t.Fatalf("should not have a leader anymore: %#v", out)
}
}

Expand Down
4 changes: 2 additions & 2 deletions agent/consul/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ func (s *Server) forward(method string, info structs.RPCInfo, args interface{},
return true, err
}

// Check if we can allow a stale read
if info.IsRead() && info.AllowStaleRead() {
// Check if we can allow a stale read, ensure our local DB is initialized
if info.IsRead() && info.AllowStaleRead() && !s.raft.LastContact().IsZero() {
return false, nil
}

Expand Down
42 changes: 42 additions & 0 deletions agent/consul/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,48 @@ func TestRPC_NoLeader_Fail(t *testing.T) {
}
}

func TestRPC_NoLeader_Fail_on_stale_read(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.RPCHoldTimeout = 1 * time.Millisecond
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()

arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
}
var out struct{}

// Make sure we eventually fail with a no leader error, which we should
// see given the short timeout.
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
if err == nil || err.Error() != structs.ErrNoLeader.Error() {
t.Fatalf("bad: %v", err)
}

// Until leader has never been known, stale should fail
getKeysReq := structs.KeyListRequest{
Datacenter: "dc1",
Prefix: "",
Seperator: "/",
QueryOptions: structs.QueryOptions{AllowStale: true},
}
var keyList structs.IndexedKeyList
if err := msgpackrpc.CallWithCodec(codec, "KVS.ListKeys", &getKeysReq, &keyList); err.Error() != structs.ErrNoLeader.Error() {
t.Fatalf("expected %v but got err: %v", structs.ErrNoLeader, err)
}

testrpc.WaitForLeader(t, s1.RPC, "dc1")
if err := msgpackrpc.CallWithCodec(codec, "KVS.ListKeys", &getKeysReq, &keyList); err != nil {
t.Fatalf("Did not expect any error but got err: %v", err)
}
}

func TestRPC_NoLeader_Retry(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
Expand Down
14 changes: 14 additions & 0 deletions testrpc/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@ func WaitForLeader(t *testing.T, rpc rpcFn, dc string) {
})
}

// WaitUntilNoLeader ensures no leader is present, useful for testing lost leadership.
func WaitUntilNoLeader(t *testing.T, rpc rpcFn, dc string) {
var out structs.IndexedNodes
retry.Run(t, func(r *retry.R) {
args := &structs.DCSpecificRequest{Datacenter: dc}
if err := rpc("Catalog.ListNodes", args, &out); err == nil {
r.Fatalf("It still has a leader: %#q", out)
}
if out.QueryMeta.KnownLeader {
r.Fatalf("Has still a leader")
}
})
}

// WaitForTestAgent ensures we have a node with serfHealth check registered
func WaitForTestAgent(t *testing.T, rpc rpcFn, dc string) {
var nodes structs.IndexedNodes
Expand Down

0 comments on commit b898131

Please sign in to comment.