diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 57cfb7129334e..c7454b7a9f7ee 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -321,34 +321,50 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, // passed to tailer for (re)connecting to new or disconnected ingesters func (q *Querier) tailDisconnectedIngesters(ctx context.Context, req *logproto.TailRequest, connectedIngestersAddr []string) (map[string]logproto.Querier_TailClient, error) { - tailClients := make(map[string]logproto.Querier_TailClient) - for i := range connectedIngestersAddr { - tailClients[connectedIngestersAddr[i]] = nil + // Build a map to easily check if an ingester address is already connected + connected := make(map[string]bool) + for _, addr := range connectedIngestersAddr { + connected[addr] = true } - disconnectedIngesters := []ring.IngesterDesc{} + // Get the current replication set from the ring replicationSet, err := q.ring.GetAll() if err != nil { return nil, err } + // Look for disconnected ingesters or new one we should (re)connect to + reconnectIngesters := []ring.IngesterDesc{} + for _, ingester := range replicationSet.Ingesters { - if _, isOk := tailClients[ingester.Addr]; isOk { - delete(tailClients, ingester.Addr) - } else { - disconnectedIngesters = append(disconnectedIngesters, ingester) + if _, ok := connected[ingester.Addr]; ok { + continue } + + // Skip ingesters which are leaving or joining the cluster + if ingester.State != ring.ACTIVE { + continue + } + + reconnectIngesters = append(reconnectIngesters, ingester) } - clients, err := q.forGivenIngesters(ring.ReplicationSet{Ingesters: disconnectedIngesters}, func(client logproto.QuerierClient) (interface{}, error) { + if len(reconnectIngesters) == 0 { + return nil, nil + } + + // Instance a tail client for each ingester to re(connect) + reconnectClients, err := q.forGivenIngesters(ring.ReplicationSet{Ingesters: reconnectIngesters}, func(client logproto.QuerierClient) (interface{}, error) { return client.Tail(ctx, req) }) if err != nil { return nil, err } - for i := range clients { - tailClients[clients[i].addr] = clients[i].response.(logproto.Querier_TailClient) + reconnectClientsMap := make(map[string]logproto.Querier_TailClient) + for _, client := range reconnectClients { + reconnectClientsMap[client.addr] = client.response.(logproto.Querier_TailClient) } - return tailClients, nil + + return reconnectClientsMap, nil } diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index 37f15457e8716..c9e2bc9bea94a 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -246,6 +246,15 @@ func mockReadRingWithOneActiveIngester() *readRingMock { }) } +func mockIngesterDesc(addr string, state ring.IngesterState) ring.IngesterDesc { + return ring.IngesterDesc{ + Addr: addr, + Timestamp: time.Now().UnixNano(), + State: state, + Tokens: []uint32{1, 2, 3}, + } +} + // mockStreamIterator returns an iterator with 1 stream and quantity entries, // where entries timestamp and line string are constructed as sequential numbers // starting at from diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 8b57d5f8c4b24..7b17bf2040102 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/cortexproject/cortex/pkg/ring" "github.com/grafana/loki/pkg/logproto" "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" @@ -162,6 +163,97 @@ func TestQuerier_Tail_QueryTimeoutConfigFlag(t *testing.T) { store.AssertExpectations(t) } +func TestQuerier_tailDisconnectedIngesters(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + connectedIngestersAddr []string + ringIngesters []ring.IngesterDesc + expectedClientsAddr []string + }{ + "no connected ingesters and empty ring": { + connectedIngestersAddr: []string{}, + ringIngesters: []ring.IngesterDesc{}, + expectedClientsAddr: []string{}, + }, + "no connected ingesters and ring containing new ingesters": { + connectedIngestersAddr: []string{}, + ringIngesters: []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.ACTIVE)}, + expectedClientsAddr: []string{"1.1.1.1"}, + }, + "connected ingesters and ring contain the same ingesters": { + connectedIngestersAddr: []string{"1.1.1.1", "2.2.2.2"}, + ringIngesters: []ring.IngesterDesc{mockIngesterDesc("2.2.2.2", ring.ACTIVE), mockIngesterDesc("1.1.1.1", ring.ACTIVE)}, + expectedClientsAddr: []string{}, + }, + "ring contains new ingesters compared to the connected one": { + connectedIngestersAddr: []string{"1.1.1.1"}, + ringIngesters: []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.ACTIVE), mockIngesterDesc("2.2.2.2", ring.ACTIVE), mockIngesterDesc("3.3.3.3", ring.ACTIVE)}, + expectedClientsAddr: []string{"2.2.2.2", "3.3.3.3"}, + }, + "connected ingesters contain ingesters not in the ring anymore": { + connectedIngestersAddr: []string{"1.1.1.1", "2.2.2.2", "3.3.3.3"}, + ringIngesters: []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.ACTIVE), mockIngesterDesc("3.3.3.3", ring.ACTIVE)}, + expectedClientsAddr: []string{}, + }, + "connected ingesters contain ingesters not in the ring anymore and the ring contains new ingesters too": { + connectedIngestersAddr: []string{"1.1.1.1", "2.2.2.2", "3.3.3.3"}, + ringIngesters: []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.ACTIVE), mockIngesterDesc("3.3.3.3", ring.ACTIVE), mockIngesterDesc("4.4.4.4", ring.ACTIVE)}, + expectedClientsAddr: []string{"4.4.4.4"}, + }, + "ring contains ingester in LEAVING state not listed in the connected ingesters": { + connectedIngestersAddr: []string{"1.1.1.1"}, + ringIngesters: []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.ACTIVE), mockIngesterDesc("2.2.2.2", ring.LEAVING)}, + expectedClientsAddr: []string{}, + }, + "ring contains ingester in PENDING state not listed in the connected ingesters": { + connectedIngestersAddr: []string{"1.1.1.1"}, + ringIngesters: []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.ACTIVE), mockIngesterDesc("2.2.2.2", ring.PENDING)}, + expectedClientsAddr: []string{}, + }, + } + + for testName, testData := range tests { + testData := testData + + t.Run(testName, func(t *testing.T) { + req := logproto.TailRequest{ + Query: "{type=\"test\"}", + Regex: "", + DelayFor: 0, + Limit: 10, + Start: time.Now(), + } + + // For this test's purpose, whenever a new ingester client needs to + // be created, the factory will always return the same mock instance + ingesterClient := newQuerierClientMock() + ingesterClient.On("Tail", mock.Anything, &req, mock.Anything).Return(newTailClientMock(), nil) + + q, err := newQuerier( + mockQuerierConfig(), + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + newReadRingMock(testData.ringIngesters), + newStoreMock()) + require.NoError(t, err) + + actualClients, err := q.tailDisconnectedIngesters(context.Background(), &req, testData.connectedIngestersAddr) + require.NoError(t, err) + + actualClientsAddr := make([]string, 0, len(actualClients)) + for addr, client := range actualClients { + actualClientsAddr = append(actualClientsAddr, addr) + + // The returned map of clients should never contain nil values + assert.NotNil(t, client) + } + + assert.ElementsMatch(t, testData.expectedClientsAddr, actualClientsAddr) + }) + } +} + func mockQuerierConfig() Config { return Config{ TailMaxDuration: 1 * time.Minute,