Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disable index dedupe when rf > 1 and current or upcoming index type is boltdb-shipper #2206

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/sources/operations/storage/boltdb-shipper.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,14 @@ Frequency for checking updates can be configured with `resync_interval` config.
To avoid keeping downloaded index files forever there is a ttl for them which defaults to 24 hours, which means if index files for a period are not used for 24 hours they would be removed from cache location.
ttl can be configured using `cache_ttl` config.

### Write Deduplication disabled

Loki does write deduplication of chunks and index using Chunks and WriteDedupe cache respectively, configured with [ChunkStoreConfig](../../configuration/README.md#chunk_store_config).
The problem with write deduplication when using `boltdb-shipper` though is ingesters only keep uploading boltdb files periodically to make them available to all the other services which means there would be a brief period where some of the services would not have received updated index yet.
The problem due to that is if an ingester which first wrote the chunks and index goes down and all the other ingesters which were part of replication scheme skipped writing those chunks and index due to deduplication, we would end up missing those logs from query responses since only the ingester which had the index went down.
This problem would be faced even during rollouts which is quite common.

To avoid this, Loki disables deduplication of index when the replication factor is greater than 1 and `boltdb-shipper` is an active or upcoming index type.
While using `boltdb-shipper` please avoid configuring WriteDedupe cache since it is used purely for the index deduplication, so it would not be used anyways.


31 changes: 25 additions & 6 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/cortexproject/cortex/pkg/cortex"
cortex_querier "github.com/cortexproject/cortex/pkg/querier"
Expand Down Expand Up @@ -178,7 +179,7 @@ func (t *Loki) initIngester() (_ services.Service, err error) {
t.cfg.Ingester.LifecyclerConfig.ListenPort = t.cfg.Server.GRPCListenPort

// We want ingester to also query the store when using boltdb-shipper
pc := activePeriodConfig(t.cfg.SchemaConfig)
pc := t.cfg.SchemaConfig.Configs[activePeriodConfig(t.cfg.SchemaConfig)]
if pc.IndexType == local.BoltDBShipperType {
t.cfg.Ingester.QueryStore = true
mlb, err := calculateMaxLookBack(pc, t.cfg.Ingester.QueryStoreMaxLookBackPeriod, t.cfg.Ingester.MaxChunkAge)
Expand Down Expand Up @@ -240,7 +241,7 @@ func (t *Loki) initTableManager() (services.Service, error) {
}

func (t *Loki) initStore() (_ services.Service, err error) {
if activePeriodConfig(t.cfg.SchemaConfig).IndexType == local.BoltDBShipperType {
if t.cfg.SchemaConfig.Configs[activePeriodConfig(t.cfg.SchemaConfig)].IndexType == local.BoltDBShipperType {
t.cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.cfg.Ingester.LifecyclerConfig.ID
switch t.cfg.Target {
case Ingester:
Expand All @@ -254,6 +255,13 @@ func (t *Loki) initStore() (_ services.Service, err error) {
}
}

// If RF > 1 and current or upcoming index type is boltdb-shipper then disable both chunks dedupe and write dedupe cache.
// This is to ensure that index entries are replicated to all the boltdb files in ingesters flushing replicated data.
if t.cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor > 1 && usingBoltdbShipper(t.cfg.SchemaConfig) {
t.cfg.ChunkStoreConfig.DisableIndexDeduplication = true
t.cfg.ChunkStoreConfig.WriteDedupeCacheConfig = cache.Config{}
}

t.store, err = loki_storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides, prometheus.DefaultRegisterer)
if err != nil {
return
Expand Down Expand Up @@ -329,17 +337,28 @@ func (t *Loki) initMemberlistKV() (services.Service, error) {
return t.memberlistKV, nil
}

// activePeriodConfig type returns index type which would be applicable to logs that would be pushed starting now
// Note: Another periodic config can be applicable in future which can change index type
func activePeriodConfig(cfg chunk.SchemaConfig) chunk.PeriodConfig {
// activePeriodConfig type returns index of active PeriodicConfig which would be applicable to logs that would be pushed starting now.
// Note: Another PeriodicConfig might be applicable for future logs which can change index type.
func activePeriodConfig(cfg chunk.SchemaConfig) int {
now := model.Now()
i := sort.Search(len(cfg.Configs), func(i int) bool {
return cfg.Configs[i].From.Time > now
})
if i > 0 {
i--
}
return cfg.Configs[i]
return i
}

// usingBoltdbShipper check whether current or the next index type is boltdb-shipper, returns true if yes.
func usingBoltdbShipper(cfg chunk.SchemaConfig) bool {
activePCIndex := activePeriodConfig(cfg)
if cfg.Configs[activePCIndex].IndexType == local.BoltDBShipperType ||
(len(cfg.Configs)-1 > activePCIndex && cfg.Configs[activePCIndex+1].IndexType == local.BoltDBShipperType) {
return true
}

return false
}

func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, maxChunkAge time.Duration) (time.Duration, error) {
Expand Down
27 changes: 24 additions & 3 deletions pkg/loki/modules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,43 @@ func TestActiveIndexType(t *testing.T) {
IndexType: "first",
}}

assert.Equal(t, cfg.Configs[0], activePeriodConfig(cfg))
assert.Equal(t, 0, activePeriodConfig(cfg))

// add a newer PeriodConfig in the past which should be considered
cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{
From: chunk.DayTime{Time: model.Now().Add(-12 * time.Hour)},
IndexType: "second",
})
assert.Equal(t, cfg.Configs[1], activePeriodConfig(cfg))
assert.Equal(t, 1, activePeriodConfig(cfg))

// add a newer PeriodConfig in the future which should not be considered
cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{
From: chunk.DayTime{Time: model.Now().Add(time.Hour)},
IndexType: "third",
})
assert.Equal(t, cfg.Configs[1], activePeriodConfig(cfg))
assert.Equal(t, 1, activePeriodConfig(cfg))
}

func TestUsingBoltdbShipper(t *testing.T) {
var cfg chunk.SchemaConfig

// just one PeriodConfig in the past using boltdb-shipper
cfg.Configs = []chunk.PeriodConfig{{
From: chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)},
IndexType: "boltdb-shipper",
}}
assert.Equal(t, true, usingBoltdbShipper(cfg))

// just one PeriodConfig in the past not using boltdb-shipper
cfg.Configs[0].IndexType = "boltdb"
assert.Equal(t, false, usingBoltdbShipper(cfg))

// add a newer PeriodConfig in the future using boltdb-shipper
cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{
From: chunk.DayTime{Time: model.Now().Add(time.Hour)},
IndexType: "boltdb-shipper",
})
assert.Equal(t, true, usingBoltdbShipper(cfg))
}

func Test_calculateMaxLookBack(t *testing.T) {
Expand Down