Skip to content

Commit

Permalink
Configurable block listing strategy (#5828)
Browse files Browse the repository at this point in the history
* make block listing strategy configurable. Change default strategy from Recursive to Concurrent

Signed-off-by: Ben Ye <benye@amazon.com>

* update docs

Signed-off-by: Ben Ye <benye@amazon.com>

---------

Signed-off-by: Ben Ye <benye@amazon.com>
  • Loading branch information
yeya24 authored Apr 9, 2024
1 parent b5e84e4 commit d6f922d
Show file tree
Hide file tree
Showing 13 changed files with 127 additions and 36 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* [CHANGE] Ruler: Remove `cortex_ruler_write_requests_total`, `cortex_ruler_write_requests_failed_total`, `cortex_ruler_queries_total`, `cortex_ruler_queries_failed_total`, and `cortex_ruler_query_seconds_total` metrics for the tenant when the ruler deletes the manager for the tenant. #5772
* [CHANGE] Main: Mark `mem-ballast-size-bytes` flag as deprecated. #5816
* [CHANGE] Querier: Mark `-querier.ingester-streaming` flag as deprecated. Now query ingester streaming is always enabled. #5817
* [CHANGE] AlertManager API: Removal of all api/v1/ endpoints following [2970](https://github.com/prometheus/alertmanager/pull/2970). [5841]
* [CHANGE] Compactor/Bucket Store: Added `-blocks-storage.bucket-store.block-discovery-strategy` to configure different block listing strategy. Reverted the current recursive block listing mechanism and use the strategy `Concurrent` as in 1.15. #5828
* [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477
* [FEATURE] Query Frontend/Scheduler: Add query priority support. #5605
* [FEATURE] Tracing: Add `kuberesolver` to resolve endpoints address with `kubernetes://` prefix as Kubernetes service. #5731
Expand Down
11 changes: 11 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -1314,6 +1314,17 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period
[max_stale_period: <duration> | default = 1h]

# One of concurrent, recursive, bucket_index. When set to concurrent, stores
# will concurrently issue one call per directory to discover active blocks
# in the bucket. The recursive strategy iterates through all objects in the
# bucket, recursively traversing into each directory. This avoids N+1 calls
# at the expense of having slower bucket iterations. bucket_index strategy
# can be used in Compactor only and utilizes the existing bucket index to
# fetch block IDs to sync. This avoids iterating the bucket but can be
# impacted by delays of cleaner creating bucket index.
# CLI flag: -blocks-storage.bucket-store.block-discovery-strategy
[block_discovery_strategy: <string> | default = "concurrent"]

# Max size - in bytes - of a chunks pool, used to reduce memory allocations.
# The pool is shared across all tenants. 0 to disable the limit.
# CLI flag: -blocks-storage.bucket-store.max-chunk-pool-bytes
Expand Down
11 changes: 11 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -1423,6 +1423,17 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period
[max_stale_period: <duration> | default = 1h]

# One of concurrent, recursive, bucket_index. When set to concurrent, stores
# will concurrently issue one call per directory to discover active blocks
# in the bucket. The recursive strategy iterates through all objects in the
# bucket, recursively traversing into each directory. This avoids N+1 calls
# at the expense of having slower bucket iterations. bucket_index strategy
# can be used in Compactor only and utilizes the existing bucket index to
# fetch block IDs to sync. This avoids iterating the bucket but can be
# impacted by delays of cleaner creating bucket index.
# CLI flag: -blocks-storage.bucket-store.block-discovery-strategy
[block_discovery_strategy: <string> | default = "concurrent"]

# Max size - in bytes - of a chunks pool, used to reduce memory allocations.
# The pool is shared across all tenants. 0 to disable the limit.
# CLI flag: -blocks-storage.bucket-store.max-chunk-pool-bytes
Expand Down
11 changes: 11 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1850,6 +1850,17 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period
[max_stale_period: <duration> | default = 1h]

# One of concurrent, recursive, bucket_index. When set to concurrent, stores
# will concurrently issue one call per directory to discover active blocks in
# the bucket. The recursive strategy iterates through all objects in the
# bucket, recursively traversing into each directory. This avoids N+1 calls at
# the expense of having slower bucket iterations. bucket_index strategy can be
# used in Compactor only and utilizes the existing bucket index to fetch block
# IDs to sync. This avoids iterating the bucket but can be impacted by delays
# of cleaner creating bucket index.
# CLI flag: -blocks-storage.bucket-store.block-discovery-strategy
[block_discovery_strategy: <string> | default = "concurrent"]

# Max size - in bytes - of a chunks pool, used to reduce memory allocations.
# The pool is shared across all tenants. 0 to disable the limit.
# CLI flag: -blocks-storage.bucket-store.max-chunk-pool-bytes
Expand Down
26 changes: 15 additions & 11 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,22 +827,26 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
// out of order chunks or index file too big.
noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(ulogger, bucket, c.compactorCfg.MetaSyncConcurrency)

var blockIDsFetcher block.Lister
var fetcherULogger log.Logger
if c.storageCfg.BucketStore.BucketIndex.Enabled {
fetcherULogger = log.With(ulogger, "blockIdsFetcher", "BucketIndexBlockIDsFetcher")
blockIDsFetcher = bucketindex.NewBlockIDsFetcher(fetcherULogger, c.bucketClient, userID, c.limits)

} else {
fetcherULogger = log.With(ulogger, "blockIdsFetcher", "BaseBlockIDsFetcher")
blockIDsFetcher = block.NewRecursiveLister(fetcherULogger, bucket)
var blockLister block.Lister
switch cortex_tsdb.BlockDiscoveryStrategy(c.storageCfg.BucketStore.BlockDiscoveryStrategy) {
case cortex_tsdb.ConcurrentDiscovery:
blockLister = block.NewConcurrentLister(ulogger, bucket)
case cortex_tsdb.RecursiveDiscovery:
blockLister = block.NewRecursiveLister(ulogger, bucket)
case cortex_tsdb.BucketIndexDiscovery:
if !c.storageCfg.BucketStore.BucketIndex.Enabled {
return cortex_tsdb.ErrInvalidBucketIndexBlockDiscoveryStrategy
}
blockLister = bucketindex.NewBlockLister(ulogger, c.bucketClient, userID, c.limits)
default:
return cortex_tsdb.ErrBlockDiscoveryStrategy
}

fetcher, err := block.NewMetaFetcher(
fetcherULogger,
ulogger,
c.compactorCfg.MetaSyncConcurrency,
bucket,
blockIDsFetcher,
blockLister,
c.metaSyncDirForUser(userID),
reg,
// List of filters to apply (order matters).
Expand Down
1 change: 1 addition & 0 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1673,6 +1673,7 @@ func prepareConfig() Config {
func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.InstrumentedBucket, limits *validation.Limits) (*Compactor, *tsdbCompactorMock, *tsdbPlannerMock, *concurrency.SyncBuffer, prometheus.Gatherer) {
storageCfg := cortex_tsdb.BlocksStorageConfig{}
flagext.DefaultValues(&storageCfg)
storageCfg.BucketStore.BlockDiscoveryStrategy = string(cortex_tsdb.RecursiveDiscovery)

// Create a temporary directory for compactor data.
compactorCfg.DataDir = t.TempDir()
Expand Down
19 changes: 17 additions & 2 deletions pkg/querier/blocks_finder_bucket_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type BucketScanBlocksFinderConfig struct {
ConsistencyDelay time.Duration
IgnoreDeletionMarksDelay time.Duration
IgnoreBlocksWithin time.Duration

BlockDiscoveryStrategy string
}

// BucketScanBlocksFinder is a BlocksFinder implementation periodically scanning the bucket to discover blocks.
Expand Down Expand Up @@ -384,12 +386,25 @@ func (d *BucketScanBlocksFinder) createMetaFetcher(userID string) (block.Metadat
filters = append(filters, storegateway.NewIgnoreNonQueryableBlocksFilter(d.logger, d.cfg.IgnoreBlocksWithin))
}

blockIdsFetcher := block.NewRecursiveLister(userLogger, userBucket)
var (
err error
blockLister block.Lister
)
switch cortex_tsdb.BlockDiscoveryStrategy(d.cfg.BlockDiscoveryStrategy) {
case cortex_tsdb.ConcurrentDiscovery:
blockLister = block.NewConcurrentLister(userLogger, userBucket)
case cortex_tsdb.RecursiveDiscovery:
blockLister = block.NewRecursiveLister(userLogger, userBucket)
case cortex_tsdb.BucketIndexDiscovery:
return nil, nil, nil, cortex_tsdb.ErrInvalidBucketIndexBlockDiscoveryStrategy
default:
return nil, nil, nil, cortex_tsdb.ErrBlockDiscoveryStrategy
}
f, err := block.NewMetaFetcher(
userLogger,
d.cfg.MetasConcurrency,
userBucket,
blockIdsFetcher,
blockLister,
// The fetcher stores cached metas in the "meta-syncer/" sub directory.
filepath.Join(d.cfg.CacheDir, userID),
userReg,
Expand Down
1 change: 1 addition & 0 deletions pkg/querier/blocks_finder_bucket_scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,5 +521,6 @@ func prepareBucketScanBlocksFinderConfig() BucketScanBlocksFinderConfig {
MetasConcurrency: 10,
IgnoreDeletionMarksDelay: time.Hour,
IgnoreBlocksWithin: 10 * time.Hour, // All blocks created in the last 10 hour shouldn't be scanned.
BlockDiscoveryStrategy: string(cortex_tsdb.RecursiveDiscovery),
}
}
1 change: 1 addition & 0 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
CacheDir: storageCfg.BucketStore.SyncDir,
IgnoreDeletionMarksDelay: storageCfg.BucketStore.IgnoreDeletionMarksDelay,
IgnoreBlocksWithin: storageCfg.BucketStore.IgnoreBlocksWithin,
BlockDiscoveryStrategy: storageCfg.BucketStore.BlockDiscoveryStrategy,
}, bucketClient, limits, logger, reg)
}

Expand Down
34 changes: 17 additions & 17 deletions pkg/storage/tsdb/bucketindex/block_ids_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,40 +13,40 @@ import (
"github.com/cortexproject/cortex/pkg/storage/bucket"
)

type BlockIDsFetcher struct {
logger log.Logger
bkt objstore.Bucket
userID string
cfgProvider bucket.TenantConfigProvider
baseBlockIDsFetcher block.Lister
type BlockLister struct {
logger log.Logger
bkt objstore.Bucket
userID string
cfgProvider bucket.TenantConfigProvider
baseLister block.Lister
}

func NewBlockIDsFetcher(logger log.Logger, bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider) *BlockIDsFetcher {
func NewBlockLister(logger log.Logger, bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider) *BlockLister {
userBkt := bucket.NewUserBucketClient(userID, bkt, cfgProvider)
baseBlockIDsFetcher := block.NewRecursiveLister(logger, userBkt)
return &BlockIDsFetcher{
logger: logger,
bkt: bkt,
userID: userID,
cfgProvider: cfgProvider,
baseBlockIDsFetcher: baseBlockIDsFetcher,
baseLister := block.NewConcurrentLister(logger, userBkt)
return &BlockLister{
logger: logger,
bkt: bkt,
userID: userID,
cfgProvider: cfgProvider,
baseLister: baseLister,
}
}

func (f *BlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
func (f *BlockLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
// Fetch the bucket index.
idx, err := ReadIndex(ctx, f.bkt, f.userID, f.cfgProvider, f.logger)
if errors.Is(err, ErrIndexNotFound) {
// This is a legit case happening when the first blocks of a tenant have recently been uploaded by ingesters
// and their bucket index has not been created yet.
// Fallback to BaseBlockIDsFetcher.
return f.baseBlockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
return f.baseLister.GetActiveAndPartialBlockIDs(ctx, ch)
}
if errors.Is(err, ErrIndexCorrupted) {
// In case a single tenant bucket index is corrupted, we want to return empty active blocks and parital blocks, so skipping this compaction cycle
level.Error(f.logger).Log("msg", "corrupted bucket index found", "user", f.userID, "err", err)
// Fallback to BaseBlockIDsFetcher.
return f.baseBlockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
return f.baseLister.GetActiveAndPartialBlockIDs(ctx, ch)
}

if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/tsdb/bucketindex/block_ids_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestBlockIDsFetcher_Fetch(t *testing.T) {
UpdatedAt: now.Unix(),
}))

blockIdsFetcher := NewBlockIDsFetcher(logger, bkt, userID, nil)
blockIdsFetcher := NewBlockLister(logger, bkt, userID, nil)
ch := make(chan ulid.ULID)
var wg sync.WaitGroup
var blockIds []ulid.ULID
Expand Down Expand Up @@ -94,7 +94,7 @@ func TestBlockIDsFetcherFetcher_Fetch_NoBucketIndex(t *testing.T) {
require.NoError(t, json.NewEncoder(&buf).Encode(mark))
require.NoError(t, bkt.Upload(ctx, path.Join(userID, mark.ID.String(), metadata.DeletionMarkFilename), &buf))
}
blockIdsFetcher := NewBlockIDsFetcher(logger, bkt, userID, nil)
blockIdsFetcher := NewBlockLister(logger, bkt, userID, nil)
ch := make(chan ulid.ULID)
var wg sync.WaitGroup
var blockIds []ulid.ULID
Expand Down
24 changes: 24 additions & 0 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/thanos-io/thanos/pkg/store"

"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/util"
)

const (
Expand Down Expand Up @@ -48,6 +49,9 @@ var (
errInvalidStripeSize = errors.New("invalid TSDB stripe size")
errInvalidOutOfOrderCapMax = errors.New("invalid TSDB OOO chunks capacity (in samples)")
errEmptyBlockranges = errors.New("empty block ranges for TSDB")

ErrInvalidBucketIndexBlockDiscoveryStrategy = errors.New("bucket index block discovery strategy can only be enabled when bucket index is enabled")
ErrBlockDiscoveryStrategy = errors.New("invalid block discovery strategy")
)

// BlocksStorageConfig holds the config information for the blocks storage.
Expand Down Expand Up @@ -252,6 +256,7 @@ type BucketStoreConfig struct {
IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"`
IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"`
BucketIndex BucketIndexConfig `yaml:"bucket_index"`
BlockDiscoveryStrategy string `yaml:"block_discovery_strategy"`

// Chunk pool.
MaxChunkPoolBytes uint64 `yaml:"max_chunk_pool_bytes"`
Expand Down Expand Up @@ -315,6 +320,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.Uint64Var(&cfg.EstimatedMaxChunkSizeBytes, "blocks-storage.bucket-store.estimated-max-chunk-size-bytes", store.EstimatedMaxChunkSize, "Estimated max chunk size in bytes. Setting a large value might result in over fetching data while a small value might result in data refetch. Default value is 16KiB.")
f.BoolVar(&cfg.LazyExpandedPostingsEnabled, "blocks-storage.bucket-store.lazy-expanded-postings-enabled", false, "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings.")
f.IntVar(&cfg.SeriesBatchSize, "blocks-storage.bucket-store.series-batch-size", store.SeriesBatchSize, "Controls how many series to fetch per batch in Store Gateway. Default value is 10000.")
f.StringVar(&cfg.BlockDiscoveryStrategy, "blocks-storage.bucket-store.block-discovery-strategy", string(ConcurrentDiscovery), "One of "+strings.Join(supportedBlockDiscoveryStrategies, ", ")+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations. bucket_index strategy can be used in Compactor only and utilizes the existing bucket index to fetch block IDs to sync. This avoids iterating the bucket but can be impacted by delays of cleaner creating bucket index.")
}

// Validate the config.
Expand All @@ -331,6 +337,9 @@ func (cfg *BucketStoreConfig) Validate() error {
if err != nil {
return errors.Wrap(err, "metadata-cache configuration")
}
if !util.StringsContain(supportedBlockDiscoveryStrategies, cfg.BlockDiscoveryStrategy) {
return ErrInvalidBucketIndexBlockDiscoveryStrategy
}
return nil
}

Expand All @@ -347,3 +356,18 @@ func (cfg *BucketIndexConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix st
f.DurationVar(&cfg.IdleTimeout, prefix+"idle-timeout", time.Hour, "How long a unused bucket index should be cached. Once this timeout expires, the unused bucket index is removed from the in-memory cache. This option is used only by querier.")
f.DurationVar(&cfg.MaxStalePeriod, prefix+"max-stale-period", time.Hour, "The maximum allowed age of a bucket index (last updated) before queries start failing because the bucket index is too old. The bucket index is periodically updated by the compactor, while this check is enforced in the querier (at query time).")
}

// BlockDiscoveryStrategy configures how to list block IDs from object storage.
type BlockDiscoveryStrategy string

const (
ConcurrentDiscovery BlockDiscoveryStrategy = "concurrent"
RecursiveDiscovery BlockDiscoveryStrategy = "recursive"
BucketIndexDiscovery BlockDiscoveryStrategy = "bucket_index"
)

var supportedBlockDiscoveryStrategies = []string{
string(ConcurrentDiscovery),
string(RecursiveDiscovery),
string(BucketIndexDiscovery),
}
18 changes: 15 additions & 3 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,13 +552,25 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
// BucketStore metrics are correctly updated.
fetcherBkt := NewShardingBucketReaderAdapter(userID, u.shardingStrategy, userBkt)

var err error
blockIdsFetcher := block.NewRecursiveLister(userLogger, fetcherBkt)
var (
err error
blockLister block.Lister
)
switch tsdb.BlockDiscoveryStrategy(u.cfg.BucketStore.BlockDiscoveryStrategy) {
case tsdb.ConcurrentDiscovery:
blockLister = block.NewConcurrentLister(userLogger, userBkt)
case tsdb.RecursiveDiscovery:
blockLister = block.NewRecursiveLister(userLogger, userBkt)
case tsdb.BucketIndexDiscovery:
return nil, tsdb.ErrInvalidBucketIndexBlockDiscoveryStrategy
default:
return nil, tsdb.ErrBlockDiscoveryStrategy
}
fetcher, err = block.NewMetaFetcher(
userLogger,
u.cfg.BucketStore.MetaSyncConcurrency,
fetcherBkt,
blockIdsFetcher,
blockLister,
u.syncDirForUser(userID), // The fetcher stores cached metas in the "meta-syncer/" sub directory
fetcherReg,
filters,
Expand Down

0 comments on commit d6f922d

Please sign in to comment.