Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes unaligned shards between ingesters and storage. #4087

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 11 additions & 14 deletions pkg/ingester/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,15 @@ func (ii *InvertedIndex) getShards(shard *astmapper.ShardAnnotation) []*indexSha
return ii.shards
}

indexFactor := int(ii.totalShards)
// calculate the start of the hash ring desired
lowerBound := shard.Shard * indexFactor / shard.Of
// calculate the end of the hash ring desired
upperBound := (shard.Shard + 1) * indexFactor / shard.Of
// see if the upper bound is cleanly doesn't align cleanly with the next shard
// which can happen when the schema sharding factor and inverted index
// sharding factor are not multiples of each other.
rem := (shard.Shard + 1) * indexFactor % shard.Of
if rem > 0 {
// there's overlap on the upper shard
upperBound = upperBound + 1
totalRequested := int(ii.totalShards) / shard.Of
result := make([]*indexShard, totalRequested)
var j int
for i := 0; i < totalRequested; i++ {
subShard := ((shard.Shard) + (i * shard.Of))
result[j] = ii.shards[subShard]
j++
}

return ii.shards[lowerBound:upperBound]
return result
}

func validateShard(totalShards uint32, shard *astmapper.ShardAnnotation) error {
Expand Down Expand Up @@ -131,6 +125,9 @@ func labelsSeriesID(ls labels.Labels, dest []byte) {

// Backwards-compatible with model.Metric.String()
func labelsString(b *bytes.Buffer, ls labels.Labels) {
// metrics name is used in the store for computing shards.
// see chunk/schema_util.go for more details. `labelsString()`
b.WriteString("logs")
b.WriteByte('{')
i := 0
for _, l := range ls {
Expand Down
45 changes: 23 additions & 22 deletions pkg/ingester/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,10 @@ func Test_GetShards(t *testing.T) {
{16, &astmapper.ShardAnnotation{Shard: 15, Of: 16}, []uint32{15}},

// idx factor a larger multiple of schema factor
{32, &astmapper.ShardAnnotation{Shard: 0, Of: 16}, []uint32{0, 1}},
{32, &astmapper.ShardAnnotation{Shard: 4, Of: 16}, []uint32{8, 9}},
{32, &astmapper.ShardAnnotation{Shard: 15, Of: 16}, []uint32{30, 31}},
{64, &astmapper.ShardAnnotation{Shard: 15, Of: 16}, []uint32{60, 61, 62, 63}},

// schema factor is a larger multiple of idx factor
{16, &astmapper.ShardAnnotation{Shard: 0, Of: 32}, []uint32{0}},
{16, &astmapper.ShardAnnotation{Shard: 4, Of: 32}, []uint32{2}},
{16, &astmapper.ShardAnnotation{Shard: 15, Of: 32}, []uint32{7}},

// idx factor smaller but not a multiple of schema factor
{4, &astmapper.ShardAnnotation{Shard: 0, Of: 5}, []uint32{0}},
{4, &astmapper.ShardAnnotation{Shard: 1, Of: 5}, []uint32{0, 1}},
{4, &astmapper.ShardAnnotation{Shard: 4, Of: 5}, []uint32{3}},

// schema factor smaller but not a multiple of idx factor
{8, &astmapper.ShardAnnotation{Shard: 0, Of: 5}, []uint32{0, 1}},
{8, &astmapper.ShardAnnotation{Shard: 2, Of: 5}, []uint32{3, 4}},
{8, &astmapper.ShardAnnotation{Shard: 3, Of: 5}, []uint32{4, 5, 6}},
{8, &astmapper.ShardAnnotation{Shard: 4, Of: 5}, []uint32{6, 7}},
Comment on lines -28 to -47
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have a check for Of to be a multiple of totalShards. Do we want to keep these tests or add a check for it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need those. Ingester and storage uses respectively always 16 and 32. I don't think the storage will change soon enough.

{32, &astmapper.ShardAnnotation{Shard: 0, Of: 16}, []uint32{0, 16}},
{32, &astmapper.ShardAnnotation{Shard: 4, Of: 16}, []uint32{4, 20}},
{32, &astmapper.ShardAnnotation{Shard: 15, Of: 16}, []uint32{15, 31}},
{64, &astmapper.ShardAnnotation{Shard: 15, Of: 16}, []uint32{15, 31, 47, 63}},
} {
tt := tt
t.Run(tt.shard.String()+fmt.Sprintf("_total_%d", tt.total), func(t *testing.T) {
Expand Down Expand Up @@ -95,9 +79,9 @@ func TestDeleteAddLoopkup(t *testing.T) {
}
sort.Sort(cortexpb.FromLabelAdaptersToLabels(lbs))

require.Equal(t, uint32(7), labelsSeriesIDHash(cortexpb.FromLabelAdaptersToLabels(lbs))%32)
require.Equal(t, uint32(26), labelsSeriesIDHash(cortexpb.FromLabelAdaptersToLabels(lbs))%32)
// make sure we consistent
require.Equal(t, uint32(7), labelsSeriesIDHash(cortexpb.FromLabelAdaptersToLabels(lbs))%32)
require.Equal(t, uint32(26), labelsSeriesIDHash(cortexpb.FromLabelAdaptersToLabels(lbs))%32)
index.Add(lbs, model.Fingerprint((cortexpb.FromLabelAdaptersToLabels(lbs).Hash())))
index.Delete(cortexpb.FromLabelAdaptersToLabels(lbs), model.Fingerprint(cortexpb.FromLabelAdaptersToLabels(lbs).Hash()))
ids, err := index.Lookup([]*labels.Matcher{
Expand All @@ -106,3 +90,20 @@ func TestDeleteAddLoopkup(t *testing.T) {
require.NoError(t, err)
require.Len(t, ids, 0)
}

func Test_hash(t *testing.T) {
ii := NewWithShards(32)
ii.Add(cortexpb.FromLabelsToLabelAdapters(labels.Labels{
labels.Label{Name: "compose_project", Value: "loki-boltdb-storage-s3"},
labels.Label{Name: "compose_service", Value: "ingester-2"},
labels.Label{Name: "container_name", Value: "loki-boltdb-storage-s3_ingester-2_1"},
labels.Label{Name: "filename", Value: "/var/log/docker/790fef4c6a587c3b386fe85c07e03f3a1613f4929ca3abaa4880e14caadb5ad1/json.log"},
labels.Label{Name: "host", Value: "docker-desktop"},
labels.Label{Name: "source", Value: "stderr"},
}), 1)

res, err := ii.Lookup([]*labels.Matcher{{Type: labels.MatchEqual, Name: "compose_project", Value: "loki-boltdb-storage-s3"}}, &astmapper.ShardAnnotation{Shard: 9, Of: 16})
require.NoError(t, err)
require.Len(t, res, 1)
require.Equal(t, model.Fingerprint(1), res[0])
}