From 49306e9c9201410108c827c63a7476434de67e0a Mon Sep 17 00:00:00 2001 From: Mudit Joshi Date: Fri, 14 Feb 2025 15:36:28 +0530 Subject: [PATCH 01/14] Chores: - Add logs before each process start - Move default interval time to daemonset module: --- .../kube/daemonset/{env_constants.go => constants.go} | 7 +++++++ cmd/internal/kube/daemonset/daemonset.go | 11 +++++++++-- cmd/internal/kube/daemonset/telemetry.go | 2 ++ integrations/kube_apis/kube_apis.go | 2 +- 4 files changed, 19 insertions(+), 3 deletions(-) rename cmd/internal/kube/daemonset/{env_constants.go => constants.go} (60%) diff --git a/cmd/internal/kube/daemonset/env_constants.go b/cmd/internal/kube/daemonset/constants.go similarity index 60% rename from cmd/internal/kube/daemonset/env_constants.go rename to cmd/internal/kube/daemonset/constants.go index e7f0865..f5d11da 100644 --- a/cmd/internal/kube/daemonset/env_constants.go +++ b/cmd/internal/kube/daemonset/constants.go @@ -1,9 +1,16 @@ package daemonset +import "time" + type podEnvVars string const ( + // Pod environment variables POSTMAN_INSIGHTS_PROJECT_ID podEnvVars = "POSTMAN_INSIGHTS_PROJECT_ID" POSTMAN_INSIGHTS_API_KEY podEnvVars = "POSTMAN_INSIGHTS_API_KEY" POSTMAN_INSIGHTS_ENV podEnvVars = "POSTMAN_INSIGHTS_ENV" + + // Workers intervals + DefaultTelemetryInterval = 5 * time.Minute + DefaultPodHealthCheckInterval = 5 * time.Minute ) diff --git a/cmd/internal/kube/daemonset/daemonset.go b/cmd/internal/kube/daemonset/daemonset.go index 188b284..ca19f1c 100644 --- a/cmd/internal/kube/daemonset/daemonset.go +++ b/cmd/internal/kube/daemonset/daemonset.go @@ -10,7 +10,6 @@ import ( "time" "github.com/pkg/errors" - "github.com/postmanlabs/postman-insights-agent/apispec" "github.com/postmanlabs/postman-insights-agent/integrations/cri_apis" "github.com/postmanlabs/postman-insights-agent/integrations/kube_apis" "github.com/postmanlabs/postman-insights-agent/printer" @@ -58,7 +57,7 @@ func StartDaemonset() error { // Send initial telemetry clusterName := os.Getenv("POSTMAN_CLUSTER_NAME") - telemetryInterval := apispec.DefaultTelemetryInterval_seconds * time.Second + telemetryInterval := DefaultTelemetryInterval if clusterName == "" { printer.Infof( "The cluster name is missing. Telemetry will not be sent from this agent, " + @@ -99,6 +98,7 @@ func StartDaemonset() error { CRIClient: criClient, FrontClient: frontClient, TelemetryInterval: telemetryInterval, + PodHealthCheckInterval: DefaultPodHealthCheckInterval, } daemonsetRun.Run() }() @@ -115,18 +115,23 @@ func StartDaemonset() error { // 7. Stops all apidump processes. // 8. Exits the daemonset agent. func (d *Daemonset) Run() error { + printer.Infof("Starting daemonset agent...\n") done := make(chan struct{}) // Start the telemetry worker + printer.Infof("Starting telemetry worker...\n") go d.TelemetryWorker(done) // Start the kubernetes events worker + printer.Infof("Starting kubernetes events worker...\n") go d.KubernetesEventsWorker(done) // Start the pods health worker + printer.Infof("Starting pods health worker...\n") go d.PodsHealthWorker(done) // Start the process in the existing pods + printer.Infof("Starting process in existing pods...\n") err := d.StartProcessInExistingPods() if err != nil { printer.Errorf("Failed to start process in existing pods, error: %v\n", err) @@ -198,6 +203,7 @@ func (d *Daemonset) addPodArgsToMap(podUID types.UID, args *PodArgs, startingSta // The worker runs until the provided done channel is closed. func (d *Daemonset) TelemetryWorker(done <-chan struct{}) { if d.TelemetryInterval <= 0 { + printer.Debugf("Telemetry interval is set to 0, telemetry worker will not run\n") return } @@ -282,6 +288,7 @@ func (d *Daemonset) KubernetesEventsWorker(done <-chan struct{}) { // It runs until the provided done channel is closed. func (d *Daemonset) PodsHealthWorker(done <-chan struct{}) { if d.PodHealthCheckInterval <= 0 { + printer.Debugf("Pod health check interval is set to 0, pods health worker will not run\n") return } diff --git a/cmd/internal/kube/daemonset/telemetry.go b/cmd/internal/kube/daemonset/telemetry.go index 0d1e5f2..7cd5c0b 100644 --- a/cmd/internal/kube/daemonset/telemetry.go +++ b/cmd/internal/kube/daemonset/telemetry.go @@ -32,6 +32,8 @@ func (d *Daemonset) dumpPodsApiDumpProcessState() { } logf := printer.Debugf + logf("Dumping pods api dump process state, time: %s\n", time.Now().UTC()) + logf("========================================================\n") logf("Pods active and their states:\n") logf("%15v %15v %25v\n", "podName", "projectID", "currentState") diff --git a/integrations/kube_apis/kube_apis.go b/integrations/kube_apis/kube_apis.go index c13acf8..12ff321 100644 --- a/integrations/kube_apis/kube_apis.go +++ b/integrations/kube_apis/kube_apis.go @@ -119,7 +119,7 @@ func (kc *KubeClient) GetPodsByUIDs(podUIDs []types.UID) ([]coreV1.Pod, error) { } if len(filteredPods) == 0 { - return []coreV1.Pod{}, errors.Errorf("no pods found with names: %v", podUIDs) + return []coreV1.Pod{}, errors.Errorf("no pods found with UIDs: %v", podUIDs) } return filteredPods, nil From 38dcb255810f44c9d44d9b3a93e5183e28b3613f Mon Sep 17 00:00:00 2001 From: Mudit Joshi Date: Fri, 14 Feb 2025 16:36:08 +0530 Subject: [PATCH 02/14] chore: change StateChangeMutex to sync.Mutex instead to pointer --- cmd/internal/kube/daemonset/pod_args.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/internal/kube/daemonset/pod_args.go b/cmd/internal/kube/daemonset/pod_args.go index c842a42..0da6092 100644 --- a/cmd/internal/kube/daemonset/pod_args.go +++ b/cmd/internal/kube/daemonset/pod_args.go @@ -62,7 +62,7 @@ type PodArgs struct { // for state management PodTrafficMonitorState PodTrafficMonitorState - StateChangeMutex *sync.Mutex + StateChangeMutex sync.Mutex // send stop signal to apidump process StopChan chan error From 317c5e1c75b04d71fb8455998f4276d78107e476 Mon Sep 17 00:00:00 2001 From: Mudit Joshi Date: Fri, 14 Feb 2025 17:00:52 +0530 Subject: [PATCH 03/14] chore: change PodTrafficMonitorState from int to string --- .../kube/daemonset/apidump_process.go | 8 ++-- cmd/internal/kube/daemonset/daemonset.go | 4 +- .../kube/daemonset/kube_events_worker.go | 2 +- cmd/internal/kube/daemonset/pod_args.go | 43 ++++++------------- .../kube/daemonset/pods_healthcheck_worker.go | 4 +- 5 files changed, 21 insertions(+), 40 deletions(-) diff --git a/cmd/internal/kube/daemonset/apidump_process.go b/cmd/internal/kube/daemonset/apidump_process.go index fdef5b2..44f615d 100644 --- a/cmd/internal/kube/daemonset/apidump_process.go +++ b/cmd/internal/kube/daemonset/apidump_process.go @@ -23,8 +23,8 @@ func (d *Daemonset) StartApiDumpProcess(podUID types.UID) error { err = podArgs.changePodTrafficMonitorState(TrafficMonitoringStarted, PodDetected, PodInitialized) if err != nil { - return errors.Wrapf(err, "failed to change pod state, pod name: %s, from: %d to: %d", - podArgs.PodName, podArgs.PodTrafficMonitorState, TrafficMonitoringStopped) + return errors.Wrapf(err, "failed to change pod state, pod name: %s, from: %s to: %s", + podArgs.PodName, podArgs.PodTrafficMonitorState, TrafficMonitoringStarted) } go func() (funcErr error) { @@ -45,7 +45,7 @@ func (d *Daemonset) StartApiDumpProcess(podUID types.UID) error { err = podArgs.changePodTrafficMonitorState(nextState, TrafficMonitoringStarted) if err != nil { - printer.Errorf("Failed to change pod state, pod name: %s, from: %d to: %d, error: %v\n", + printer.Errorf("Failed to change pod state, pod name: %s, from: %s to: %s, error: %v\n", podArgs.PodName, podArgs.PodTrafficMonitorState, nextState, err) return } @@ -99,7 +99,7 @@ func (d *Daemonset) StopApiDumpProcess(podUID types.UID, stopErr error) error { err = podArgs.changePodTrafficMonitorState(TrafficMonitoringStopped, PodTerminated, DaemonSetShutdown, TrafficMonitoringFailed, TrafficMonitoringEnded) if err != nil { - return errors.Wrapf(err, "failed to change pod state, pod name: %s, from: %d to: %d", + return errors.Wrapf(err, "failed to change pod state, pod name: %s, from: %s to: %s", podArgs.PodName, podArgs.PodTrafficMonitorState, TrafficMonitoringStopped) } diff --git a/cmd/internal/kube/daemonset/daemonset.go b/cmd/internal/kube/daemonset/daemonset.go index ca19f1c..a463b44 100644 --- a/cmd/internal/kube/daemonset/daemonset.go +++ b/cmd/internal/kube/daemonset/daemonset.go @@ -191,7 +191,7 @@ func (d *Daemonset) addPodArgsToMap(podUID types.UID, args *PodArgs, startingSta if loaded { err := argsFromMap.changePodTrafficMonitorState(startingState) if err != nil { - printer.Errorf("Failed to change pod state, pod name: %s, from: %d to: %d, error: %v\n", + printer.Errorf("Failed to change pod state, pod name: %s, from: %s to: %s, error: %v\n", argsFromMap.PodName, argsFromMap.PodTrafficMonitorState, startingState, err) } } else { @@ -318,7 +318,7 @@ func (d *Daemonset) StopAllApiDumpProcesses() { // Since this state can happen at any time so no check for allowed current states err := podArgs.changePodTrafficMonitorState(DaemonSetShutdown) if err != nil { - printer.Errorf("Failed to change pod state, pod name: %s, from: %d to: %d, error: %v\n", + printer.Errorf("Failed to change pod state, pod name: %s, from: %s to: %s, error: %v\n", podArgs.PodName, podArgs.PodTrafficMonitorState, DaemonSetShutdown, err) return true } diff --git a/cmd/internal/kube/daemonset/kube_events_worker.go b/cmd/internal/kube/daemonset/kube_events_worker.go index bd620a8..481c3da 100644 --- a/cmd/internal/kube/daemonset/kube_events_worker.go +++ b/cmd/internal/kube/daemonset/kube_events_worker.go @@ -82,7 +82,7 @@ func (d *Daemonset) handlePodDeleteEvent(podUID types.UID) { err = podArgs.changePodTrafficMonitorState(PodTerminated, TrafficMonitoringStarted) if err != nil { - printer.Errorf("Failed to change pod state, pod name: %s, from: %d to: %d, error: %v\n", + printer.Errorf("Failed to change pod state, pod name: %s, from: %s to: %s, error: %v\n", podArgs.PodName, podArgs.PodTrafficMonitorState, PodTerminated, err) return } diff --git a/cmd/internal/kube/daemonset/pod_args.go b/cmd/internal/kube/daemonset/pod_args.go index 0da6092..6643e79 100644 --- a/cmd/internal/kube/daemonset/pod_args.go +++ b/cmd/internal/kube/daemonset/pod_args.go @@ -9,41 +9,22 @@ import ( "github.com/pkg/errors" ) -type PodTrafficMonitorState int +type PodTrafficMonitorState string // Different states of pod traffic monitoring // The state transition is as follows: // PodDetected/PodInitialized -> TrafficMonitoringStarted -> TrafficMonitoringFailed/TrafficMonitoringEnded/PodTerminated -> TrafficMonitoringStopped -> RemovePodFromMap // 'DaemonSetShutdown' is a special state which is used to stop the daemonset agent and can be triggered at any time const ( - _ PodTrafficMonitorState = iota - - // When agent finds an already running pod - PodDetected - - // When agent will receive pod created event - PodInitialized - - // When apidump process is started for the pod - TrafficMonitoringStarted - - // When apidump process is errored for the pod - TrafficMonitoringFailed - - // When apidump process is ended without any issue for the pod - TrafficMonitoringEnded - - // When agent will receive pod deleted event or pod is in terminal state while checking status - PodTerminated - - // When the daemonset agent starts the shutdown process - DaemonSetShutdown - - // When apidump process is stopped for the pod - TrafficMonitoringStopped - - // Final state after which pod will be removed from the map - RemovePodFromMap + PodDetected PodTrafficMonitorState = "PodDetected" // When agent finds an already running pod + PodInitialized PodTrafficMonitorState = "PodInitialized" // When agent will receive pod created event + TrafficMonitoringStarted PodTrafficMonitorState = "TrafficMonitoringStarted" // When apidump process is started for the pod + TrafficMonitoringFailed PodTrafficMonitorState = "TrafficMonitoringFailed" // When apidump process is errored for the pod + TrafficMonitoringEnded PodTrafficMonitorState = "TrafficMonitoringEnded" // When apidump process is ended without any issue for the pod + PodTerminated PodTrafficMonitorState = "PodTerminated" // When agent will receive pod deleted event or pod is in terminal state while checking status + DaemonSetShutdown PodTrafficMonitorState = "DaemonSetShutdown" // When the daemonset agent starts the shutdown process + TrafficMonitoringStopped PodTrafficMonitorState = "TrafficMonitoringStopped" // When apidump process is stopped for the pod + RemovePodFromMap PodTrafficMonitorState = "RemovePodFromMap" // Final state after which pod will be removed from the map ) type PodCreds struct { @@ -87,11 +68,11 @@ func (p *PodArgs) changePodTrafficMonitorState( defer p.StateChangeMutex.Unlock() if !slices.Contains(allowedCurrentStates, p.PodTrafficMonitorState) { - return errors.New(fmt.Sprintf("Invalid current state for pod %s: %d", p.PodName, p.PodTrafficMonitorState)) + return errors.New(fmt.Sprintf("Invalid current state for pod %s: %s", p.PodName, p.PodTrafficMonitorState)) } if p.PodTrafficMonitorState == nextState { - return errors.New(fmt.Sprintf("API dump process for pod %s is already in state %d", p.PodName, nextState)) + return errors.New(fmt.Sprintf("API dump process for pod %s is already in state %s", p.PodName, nextState)) } p.PodTrafficMonitorState = nextState diff --git a/cmd/internal/kube/daemonset/pods_healthcheck_worker.go b/cmd/internal/kube/daemonset/pods_healthcheck_worker.go index a7bbe80..ef0a5c8 100644 --- a/cmd/internal/kube/daemonset/pods_healthcheck_worker.go +++ b/cmd/internal/kube/daemonset/pods_healthcheck_worker.go @@ -40,7 +40,7 @@ func (d *Daemonset) checkPodsHealth() { err = podArgs.changePodTrafficMonitorState(PodTerminated, TrafficMonitoringStarted) if err != nil { - printer.Infof("Failed to change pod state, pod name: %s, from: %d to: %d, error: %v\n", + printer.Infof("Failed to change pod state, pod name: %s, from: %s to: %s, error: %v\n", podArgs.PodName, podArgs.PodTrafficMonitorState, PodTerminated, err) continue } @@ -67,7 +67,7 @@ func (d *Daemonset) pruneStoppedProcesses() { case TrafficMonitoringStopped: err := podArgs.changePodTrafficMonitorState(RemovePodFromMap, TrafficMonitoringStopped) if err != nil { - printer.Errorf("Failed to change pod state, pod name: %s, from: %d to: %d\n", + printer.Errorf("Failed to change pod state, pod name: %s, from: %s to: %s\n", podArgs.PodName, podArgs.PodTrafficMonitorState, RemovePodFromMap) } case RemovePodFromMap: From 79d2b4e8fbf379cd31e18c4a54f9a0be7fa436e8 Mon Sep 17 00:00:00 2001 From: Mudit Joshi Date: Fri, 14 Feb 2025 17:03:54 +0530 Subject: [PATCH 04/14] chore: pass pointer of mutext parent and run go mod tidy --- cmd/internal/kube/daemonset/daemonset.go | 2 +- .../kube/daemonset/kube_events_worker.go | 18 +++++++++--------- go.mod | 2 +- go.sum | 14 +++++++------- 4 files changed, 18 insertions(+), 18 deletions(-) diff --git a/cmd/internal/kube/daemonset/daemonset.go b/cmd/internal/kube/daemonset/daemonset.go index a463b44..c070c32 100644 --- a/cmd/internal/kube/daemonset/daemonset.go +++ b/cmd/internal/kube/daemonset/daemonset.go @@ -250,7 +250,7 @@ func (d *Daemonset) StartProcessInExistingPods() error { continue } - d.addPodArgsToMap(pod.UID, &args, PodDetected) + d.addPodArgsToMap(pod.UID, args, PodDetected) err = d.StartApiDumpProcess(pod.UID) if err != nil { printer.Errorf("Failed to start api dump process, pod name: %s, error: %v\n", pod.Name, err) diff --git a/cmd/internal/kube/daemonset/kube_events_worker.go b/cmd/internal/kube/daemonset/kube_events_worker.go index 481c3da..d44b826 100644 --- a/cmd/internal/kube/daemonset/kube_events_worker.go +++ b/cmd/internal/kube/daemonset/kube_events_worker.go @@ -61,7 +61,7 @@ func (d *Daemonset) handlePodAddEvent(podUID types.UID) { return } - d.addPodArgsToMap(pod.UID, &args, PodInitialized) + d.addPodArgsToMap(pod.UID, args, PodInitialized) err = d.StartApiDumpProcess(pod.UID) if err != nil { printer.Errorf("Failed to start api dump process, pod name: %s, error: %v\n", pod.Name, err) @@ -97,17 +97,17 @@ func (d *Daemonset) handlePodDeleteEvent(podUID types.UID) { // required for the Postman Insights project. It retrieves the UUID of the main container // in the pod, fetches the environment variables of that container, and extracts the // necessary variables such as the project ID, API key, and environment. -func (d *Daemonset) inspectPodForEnvVars(pod coreV1.Pod) (PodArgs, error) { +func (d *Daemonset) inspectPodForEnvVars(pod coreV1.Pod) (*PodArgs, error) { // Get the UUID of the main container in the pod containerUUID, err := d.KubeClient.GetMainContainerUUID(pod) if err != nil { - return PodArgs{}, errors.Wrapf(err, "failed to get main container UUID for pod: %s", pod.Name) + return nil, errors.Wrapf(err, "failed to get main container UUID for pod: %s", pod.Name) } // Get the environment variables of the main container envVars, err := d.CRIClient.GetEnvVars(containerUUID) if err != nil { - return PodArgs{}, errors.Wrapf(err, "failed to get environment variables for pod/container : %s/%s", pod.Name, containerUUID) + return nil, errors.Wrapf(err, "failed to get environment variables for pod/container : %s/%s", pod.Name, containerUUID) } var ( @@ -122,7 +122,7 @@ func (d *Daemonset) inspectPodForEnvVars(pod coreV1.Pod) (PodArgs, error) { case string(POSTMAN_INSIGHTS_PROJECT_ID): err := akid.ParseIDAs(value, &insightsProjectID) if err != nil { - return PodArgs{}, errors.Wrap(err, "failed to parse project ID") + return nil, errors.Wrap(err, "failed to parse project ID") } case string(POSTMAN_INSIGHTS_API_KEY): insightsAPIKey = value @@ -132,17 +132,17 @@ func (d *Daemonset) inspectPodForEnvVars(pod coreV1.Pod) (PodArgs, error) { } if (insightsProjectID == akid.ServiceID{}) && insightsAPIKey == "" { - return PodArgs{}, allRequiredEnvVarsAbsentErr + return nil, allRequiredEnvVarsAbsentErr } if (insightsProjectID == akid.ServiceID{}) { printer.Errorf("Project ID is missing, set it using the environment variable %s, pod name: %s\n", POSTMAN_INSIGHTS_PROJECT_ID, pod.Name) - return PodArgs{}, requiredEnvVarMissingErr + return nil, requiredEnvVarMissingErr } if insightsAPIKey == "" { printer.Errorf("API key is missing, set it using the environment variable %s, pod name: %s\n", POSTMAN_INSIGHTS_API_KEY, pod.Name) - return PodArgs{}, requiredEnvVarMissingErr + return nil, requiredEnvVarMissingErr } args := PodArgs{ @@ -157,5 +157,5 @@ func (d *Daemonset) inspectPodForEnvVars(pod coreV1.Pod) (PodArgs, error) { StopChan: make(chan error, 2), } - return args, nil + return &args, nil } diff --git a/go.mod b/go.mod index ef3207c..8cd0df4 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.16.7 github.com/aws/smithy-go v1.13.4 github.com/c9s/goprocinfo v0.0.0-20210130143923-c95fcf8c64a8 + github.com/containernetworking/plugins v1.5.1 github.com/golang/gddo v0.0.0-20210115222349-20d68f94ee1f github.com/golang/mock v1.4.4 github.com/golang/protobuf v1.5.4 @@ -63,7 +64,6 @@ require ( github.com/aws/aws-sdk-go-v2/service/sso v1.11.25 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.8 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.17.1 // indirect - github.com/containernetworking/plugins v1.5.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/fsnotify/fsnotify v1.4.9 // indirect diff --git a/go.sum b/go.sum index 7ea8428..3555a0f 100644 --- a/go.sum +++ b/go.sum @@ -90,6 +90,8 @@ github.com/c9s/goprocinfo v0.0.0-20210130143923-c95fcf8c64a8 h1:SjZ2GvvOononHOpK github.com/c9s/goprocinfo v0.0.0-20210130143923-c95fcf8c64a8/go.mod h1:uEyr4WpAH4hio6LFriaPkL938XnrvLpNPmQHBdrmbIE= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/containernetworking/cni v1.1.2 h1:wtRGZVv7olUHMOqouPpn3cXJWpJgM6+EUl31EQbXALQ= +github.com/containernetworking/cni v1.1.2/go.mod h1:sDpYKmGVENF3s6uvMvGgldDWeG8dMxakj/u+i9ht9vw= github.com/containernetworking/plugins v1.5.1 h1:T5ji+LPYjjgW0QM+KyrigZbLsZ8jaX+E5J/EcKOE4gQ= github.com/containernetworking/plugins v1.5.1/go.mod h1:MIQfgMayGuHYs0XdNudf31cLLAC+i242hNm6KuDGqCM= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= @@ -131,7 +133,8 @@ github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+ github.com/go-stack/stack v1.6.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= -github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/go-test/deep v1.0.3 h1:ZrJSEWsXzPOxaZnFteGEfooLba+ju3FYIbOrS+rQd68= github.com/go-test/deep v1.0.3/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -174,9 +177,8 @@ github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= -github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= -github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg= +github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -309,12 +311,10 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= -github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4= -github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o= github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= -github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg= -github.com/onsi/gomega v1.29.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ= +github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= +github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= From 4fe19375d6df256decc3c0d0d4b7bdabda7ad5e6 Mon Sep 17 00:00:00 2001 From: Mudit Joshi Date: Fri, 14 Feb 2025 17:05:03 +0530 Subject: [PATCH 05/14] chore: use for range instead of single switch case --- cmd/internal/kube/daemonset/daemonset.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/cmd/internal/kube/daemonset/daemonset.go b/cmd/internal/kube/daemonset/daemonset.go index c070c32..cf3efaf 100644 --- a/cmd/internal/kube/daemonset/daemonset.go +++ b/cmd/internal/kube/daemonset/daemonset.go @@ -148,12 +148,9 @@ func (d *Daemonset) Run() error { // Continue until an interrupt DoneWaitingForSignal: - for { - select { - case received := <-sig: - printer.Stderr.Infof("Received %v, stopping daemonset...\n", received.String()) - break DoneWaitingForSignal - } + for received := range sig { + printer.Stderr.Infof("Received %v, stopping daemonset...\n", received.String()) + break DoneWaitingForSignal } } From 2b6316203a2f5798716ca316095a2553079a6142 Mon Sep 17 00:00:00 2001 From: Mudit Joshi Date: Fri, 14 Feb 2025 20:39:21 +0530 Subject: [PATCH 06/14] chore: set revisionVersion to only fetch events after agent creation --- cmd/internal/kube/daemonset/daemonset.go | 6 ++- integrations/kube_apis/kube_apis.go | 23 +++++++++- integrations/tests/kube_cri_apis/main.go | 53 ++++++++++++++++++------ 3 files changed, 66 insertions(+), 16 deletions(-) diff --git a/cmd/internal/kube/daemonset/daemonset.go b/cmd/internal/kube/daemonset/daemonset.go index cf3efaf..0185d84 100644 --- a/cmd/internal/kube/daemonset/daemonset.go +++ b/cmd/internal/kube/daemonset/daemonset.go @@ -162,6 +162,10 @@ func (d *Daemonset) Run() error { printer.Debugf("Stopping all apidump processes...\n") d.StopAllApiDumpProcesses() + // Stop K8s Watcher + printer.Debugf("Stopping k8s watcher...\n") + d.KubeClient.Close() + printer.Infof("Exiting daemonset agent...\n") return nil } @@ -192,7 +196,7 @@ func (d *Daemonset) addPodArgsToMap(podUID types.UID, args *PodArgs, startingSta argsFromMap.PodName, argsFromMap.PodTrafficMonitorState, startingState, err) } } else { - printer.Errorf("Pod is already loaded in the map and is in state %d\n", argsFromMap.PodTrafficMonitorState) + printer.Errorf("Pod is already loaded in the map and is in state %s\n", argsFromMap.PodTrafficMonitorState) } } diff --git a/integrations/kube_apis/kube_apis.go b/integrations/kube_apis/kube_apis.go index 12ff321..35ba460 100644 --- a/integrations/kube_apis/kube_apis.go +++ b/integrations/kube_apis/kube_apis.go @@ -23,6 +23,7 @@ type KubeClient struct { Clientset *kubernetes.Clientset EventWatch watch.Interface AgentNode string + AgentHost string } // NewKubeClient initializes a new Kubernetes client @@ -47,9 +48,15 @@ func NewKubeClient() (KubeClient, error) { return KubeClient{}, errors.New("POSTMAN_K8S_NODE environment variable not set") } + agentHostName, err := os.Hostname() + if err != nil { + return KubeClient{}, errors.Wrap(err, "error getting hostname") + } + kubeClient := KubeClient{ Clientset: clientset, AgentNode: agentNodeName, + AgentHost: agentHostName, } // Initialize event watcher @@ -68,9 +75,21 @@ func (kc *KubeClient) Close() { // initEventWatcher creates a new go-channel to listen for pod events in the cluster func (kc *KubeClient) initEventWatcher() error { + // Fetch own pod details + fieldSelector := fmt.Sprintf("metadata.name=%s", kc.AgentHost) + pod, err := kc.Clientset.CoreV1().Pods("").List(context.Background(), metaV1.ListOptions{ + FieldSelector: fieldSelector, + }) + if err != nil { + return errors.Wrap(err, "error getting own pod details") + } + + // Create a watcher for pod events + // Here ResourceVersion is set to the pod's ResourceVersion to watch events after the pod's creation watcher, err := kc.Clientset.CoreV1().Events("").Watch(context.Background(), metaV1.ListOptions{ - Watch: true, - FieldSelector: "involvedObject.kind=Pod", + Watch: true, + FieldSelector: "involvedObject.kind=Pod", + ResourceVersion: pod.ResourceVersion, }) if err != nil { return errors.Wrap(err, "error creating watcher") diff --git a/integrations/tests/kube_cri_apis/main.go b/integrations/tests/kube_cri_apis/main.go index 3b1fbcd..6bcb926 100644 --- a/integrations/tests/kube_cri_apis/main.go +++ b/integrations/tests/kube_cri_apis/main.go @@ -16,22 +16,38 @@ package main import ( + "encoding/json" "fmt" "github.com/postmanlabs/postman-insights-agent/integrations/cri_apis" "github.com/postmanlabs/postman-insights-agent/integrations/kube_apis" "github.com/postmanlabs/postman-insights-agent/printer" + coreV1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" ) -func k8s_funcs() (string, error) { - // Initialize KubeClient - kubeClient, err := kube_apis.NewKubeClient() - if err != nil { - return "", fmt.Errorf("failed to create KubeClient: %v", err) +func k8s_watcher(kubeClient kube_apis.KubeClient) { + // Watch for pod events + for event := range kubeClient.EventWatch.ResultChan() { + printer.Infof("Received event: %v\n", event.Type) + switch event.Type { + case watch.Added, watch.Deleted: + if e, ok := event.Object.(*coreV1.Event); ok { + jsonData, err := json.Marshal(e) + if err != nil { + printer.Errorf("Failed to marshal event data: %v\n", err) + continue + } + printer.Infof("Event data: %s\n", string(jsonData)) + } + default: + printer.Infof("Unhandled event type: %v\n", event.Type) + } } - defer kubeClient.Close() +} +func k8s_funcs(kubeClient kube_apis.KubeClient) (string, error) { // GetPodsInNode podsInNode, err := kubeClient.GetPodsInAgentNode() if err != nil { @@ -109,19 +125,30 @@ func cri_funcs(containerUUID string) error { } func main() { + // Initialize KubeClient + kubeClient, err := kube_apis.NewKubeClient() + if err != nil { + printer.Errorf("Failed to create KubeClient: %v\n", err) + return + } + defer kubeClient.Close() + // Call k8s_funcs - containerUUID, err := k8s_funcs() + containerUUID, err := k8s_funcs(kubeClient) if err != nil { printer.Errorf("Error from k8s_funcs: %v\n", err) } // Call cri_funcs if containerUUID == "" { - printer.Infoln("Container UUID not found, exiting...") - return - } - err = cri_funcs(containerUUID) - if err != nil { - printer.Errorf("Error from cri_funcs: %v\n", err) + printer.Infoln("Container UUID not found, skipping CRI functions...") + } else { + err = cri_funcs(containerUUID) + if err != nil { + printer.Errorf("Error from cri_funcs: %v\n", err) + } } + + // Watch for pod events + k8s_watcher(kubeClient) } From 0335c52b58a91cc3f5b7417b54f5ff0499bd6736 Mon Sep 17 00:00:00 2001 From: Mudit Joshi Date: Fri, 14 Feb 2025 23:17:06 +0530 Subject: [PATCH 07/14] fix: parent run command starting in separate go routine --- cmd/internal/kube/daemonset/daemonset.go | 25 ++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/cmd/internal/kube/daemonset/daemonset.go b/cmd/internal/kube/daemonset/daemonset.go index 0185d84..e3292d0 100644 --- a/cmd/internal/kube/daemonset/daemonset.go +++ b/cmd/internal/kube/daemonset/daemonset.go @@ -10,6 +10,7 @@ import ( "time" "github.com/pkg/errors" + "github.com/postmanlabs/postman-insights-agent/cmd/internal/cmderr" "github.com/postmanlabs/postman-insights-agent/integrations/cri_apis" "github.com/postmanlabs/postman-insights-agent/integrations/kube_apis" "github.com/postmanlabs/postman-insights-agent/printer" @@ -90,18 +91,18 @@ func StartDaemonset() error { return errors.Wrap(err, "failed to create CRI client") } - go func() { - daemonsetRun := &Daemonset{ - ClusterName: clusterName, - InsightsReproModeEnabled: insightsReproModeEnabled, - KubeClient: kubeClient, - CRIClient: criClient, - FrontClient: frontClient, - TelemetryInterval: telemetryInterval, - PodHealthCheckInterval: DefaultPodHealthCheckInterval, - } - daemonsetRun.Run() - }() + daemonsetRun := &Daemonset{ + ClusterName: clusterName, + InsightsReproModeEnabled: insightsReproModeEnabled, + KubeClient: kubeClient, + CRIClient: criClient, + FrontClient: frontClient, + TelemetryInterval: telemetryInterval, + PodHealthCheckInterval: DefaultPodHealthCheckInterval, + } + if err := daemonsetRun.Run(); err != nil { + return cmderr.AkitaErr{Err: err} + } return nil } From b8f9005487d175c45d959f8e1b22a0d77c1734da Mon Sep 17 00:00:00 2001 From: Mudit Joshi Date: Sat, 15 Feb 2025 00:16:30 +0530 Subject: [PATCH 08/14] Fixes: - Handle case if any state change is allowed in changePodTrafficMonitorState() - if condition in addPodArgsToMap() is opposite also not this func will return error --- cmd/internal/kube/daemonset/daemonset.go | 19 +++++++++++++------ .../kube/daemonset/kube_events_worker.go | 7 ++++++- cmd/internal/kube/daemonset/pod_args.go | 4 +++- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/cmd/internal/kube/daemonset/daemonset.go b/cmd/internal/kube/daemonset/daemonset.go index e3292d0..9049589 100644 --- a/cmd/internal/kube/daemonset/daemonset.go +++ b/cmd/internal/kube/daemonset/daemonset.go @@ -187,18 +187,20 @@ func (d *Daemonset) getPodArgsFromMap(podUID types.UID) (*PodArgs, error) { // addPodArgsToMap adds the podArgs to the map with the podUID as the key // This function ensures that the pod is not already loaded in the map -func (d *Daemonset) addPodArgsToMap(podUID types.UID, args *PodArgs, startingState PodTrafficMonitorState) { +func (d *Daemonset) addPodArgsToMap(podUID types.UID, args *PodArgs, startingState PodTrafficMonitorState) error { value, loaded := d.PodArgsByNameMap.LoadOrStore(podUID, args) argsFromMap := value.(*PodArgs) - if loaded { + if !loaded { err := argsFromMap.changePodTrafficMonitorState(startingState) if err != nil { - printer.Errorf("Failed to change pod state, pod name: %s, from: %s to: %s, error: %v\n", - argsFromMap.PodName, argsFromMap.PodTrafficMonitorState, startingState, err) + return errors.Wrapf(err, "failed to change pod state, pod name: %s, from: %s to: %s", + argsFromMap.PodName, argsFromMap.PodTrafficMonitorState, startingState) } } else { - printer.Errorf("Pod is already loaded in the map and is in state %s\n", argsFromMap.PodTrafficMonitorState) + return errors.Errorf("pod is already loaded in the map and is in state %s", argsFromMap.PodTrafficMonitorState) } + + return nil } // TelemetryWorker starts a worker that periodically sends telemetry data and dumps the state of the Pods API dump process. @@ -252,7 +254,12 @@ func (d *Daemonset) StartProcessInExistingPods() error { continue } - d.addPodArgsToMap(pod.UID, args, PodDetected) + err = d.addPodArgsToMap(pod.UID, args, PodDetected) + if err != nil { + printer.Errorf("Failed to add pod args to map, pod name: %s, error: %v\n", pod.Name, err) + continue + } + err = d.StartApiDumpProcess(pod.UID) if err != nil { printer.Errorf("Failed to start api dump process, pod name: %s, error: %v\n", pod.Name, err) diff --git a/cmd/internal/kube/daemonset/kube_events_worker.go b/cmd/internal/kube/daemonset/kube_events_worker.go index d44b826..06cfad3 100644 --- a/cmd/internal/kube/daemonset/kube_events_worker.go +++ b/cmd/internal/kube/daemonset/kube_events_worker.go @@ -61,7 +61,12 @@ func (d *Daemonset) handlePodAddEvent(podUID types.UID) { return } - d.addPodArgsToMap(pod.UID, args, PodInitialized) + err = d.addPodArgsToMap(pod.UID, args, PodInitialized) + if err != nil { + printer.Errorf("Failed to add pod args to map, pod name: %s, error: %v\n", pod.Name, err) + return + } + err = d.StartApiDumpProcess(pod.UID) if err != nil { printer.Errorf("Failed to start api dump process, pod name: %s, error: %v\n", pod.Name, err) diff --git a/cmd/internal/kube/daemonset/pod_args.go b/cmd/internal/kube/daemonset/pod_args.go index 6643e79..c38f4eb 100644 --- a/cmd/internal/kube/daemonset/pod_args.go +++ b/cmd/internal/kube/daemonset/pod_args.go @@ -67,7 +67,9 @@ func (p *PodArgs) changePodTrafficMonitorState( p.StateChangeMutex.Lock() defer p.StateChangeMutex.Unlock() - if !slices.Contains(allowedCurrentStates, p.PodTrafficMonitorState) { + // Check if the current state is allowed for the transition + // If the allowedCurrentStates is empty, then any state is allowed + if len(allowedCurrentStates) != 0 && !slices.Contains(allowedCurrentStates, p.PodTrafficMonitorState) { return errors.New(fmt.Sprintf("Invalid current state for pod %s: %s", p.PodName, p.PodTrafficMonitorState)) } From aa932f6fbe741238dade55dea06d61d0cccd2746 Mon Sep 17 00:00:00 2001 From: Mudit Joshi Date: Sat, 15 Feb 2025 00:23:01 +0530 Subject: [PATCH 09/14] chore: if pod is already running but agent is not listening to it start apidump process in next healthcheck run --- .../kube/daemonset/pods_healthcheck_worker.go | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/cmd/internal/kube/daemonset/pods_healthcheck_worker.go b/cmd/internal/kube/daemonset/pods_healthcheck_worker.go index ef0a5c8..8723d90 100644 --- a/cmd/internal/kube/daemonset/pods_healthcheck_worker.go +++ b/cmd/internal/kube/daemonset/pods_healthcheck_worker.go @@ -29,7 +29,8 @@ func (d *Daemonset) checkPodsHealth() { } for podUID, podStatus := range podStatuses { - if podStatus == coreV1.PodSucceeded || podStatus == coreV1.PodFailed { + switch podStatus { + case coreV1.PodSucceeded, coreV1.PodFailed: printer.Infof("Pod %s has stopped running\n", podStatus) podArgs, err := d.getPodArgsFromMap(podUID) @@ -49,6 +50,24 @@ func (d *Daemonset) checkPodsHealth() { if err != nil { printer.Errorf("Failed to stop api dump process, pod name: %s, error: %v\n", podArgs.PodName, err) } + case coreV1.PodRunning: + printer.Debugf("Pod %s is running\n", podStatus) + + podArgs, err := d.getPodArgsFromMap(podUID) + if err != nil { + printer.Infof("Failed to get podArgs for podUID %s: %v\n", podUID, err) + continue + } + + // If pod's monitoring state is still in PodDetected or PodInitialized, it means there is a bug. + // The program should have started the API dump process if it is stored in the map. + if podArgs.PodTrafficMonitorState == PodDetected || podArgs.PodTrafficMonitorState == PodInitialized { + printer.Debugf("Apidump process not started for pod %s during it's initialization, starting now\n", podArgs.PodName) + err = d.StartApiDumpProcess(podUID) + if err != nil { + printer.Errorf("Failed to start api dump process, pod name: %s, error: %v\n", podArgs.PodName, err) + } + } } } } From d11593eee886d76b96e73d6fa11ac2008d2315cc Mon Sep 17 00:00:00 2001 From: Mudit Joshi Date: Sat, 15 Feb 2025 00:43:29 +0530 Subject: [PATCH 10/14] fix: prepend '/host' to network namespace --- cmd/internal/kube/daemonset/apidump_process.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/internal/kube/daemonset/apidump_process.go b/cmd/internal/kube/daemonset/apidump_process.go index 44f615d..f4d419c 100644 --- a/cmd/internal/kube/daemonset/apidump_process.go +++ b/cmd/internal/kube/daemonset/apidump_process.go @@ -64,6 +64,8 @@ func (d *Daemonset) StartApiDumpProcess(podUID types.UID) error { podArgs.PodName, podArgs.ContainerUUID, err) return } + // Prepend '/host' to network namespace, since '/proc' folder is mounted to '/host/proc' + networkNamespace = "/host" + networkNamespace apidumpArgs := apidump.Args{ ClientID: telemetry.GetClientID(), From 7718c4307e1f95e47c39674dd8d2de4e0b8bf6d5 Mon Sep 17 00:00:00 2001 From: Mudit Joshi Date: Sat, 15 Feb 2025 01:11:49 +0530 Subject: [PATCH 11/14] fix: pass learnClient in util.NewLearnSession() --- apidump/apidump.go | 4 ++-- integrations/nginx/backend.go | 2 +- util/util.go | 9 ++++++--- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/apidump/apidump.go b/apidump/apidump.go index 2e8aac0..2771bf2 100644 --- a/apidump/apidump.go +++ b/apidump/apidump.go @@ -416,7 +416,7 @@ func (a *apidump) RotateLearnSession(done <-chan struct{}, collectors []trace.Le case <-t.C: traceName := util.RandomLearnSessionName() - backendLrn, err := util.NewLearnSession(args.Domain, args.ClientID, a.backendSvc, traceName, traceTags, nil) + backendLrn, err := util.NewLearnSession(a.learnClient, traceName, traceTags, nil) if err != nil { telemetry.Error("new learn session", err) printer.Errorf("Failed to create trace %s: %v\n", traceName, err) @@ -605,7 +605,7 @@ func (a *apidump) Run() error { var backendLrn akid.LearnSessionID if a.TargetIsRemote() { uri := a.Out.AkitaURI - backendLrn, err = util.NewLearnSession(args.Domain, args.ClientID, a.backendSvc, uri.ObjectName, traceTags, nil) + backendLrn, err = util.NewLearnSession(a.learnClient, uri.ObjectName, traceTags, nil) if err == nil { printer.Infof("Created new trace on Postman Cloud: %s\n", uri) } else { diff --git a/integrations/nginx/backend.go b/integrations/nginx/backend.go index b7839ac..d8c5394 100644 --- a/integrations/nginx/backend.go +++ b/integrations/nginx/backend.go @@ -171,7 +171,7 @@ func NewNginxBackend(args *Args) (*NginxBackend, error) { tags.XAkitaSource: "nginx", } traceName := util.RandomLearnSessionName() - backendLrn, err := util.NewLearnSession(args.Domain, args.ClientID, b.backendSvc, traceName, traceTags, nil) + backendLrn, err := util.NewLearnSession(b.learnClient, traceName, traceTags, nil) if err != nil { return nil, errors.Wrap(err, "failed to create trace or fetch existing trace") } diff --git a/util/util.go b/util/util.go index 17cab1a..b6f5314 100644 --- a/util/util.go +++ b/util/util.go @@ -41,9 +41,12 @@ var ( apiTimeout = 20 * time.Second ) -func NewLearnSession(domain string, clientID akid.ClientID, svc akid.ServiceID, sessionName string, tags map[tags.Key]string, baseSpecRef *kgxapi.APISpecReference) (akid.LearnSessionID, error) { - learnClient := rest.NewLearnClient(domain, clientID, svc, nil) - +func NewLearnSession( + learnClient rest.LearnClient, + sessionName string, + tags map[tags.Key]string, + baseSpecRef *kgxapi.APISpecReference, +) (akid.LearnSessionID, error) { // Create a new learn session. ctx, cancel := context.WithTimeout(context.Background(), apiTimeout) defer cancel() From 2168fc425f9e9caed54abaaa68391845b2b4bb84 Mon Sep 17 00:00:00 2001 From: Mudit Joshi Date: Mon, 17 Feb 2025 11:27:07 +0530 Subject: [PATCH 12/14] chore: set default apidump args --- apidump/apidump.go | 2 +- apispec/defaults.go | 3 +++ cmd/internal/apidump/apidump.go | 2 +- cmd/internal/kube/daemonset/apidump_process.go | 17 +++++++++++++---- 4 files changed, 18 insertions(+), 6 deletions(-) diff --git a/apidump/apidump.go b/apidump/apidump.go index 2771bf2..66bb504 100644 --- a/apidump/apidump.go +++ b/apidump/apidump.go @@ -74,7 +74,7 @@ const ( // Args for running apidump as daemonset in Kubernetes type DaemonsetArgs struct { TargetNetworkNamespaceOpt string - StopChan <-chan error + StopChan <-chan error `json:"-"` APIKey string Environment string } diff --git a/apispec/defaults.go b/apispec/defaults.go index ccb7c9a..fa74acd 100644 --- a/apispec/defaults.go +++ b/apispec/defaults.go @@ -35,4 +35,7 @@ const ( // How often to rotate traces in the back end. DefaultTraceRotateInterval = time.Hour + + // Process all possible witness data + DefaultSampleRate = 1.0 ) diff --git a/cmd/internal/apidump/apidump.go b/cmd/internal/apidump/apidump.go index bbe72a9..7725809 100644 --- a/cmd/internal/apidump/apidump.go +++ b/cmd/internal/apidump/apidump.go @@ -256,7 +256,7 @@ func init() { Cmd.Flags().Float64Var( &sampleRateFlag, "sample-rate", - 1.0, + apispec.DefaultSampleRate, "A number between [0.0, 1.0] to control sampling.", ) Cmd.Flags().MarkDeprecated("sample-rate", "use --rate-limit instead.") diff --git a/cmd/internal/kube/daemonset/apidump_process.go b/cmd/internal/kube/daemonset/apidump_process.go index f4d419c..eac4e50 100644 --- a/cmd/internal/kube/daemonset/apidump_process.go +++ b/cmd/internal/kube/daemonset/apidump_process.go @@ -6,6 +6,7 @@ import ( "github.com/akitasoftware/go-utils/optionals" "github.com/pkg/errors" "github.com/postmanlabs/postman-insights-agent/apidump" + "github.com/postmanlabs/postman-insights-agent/apispec" "github.com/postmanlabs/postman-insights-agent/printer" "github.com/postmanlabs/postman-insights-agent/rest" "github.com/postmanlabs/postman-insights-agent/telemetry" @@ -68,10 +69,18 @@ func (d *Daemonset) StartApiDumpProcess(podUID types.UID) error { networkNamespace = "/host" + networkNamespace apidumpArgs := apidump.Args{ - ClientID: telemetry.GetClientID(), - Domain: rest.Domain, - ServiceID: podArgs.InsightsProjectID, - ReproMode: d.InsightsReproModeEnabled, + ClientID: telemetry.GetClientID(), + Domain: rest.Domain, + ServiceID: podArgs.InsightsProjectID, + SampleRate: apispec.DefaultSampleRate, + WitnessesPerMinute: apispec.DefaultRateLimit, + LearnSessionLifetime: apispec.DefaultTraceRotateInterval, + TelemetryInterval: apispec.DefaultTelemetryInterval_seconds, + ProcFSPollingInterval: apispec.DefaultProcFSPollingInterval_seconds, + CollectTCPAndTLSReports: apispec.DefaultCollectTCPAndTLSReports, + ParseTLSHandshakes: apispec.DefaultParseTLSHandshakes, + MaxWitnessSize_bytes: apispec.DefaultMaxWitnessSize_bytes, + ReproMode: d.InsightsReproModeEnabled, DaemonsetArgs: optionals.Some(apidump.DaemonsetArgs{ TargetNetworkNamespaceOpt: networkNamespace, StopChan: podArgs.StopChan, From 3811e45c1703f13945a9e4746b3e1e73bb9e5212 Mon Sep 17 00:00:00 2001 From: Mudit Joshi Date: Mon, 17 Feb 2025 13:04:55 +0530 Subject: [PATCH 13/14] fix: - telemetry api erroring our due to nil resp var - already deleted pod is not getting removed during pod health check --- .../kube/daemonset/pods_healthcheck_worker.go | 80 +++++++++++-------- integrations/kube_apis/kube_apis.go | 5 +- rest/front_client.go | 3 +- 3 files changed, 52 insertions(+), 36 deletions(-) diff --git a/cmd/internal/kube/daemonset/pods_healthcheck_worker.go b/cmd/internal/kube/daemonset/pods_healthcheck_worker.go index 8723d90..80f8550 100644 --- a/cmd/internal/kube/daemonset/pods_healthcheck_worker.go +++ b/cmd/internal/kube/daemonset/pods_healthcheck_worker.go @@ -25,49 +25,63 @@ func (d *Daemonset) checkPodsHealth() { podStatuses, err := d.KubeClient.GetPodsStatusByUIDs(podUIDs) if err != nil { printer.Errorf("Failed to get pods status: %v\n", err) - return } - for podUID, podStatus := range podStatuses { + for _, podUID := range podUIDs { + podStatus, ok := podStatuses[podUID] + if !ok { + printer.Infof("Pod status not found for podUID %s, Pod doesn't exists anymore\n", podUID) + d.handleTerminatedPod(podUID, errors.Errorf("pod %s doesn't exists anymore", podUID)) + } + switch podStatus { case coreV1.PodSucceeded, coreV1.PodFailed: printer.Infof("Pod %s has stopped running\n", podStatus) + d.handleTerminatedPod(podUID, errors.Errorf("pod %s has stopped running, status: %s", podUID, podStatus)) + case coreV1.PodRunning: + printer.Debugf("Pod %s is running\n", podStatus) + d.handleUnmonitoredPod(podUID) + } + } +} - podArgs, err := d.getPodArgsFromMap(podUID) - if err != nil { - printer.Infof("Failed to get podArgs for podUID %s: %v\n", podUID, err) - continue - } +// handleTerminatedPod handles the terminated pod by changing the pod's traffic monitor state to PodTerminated +// and stopping the API dump process for that pod. +func (d *Daemonset) handleTerminatedPod(podUID types.UID, podStatusErr error) { + podArgs, err := d.getPodArgsFromMap(podUID) + if err != nil { + printer.Infof("Failed to get podArgs for podUID %s: %v\n", podUID, err) + return + } - err = podArgs.changePodTrafficMonitorState(PodTerminated, TrafficMonitoringStarted) - if err != nil { - printer.Infof("Failed to change pod state, pod name: %s, from: %s to: %s, error: %v\n", - podArgs.PodName, podArgs.PodTrafficMonitorState, PodTerminated, err) - continue - } + err = podArgs.changePodTrafficMonitorState(PodTerminated, TrafficMonitoringStarted) + if err != nil { + printer.Infof("Failed to change pod state, pod name: %s, from: %s to: %s, error: %v\n", + podArgs.PodName, podArgs.PodTrafficMonitorState, PodTerminated, err) + return + } - err = d.StopApiDumpProcess(podUID, errors.Errorf("pod %s has stopped running, status: %s", podArgs.PodName, podStatus)) - if err != nil { - printer.Errorf("Failed to stop api dump process, pod name: %s, error: %v\n", podArgs.PodName, err) - } - case coreV1.PodRunning: - printer.Debugf("Pod %s is running\n", podStatus) + err = d.StopApiDumpProcess(podUID, podStatusErr) + if err != nil { + printer.Errorf("Failed to stop api dump process, pod name: %s, error: %v\n", podArgs.PodName, err) + } +} - podArgs, err := d.getPodArgsFromMap(podUID) - if err != nil { - printer.Infof("Failed to get podArgs for podUID %s: %v\n", podUID, err) - continue - } +// handleUnmonitoredPod starts the API dump process for the pod if it is not already started. +// If pod's monitoring state is still in PodDetected or PodInitialized, it means there is a bug. +// The program should have started the API dump process if it is stored in the map. +func (d *Daemonset) handleUnmonitoredPod(podUID types.UID) { + podArgs, err := d.getPodArgsFromMap(podUID) + if err != nil { + printer.Infof("Failed to get podArgs for podUID %s: %v\n", podUID, err) + return + } - // If pod's monitoring state is still in PodDetected or PodInitialized, it means there is a bug. - // The program should have started the API dump process if it is stored in the map. - if podArgs.PodTrafficMonitorState == PodDetected || podArgs.PodTrafficMonitorState == PodInitialized { - printer.Debugf("Apidump process not started for pod %s during it's initialization, starting now\n", podArgs.PodName) - err = d.StartApiDumpProcess(podUID) - if err != nil { - printer.Errorf("Failed to start api dump process, pod name: %s, error: %v\n", podArgs.PodName, err) - } - } + if podArgs.PodTrafficMonitorState == PodDetected || podArgs.PodTrafficMonitorState == PodInitialized { + printer.Debugf("Apidump process not started for pod %s during it's initialization, starting now\n", podArgs.PodName) + err = d.StartApiDumpProcess(podUID) + if err != nil { + printer.Errorf("Failed to start api dump process, pod name: %s, error: %v\n", podArgs.PodName, err) } } } diff --git a/integrations/kube_apis/kube_apis.go b/integrations/kube_apis/kube_apis.go index 35ba460..6953529 100644 --- a/integrations/kube_apis/kube_apis.go +++ b/integrations/kube_apis/kube_apis.go @@ -179,12 +179,13 @@ func (kc *KubeClient) GetMainContainerUUID(pod coreV1.Pod) (string, error) { // GetPodsStatus returns the statuses for list of pods func (kc *KubeClient) GetPodsStatusByUIDs(podUIDs []types.UID) (maps.Map[types.UID, coreV1.PodPhase], error) { + statuses := maps.NewMap[types.UID, coreV1.PodPhase]() + pods, err := kc.GetPodsByUIDs(podUIDs) if err != nil { - return nil, err + return statuses, err } - statuses := maps.NewMap[types.UID, coreV1.PodPhase]() for _, pod := range pods { statuses[pod.UID] = pod.Status.Phase } diff --git a/rest/front_client.go b/rest/front_client.go index 7b05c3a..63798fe 100644 --- a/rest/front_client.go +++ b/rest/front_client.go @@ -96,5 +96,6 @@ func (c *frontClientImpl) PostDaemonsetAgentTelemetry(ctx context.Context, clust KubernetesCluster: clusterName, } path := "/v2/agent/daemonset/telemetry" - return c.Post(ctx, path, req, nil) + var resp struct{} + return c.Post(ctx, path, req, &resp) } From 295b355cc03a5ee577a2ef20e70689b8a122469b Mon Sep 17 00:00:00 2001 From: Mudit Joshi Date: Mon, 17 Feb 2025 13:04:55 +0530 Subject: [PATCH 14/14] fix: - telemetry api erroring our due to nil resp var - already deleted pod is not getting removed during pod health check --- cmd/internal/kube/daemonset/daemonset.go | 2 + .../kube/daemonset/pods_healthcheck_worker.go | 88 ++++++++++++------- integrations/kube_apis/kube_apis.go | 5 +- rest/front_client.go | 3 +- 4 files changed, 61 insertions(+), 37 deletions(-) diff --git a/cmd/internal/kube/daemonset/daemonset.go b/cmd/internal/kube/daemonset/daemonset.go index 9049589..b3c7878 100644 --- a/cmd/internal/kube/daemonset/daemonset.go +++ b/cmd/internal/kube/daemonset/daemonset.go @@ -34,6 +34,8 @@ type Daemonset struct { CRIClient *cri_apis.CriClient FrontClient rest.FrontClient + // Note: Only filtered pods are stored in this map, i.e., they have required env vars + // and do not have the agent sidecar container PodArgsByNameMap sync.Map PodHealthCheckInterval time.Duration diff --git a/cmd/internal/kube/daemonset/pods_healthcheck_worker.go b/cmd/internal/kube/daemonset/pods_healthcheck_worker.go index 8723d90..e35ec10 100644 --- a/cmd/internal/kube/daemonset/pods_healthcheck_worker.go +++ b/cmd/internal/kube/daemonset/pods_healthcheck_worker.go @@ -11,8 +11,9 @@ import ( // checkPodsHealth checks the health status of pods managed by the Daemonset. // It retrieves the current status of each pod and performs actions based on their status. -// If a pod has stopped running (either succeeded or failed), it updates the pod's traffic monitor state +// If a pod has stopped running (either succeeded or failed) or not exists anymore, it updates the pod's traffic monitor state // and stops the API dump process for that pod. +// If a pod is running and it's traffic is not monitored, it starts the API dump process for that pod. func (d *Daemonset) checkPodsHealth() { printer.Debugf("Checking pods health, time: %s\n", time.Now().UTC()) @@ -22,52 +23,71 @@ func (d *Daemonset) checkPodsHealth() { return true }) + if len(podUIDs) == 0 { + printer.Debugf("No pods to check health\n") + return + } + podStatuses, err := d.KubeClient.GetPodsStatusByUIDs(podUIDs) if err != nil { printer.Errorf("Failed to get pods status: %v\n", err) - return } - for podUID, podStatus := range podStatuses { + for _, podUID := range podUIDs { + podStatus, ok := podStatuses[podUID] + if !ok { + printer.Infof("Pod status not found for podUID %s, Pod doesn't exists anymore\n", podUID) + d.handleTerminatedPod(podUID, errors.Errorf("pod %s doesn't exists anymore", podUID)) + } + switch podStatus { case coreV1.PodSucceeded, coreV1.PodFailed: printer.Infof("Pod %s has stopped running\n", podStatus) + d.handleTerminatedPod(podUID, errors.Errorf("pod %s has stopped running, status: %s", podUID, podStatus)) + case coreV1.PodRunning: + printer.Debugf("Pod %s is running\n", podStatus) + d.handleUnmonitoredPod(podUID) + } + } +} - podArgs, err := d.getPodArgsFromMap(podUID) - if err != nil { - printer.Infof("Failed to get podArgs for podUID %s: %v\n", podUID, err) - continue - } +// handleTerminatedPod handles the terminated pod by changing the pod's traffic monitor state to PodTerminated +// and stopping the API dump process for that pod. +func (d *Daemonset) handleTerminatedPod(podUID types.UID, podStatusErr error) { + podArgs, err := d.getPodArgsFromMap(podUID) + if err != nil { + printer.Infof("Failed to get podArgs for podUID %s: %v\n", podUID, err) + return + } - err = podArgs.changePodTrafficMonitorState(PodTerminated, TrafficMonitoringStarted) - if err != nil { - printer.Infof("Failed to change pod state, pod name: %s, from: %s to: %s, error: %v\n", - podArgs.PodName, podArgs.PodTrafficMonitorState, PodTerminated, err) - continue - } + err = podArgs.changePodTrafficMonitorState(PodTerminated, TrafficMonitoringStarted) + if err != nil { + printer.Infof("Failed to change pod state, pod name: %s, from: %s to: %s, error: %v\n", + podArgs.PodName, podArgs.PodTrafficMonitorState, PodTerminated, err) + return + } - err = d.StopApiDumpProcess(podUID, errors.Errorf("pod %s has stopped running, status: %s", podArgs.PodName, podStatus)) - if err != nil { - printer.Errorf("Failed to stop api dump process, pod name: %s, error: %v\n", podArgs.PodName, err) - } - case coreV1.PodRunning: - printer.Debugf("Pod %s is running\n", podStatus) + err = d.StopApiDumpProcess(podUID, podStatusErr) + if err != nil { + printer.Errorf("Failed to stop api dump process, pod name: %s, error: %v\n", podArgs.PodName, err) + } +} - podArgs, err := d.getPodArgsFromMap(podUID) - if err != nil { - printer.Infof("Failed to get podArgs for podUID %s: %v\n", podUID, err) - continue - } +// handleUnmonitoredPod starts the API dump process for the pod if it is not already started. +// If pod's monitoring state is still in PodDetected or PodInitialized, it means there is a bug. +// The program should have started the API dump process if it is stored in the map. +func (d *Daemonset) handleUnmonitoredPod(podUID types.UID) { + podArgs, err := d.getPodArgsFromMap(podUID) + if err != nil { + printer.Infof("Failed to get podArgs for podUID %s: %v\n", podUID, err) + return + } - // If pod's monitoring state is still in PodDetected or PodInitialized, it means there is a bug. - // The program should have started the API dump process if it is stored in the map. - if podArgs.PodTrafficMonitorState == PodDetected || podArgs.PodTrafficMonitorState == PodInitialized { - printer.Debugf("Apidump process not started for pod %s during it's initialization, starting now\n", podArgs.PodName) - err = d.StartApiDumpProcess(podUID) - if err != nil { - printer.Errorf("Failed to start api dump process, pod name: %s, error: %v\n", podArgs.PodName, err) - } - } + if podArgs.PodTrafficMonitorState == PodDetected || podArgs.PodTrafficMonitorState == PodInitialized { + printer.Debugf("Apidump process not started for pod %s during it's initialization, starting now\n", podArgs.PodName) + err = d.StartApiDumpProcess(podUID) + if err != nil { + printer.Errorf("Failed to start api dump process, pod name: %s, error: %v\n", podArgs.PodName, err) } } } diff --git a/integrations/kube_apis/kube_apis.go b/integrations/kube_apis/kube_apis.go index 35ba460..6953529 100644 --- a/integrations/kube_apis/kube_apis.go +++ b/integrations/kube_apis/kube_apis.go @@ -179,12 +179,13 @@ func (kc *KubeClient) GetMainContainerUUID(pod coreV1.Pod) (string, error) { // GetPodsStatus returns the statuses for list of pods func (kc *KubeClient) GetPodsStatusByUIDs(podUIDs []types.UID) (maps.Map[types.UID, coreV1.PodPhase], error) { + statuses := maps.NewMap[types.UID, coreV1.PodPhase]() + pods, err := kc.GetPodsByUIDs(podUIDs) if err != nil { - return nil, err + return statuses, err } - statuses := maps.NewMap[types.UID, coreV1.PodPhase]() for _, pod := range pods { statuses[pod.UID] = pod.Status.Phase } diff --git a/rest/front_client.go b/rest/front_client.go index 7b05c3a..63798fe 100644 --- a/rest/front_client.go +++ b/rest/front_client.go @@ -96,5 +96,6 @@ func (c *frontClientImpl) PostDaemonsetAgentTelemetry(ctx context.Context, clust KubernetesCluster: clusterName, } path := "/v2/agent/daemonset/telemetry" - return c.Post(ctx, path, req, nil) + var resp struct{} + return c.Post(ctx, path, req, &resp) }