From 632b7cf6b9e9360e3aa50720f0f165793c07a2dd Mon Sep 17 00:00:00 2001
From: Bartlomiej Plotka
Date: Fri, 10 Jan 2020 20:26:24 +0000
Subject: [PATCH] Enabled index-header under experimental flag. (#1986)

Also enabled it in all our tests.

Depends on: https://github.com/thanos-io/thanos/pull/1952

Signed-off-by: Bartlomiej Plotka
---
 cmd/thanos/store.go          |  9 ++++
 pkg/store/bucket.go          | 94 +++++++++++++++++++++++++++++++-----
 pkg/store/bucket_e2e_test.go | 73 ++++++++++++++++------------
 pkg/store/bucket_test.go     |  5 +-
 test/e2e/spinup_test.go      |  1 +
 5 files changed, 140 insertions(+), 42 deletions(-)

diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index 6937b9f4dfa..5e19037be18 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -75,6 +75,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
 	selectorRelabelConf := regSelectorRelabelFlags(cmd)
 
+	enableIndexHeader := cmd.Flag("experimental.enable-index-header", "If true, Store Gateway will recreate index-header instead of index-cache.json for each block. This will replace index-cache.json permanently once it is out of the experimental stage.").
+		Hidden().Default("false").Bool()
+
 	m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
 		if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
 			return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
@@ -109,6 +112,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
 			},
 			selectorRelabelConf,
 			*advertiseCompatibilityLabel,
+			*enableIndexHeader,
 		)
 	}
 }
