Enabled index-header under an experimental flag.
Also enabled it in all our tests.

Depends on: #1952

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
bwplotka committed Jan 10, 2020
1 parent 2163a97 commit cca4da9
Showing 5 changed files with 98 additions and 39 deletions.
9 changes: 9 additions & 0 deletions cmd/thanos/store.go
@@ -75,6 +75,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
 
 	selectorRelabelConf := regSelectorRelabelFlags(cmd)
 
+	enableIndexHeader := cmd.Flag("experimental.enable-index-header", "If true, Store Gateway will recreate index-header instead of index-cache.json for each block. This will replace index-cache.json permanently once it is out of the experimental stage.").
+		Hidden().Default("false").Bool()
+
 	m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
 		if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
 			return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
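For readers unfamiliar with the pattern: kingpin lets a flag be registered but kept out of `--help` output, which is one way to gate experimental features. A minimal, self-contained sketch of the same `Hidden()`/`Default("false")` idiom (the app name and wiring here are illustrative, not Thanos's):

```go
package main

import (
	"fmt"
	"os"

	"gopkg.in/alecthomas/kingpin.v2"
)

func main() {
	app := kingpin.New("example", "Hidden, default-off feature flag demo.")
	// Hidden() keeps the flag out of --help; Default("false") makes the
	// experimental behavior strictly opt-in.
	enable := app.Flag("experimental.enable-index-header", "Use index-header instead of index-cache.json.").
		Hidden().Default("false").Bool()

	kingpin.MustParse(app.Parse(os.Args[1:]))
	fmt.Println("index-header enabled:", *enable)
}
```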
@@ -109,6 +112,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
 			},
 			selectorRelabelConf,
 			*advertiseCompatibilityLabel,
+			*enableIndexHeader,
 		)
 	}
 }
@@ -140,6 +144,7 @@ func runStore(
 	filterConf *store.FilterConfig,
 	selectorRelabelConf *extflag.PathOrContent,
 	advertiseCompatibilityLabel bool,
+	enableIndexHeader bool,
 ) error {
 	// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
 	statusProber := prober.New(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
@@ -214,6 +219,9 @@ func runStore(
 		return errors.Wrap(err, "meta fetcher")
 	}
 
+	if enableIndexHeader {
+		level.Info(logger).Log("msg", "index-header instead of index-cache.json enabled")
+	}
 	bs, err := store.NewBucketStore(
 		logger,
 		reg,
@@ -228,6 +236,7 @@ func runStore(
 		blockSyncConcurrency,
 		filterConf,
 		advertiseCompatibilityLabel,
+		enableIndexHeader,
 	)
 	if err != nil {
 		return errors.Wrap(err, "create object storage store")
49 changes: 41 additions & 8 deletions pkg/store/bucket.go
@@ -226,6 +226,7 @@ type BucketStore struct {
 	filterConfig             *FilterConfig
 	advLabelSets             []storepb.LabelSet
 	enableCompatibilityLabel bool
+	enableIndexHeader        bool
 }
 
 // NewBucketStore creates a new bucket backed store that implements the store API against
@@ -244,6 +245,7 @@ func NewBucketStore(
 	blockSyncConcurrency int,
 	filterConfig *FilterConfig,
 	enableCompatibilityLabel bool,
+	enableIndexHeader bool,
 ) (*BucketStore, error) {
 	if logger == nil {
 		logger = log.NewNopLogger()
@@ -280,6 +282,7 @@ func NewBucketStore(
 		samplesLimiter:           NewLimiter(maxSampleCount, metrics.queriesDropped),
 		partitioner:              gapBasedPartitioner{maxGapSize: maxGapSize},
 		enableCompatibilityLabel: enableCompatibilityLabel,
+		enableIndexHeader:        enableIndexHeader,
 	}
 	s.metrics = metrics
 
@@ -439,9 +442,17 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err error) {
 	lset := labels.FromMap(meta.Thanos.Labels)
 	h := lset.Hash()
 
-	jr, err := indexheader.NewJSONReader(ctx, s.logger, s.bkt, s.dir, meta.ULID)
-	if err != nil {
-		return errors.Wrap(err, "create index header reader")
+	var indexHeaderReader indexheader.Reader
+	if s.enableIndexHeader {
+		indexHeaderReader, err = indexheader.NewBinaryReader(ctx, s.logger, s.bkt, s.dir, meta.ULID)
+		if err != nil {
+			return errors.Wrap(err, "create index header reader")
+		}
+	} else {
+		indexHeaderReader, err = indexheader.NewJSONReader(ctx, s.logger, s.bkt, s.dir, meta.ULID)
+		if err != nil {
+			return errors.Wrap(err, "create index cache reader")
+		}
 	}
 
 	b, err := newBucketBlock(
@@ -452,7 +463,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err error) {
 		dir,
 		s.indexCache,
 		s.chunkPool,
-		jr,
+		indexHeaderReader,
 		s.partitioner,
 	)
 	if err != nil {
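Some context on the two readers: index-cache.json is built by downloading and parsing a block's full index, whereas the binary index-header introduced in #1952 (which this commit depends on) is assembled from only the symbols and postings-offset sections and can be memory-mapped. Both constructors return an indexheader.Reader, which is what newBucketBlock consumes, so the branch above could equally be factored into a small helper. A sketch only — parameter types and import paths are inferred from the call sites in this diff, not copied from the actual code layout:

```go
package store // sketch; imports inferred from the surrounding diff

import (
	"context"

	"github.com/go-kit/kit/log"
	"github.com/oklog/ulid"

	"github.com/thanos-io/thanos/pkg/block/indexheader"
	"github.com/thanos-io/thanos/pkg/objstore"
)

// newIndexHeaderReader mirrors the selection logic above.
func newIndexHeaderReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader,
	dir string, id ulid.ULID, binary bool) (indexheader.Reader, error) {
	if binary {
		return indexheader.NewBinaryReader(ctx, logger, bkt, dir, id)
	}
	return indexheader.NewJSONReader(ctx, logger, bkt, dir, id)
}
```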
@@ -1448,11 +1459,13 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
 			r.stats.postingsTouched++
 			r.stats.postingsTouchedSizeSum += len(b)
 
-			_, l, err := r.dec.Postings(b)
+			n, l, err := r.dec.Postings(b)
 			if err != nil {
 				return errors.Wrap(err, "decode postings")
 			}
-			g.Fill(j, l)
+
+			// We have to limit postings due to overfetch that can happen for the last posting.
+			g.Fill(j, newLimitedPostings(l, n))
 			continue
 		}
 
@@ -1512,13 +1525,14 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
 		for _, p := range ptrs[i:j] {
 			c := b[p.ptr.Start-start : p.ptr.End-start]
 
-			_, fetchedPostings, err := r.dec.Postings(c)
+			n, fetchedPostings, err := r.dec.Postings(c)
 			if err != nil {
 				return errors.Wrap(err, "read postings list")
 			}
 
 			// Return postings and fill LRU cache.
-			groups[p.groupID].Fill(p.keyID, fetchedPostings)
+			// We have to limit postings due to overfetch that can happen for the last posting.
+			groups[p.groupID].Fill(p.keyID, newLimitedPostings(fetchedPostings, n))
 			r.cache.StorePostings(r.ctx, r.block.meta.ULID, groups[p.groupID].keys[p.keyID], c)
 
 			// If we just fetched it we still have to update the stats for touched postings.
@@ -1532,6 +1546,25 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
 	return g.Wait()
 }
 
+type limitedPostings struct {
+	index.Postings
+
+	limit int
+	count int
+}
+
+func newLimitedPostings(postings index.Postings, limit int) *limitedPostings {
+	return &limitedPostings{Postings: postings, limit: limit}
+}
+
+func (it *limitedPostings) Next() bool {
+	if it.count >= it.limit {
+		return false
+	}
+	it.count++
+	return it.Postings.Next()
+}
+
 func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
 	const maxSeriesSize = 64 * 1024
 
73 changes: 43 additions & 30 deletions pkg/store/bucket_e2e_test.go
@@ -163,6 +163,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
 		20,
 		filterConf,
 		true,
+		true,
 	)
 	testutil.Ok(t, err)
 	s.store = store
@@ -178,7 +179,8 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
 	return s
 }
 
-func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
+// TODO(bwplotka): Benchmark Series.
+func testBucketStore_e2e(t *testing.T, ctx context.Context, s *storeSuite) {
 	mint, maxt := s.store.TimeRange()
 	testutil.Equals(t, s.minTime, mint)
 	testutil.Equals(t, s.maxTime, maxt)
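The signature change from testing.TB to *testing.T is what enables the t.Run conversion in the hunks below: the testing.TB interface has no Run method, so named subtests require the concrete *testing.T. The tests also adopt a fail-fast idiom, since t.Run reports whether the subtest succeeded and these cases mutate shared state. A minimal sketch of the idiom (names illustrative):

```go
package example

import (
	"fmt"
	"testing"
)

func TestFailFastSubtests(t *testing.T) {
	for i, name := range []string{"no cache", "big cache", "small cache"} {
		// t.Run reports whether the subtest succeeded; because these cases
		// share and mutate one fixture, a failure makes later runs
		// meaningless, so bail out early.
		if ok := t.Run(fmt.Sprintf("%d-%s", i, name), func(t *testing.T) {
			// assertions against the shared fixture would go here
		}); !ok {
			return
		}
	}
}
```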
@@ -392,16 +394,18 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
 			},
 		},
 	} {
-		t.Log("Run ", i)
-
-		srv := newStoreSeriesServer(ctx)
-
-		testutil.Ok(t, s.store.Series(tcase.req, srv))
-		testutil.Equals(t, len(tcase.expected), len(srv.SeriesSet))
-
-		for i, s := range srv.SeriesSet {
-			testutil.Equals(t, tcase.expected[i], s.Labels)
-			testutil.Equals(t, tcase.expectedChunkLen, len(s.Chunks))
+		if ok := t.Run(fmt.Sprint(i), func(t *testing.T) {
+			srv := newStoreSeriesServer(ctx)
+
+			testutil.Ok(t, s.store.Series(tcase.req, srv))
+			testutil.Equals(t, len(tcase.expected), len(srv.SeriesSet))
+
+			for i, s := range srv.SeriesSet {
+				testutil.Equals(t, tcase.expected[i], s.Labels)
+				testutil.Equals(t, tcase.expectedChunkLen, len(s.Chunks))
+			}
+		}); !ok {
+			return
 		}
 	}
 }
@@ -417,27 +421,36 @@ func TestBucketStore_e2e(t *testing.T) {
 
 	s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig, allowAllFilterConf)
 
-	t.Log("Test with no index cache")
-	s.cache.SwapWith(noopCache{})
-	testBucketStore_e2e(t, ctx, s)
+	if ok := t.Run("no index cache", func(t *testing.T) {
+		s.cache.SwapWith(noopCache{})
+		testBucketStore_e2e(t, ctx, s)
+	}); !ok {
+		return
+	}
 
-	t.Log("Test with large, sufficient index cache")
-	indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{
-		MaxItemSize: 1e5,
-		MaxSize:     2e5,
-	})
-	testutil.Ok(t, err)
-	s.cache.SwapWith(indexCache)
-	testBucketStore_e2e(t, ctx, s)
+	if ok := t.Run("with large, sufficient index cache", func(t *testing.T) {
+		indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{
+			MaxItemSize: 1e5,
+			MaxSize:     2e5,
+		})
+		testutil.Ok(t, err)
+		s.cache.SwapWith(indexCache)
+		testBucketStore_e2e(t, ctx, s)
+	}); !ok {
+		return
+	}
 
-	t.Log("Test with small index cache")
-	indexCache2, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{
-		MaxItemSize: 50,
-		MaxSize:     100,
-	})
-	testutil.Ok(t, err)
-	s.cache.SwapWith(indexCache2)
-	testBucketStore_e2e(t, ctx, s)
+	if ok := t.Run("with small index cache", func(t *testing.T) {
+		indexCache2, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{
+			MaxItemSize: 50,
+			MaxSize:     100,
+		})
+		testutil.Ok(t, err)
+		s.cache.SwapWith(indexCache2)
+		testBucketStore_e2e(t, ctx, s)
+	}); !ok {
+		return
+	}
 	})
 }

5 changes: 4 additions & 1 deletion pkg/store/bucket_test.go
@@ -447,6 +447,7 @@ func TestBucketStore_Info(t *testing.T) {
 		20,
 		allowAllFilterConf,
 		true,
+		true,
 	)
 	testutil.Ok(t, err)
 
@@ -687,7 +688,9 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ulid.ULID) {
 		false,
 		20,
 		allowAllFilterConf,
-		true)
+		true,
+		true,
+	)
 	testutil.Ok(t, err)
 
 	testutil.Ok(t, bucketStore.InitialSync(context.Background()))
1 change: 1 addition & 0 deletions test/e2e/spinup_test.go
@@ -269,6 +269,7 @@ func storeGateway(http, grpc address, bucketConfig []byte, relabelConfig []byte)
 			// Accelerated sync time for quicker test (3m by default).
 			"--sync-block-duration", "5s",
 			"--selector.relabel-config", string(relabelConfig),
+			"--experimental.enable-index-header",
 		)), nil
 	},
 }
