Skip to content

Commit

Permalink
Fix redis cache client metrics registration error (#5751)
Browse files Browse the repository at this point in the history
* fix redis cache registration error

Signed-off-by: Ben Ye <benye@amazon.com>

* integration tests for having both index and chunks cache

Signed-off-by: Ben Ye <benye@amazon.com>

* update tests

Signed-off-by: Ben Ye <benye@amazon.com>

* fix lint

Signed-off-by: Ben Ye <benye@amazon.com>

---------

Signed-off-by: Ben Ye <benye@amazon.com>
  • Loading branch information
yeya24 authored Feb 1, 2024
1 parent 8ef6813 commit f2b7591
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 11 deletions.
25 changes: 15 additions & 10 deletions integration/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,32 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
tenantShardSize int
ingesterStreamingEnabled bool
indexCacheBackend string
chunkCacheBackend string
bucketIndexEnabled bool
}{
"blocks sharding disabled, ingester gRPC streaming disabled, memcached index cache": {
blocksShardingStrategy: "",
ingesterStreamingEnabled: false,
indexCacheBackend: tsdb.IndexCacheBackendMemcached,
chunkCacheBackend: tsdb.CacheBackendMemcached,
},
"blocks sharding disabled, ingester gRPC streaming disabled, multilevel index cache (inmemory, memcached)": {
blocksShardingStrategy: "",
ingesterStreamingEnabled: false,
indexCacheBackend: fmt.Sprintf("%v,%v", tsdb.IndexCacheBackendInMemory, tsdb.IndexCacheBackendMemcached),
chunkCacheBackend: tsdb.CacheBackendMemcached,
},
"blocks sharding disabled, ingester gRPC streaming disabled, redis index cache": {
blocksShardingStrategy: "",
ingesterStreamingEnabled: false,
indexCacheBackend: tsdb.IndexCacheBackendRedis,
chunkCacheBackend: tsdb.CacheBackendRedis,
},
"blocks sharding disabled, ingester gRPC streaming disabled, multilevel index cache (inmemory, redis)": {
blocksShardingStrategy: "",
ingesterStreamingEnabled: false,
indexCacheBackend: fmt.Sprintf("%v,%v", tsdb.IndexCacheBackendInMemory, tsdb.IndexCacheBackendRedis),
chunkCacheBackend: tsdb.CacheBackendRedis,
},
"blocks default sharding, ingester gRPC streaming disabled, inmemory index cache": {
blocksShardingStrategy: "default",
Expand All @@ -67,12 +72,14 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
blocksShardingStrategy: "default",
ingesterStreamingEnabled: true,
indexCacheBackend: tsdb.IndexCacheBackendMemcached,
chunkCacheBackend: tsdb.CacheBackendMemcached,
},
"blocks shuffle sharding, ingester gRPC streaming enabled, memcached index cache": {
blocksShardingStrategy: "shuffle-sharding",
tenantShardSize: 1,
ingesterStreamingEnabled: true,
indexCacheBackend: tsdb.IndexCacheBackendMemcached,
chunkCacheBackend: tsdb.CacheBackendMemcached,
},
"blocks default sharding, ingester gRPC streaming enabled, inmemory index cache, bucket index enabled": {
blocksShardingStrategy: "default",
Expand All @@ -91,13 +98,15 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
blocksShardingStrategy: "default",
ingesterStreamingEnabled: true,
indexCacheBackend: tsdb.IndexCacheBackendRedis,
chunkCacheBackend: tsdb.CacheBackendRedis,
bucketIndexEnabled: true,
},
"blocks shuffle sharding, ingester gRPC streaming enabled, redis index cache, bucket index enabled": {
blocksShardingStrategy: "shuffle-sharding",
tenantShardSize: 1,
ingesterStreamingEnabled: true,
indexCacheBackend: tsdb.IndexCacheBackendRedis,
chunkCacheBackend: tsdb.CacheBackendRedis,
bucketIndexEnabled: true,
},
}
Expand All @@ -121,6 +130,7 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
"-blocks-storage.bucket-store.index-cache.backend": testCfg.indexCacheBackend,
"-blocks-storage.bucket-store.chunks-cache.backend": testCfg.chunkCacheBackend,
"-store-gateway.sharding-enabled": strconv.FormatBool(testCfg.blocksShardingStrategy != ""),
"-store-gateway.sharding-strategy": testCfg.blocksShardingStrategy,
"-store-gateway.tenant-shard-size": fmt.Sprintf("%d", testCfg.tenantShardSize),
Expand All @@ -144,6 +154,11 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
if strings.Contains(testCfg.indexCacheBackend, tsdb.IndexCacheBackendRedis) {
flags["-blocks-storage.bucket-store.index-cache.redis.addresses"] = redis.NetworkEndpoint(e2ecache.RedisPort)
}
if testCfg.chunkCacheBackend == tsdb.CacheBackendMemcached {
flags["-blocks-storage.bucket-store.chunks-cache.memcached.addresses"] = "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort)
} else if testCfg.chunkCacheBackend == tsdb.CacheBackendRedis {
flags["-blocks-storage.bucket-store.chunks-cache.redis.addresses"] = redis.NetworkEndpoint(e2ecache.RedisPort)
}

// Start Cortex components.
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
Expand Down Expand Up @@ -257,17 +272,12 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64((5+5+2)*numberOfCacheBackends)), "thanos_store_index_cache_requests_total"))
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(0), "thanos_store_index_cache_hits_total")) // no cache hit cause the cache was empty

if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(21), "thanos_memcached_operations_total")) // 14 gets + 7 sets
}

// Query back again the 1st series from storage. This time it should use the index cache.
result, err = c.Query("series_1", series1Timestamp)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector1, result.(model.Vector))

var l0CacheHits float64
if numberOfCacheBackends > 1 {
// 6 requests for Expanded Postings, 5 for Postings and 3 for Series.
require.NoError(t, storeGateways.WaitSumMetricsWithOptions(e2e.Equals(float64(6+5+3)), []string{"thanos_store_index_cache_requests_total"}, e2e.WithLabelMatchers(
Expand All @@ -287,17 +297,12 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
require.NoError(t, err)
// Make sure l1 cache requests + l0 cache hits is 14.
require.Equal(t, float64(14), l1IndexCacheRequests[0]+l0IndexCacheHits[0])
l0CacheHits = l0IndexCacheHits[0]
} else {
// 6 requests for Expanded Postings, 5 for Postings and 3 for Series.
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64(6+5+3)), "thanos_store_index_cache_requests_total"))
}
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2), "thanos_store_index_cache_hits_total")) // this time has used the index cache

if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(23-l0CacheHits), "thanos_memcached_operations_total")) // as before + 2 gets - cache hits
}

// Query metadata.
testMetadataQueriesWithBlocksStorage(t, c, series1[0], series2[0], series3[0], blockRangePeriod)

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/tsdb/index_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheu
caches = append(caches, cache)
enabledItems = append(enabledItems, cfg.Memcached.EnabledItems)
case IndexCacheBackendRedis:
c, err := newRedisIndexCacheClient(cfg.Redis.ClientConfig, logger, iReg)
c, err := newRedisIndexCacheClient(cfg.Redis.ClientConfig, logger, registerer)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit f2b7591

Please sign in to comment.