Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: upgrade to badgerV4 #3340

Merged
merged 14 commits into from
May 30, 2023
Merged
16 changes: 10 additions & 6 deletions enterprise/suppress-user/internal/badgerdb/badgerdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"sync"
"time"

"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/badger/v3/options"
"github.com/dgraph-io/badger/v4"
cisse21 marked this conversation as resolved.
Show resolved Hide resolved
"github.com/dgraph-io/badger/v4/options"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/enterprise/suppress-user/model"
Expand Down Expand Up @@ -66,14 +66,18 @@ type Repository struct {
func NewRepository(basePath string, log logger.Logger, stats stats.Stats, opts ...Opt) (*Repository, error) {
b := &Repository{
log: log,
path: path.Join(basePath, "badgerdbv3"),
path: path.Join(basePath, "badgerdbv4"),
maxGoroutines: 1,
maxSeedWait: 10 * time.Second,
stats: stats,
}
for _, opt := range opts {
opt(b)
}
defer func() {
// TODO : Remove this after badgerdb v2 is completely removed
_ = os.RemoveAll(path.Join(basePath, "badgerdbv3"))
}()
cisse21 marked this conversation as resolved.
Show resolved Hide resolved
err := b.start()
return b, err
}
Expand Down Expand Up @@ -259,9 +263,9 @@ func (b *Repository) start() (startErr error) {
continue
}
statName := "suppress-user"
b.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "lsm"}).Gauge((lsmSize))
b.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "vlog"}).Gauge((vlogSize))
b.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "total"}).Gauge((totSize))
b.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "lsm"}).Gauge(lsmSize)
b.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "vlog"}).Gauge(vlogSize)
b.stats.NewTaggedStat("badger_db_size", stats.GaugeType, stats.Tags{"name": statName, "type": "total"}).Gauge(totSize)
}
}()
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"testing"
"time"

"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/badger/v4"

