From 32f613e4d4450b09da3c81982dd6d7dba9c6f6f2 Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Tue, 3 Dec 2024 15:44:34 +0530 Subject: [PATCH] fix(migration): improve job progress logging (#329) --- migration/runner.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/migration/runner.go b/migration/runner.go index 2a614aa1..fde0e260 100644 --- a/migration/runner.go +++ b/migration/runner.go @@ -58,7 +58,7 @@ func RunMigration(ctx context.Context, cfg Config, cache MigrationCache, store c }); err != nil { return xerrors.Errorf("error iterating actors: %w", err) } - log.Log(rt.INFO, "Done creating %d migration jobs after %v", jobCount, time.Since(startTime)) + log.Log(rt.INFO, "Done creating %d migration jobs after %v", jobCount, time.Since(startTime).Round(100*time.Millisecond)) return nil }) @@ -66,7 +66,6 @@ func RunMigration(ctx context.Context, cfg Config, cache MigrationCache, store c var workerWg sync.WaitGroup for i := uint(0); i < cfg.MaxWorkers; i++ { workerWg.Add(1) - workerId := i grp.Go(func() error { defer workerWg.Done() for job := range jobCh { @@ -91,7 +90,6 @@ func RunMigration(ctx context.Context, cfg Config, cache MigrationCache, store c atomic.AddUint32(&doneCount, 1) } - log.Log(rt.INFO, "Worker %d done", workerId) return nil }) } @@ -106,13 +104,13 @@ func RunMigration(ctx context.Context, cfg Config, cache MigrationCache, store c for { select { case <-time.After(cfg.ProgressLogPeriod): - jobsNow := jobCount // Snapshot values to avoid incorrect-looking arithmetic if they change. - doneNow := doneCount - pendingNow := jobsNow - doneNow + jobsNow := atomic.LoadUint32(&jobCount) + doneNow := atomic.LoadUint32(&doneCount) elapsed := time.Since(startTime) rate := float64(doneNow) / elapsed.Seconds() - log.Log(rt.INFO, "%d jobs created, %d done, %d pending after %v (%.0f/s)", - jobsNow, doneNow, pendingNow, elapsed, rate) + + log.Log(rt.INFO, "Performing migration: %d of %d jobs processed (%.0f/s) [%v elapsed]", + doneNow, jobsNow, rate, elapsed.Round(time.Second)) case <-workersFinished: return case <-ctx.Done(): @@ -127,7 +125,7 @@ func RunMigration(ctx context.Context, cfg Config, cache MigrationCache, store c workerWg.Wait() close(jobResultCh) close(workersFinished) - log.Log(rt.INFO, "All workers done after %v", time.Since(startTime)) + log.Log(rt.INFO, "All workers done after %v", time.Since(startTime).Round(100*time.Millisecond)) return nil }) @@ -148,7 +146,7 @@ func RunMigration(ctx context.Context, cfg Config, cache MigrationCache, store c } resultCount++ } - log.Log(rt.INFO, "Result writer wrote %d results to state tree after %v", resultCount, time.Since(startTime)) + log.Log(rt.INFO, "Result writer wrote %d results to state tree after %v", resultCount, time.Since(startTime).Round(100*time.Millisecond)) return nil }) @@ -197,7 +195,7 @@ func RunMigration(ctx context.Context, cfg Config, cache MigrationCache, store c elapsed := time.Since(startTime) rate := float64(doneCount) / elapsed.Seconds() - log.Log(rt.INFO, "All %d done after %v (%.0f/s)", doneCount, elapsed, rate) + log.Log(rt.INFO, "All %d done after %v (%.0f/s)", doneCount, elapsed.Round(100*time.Millisecond), rate) return actorsOut, nil }