Address review issues
Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
kakkoyun committed Mar 11, 2020
1 parent 58e0653 commit 5ff0ea1
Showing 1 changed file with 212 additions and 96 deletions.
308 changes: 212 additions & 96 deletions test/e2e/compact_test.go
@@ -8,15 +8,18 @@ import (
"os"
"path"
"path/filepath"
"strconv"
"testing"
"time"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/objstore/s3"
@@ -30,109 +33,222 @@ func TestCompact(t *testing.T) {
t.Parallel()
l := log.NewLogfmtLogger(os.Stdout)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

s, err := e2e.NewScenario("e2e_test_compact")
testutil.Ok(t, err)
defer s.Close()

const bucket = "thanos"

m := e2edb.NewMinio(80, bucket)
testutil.Ok(t, s.StartAndWaitReady(m))

bkt, err := s3.NewBucketWithConfig(l, s3.Config{
Bucket: bucket,
AccessKey: e2edb.MinioAccessKey,
SecretKey: e2edb.MinioSecretKey,
Endpoint: m.HTTPEndpoint(), // We need a separate client config when connecting to Minio from outside.
Insecure: true,
}, "test-feed")
testutil.Ok(t, err)

dir := filepath.Join(s.SharedDir(), "tmp")
testutil.Ok(t, os.MkdirAll(filepath.Join(s.SharedDir(), dir), os.ModePerm))

series := []labels.Labels{labels.FromStrings("a", "1", "b", "2")}
extLset1 := labels.FromStrings("ext1", "value1", "replica", "1")
extLset2 := labels.FromStrings("ext1", "value1", "replica", "2")
extLset3 := labels.FromStrings("ext1", "value1", "rule_replica", "1")

now := time.Now()
id1, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset1, 0)
testutil.Ok(t, err)
testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id1.String()), id1.String()))

id2, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset2, 0)
testutil.Ok(t, err)
testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id2.String()), id2.String()))

id3, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset3, 0)
testutil.Ok(t, err)
testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id3.String()), id3.String()))

cmpt, err := e2ethanos.NewCompactor(s.SharedDir(), "1", client.BucketConfig{
Type: client.S3,
Config: s3.Config{
Bucket: bucket,
AccessKey: e2edb.MinioAccessKey,
SecretKey: e2edb.MinioSecretKey,
Endpoint: m.NetworkHTTPEndpoint(),
Insecure: true,
},
},
"--deduplication.replica-label=replica",
"--deduplication.replica-label=rule_replica",
)

testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(cmpt))

testutil.Ok(t, cmpt.WaitSumMetrics(e2e.Equals(3), "thanos_blocks_meta_synced"))
testutil.Ok(t, cmpt.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total"))
testutil.Ok(t, cmpt.WaitSumMetrics(e2e.Equals(3), "thanos_blocks_meta_modified"))

str, err := e2ethanos.NewStoreGW(s.SharedDir(), "1", client.BucketConfig{
Type: client.S3,
Config: s3.Config{
Bucket: bucket,
AccessKey: e2edb.MinioAccessKey,
SecretKey: e2edb.MinioSecretKey,
Endpoint: m.NetworkHTTPEndpoint(),
Insecure: true,
},
})
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(str))

testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(1), "thanos_blocks_meta_synced"))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total"))

q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{str.GRPCNetworkEndpoint()}, nil)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(q))

ctx, cancel = context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()
// blockDesc describes a recipe to generate blocks from the given series and external labels.
type blockDesc struct {
series []labels.Labels
extLsets []labels.Labels
numberOfSamplesPerSeries int
}

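// Each case describes the raw blocks to upload, the replica labels passed to the compactor, and the expected query result and bucket contents after offline deduplication.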
for i, tcase := range []struct {
name string
blocks []blockDesc
replicaLabels []string
query string

expected []model.Metric
numberOfModifiedBlocks float64
numberOfBlocks uint64
numberOfSamples uint64
numberOfSeries uint64
numberOfChunks uint64
}{
{
name: "overlapping blocks with matching replica labels",
blocks: []blockDesc{
{
series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")},
extLsets: []labels.Labels{
labels.FromStrings("ext1", "value1", "replica", "1"),
labels.FromStrings("ext1", "value1", "replica", "2"),
labels.FromStrings("ext1", "value1", "rule_replica", "1"),
},
numberOfSamplesPerSeries: 10,
},
},
replicaLabels: []string{"replica", "rule_replica"},
query: "{a=\"1\"}",

expected: []model.Metric{
{
"a": "1",
"b": "2",
"ext1": "value1",
},
},
numberOfModifiedBlocks: 3,
numberOfBlocks: 1,
numberOfSamples: 10,
numberOfSeries: 1,
numberOfChunks: 1,
},
{
name: "overlapping blocks with non-matching replica labels",
blocks: []blockDesc{
{
series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")},
extLsets: []labels.Labels{
labels.FromStrings("ext1", "value1"),
labels.FromStrings("ext1", "value2"),
},
numberOfSamplesPerSeries: 2,
},
},
replicaLabels: []string{"replica"},
query: "{a=\"1\"}",

expected: []model.Metric{
{
"a": "1",
"b": "2",
"ext1": "value1",
},
{
"a": "1",
"b": "2",
"ext1": "value2",
},
},
numberOfModifiedBlocks: 0,
numberOfBlocks: 2,
numberOfSamples: 4,
numberOfSeries: 2,
numberOfChunks: 2,
},
} {
i := i
tcase := tcase
t.Run(tcase.name, func(t *testing.T) {
t.Parallel()

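// Every case gets its own scenario, bucket, and Minio instance so the cases can run in parallel without interfering.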
s, err := e2e.NewScenario("e2e_test_compact_" + strconv.Itoa(i))
testutil.Ok(t, err)
defer s.Close()

dir := filepath.Join(s.SharedDir(), "tmp")
testutil.Ok(t, os.MkdirAll(dir, os.ModePerm))

bucket := "thanos_" + strconv.Itoa(i)

m := e2edb.NewMinio(8080+i, bucket)
testutil.Ok(t, s.StartAndWaitReady(m))

bkt, err := s3.NewBucketWithConfig(l, s3.Config{
Bucket: bucket,
AccessKey: e2edb.MinioAccessKey,
SecretKey: e2edb.MinioSecretKey,
Endpoint: m.HTTPEndpoint(), // We need a separate client config when connecting to Minio from outside.
Insecure: true,
}, "test-feed")
testutil.Ok(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

now := time.Now()

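// Create one raw block per external label set and upload it to the bucket; these blocks are the compactor's input.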
var rawBlockIds []ulid.ULID
for _, b := range tcase.blocks {
for _, extLset := range b.extLsets {
id, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, b.series, b.numberOfSamplesPerSeries, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset, 0)
testutil.Ok(t, err)
testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id.String()), id.String()))
rawBlockIds = append(rawBlockIds, id)
}
}

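// Turn each configured replica label into a --deduplication.replica-label flag for the compactor.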
dedupFlags := make([]string, 0, len(tcase.replicaLabels))
for _, l := range tcase.replicaLabels {
dedupFlags = append(dedupFlags, "--deduplication.replica-label="+l)
}

cmpt, err := e2ethanos.NewCompactor(s.SharedDir(), strconv.Itoa(i), client.BucketConfig{
Type: client.S3,
Config: s3.Config{
Bucket: bucket,
AccessKey: e2edb.MinioAccessKey,
SecretKey: e2edb.MinioSecretKey,
Endpoint: m.NetworkHTTPEndpoint(),
Insecure: true,
},
},
dedupFlags...,
)

testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(cmpt))

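// The compactor should sync the metas of all raw blocks without failures and report the expected number of modified metas.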
testutil.Ok(t, cmpt.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIds))), "thanos_blocks_meta_synced"))
testutil.Ok(t, cmpt.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total"))
testutil.Ok(t, cmpt.WaitSumMetrics(e2e.Equals(tcase.numberOfModifiedBlocks), "thanos_blocks_meta_modified"))

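// Start a store gateway against the same bucket to check that only the expected post-compaction blocks are synced.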
str, err := e2ethanos.NewStoreGW(s.SharedDir(), "compact_"+strconv.Itoa(i), client.BucketConfig{
Type: client.S3,
Config: s3.Config{
Bucket: bucket,
AccessKey: e2edb.MinioAccessKey,
SecretKey: e2edb.MinioSecretKey,
Endpoint: m.NetworkHTTPEndpoint(),
Insecure: true,
},
})
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(str))

testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(tcase.numberOfBlocks)), "thanos_blocks_meta_synced"))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total"))

q, err := e2ethanos.NewQuerier(s.SharedDir(), "compact_"+strconv.Itoa(i), []string{str.GRPCNetworkEndpoint()}, nil)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(q))

ctx, cancel = context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()

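// Query through the querier and compare the result with the expected series for this case.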
queryAndAssert(t, ctx, q.HTTPEndpoint(),
tcase.query,
promclient.QueryOptions{
Deduplicate: false, // This must stay false so that we can be sure the deduplication happened offline, in the compactor.
},
tcase.expected,
)

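// Walk the bucket and aggregate the meta.json stats of the blocks that remain after compaction.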
var numberOfBlocks uint64
var numberOfSamples uint64
var numberOfSeries uint64
var numberOfChunks uint64
var sources []ulid.ULID

testutil.Ok(t, bkt.Iter(ctx, "", func(n string) error {
id, ok := block.IsBlockDir(n)
if !ok {
return nil
}

numberOfBlocks++

meta, err := block.DownloadMeta(ctx, l, bkt, id)
if err != nil {
return err
}

numberOfSamples += meta.Stats.NumSamples
numberOfSeries += meta.Stats.NumSeries
numberOfChunks += meta.Stats.NumChunks
sources = append(sources, meta.Compaction.Sources...)

return nil
}))

// Make sure only the expected number of blocks remains in the bucket, so that the effects of offline deduplication are observable.
testutil.Equals(t, tcase.numberOfBlocks, numberOfBlocks)
if int(tcase.numberOfBlocks) < len(rawBlockIds) { // Check sources only if the blocks were actually compacted.
testutil.Equals(t, rawBlockIds, sources)
}

testutil.Equals(t, tcase.numberOfSamples, numberOfSamples)
testutil.Equals(t, tcase.numberOfSeries, numberOfSeries)
testutil.Equals(t, tcase.numberOfChunks, numberOfChunks)
})
}
}