@@ -140,6 +144,7 @@ func runStore(
 	filterConf *store.FilterConfig,
 	selectorRelabelConf *extflag.PathOrContent,
 	advertiseCompatibilityLabel bool,
+	enableIndexHeader bool,
 ) error {
 	// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
 	statusProber := prober.New(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
@@ -214,6 +219,9 @@ func runStore(
 		return errors.Wrap(err, "meta fetcher")
 	}
 
+	if enableIndexHeader {
+		level.Info(logger).Log("msg", "index-header instead of index-cache.json enabled")
+	}
 	bs, err := store.NewBucketStore(
 		logger,
 		reg,
@@ -228,6 +236,7 @@ func runStore(
 		blockSyncConcurrency,
 		filterConf,
 		advertiseCompatibilityLabel,
+		enableIndexHeader,
 	)
 	if err != nil {
 		return errors.Wrap(err, "create object storage store")
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 1d6575195ee..10b5bb1c449 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -24,6 +24,7 @@ import (
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/tsdb/chunks"
+	"github.com/prometheus/prometheus/tsdb/encoding"
 	"github.com/prometheus/prometheus/tsdb/fileutil"
 	"github.com/prometheus/prometheus/tsdb/index"
 	"github.com/thanos-io/thanos/pkg/block"
@@ -226,6 +227,7 @@ type BucketStore struct {
 	filterConfig             *FilterConfig
 	advLabelSets             []storepb.LabelSet
 	enableCompatibilityLabel bool
+	enableIndexHeader        bool
 }
 
 // NewBucketStore creates a new bucket backed store that implements the store API against
@@ -244,6 +246,7 @@ func NewBucketStore(
 	blockSyncConcurrency int,
 	filterConfig *FilterConfig,
 	enableCompatibilityLabel bool,
+	enableIndexHeader bool,
 ) (*BucketStore, error) {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
@@ -280,6 +283,7 @@ func NewBucketStore(
 		samplesLimiter:           NewLimiter(maxSampleCount, metrics.queriesDropped),
 		partitioner:              gapBasedPartitioner{maxGapSize: maxGapSize},
 		enableCompatibilityLabel: enableCompatibilityLabel,
+		enableIndexHeader:        enableIndexHeader,
 	}
 	s.metrics = metrics
@@ -439,9 +443,17 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
 	lset := labels.FromMap(meta.Thanos.Labels)
 	h := lset.Hash()
 
-	jr, err := indexheader.NewJSONReader(ctx, s.logger, s.bkt, s.dir, meta.ULID)
-	if err != nil {
-		return errors.Wrap(err, "create index header reader")
+	var indexHeaderReader indexheader.Reader
+	if s.enableIndexHeader {
+		indexHeaderReader, err = indexheader.NewBinaryReader(ctx, s.logger, s.bkt, s.dir, meta.ULID)
+		if err != nil {
+			return errors.Wrap(err, "create index header reader")
+		}
+	} else {
+		indexHeaderReader, err = indexheader.NewJSONReader(ctx, s.logger, s.bkt, s.dir, meta.ULID)
+		if err != nil {
+			return errors.Wrap(err, "create index cache reader")
+		}
 	}
 
 	b, err := newBucketBlock(
@@ -452,7 +464,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
 		dir,
 		s.indexCache,
 		s.chunkPool,
-		jr,
+		indexHeaderReader,
 		s.partitioner,
 	)
 	if err != nil {
@@ -1455,6 +1467,7 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
 			if err != nil {
 				return errors.Wrap(err, "decode postings")
 			}
+			g.Fill(j, l)
 			continue
 		}
 
@@ -1513,20 +1526,20 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
 			r.stats.postingsFetchedSizeSum += int(length)
 
 			for _, p := range ptrs[i:j] {
-				c := b[p.ptr.Start-start : p.ptr.End-start]
-
-				_, fetchedPostings, err := r.dec.Postings(c)
+				// The index-header can only estimate where a postings list ends, so resize the fetched bytes to the exact postings length.
+				pBytes, err := resizePostings(b[p.ptr.Start-start : p.ptr.End-start])
 				if err != nil {
-					return errors.Wrap(err, "read postings list")
+					return err
 				}
 
 				// Return postings and fill LRU cache.
-				groups[p.groupID].Fill(p.keyID, fetchedPostings)
-				r.cache.StorePostings(r.ctx, r.block.meta.ULID, groups[p.groupID].keys[p.keyID], c)
+				// Strip the first 4 bytes, which hold the posting count, before iterating.
+				groups[p.groupID].Fill(p.keyID, newBigEndianPostings(pBytes[4:]))
+				r.cache.StorePostings(r.ctx, r.block.meta.ULID, groups[p.groupID].keys[p.keyID], pBytes)
 
 				// If we just fetched it we still have to update the stats for touched postings.
 				r.stats.postingsTouched++
-				r.stats.postingsTouchedSizeSum += len(c)
+				r.stats.postingsTouchedSizeSum += len(pBytes)
 			}
 			return nil
 		})
@@ -1535,6 +1548,65 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
 	return g.Wait()
 }
 
+func resizePostings(b []byte) ([]byte, error) {
+	d := encoding.Decbuf{B: b}
+	n := d.Be32int()
+	if d.Err() != nil {
+		return nil, errors.Wrap(d.Err(), "read postings list")
+	}
+	// 4 bytes for the posting count, then 4 bytes for each of the n big-endian postings.
+	return b[:4+n*4], nil
+}
+
+// bigEndianPostings implements the Postings interface over a byte stream of
+// big endian numbers.
+type bigEndianPostings struct {
+	list []byte
+	cur  uint32
+}
+
+// TODO(bwplotka): Expose these inside Prometheus.
+func newBigEndianPostings(list []byte) *bigEndianPostings {
+	return &bigEndianPostings{list: list}
+}
+
+func (it *bigEndianPostings) At() uint64 {
+	return uint64(it.cur)
+}
+
+func (it *bigEndianPostings) Next() bool {
+	if len(it.list) >= 4 {
+		it.cur = binary.BigEndian.Uint32(it.list)
+		it.list = it.list[4:]
+		return true
+	}
+	return false
+}
+
+func (it *bigEndianPostings) Seek(x uint64) bool {
+	if uint64(it.cur) >= x {
+		return true
+	}
+
+	num := len(it.list) / 4
+	// Do binary search between current position and end.
+	i := sort.Search(num, func(i int) bool {
+		return binary.BigEndian.Uint32(it.list[i*4:]) >= uint32(x)
+	})
+	if i < num {
+		j := i * 4
+		it.cur = binary.BigEndian.Uint32(it.list[j:])
+		it.list = it.list[j+4:]
+		return true
+	}
+	it.list = nil
+	return false
+}
+
+func (it *bigEndianPostings) Err() error {
+	return nil
+}
+
 func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
 	const maxSeriesSize = 64 * 1024
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go
index ace07aa7b86..482d916cdc9 100644
--- a/pkg/store/bucket_e2e_test.go
+++ b/pkg/store/bucket_e2e_test.go
@@ -163,6 +163,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
 		20,
 		filterConf,
 		true,
+		true,
 	)
 	testutil.Ok(t, err)
 	s.store = store
@@ -178,7 +179,8 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
 	return s
 }
 
-func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
+// TODO(bwplotka): Benchmark Series.
+func testBucketStore_e2e(t *testing.T, ctx context.Context, s *storeSuite) {
 	mint, maxt := s.store.TimeRange()
 	testutil.Equals(t, s.minTime, mint)
 	testutil.Equals(t, s.maxTime, maxt)
@@ -392,16 +394,18 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
 			},
 		},
 	} {
-		t.Log("Run ", i)
-
-		srv := newStoreSeriesServer(ctx)
-
-		testutil.Ok(t, s.store.Series(tcase.req, srv))
-		testutil.Equals(t, len(tcase.expected), len(srv.SeriesSet))
-
-		for i, s := range srv.SeriesSet {
-			testutil.Equals(t, tcase.expected[i], s.Labels)
-			testutil.Equals(t, tcase.expectedChunkLen, len(s.Chunks))
+		if ok := t.Run(fmt.Sprint(i), func(t *testing.T) {
+			srv := newStoreSeriesServer(ctx)
+
+			testutil.Ok(t, s.store.Series(tcase.req, srv))
+			testutil.Equals(t, len(tcase.expected), len(srv.SeriesSet))
+
+			for i, s := range srv.SeriesSet {
+				testutil.Equals(t, tcase.expected[i], s.Labels)
+				testutil.Equals(t, tcase.expectedChunkLen, len(s.Chunks))
+			}
+		}); !ok {
+			return
 		}
 	}
 }
