Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POA-2928] Fixes discovered during testing #80

Draft
wants to merge 9 commits into
base: mudit/poa-2609
Choose a base branch
from
8 changes: 4 additions & 4 deletions cmd/internal/kube/daemonset/apidump_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ func (d *Daemonset) StartApiDumpProcess(podUID types.UID) error {

err = podArgs.changePodTrafficMonitorState(TrafficMonitoringStarted, PodDetected, PodInitialized)
if err != nil {
return errors.Wrapf(err, "failed to change pod state, pod name: %s, from: %d to: %d",
podArgs.PodName, podArgs.PodTrafficMonitorState, TrafficMonitoringStopped)
return errors.Wrapf(err, "failed to change pod state, pod name: %s, from: %s to: %s",
podArgs.PodName, podArgs.PodTrafficMonitorState, TrafficMonitoringStarted)
}

go func() (funcErr error) {
Expand All @@ -45,7 +45,7 @@ func (d *Daemonset) StartApiDumpProcess(podUID types.UID) error {

err = podArgs.changePodTrafficMonitorState(nextState, TrafficMonitoringStarted)
if err != nil {
printer.Errorf("Failed to change pod state, pod name: %s, from: %d to: %d, error: %v\n",
printer.Errorf("Failed to change pod state, pod name: %s, from: %s to: %s, error: %v\n",
podArgs.PodName, podArgs.PodTrafficMonitorState, nextState, err)
return
}
Expand Down Expand Up @@ -99,7 +99,7 @@ func (d *Daemonset) StopApiDumpProcess(podUID types.UID, stopErr error) error {
err = podArgs.changePodTrafficMonitorState(TrafficMonitoringStopped,
PodTerminated, DaemonSetShutdown, TrafficMonitoringFailed, TrafficMonitoringEnded)
if err != nil {
return errors.Wrapf(err, "failed to change pod state, pod name: %s, from: %d to: %d",
return errors.Wrapf(err, "failed to change pod state, pod name: %s, from: %s to: %s",
podArgs.PodName, podArgs.PodTrafficMonitorState, TrafficMonitoringStopped)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
package daemonset

import "time"

type podEnvVars string

const (
// Pod environment variables
POSTMAN_INSIGHTS_PROJECT_ID podEnvVars = "POSTMAN_INSIGHTS_PROJECT_ID"
POSTMAN_INSIGHTS_API_KEY podEnvVars = "POSTMAN_INSIGHTS_API_KEY"
POSTMAN_INSIGHTS_ENV podEnvVars = "POSTMAN_INSIGHTS_ENV"

// Workers intervals
DefaultTelemetryInterval = 5 * time.Minute
DefaultPodHealthCheckInterval = 5 * time.Minute
)
68 changes: 42 additions & 26 deletions cmd/internal/kube/daemonset/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"time"

"github.com/pkg/errors"
"github.com/postmanlabs/postman-insights-agent/apispec"
"github.com/postmanlabs/postman-insights-agent/cmd/internal/cmderr"
"github.com/postmanlabs/postman-insights-agent/integrations/cri_apis"
"github.com/postmanlabs/postman-insights-agent/integrations/kube_apis"
"github.com/postmanlabs/postman-insights-agent/printer"
Expand Down Expand Up @@ -58,7 +58,7 @@ func StartDaemonset() error {

// Send initial telemetry
clusterName := os.Getenv("POSTMAN_CLUSTER_NAME")
telemetryInterval := apispec.DefaultTelemetryInterval_seconds * time.Second
telemetryInterval := DefaultTelemetryInterval
if clusterName == "" {
printer.Infof(
"The cluster name is missing. Telemetry will not be sent from this agent, " +
Expand Down Expand Up @@ -91,17 +91,18 @@ func StartDaemonset() error {
return errors.Wrap(err, "failed to create CRI client")
}

go func() {
daemonsetRun := &Daemonset{
ClusterName: clusterName,
InsightsReproModeEnabled: insightsReproModeEnabled,
KubeClient: kubeClient,
CRIClient: criClient,
FrontClient: frontClient,
TelemetryInterval: telemetryInterval,
}
daemonsetRun.Run()
}()
daemonsetRun := &Daemonset{
ClusterName: clusterName,
InsightsReproModeEnabled: insightsReproModeEnabled,
KubeClient: kubeClient,
CRIClient: criClient,
FrontClient: frontClient,
TelemetryInterval: telemetryInterval,
PodHealthCheckInterval: DefaultPodHealthCheckInterval,
}
if err := daemonsetRun.Run(); err != nil {
return cmderr.AkitaErr{Err: err}
}

return nil
}
Expand All @@ -115,18 +116,23 @@ func StartDaemonset() error {
// 7. Stops all apidump processes.
// 8. Exits the daemonset agent.
func (d *Daemonset) Run() error {
printer.Infof("Starting daemonset agent...\n")
done := make(chan struct{})

// Start the telemetry worker
printer.Infof("Starting telemetry worker...\n")
go d.TelemetryWorker(done)

// Start the kubernetes events worker
printer.Infof("Starting kubernetes events worker...\n")
go d.KubernetesEventsWorker(done)

// Start the pods health worker
printer.Infof("Starting pods health worker...\n")
go d.PodsHealthWorker(done)

// Start the process in the existing pods
printer.Infof("Starting process in existing pods...\n")
err := d.StartProcessInExistingPods()
if err != nil {
printer.Errorf("Failed to start process in existing pods, error: %v\n", err)
Expand All @@ -143,12 +149,9 @@ func (d *Daemonset) Run() error {

// Continue until an interrupt
DoneWaitingForSignal:
for {
select {
case received := <-sig:
printer.Stderr.Infof("Received %v, stopping daemonset...\n", received.String())
break DoneWaitingForSignal
}
for received := range sig {
printer.Stderr.Infof("Received %v, stopping daemonset...\n", received.String())
break DoneWaitingForSignal
}
}

Expand All @@ -160,6 +163,10 @@ func (d *Daemonset) Run() error {
printer.Debugf("Stopping all apidump processes...\n")
d.StopAllApiDumpProcesses()

// Stop K8s Watcher
printer.Debugf("Stopping k8s watcher...\n")
d.KubeClient.Close()

printer.Infof("Exiting daemonset agent...\n")
return nil
}
Expand All @@ -180,24 +187,27 @@ func (d *Daemonset) getPodArgsFromMap(podUID types.UID) (*PodArgs, error) {

// addPodArgsToMap adds the podArgs to the map with the podUID as the key
// This function ensures that the pod is not already loaded in the map
func (d *Daemonset) addPodArgsToMap(podUID types.UID, args *PodArgs, startingState PodTrafficMonitorState) {
func (d *Daemonset) addPodArgsToMap(podUID types.UID, args *PodArgs, startingState PodTrafficMonitorState) error {
value, loaded := d.PodArgsByNameMap.LoadOrStore(podUID, args)
argsFromMap := value.(*PodArgs)
if loaded {
if !loaded {
err := argsFromMap.changePodTrafficMonitorState(startingState)
if err != nil {
printer.Errorf("Failed to change pod state, pod name: %s, from: %d to: %d, error: %v\n",
argsFromMap.PodName, argsFromMap.PodTrafficMonitorState, startingState, err)
return errors.Wrapf(err, "failed to change pod state, pod name: %s, from: %s to: %s",
argsFromMap.PodName, argsFromMap.PodTrafficMonitorState, startingState)
}
} else {
printer.Errorf("Pod is already loaded in the map and is in state %d\n", argsFromMap.PodTrafficMonitorState)
return errors.Errorf("pod is already loaded in the map and is in state %s", argsFromMap.PodTrafficMonitorState)
}

return nil
}

// TelemetryWorker starts a worker that periodically sends telemetry data and dumps the state of the Pods API dump process.
// The worker runs until the provided done channel is closed.
func (d *Daemonset) TelemetryWorker(done <-chan struct{}) {
if d.TelemetryInterval <= 0 {
printer.Debugf("Telemetry interval is set to 0, telemetry worker will not run\n")
return
}

Expand Down Expand Up @@ -244,7 +254,12 @@ func (d *Daemonset) StartProcessInExistingPods() error {
continue
}

d.addPodArgsToMap(pod.UID, &args, PodDetected)
err = d.addPodArgsToMap(pod.UID, args, PodDetected)
if err != nil {
printer.Errorf("Failed to add pod args to map, pod name: %s, error: %v\n", pod.Name, err)
continue
}

err = d.StartApiDumpProcess(pod.UID)
if err != nil {
printer.Errorf("Failed to start api dump process, pod name: %s, error: %v\n", pod.Name, err)
Expand Down Expand Up @@ -282,6 +297,7 @@ func (d *Daemonset) KubernetesEventsWorker(done <-chan struct{}) {
// It runs until the provided done channel is closed.
func (d *Daemonset) PodsHealthWorker(done <-chan struct{}) {
if d.PodHealthCheckInterval <= 0 {
printer.Debugf("Pod health check interval is set to 0, pods health worker will not run\n")
return
}

Expand Down Expand Up @@ -311,7 +327,7 @@ func (d *Daemonset) StopAllApiDumpProcesses() {
// Since this state can happen at any time so no check for allowed current states
err := podArgs.changePodTrafficMonitorState(DaemonSetShutdown)
if err != nil {
printer.Errorf("Failed to change pod state, pod name: %s, from: %d to: %d, error: %v\n",
printer.Errorf("Failed to change pod state, pod name: %s, from: %s to: %s, error: %v\n",
podArgs.PodName, podArgs.PodTrafficMonitorState, DaemonSetShutdown, err)
return true
}
Expand Down
25 changes: 15 additions & 10 deletions cmd/internal/kube/daemonset/kube_events_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ func (d *Daemonset) handlePodAddEvent(podUID types.UID) {
return
}

d.addPodArgsToMap(pod.UID, &args, PodInitialized)
err = d.addPodArgsToMap(pod.UID, args, PodInitialized)
if err != nil {
printer.Errorf("Failed to add pod args to map, pod name: %s, error: %v\n", pod.Name, err)
return
}

err = d.StartApiDumpProcess(pod.UID)
if err != nil {
printer.Errorf("Failed to start api dump process, pod name: %s, error: %v\n", pod.Name, err)
Expand All @@ -82,7 +87,7 @@ func (d *Daemonset) handlePodDeleteEvent(podUID types.UID) {

err = podArgs.changePodTrafficMonitorState(PodTerminated, TrafficMonitoringStarted)
if err != nil {
printer.Errorf("Failed to change pod state, pod name: %s, from: %d to: %d, error: %v\n",
printer.Errorf("Failed to change pod state, pod name: %s, from: %s to: %s, error: %v\n",
podArgs.PodName, podArgs.PodTrafficMonitorState, PodTerminated, err)
return
}
Expand All @@ -97,17 +102,17 @@ func (d *Daemonset) handlePodDeleteEvent(podUID types.UID) {
// required for the Postman Insights project. It retrieves the UUID of the main container
// in the pod, fetches the environment variables of that container, and extracts the
// necessary variables such as the project ID, API key, and environment.
func (d *Daemonset) inspectPodForEnvVars(pod coreV1.Pod) (PodArgs, error) {
func (d *Daemonset) inspectPodForEnvVars(pod coreV1.Pod) (*PodArgs, error) {
// Get the UUID of the main container in the pod
containerUUID, err := d.KubeClient.GetMainContainerUUID(pod)
if err != nil {
return PodArgs{}, errors.Wrapf(err, "failed to get main container UUID for pod: %s", pod.Name)
return nil, errors.Wrapf(err, "failed to get main container UUID for pod: %s", pod.Name)
}

// Get the environment variables of the main container
envVars, err := d.CRIClient.GetEnvVars(containerUUID)
if err != nil {
return PodArgs{}, errors.Wrapf(err, "failed to get environment variables for pod/container : %s/%s", pod.Name, containerUUID)
return nil, errors.Wrapf(err, "failed to get environment variables for pod/container : %s/%s", pod.Name, containerUUID)
}

var (
Expand All @@ -122,7 +127,7 @@ func (d *Daemonset) inspectPodForEnvVars(pod coreV1.Pod) (PodArgs, error) {
case string(POSTMAN_INSIGHTS_PROJECT_ID):
err := akid.ParseIDAs(value, &insightsProjectID)
if err != nil {
return PodArgs{}, errors.Wrap(err, "failed to parse project ID")
return nil, errors.Wrap(err, "failed to parse project ID")
}
case string(POSTMAN_INSIGHTS_API_KEY):
insightsAPIKey = value
Expand All @@ -132,17 +137,17 @@ func (d *Daemonset) inspectPodForEnvVars(pod coreV1.Pod) (PodArgs, error) {
}

if (insightsProjectID == akid.ServiceID{}) && insightsAPIKey == "" {
return PodArgs{}, allRequiredEnvVarsAbsentErr
return nil, allRequiredEnvVarsAbsentErr
}

if (insightsProjectID == akid.ServiceID{}) {
printer.Errorf("Project ID is missing, set it using the environment variable %s, pod name: %s\n", POSTMAN_INSIGHTS_PROJECT_ID, pod.Name)
return PodArgs{}, requiredEnvVarMissingErr
return nil, requiredEnvVarMissingErr
}

if insightsAPIKey == "" {
printer.Errorf("API key is missing, set it using the environment variable %s, pod name: %s\n", POSTMAN_INSIGHTS_API_KEY, pod.Name)
return PodArgs{}, requiredEnvVarMissingErr
return nil, requiredEnvVarMissingErr
}

args := PodArgs{
Expand All @@ -157,5 +162,5 @@ func (d *Daemonset) inspectPodForEnvVars(pod coreV1.Pod) (PodArgs, error) {
StopChan: make(chan error, 2),
}

return args, nil
return &args, nil
}
49 changes: 16 additions & 33 deletions cmd/internal/kube/daemonset/pod_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,41 +9,22 @@ import (
"github.com/pkg/errors"
)

type PodTrafficMonitorState int
type PodTrafficMonitorState string

// Different states of pod traffic monitoring
// The state transition is as follows:
// PodDetected/PodInitialized -> TrafficMonitoringStarted -> TrafficMonitoringFailed/TrafficMonitoringEnded/PodTerminated -> TrafficMonitoringStopped -> RemovePodFromMap
// 'DaemonSetShutdown' is a special state which is used to stop the daemonset agent and can be triggered at any time
const (
_ PodTrafficMonitorState = iota

// When agent finds an already running pod
PodDetected

// When agent will receive pod created event
PodInitialized

// When apidump process is started for the pod
TrafficMonitoringStarted

// When apidump process is errored for the pod
TrafficMonitoringFailed

// When apidump process is ended without any issue for the pod
TrafficMonitoringEnded

// When agent will receive pod deleted event or pod is in terminal state while checking status
PodTerminated

// When the daemonset agent starts the shutdown process
DaemonSetShutdown

// When apidump process is stopped for the pod
TrafficMonitoringStopped

// Final state after which pod will be removed from the map
RemovePodFromMap
PodDetected PodTrafficMonitorState = "PodDetected" // When agent finds an already running pod
PodInitialized PodTrafficMonitorState = "PodInitialized" // When agent will receive pod created event
TrafficMonitoringStarted PodTrafficMonitorState = "TrafficMonitoringStarted" // When apidump process is started for the pod
TrafficMonitoringFailed PodTrafficMonitorState = "TrafficMonitoringFailed" // When apidump process is errored for the pod
TrafficMonitoringEnded PodTrafficMonitorState = "TrafficMonitoringEnded" // When apidump process is ended without any issue for the pod
PodTerminated PodTrafficMonitorState = "PodTerminated" // When agent will receive pod deleted event or pod is in terminal state while checking status
DaemonSetShutdown PodTrafficMonitorState = "DaemonSetShutdown" // When the daemonset agent starts the shutdown process
TrafficMonitoringStopped PodTrafficMonitorState = "TrafficMonitoringStopped" // When apidump process is stopped for the pod
RemovePodFromMap PodTrafficMonitorState = "RemovePodFromMap" // Final state after which pod will be removed from the map
)

type PodCreds struct {
Expand All @@ -62,7 +43,7 @@ type PodArgs struct {

// for state management
PodTrafficMonitorState PodTrafficMonitorState
StateChangeMutex *sync.Mutex
StateChangeMutex sync.Mutex

// send stop signal to apidump process
StopChan chan error
Expand All @@ -86,12 +67,14 @@ func (p *PodArgs) changePodTrafficMonitorState(
p.StateChangeMutex.Lock()
defer p.StateChangeMutex.Unlock()

if !slices.Contains(allowedCurrentStates, p.PodTrafficMonitorState) {
return errors.New(fmt.Sprintf("Invalid current state for pod %s: %d", p.PodName, p.PodTrafficMonitorState))
// Check if the current state is allowed for the transition
// If the allowedCurrentStates is empty, then any state is allowed
if len(allowedCurrentStates) != 0 && !slices.Contains(allowedCurrentStates, p.PodTrafficMonitorState) {
return errors.New(fmt.Sprintf("Invalid current state for pod %s: %s", p.PodName, p.PodTrafficMonitorState))
}

if p.PodTrafficMonitorState == nextState {
return errors.New(fmt.Sprintf("API dump process for pod %s is already in state %d", p.PodName, nextState))
return errors.New(fmt.Sprintf("API dump process for pod %s is already in state %s", p.PodName, nextState))
}

p.PodTrafficMonitorState = nextState
Expand Down
Loading