backport of commit 746a0a1
erichaberkorn committed Mar 21, 2023
1 parent d49d068 commit 25539fa
Showing 3 changed files with 39 additions and 11 deletions.
3 changes: 3 additions & 0 deletions .changelog/16729.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+peering: Fix issue resulting in prepared query failover to cluster peers never un-failing over.
+```
6 changes: 3 additions & 3 deletions agent/consul/prepared_query_endpoint.go
@@ -468,7 +468,7 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
// by the query setup.
if len(reply.Nodes) == 0 {
wrapper := &queryServerWrapper{srv: p.srv, executeRemote: p.ExecuteRemote}
-if err := queryFailover(wrapper, query, args, reply); err != nil {
+if err := queryFailover(wrapper, *query, args, reply); err != nil {
return err
}
}
@@ -707,7 +707,7 @@ func (q *queryServerWrapper) GetOtherDatacentersByDistance() ([]string, error) {

// queryFailover runs an algorithm to determine which DCs to try and then calls
// them to try to locate alternative services.
-func queryFailover(q queryServer, query *structs.PreparedQuery,
+func queryFailover(q queryServer, query structs.PreparedQuery,
args *structs.PreparedQueryExecuteRequest,
reply *structs.PreparedQueryExecuteResponse) error {

@@ -789,7 +789,7 @@ func queryFailover(q queryServer, query *structs.PreparedQuery,
// the remote query as well.
remote := &structs.PreparedQueryExecuteRemoteRequest{
Datacenter: dc,
-Query: *query,
+Query: query,
Limit: args.Limit,
QueryOptions: args.QueryOptions,
Connect: args.Connect,
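The substantive change in agent/consul/prepared_query_endpoint.go is the queryFailover signature: the prepared query is now passed by value instead of by pointer, so any failover bookkeeping done during execution mutates a copy rather than the query object shared with the caller. Below is a minimal, self-contained sketch of that pointer-versus-value distinction; the Query type and the two failover functions are hypothetical stand-ins, not Consul code.

```go
// Minimal sketch (not Consul code): Query, failOverByPointer, and
// failOverByValue are hypothetical names used only to illustrate why
// passing the prepared query by value prevents state from leaking back.
package main

import "fmt"

type Query struct {
	Targets []string // failover targets tried so far
}

// Mutations through a pointer persist on the caller's Query.
func failOverByPointer(q *Query) {
	q.Targets = append(q.Targets, "peer-cluster")
}

// Mutations on a value only affect a local copy.
func failOverByValue(q Query) {
	q.Targets = append(q.Targets, "peer-cluster")
}

func main() {
	shared := Query{}

	failOverByPointer(&shared)
	fmt.Println(shared.Targets) // [peer-cluster] -- state leaked back to the shared query

	shared = Query{}
	failOverByValue(shared)
	fmt.Println(shared.Targets) // [] -- the shared query is untouched
}
```

The test changes below exercise the user-visible effect: once the dc1 nodes are healthy again, the query should return local nodes instead of staying failed over to the peer.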
41 changes: 33 additions & 8 deletions agent/consul/prepared_query_endpoint_test.go
@@ -2092,16 +2092,16 @@ func TestPreparedQuery_Execute(t *testing.T) {
require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID))

// Update the health of a node to mark it critical.
-setHealth := func(t *testing.T, codec rpc.ClientCodec, dc string, node string, health string) {
+setHealth := func(t *testing.T, codec rpc.ClientCodec, dc string, i int, health string) {
t.Helper()
req := structs.RegisterRequest{
Datacenter: dc,
-Node: node,
+Node: fmt.Sprintf("node%d", i),
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: "foo",
Port: 8000,
Tags: []string{"dc1", "tag1"},
Tags: []string{dc, fmt.Sprintf("tag%d", i)},
},
Check: &structs.HealthCheck{
Name: "failing",
@@ -2113,7 +2113,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
var reply struct{}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply))
}
setHealth(t, codec1, "dc1", "node1", api.HealthCritical)
setHealth(t, codec1, "dc1", 1, api.HealthCritical)

// The failing node should be filtered.
t.Run("failing node filtered", func(t *testing.T) {
@@ -2133,7 +2133,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
})

// Upgrade it to a warning and re-query, should be 10 nodes again.
setHealth(t, codec1, "dc1", "node1", api.HealthWarning)
setHealth(t, codec1, "dc1", 1, api.HealthWarning)
t.Run("warning nodes are included", func(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
@@ -2303,7 +2303,7 @@ func TestPreparedQuery_Execute(t *testing.T) {

// Now fail everything in dc1 and we should get an empty list back.
for i := 0; i < 10; i++ {
setHealth(t, codec1, "dc1", fmt.Sprintf("node%d", i+1), api.HealthCritical)
setHealth(t, codec1, "dc1", i+1, api.HealthCritical)
}
t.Run("everything is failing so should get empty list", func(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
@@ -2474,7 +2474,7 @@ func TestPreparedQuery_Execute(t *testing.T) {

// Set all checks in dc2 as critical
for i := 0; i < 10; i++ {
setHealth(t, codec2, "dc2", fmt.Sprintf("node%d", i+1), api.HealthCritical)
setHealth(t, codec2, "dc2", i+1, api.HealthCritical)
}

// Now we should see 9 nodes from dc3 (we have the tag filter still)
@@ -2493,6 +2493,31 @@ func TestPreparedQuery_Execute(t *testing.T) {
}
expectFailoverPeerNodes(t, &query, &reply, 9)
})

+// Set all checks in dc1 as passing
+for i := 0; i < 10; i++ {
+setHealth(t, codec1, "dc1", i+1, api.HealthPassing)
+}
+
+// All dc1 nodes are healthy again, so the query should un-fail over and return local results
t.Run("un-failing over", func(t *testing.T) {
retry.Run(t, func(r *retry.R) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}

var reply structs.PreparedQueryExecuteResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))

for _, node := range reply.Nodes {
assert.NotEqual(t, "node3", node.Node.Node)
}

expectNodes(t, &query, &reply, 9)
})
})
}

func TestPreparedQuery_Execute_ForwardLeader(t *testing.T) {
@@ -2982,7 +3007,7 @@ func (m *mockQueryServer) ExecuteRemote(args *structs.PreparedQueryExecuteRemote

func TestPreparedQuery_queryFailover(t *testing.T) {
t.Parallel()
query := &structs.PreparedQuery{
query := structs.PreparedQuery{
Name: "test",
Service: structs.ServiceQuery{
Failover: structs.QueryFailoverOptions{
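The "un-failing over" subtest added above wraps its checks in retry.Run, the retry helper used throughout Consul's test suite (github.com/hashicorp/consul/sdk/testutil/retry), presumably because health updates propagate asynchronously and the first execution after re-registering the nodes may not yet reflect the recovered state. A rough standalone sketch of that pattern, with a hypothetical lookupHealthyNodes helper standing in for the real RPC call:

```go
// Hypothetical test illustrating the retry pattern; only retry.Run and
// *retry.R mirror the real SDK API, the lookup helper is a stand-in.
package example

import (
	"testing"

	"github.com/hashicorp/consul/sdk/testutil/retry"
)

func TestEventuallyHealthy(t *testing.T) {
	retry.Run(t, func(r *retry.R) {
		nodes := lookupHealthyNodes() // stand-in for the PreparedQuery.Execute RPC
		if len(nodes) != 9 {
			// Failing through r (rather than t) lets retry.Run re-run the
			// closure until it passes or the retryer gives up.
			r.Fatalf("want 9 healthy nodes, got %d", len(nodes))
		}
	})
}

// Stand-in helper: pretend nine nodes are healthy.
func lookupHealthyNodes() []string { return make([]string, 9) }
```

Failures reported through the *retry.R receiver are retried, whereas assertions made directly against *testing.T fail the test on the first attempt.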
