From b99557e83757b6a805fd2e74ea9f248ec2f6d07a Mon Sep 17 00:00:00 2001 From: Thomas Legris Date: Wed, 26 Apr 2023 22:25:19 +0900 Subject: [PATCH] Improve push back mechanism --- deepfence_server/controls/agent.go | 11 ++- deepfence_server/handler/agent_controls.go | 2 +- deepfence_server/handler/agent_report.go | 34 -------- deepfence_server/ingesters/agent.go | 90 ++++++++++++++++++---- deepfence_worker/cronjobs/neo4j.go | 4 +- 5 files changed, 82 insertions(+), 59 deletions(-) diff --git a/deepfence_server/controls/agent.go b/deepfence_server/controls/agent.go index b528adb7ed..c3a006ce68 100644 --- a/deepfence_server/controls/agent.go +++ b/deepfence_server/controls/agent.go @@ -38,10 +38,10 @@ func GetAgentActions(ctx context.Context, nodeId string, work_num_to_extract int actions = append(actions, diagnosticLogActions...) } - return actions, []error{} + return actions, []error{scan_err, upgrade_err} } -func GetPendingAgentScans(ctx context.Context, nodeId string) ([]controls.Action, error) { +func GetPendingAgentScans(ctx context.Context, nodeId string, availableWorkload int) ([]controls.Action, error) { res := []controls.Action{} if len(nodeId) == 0 { return res, errors.New("Missing node_id") @@ -52,8 +52,7 @@ func GetPendingAgentScans(ctx context.Context, nodeId string) ([]controls.Action return res, err } - // TODO: 5 - if has, err := hasPendingAgentScans(client, nodeId, 5); !has || err != nil { + if has, err := hasPendingAgentScans(client, nodeId, availableWorkload); !has || err != nil { return res, err } @@ -123,7 +122,7 @@ func hasAgentDiagnosticLogRequests(client neo4j.Driver, nodeId string, nodeType defer tx.Close() r, err := tx.Run(`MATCH (s:AgentDiagnosticLogs) -[:SCHEDULEDLOGS]-> (n{node_id:$id}) - WHERE (n:`+controls.ResourceTypeToNeo4j(nodeType)+`) + WHERE (n:`+controls.ResourceTypeToNeo4j(nodeType)+`) AND s.status = '`+utils.SCAN_STATUS_STARTING+`' AND s.retries < 3 WITH s LIMIT $max_work @@ -168,7 +167,7 @@ func ExtractAgentDiagnosticLogRequests(ctx context.Context, nodeId string, nodeT defer tx.Close() r, err := tx.Run(`MATCH (s:AgentDiagnosticLogs) -[:SCHEDULEDLOGS]-> (n{node_id:$id}) - WHERE (n:`+controls.ResourceTypeToNeo4j(nodeType)+`) + WHERE (n:`+controls.ResourceTypeToNeo4j(nodeType)+`) AND s.status = '`+utils.SCAN_STATUS_STARTING+`' AND s.retries < 3 WITH s LIMIT $max_work diff --git a/deepfence_server/handler/agent_controls.go b/deepfence_server/handler/agent_controls.go index 4ea14f874a..30b384d948 100644 --- a/deepfence_server/handler/agent_controls.go +++ b/deepfence_server/handler/agent_controls.go @@ -68,7 +68,7 @@ func (h *Handler) GetAgentInitControls(w http.ResponseWriter, r *http.Request) { return } - actions, err := controls.GetPendingAgentScans(ctx, agentId.NodeId) + actions, err := controls.GetPendingAgentScans(ctx, agentId.NodeId, agentId.AvailableWorkload) if err != nil { log.Warn().Msgf("Cannot get actions: %s, skipping", err) } diff --git a/deepfence_server/handler/agent_report.go b/deepfence_server/handler/agent_report.go index be27e23883..a7ef21fe75 100644 --- a/deepfence_server/handler/agent_report.go +++ b/deepfence_server/handler/agent_report.go @@ -20,9 +20,6 @@ import ( var agent_report_ingesters sync.Map -//var agent_report_ingesters map[directory.NamespaceID]*ingesters.Ingester[report.Report] -//var access sync.RWMutex - func init() { agent_report_ingesters = sync.Map{} } @@ -51,26 +48,6 @@ func getAgentReportIngester(ctx context.Context) (*ingesters.Ingester[*report.Re func (h *Handler) IngestAgentReport(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - //contentType := r.Header.Get("Content-Type") - //var isMsgpack int - //switch { - //case strings.HasPrefix(contentType, "application/msgpack"): - // isMsgpack = 1 - //case strings.HasPrefix(contentType, "application/json"): - // isMsgpack = 0 - //case strings.HasPrefix(contentType, "application/binc"): - // isMsgpack = 2 - //default: - // respondWith(ctx, w, http.StatusBadRequest, fmt.Errorf("Unsupported Content-Type: %v", contentType)) - // return - //} - //data, err := io.ReadAll(r.Body) - //r.Body.Close() - //if err != nil { - // log.Error().Msgf("Error reading all: %v", err) - // respondWith(ctx, w, http.StatusBadRequest, err) - // return - //} rawReport := reportUtils.RawReport{} dec := jsoniter.NewDecoder(r.Body) @@ -93,22 +70,11 @@ func (h *Handler) IngestAgentReport(w http.ResponseWriter, r *http.Request) { respondWith(ctx, w, http.StatusBadRequest, err) return } - //data, err = io.ReadAll(gzr) - //if err != nil { - // log.Error().Msgf("Error read all raw: %v", err) - // respondWith(ctx, w, http.StatusBadRequest, err) - // return - //} - - //os.WriteFile("/tmp/report-"+time.Now().Format("2006-01-02-15-04-05")+".json", data, 0644) rpt := report.MakeReport() dec_inner := jsoniter.NewDecoder(gzr) err = dec_inner.Decode(&rpt) - //err = sonic.Unmarshal(data, &rpt) - - //if err := codec.NewDecoderBytes([]byte(rawReport.GetPayload()), &codec.JsonHandle{}).Decode(&rpt); err != nil { if err != nil { log.Error().Msgf("Error sonic unmarshal: %v", err) respondWith(ctx, w, http.StatusBadRequest, err) diff --git a/deepfence_server/ingesters/agent.go b/deepfence_server/ingesters/agent.go index 5c4a65e42c..5ffdd8b64a 100644 --- a/deepfence_server/ingesters/agent.go +++ b/deepfence_server/ingesters/agent.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "os" "strconv" "strings" "sync/atomic" @@ -22,23 +23,46 @@ const ( REDIS_NETWORK_MAP_KEY = "network_map" REDIS_IPPORTPID_MAP_KEY = "ipportpid_map" workers_num = 2 - db_input_size = 100 + default_db_input_size = 10 db_batch_size = 1_000 - db_batch_timeout = time.Second * 10 resolver_batch_size = 1_000 + default_ingester_size = 15_000 + db_batch_timeout = time.Second * 10 resolver_timeout = time.Second * 10 - ingester_size = 15_000 max_network_maps_size = 1024 * 1024 * 1024 // 1 GB per maps enqueer_timeout = time.Second * 30 + agent_base_timeout = time.Second * 30 localhost_ip = "127.0.0.1" + default_push_back = 1 ) var ( - Push_back atomic.Int32 + Push_back atomic.Int32 + ingester_size int + db_input_size int ) func init() { - Push_back.Store(2) + Push_back.Store(default_push_back) + push := os.Getenv("DF_INGEST_PUSH_BACK") + if push != "" { + push_int, err := strconv.Atoi(push) + if err == nil { + Push_back.Store(int32(push_int)) + } + } + + ingester_size = default_ingester_size + bsize := os.Getenv("DF_INGEST_REPORT_SIZE") + if bsize != "" { + ingester_size, _ = strconv.Atoi(bsize) + } + + db_input_size = default_db_input_size + dbsize := os.Getenv("DF_INGEST_DB_SIZE") + if dbsize != "" { + db_input_size, _ = strconv.Atoi(dbsize) + } } type EndpointResolvers struct { @@ -640,7 +664,7 @@ func (nc *neo4jIngester) runIngester() { log.Info().Msgf("runIngester ended") } -func (nc *neo4jIngester) runDBBatcher(db_pusher chan ReportIngestionData) { +func (nc *neo4jIngester) runDBBatcher(db_pusher chan ReportIngestionData, notify_full chan struct{}, num_pushes atomic.Int32) { batch := make([]ReportIngestionData, db_batch_size) size := 0 send := false @@ -669,13 +693,17 @@ loop: for i := 1; i < size; i++ { final_batch.merge(&batch[i]) } - log.Info().Msgf("Pushing %v reports to DB", size) + log.Debug().Msgf("Pushing %v reports to DB", size) size = 0 + num_pushes.Add(1) select { case db_pusher <- final_batch: default: log.Warn().Msgf("DB channel full") - Push_back.Add(1) + select { + case notify_full <- struct{}{}: + default: + } } } if reset_timeout { @@ -689,12 +717,20 @@ loop: func (nc *neo4jIngester) runDBPusher(db_pusher chan ReportIngestionData) { for batches := range db_pusher { span := telemetry.NewSpan(context.Background(), "ingester", "PushAgentReportsToDB") - defer span.End() - err := nc.PushToDB(batches) - if err != nil { - span.EndWithErr(err) - log.Error().Msgf("push to neo4j err: %v", err) - Push_back.Add(1) + retry := 0 + for { + err := nc.PushToDB(batches) + if err != nil { + log.Error().Msgf("push to neo4j err: %v", err) + if retry == 1 { + span.EndWithErr(err) + break + } + retry += 1 + } else { + span.End() + break + } } } log.Info().Msgf("runDBPusher ended") @@ -746,8 +782,10 @@ func NewNeo4jCollector(ctx context.Context) (Ingester[*report.Report], error) { go nc.runPreparer() } - db_pusher := make(chan ReportIngestionData, 10) - go nc.runDBBatcher(db_pusher) + notify_full := make(chan struct{}, 1) + num_pushes := atomic.Int32{} + db_pusher := make(chan ReportIngestionData, db_input_size) + go nc.runDBBatcher(db_pusher, notify_full, num_pushes) go nc.runDBPusher(db_pusher) go nc.resolversUpdater() @@ -755,6 +793,26 @@ func NewNeo4jCollector(ctx context.Context) (Ingester[*report.Report], error) { go nc.runIngester() go nc.runEnqueueReport() + // Push back decreaser + go func() { + prev_num_pushes := int32(0) + for { + select { + case <-time.After(agent_base_timeout * time.Duration(Push_back.Load())): + } + select { + case <-notify_full: + Push_back.Add(1) + default: + if Push_back.Load() > 1 && prev_num_pushes > num_pushes.Load() { + Push_back.Add(-1) + } + prev_num_pushes = num_pushes.Swap(0) + } + log.Info().Msgf("Push back: %v", Push_back.Load()) + } + }() + return nc, nil } diff --git a/deepfence_worker/cronjobs/neo4j.go b/deepfence_worker/cronjobs/neo4j.go index 208e6eaf68..28964fb8d1 100644 --- a/deepfence_worker/cronjobs/neo4j.go +++ b/deepfence_worker/cronjobs/neo4j.go @@ -16,9 +16,9 @@ import ( const ( diagnosticLogsCleanUpTimeout = time.Hour * 6 - dbReportCleanUpTimeout = time.Minute * 5 + dbReportCleanUpTimeout = time.Minute * 2 dbRegistryCleanUpTimeout = time.Minute * 30 - dbScanTimeout = time.Minute * 5 + dbScanTimeout = time.Minute * 2 dbUpgradeTimeout = time.Minute * 10 defaultDBScannedResourceCleanUpTimeout = time.Hour * 24 * 30 )