Skip to content

Commit

Permalink
Improve push back mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
noboruma committed Apr 26, 2023
1 parent 157fad1 commit b99557e
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 59 deletions.
11 changes: 5 additions & 6 deletions deepfence_server/controls/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ func GetAgentActions(ctx context.Context, nodeId string, work_num_to_extract int
actions = append(actions, diagnosticLogActions...)
}

return actions, []error{}
return actions, []error{scan_err, upgrade_err}
}

func GetPendingAgentScans(ctx context.Context, nodeId string) ([]controls.Action, error) {
func GetPendingAgentScans(ctx context.Context, nodeId string, availableWorkload int) ([]controls.Action, error) {
res := []controls.Action{}
if len(nodeId) == 0 {
return res, errors.New("Missing node_id")
Expand All @@ -52,8 +52,7 @@ func GetPendingAgentScans(ctx context.Context, nodeId string) ([]controls.Action
return res, err
}

// TODO: 5
if has, err := hasPendingAgentScans(client, nodeId, 5); !has || err != nil {
if has, err := hasPendingAgentScans(client, nodeId, availableWorkload); !has || err != nil {
return res, err
}

Expand Down Expand Up @@ -123,7 +122,7 @@ func hasAgentDiagnosticLogRequests(client neo4j.Driver, nodeId string, nodeType
defer tx.Close()

r, err := tx.Run(`MATCH (s:AgentDiagnosticLogs) -[:SCHEDULEDLOGS]-> (n{node_id:$id})
WHERE (n:`+controls.ResourceTypeToNeo4j(nodeType)+`)
WHERE (n:`+controls.ResourceTypeToNeo4j(nodeType)+`)
AND s.status = '`+utils.SCAN_STATUS_STARTING+`'
AND s.retries < 3
WITH s LIMIT $max_work
Expand Down Expand Up @@ -168,7 +167,7 @@ func ExtractAgentDiagnosticLogRequests(ctx context.Context, nodeId string, nodeT
defer tx.Close()

r, err := tx.Run(`MATCH (s:AgentDiagnosticLogs) -[:SCHEDULEDLOGS]-> (n{node_id:$id})
WHERE (n:`+controls.ResourceTypeToNeo4j(nodeType)+`)
WHERE (n:`+controls.ResourceTypeToNeo4j(nodeType)+`)
AND s.status = '`+utils.SCAN_STATUS_STARTING+`'
AND s.retries < 3
WITH s LIMIT $max_work
Expand Down
2 changes: 1 addition & 1 deletion deepfence_server/handler/agent_controls.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (h *Handler) GetAgentInitControls(w http.ResponseWriter, r *http.Request) {
return
}

actions, err := controls.GetPendingAgentScans(ctx, agentId.NodeId)
actions, err := controls.GetPendingAgentScans(ctx, agentId.NodeId, agentId.AvailableWorkload)
if err != nil {
log.Warn().Msgf("Cannot get actions: %s, skipping", err)
}
Expand Down
34 changes: 0 additions & 34 deletions deepfence_server/handler/agent_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ import (

var agent_report_ingesters sync.Map

//var agent_report_ingesters map[directory.NamespaceID]*ingesters.Ingester[report.Report]
//var access sync.RWMutex

func init() {
agent_report_ingesters = sync.Map{}
}
Expand Down Expand Up @@ -51,26 +48,6 @@ func getAgentReportIngester(ctx context.Context) (*ingesters.Ingester[*report.Re
func (h *Handler) IngestAgentReport(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

//contentType := r.Header.Get("Content-Type")
//var isMsgpack int
//switch {
//case strings.HasPrefix(contentType, "application/msgpack"):
// isMsgpack = 1
//case strings.HasPrefix(contentType, "application/json"):
// isMsgpack = 0
//case strings.HasPrefix(contentType, "application/binc"):
// isMsgpack = 2
//default:
// respondWith(ctx, w, http.StatusBadRequest, fmt.Errorf("Unsupported Content-Type: %v", contentType))
// return
//}
//data, err := io.ReadAll(r.Body)
//r.Body.Close()
//if err != nil {
// log.Error().Msgf("Error reading all: %v", err)
// respondWith(ctx, w, http.StatusBadRequest, err)
// return
//}
rawReport := reportUtils.RawReport{}

dec := jsoniter.NewDecoder(r.Body)
Expand All @@ -93,22 +70,11 @@ func (h *Handler) IngestAgentReport(w http.ResponseWriter, r *http.Request) {
respondWith(ctx, w, http.StatusBadRequest, err)
return
}
//data, err = io.ReadAll(gzr)
//if err != nil {
// log.Error().Msgf("Error read all raw: %v", err)
// respondWith(ctx, w, http.StatusBadRequest, err)
// return
//}

//os.WriteFile("/tmp/report-"+time.Now().Format("2006-01-02-15-04-05")+".json", data, 0644)

rpt := report.MakeReport()
dec_inner := jsoniter.NewDecoder(gzr)
err = dec_inner.Decode(&rpt)

//err = sonic.Unmarshal(data, &rpt)

//if err := codec.NewDecoderBytes([]byte(rawReport.GetPayload()), &codec.JsonHandle{}).Decode(&rpt); err != nil {
if err != nil {
log.Error().Msgf("Error sonic unmarshal: %v", err)
respondWith(ctx, w, http.StatusBadRequest, err)
Expand Down
90 changes: 74 additions & 16 deletions deepfence_server/ingesters/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"os"
"strconv"
"strings"
"sync/atomic"
Expand All @@ -22,23 +23,46 @@ const (
REDIS_NETWORK_MAP_KEY = "network_map"
REDIS_IPPORTPID_MAP_KEY = "ipportpid_map"
workers_num = 2
db_input_size = 100
default_db_input_size = 10
db_batch_size = 1_000
db_batch_timeout = time.Second * 10
resolver_batch_size = 1_000
default_ingester_size = 15_000
db_batch_timeout = time.Second * 10
resolver_timeout = time.Second * 10
ingester_size = 15_000
max_network_maps_size = 1024 * 1024 * 1024 // 1 GB per maps
enqueer_timeout = time.Second * 30
agent_base_timeout = time.Second * 30
localhost_ip = "127.0.0.1"
default_push_back = 1
)

var (
	// Push_back is the adaptive back-off multiplier advertised to agents;
	// it grows when the ingestion pipeline is saturated and shrinks when
	// throughput recovers.
	Push_back atomic.Int32

	// ingester_size and db_input_size are initialized in init(),
	// optionally overridden via environment variables.
	ingester_size int
	db_input_size int
)

// init seeds the push-back factor and the ingestion queue sizes with their
// defaults, then applies operator overrides from the environment. An override
// is applied only when it parses as an integer; a malformed value is ignored
// so a bad setting can never zero out a queue size.
func init() {
	Push_back.Store(default_push_back)
	if push := os.Getenv("DF_INGEST_PUSH_BACK"); push != "" {
		if push_int, err := strconv.Atoi(push); err == nil {
			Push_back.Store(int32(push_int))
		}
	}

	// Capacity of the report ingester queue.
	ingester_size = default_ingester_size
	if bsize := os.Getenv("DF_INGEST_REPORT_SIZE"); bsize != "" {
		// Previously the error was discarded, so a non-numeric value
		// silently set ingester_size to 0; keep the default instead.
		if v, err := strconv.Atoi(bsize); err == nil {
			ingester_size = v
		}
	}

	// Capacity of the DB pusher channel.
	db_input_size = default_db_input_size
	if dbsize := os.Getenv("DF_INGEST_DB_SIZE"); dbsize != "" {
		if v, err := strconv.Atoi(dbsize); err == nil {
			db_input_size = v
		}
	}
}

type EndpointResolvers struct {
Expand Down Expand Up @@ -640,7 +664,7 @@ func (nc *neo4jIngester) runIngester() {
log.Info().Msgf("runIngester ended")
}

func (nc *neo4jIngester) runDBBatcher(db_pusher chan ReportIngestionData) {
func (nc *neo4jIngester) runDBBatcher(db_pusher chan ReportIngestionData, notify_full chan struct{}, num_pushes atomic.Int32) {
batch := make([]ReportIngestionData, db_batch_size)
size := 0
send := false
Expand Down Expand Up @@ -669,13 +693,17 @@ loop:
for i := 1; i < size; i++ {
final_batch.merge(&batch[i])
}
log.Info().Msgf("Pushing %v reports to DB", size)
log.Debug().Msgf("Pushing %v reports to DB", size)
size = 0
num_pushes.Add(1)
select {
case db_pusher <- final_batch:
default:
log.Warn().Msgf("DB channel full")
Push_back.Add(1)
select {
case notify_full <- struct{}{}:
default:
}
}
}
if reset_timeout {
Expand All @@ -689,12 +717,20 @@ loop:
func (nc *neo4jIngester) runDBPusher(db_pusher chan ReportIngestionData) {
for batches := range db_pusher {
span := telemetry.NewSpan(context.Background(), "ingester", "PushAgentReportsToDB")
defer span.End()
err := nc.PushToDB(batches)
if err != nil {
span.EndWithErr(err)
log.Error().Msgf("push to neo4j err: %v", err)
Push_back.Add(1)
retry := 0
for {
err := nc.PushToDB(batches)
if err != nil {
log.Error().Msgf("push to neo4j err: %v", err)
if retry == 1 {
span.EndWithErr(err)
break
}
retry += 1
} else {
span.End()
break
}
}
}
log.Info().Msgf("runDBPusher ended")
Expand Down Expand Up @@ -746,15 +782,37 @@ func NewNeo4jCollector(ctx context.Context) (Ingester[*report.Report], error) {
go nc.runPreparer()
}

db_pusher := make(chan ReportIngestionData, 10)
go nc.runDBBatcher(db_pusher)
notify_full := make(chan struct{}, 1)
num_pushes := atomic.Int32{}
db_pusher := make(chan ReportIngestionData, db_input_size)
go nc.runDBBatcher(db_pusher, notify_full, num_pushes)
go nc.runDBPusher(db_pusher)

go nc.resolversUpdater()

go nc.runIngester()
go nc.runEnqueueReport()

// Push back decreaser
go func() {
prev_num_pushes := int32(0)
for {
select {
case <-time.After(agent_base_timeout * time.Duration(Push_back.Load())):
}
select {
case <-notify_full:
Push_back.Add(1)
default:
if Push_back.Load() > 1 && prev_num_pushes > num_pushes.Load() {
Push_back.Add(-1)
}
prev_num_pushes = num_pushes.Swap(0)
}
log.Info().Msgf("Push back: %v", Push_back.Load())
}
}()

return nc, nil
}

Expand Down
4 changes: 2 additions & 2 deletions deepfence_worker/cronjobs/neo4j.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (

const (
diagnosticLogsCleanUpTimeout = time.Hour * 6
dbReportCleanUpTimeout = time.Minute * 5
dbReportCleanUpTimeout = time.Minute * 2
dbRegistryCleanUpTimeout = time.Minute * 30
dbScanTimeout = time.Minute * 5
dbScanTimeout = time.Minute * 2
dbUpgradeTimeout = time.Minute * 10
defaultDBScannedResourceCleanUpTimeout = time.Hour * 24 * 30
)
Expand Down

0 comments on commit b99557e

Please sign in to comment.