Skip to content

Commit

Permalink
add more room for scope reports & optimize memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
noboruma committed Apr 17, 2024
1 parent ab80730 commit 9ef1478
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 71 deletions.
2 changes: 1 addition & 1 deletion deepfence_agent/tools/apache/scope/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ require (
github.com/weaveworks/tcptracer-bpf v0.0.0-00010101000000-000000000000
golang.org/x/net v0.19.0
golang.org/x/sys v0.15.0
golang.org/x/time v0.3.0
google.golang.org/grpc v1.58.3
k8s.io/api v0.29.0
k8s.io/apimachinery v0.29.0
Expand Down Expand Up @@ -128,6 +127,7 @@ require (
golang.org/x/oauth2 v0.12.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.13.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
Expand Down
154 changes: 84 additions & 70 deletions deepfence_agent/tools/apache/scope/probe/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,17 @@ package probe

import (
"context"
"errors"
"sync"
"time"

"github.com/deepfence/ThreatMapper/deepfence_utils/log"
"github.com/hashicorp/go-metrics"
"golang.org/x/time/rate"

"github.com/weaveworks/scope/report"
)

const (
spiedReportBufferSize = 16
shortcutReportBufferSize = 1024
spiedReportBufferSize = 1024
)

// ReportPublisher publishes reports, probably to a remote collector.
Expand All @@ -23,23 +21,56 @@ type ReportPublisher interface {
PublishInterval() int32
}

type DrainableChan struct {
spiedReports chan report.Report
drainedReports chan report.Report
access sync.Mutex
}

func NewDrainableChan(bufferSize int) DrainableChan {
return DrainableChan{
spiedReports: make(chan report.Report, bufferSize),
drainedReports: make(chan report.Report, bufferSize),
}
}

func (dc *DrainableChan) Add(r *report.Report) error {
dc.access.Lock()
defer dc.access.Unlock()
select {
case dc.spiedReports <- r.Copy():
default:
return errors.New("reports full")
}
return nil
}

func (dc *DrainableChan) Drain() <-chan report.Report {
dc.access.Lock()
defer dc.access.Unlock()
tmp := dc.drainedReports
dc.drainedReports = dc.spiedReports
dc.spiedReports = tmp
return dc.drainedReports
}

// Probe sits there, generating and publishing reports.
type Probe struct {
spyInterval, publishInterval time.Duration
publisher ReportPublisher
rateLimiter *rate.Limiter
ticksPerFullReport int
noControls bool

tickers []Ticker
reporters []Reporter
taggers []Tagger
tickers []Ticker
reporters []Reporter
taggers []Tagger
reportersReports []report.Report
readyIndxes chan int

quit chan struct{}
done sync.WaitGroup
ctx context.Context
cancel context.CancelFunc

spiedReports chan report.Report
shortcutReports chan report.Report
spiedReports DrainableChan
}

// Tagger tags nodes with value-add node metadata.
Expand Down Expand Up @@ -82,16 +113,16 @@ func New(
ticksPerFullReport int,
noControls bool,
) *Probe {
ctx, cancel := context.WithCancel(context.Background())
result := &Probe{
spyInterval: spyInterval,
publishInterval: publishInterval,
publisher: publisher,
rateLimiter: rate.NewLimiter(rate.Every(publishInterval/100), 1),
ticksPerFullReport: ticksPerFullReport,
noControls: noControls,
quit: make(chan struct{}),
spiedReports: make(chan report.Report, spiedReportBufferSize),
shortcutReports: make(chan report.Report, shortcutReportBufferSize),
spiedReports: NewDrainableChan(spiedReportBufferSize),
ctx: ctx,
cancel: cancel,
}
return result
}
Expand All @@ -104,6 +135,9 @@ func (p *Probe) AddTagger(ts ...Tagger) {
// AddReporter adds a new Reported to the Probe
func (p *Probe) AddReporter(rs ...Reporter) {
p.reporters = append(p.reporters, rs...)

p.reportersReports = append(p.reportersReports, report.Report{})
p.readyIndxes = make(chan int, len(p.reporters))
}

// AddTicker adds a new Ticker to the Probe
Expand All @@ -113,75 +147,67 @@ func (p *Probe) AddTicker(ts ...Ticker) {

// Stop stops the probe
func (p *Probe) Stop() error {
close(p.quit)
p.done.Wait()
p.cancel()
return nil
}

// Publish will queue a report for immediate publication,
// bypassing the spy tick
func (p *Probe) Publish(rpt report.Report) {
rpt = p.tag(rpt)
p.spiedReports <- rpt
err := p.spiedReports.Add(&rpt)
if err != nil {
log.Error().Err(err).Msg("Spy enqueue failed")
}
}

func (p *Probe) spyLoop() {
defer p.done.Done()
spyTick := time.Tick(p.spyInterval)

func (p *Probe) spyLoop(ctx context.Context) {
rpt := report.MakeReport()
for {
select {
case <-spyTick:
p.tick()
rpt := p.report()
case <-time.After(p.spyInterval):
for _, ticker := range p.tickers {
err := ticker.Tick()
if err != nil {
log.Error().Err(err).Msgf("Spy ticks for %v failed", ticker.Name())
}
}
rpt.Clear()
rpt := p.report(rpt)
rpt = p.tag(rpt)
p.spiedReports <- rpt
case <-p.quit:
err := p.spiedReports.Add(&rpt)
if err != nil {
log.Error().Err(err).Msg("Spy enqueue failed")
}
case <-ctx.Done():
return
}
}
}

func (p *Probe) tick() {
for _, ticker := range p.tickers {
t := time.Now()
err := ticker.Tick()
metrics.MeasureSinceWithLabels([]string{"duration", "seconds"}, t, []metrics.Label{
{Name: "operation", Value: "ticker"},
{Name: "module", Value: ticker.Name()},
})
if err != nil {
log.Error().Msgf("Error doing ticker: %v", err)
}
}
}

func (p *Probe) report() report.Report {
reports := make(chan report.Report, len(p.reporters))
for _, rep := range p.reporters {
func (p *Probe) report(result report.Report) report.Report {
for i, rep := range p.reporters {
go func(rep Reporter) {
t := time.Now()
timer := time.AfterFunc(p.spyInterval, func() { log.Warn().Msgf("%v reporter took longer than %v", rep.Name(), p.spyInterval) })
newReport, err := rep.Report()
if !timer.Stop() {
log.Warn().Msgf("%v reporter took %v (longer than %v)", rep.Name(), time.Now().Sub(t), p.spyInterval)
}
metrics.MeasureSinceWithLabels([]string{"duration", "seconds"}, t, []metrics.Label{
{Name: "operation", Value: "reporter"},
{Name: "module", Value: rep.Name()},
})
if err != nil {
log.Error().Msgf("Error generating %s report: %v", rep.Name(), err)
newReport = report.MakeReport() // empty is OK to merge
}
reports <- newReport
p.reportersReports[i] = newReport
p.readyIndxes <- i
}(rep)
}

result := report.MakeReport()
result.TS = time.Now()
for i := 0; i < cap(reports); i++ {
result.UnsafeMerge(<-reports)
for i := 0; i < cap(p.readyIndxes); i++ {
index := <-p.readyIndxes
result.UnsafeMerge(p.reportersReports[index])
p.reportersReports[index].Clear()
}
return result
}
Expand All @@ -195,32 +221,20 @@ func (p *Probe) tag(r report.Report) report.Report {
if !timer.Stop() {
log.Warn().Msgf("%v tagger took %v (longer than %v)", tagger.Name(), time.Now().Sub(t), p.spyInterval)
}
metrics.MeasureSinceWithLabels([]string{"duration", "seconds"}, t, []metrics.Label{
{Name: "operation", Value: "tagger"},
{Name: "module", Value: tagger.Name()},
})
if err != nil {
log.Error().Msgf("Error applying tagger: %v", err)
}
}
return r
}

func (p *Probe) drainAndSanitise(rpt report.Report, rs chan report.Report) (report.Report, int) {
p.rateLimiter.Wait(context.Background())
rpt = rpt.Copy()
func (p *Probe) drainAndSanitise(rpt report.Report, rs <-chan report.Report) (report.Report, int) {
//rpt = rpt.Copy()
count := 0
ForLoop:
for {
select {
case r := <-rs:
rpt.UnsafeMerge(r)
count++
default:
break ForLoop
}
for len(rs) != 0 {
rpt.UnsafeMerge(<-rs)
count++
}

if p.noControls {
//rpt.WalkTopologies(func(t *report.Topology) {
// t.Controls = report.Controls{}
Expand Down

0 comments on commit 9ef1478

Please sign in to comment.