Skip to content

Commit

Permalink
Fix panic in tailer when an ingester is removed from the ring while t…
Browse files Browse the repository at this point in the history
…ailing (#897)
  • Loading branch information
pracucci authored and rfratto committed Aug 14, 2019
1 parent 79333ad commit e5d6f3c
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 12 deletions.
40 changes: 28 additions & 12 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,34 +321,50 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer,

// tailDisconnectedIngesters is passed to the tailer for (re)connecting to new
// or disconnected ingesters.
//
// connectedIngestersAddr lists the addresses the caller is already connected
// to. The function reads the ingesters from the ring via q.ring.GetAll(),
// skips the ones whose address is already connected and the ones whose state
// is not ring.ACTIVE, and calls Tail with req on the remaining ones through
// q.forGivenIngesters. The returned map is keyed by ingester address; it is
// nil when there is no ingester to (re)connect to. An error from the ring or
// from q.forGivenIngesters is returned as-is.
//
// NOTE(review): this listing looks like a rendered diff without +/- markers,
// so removed and added lines appear interleaved below; the description above
// follows the added lines — confirm against the committed file.
func (q *Querier) tailDisconnectedIngesters(ctx context.Context, req *logproto.TailRequest, connectedIngestersAddr []string) (map[string]logproto.Querier_TailClient, error) {
tailClients := make(map[string]logproto.Querier_TailClient)
for i := range connectedIngestersAddr {
tailClients[connectedIngestersAddr[i]] = nil
// Build a map to easily check if an ingester address is already connected
connected := make(map[string]bool)
for _, addr := range connectedIngestersAddr {
connected[addr] = true
}

disconnectedIngesters := []ring.IngesterDesc{}
// Get the current replication set from the ring
replicationSet, err := q.ring.GetAll()
if err != nil {
return nil, err
}

// Look for disconnected ingesters or new ones we should (re)connect to
reconnectIngesters := []ring.IngesterDesc{}

for _, ingester := range replicationSet.Ingesters {
if _, isOk := tailClients[ingester.Addr]; isOk {
delete(tailClients, ingester.Addr)
} else {
disconnectedIngesters = append(disconnectedIngesters, ingester)
if _, ok := connected[ingester.Addr]; ok {
continue
}

// Skip ingesters which are leaving or joining the cluster
if ingester.State != ring.ACTIVE {
continue
}

reconnectIngesters = append(reconnectIngesters, ingester)
}

clients, err := q.forGivenIngesters(ring.ReplicationSet{Ingesters: disconnectedIngesters}, func(client logproto.QuerierClient) (interface{}, error) {
// Nothing to (re)connect to: return a nil map and no error
if len(reconnectIngesters) == 0 {
return nil, nil
}

// Instance a tail client for each ingester to (re)connect to
reconnectClients, err := q.forGivenIngesters(ring.ReplicationSet{Ingesters: reconnectIngesters}, func(client logproto.QuerierClient) (interface{}, error) {
return client.Tail(ctx, req)
})
if err != nil {
return nil, err
}

for i := range clients {
tailClients[clients[i].addr] = clients[i].response.(logproto.Querier_TailClient)
// Index the newly created tail clients by ingester address
reconnectClientsMap := make(map[string]logproto.Querier_TailClient)
for _, client := range reconnectClients {
reconnectClientsMap[client.addr] = client.response.(logproto.Querier_TailClient)
}
return tailClients, nil

return reconnectClientsMap, nil
}
9 changes: 9 additions & 0 deletions pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,15 @@ func mockReadRingWithOneActiveIngester() *readRingMock {
})
}

// mockIngesterDesc builds a ring.IngesterDesc for tests with the given address
// and state, the current time (in nanoseconds) as its timestamp and a fixed
// set of three tokens.
func mockIngesterDesc(addr string, state ring.IngesterState) ring.IngesterDesc {
	desc := ring.IngesterDesc{
		Addr:   addr,
		State:  state,
		Tokens: []uint32{1, 2, 3},
	}
	desc.Timestamp = time.Now().UnixNano()

	return desc
}

// mockStreamIterator returns an iterator with 1 stream and quantity entries,
// where each entry's timestamp and line string are constructed as sequential
// numbers starting at from
Expand Down
92 changes: 92 additions & 0 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/ring"
"github.com/grafana/loki/pkg/logproto"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -162,6 +163,97 @@ func TestQuerier_Tail_QueryTimeoutConfigFlag(t *testing.T) {
store.AssertExpectations(t)
}

// TestQuerier_tailDisconnectedIngesters checks, for several combinations of
// already connected addresses and ring contents, which ingester addresses
// tailDisconnectedIngesters returns new tail clients for.
func TestQuerier_tailDisconnectedIngesters(t *testing.T) {
	t.Parallel()

	// scenario describes one table entry: the input addresses, the ring
	// content and the addresses expected in the returned clients map.
	type scenario struct {
		connectedIngestersAddr []string
		ringIngesters          []ring.IngesterDesc
		expectedClientsAddr    []string
	}

	scenarios := map[string]scenario{
		"no connected ingesters and empty ring": {
			connectedIngestersAddr: []string{},
			ringIngesters:          []ring.IngesterDesc{},
			expectedClientsAddr:    []string{},
		},
		"no connected ingesters and ring containing new ingesters": {
			connectedIngestersAddr: []string{},
			ringIngesters: []ring.IngesterDesc{
				mockIngesterDesc("1.1.1.1", ring.ACTIVE),
			},
			expectedClientsAddr: []string{"1.1.1.1"},
		},
		"connected ingesters and ring contain the same ingesters": {
			connectedIngestersAddr: []string{"1.1.1.1", "2.2.2.2"},
			ringIngesters: []ring.IngesterDesc{
				mockIngesterDesc("2.2.2.2", ring.ACTIVE),
				mockIngesterDesc("1.1.1.1", ring.ACTIVE),
			},
			expectedClientsAddr: []string{},
		},
		"ring contains new ingesters compared to the connected one": {
			connectedIngestersAddr: []string{"1.1.1.1"},
			ringIngesters: []ring.IngesterDesc{
				mockIngesterDesc("1.1.1.1", ring.ACTIVE),
				mockIngesterDesc("2.2.2.2", ring.ACTIVE),
				mockIngesterDesc("3.3.3.3", ring.ACTIVE),
			},
			expectedClientsAddr: []string{"2.2.2.2", "3.3.3.3"},
		},
		"connected ingesters contain ingesters not in the ring anymore": {
			connectedIngestersAddr: []string{"1.1.1.1", "2.2.2.2", "3.3.3.3"},
			ringIngesters: []ring.IngesterDesc{
				mockIngesterDesc("1.1.1.1", ring.ACTIVE),
				mockIngesterDesc("3.3.3.3", ring.ACTIVE),
			},
			expectedClientsAddr: []string{},
		},
		"connected ingesters contain ingesters not in the ring anymore and the ring contains new ingesters too": {
			connectedIngestersAddr: []string{"1.1.1.1", "2.2.2.2", "3.3.3.3"},
			ringIngesters: []ring.IngesterDesc{
				mockIngesterDesc("1.1.1.1", ring.ACTIVE),
				mockIngesterDesc("3.3.3.3", ring.ACTIVE),
				mockIngesterDesc("4.4.4.4", ring.ACTIVE),
			},
			expectedClientsAddr: []string{"4.4.4.4"},
		},
		"ring contains ingester in LEAVING state not listed in the connected ingesters": {
			connectedIngestersAddr: []string{"1.1.1.1"},
			ringIngesters: []ring.IngesterDesc{
				mockIngesterDesc("1.1.1.1", ring.ACTIVE),
				mockIngesterDesc("2.2.2.2", ring.LEAVING),
			},
			expectedClientsAddr: []string{},
		},
		"ring contains ingester in PENDING state not listed in the connected ingesters": {
			connectedIngestersAddr: []string{"1.1.1.1"},
			ringIngesters: []ring.IngesterDesc{
				mockIngesterDesc("1.1.1.1", ring.ACTIVE),
				mockIngesterDesc("2.2.2.2", ring.PENDING),
			},
			expectedClientsAddr: []string{},
		},
	}

	for name, sc := range scenarios {
		// Shadow the loop variable so the subtest closure keeps its own copy.
		sc := sc

		t.Run(name, func(t *testing.T) {
			tailReq := logproto.TailRequest{
				Query:    "{type=\"test\"}",
				Regex:    "",
				DelayFor: 0,
				Limit:    10,
				Start:    time.Now(),
			}

			// The client factory hands out this single mock for every
			// ingester, which is enough for what this test verifies.
			clientMock := newQuerierClientMock()
			clientMock.On("Tail", mock.Anything, &tailReq, mock.Anything).Return(newTailClientMock(), nil)

			querier, err := newQuerier(
				mockQuerierConfig(),
				mockIngesterClientConfig(),
				newIngesterClientMockFactory(clientMock),
				newReadRingMock(sc.ringIngesters),
				newStoreMock(),
			)
			require.NoError(t, err)

			gotClients, err := querier.tailDisconnectedIngesters(context.Background(), &tailReq, sc.connectedIngestersAddr)
			require.NoError(t, err)

			gotAddrs := make([]string, 0, len(gotClients))
			for addr, tailClient := range gotClients {
				// A returned entry must always carry a usable client.
				assert.NotNil(t, tailClient)

				gotAddrs = append(gotAddrs, addr)
			}

			assert.ElementsMatch(t, sc.expectedClientsAddr, gotAddrs)
		})
	}
}

func mockQuerierConfig() Config {
return Config{
TailMaxDuration: 1 * time.Minute,
Expand Down

0 comments on commit e5d6f3c

Please sign in to comment.