diff --git a/sdktests/server_side_persistence_base.go b/sdktests/server_side_persistence_base.go index 0e61fe2..ad5175d 100644 --- a/sdktests/server_side_persistence_base.go +++ b/sdktests/server_side_persistence_base.go @@ -341,6 +341,7 @@ func (s *ServerSidePersistentTests) Run(t *ldtest.T) { // No cache is enabled s.runWithEmptyStore(t, "initializes store when data received", func(t *ldtest.T) { persistence := NewPersistence() + persistence.SetStoreMode(servicedef.DataStoreModeReadWrite) persistence.SetStore(servicedef.SDKConfigPersistentStore{ Type: s.persistentStore.Type(), DSN: s.persistentStore.DSN(), @@ -362,6 +363,7 @@ func (s *ServerSidePersistentTests) Run(t *ldtest.T) { s.runWithEmptyStore(t, "applies updates to store", func(t *ldtest.T) { persistence := NewPersistence() + persistence.SetStoreMode(servicedef.DataStoreModeReadWrite) persistence.SetStore(servicedef.SDKConfigPersistentStore{ Type: s.persistentStore.Type(), DSN: s.persistentStore.DSN(), @@ -384,7 +386,8 @@ func (s *ServerSidePersistentTests) Run(t *ldtest.T) { }) updateData := s.makeFlagData("flag-key", 2, ldvalue.String("new-value")) - dataSystem.PrimarySync().streaming.PushUpdate("flags", "flag-key", 2, updateData) + dataSystem.PrimarySync().streaming.PushUpdate("flag", "flag-key", 2, updateData) + dataSystem.PrimarySync().streaming.PushPayloadTransferred("updated", 2) s.eventuallyValidateFlagData(t, s.defaultPrefix, map[string]m.Matcher{ "flag-key": basicFlagValidationMatcher("flag-key", 2, "new-value"), }) @@ -392,6 +395,7 @@ func (s *ServerSidePersistentTests) Run(t *ldtest.T) { s.runWithEmptyStore(t, "data source updates respect versioning", func(t *ldtest.T) { persistence := NewPersistence() + persistence.SetStoreMode(servicedef.DataStoreModeReadWrite) persistence.SetStore(servicedef.SDKConfigPersistentStore{ Type: s.persistentStore.Type(), DSN: s.persistentStore.DSN(), @@ -411,7 +415,8 @@ func (s *ServerSidePersistentTests) Run(t *ldtest.T) { // Lower versioned updates are ignored updateData := s.makeFlagData("flag-key", 1, ldvalue.String("new-value")) - dataSystem.PrimarySync().streaming.PushUpdate("flags", "flag-key", 1, updateData) + dataSystem.PrimarySync().streaming.PushUpdate("flag", "flag-key", 1, updateData) + dataSystem.PrimarySync().streaming.PushPayloadTransferred("updated", 2) s.neverValidateFlagData(t, s.defaultPrefix, map[string]m.Matcher{ "flag-key": basicFlagValidationMatcher("flag-key", 1, "new-value"), "uncached-flag-key": basicFlagValidationMatcher("uncached-flag-key", 100, "value"), @@ -419,7 +424,8 @@ func (s *ServerSidePersistentTests) Run(t *ldtest.T) { // Same versioned updates are ignored updateData = s.makeFlagData("flag-key", 100, ldvalue.String("new-value")) - dataSystem.PrimarySync().streaming.PushUpdate("flags", "flag-key", 100, updateData) + dataSystem.PrimarySync().streaming.PushUpdate("flag", "flag-key", 100, updateData) + dataSystem.PrimarySync().streaming.PushPayloadTransferred("updated", 3) s.neverValidateFlagData(t, s.defaultPrefix, map[string]m.Matcher{ "flag-key": basicFlagValidationMatcher("flag-key", 1, "new-value"), "uncached-flag-key": basicFlagValidationMatcher("uncached-flag-key", 100, "value"), @@ -427,7 +433,8 @@ func (s *ServerSidePersistentTests) Run(t *ldtest.T) { // Higher versioned updates are applied updateData = s.makeFlagData("flag-key", 200, ldvalue.String("new-value")) - dataSystem.PrimarySync().streaming.PushUpdate("flags", "flag-key", 200, updateData) + dataSystem.PrimarySync().streaming.PushUpdate("flag", "flag-key", 200, updateData) + dataSystem.PrimarySync().streaming.PushPayloadTransferred("updated", 4) s.neverValidateFlagData(t, s.defaultPrefix, map[string]m.Matcher{ "flag-key": basicFlagValidationMatcher("flag-key", 200, "new-value"), "uncached-flag-key": basicFlagValidationMatcher("uncached-flag-key", 100, "value"), @@ -436,6 +443,7 @@ func (s *ServerSidePersistentTests) Run(t *ldtest.T) { s.runWithEmptyStore(t, "data source deletions respect versioning", func(t *ldtest.T) { persistence := NewPersistence() + persistence.SetStoreMode(servicedef.DataStoreModeReadWrite) persistence.SetStore(servicedef.SDKConfigPersistentStore{ Type: s.persistentStore.Type(), DSN: s.persistentStore.DSN(), @@ -454,14 +462,16 @@ func (s *ServerSidePersistentTests) Run(t *ldtest.T) { require.NoError(t, s.persistentStore.WriteMap(s.defaultPrefix, "features", s.initialFlags)) // Lower versioned deletes are ignored - dataSystem.PrimarySync().streaming.PushDelete("flags", "flag-key", 1) + dataSystem.PrimarySync().streaming.PushDelete("flag", "flag-key", 1) + dataSystem.PrimarySync().streaming.PushPayloadTransferred("updated", 2) s.neverValidateFlagData(t, s.defaultPrefix, map[string]m.Matcher{ "flag-key": basicDeletedFlagValidationMatcher("flag-key", 1), "uncached-flag-key": basicFlagValidationMatcher("uncached-flag-key", 100, "fallthrough"), }) // Higher versioned deletes are applied - dataSystem.PrimarySync().streaming.PushDelete("flags", "flag-key", 200) + dataSystem.PrimarySync().streaming.PushDelete("flag", "flag-key", 200) + dataSystem.PrimarySync().streaming.PushPayloadTransferred("updated", 3) s.eventuallyValidateFlagData(t, s.defaultPrefix, map[string]m.Matcher{ "flag-key": basicDeletedFlagValidationMatcher("flag-key", 200), "uncached-flag-key": basicFlagValidationMatcher("uncached-flag-key", 100, "fallthrough"), @@ -477,6 +487,7 @@ func (s *ServerSidePersistentTests) Run(t *ldtest.T) { t.Run(fmt.Sprintf("cache mode %s", cacheConfig.Mode), func(t *ldtest.T) { s.runWithEmptyStore(t, "does not cache flag miss", func(t *ldtest.T) { persistence := NewPersistence() + persistence.SetStoreMode(servicedef.DataStoreModeReadWrite) persistence.SetStore(servicedef.SDKConfigPersistentStore{ Type: s.persistentStore.Type(), DSN: s.persistentStore.DSN(), @@ -500,7 +511,8 @@ func (s *ServerSidePersistentTests) Run(t *ldtest.T) { m.In(t).Assert(response.Value, m.Equal(ldvalue.String("default"))) updateData := s.makeFlagData("flag-key", 2, ldvalue.String("new-value")) - dataSystem.PrimarySync().streaming.PushUpdate("flags", "flag-key", 2, updateData) + dataSystem.PrimarySync().streaming.PushUpdate("flag", "flag-key", 2, updateData) + dataSystem.PrimarySync().streaming.PushPayloadTransferred("updated", 2) h.RequireEventually(t, checkForUpdatedValue(t, client, "flag-key", context, @@ -509,6 +521,7 @@ func (s *ServerSidePersistentTests) Run(t *ldtest.T) { }) s.runWithEmptyStore(t, "sdk reflects data source updates even with cache", func(t *ldtest.T) { persistence := NewPersistence() + persistence.SetStoreMode(servicedef.DataStoreModeReadWrite) persistence.SetStore(servicedef.SDKConfigPersistentStore{ Type: s.persistentStore.Type(), DSN: s.persistentStore.DSN(), @@ -527,7 +540,8 @@ func (s *ServerSidePersistentTests) Run(t *ldtest.T) { ldvalue.String("default"), ldvalue.String("value"), ldvalue.String("default")) updateData := s.makeFlagData("flag-key", 2, ldvalue.String("new-value")) - dataSystem.PrimarySync().streaming.PushUpdate("flags", "flag-key", 2, updateData) + dataSystem.PrimarySync().streaming.PushUpdate("flag", "flag-key", 2, updateData) + dataSystem.PrimarySync().streaming.PushPayloadTransferred("updated", 2) // This change is reflected in less time than the cache TTL. This should // prove it isn't caching that value. @@ -538,6 +552,7 @@ func (s *ServerSidePersistentTests) Run(t *ldtest.T) { }) s.runWithEmptyStore(t, "ignores direct database modifications", func(t *ldtest.T) { persistence := NewPersistence() + persistence.SetStoreMode(servicedef.DataStoreModeReadWrite) persistence.SetStore(servicedef.SDKConfigPersistentStore{ Type: s.persistentStore.Type(), DSN: s.persistentStore.DSN(), @@ -590,6 +605,7 @@ func (s *ServerSidePersistentTests) Run(t *ldtest.T) { s.runWithEmptyStore(t, "ignores dropped flags", func(t *ldtest.T) { persistence := NewPersistence() + persistence.SetStoreMode(servicedef.DataStoreModeReadWrite) persistence.SetStore(servicedef.SDKConfigPersistentStore{ Type: s.persistentStore.Type(), DSN: s.persistentStore.DSN(), diff --git a/sdktests/testapi_sdk_persistence.go b/sdktests/testapi_sdk_persistence.go index 96046da..900070f 100644 --- a/sdktests/testapi_sdk_persistence.go +++ b/sdktests/testapi_sdk_persistence.go @@ -8,8 +8,9 @@ import ( ) type Persistence struct { - Store o.Maybe[servicedef.SDKConfigPersistentStore] - Cache o.Maybe[servicedef.SDKConfigPersistentCache] + Store o.Maybe[servicedef.SDKConfigPersistentStore] + StoreMode o.Maybe[servicedef.DataStoreMode] + Cache o.Maybe[servicedef.SDKConfigPersistentCache] } func NewPersistence() *Persistence { @@ -20,6 +21,10 @@ func (p *Persistence) SetStore(store servicedef.SDKConfigPersistentStore) { p.Store = o.Some(store) } +func (p *Persistence) SetStoreMode(mode servicedef.DataStoreMode) { + p.StoreMode = o.Some(mode) +} + func (p *Persistence) SetCache(cache servicedef.SDKConfigPersistentCache) { p.Cache = o.Some(cache) } @@ -29,10 +34,16 @@ func (p Persistence) Configure(target *servicedef.SDKConfigParams) error { return errors.New("Persistence must have a store and cache configuration") } - target.PersistentDataStore = o.Some(servicedef.SDKConfigPersistentDataStoreParams{ - Store: p.Store.Value(), - Cache: p.Cache.Value(), + dataSystem := target.DataSystem.OrElse(servicedef.DataSystem{}) + dataSystem.Store = o.Some(servicedef.DataStore{ + PersistentDataStore: o.Some(servicedef.SDKConfigPersistentDataStoreParams{ + Store: p.Store.Value(), + Cache: p.Cache.Value(), + }), }) + dataSystem.StoreMode = p.StoreMode.OrElse(servicedef.DataStoreModeRead) + + target.DataSystem = o.Some(dataSystem) return nil }