Skip to content

Commit

Permalink
Add lock around progress setting.
Browse files Browse the repository at this point in the history
  • Loading branch information
ahrav committed Mar 27, 2023
1 parent 5fd2d61 commit 33e30f5
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions pkg/sources/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"

diskbufferreader "github.com/bill-rich/disk-buffer-reader"
"github.com/go-errors/errors"
Expand Down Expand Up @@ -61,7 +60,8 @@ type Source struct {
processedObjects int32
cache cache.Cache

sources.Progress
mu sync.Mutex
sources.Progress // progress is not thread safe
}

// persistableCache is a wrapper around cache.Cache that allows
Expand Down Expand Up @@ -258,14 +258,16 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk) err
}

func (s *Source) setProgress(ctx context.Context, objName string) {
atomic.AddInt32(&s.processedObjects, 1)
s.mu.Lock()
defer s.mu.Unlock()

ctx.Logger().V(5).Info("setting progress for object", "object-name", objName)
s.processedObjects++

s.cache.Set(objName, objName)
processed := atomic.LoadInt32(&s.processedObjects)
s.Progress.SectionsCompleted = processed
s.Progress.SectionsCompleted = s.processedObjects
s.Progress.SectionsRemaining = int32(s.stats.numObjects)
s.Progress.PercentComplete = int64(float64(processed) / float64(s.stats.numObjects) * 100)
s.Progress.PercentComplete = int64(float64(s.processedObjects) / float64(s.stats.numObjects) * 100)
}

func (s *Source) completeProgress(ctx context.Context) {
Expand Down

0 comments on commit 33e30f5

Please sign in to comment.