"github.com/google/uuid"
"github.com/rudderlabs/rudder-go-kit/logger"
Expand Down
5 changes: 1 addition & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ require (
github.com/cenkalti/backoff/v4 v4.2.1
github.com/confluentinc/confluent-kafka-go/v2 v2.1.1
github.com/denisenkom/go-mssqldb v0.12.3
github.com/dgraph-io/badger/v2 v2.2007.4
github.com/dgraph-io/badger/v3 v3.2103.5
github.com/dgraph-io/badger/v4 v4.1.0
github.com/docker/docker v23.0.4+incompatible
github.com/go-chi/chi/v5 v5.0.8
github.com/go-redis/redis v6.15.9+incompatible
Expand Down Expand Up @@ -148,7 +147,6 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.4.0 // indirect
github.com/bugsnag/panicwrap v1.3.4 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect
github.com/containerd/continuity v0.3.0 // indirect
Expand All @@ -159,7 +157,6 @@ require (
github.com/databricks/databricks-sql-go v1.2.0
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dnephin/pflag v1.0.7 // indirect
github.com/docker/cli v20.10.17+incompatible // indirect
Expand Down
14 changes: 2 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,6 @@ github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb0
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
Expand Down Expand Up @@ -891,7 +890,6 @@ github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA
github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw=
github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down Expand Up @@ -967,7 +965,6 @@ github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534 h1:rtAn27wIbmOGUs7RIbVgPEjb31ehTVniDwPGXyMxm5U=
github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
Expand All @@ -993,11 +990,8 @@ github.com/denisenkom/go-mssqldb v0.12.0/go.mod h1:iiK0YP1ZeepvmBQk/QpLEhhTNJgfz
github.com/denisenkom/go-mssqldb v0.12.3 h1:pBSGx9Tq67pBOTLmxNuirNTeB8Vjmf886Kx+8Y+8shw=
github.com/denisenkom/go-mssqldb v0.12.3/go.mod h1:k0mtMFOnU+AihqFxPMiF05rtiDrorD1Vrm1KEz5hxDo=
github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
github.com/dgraph-io/badger/v2 v2.2007.4 h1:TRWBQg8UrlUhaFdco01nO2uXwzKS7zd+HVdwV/GHc4o=
github.com/dgraph-io/badger/v2 v2.2007.4/go.mod h1:vSw/ax2qojzbN6eXHIx6KPKtCSHJN/Uz0X0VPruTIhk=
github.com/dgraph-io/badger/v3 v3.2103.5 h1:ylPa6qzbjYRQMU6jokoj4wzcaweHylt//CH0AKt0akg=
github.com/dgraph-io/badger/v3 v3.2103.5/go.mod h1:4MPiseMeDQ3FNCYwRbbcBOGJLf5jsE0PPFzRiKjtcdw=
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgraph-io/badger/v4 v4.1.0 h1:E38jc0f+RATYrycSUf9LMv/t47XAy+3CApyYSq4APOQ=
github.com/dgraph-io/badger/v4 v4.1.0/go.mod h1:P50u28d39ibBRmIJuQC/NSdBOg46HnHw7al2SW5QRHg=
github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8=
github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
Expand Down Expand Up @@ -1251,7 +1245,6 @@ github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Z
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA=
github.com/google/flatbuffers v1.11.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/flatbuffers v2.0.0+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/flatbuffers v2.0.8+incompatible h1:ivUb1cGomAB101ZM1T0nOiWz9pSrTMoa9+EiY7igmkM=
github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
Expand Down Expand Up @@ -1536,7 +1529,6 @@ github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j
github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
Expand Down Expand Up @@ -1845,7 +1837,6 @@ github.com/rudderlabs/rudder-go-kit v0.14.2 h1:n3+/Ogvd3v5YBTx9RC8noGRmvujAPyXH8
github.com/rudderlabs/rudder-go-kit v0.14.2/go.mod h1:xIjOLO/hnJX0kcx3ZKoh1YfaDv7bDvU93PuPdhjG7bU=
github.com/rudderlabs/sql-tunnels v0.1.3 h1:o7/MX4Yj0WpAaw0uxkRmkagtzedGxUPRwyho4SMbWMQ=
github.com/rudderlabs/sql-tunnels v0.1.3/go.mod h1:1TolUkSsrQxdXS0iyGlbLADsgkebmPcz1MxU5xBl6dE=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday v1.6.0/go.mod h1:ti0ldHuxg49ri4ksnFxlkCfN+hvslNlmVHqNRXXJNAY=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
Expand Down Expand Up @@ -1907,7 +1898,6 @@ github.com/spf13/afero v1.9.3/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcD
github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w=
github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU=
github.com/spf13/cobra v0.0.2-0.20171109065643-2da4a54c5cee/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU=
github.com/spf13/cobra v1.0.0/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE=
github.com/spf13/cobra v1.1.3/go.mod h1:pGADOWyqRD/YMrPZigI/zbliZ2wVD/23d+is3pSWzOo=
github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk=
Expand Down
2 changes: 1 addition & 1 deletion processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (proc *LifecycleManager) Start() error {

proc.Handle.Setup(
proc.BackendConfig, proc.gatewayDB, proc.routerDB, proc.batchRouterDB, proc.errDB, proc.esDB,
proc.clearDB, proc.ReportingI, proc.MultitenantStats, proc.transientSources, proc.fileuploader, proc.rsourcesService, proc.destDebugger, proc.transDebugger,
proc.ReportingI, proc.MultitenantStats, proc.transientSources, proc.fileuploader, proc.rsourcesService, proc.destDebugger, proc.transDebugger,
)

currentCtx, cancel := context.WithCancel(context.Background())
Expand Down
19 changes: 5 additions & 14 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
"sync"
"time"

"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"

jsoniter "github.com/json-iterator/go"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
Expand Down Expand Up @@ -44,8 +47,6 @@ import (
"github.com/rudderlabs/rudder-server/utils/workerpool"
"github.com/samber/lo"
"github.com/tidwall/gjson"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
)

const (
Expand Down Expand Up @@ -113,8 +114,6 @@ type Handle struct {
readLoopSleep time.Duration
maxLoopSleep time.Duration
storeTimeout time.Duration
loopSleep time.Duration // DEPRECATED: used only on the old mainLoop
fixedLoopSleep time.Duration // DEPRECATED: used only on the old mainLoop
maxEventsToProcess int
transformBatchSize int
userTransformBatchSize int
Expand Down Expand Up @@ -321,7 +320,7 @@ func (proc *Handle) newEventFilterStat(sourceID, workspaceID string, destination
// Setup initializes the module
func (proc *Handle) Setup(
backendConfig backendconfig.BackendConfig, gatewayDB, routerDB,
batchRouterDB, errorDB, eventSchemaDB jobsdb.JobsDB, clearDB *bool, reporting types.ReportingI,
batchRouterDB, errorDB, eventSchemaDB jobsdb.JobsDB, reporting types.ReportingI,
multiTenantStat multitenant.MultiTenantI, transientSources transientsource.Service,
fileuploader fileuploader.Provider, rsourcesService rsources.JobService, destDebugger destinationdebugger.DestinationDebugger, transDebugger transformationdebugger.TransformationDebugger,
) {
Expand Down Expand Up @@ -410,11 +409,7 @@ func (proc *Handle) Setup(
proc.eventSchemaHandler = eventschema.GetInstance()
}
if proc.config.enableDedup {
opts := []dedup.OptFn{}
if *clearDB {
opts = append(opts, dedup.WithClearDB())
}
proc.dedup = dedup.New(dedup.DefaultPath(), opts...)
proc.dedup = dedup.New(dedup.DefaultPath())
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -572,10 +567,6 @@ func (proc *Handle) loadConfig() {

config.RegisterDurationConfigVariable(1000, &proc.config.pingerSleep, true, time.Millisecond, "Processor.pingerSleep")
config.RegisterDurationConfigVariable(1000, &proc.config.readLoopSleep, true, time.Millisecond, "Processor.readLoopSleep")
// DEPRECATED: used only on the old mainLoop:
config.RegisterDurationConfigVariable(10, &proc.config.loopSleep, true, time.Millisecond, []string{"Processor.loopSleep", "Processor.loopSleepInMS"}...)
// DEPRECATED: used only on the old mainLoop:
config.RegisterDurationConfigVariable(0, &proc.config.fixedLoopSleep, true, time.Millisecond, []string{"Processor.fixedLoopSleep", "Processor.fixedLoopSleepInMS"}...)
config.RegisterIntConfigVariable(100, &proc.config.transformBatchSize, true, 1, "Processor.transformBatchSize")
config.RegisterIntConfigVariable(200, &proc.config.userTransformBatchSize, true, 1, "Processor.userTransformBatchSize")
// Enable dedup of incoming events by default
Expand Down
16 changes: 5 additions & 11 deletions processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,6 @@ var _ = Describe("Processor", Ordered, func() {
})

Context("Initialization", func() {
clearDB := false
It("should initialize (no jobs to recover)", func() {
mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl)
mockTransformer.EXPECT().Setup().Times(1)
Expand All @@ -635,7 +634,7 @@ var _ = Describe("Processor", Ordered, func() {
// crash recover returns empty list
c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1)

processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, &clearDB, nil, c.MockMultitenantHandle, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService())
processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, nil, c.MockMultitenantHandle, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
Expand All @@ -649,15 +648,14 @@ var _ = Describe("Processor", Ordered, func() {

c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1)

processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, &clearDB, nil, c.MockMultitenantHandle, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService())
processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, nil, c.MockMultitenantHandle, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
})
})

Context("normal operation", func() {
clearDB := false
BeforeEach(func() {
// crash recovery check
c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1)
Expand All @@ -669,7 +667,7 @@ var _ = Describe("Processor", Ordered, func() {

processor := prepareHandle(NewHandle(mockTransformer))

processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, &clearDB, c.MockReportingI, c.MockMultitenantHandle, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService())
processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, c.MockReportingI, c.MockMultitenantHandle, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
Expand Down Expand Up @@ -1549,7 +1547,6 @@ var _ = Describe("Processor", Ordered, func() {
})

Context("MainLoop Tests", func() {
clearDB := false
It("Should not handle jobs when transformer features are not set", func() {
mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl)
mockTransformer.EXPECT().Setup().Times(1)
Expand All @@ -1559,7 +1556,7 @@ var _ = Describe("Processor", Ordered, func() {
// crash recover returns empty list
c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1)
processor.config.featuresRetryMaxAttempts = 0
processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, &clearDB, nil, c.MockMultitenantHandle, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService())
processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, nil, c.MockMultitenantHandle, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService())

setMainLoopTimeout(processor, 1*time.Second)

Expand Down Expand Up @@ -1592,7 +1589,6 @@ var _ = Describe("Processor", Ordered, func() {
})

Context("ProcessorLoop Tests", func() {
clearDB := false
It("Should not handle jobs when transformer features are not set", func() {
mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl)
mockTransformer.EXPECT().Setup().Times(1)
Expand All @@ -1602,7 +1598,7 @@ var _ = Describe("Processor", Ordered, func() {
// crash recover returns empty list
c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1)
processor.config.featuresRetryMaxAttempts = 0
processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, &clearDB, c.MockReportingI, c.MockMultitenantHandle, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService())
processor.Setup(c.mockBackendConfig, c.mockGatewayJobsDB, c.mockRouterJobsDB, c.mockBatchRouterJobsDB, c.mockProcErrorsDB, nil, c.MockReportingI, c.MockMultitenantHandle, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), c.MockRsourcesService, destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService())
defer processor.Shutdown()
c.MockReportingI.EXPECT().WaitForSetup(gomock.Any(), gomock.Any()).Times(1)

Expand Down Expand Up @@ -3040,7 +3036,6 @@ func processorSetupAndAssertJobHandling(processor *Handle, c *testContext) {
}

func Setup(processor *Handle, c *testContext, enableDedup, enableReporting bool) {
clearDB := false
setDisableDedupFeature(processor, enableDedup)
processor.Setup(
c.mockBackendConfig,
Expand All @@ -3049,7 +3044,6 @@ func Setup(processor *Handle, c *testContext, enableDedup, enableReporting bool)
c.mockBatchRouterJobsDB,
c.mockProcErrorsDB,
c.mockEventSchemasDB,
&clearDB,
c.MockReportingI,
c.MockMultitenantHandle,
transientsource.NewEmptyService(),
Expand Down
Loading