@@ -417,27 +421,36 @@ func TestBucketStore_e2e(t *testing.T) {
 		s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig, allowAllFilterConf)
 
-		t.Log("Test with no index cache")
-		s.cache.SwapWith(noopCache{})
-		testBucketStore_e2e(t, ctx, s)
+		if ok := t.Run("no index cache", func(t *testing.T) {
+			s.cache.SwapWith(noopCache{})
+			testBucketStore_e2e(t, ctx, s)
+		}); !ok {
+			return
+		}
 
-		t.Log("Test with large, sufficient index cache")
-		indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{
-			MaxItemSize: 1e5,
-			MaxSize:     2e5,
-		})
-		testutil.Ok(t, err)
-		s.cache.SwapWith(indexCache)
-		testBucketStore_e2e(t, ctx, s)
+		if ok := t.Run("with large, sufficient index cache", func(t *testing.T) {
+			indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{
+				MaxItemSize: 1e5,
+				MaxSize:     2e5,
+			})
+			testutil.Ok(t, err)
+			s.cache.SwapWith(indexCache)
+			testBucketStore_e2e(t, ctx, s)
+		}); !ok {
+			return
+		}
 
-		t.Log("Test with small index cache")
-		indexCache2, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{
-			MaxItemSize: 50,
-			MaxSize:     100,
-		})
-		testutil.Ok(t, err)
-		s.cache.SwapWith(indexCache2)
-		testBucketStore_e2e(t, ctx, s)
+		if ok := t.Run("with small index cache", func(t *testing.T) {
+			indexCache2, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{
+				MaxItemSize: 50,
+				MaxSize:     100,
+			})
+			testutil.Ok(t, err)
+			s.cache.SwapWith(indexCache2)
+			testBucketStore_e2e(t, ctx, s)
+		}); !ok {
+			return
+		}
 	})
 }
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 913714f6555..c5e715afe32 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -447,6 +447,7 @@ func TestBucketStore_Info(t *testing.T) {
 		20,
 		allowAllFilterConf,
 		true,
+		true,
 	)
 	testutil.Ok(t, err)
@@ -687,7 +688,9 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
 		false,
 		20,
 		allowAllFilterConf,
-		true)
+		true,
+		true,
+	)
 	testutil.Ok(t, err)
 
 	testutil.Ok(t, bucketStore.InitialSync(context.Background()))
diff --git a/test/e2e/spinup_test.go b/test/e2e/spinup_test.go
index d51a51a60a6..06b3bd3dbf8 100644
--- a/test/e2e/spinup_test.go
+++ b/test/e2e/spinup_test.go
@@ -269,6 +269,7 @@ func storeGateway(http, grpc address, bucketConfig []byte, relabelConfig []byte)
 			// Accelerated sync time for quicker test (3m by default).
"--sync-block-duration", "5s", "--selector.relabel-config", string(relabelConfig), + "--experimental.enable-index-header", )), nil }, }