From 33663ac3b6e49bbeab82c8971f9f121fccaecdb5 Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Wed, 17 May 2023 13:24:14 +0530 Subject: [PATCH 1/8] chore: upgrade to badgerV4 --- go.mod | 3 +- go.sum | 5 +-- processor/manager.go | 2 +- processor/processor.go | 19 +++------ processor/processor_test.go | 16 +++----- .../debugger/cache/internal/badger/badger.go | 14 +++---- .../cache/internal/badger/badger_test.go | 3 +- services/dedup/badger.go | 36 ++++------------- services/dedup/dedup.go | 39 +++++++++---------- services/dedup/dedup_test.go | 18 +++------ 10 files changed, 54 insertions(+), 101 deletions(-) diff --git a/go.mod b/go.mod index cd72994f82..7b4b642b86 100644 --- a/go.mod +++ b/go.mod @@ -40,8 +40,8 @@ require ( github.com/cenkalti/backoff/v4 v4.2.1 github.com/confluentinc/confluent-kafka-go/v2 v2.1.1 github.com/denisenkom/go-mssqldb v0.12.3 - github.com/dgraph-io/badger/v2 v2.2007.4 github.com/dgraph-io/badger/v3 v3.2103.5 + github.com/dgraph-io/badger/v4 v4.1.0 github.com/docker/docker v23.0.4+incompatible github.com/go-chi/chi/v5 v5.0.8 github.com/go-redis/redis v6.15.9+incompatible @@ -162,7 +162,6 @@ require ( github.com/databricks/databricks-sql-go v1.2.0 github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgraph-io/ristretto v0.1.1 // indirect - github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dnephin/pflag v1.0.7 // indirect github.com/docker/cli v20.10.17+incompatible // indirect diff --git a/go.sum b/go.sum index 53a777dbc2..f3b6ff3536 100644 --- a/go.sum +++ b/go.sum @@ -993,11 +993,10 @@ github.com/denisenkom/go-mssqldb v0.12.0/go.mod h1:iiK0YP1ZeepvmBQk/QpLEhhTNJgfz github.com/denisenkom/go-mssqldb v0.12.3 h1:pBSGx9Tq67pBOTLmxNuirNTeB8Vjmf886Kx+8Y+8shw= github.com/denisenkom/go-mssqldb v0.12.3/go.mod h1:k0mtMFOnU+AihqFxPMiF05rtiDrorD1Vrm1KEz5hxDo= github.com/devigned/tab v0.1.1/go.mod 
h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY= -github.com/dgraph-io/badger/v2 v2.2007.4 h1:TRWBQg8UrlUhaFdco01nO2uXwzKS7zd+HVdwV/GHc4o= -github.com/dgraph-io/badger/v2 v2.2007.4/go.mod h1:vSw/ax2qojzbN6eXHIx6KPKtCSHJN/Uz0X0VPruTIhk= github.com/dgraph-io/badger/v3 v3.2103.5 h1:ylPa6qzbjYRQMU6jokoj4wzcaweHylt//CH0AKt0akg= github.com/dgraph-io/badger/v3 v3.2103.5/go.mod h1:4MPiseMeDQ3FNCYwRbbcBOGJLf5jsE0PPFzRiKjtcdw= -github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= +github.com/dgraph-io/badger/v4 v4.1.0 h1:E38jc0f+RATYrycSUf9LMv/t47XAy+3CApyYSq4APOQ= +github.com/dgraph-io/badger/v4 v4.1.0/go.mod h1:P50u28d39ibBRmIJuQC/NSdBOg46HnHw7al2SW5QRHg= github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= diff --git a/processor/manager.go b/processor/manager.go index dd499c7dc3..81c35850af 100644 --- a/processor/manager.go +++ b/processor/manager.go @@ -50,7 +50,7 @@ func (proc *LifecycleManager) Start() error { proc.Handle.Setup( proc.BackendConfig, proc.gatewayDB, proc.routerDB, proc.batchRouterDB, proc.errDB, proc.esDB, - proc.clearDB, proc.ReportingI, proc.MultitenantStats, proc.transientSources, proc.fileuploader, proc.rsourcesService, proc.destDebugger, proc.transDebugger, + proc.ReportingI, proc.MultitenantStats, proc.transientSources, proc.fileuploader, proc.rsourcesService, proc.destDebugger, proc.transDebugger, ) currentCtx, cancel := context.WithCancel(context.Background()) diff --git a/processor/processor.go b/processor/processor.go index 0918ebaf23..c6fa9cf733 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -14,6 +14,9 @@ import ( "sync" "time" + "golang.org/x/exp/slices" + "golang.org/x/sync/errgroup" + jsoniter 
"github.com/json-iterator/go" "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" @@ -44,8 +47,6 @@ import ( "github.com/rudderlabs/rudder-server/utils/workerpool" "github.com/samber/lo" "github.com/tidwall/gjson" - "golang.org/x/exp/slices" - "golang.org/x/sync/errgroup" ) const ( @@ -113,8 +114,6 @@ type Handle struct { readLoopSleep time.Duration maxLoopSleep time.Duration storeTimeout time.Duration - loopSleep time.Duration // DEPRECATED: used only on the old mainLoop - fixedLoopSleep time.Duration // DEPRECATED: used only on the old mainLoop maxEventsToProcess int transformBatchSize int userTransformBatchSize int @@ -321,7 +320,7 @@ func (proc *Handle) newEventFilterStat(sourceID, workspaceID string, destination // Setup initializes the module func (proc *Handle) Setup( backendConfig backendconfig.BackendConfig, gatewayDB, routerDB, - batchRouterDB, errorDB, eventSchemaDB jobsdb.JobsDB, clearDB *bool, reporting types.ReportingI, + batchRouterDB, errorDB, eventSchemaDB jobsdb.JobsDB, reporting types.ReportingI, multiTenantStat multitenant.MultiTenantI, transientSources transientsource.Service, fileuploader fileuploader.Provider, rsourcesService rsources.JobService, destDebugger destinationdebugger.DestinationDebugger, transDebugger transformationdebugger.TransformationDebugger, ) { @@ -410,11 +409,7 @@ func (proc *Handle) Setup( proc.eventSchemaHandler = eventschema.GetInstance() } if proc.config.enableDedup { - opts := []dedup.OptFn{} - if *clearDB { - opts = append(opts, dedup.WithClearDB()) - } - proc.dedup = dedup.New(dedup.DefaultPath(), opts...) 
+ proc.dedup = dedup.New(dedup.DefaultPath()) } ctx, cancel := context.WithCancel(context.Background()) @@ -571,10 +566,6 @@ func (proc *Handle) loadConfig() { config.RegisterDurationConfigVariable(1000, &proc.config.pingerSleep, true, time.Millisecond, "Processor.pingerSleep") config.RegisterDurationConfigVariable(1000, &proc.config.readLoopSleep, true, time.Millisecond, "Processor.readLoopSleep") - // DEPRECATED: used only on the old mainLoop: - config.RegisterDurationConfigVariable(10, &proc.config.loopSleep, true, time.Millisecond, []string{"Processor.loopSleep", "Processor.loopSleepInMS"}...) - // DEPRECATED: used only on the old mainLoop: - config.RegisterDurationConfigVariable(0, &proc.config.fixedLoopSleep, true, time.Millisecond, []string{"Processor.fixedLoopSleep", "Processor.fixedLoopSleepInMS"}...) config.RegisterIntConfigVariable(100, &proc.config.transformBatchSize, true, 1, "Processor.transformBatchSize") config.RegisterIntConfigVariable(200, &proc.config.userTransformBatchSize, true, 1, "Processor.userTransformBatchSize") // Enable dedup of incoming events by default diff --git a/processor/processor_test.go b/processor/processor_test.go index 926f12e9ae..95422eb73d 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -626,7 +626,6 @@ var _ = Describe("Processor", Ordered, func() { }) Context("Initialization", func() { - clearDB := false It("should initialize (no jobs to recover)", func() { mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) mockTransformer.EXPECT().Setup().Times(1) @@ -636,7 +635,7 @@ var _ = Describe("Processor", Ordered, func() { // crash recover returns empty list c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1) - processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, &clearDB, nil, c.MockMultitenantHandle, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, 
destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService()) + processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, nil, c.MockMultitenantHandle, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService()) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil()) @@ -650,7 +649,7 @@ var _ = Describe("Processor", Ordered, func() { c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1) - processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, &clearDB, nil, c.MockMultitenantHandle, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService()) + processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, nil, c.MockMultitenantHandle, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService()) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil()) @@ -658,7 +657,6 @@ var _ = Describe("Processor", Ordered, func() { }) Context("normal operation", func() { - clearDB := false BeforeEach(func() { // crash recovery check c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1) @@ -670,7 +668,7 @@ var _ = Describe("Processor", Ordered, func() { processor := prepareHandle(NewHandle(mockTransformer)) - processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, 
&clearDB, c.MockReportingI, c.MockMultitenantHandle, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService()) + processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, c.MockReportingI, c.MockMultitenantHandle, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService()) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil()) @@ -1550,7 +1548,6 @@ var _ = Describe("Processor", Ordered, func() { }) Context("MainLoop Tests", func() { - clearDB := false It("Should not handle jobs when transformer features are not set", func() { mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) mockTransformer.EXPECT().Setup().Times(1) @@ -1560,7 +1557,7 @@ var _ = Describe("Processor", Ordered, func() { // crash recover returns empty list c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1) processor.config.featuresRetryMaxAttempts = 0 - processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, &clearDB, nil, c.MockMultitenantHandle, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService()) + processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, nil, c.MockMultitenantHandle, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService()) setMainLoopTimeout(processor, 1*time.Second) @@ -1593,7 +1590,6 @@ var _ = 
Describe("Processor", Ordered, func() { }) Context("ProcessorLoop Tests", func() { - clearDB := false It("Should not handle jobs when transformer features are not set", func() { mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) mockTransformer.EXPECT().Setup().Times(1) @@ -1603,7 +1599,7 @@ var _ = Describe("Processor", Ordered, func() { // crash recover returns empty list c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1) processor.config.featuresRetryMaxAttempts = 0 - processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, &clearDB, c.MockReportingI, c.MockMultitenantHandle, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService()) + processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, c.MockReportingI, c.MockMultitenantHandle, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService()) defer processor.Shutdown() c.MockReportingI.EXPECT().WaitForSetup(gomock.Any(), gomock.Any()).Times(1) @@ -3041,7 +3037,6 @@ func processorSetupAndAssertJobHandling(processor *Handle, c *testContext) { } func Setup(processor *Handle, c *testContext, enableDedup, enableReporting bool) { - clearDB := false setDisableDedupFeature(processor, enableDedup) processor.Setup( c.mockBackendConfig, @@ -3050,7 +3045,6 @@ func Setup(processor *Handle, c *testContext, enableDedup, enableReporting bool) c.mockBatchRouterJobsDB, c.mockProcErrorsDB, c.mockEventSchemasDB, - &clearDB, c.MockReportingI, c.MockMultitenantHandle, transientsource.NewEmptyService(), diff --git a/services/debugger/cache/internal/badger/badger.go b/services/debugger/cache/internal/badger/badger.go index 746d315cbd..4edc09c52f 100644 
--- a/services/debugger/cache/internal/badger/badger.go +++ b/services/debugger/cache/internal/badger/badger.go @@ -7,8 +7,8 @@ import ( "path" "time" - "github.com/dgraph-io/badger/v3" - "github.com/dgraph-io/badger/v3/options" + "github.com/dgraph-io/badger/v4" + "github.com/dgraph-io/badger/v4/options" "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" @@ -31,7 +31,7 @@ func (e *Cache[E]) loadCacheConfig() { config.RegisterIntConfigVariable(5, &e.numLevelZeroTables, false, 1, "LiveEvent.cache."+e.origin+".NumLevelZeroTables", "LiveEvent.cache.NumLevelZeroTables") config.RegisterIntConfigVariable(15, &e.numLevelZeroTablesStall, false, 1, "LiveEvent.cache."+e.origin+".NumLevelZeroTablesStall", "LiveEvent.cache.NumLevelZeroTablesStall") // Using the maximum value threshold: (1 << 20) == 1048576 (1MB) - config.RegisterInt64ConfigVariable((1 << 20), &e.valueThreshold, false, 1, "LiveEvent.cache."+e.origin+".ValueThreshold", "LiveEvent.cache.ValueThreshold") + config.RegisterInt64ConfigVariable(1<<20, &e.valueThreshold, false, 1, "LiveEvent.cache."+e.origin+".ValueThreshold", "LiveEvent.cache.ValueThreshold") config.RegisterBoolConfigVariable(false, &e.syncWrites, false, "LiveEvent.cache."+e.origin+".SyncWrites", "LiveEvent.cache.SyncWrites") config.RegisterBoolConfigVariable(true, &e.cleanupOnStartup, false, "LiveEvent.cache."+e.origin+".CleanupOnStartup", "LiveEvent.cache.CleanupOnStartup") } @@ -145,7 +145,7 @@ func New[E any](origin string, log logger.Logger, stats stats.Stats, opts ...fun opt(e) } badgerOpts := badger. - DefaultOptions(storagePath). + DefaultOptions(e.path). WithLogger(badgerLogger{e.logger}). WithCompression(options.None). WithIndexCacheSize(16 << 20). 
// 16mb @@ -193,9 +193,9 @@ func (e *Cache[E]) gcBadgerDB() { } statName := fmt.Sprintf("liveevent-cache-%s", e.origin) - e.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "lsm"}).Gauge((lsmSize)) - e.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "vlog"}).Gauge((vlogSize)) - e.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "total"}).Gauge((totSize)) + e.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "lsm"}).Gauge(lsmSize) + e.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "vlog"}).Gauge(vlogSize) + e.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "total"}).Gauge(totSize) } } diff --git a/services/debugger/cache/internal/badger/badger_test.go b/services/debugger/cache/internal/badger/badger_test.go index cf54e8f67f..214cad4a3b 100644 --- a/services/debugger/cache/internal/badger/badger_test.go +++ b/services/debugger/cache/internal/badger/badger_test.go @@ -6,12 +6,13 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + "github.com/stretchr/testify/assert" + "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" svcMetric "github.com/rudderlabs/rudder-go-kit/stats/metric" "github.com/rudderlabs/rudder-server/utils/misc" - "github.com/stretchr/testify/assert" ) var _ = Describe("cache", func() { diff --git a/services/dedup/badger.go b/services/dedup/badger.go index 226afc75f1..50e2d42a65 100644 --- a/services/dedup/badger.go +++ b/services/dedup/badger.go @@ -4,8 +4,7 @@ import ( "strconv" "time" - "github.com/dgraph-io/badger/v2" - "github.com/dgraph-io/badger/v2/options" + "github.com/dgraph-io/badger/v4" "github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-server/rruntime" "github.com/rudderlabs/rudder-server/utils/misc" @@ -20,6 +19,7 @@ type badgerDB struct { gcDone chan struct{} path string clearDB bool + opts badger.Options } func (d *badgerDB) Get(key string) (int64, bool) { @@ -69,32 +69,10 @@ func (d *badgerDB) Close() { _ = d.badgerDB.Close() } -func (d *badgerDB) start(memOptimized bool) { +func (d *badgerDB) start() { var err error - opts := badger. - DefaultOptions(d.path). - WithTruncate(true). - WithLogger(d.logger). - // Disable compression - Set options.Compression = options.None. - // This means we won’t allocate memory for decompression - // (this can be a lot in case of ZSTD decompression). - // In our case, compression is not useful since we are storing messageIDs with high entropy. - WithCompression(options.None) - - if memOptimized { - // Memory usage optimizations: - // Inspired by https://github.com/dgraph-io/badger/issues/1304#issuecomment-630078745 - // With modifications to ensure no performance degradation for dedup. 
- opts.TableLoadingMode = options.FileIO - opts.ValueLogLoadingMode = options.FileIO - opts.NumMemtables = 3 - opts.MaxTableSize = 16 << 20 - opts.NumLevelZeroTables = 1 - opts.NumLevelZeroTablesStall = 2 - opts.KeepL0InMemory = false - } - d.badgerDB, err = badger.Open(opts) + d.badgerDB, err = badger.Open(d.opts) if err != nil { panic(err) } @@ -132,9 +110,9 @@ func (d *badgerDB) gcLoop() { continue } statName := "dedup" - d.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "lsm"}).Gauge((lsmSize)) - d.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "vlog"}).Gauge((vlogSize)) - d.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "total"}).Gauge((totSize)) + d.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "lsm"}).Gauge(lsmSize) + d.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "vlog"}).Gauge(vlogSize) + d.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "total"}).Gauge(totSize) } } diff --git a/services/dedup/dedup.go b/services/dedup/dedup.go index 87a5dc5ebf..f1ff51e2eb 100644 --- a/services/dedup/dedup.go +++ b/services/dedup/dedup.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "github.com/dgraph-io/badger/v4" + "github.com/dgraph-io/badger/v4/options" "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" @@ -15,20 +17,6 @@ import ( type OptFn func(*badgerDB) -// WithWindow sets the window for deduplication -func WithWindow(d time.Duration) OptFn { - return func(dht *badgerDB) { - dht.window = &d - } -} - -// WithClearDB clears the DB on startup -func WithClearDB() OptFn { - return func(dht *badgerDB) { - dht.clearDB = true - } -} - // DefaultPath returns the default path for the deduplication service's badger DB func 
DefaultPath() string { badgerPathName := "/badgerdbv2" @@ -40,10 +28,23 @@ func DefaultPath() string { } // New creates a new deduplication service. The service needs to be closed after use. -func New(path string, fns ...OptFn) Dedup { +func New(path string) Dedup { var dedupWindow time.Duration config.RegisterDurationConfigVariable(3600, &dedupWindow, true, time.Second, []string{"Dedup.dedupWindow", "Dedup.dedupWindowInS"}...) log := logger.NewLogger().Child("dedup") + badgerOpts := badger. + DefaultOptions(path). + WithCompression(options.None). + WithIndexCacheSize(16 << 20). // 16mb + WithNumGoroutines(1). + WithNumMemtables(config.GetInt("BadgerDB.numMemtable", 1)). + WithValueThreshold(config.GetInt64("BadgerDB.valueThreshold", 1048576)). + WithBlockCacheSize(0). + WithNumVersionsToKeep(1). + WithNumLevelZeroTables(config.GetInt("BadgerDB.numLevelZeroTables", 5)). + WithNumLevelZeroTablesStall(config.GetInt("BadgerDB.numLevelZeroTablesStall", 15)). + WithSyncWrites(config.GetBool("BadgerDB.syncWrites", false)) + db := &badgerDB{ stats: stats.Default, logger: loggerForBadger{log}, @@ -51,13 +52,9 @@ func New(path string, fns ...OptFn) Dedup { gcDone: make(chan struct{}), close: make(chan struct{}), window: &dedupWindow, + opts: badgerOpts, } - for _, fn := range fns { - fn(db) - } - db.start(config.GetBool("Dedup.memOptimized", true)) - log.Info("Setting up dedup") - + db.start() return &dedup{ badgerDB: db, cache: make(map[string]int64), diff --git a/services/dedup/dedup_test.go b/services/dedup/dedup_test.go index b49a123ccf..fa0792321e 100644 --- a/services/dedup/dedup_test.go +++ b/services/dedup/dedup_test.go @@ -24,7 +24,7 @@ func Test_Dedup(t *testing.T) { defer func() { _ = os.RemoveAll(dbPath) }() _ = os.RemoveAll(dbPath) - d := dedup.New(dbPath, dedup.WithClearDB(), dedup.WithWindow(time.Hour)) + d := dedup.New(dbPath) defer d.Close() t.Run("if message id is not present in cache and badger db", func(t *testing.T) { @@ -65,8 +65,8 @@ func 
Test_Dedup_Window(t *testing.T) { dbPath := os.TempDir() + "/dedup_test" defer func() { _ = os.RemoveAll(dbPath) }() _ = os.RemoveAll(dbPath) - - d := dedup.New(dbPath, dedup.WithClearDB(), dedup.WithWindow(time.Second)) + config.Set("Dedup.dedupWindow", "1s") + d := dedup.New(dbPath) defer d.Close() found, _ := d.Set(dedup.KeyValue{Key: "to be deleted", Value: 1}) @@ -93,7 +93,7 @@ func Test_Dedup_ClearDB(t *testing.T) { _ = os.RemoveAll(dbPath) t.Run("Setting a messageid with clear db and dedup window", func(t *testing.T) { - d := dedup.New(dbPath, dedup.WithClearDB(), dedup.WithWindow(time.Hour)) + d := dedup.New(dbPath) found, _ := d.Set(dedup.KeyValue{Key: "a", Value: 1}) require.Equal(t, true, found) err := d.Commit([]string{"a"}) @@ -107,12 +107,6 @@ func Test_Dedup_ClearDB(t *testing.T) { require.Equal(t, int64(1), size) dNew.Close() }) - t.Run("Setting a messageid with cleardb should return true", func(t *testing.T) { - dWithClear := dedup.New(dbPath, dedup.WithClearDB()) - found, _ := dWithClear.Set(dedup.KeyValue{Key: "a", Value: 1}) - require.Equal(t, true, found) - dWithClear.Close() - }) } func Test_Dedup_ErrTxnTooBig(t *testing.T) { @@ -122,7 +116,7 @@ func Test_Dedup_ErrTxnTooBig(t *testing.T) { dbPath := os.TempDir() + "/dedup_test_errtxntoobig" defer os.RemoveAll(dbPath) os.RemoveAll(dbPath) - d := dedup.New(dbPath, dedup.WithClearDB(), dedup.WithWindow(time.Hour)) + d := dedup.New(dbPath) defer d.Close() size := 105_000 @@ -142,7 +136,7 @@ func Benchmark_Dedup(b *testing.B) { b.Logf("using path %s, since tmpDir has issues in macOS\n", dbPath) defer func() { _ = os.RemoveAll(dbPath) }() _ = os.MkdirAll(dbPath, 0o750) - d := dedup.New(dbPath, dedup.WithClearDB(), dedup.WithWindow(time.Minute)) + d := dedup.New(dbPath) b.Run("no duplicates 1000 batch unique", func(b *testing.B) { batchSize := 1000 From 7d5478bebd9b82bd26921119f1846671438168bc Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Wed, 17 May 2023 13:25:35 +0530 Subject: [PATCH 2/8] chore: 
upgrade to badgerV4 --- enterprise/suppress-user/internal/badgerdb/badgerdb.go | 10 +++++----- .../suppress-user/internal/badgerdb/badgerdb_test.go | 2 +- go.mod | 2 -- go.sum | 9 --------- 4 files changed, 6 insertions(+), 17 deletions(-) diff --git a/enterprise/suppress-user/internal/badgerdb/badgerdb.go b/enterprise/suppress-user/internal/badgerdb/badgerdb.go index 4a3a46b30d..ee7094768c 100644 --- a/enterprise/suppress-user/internal/badgerdb/badgerdb.go +++ b/enterprise/suppress-user/internal/badgerdb/badgerdb.go @@ -9,8 +9,8 @@ import ( "sync" "time" - "github.com/dgraph-io/badger/v3" - "github.com/dgraph-io/badger/v3/options" + "github.com/dgraph-io/badger/v4" + "github.com/dgraph-io/badger/v4/options" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-server/enterprise/suppress-user/model" @@ -259,9 +259,9 @@ func (b *Repository) start() (startErr error) { continue } statName := "suppress-user" - b.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "lsm"}).Gauge((lsmSize)) - b.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "vlog"}).Gauge((vlogSize)) - b.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "total"}).Gauge((totSize)) + b.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "lsm"}).Gauge(lsmSize) + b.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "vlog"}).Gauge(vlogSize) + b.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "total"}).Gauge(totSize) } }() return nil diff --git a/enterprise/suppress-user/internal/badgerdb/badgerdb_test.go b/enterprise/suppress-user/internal/badgerdb/badgerdb_test.go index 36e1455d8e..e1bfd8fda0 100644 --- a/enterprise/suppress-user/internal/badgerdb/badgerdb_test.go +++ 
b/enterprise/suppress-user/internal/badgerdb/badgerdb_test.go @@ -10,7 +10,7 @@ import ( "testing" "time" - "github.com/dgraph-io/badger/v3" + "github.com/dgraph-io/badger/v4" "github.com/google/uuid" "github.com/rudderlabs/rudder-go-kit/logger" diff --git a/go.mod b/go.mod index 7b4b642b86..229f26a4f8 100644 --- a/go.mod +++ b/go.mod @@ -40,7 +40,6 @@ require ( github.com/cenkalti/backoff/v4 v4.2.1 github.com/confluentinc/confluent-kafka-go/v2 v2.1.1 github.com/denisenkom/go-mssqldb v0.12.3 - github.com/dgraph-io/badger/v3 v3.2103.5 github.com/dgraph-io/badger/v4 v4.1.0 github.com/docker/docker v23.0.4+incompatible github.com/go-chi/chi/v5 v5.0.8 @@ -151,7 +150,6 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.4.0 // indirect github.com/bugsnag/panicwrap v1.3.4 // indirect - github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect github.com/containerd/continuity v0.3.0 // indirect diff --git a/go.sum b/go.sum index f3b6ff3536..eaf346816e 100644 --- a/go.sum +++ b/go.sum @@ -705,7 +705,6 @@ github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb0 github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= -github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= @@ -891,7 +890,6 @@ github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod 
h1:f6KPmirojxKA github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA= github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA= -github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -967,7 +965,6 @@ github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+ github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534 h1:rtAn27wIbmOGUs7RIbVgPEjb31ehTVniDwPGXyMxm5U= github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= -github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= @@ -993,8 +990,6 @@ github.com/denisenkom/go-mssqldb v0.12.0/go.mod h1:iiK0YP1ZeepvmBQk/QpLEhhTNJgfz github.com/denisenkom/go-mssqldb v0.12.3 h1:pBSGx9Tq67pBOTLmxNuirNTeB8Vjmf886Kx+8Y+8shw= github.com/denisenkom/go-mssqldb v0.12.3/go.mod h1:k0mtMFOnU+AihqFxPMiF05rtiDrorD1Vrm1KEz5hxDo= github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY= -github.com/dgraph-io/badger/v3 v3.2103.5 
h1:ylPa6qzbjYRQMU6jokoj4wzcaweHylt//CH0AKt0akg= -github.com/dgraph-io/badger/v3 v3.2103.5/go.mod h1:4MPiseMeDQ3FNCYwRbbcBOGJLf5jsE0PPFzRiKjtcdw= github.com/dgraph-io/badger/v4 v4.1.0 h1:E38jc0f+RATYrycSUf9LMv/t47XAy+3CApyYSq4APOQ= github.com/dgraph-io/badger/v4 v4.1.0/go.mod h1:P50u28d39ibBRmIJuQC/NSdBOg46HnHw7al2SW5QRHg= github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= @@ -1251,7 +1246,6 @@ github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Z github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= github.com/google/flatbuffers v1.11.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= -github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/flatbuffers v2.0.0+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/flatbuffers v2.0.8+incompatible h1:ivUb1cGomAB101ZM1T0nOiWz9pSrTMoa9+EiY7igmkM= github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= @@ -1537,7 +1531,6 @@ github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= @@ -1845,7 +1838,6 @@ github.com/rudderlabs/rudder-go-kit 
v0.13.3 h1:Tl1yN7TZXjVXYe7TBkMYV2yv6tfdOZpDv github.com/rudderlabs/rudder-go-kit v0.13.3/go.mod h1:3P7g4yt8TxiN6zSGvl2h/9dh1ae6GQi5zRP02rva+28= github.com/rudderlabs/sql-tunnels v0.1.3 h1:o7/MX4Yj0WpAaw0uxkRmkagtzedGxUPRwyho4SMbWMQ= github.com/rudderlabs/sql-tunnels v0.1.3/go.mod h1:1TolUkSsrQxdXS0iyGlbLADsgkebmPcz1MxU5xBl6dE= -github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday v1.6.0/go.mod h1:ti0ldHuxg49ri4ksnFxlkCfN+hvslNlmVHqNRXXJNAY= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= @@ -1907,7 +1899,6 @@ github.com/spf13/afero v1.9.3/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcD github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w= github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU= github.com/spf13/cobra v0.0.2-0.20171109065643-2da4a54c5cee/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= -github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= github.com/spf13/cobra v1.0.0/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE= github.com/spf13/cobra v1.1.3/go.mod h1:pGADOWyqRD/YMrPZigI/zbliZ2wVD/23d+is3pSWzOo= github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk= From f1796d83d0d8c7722faeb4eea06150d16630525b Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Tue, 23 May 2023 11:52:00 +0530 Subject: [PATCH 3/8] chore: change write path badger --- services/dedup/dedup.go | 2 +- warehouse/internal/repo/upload.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/services/dedup/dedup.go b/services/dedup/dedup.go index f1ff51e2eb..f18518a163 100644 --- a/services/dedup/dedup.go +++ b/services/dedup/dedup.go @@ -19,7 +19,7 @@ type OptFn func(*badgerDB) // DefaultPath returns the default path for the 
deduplication service's badger DB func DefaultPath() string { - badgerPathName := "/badgerdbv2" + badgerPathName := "/badgerdbv4" tmpDirPath, err := misc.CreateTMPDIR() if err != nil { panic(err) diff --git a/warehouse/internal/repo/upload.go b/warehouse/internal/repo/upload.go index 8b04cdd76e..7e47224ae6 100644 --- a/warehouse/internal/repo/upload.go +++ b/warehouse/internal/repo/upload.go @@ -103,10 +103,10 @@ func (uploads *Uploads) CreateWithStagingFiles(ctx context.Context, upload model } var firstEventAt, lastEventAt time.Time - if ok := files[0].FirstEventAt.IsZero(); !ok { + if !files[0].FirstEventAt.IsZero() { firstEventAt = files[0].FirstEventAt } - if ok := files[len(files)-1].LastEventAt.IsZero(); !ok { + if !files[len(files)-1].LastEventAt.IsZero() { lastEventAt = files[len(files)-1].LastEventAt } From b5a82fdd3b43c612085665fdcc4f3b4491149fb2 Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Tue, 23 May 2023 16:53:22 +0530 Subject: [PATCH 4/8] chore: minor fixes --- services/pgnotifier/pgnotifier.go | 3 +++ warehouse/upload.go | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/services/pgnotifier/pgnotifier.go b/services/pgnotifier/pgnotifier.go index f8e51bc707..69b6df7756 100644 --- a/services/pgnotifier/pgnotifier.go +++ b/services/pgnotifier/pgnotifier.go @@ -256,6 +256,9 @@ func (notifier *PGNotifier) trackUploadBatch(ctx context.Context, batchID string Error: jobError.String, }) } + if rows.Err() != nil { + panic(fmt.Errorf("Failed to scan result from query: %s\nwith Error : %w", stmt, rows.Err())) + } _ = rows.Close() *ch <- responses pkgLogger.Infof("PgNotifier: Completed processing all files in batch: %s", batchID) diff --git a/warehouse/upload.go b/warehouse/upload.go index e040e96c1c..9ed7259243 100644 --- a/warehouse/upload.go +++ b/warehouse/upload.go @@ -1020,7 +1020,7 @@ func (job *UploadJob) loadTable(tName string) (bool, error) { return alteredSchema, fmt.Errorf("update schema: %w", err) } - pkgLogger.Infow("stating 
load for table", + pkgLogger.Infow("starting load for table", logfield.UploadJobID, job.upload.ID, logfield.SourceID, job.warehouse.Source.ID, logfield.DestinationID, job.warehouse.Destination.ID, From fa17f6811a12a920e949fb96dd91ffede1c644bf Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Tue, 23 May 2023 17:17:40 +0530 Subject: [PATCH 5/8] chore: remove badgerV2 folder --- services/dedup/dedup.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/services/dedup/dedup.go b/services/dedup/dedup.go index f18518a163..448a1041e3 100644 --- a/services/dedup/dedup.go +++ b/services/dedup/dedup.go @@ -4,6 +4,7 @@ package dedup import ( "fmt" + "os" "sync" "time" @@ -32,6 +33,11 @@ func New(path string) Dedup { var dedupWindow time.Duration config.RegisterDurationConfigVariable(3600, &dedupWindow, true, time.Second, []string{"Dedup.dedupWindow", "Dedup.dedupWindowInS"}...) log := logger.NewLogger().Child("dedup") + defer func() { + // TODO : Remove this after badgerdb v2 is completely removed + tmpDirPath, _ := misc.CreateTMPDIR() + _ = os.RemoveAll(fmt.Sprintf(`%v%v`, tmpDirPath, "/badgerdbv2")) + }() badgerOpts := badger. DefaultOptions(path). WithCompression(options.None). 
From c8cc63409882aca113ef31a4529d95109dde14a2 Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Wed, 24 May 2023 09:15:36 +0530 Subject: [PATCH 6/8] chore: remove badgerV2 folder --- services/dedup/dedup_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/dedup/dedup_test.go b/services/dedup/dedup_test.go index fa0792321e..0bc976d03b 100644 --- a/services/dedup/dedup_test.go +++ b/services/dedup/dedup_test.go @@ -9,6 +9,7 @@ import ( "github.com/google/uuid" "github.com/rudderlabs/rudder-go-kit/testhelper/rand" + "github.com/rudderlabs/rudder-server/utils/misc" "github.com/stretchr/testify/require" "github.com/rudderlabs/rudder-go-kit/config" @@ -19,11 +20,10 @@ import ( func Test_Dedup(t *testing.T) { config.Reset() logger.Reset() - + misc.Init() dbPath := os.TempDir() + "/dedup_test" defer func() { _ = os.RemoveAll(dbPath) }() _ = os.RemoveAll(dbPath) - d := dedup.New(dbPath) defer d.Close() From a1d577b02ba4a6fe9e590c8b967f829fed267639 Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Tue, 30 May 2023 09:11:09 +0530 Subject: [PATCH 7/8] chore: review comments --- enterprise/suppress-user/internal/badgerdb/badgerdb.go | 6 +++++- services/debugger/cache/internal/badger/badger.go | 6 +++++- services/dedup/dedup.go | 5 +++-- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/enterprise/suppress-user/internal/badgerdb/badgerdb.go b/enterprise/suppress-user/internal/badgerdb/badgerdb.go index ee7094768c..4416e97ea4 100644 --- a/enterprise/suppress-user/internal/badgerdb/badgerdb.go +++ b/enterprise/suppress-user/internal/badgerdb/badgerdb.go @@ -66,7 +66,7 @@ type Repository struct { func NewRepository(basePath string, log logger.Logger, stats stats.Stats, opts ...Opt) (*Repository, error) { b := &Repository{ log: log, - path: path.Join(basePath, "badgerdbv3"), + path: path.Join(basePath, "badgerdbv4"), maxGoroutines: 1, maxSeedWait: 10 * time.Second, stats: stats, @@ -74,6 +74,10 @@ func NewRepository(basePath string, log 
logger.Logger, stats stats.Stats, opts . for _, opt := range opts { opt(b) } + defer func() { + // TODO : Remove this after badgerdb v2 is completely removed + _ = os.RemoveAll(path.Join(basePath, "badgerdbv3")) + }() err := b.start() return b, err } diff --git a/services/debugger/cache/internal/badger/badger.go b/services/debugger/cache/internal/badger/badger.go index 4edc09c52f..35f3f7fd88 100644 --- a/services/debugger/cache/internal/badger/badger.go +++ b/services/debugger/cache/internal/badger/badger.go @@ -125,7 +125,11 @@ func New[E any](origin string, log logger.Logger, stats stats.Stats, opts ...fun stats: stats, } e.loadCacheConfig() - badgerPathName := e.origin + "/cache/badgerdbv3" + badgerPathName := e.origin + "/cache/badgerdbv4" + defer func() { + // TODO : Remove this after badgerdb v3 is completely removed + _ = os.RemoveAll(fmt.Sprintf(`%v%v`, e.origin, "/badgerdbv3")) + }() tmpDirPath, err := misc.CreateTMPDIR() if err != nil { e.logger.Errorf("Unable to create tmp directory: %v", err) diff --git a/services/dedup/dedup.go b/services/dedup/dedup.go index 448a1041e3..607b2a69b5 100644 --- a/services/dedup/dedup.go +++ b/services/dedup/dedup.go @@ -43,13 +43,14 @@ func New(path string) Dedup { WithCompression(options.None). WithIndexCacheSize(16 << 20). // 16mb WithNumGoroutines(1). - WithNumMemtables(config.GetInt("BadgerDB.numMemtable", 1)). + WithNumMemtables(config.GetInt("BadgerDB.numMemtable", 5)). WithValueThreshold(config.GetInt64("BadgerDB.valueThreshold", 1048576)). WithBlockCacheSize(0). WithNumVersionsToKeep(1). WithNumLevelZeroTables(config.GetInt("BadgerDB.numLevelZeroTables", 5)). WithNumLevelZeroTablesStall(config.GetInt("BadgerDB.numLevelZeroTablesStall", 15)). - WithSyncWrites(config.GetBool("BadgerDB.syncWrites", false)) + WithSyncWrites(config.GetBool("BadgerDB.syncWrites", false)). 
+ WithDetectConflicts(config.GetBool("BadgerDB.detectConflicts", false)) db := &badgerDB{ stats: stats.Default, From eb25ca4ca7784cd767ab429ba551442615959c09 Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Tue, 30 May 2023 11:04:49 +0530 Subject: [PATCH 8/8] Update enterprise/suppress-user/internal/badgerdb/badgerdb.go Co-authored-by: Aris Tzoumas --- enterprise/suppress-user/internal/badgerdb/badgerdb.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/enterprise/suppress-user/internal/badgerdb/badgerdb.go b/enterprise/suppress-user/internal/badgerdb/badgerdb.go index 4416e97ea4..b5e6024cd8 100644 --- a/enterprise/suppress-user/internal/badgerdb/badgerdb.go +++ b/enterprise/suppress-user/internal/badgerdb/badgerdb.go @@ -74,10 +74,6 @@ func NewRepository(basePath string, log logger.Logger, stats stats.Stats, opts . for _, opt := range opts { opt(b) } - defer func() { - // TODO : Remove this after badgerdb v2 is completely removed - _ = os.RemoveAll(path.Join(basePath, "badgerdbv3")) - }() err := b.start() return b, err }