Have scanner wait for batch to finish before starting a new one #1548

Draft
wants to merge 3 commits into base: api-breakers
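Summary of the change: scanHosts now drives the batching itself. Each iteration launches the scan workers, fetches up to scanBatchSize hosts whose last scan predates the cutoff (always at offset 0), feeds that batch to the workers, and waits for the whole batch to be scanned before fetching the next one. The separate fetchHosts producer goroutine is dropped, and a scan that fails with context.Canceled now makes the worker return without logging an error. The test's mock host store is extended to honour MaxLastScan and to record scans so the new fetch pattern can be asserted.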
104 changes: 48 additions & 56 deletions autopilot/scanner/scanner.go
@@ -110,8 +110,7 @@ func (s *scanner) Scan(ctx context.Context, w WorkerRHPScan, force bool) {
 	go func() {
 		defer s.wg.Done()
 
-		hosts := s.fetchHosts(ctx, cutoff)
-		scanned := s.scanHosts(ctx, w, hosts)
+		scanned := s.scanHosts(ctx, w, cutoff)
 		removed := s.removeOfflineHosts(ctx)
 
 		s.mu.Lock()
@@ -157,56 +156,20 @@ func (s *scanner) UpdateHostsConfig(cfg api.HostsConfig) {
 	s.hostsCfg = &cfg
 }
 
-func (s *scanner) fetchHosts(ctx context.Context, cutoff time.Time) chan scanJob {
-	jobsChan := make(chan scanJob, s.scanBatchSize)
-	go func() {
-		defer close(jobsChan)
-
-		var exhausted bool
-		for offset := 0; !exhausted; offset += s.scanBatchSize {
-			hosts, err := s.hs.Hosts(ctx, api.HostOptions{
-				MaxLastScan: api.TimeRFC3339(cutoff),
-				Offset:      offset,
-				Limit:       s.scanBatchSize,
-			})
-			if err != nil {
-				s.logger.Errorf("could not get hosts for scanning, err: %v", err)
-				return
-			} else if len(hosts) < s.scanBatchSize {
-				exhausted = true
-			}
-
-			s.logger.Debugf("fetched %d hosts for scanning", len(hosts))
-			for _, h := range hosts {
-				select {
-				case <-s.interruptChan:
-					return
-				case <-s.shutdownChan:
-					return
-				case jobsChan <- scanJob{
-					hostKey: h.PublicKey,
-					hostIP:  h.NetAddress,
-				}:
-				}
-			}
-		}
-	}()
-
-	return jobsChan
-}
-
-func (s *scanner) scanHosts(ctx context.Context, w WorkerRHPScan, hosts chan scanJob) (scanned uint64) {
+func (s *scanner) scanHosts(ctx context.Context, w WorkerRHPScan, cutoff time.Time) (scanned uint64) {
 	// define worker
-	worker := func() {
-		for h := range hosts {
+	worker := func(jobChan <-chan scanJob) {
+		for h := range jobChan {
 			if s.isShutdown() || s.isInterrupted() {
 				break // shutdown
 			}
 
 			scan, err := w.RHPScan(ctx, h.hostKey, h.hostIP, DefaultScanTimeout)
-			if err != nil {
+			if errors.Is(err, context.Canceled) {
+				return
+			} else if err != nil {
 				s.logger.Errorw("worker stopped", zap.Error(err), "hk", h.hostKey)
-				break // abort
+				return // abort
 			} else if err := scan.Error(); err != nil {
 				s.logger.Debugw("host scan failed", zap.Error(err), "hk", h.hostKey, "ip", h.hostIP)
 			} else {
@@ -216,18 +179,47 @@ func (s *scanner) scanHosts(ctx context.Context, w WorkerRHPScan, hosts chan sca
 		}
 	}
 
-	// launch all workers
-	var wg sync.WaitGroup
-	for t := 0; t < s.scanThreads; t++ {
-		wg.Add(1)
-		go func() {
-			worker()
-			wg.Done()
-		}()
-	}
+	var exhausted bool
+	for !exhausted && !s.isShutdown() {
+		jobs := make(chan scanJob)
+		var wg sync.WaitGroup
+
+		joinWorkers := func() {
+			close(jobs)
+			wg.Wait()
+		}
+
+		// launch all workers for this batch
+		for t := 0; t < s.scanThreads; t++ {
+			wg.Add(1)
+			go func() {
+				worker(jobs)
+				wg.Done()
+			}()
+		}
 
-	// wait until they're done
-	wg.Wait()
+		// fetch batch
+		hosts, err := s.hs.Hosts(ctx, api.HostOptions{
+			MaxLastScan: api.TimeRFC3339(cutoff),
+			Offset:      0,
+			Limit:       s.scanBatchSize,
+		})
+		if err != nil {
+			s.logger.Errorf("could not get hosts for scanning, err: %v", err)
+			joinWorkers()
+			break
+		}
+		exhausted = len(hosts) < s.scanBatchSize
+
+		// send batch to workers
+		for _, h := range hosts {
+			jobs <- scanJob{
+				hostKey: h.PublicKey,
+				hostIP:  h.NetAddress,
+			}
+		}
+		joinWorkers()
+	}
 
 	s.statsHostPingMS.Recompute()
 	return
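For reviewers skimming the diff, here is a minimal, self-contained sketch (assumed names, not the renterd API) of the batching pattern the new scanHosts follows: every batch gets its own jobs channel and WaitGroup, the fetch always happens at offset 0, and the loop only moves on once the current batch has been fully drained. hostStore, fetchBatch and recordScan are stand-ins for the real host store, the Hosts query with a MaxLastScan cutoff, and the LastScan update a scan causes.

```go
package main

import (
	"fmt"
	"sync"
)

type hostStore struct {
	mu      sync.Mutex
	pending []string // hosts whose LastScan is still before the cutoff
}

// fetchBatch stands in for s.hs.Hosts with MaxLastScan, Offset: 0 and Limit: n.
func (s *hostStore) fetchBatch(n int) []string {
	s.mu.Lock()
	defer s.mu.Unlock()
	if n > len(s.pending) {
		n = len(s.pending)
	}
	return append([]string(nil), s.pending[:n]...)
}

// recordScan stands in for the host's LastScan being updated by a scan.
func (s *hostStore) recordScan(host string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for i, h := range s.pending {
		if h == host {
			s.pending = append(s.pending[:i], s.pending[i+1:]...)
			return
		}
	}
}

func main() {
	store := &hostStore{pending: []string{"h1", "h2", "h3", "h4", "h5"}}
	const batchSize, threads = 2, 3

	for exhausted := false; !exhausted; {
		jobs := make(chan string)
		var wg sync.WaitGroup

		// launch the workers for this batch
		for t := 0; t < threads; t++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for h := range jobs {
					fmt.Println("scanning", h) // stands in for w.RHPScan
					store.recordScan(h)
				}
			}()
		}

		// fetch the next batch; a short batch means it is the last one
		batch := store.fetchBatch(batchSize)
		exhausted = len(batch) < batchSize

		// feed the batch to the workers, then wait for the whole batch to finish
		for _, h := range batch {
			jobs <- h
		}
		close(jobs)
		wg.Wait()
	}
}
```

Because the fetch always uses offset 0, progress depends on already-scanned hosts dropping out of the pending set, which is why the real query filters on MaxLastScan and why the test's mock store now records scans.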
38 changes: 32 additions & 6 deletions autopilot/scanner/scanner_test.go
@@ -31,17 +31,25 @@ func (hs *mockHostStore) Hosts(ctx context.Context, opts api.HostOptions) ([]api
 	defer hs.mu.Unlock()
 	hs.scans = append(hs.scans, fmt.Sprintf("%d-%d", opts.Offset, opts.Offset+opts.Limit))
 
+	var hosts []api.Host
+	for _, host := range hs.hosts {
+		if !opts.MaxLastScan.IsZero() && opts.MaxLastScan.Std().Before(host.Interactions.LastScan) {
+			continue
+		}
+		hosts = append(hosts, host)
+	}
+
 	start := opts.Offset
-	if start > len(hs.hosts) {
+	if start > len(hosts) {
 		return nil, nil
 	}
 
 	end := opts.Offset + opts.Limit
-	if end > len(hs.hosts) {
-		end = len(hs.hosts)
+	if end > len(hosts) {
+		end = len(hosts)
 	}
 
-	return hs.hosts[start:end], nil
+	return hosts[start:end], nil
 }
 
 func (hs *mockHostStore) RemoveOfflineHosts(ctx context.Context, maxConsecutiveScanFailures uint64, maxDowntime time.Duration) (uint64, error) {
@@ -51,6 +59,18 @@ func (hs *mockHostStore) RemoveOfflineHosts(ctx context.Context, maxConsecutiveS
 	return 0, nil
 }
 
+func (hs *mockHostStore) recordScan(hk types.PublicKey) {
+	hs.mu.Lock()
+	defer hs.mu.Unlock()
+	for i, host := range hs.hosts {
+		if host.PublicKey == hk {
+			hs.hosts[i].Interactions.LastScan = time.Now().UTC()
+			return
+		}
+	}
+	panic("unknown host")
+}
+
 func (hs *mockHostStore) state() ([]string, []string) {
 	hs.mu.Lock()
 	defer hs.mu.Unlock()
@@ -59,6 +79,7 @@ func (hs *mockHostStore) state() ([]string, []string) {

 type mockWorker struct {
 	blockChan chan struct{}
+	hs        *mockHostStore
 
 	mu        sync.Mutex
 	scanCount int
@@ -69,6 +90,8 @@ func (w *mockWorker) RHPScan(ctx context.Context, hostKey types.PublicKey, hostI
 		<-w.blockChan
 	}
 
+	w.hs.recordScan(hostKey)
+
 	w.mu.Lock()
 	defer w.mu.Unlock()
 	w.scanCount++
@@ -94,7 +117,10 @@ func TestScanner(t *testing.T) {
 	}
 
 	// initiate a host scan using a worker that blocks
-	w := &mockWorker{blockChan: make(chan struct{})}
+	w := &mockWorker{
+		blockChan: make(chan struct{}),
+		hs:        hs,
+	}
 	s.Scan(context.Background(), w, false)
 
 	// assert it's scanning
@@ -121,7 +147,7 @@ func TestScanner(t *testing.T) {
 	// assert the scanner made 3 batch reqs
 	if scans, _ := hs.state(); len(scans) != 3 {
 		t.Fatalf("unexpected number of requests, %v != 3", len(scans))
-	} else if scans[0] != "0-40" || scans[1] != "40-80" || scans[2] != "80-120" {
+	} else if scans[0] != "0-40" || scans[1] != "0-40" || scans[2] != "0-40" {
 		t.Fatalf("unexpected requests, %v", scans)
 	}
 
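Note on the test changes: the mock host store now honours MaxLastScan and the mock worker calls recordScan for every scanned host, so hosts that were already scanned fall out of subsequent Hosts calls. With three batches needed to get through the mock hosts and the scanner always querying at offset 0, the store now sees the same "0-40" request three times rather than the old paginated 0-40 / 40-80 / 80-120 sequence.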