Skip to content

Commit

Permalink
DATA-1419 | DATA-861 - Check sync config in reconfigure (#2309)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexis-wei authored May 1, 2023
1 parent deda2a1 commit d81ee66
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 20 deletions.
45 changes: 25 additions & 20 deletions services/datamanager/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type builtIn struct {
syncerConstructor datasync.ManagerConstructor
cloudConnSvc cloud.ConnectionService
cloudConn rpc.ClientConn
syncTicker *clk.Ticker
}

var viamCaptureDotDir = filepath.Join(os.Getenv("HOME"), ".viam", "capture")
Expand Down Expand Up @@ -425,28 +426,32 @@ func (svc *builtIn) Reconfigure(
}
}
svc.collectors = newCollectors

svc.syncDisabled = svcConfig.ScheduledSyncDisabled
svc.syncIntervalMins = svcConfig.SyncIntervalMins
svc.additionalSyncPaths = svcConfig.AdditionalSyncPaths

// TODO DATA-861: this means that the ticker is reset everytime we call Update with sync enabled, regardless of
// whether or not the interval has changed. We should not do that.
svc.cancelSyncScheduler()
if !svc.syncDisabled && svc.syncIntervalMins != 0.0 {
if svc.syncer == nil {
if err := svc.initSyncer(ctx); err != nil {
return err
if svc.syncDisabled != svcConfig.ScheduledSyncDisabled || svc.syncIntervalMins != svcConfig.SyncIntervalMins {
svc.syncDisabled = svcConfig.ScheduledSyncDisabled
svc.syncIntervalMins = svcConfig.SyncIntervalMins

svc.cancelSyncScheduler()
if !svc.syncDisabled && svc.syncIntervalMins != 0.0 {
if svc.syncer == nil {
if err := svc.initSyncer(ctx); err != nil {
return err
}
} else if reinitSyncer {
svc.closeSyncer()
if err := svc.initSyncer(ctx); err != nil {
return err
}
}
} else if reinitSyncer {
svc.closeSyncer()
if err := svc.initSyncer(ctx); err != nil {
return err
svc.startSyncScheduler(svc.syncIntervalMins)
} else {
if svc.syncTicker != nil {
svc.syncTicker.Stop()
svc.syncTicker = nil
}
svc.closeSyncer()
}
svc.startSyncScheduler(svc.syncIntervalMins)
} else {
svc.closeSyncer()
}

return nil
Expand Down Expand Up @@ -474,11 +479,11 @@ func (svc *builtIn) uploadData(cancelCtx context.Context, intervalMins float64)
intervalMillis := 60000.0 * intervalMins
// The ticker must be created before uploadData returns to prevent race conditions between clock.Ticker and
// clock.Add in sync_test.go.
ticker := clock.Ticker(time.Millisecond * time.Duration(intervalMillis))
svc.syncTicker = clock.Ticker(time.Millisecond * time.Duration(intervalMillis))
svc.backgroundWorkers.Add(1)
goutils.PanicCapturingGo(func() {
defer svc.backgroundWorkers.Done()
defer ticker.Stop()
defer svc.syncTicker.Stop()

for {
if err := cancelCtx.Err(); err != nil {
Expand All @@ -491,7 +496,7 @@ func (svc *builtIn) uploadData(cancelCtx context.Context, intervalMins float64)
select {
case <-cancelCtx.Done():
return
case <-ticker.C:
case <-svc.syncTicker.C:
svc.lock.Lock()
if svc.syncer != nil {
svc.sync()
Expand Down
92 changes: 92 additions & 0 deletions services/datamanager/builtin/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,98 @@ func TestArbitraryFileUpload(t *testing.T) {
}
}

func TestSyncConfigUpdateBehavior(t *testing.T) {
newSyncIntervalMins := 0.009
tests := []struct {
name string
initSyncDisabled bool
initSyncIntervalMins float64
newSyncDisabled bool
newSyncIntervalMins float64
}{
{
name: "all sync config stays the same, syncer should not cancel, ticker stays the same",
initSyncDisabled: false,
initSyncIntervalMins: syncIntervalMins,
newSyncDisabled: false,
newSyncIntervalMins: syncIntervalMins,
},
{
name: "sync config changes, new ticker should be created for sync",
initSyncDisabled: false,
initSyncIntervalMins: syncIntervalMins,
newSyncDisabled: false,
newSyncIntervalMins: newSyncIntervalMins,
},
{
name: "sync gets disabled, syncer should be nil",
initSyncDisabled: false,
initSyncIntervalMins: syncIntervalMins,
newSyncDisabled: true,
newSyncIntervalMins: syncIntervalMins,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
// Set up server.
mockClock := clk.NewMock()
// Make mockClock the package level clock used by the dmsvc so that we can simulate time's passage
clock = mockClock
tmpDir := t.TempDir()

// Set up data manager.
dmsvc, r := newTestDataManager(t)
defer dmsvc.Close(context.Background())
var mockClient = mockDataSyncServiceClient{
succesfulDCRequests: make(chan *v1.DataCaptureUploadRequest, 100),
failedDCRequests: make(chan *v1.DataCaptureUploadRequest, 100),
fail: &atomic.Bool{},
}
dmsvc.SetSyncerConstructor(getTestSyncerConstructorMock(mockClient))
cfg, deps := setupConfig(t, enabledBinaryCollectorConfigPath)

// Set up service config.
cfg.CaptureDisabled = false
cfg.ScheduledSyncDisabled = tc.initSyncDisabled
cfg.CaptureDir = tmpDir
cfg.SyncIntervalMins = tc.initSyncIntervalMins

resources := resourcesFromDeps(t, r, deps)
err := dmsvc.Reconfigure(context.Background(), resources, resource.Config{
ConvertedAttributes: cfg,
})
test.That(t, err, test.ShouldBeNil)

builtInSvc := dmsvc.(*builtIn)
initTicker := builtInSvc.syncTicker

// Reconfigure the dmsvc with new sync configs
cfg.ScheduledSyncDisabled = tc.newSyncDisabled
cfg.SyncIntervalMins = tc.newSyncIntervalMins

err = dmsvc.Reconfigure(context.Background(), resources, resource.Config{
ConvertedAttributes: cfg,
})
test.That(t, err, test.ShouldBeNil)

newBuildInSvc := dmsvc.(*builtIn)
newTicker := newBuildInSvc.syncTicker
newSyncer := newBuildInSvc.syncer

if tc.newSyncDisabled {
test.That(t, newSyncer, test.ShouldBeNil)
}

if tc.initSyncDisabled != tc.newSyncDisabled ||
tc.initSyncIntervalMins != tc.newSyncIntervalMins {
test.That(t, initTicker, test.ShouldNotEqual, newTicker)
}
})
}

}

func getAllFilePaths(dir string) []string {
var filePaths []string

Expand Down

0 comments on commit d81ee66

Please sign in to comment.